2
0
Fork 0

fix bugs, add cli tool, create package

This commit is contained in:
Luca Bilke 2023-07-05 11:50:05 +02:00
commit 2ac8a4e1bc
No known key found for this signature in database
GPG key ID: 7B77C51E8C779E75
11 changed files with 281 additions and 156 deletions

117
tidal_scraper/cli.py Normal file
View file

@ -0,0 +1,117 @@
from tidalapi import Album, Track, Artist, Playlist
from tidal_scraper.download import (
download_album,
download_playlist,
download_track,
download_artist,
)
from tidal_scraper.state import State
from tidal_scraper.helper import get_conf, human_sleep
from argparse import ArgumentParser, Namespace
from pathlib import Path
def handle_favorites(state: State, conf: dict, args: Namespace) -> None:
match args.obj:
case "album":
for album in state.favorites.albums():
download_album(album, conf)
human_sleep()
case "track":
for track in state.favorites.tracks():
download_track(track, conf)
human_sleep()
case "artist":
for artist in state.favorites.artists():
download_artist(artist, conf)
human_sleep()
case "playlist":
for playlist in state.favorites.playlists():
download_playlist(playlist, conf)
human_sleep()
def handle_id(state: State, conf: dict, args: Namespace) -> None:
match args.obj:
case "album":
album = Album(state.session, args.id)
download_album(album, conf)
case "track":
track = Track(state.session, args.id)
download_track(track, conf)
case "artist":
artist = Artist(state.session, args.id)
download_artist(artist, conf)
case "playlist":
playlist = Playlist(state.session, args.id)
download_playlist(playlist, conf)
def run():
parser = ArgumentParser(prog="tidal-scraper", description="Tidal music downloader")
parser.add_argument(
"-c", "--config", help="Configuration file to use", type=Path, dest="conf_file"
)
parser.add_argument(
"-s", "--state", help="Directory to keep state in", type=Path, dest="state_dir"
)
obj = parser.add_mutually_exclusive_group(required=True)
obj.add_argument(
"-a",
"--album",
action="store_const",
const="album",
help="Download an album",
dest="obj",
)
obj.add_argument(
"-A",
"--artist",
action="store_const",
const="artist",
help="Download an artist's albums",
dest="obj",
)
obj.add_argument(
"-p",
"--playlist",
action="store_const",
const="playlist",
help="Download a playlist",
dest="obj",
)
obj.add_argument(
"-t",
"--track",
action="store_const",
const="track",
help="Download a single track",
dest="obj",
)
source = parser.add_mutually_exclusive_group(required=True)
source.add_argument(
"-f",
"--favorite",
action="store_true",
help="Download all favorited",
dest="favorite",
)
source.add_argument(
"-i",
"--id",
type=int,
help="Download by ID",
dest="id",
)
args = parser.parse_args()
conf = get_conf(args.state_dir, args.conf_file)
state = State(conf["user_id"], conf["quality"], conf["state_dir"])
state.login(conf["state_dir"] + "auth.json")
if args.favorite:
handle_favorites(state, conf, args)
elif args.id is not None:
handle_id(state, conf, args)
state.write_dl_state(conf["state_dir"] + "state.json")

204
tidal_scraper/download.py Normal file
View file

