lots more changes
This commit is contained in:
parent
024c837d82
commit
6f137fd50d
14
TODO.md
14
TODO.md
|
@ -1,12 +1,4 @@
|
||||||
- [ ] allow downloads of lists of albums, artists and tracks, and set their downloaded status
|
|
||||||
- [ ] write all metadata from tidal to downloaded tracks
|
|
||||||
|
|
||||||
- [ ] get lists of favorited albums, artists and tracks
|
|
||||||
- [ ] allow export of these lists (with names, IDs and downloaded status)
|
|
||||||
- [ ] allow import of these lists
|
|
||||||
|
|
||||||
- [ ] allow configuration with a toml file
|
|
||||||
- [ ] allow configuration of download path
|
|
||||||
- [ ] write log of errors
|
|
||||||
|
|
||||||
- [ ] installer or pip package
|
- [ ] installer or pip package
|
||||||
|
- [ ] installer should create state and config homes if not existing
|
||||||
|
- [ ] proper SIGTERM handling
|
||||||
|
- [ ] decrypt and write in chunks
|
||||||
|
|
|
@ -7,7 +7,6 @@ quality = "lossless"
|
||||||
user_id =
|
user_id =
|
||||||
|
|
||||||
dest_dir = "./downloads/"
|
dest_dir = "./downloads/"
|
||||||
state_dir = "./state/"
|
|
||||||
# The following templates are passed an artist, album and track object.
|
# The following templates are passed an artist, album and track object.
|
||||||
# Possible attributes can be found here: https://tidalapi.netlify.app/api.html
|
# Possible attributes can be found here: https://tidalapi.netlify.app/api.html
|
||||||
# The artist is derived from the album a track is in rather than the track itself.
|
# The artist is derived from the album a track is in rather than the track itself.
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import metadata
|
import metadata
|
||||||
from helper import CONF, EXTENSIONS, clean_template, log_error
|
from helper import conf, extensions, clean_template, log_error
|
||||||
|
|
||||||
import tidalapi
|
import tidalapi
|
||||||
import os
|
import os
|
||||||
|
@ -36,103 +36,118 @@ def __decode_key_id(key_id: str) -> Tuple[bytes, bytes]:
|
||||||
def __decrypt_file(fp: BinaryIO, key: bytes, nonce: bytes) -> None:
|
def __decrypt_file(fp: BinaryIO, key: bytes, nonce: bytes) -> None:
|
||||||
counter = Counter.new(64, prefix=nonce, initial_value=0)
|
counter = Counter.new(64, prefix=nonce, initial_value=0)
|
||||||
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
|
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
|
||||||
data = decryptor.decrypt(fp)
|
fp.seek(0)
|
||||||
|
data = fp.read()
|
||||||
|
data = decryptor.decrypt(data)
|
||||||
fp.write(data)
|
fp.write(data)
|
||||||
|
|
||||||
|
|
||||||
def __download_file(url: str, fp: BinaryIO) -> None:
|
def __download_file(url: str, fp: BinaryIO) -> str:
|
||||||
r = requests.get(url, stream=True)
|
with requests.get(url, stream=True) as r:
|
||||||
|
if conf["debug"]:
|
||||||
|
print(r.headers)
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
total_bytes = int(r.headers.get("content-length", 0))
|
mime = r.headers.get("Content-Type", "")
|
||||||
progress = tqdm(total=total_bytes, unit="iB", unit_scale=True)
|
total_bytes = int(r.headers.get("Content-Length", 0))
|
||||||
|
with tqdm(total=total_bytes, unit="iB", unit_scale=True) as p:
|
||||||
for data in r.iter_content(1024):
|
for data in r.iter_content(1024):
|
||||||
fp.write(data)
|
fp.write(data)
|
||||||
progress.update(len(data))
|
p.update(len(data))
|
||||||
progress.close()
|
return mime
|
||||||
|
|
||||||
|
|
||||||
def download_track(track: tidalapi.Track, dest_path: str) -> None:
|
def download_track(track: tidalapi.Track, dest: str) -> None:
|
||||||
album = track.album
|
album = track.album
|
||||||
assert album
|
assert album
|
||||||
dest_path += clean_template(CONF["track_name"], track=track)
|
print(f"Starting {album.artist.name} - {track.name}")
|
||||||
|
dest += clean_template(conf["track_name"], track=track)
|
||||||
|
http_failures = 0
|
||||||
|
while http_failures <= 3:
|
||||||
try:
|
try:
|
||||||
stream = track.stream()
|
stream = track.stream()
|
||||||
manifest = json.loads(b64decode(stream.manifest))
|
manifest = json.loads(b64decode(stream.manifest))
|
||||||
|
if conf["debug"]:
|
||||||
print(manifest)
|
print(manifest)
|
||||||
url = manifest["urls"][0]
|
url = manifest["urls"][0]
|
||||||
for ext in EXTENSIONS:
|
codec = manifest["codecs"]
|
||||||
if ext in url and ext is not ".mp4":
|
if ".mp4" in url:
|
||||||
dest_path += ext
|
if "ac4" in codec or "mha1" in codec:
|
||||||
elif ".mp4" in url:
|
dest += ".mp4"
|
||||||
if "ac4" in stream.codec or "mha1" in stream.codec:
|
|
||||||
dest_path += ".mp4"
|
|
||||||
else:
|
else:
|
||||||
dest_path += ".m4a"
|
dest += ".m4a"
|
||||||
if os.path.exists(dest_path + ext) and CONF["skip_downloaded"]:
|
else:
|
||||||
print(f"Skipping {album.artist.name} - {track.name}")
|
for ext in (x for x in extensions if x != ".mp4"):
|
||||||
|
dest += ext
|
||||||
|
if os.path.exists(dest) and conf["skip_downloaded"]:
|
||||||
|
print(f"Skipping track")
|
||||||
return
|
return
|
||||||
|
|
||||||
assert track.name and album.name
|
assert track.name and album.name
|
||||||
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
|
|
||||||
with io.BytesIO() as b:
|
with io.BytesIO() as b:
|
||||||
print(f"Downloading {album.artist.name} - {track.name}")
|
print(f"Downloading track")
|
||||||
key_id = manifest.get("keyId", None)
|
key_id = manifest.get("keyId", None)
|
||||||
__download_file(url, b)
|
mime = __download_file(url, b)
|
||||||
|
|
||||||
if key_id:
|
if key_id:
|
||||||
|
print(f"Decrypting track")
|
||||||
__decrypt_file(b, *__decode_key_id(key_id))
|
__decrypt_file(b, *__decode_key_id(key_id))
|
||||||
metadata.write(
|
metadata.write(
|
||||||
b,
|
b,
|
||||||
manifest["codecs"],
|
mime,
|
||||||
track.name,
|
track.name,
|
||||||
album.name,
|
album.name,
|
||||||
str(track.track_num),
|
str(track.track_num),
|
||||||
str(album.num_tracks),
|
str(album.num_tracks),
|
||||||
)
|
)
|
||||||
with open(dest_path, "wb") as f:
|
with open(dest, "wb") as f:
|
||||||
data = b.read()
|
data = b.getvalue()
|
||||||
f.write(data)
|
f.write(data)
|
||||||
|
print()
|
||||||
|
break
|
||||||
|
except requests.HTTPError:
|
||||||
|
http_failures += 1
|
||||||
except:
|
except:
|
||||||
log_error(
|
log_error(
|
||||||
"Failure while downloading {artist} - {track}",
|
"Failure while downloading {artist} - {track}",
|
||||||
artist=album.artist.name,
|
artist=album.artist.name,
|
||||||
track=track.name,
|
track=track.name,
|
||||||
)
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
def download_cover(
|
def download_cover(
|
||||||
obj: tidalapi.Album | tidalapi.Playlist, dest_path: str, size: int
|
obj: tidalapi.Album | tidalapi.Playlist, dest: str, size: int
|
||||||
) -> None:
|
) -> None:
|
||||||
if os.path.exists(dest_path) and CONF["skip_downloaded"]:
|
if os.path.exists(dest) and conf["skip_downloaded"]:
|
||||||
return
|
return
|
||||||
|
|
||||||
url = obj.image(size)
|
url = obj.image(size)
|
||||||
with open(dest_path, "wb") as f:
|
with open(dest, "wb") as f:
|
||||||
__download_file(url, f)
|
__download_file(url, f)
|
||||||
|
|
||||||
|
|
||||||
def download_album(album: tidalapi.Album) -> None:
|
def download_album(album: tidalapi.Album) -> None:
|
||||||
dest_path = clean_template(
|
dest = clean_template(
|
||||||
CONF["dest_dir"] + CONF["album_dir"],
|
conf["dest_dir"] + "/" + conf["album_dir"],
|
||||||
album=album,
|
album=album,
|
||||||
artist=album.artist,
|
artist=album.artist,
|
||||||
)
|
)
|
||||||
download_cover(album, dest_path, CONF["album_image_size"])
|
os.makedirs(os.path.dirname(dest), exist_ok=True)
|
||||||
|
download_cover(album, dest, conf["album_image_size"])
|
||||||
tracks = album.tracks()
|
tracks = album.tracks()
|
||||||
for track in tracks:
|
for track in tracks:
|
||||||
download_track(track, dest_path)
|
download_track(track, dest)
|
||||||
|
|
||||||
|
|
||||||
def download_playlist(playlist: tidalapi.Playlist) -> None:
|
def download_playlist(playlist: tidalapi.Playlist) -> None:
|
||||||
dest_path = clean_template(
|
dest = clean_template(
|
||||||
CONF["dest_dir"] + CONF["playlist_dir"],
|
conf["dest_dir"] + "/" + conf["playlist_dir"],
|
||||||
playlist=playlist,
|
playlist=playlist,
|
||||||
)
|
)
|
||||||
download_cover(playlist, dest_path, CONF["playlist_image_size"])
|
os.makedirs(os.path.dirname(dest), exist_ok=True)
|
||||||
|
download_cover(playlist, dest, conf["playlist_image_size"])
|
||||||
tracks = playlist.tracks()
|
tracks = playlist.tracks()
|
||||||
for track in tracks:
|
for track in tracks:
|
||||||
download_track(track, dest_path)
|
download_track(track, dest)
|
||||||
|
|
||||||
|
|
||||||
def download_artist(artist: tidalapi.Artist) -> None:
|
def download_artist(artist: tidalapi.Artist) -> None:
|
||||||
|
|
|
@ -1,20 +1,37 @@
|
||||||
import re
|
import re
|
||||||
|
import os
|
||||||
import tomllib
|
import tomllib
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
with open("../config.toml", "rb") as conf:
|
extensions = [".flac", ".mp4", ".m4a", ""]
|
||||||
CONF = tomllib.load(conf)
|
|
||||||
|
home = os.getenv("HOME")
|
||||||
|
state_dir = os.getenv("XDG_STATE_HOME") or os.getenv("XDG_CACHE_HOME")
|
||||||
|
conf_dir = os.getenv("XDG_CONFIG_HOME")
|
||||||
|
if not state_dir:
|
||||||
|
assert home
|
||||||
|
state_dir = home + "/.cache"
|
||||||
|
|
||||||
|
if not conf_dir:
|
||||||
|
assert home
|
||||||
|
conf_dir = home + "/.config"
|
||||||
|
conf_dir += "/tidal-scraper"
|
||||||
|
state_dir += "/tidal-scraper"
|
||||||
|
|
||||||
|
with open(conf_dir + "/conf.toml", "rb") as f:
|
||||||
|
conf = tomllib.load(f)
|
||||||
|
|
||||||
EXTENSIONS = ['.flac', '.mp4', '.m4a', '']
|
|
||||||
|
|
||||||
def clean_template(path: str, **kwargs) -> str:
|
def clean_template(path: str, **kwargs) -> str:
|
||||||
|
path = os.path.expanduser(path)
|
||||||
split = path.split("/")
|
split = path.split("/")
|
||||||
cleaned_split = [re.sub("/", " ", s.format(**kwargs)) for s in split]
|
cleaned_split = [re.sub("/", " ", s.format(**kwargs)) for s in split]
|
||||||
return "/".join(cleaned_split)
|
return "/".join(cleaned_split)
|
||||||
|
|
||||||
|
|
||||||
def log_error(template: str, **kwargs):
|
def log_error(template: str, **kwargs):
|
||||||
with open(CONF['error_log']) as f:
|
with open(conf["error_log"], "a") as f:
|
||||||
error = template.format(**kwargs)
|
msg = template.format(**kwargs)
|
||||||
f.write(error)
|
f.write(msg + "\n\n\n")
|
||||||
traceback.print_exception(*sys.exc_info(), file=f)
|
traceback.format_exception(*sys.exc_info())
|
||||||
|
|
|
@ -103,7 +103,7 @@ def __write_mp4(file: mp4.MP4, **kwargs) -> None:
|
||||||
|
|
||||||
def write(
|
def write(
|
||||||
fp: BinaryIO,
|
fp: BinaryIO,
|
||||||
codec: str,
|
mime: str,
|
||||||
title: str,
|
title: str,
|
||||||
album: str,
|
album: str,
|
||||||
tracknumber: str,
|
tracknumber: str,
|
||||||
|
@ -122,14 +122,13 @@ def write(
|
||||||
cover_mime: str | None = None,
|
cover_mime: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
args = locals()
|
args = locals()
|
||||||
# TODO: Figure out what codecs are sent in the manifest
|
fp.seek(0)
|
||||||
# WARN: This match is currently using placeholders
|
match mime:
|
||||||
match codec:
|
case "audio/flac":
|
||||||
case "flac":
|
|
||||||
f = flac.FLAC(fp)
|
f = flac.FLAC(fp)
|
||||||
__write_flac(f, *args)
|
__write_flac(f, *args)
|
||||||
case "aac":
|
case "audio/mp4":
|
||||||
f = mp4.MP4(fp)
|
f = mp4.MP4(fp)
|
||||||
__write_mp4(f, *args)
|
__write_mp4(f, *args)
|
||||||
case _:
|
case _:
|
||||||
raise Exception(f"Couldn't recognize codec {codec}")
|
raise Exception(f"Couldn't recognize mimetype {mime}")
|
||||||
|
|
|
@ -1,14 +1,17 @@
|
||||||
#!/bin/env python
|
#!/bin/env python
|
||||||
from download import download_album
|
from download import download_album
|
||||||
from state import State
|
from state import State
|
||||||
from helper import CONF
|
from helper import conf
|
||||||
|
|
||||||
s = State(CONF['user_id'], CONF['quality'])
|
|
||||||
|
|
||||||
|
s = State(conf['user_id'], conf['quality'])
|
||||||
s.login()
|
s.login()
|
||||||
|
|
||||||
albums = s.favorites.albums()
|
albums = s.favorites.albums()
|
||||||
|
|
||||||
|
try:
|
||||||
|
s.load_dl_state
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
download_album(albums[0])
|
download_album(albums[0])
|
||||||
s.set_dl_state(albums[0], True)
|
s.set_dl_state(albums[0], True)
|
||||||
s.write_state()
|
s.write_dl_state()
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from tidalapi import session, user, playlist, media, album, artist
|
from tidalapi import session, user, playlist, media, album, artist
|
||||||
from helper import CONF
|
from helper import conf, state_dir
|
||||||
|
|
||||||
|
|
||||||
class State:
|
class State:
|
||||||
def __init__(self, user_id: int, quality: str):
|
def __init__(
|
||||||
|
self, user_id: int, quality: str, dl_state_path: str = state_dir + "/state.json"
|
||||||
|
):
|
||||||
match quality:
|
match quality:
|
||||||
case "master":
|
case "master":
|
||||||
q = session.Quality.master
|
q = session.Quality.master
|
||||||
|
@ -20,7 +22,10 @@ class State:
|
||||||
config = session.Config(quality=q)
|
config = session.Config(quality=q)
|
||||||
self.user_id = user_id
|
self.user_id = user_id
|
||||||
self.session = session.Session(config)
|
self.session = session.Session(config)
|
||||||
self.favorites = user.Favorites(self.session, CONF["user_id"])
|
self.favorites = user.Favorites(self.session, conf["user_id"])
|
||||||
|
try:
|
||||||
|
self.load_dl_state(dl_state_path)
|
||||||
|
except:
|
||||||
self._state = {
|
self._state = {
|
||||||
"albums": {},
|
"albums": {},
|
||||||
"artists": {},
|
"artists": {},
|
||||||
|
@ -28,7 +33,7 @@ class State:
|
||||||
"tracks": {},
|
"tracks": {},
|
||||||
}
|
}
|
||||||
|
|
||||||
def login(self, auth_file: str | None = CONF["state_dir"] + "auth.json") -> None:
|
def login(self, auth_file: str | None = state_dir + "/auth.json") -> None:
|
||||||
s = self.session
|
s = self.session
|
||||||
try:
|
try:
|
||||||
assert auth_file
|
assert auth_file
|
||||||
|
@ -40,7 +45,7 @@ class State:
|
||||||
a["refresh_token"],
|
a["refresh_token"],
|
||||||
datetime.fromtimestamp(a["expiry_time"]),
|
datetime.fromtimestamp(a["expiry_time"]),
|
||||||
)
|
)
|
||||||
except (OSError, IndexError, AssertionError):
|
except (FileNotFoundError, IndexError, AssertionError):
|
||||||
s.login_oauth_simple()
|
s.login_oauth_simple()
|
||||||
if (
|
if (
|
||||||
s.token_type
|
s.token_type
|
||||||
|
@ -79,14 +84,19 @@ class State:
|
||||||
|
|
||||||
self._state[t][obj.id] = downloaded
|
self._state[t][obj.id] = downloaded
|
||||||
|
|
||||||
def write_state(
|
def write_dl_state(self, dl_state_path: str | None = None) -> None:
|
||||||
self, state_file_path: str = CONF["state_dir"] + "state.json"
|
if dl_state_path is None:
|
||||||
) -> None:
|
dl_state_path = state_dir + "/state.json"
|
||||||
with open(state_file_path, "w") as f:
|
with open(dl_state_path, "w") as f:
|
||||||
json.dump(self._state, f)
|
json.dump(self._state, f)
|
||||||
|
|
||||||
def load_state(
|
def load_dl_state(self, dl_state_path: str | None = None) -> None:
|
||||||
self, state_file_path: str = CONF["state_dir"] + "state.json"
|
if dl_state_path is None:
|
||||||
) -> None:
|
dl_state_path = state_dir + "/state.json"
|
||||||
with open(state_file_path, "r") as f:
|
with open(dl_state_path, "r") as f:
|
||||||
self._state = json.load(f)
|
self._state = json.load(f)
|
||||||
|
|
||||||
|
assert type(self._state["albums"]) is dict[int, bool]
|
||||||
|
assert type(self._state["artists"]) is dict[int, bool]
|
||||||
|
assert type(self._state["playlists"]) is dict[int, bool]
|
||||||
|
assert type(self._state["tracks"]) is dict[int, bool]
|
||||||
|
|
Loading…
Reference in New Issue