2
0
Fork 0
tidal-scraper/tidal_scraper/download.py

212 lines
6.4 KiB
Python

import tidal_scraper.metadata as metadata
from tidal_scraper.helper import extensions, clean_template, log_error
import tidalapi
import os
import json
import requests
import io
import time
import random
from tqdm import tqdm
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Util import Counter
from typing import Tuple
from typing import BinaryIO
MASTER_KEY = (
b"P\x89SLC&\x98\xb7\xc6\xa3\n?P.\xb4\xc7a\xf8\xe5n\x8cth\x13E\xfa?\xbah8\xef\x9e"
)
def __decode_key_id(key_id: str) -> Tuple[bytes, bytes]:
decoded_key_id = b64decode(key_id)
init_vector = decoded_key_id[:16]
encrypted_token = decoded_key_id[16:]
decryptor = AES.new(MASTER_KEY, AES.MODE_CBC, init_vector)
decrypted_token = decryptor.decrypt(encrypted_token)
key = decrypted_token[:16]
nonce = decrypted_token[16:24]
return key, nonce
def __decrypt_file(fp: BinaryIO, key: bytes, nonce: bytes) -> None:
counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
fp.seek(0)
data = fp.read()
data = decryptor.decrypt(data)
fp.write(data)
def __download_file(url: str, fp: BinaryIO) -> str:
with requests.get(url, stream=True) as r:
r.raise_for_status()
mime = r.headers.get("Content-Type", "")
total_bytes = int(r.headers.get("Content-Length", 0))
with tqdm(total=total_bytes, unit="iB", unit_scale=True) as p:
for data in r.iter_content(1024):
fp.write(data)
p.update(len(data))
return mime
def download_track(
track: tidalapi.Track,
sleep: bool,
conf: dict | None = None,
name_template: str | None = None,
dest_dir: str | None = None,
skip_downloaded: bool | None = None,
errorfile: str | None = None,
) -> None:
album = track.album
assert album
assert album.artist
if conf is None:
assert skip_downloaded is not None
assert errorfile is not None
assert name_template is not None
assert dest_dir is not None
else:
skip_downloaded = skip_downloaded or conf["skip_downloaded"]
errorfile = errorfile or conf["error_log"]
dest_dir = dest_dir or conf["dest_dir"]
name_template = name_template or conf["track_name"]
dest = dest_dir + clean_template(name_template, track=track)
print(f"Starting {album.artist.name} - {track.name}")
http_failures = 0
while http_failures <= 3:
try:
stream = track.stream()
manifest = json.loads(b64decode(stream.manifest))
url = manifest["urls"][0]
codec = manifest["codecs"]
if ".mp4" in url:
if "ac4" in codec or "mha1" in codec:
dest += ".mp4"
else:
dest += ".m4a"
else:
for ext in extensions:
if ext in url:
dest += ext
break
if os.path.exists(dest) and skip_downloaded:
print("Skipping track\n")
if sleep:
t = random.randrange(750, 1500) / 1000
time.sleep(t)
return
assert track.name and album.name
with io.BytesIO() as b:
print("Downloading track")
key_id = manifest.get("keyId", None)
mime = __download_file(url, b)
if key_id:
print("Decrypting track")
__decrypt_file(b, *__decode_key_id(key_id))
metadata.write(
b,
mime,
track.name,
album.name,
str(track.track_num),
str(album.num_tracks),
)
with open(dest, "wb") as f:
data = b.getvalue()
f.write(data)
print()
if sleep:
t = random.randrange(1000, 5000) / 1000
time.sleep(t)
break
except requests.HTTPError:
http_failures += 1
t = random.randrange(10000, 20000) / 1000
time.sleep(t)
except KeyboardInterrupt as e:
raise e
except Exception:
log_error(
errorfile or "error.log",
"Failure while downloading {artist} - {track}",
artist=album.artist.name,
track=track.name,
)
break
def download_cover(
obj: tidalapi.Album | tidalapi.Playlist,
conf: dict | None = None,
dest: str | None = None,
size: int | None = None,
skip_dl: bool | None = None,
) -> None:
if conf is None:
assert dest is not None
assert size is not None
assert skip_dl is not None
else:
if type(obj) is tidalapi.Album:
dest = clean_template(
conf["dest_dir"] + "/" + conf["album_dir"],
album=obj,
)
size = conf["album_image_size"]
elif type(obj) is tidalapi.Playlist:
dest = clean_template(
conf["dest_dir"] + "/" + conf["playlist_dir"],
playlist=obj,
)
size = conf["playlist_image_size"]
skip_dl = conf["skip_downloaded"]
assert dest
assert size
if os.path.exists(dest) and skip_dl:
return
url = obj.image(size)
with open(dest, "wb") as f:
__download_file(url, f)
def download_album(album: tidalapi.Album, conf: dict) -> None:
dest = clean_template(
conf["dest_dir"] + "/" + conf["album_dir"],
album=album,
)
os.makedirs(os.path.dirname(dest), exist_ok=True)
download_cover(album, conf)
tracks = album.tracks()
for track in tracks:
download_track(track, True, conf, dest_dir=dest)
def download_playlist(playlist: tidalapi.Playlist, conf: dict) -> None:
dest = clean_template(
conf["dest_dir"] + "/" + conf["playlist_dir"],
playlist=playlist,
)
os.makedirs(os.path.dirname(dest), exist_ok=True)
download_cover(playlist, conf)
tracks = playlist.tracks()
for track in tracks:
download_track(track, True, conf, dest_dir=dest)
def download_artist(artist: tidalapi.Artist, conf: dict) -> None:
albums = artist.get_albums()
for album in albums:
download_album(album, conf)