@ -0,0 +1,204 @@
import tidal_scraper.metadata as metadata
from tidal_scraper.helper import extensions, clean_template, log_error, human_sleep
import tidalapi
import os
import json
import requests
import io
from tqdm import tqdm
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Util import Counter
from typing import Tuple
from typing import BinaryIO
# MASTER_KEY = b64decode("UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754=")
MASTER_KEY = (
b"P\x89SLC&\x98\xb7\xc6\xa3\n?P.\xb4\xc7a\xf8\xe5n\x8cth\x13E\xfa?\xbah8\xef\x9e"
)
def __decode_key_id(key_id: str) -> Tuple[bytes, bytes]:
decoded_key_id = b64decode(key_id)
init_vector = decoded_key_id[:16]
encrypted_token = decoded_key_id[16:]
decryptor = AES.new(MASTER_KEY, AES.MODE_CBC, init_vector)
decrypted_token = decryptor.decrypt(encrypted_token)
key = decrypted_token[:16]
nonce = decrypted_token[16:24]
return key, nonce
def __decrypt_file(fp: BinaryIO, key: bytes, nonce: bytes) -> None:
counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
fp.seek(0)
data = fp.read()
data = decryptor.decrypt(data)
fp.write(data)
def __download_file(url: str, fp: BinaryIO) -> str:
with requests.get(url, stream=True) as r:
r.raise_for_status()
mime = r.headers.get("Content-Type", "")
total_bytes = int(r.headers.get("Content-Length", 0))
with tqdm(total=total_bytes, unit="iB", unit_scale=True) as p:
for data in r.iter_content(1024):
fp.write(data)
p.update(len(data))
return mime
def download_track(
track: tidalapi.Track,
conf: dict | None = None,
name_template: str | None = None,
dest_dir: str | None = None,
skip_dl: bool | None = None,
errorfile: str | None = None,
) -> None:
album = track.album
assert album
if conf is None:
assert skip_dl is not None
assert errorfile is not None
assert name_template is not None
assert dest_dir is not None
else:
skip_dl = skip_dl or conf["skip_downloaded"]
errorfile = errorfile or conf["error_log"]
dest_dir = dest_dir or conf["dest_dir"]
name_template = name_template or conf["track_name"]
dest = dest_dir + clean_template(name_template, track=track)
print(f"Starting {album.artist.name} - {track.name}")
http_failures = 0
while http_failures <= 3:
try:
print("running")
stream = track.stream()
manifest = json.loads(b64decode(stream.manifest))
url = manifest["urls"][0]
codec = manifest["codecs"]
if ".mp4" in url:
if "ac4" in codec or "mha1" in codec:
dest += ".mp4"
else:
dest += ".m4a"
else:
for ext in extensions:
if ext in url:
dest += ext
break
if os.path.exists(dest) and skip_dl:
print(f"Skipping track")
return
assert track.name and album.name
with io.BytesIO() as b:
print(f"Downloading track")
key_id = manifest.get("keyId", None)
mime = __download_file(url, b)
if key_id:
print(f"Decrypting track")
__decrypt_file(b, *__decode_key_id(key_id))
metadata.write(
b,
mime,
track.name,
album.name,
str(track.track_num),
str(album.num_tracks),
)
with open(dest, "wb") as f:
data = b.getvalue()
f.write(data)
print()
break
except requests.HTTPError:
http_failures += 1
except KeyboardInterrupt as e:
raise e
except Exception as e:
log_error(
errorfile or "error.log",
"Failure while downloading {artist} - {track}",
artist=album.artist.name,
track=track.name,
)
break
def download_cover(
obj: tidalapi.Album | tidalapi.Playlist,
conf: dict | None = None,
dest: str | None = None,
size: int | None = None,
skip_dl: bool | None = None,
) -> None:
if conf is None:
assert dest is not None
assert size is not None
assert skip_dl is not None
else:
if type(obj) is tidalapi.Album:
dest = clean_template(
conf["dest_dir"] + "/" + conf["album_dir"],
album=obj,
)
size = conf["album_image_size"]
elif type(obj) is tidalapi.Playlist:
dest = clean_template(
conf["dest_dir"] + "/" + conf["playlist_dir"],
playlist=obj,
)
size = conf["playlist_image_size"]
skip_dl = conf["skip_downloaded"]
assert dest
assert size
if os.path.exists(dest) and skip_dl:
return
url = obj.image(size)
with open(dest, "wb") as f:
__download_file(url, f)
def download_album(album: tidalapi.Album, conf: dict) -> None:
dest = clean_template(
conf["dest_dir"] + "/" + conf["album_dir"],
album=album,
)
os.makedirs(os.path.dirname(dest), exist_ok=True)
download_cover(album, conf)
tracks = album.tracks()
for track in tracks:
download_track(track, conf, dest_dir=dest)
human_sleep()
def download_playlist(playlist: tidalapi.Playlist, conf: dict) -> None:
dest = clean_template(
conf["dest_dir"] + "/" + conf["playlist_dir"],
playlist=playlist,
)
os.makedirs(os.path.dirname(dest), exist_ok=True)
download_cover(playlist, conf)
tracks = playlist.tracks()
for track in tracks:
download_track(track, conf, dest_dir=dest)
human_sleep()
def download_artist(artist: tidalapi.Artist, conf: dict) -> None:
albums = artist.get_albums()
for album in albums:
download_album(album, conf)
human_sleep()

50
tidal_scraper/helper.py Normal file
View file

@ -0,0 +1,50 @@
import re
import os
import time
import random
import tomllib
import sys
import traceback
extensions = [".flac", ".mp4", ".m4a", ""]
def get_conf(state_dir: str | None = None, conf_file: str | None = None) -> dict:
home = os.getenv("HOME")
assert home
conf_file = (os.getenv("XDG_CONFIG_HOME") or home + "/.config") + "/tidal-scraper/conf.toml"
with open(conf_file, "rb") as f:
conf = tomllib.load(f)
if not state_dir:
state_dir = (
os.getenv("XDG_STATE_HOME")
or os.getenv("XDG_CACHE_HOME")
or home + "/.cache"
)
state_dir += "/tidal-scraper"
conf["state_dir"] = state_dir
return conf
def clean_template(path: str, **kwargs) -> str:
path = os.path.expanduser(path)
split = path.split("/")
cleaned_split = [re.sub("/", " ", s.format(**kwargs)) for s in split]
return "/".join(cleaned_split)
def log_error(logfile: str, template: str, **kwargs):
with open(logfile, "a") as f:
msg = template.format(**kwargs)
f.write(msg + "\n")
traceback.format_exception(*sys.exc_info())
f.write("\n\n")
def human_sleep() -> None:
t = random.randrange(10, 50) / 10
time.sleep(t)

84
tidal_scraper/metadata.py Normal file
View file

