2
0
Fork 0
tidal-scraper/tidal_scraper/download.py

212 lines
6.4 KiB
Python
Raw Permalink Normal View History

2023-07-05 11:50:05 +02:00
import tidal_scraper.metadata as metadata
2023-07-18 14:25:14 +02:00
from tidal_scraper.helper import extensions, clean_template, log_error
2023-06-28 23:24:32 +02:00
import tidalapi
import os
import json
2023-06-29 16:28:31 +02:00
import requests
import io
2023-07-18 14:25:14 +02:00
import time
import random
2023-06-29 16:28:31 +02:00
from tqdm import tqdm
2023-06-28 23:24:32 +02:00
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Util import Counter
2023-06-29 16:28:31 +02:00
from typing import Tuple
from typing import BinaryIO
# Fixed AES key. __decode_key_id uses it (in CBC mode) to decrypt the token
# carried in a stream manifest's "keyId", yielding the per-track key and nonce.
MASTER_KEY = (
b"P\x89SLC&\x98\xb7\xc6\xa3\n?P.\xb4\xc7a\xf8\xe5n\x8cth\x13E\xfa?\xbah8\xef\x9e"
)
2023-06-28 23:24:32 +02:00
2023-06-29 16:28:31 +02:00
def __decode_key_id(key_id: str) -> Tuple[bytes, bytes]:
    """Unwrap the per-track key and nonce from a base64-encoded ``key_id``.

    The decoded blob is a 16-byte initialisation vector followed by an
    encrypted token. The token is decrypted with AES-CBC under ``MASTER_KEY``.

    Returns:
        ``(key, nonce)``: the first 16 bytes of the decrypted token and the
        8 bytes that follow them.
    """
    blob = b64decode(key_id)
    iv, wrapped_token = blob[:16], blob[16:]
    token = AES.new(MASTER_KEY, AES.MODE_CBC, iv).decrypt(wrapped_token)
    return token[:16], token[16:24]
2023-06-29 16:28:31 +02:00
def __decrypt_file(fp: BinaryIO, key: bytes, nonce: bytes) -> None:
    """Decrypt the whole content of ``fp`` in place using AES-CTR.

    Args:
        fp: Seekable, readable and writable binary stream holding the
            encrypted data. On return it holds the decrypted data only.
        key: AES key.
        nonce: Counter prefix; the remaining 64 bits of the counter block
            start at zero.
    """
    counter = Counter.new(64, prefix=nonce, initial_value=0)
    decryptor = AES.new(key, AES.MODE_CTR, counter=counter)

    fp.seek(0)
    plaintext = decryptor.decrypt(fp.read())

    # read() leaves the position at EOF; rewind so the plaintext replaces the
    # ciphertext instead of being appended after it.
    fp.seek(0)
    fp.write(plaintext)
    # Defensive: guarantee nothing of the old content survives past the
    # plaintext.
    fp.truncate()