@ -0,0 +1,84 @@
from mutagen import flac, mp4
from mutagen.mp4 import MP4Tags
from mutagen._vorbis import VCommentDict
from typing import BinaryIO
def __write_flac(file: flac.FLAC, **kwargs) -> None:
tags = VCommentDict()
tags["title"] = kwargs["title"]
tags["album"] = kwargs["album"]
tags["albumartist"] = kwargs["albumartist"]
tags["artist"] = kwargs["artist"]
tags["copyright"] = kwargs["copyright"]
tags["tracknumber"] = kwargs["tracknumber"]
tags["tracktotal"] = kwargs["tracktotal"]
tags["discnumber"] = kwargs["discnumber"]
tags["disctotal"] = kwargs["disctotal"]
tags["genre"] = kwargs["genre"]
tags["date"] = kwargs["date"]
tags["composer"] = kwargs["composer"]
tags["isrc"] = kwargs["isrc"]
tags["lyrics"] = kwargs["lyrics"]
file.tags = tags
pic = flac.Picture()
pic.data = kwargs["cover"]
pic.mime = kwargs["cover_mime"]
file.clear_pictures()
file.add_picture(pic)
file.save()
def __write_mp4(file: mp4.MP4, **kwargs) -> None:
tags = MP4Tags()
tags["\xa9nam"] = kwargs["title"]
tags["\xa9alb"] = kwargs["album"]
tags["aART"] = kwargs["albumartist"]
tags["\xa9ART"] = kwargs["artist"]
tags["cprt"] = kwargs["copyright"]
tags["trkn"] = [[kwargs["tracknumber"], kwargs["totaltrack"]]]
tags["disk"] = [[kwargs["discnumber"], kwargs["totaldisc"]]]
tags["\xa9gen"] = kwargs["genre"]
tags["\xa9day"] = kwargs["date"]
tags["\xa9wrt"] = kwargs["composer"]
tags["\xa9lyr"] = kwargs["lyrics"]
file.tags = tags
file.save
def write(
fp: BinaryIO,
mime: str,
title: str,
album: str,
tracknumber: str,
tracktotal: str,
discnumber: str = "",
disctotal: str = "",
artist: list[str] = [""],
albumartist: list[str] = [""],
genre: str = "",
date: str = "",
composer: str = "",
isrc: str = "",
lyrics: str = "",
copyright: str = "",
cover: bytes | None = None,
cover_mime: str | None = None,
) -> None:
args = locals()
fp.seek(0)
match mime:
case "audio/flac":
f = flac.FLAC(fp)
__write_flac(f, *args)
case "audio/mp4":
f = mp4.MP4(fp)
__write_mp4(f, *args)
case _:
raise Exception(f"Couldn't recognize mimetype {mime}")

95
tidal_scraper/state.py Normal file
View file

@ -0,0 +1,95 @@
import json
from datetime import datetime
from tidalapi import session, user, playlist, media, album, artist
class State:
def __init__(self, user_id: int, quality: str, dl_state_path: str):
match quality:
case "master":
q = session.Quality.master
case "lossless":
q = session.Quality.lossless
case "high":
q = session.Quality.high
case "low":
q = session.Quality.low
case _:
raise Exception("Bad Quality String")
config = session.Config(quality=q)
self.user_id = user_id
self.session = session.Session(config)
self.favorites = user.Favorites(self.session, user_id)
try:
self.load_dl_state(dl_state_path)
except:
self._state = {
"albums": {},
"artists": {},
"playlists": {},
"tracks": {},
}
def login(self, auth_file: str | None = None) -> None:
s = self.session
try:
assert auth_file
with open(auth_file, "r") as f:
a = json.load(f)
s.load_oauth_session(
a["token_type"],
a["access_token"],
a["refresh_token"],
datetime.fromtimestamp(a["expiry_time"]),
)
except (FileNotFoundError, IndexError, AssertionError):
s.login_oauth_simple()
if (
s.token_type
and s.access_token
and s.refresh_token
and s.expiry_time
and auth_file
):
data = {
"token_type": s.token_type,
"access_token": s.access_token,
"refresh_token": s.refresh_token,
"expiry_time": s.expiry_time.timestamp(),
}
with open(auth_file, "w") as f:
json.dump(data, f)
assert self.session.check_login()
def set_dl_state(
self,
obj: playlist.Playlist | media.Track | album.Album | artist.Artist,
downloaded: bool,
) -> None:
match type(obj):
case album.Album:
t = "albums"
case artist.Artist:
t = "artists"
case playlist.Playlist:
t = "playlists"
case media.Track:
t = "tracks"
case _:
raise Exception("Incorrect object type received")
self._state[t][obj.id] = downloaded
def write_dl_state(self, statefile: str) -> None:
with open(statefile, "w") as f:
json.dump(self._state, f)
def load_dl_state(self, statefile: str) -> None:
with open(statefile, "r") as f:
self._state = json.load(f)
assert type(self._state["albums"]) is dict[int, bool]
assert type(self._state["artists"]) is dict[int, bool]
assert type(self._state["playlists"]) is dict[int, bool]
assert type(self._state["tracks"]) is dict[int, bool]