2023-06-28 23:24:32 +02:00
2023-06-29 23:57:07 +02:00
def __download_file(url: str, fp: BinaryIO) -> str:
    """Stream the resource at ``url`` into ``fp`` while showing a progress bar.

    Args:
        url: Address to fetch.
        fp: Writable binary stream that receives the body in 1024-byte chunks.

    Returns:
        The response's ``Content-Type`` header, or ``""`` when it is absent.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        headers = response.headers
        content_type = headers.get("Content-Type", "")
        # A missing Content-Length is passed to the progress bar as 0.
        expected_size = int(headers.get("Content-Length", 0))
        with tqdm(total=expected_size, unit="iB", unit_scale=True) as progress:
            for chunk in response.iter_content(1024):
                fp.write(chunk)
                progress.update(len(chunk))
    return content_type
2023-06-28 23:24:32 +02:00
2023-07-05 11:50:05 +02:00
def download_track(
    track: tidalapi.Track,
    sleep: bool,
    conf: dict | None = None,
    name_template: str | None = None,
    dest_dir: str | None = None,
    skip_downloaded: bool | None = None,
    errorfile: str | None = None,
) -> None:
    """Download one track, decrypt it if needed, tag it and write it to disk.

    Explicit keyword arguments take precedence; any that are omitted are read
    from ``conf`` (keys ``track_name``, ``dest_dir``, ``skip_downloaded`` and
    ``error_log``). When ``conf`` is ``None`` all four must be supplied.

    Args:
        track: Track to download; it must have an album with an artist.
        sleep: When true, pause for a random 0.75-1.5 s after skipping a
            track and 1-5 s after downloading one.
        conf: Optional configuration mapping providing the fallbacks above.
        name_template: Template for the file name (without extension).
        dest_dir: Prefix the rendered file name is appended to.
        skip_downloaded: Skip the track when the target file already exists.
        errorfile: Path of the error log.

    HTTP errors are retried after a random 10-20 s pause, up to four attempts
    in total. Any other exception, or running out of retries, is written to
    the error log and the track is abandoned.
    """
    album = track.album
    assert album
    assert album.artist

    if conf is None:
        assert skip_downloaded is not None
        assert errorfile is not None
        assert name_template is not None
        assert dest_dir is not None
    else:
        # ``or`` would let the config override an explicit ``False``.
        if skip_downloaded is None:
            skip_downloaded = conf["skip_downloaded"]
        errorfile = errorfile or conf["error_log"]
        dest_dir = dest_dir or conf["dest_dir"]
        name_template = name_template or conf["track_name"]

    def log_failure() -> None:
        # Record that this track could not be downloaded.
        log_error(
            errorfile or "error.log",
            "Failure while downloading {artist} - {track}",
            artist=album.artist.name,
            track=track.name,
        )

    base_dest = dest_dir + clean_template(name_template, track=track)
    print(f"Starting {album.artist.name} - {track.name}")

    http_failures = 0
    while http_failures <= 3:
        try:
            stream = track.stream()
            manifest = json.loads(b64decode(stream.manifest))
            url = manifest["urls"][0]
            codec = manifest["codecs"]

            # Rebuild the path from the base name on every attempt so that a
            # retry does not stack a second extension onto the first one.
            dest = base_dest
            if ".mp4" in url:
                if "ac4" in codec or "mha1" in codec:
                    dest += ".mp4"
                else:
                    dest += ".m4a"
            else:
                for ext in extensions:
                    if ext in url:
                        dest += ext
                        break

            if os.path.exists(dest) and skip_downloaded:
                print("Skipping track\n")
                if sleep:
                    time.sleep(random.randrange(750, 1500) / 1000)
                return

            assert track.name and album.name
            # Assemble the file in memory; it is written to disk only after
            # download, decryption and tagging have all succeeded.
            with io.BytesIO() as b:
                print("Downloading track")
                key_id = manifest.get("keyId", None)
                mime = __download_file(url, b)
                if key_id:
                    print("Decrypting track")
                    __decrypt_file(b, *__decode_key_id(key_id))
                metadata.write(
                    b,
                    mime,
                    track.name,
                    album.name,
                    str(track.track_num),
                    str(album.num_tracks),
                )
                with open(dest, "wb") as f:
                    f.write(b.getvalue())
            print()
            if sleep:
                time.sleep(random.randrange(1000, 5000) / 1000)
            break
        except requests.HTTPError:
            http_failures += 1
            if http_failures > 3:
                # Out of retries: record the failure instead of sleeping once
                # more and giving up silently.
                log_failure()
                break
            time.sleep(random.randrange(10000, 20000) / 1000)
        except KeyboardInterrupt:
            raise
        except Exception:
            log_failure()
            break
2023-06-28 23:24:32 +02:00
2023-06-29 16:28:31 +02:00
def download_cover(
    obj: tidalapi.Album | tidalapi.Playlist,
    conf: dict | None = None,
    dest: str | None = None,
    size: int | None = None,
    skip_dl: bool | None = None,
) -> None:
    """Download the cover image of an album or playlist to ``dest``.

    Explicit keyword arguments take precedence; any that are omitted are
    derived from ``conf``. When ``conf`` is ``None`` all three must be
    supplied.

    Args:
        obj: Album or playlist whose image is fetched via ``obj.image(size)``.
        conf: Optional configuration mapping. The keys read are ``dest_dir``,
            ``skip_downloaded`` and, depending on the type of ``obj``,
            ``album_dir``/``album_image_size`` or
            ``playlist_dir``/``playlist_image_size``.
        dest: Path the image is written to.
        size: Image size passed to ``obj.image``.
        skip_dl: Do nothing when ``dest`` already exists.
    """
    if conf is None:
        assert dest is not None
        assert size is not None
        assert skip_dl is not None
    else:
        # Fill in only what the caller left out, so explicit arguments are
        # not silently discarded.
        if isinstance(obj, tidalapi.Album):
            if dest is None:
                dest = clean_template(
                    conf["dest_dir"] + "/" + conf["album_dir"],
                    album=obj,
                )
            if size is None:
                size = conf["album_image_size"]
        elif isinstance(obj, tidalapi.Playlist):
            if dest is None:
                dest = clean_template(
                    conf["dest_dir"] + "/" + conf["playlist_dir"],
                    playlist=obj,
                )
            if size is None:
                size = conf["playlist_image_size"]
        if skip_dl is None:
            skip_dl = conf["skip_downloaded"]

    assert dest
    assert size

    if os.path.exists(dest) and skip_dl:
        return

    url = obj.image(size)
    # Download into memory first: a failed transfer must not leave a partial
    # file behind that a later run with skip_dl would treat as complete.
    with io.BytesIO() as buffer:
        __download_file(url, buffer)
        with open(dest, "wb") as f:
            f.write(buffer.getvalue())
2023-07-05 11:50:05 +02:00
def download_album(album: tidalapi.Album, conf: dict) -> None:
    """Download an album's cover and then each of its tracks.

    The destination prefix is rendered from ``conf["dest_dir"]`` and
    ``conf["album_dir"]``; its parent directory is created if missing.
    """
    album_template = conf["dest_dir"] + "/" + conf["album_dir"]
    dest = clean_template(album_template, album=album)
    os.makedirs(os.path.dirname(dest), exist_ok=True)

    download_cover(album, conf)
    for track in album.tracks():
        # Always pause between tracks (sleep=True).
        download_track(track, True, conf, dest_dir=dest)
2023-06-29 16:28:31 +02:00
2023-07-05 11:50:05 +02:00
def download_playlist(playlist: tidalapi.Playlist, conf: dict) -> None:
    """Download a playlist's cover and then each of its tracks.

    The destination prefix is rendered from ``conf["dest_dir"]`` and
    ``conf["playlist_dir"]``; its parent directory is created if missing.
    """
    playlist_template = conf["dest_dir"] + "/" + conf["playlist_dir"]
    dest = clean_template(playlist_template, playlist=playlist)
    os.makedirs(os.path.dirname(dest), exist_ok=True)

    download_cover(playlist, conf)
    for track in playlist.tracks():
        # Always pause between tracks (sleep=True).
        download_track(track, True, conf, dest_dir=dest)
2023-06-28 23:24:32 +02:00
2023-07-05 11:50:05 +02:00
def download_artist(artist: tidalapi.Artist, conf: dict) -> None:
    """Download every album returned by ``artist.get_albums()``, in order."""
    for album in artist.get_albums():
        download_album(album, conf)