2
0
Fork 0

lots more work

This commit is contained in:
Luca Bilke 2023-06-29 16:28:31 +02:00
parent 61de0b46d5
commit 024c837d82
8 changed files with 209 additions and 122 deletions

View File

@ -4,12 +4,18 @@ error_log = "error.log"
# Quality can be one of "master, lossless, high and low" # Quality can be one of "master, lossless, high and low"
# Keep in mind that you can only get the quality included in your tidal sub # Keep in mind that you can only get the quality included in your tidal sub
quality = "lossless" quality = "lossless"
user_id = 188721652 user_id =
dest_dir = "./downloads/" dest_dir = "./downloads/"
state_dir = "./state/"
# The following templates are passed an artist, album and track object. # The following templates are passed an artist, album and track object.
# Possible attributes can be found here: https://tidalapi.netlify.app/api.html # Possible attributes can be found here: https://tidalapi.netlify.app/api.html
# The artist is derived from the album a track is in rather than the track itself. # The artist is derived from the album a track is in rather than the track itself.
dl_dir ="/tmp/tidal-scraper/{track.name}.part"
album_dir = "{artist.name}/{album.name}/" album_dir = "{artist.name}/{album.name}/"
playlist_dir = "{playlist.name}/"
track_name = "{track.track_num}: {track.name}" track_name = "{track.track_num}: {track.name}"
# One of 160, 320, 480, 640, 750, 1080
playlist_image_size = 1080
# One of 80, 160, 320, 640, 1280
album_image_size = 1280

View File

@ -1,3 +1,4 @@
git+https://github.com/tamland/python-tidal git+https://github.com/tamland/python-tidal
pycrypto pycrypto
tqdm tqdm
mutagen

View File

@ -1,58 +0,0 @@
import json
from tidalapi import session, user, playlist, media, album, artist
from helper import CONF
ALBUM = 1
ARTIST = 2
PLAYLIST = 3
TRACK = 4
class account:
def __init__(self, user_id: int, quality: str):
match quality:
case "master":
q = session.Quality.master
case "lossless":
q = session.Quality.lossless
case "high":
q = session.Quality.high
case "low":
q = session.Quality.low
case _:
raise Exception("Quality misconfigured in conf.toml")
config = session.Config(quality=q)
self.user_id = user_id
self.session = session.Session(config)
self.favorites = user.Favorites(self.session, CONF['user_id'])
self._state = {
"albums": {},
"artists": {},
"playlists": {},
"tracks": {},
}
@staticmethod
def create_obj_state(
obj: playlist.Playlist | media.Track | album.Album | artist.Artist,
downloaded: bool = False,
) -> dict[str, int | bool]:
match type(obj):
case album.Album:
obj_type = ALBUM
case artist.Artist:
obj_type = ARTIST
case playlist.Playlist:
obj_type = PLAYLIST
case media.Track:
obj_type = TRACK
case _:
raise Exception("Incorrect object type received")
return {
"id": obj.id,
"type": obj_type,
"downloaded": downloaded,
}

View File

@ -1,25 +1,30 @@
import helper
from helper import CONF, EXTENSIONS
import metadata import metadata
from helper import CONF, EXTENSIONS, clean_template, log_error
import tidalapi import tidalapi
import os import os
import json import json
import requests
import io
from tqdm import tqdm
from base64 import b64decode from base64 import b64decode
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Util import Counter from Crypto.Util import Counter
from typing import Tuple, BinaryIO from typing import Tuple
from shutil import move from typing import BinaryIO
# MASTER_KEY = b64decode("UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754=")
MASTER_KEY = (
b"P\x89SLC&\x98\xb7\xc6\xa3\n?P.\xb4\xc7a\xf8\xe5n\x8cth\x13E\xfa?\xbah8\xef\x9e"
)
def __decode_key_id(key_id) -> Tuple[bytes, bytes]: def __decode_key_id(key_id: str) -> Tuple[bytes, bytes]:
master_key = b64decode("UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754=")
decoded_key_id = b64decode(key_id) decoded_key_id = b64decode(key_id)
init_vector = decoded_key_id[:16] init_vector = decoded_key_id[:16]
encrypted_token = decoded_key_id[16:] encrypted_token = decoded_key_id[16:]
decryptor = AES.new(master_key, AES.MODE_CBC, init_vector) decryptor = AES.new(MASTER_KEY, AES.MODE_CBC, init_vector)
decrypted_token = decryptor.decrypt(encrypted_token) decrypted_token = decryptor.decrypt(encrypted_token)
key = decrypted_token[:16] key = decrypted_token[:16]
@ -28,35 +33,34 @@ def __decode_key_id(key_id) -> Tuple[bytes, bytes]:
return key, nonce return key, nonce
def __decrypt_file( def __decrypt_file(fp: BinaryIO, key: bytes, nonce: bytes) -> None:
input_file: BinaryIO, output_file: BinaryIO, key: bytes, nonce: bytes
) -> None:
counter = Counter.new(64, prefix=nonce, initial_value=0) counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter) decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
data = decryptor.decrypt(input_file.read()) data = decryptor.decrypt(fp)
output_file.write(data) fp.write(data)
def set_metadata(): def __download_file(url: str, fp: BinaryIO) -> None:
pass r = requests.get(url, stream=True)
r.raise_for_status()
total_bytes = int(r.headers.get("content-length", 0))
progress = tqdm(total=total_bytes, unit="iB", unit_scale=True)
for data in r.iter_content(1024):
fp.write(data)
progress.update(len(data))
progress.close()
def download_track( def download_track(track: tidalapi.Track, dest_path: str) -> None:
track: tidalapi.Track, album = track.album
album: tidalapi.Album, assert album
) -> None: dest_path += clean_template(CONF["track_name"], track=track)
dl_path = helper.clean_template(CONF["dl_dir"])
dest_path = helper.clean_template(
CONF["dest_dir"] + CONF["album_dir"] + CONF["track_name"],
track=track,
album=album,
artist=album.artist,
)
try: try:
stream = track.stream() stream = track.stream()
manifest = json.loads(b64decode(stream.manifest)) manifest = json.loads(b64decode(stream.manifest))
print(manifest)
url = manifest["urls"][0] url = manifest["urls"][0]
codec = manifest['codecs']
for ext in EXTENSIONS: for ext in EXTENSIONS:
if ext in url and ext is not ".mp4": if ext in url and ext is not ".mp4":
dest_path += ext dest_path += ext
@ -66,45 +70,72 @@ def download_track(
else: else:
dest_path += ".m4a" dest_path += ".m4a"
if os.path.exists(dest_path + ext) and CONF["skip_downloaded"]: if os.path.exists(dest_path + ext) and CONF["skip_downloaded"]:
print("Skipping downloaded song") print(f"Skipping {album.artist.name} - {track.name}")
return return
key_id = manifest.get("keyId", None) assert track.name and album.name
os.makedirs(os.path.dirname(dest_path), exist_ok=True) os.makedirs(os.path.dirname(dest_path), exist_ok=True)
with open(dest_path, "wb") as f: with io.BytesIO() as b:
# TODO: DOWNLOAD print(f"Downloading {album.artist.name} - {track.name}")
pass key_id = manifest.get("keyId", None)
__download_file(url, b)
if key_id: if key_id:
key, nonce = __decode_key_id(key_id) __decrypt_file(b, *__decode_key_id(key_id))
with open(dl_path, "rb") as i, open(dest_path, "wb") as o: metadata.write(
__decrypt_file(i, o, key, nonce) b,
else: manifest["codecs"],
move(dl_path, dest_path) track.name,
album.name,
assert track.name is not None str(track.track_num),
assert album.name is not None str(album.num_tracks),
metadata.write(f, codec, track.name, album.name, str(track.track_num), str(album.num_tracks)) )
with open(dest_path, "wb") as f:
data = b.read()
f.write(data)
except: except:
assert album.artist is not None log_error(
helper.log_error( "Failure while downloading {artist} - {track}",
"Failure while downloading {artist} - {album} - {track}:",
artist=album.artist.name, artist=album.artist.name,
album=album.name,
track=track.name, track=track.name,
) )
# stack trace is printed by log_error()
def download_cover(album: tidalapi.Album) -> None: def download_cover(
dest_path = helper.clean_template( obj: tidalapi.Album | tidalapi.Playlist, dest_path: str, size: int
) -> None:
if os.path.exists(dest_path) and CONF["skip_downloaded"]:
return
url = obj.image(size)
with open(dest_path, "wb") as f:
__download_file(url, f)
def download_album(album: tidalapi.Album) -> None:
dest_path = clean_template(
CONF["dest_dir"] + CONF["album_dir"], CONF["dest_dir"] + CONF["album_dir"],
album=album, album=album,
artist=album.artist, artist=album.artist,
) )
url = album.image(1280) download_cover(album, dest_path, CONF["album_image_size"])
tracks = album.tracks()
for track in tracks:
download_track(track, dest_path)
if os.path.exists(dest_path) and CONF["skip_downloaded"]:
return
# TODO: DOWNLOAD def download_playlist(playlist: tidalapi.Playlist) -> None:
dest_path = clean_template(
CONF["dest_dir"] + CONF["playlist_dir"],
playlist=playlist,
)
download_cover(playlist, dest_path, CONF["playlist_image_size"])
tracks = playlist.tracks()
for track in tracks:
download_track(track, dest_path)
def download_artist(artist: tidalapi.Artist) -> None:
albums = artist.get_albums()
for album in albums:
download_album(album)

View File

@ -15,5 +15,6 @@ def clean_template(path: str, **kwargs) -> str:
def log_error(template: str, **kwargs): def log_error(template: str, **kwargs):
with open(CONF['error_log']) as f: with open(CONF['error_log']) as f:
f.write(template.format(**kwargs)) error = template.format(**kwargs)
f.write(error)
traceback.print_exception(*sys.exc_info(), file=f) traceback.print_exception(*sys.exc_info(), file=f)

View File

@ -1,9 +1,10 @@
from typing import BinaryIO
from mutagen import flac, mp4 # , mp3 from mutagen import flac, mp4 # , mp3
from mutagen.mp4 import MP4Tags from mutagen.mp4 import MP4Tags
from mutagen._vorbis import VCommentDict from mutagen._vorbis import VCommentDict
from typing import BinaryIO
# from mutagen.id3._tags import ID3Tags # from mutagen.id3._tags import ID3Tags
# from mutagen.id3._frames import ( # from mutagen.id3._frames import (
@ -119,14 +120,16 @@ def write(
copyright: str = "", copyright: str = "",
cover: bytes | None = None, cover: bytes | None = None,
cover_mime: str | None = None, cover_mime: str | None = None,
): ) -> None:
args = locals() args = locals()
# TODO: Figure out what codecs are sent in the manifest # TODO: Figure out what codecs are sent in the manifest
# WARN: This match is currently using placeholders # WARN: This match is currently using placeholders
match codec: match codec:
case "???flac???": case "flac":
file = flac.FLAC(fp) f = flac.FLAC(fp)
__write_flac(file, *args) __write_flac(f, *args)
case "???aac???": case "aac":
file = mp4.MP4(fp) f = mp4.MP4(fp)
__write_mp4(file, *args) __write_mp4(f, *args)
case _:
raise Exception(f"Couldn't recognize codec {codec}")

View File

@ -1,3 +1,14 @@
#!/bin/env python #!/bin/env python
import download from download import download_album
from state import State
from helper import CONF
s = State(CONF['user_id'], CONF['quality'])
s.login()
albums = s.favorites.albums()
download_album(albums[0])
s.set_dl_state(albums[0], True)
s.write_state()

92
tidal-scraper/state.py Normal file
View File

@ -0,0 +1,92 @@
import json
from datetime import datetime
from tidalapi import session, user, playlist, media, album, artist
from helper import CONF
class State:
def __init__(self, user_id: int, quality: str):
match quality:
case "master":
q = session.Quality.master
case "lossless":
q = session.Quality.lossless
case "high":
q = session.Quality.high
case "low":
q = session.Quality.low
case _:
raise Exception("Quality misconfigured in conf.toml")
config = session.Config(quality=q)
self.user_id = user_id
self.session = session.Session(config)
self.favorites = user.Favorites(self.session, CONF["user_id"])
self._state = {
"albums": {},
"artists": {},
"playlists": {},
"tracks": {},
}
def login(self, auth_file: str | None = CONF["state_dir"] + "auth.json") -> None:
s = self.session
try:
assert auth_file
with open(auth_file, "r") as f:
a = json.load(f)
s.load_oauth_session(
a["token_type"],
a["access_token"],
a["refresh_token"],
datetime.fromtimestamp(a["expiry_time"]),
)
except (OSError, IndexError, AssertionError):
s.login_oauth_simple()
if (
s.token_type
and s.access_token
and s.refresh_token
and s.expiry_time
and auth_file
):
data = {
"token_type": s.token_type,
"access_token": s.access_token,
"refresh_token": s.refresh_token,
"expiry_time": s.expiry_time.timestamp(),
}
with open(auth_file, "w") as f:
json.dump(data, f)
assert self.session.check_login()
def set_dl_state(
self,
obj: playlist.Playlist | media.Track | album.Album | artist.Artist,
downloaded: bool,
) -> None:
match type(obj):
case album.Album:
t = "albums"
case artist.Artist:
t = "artists"
case playlist.Playlist:
t = "playlists"
case media.Track:
t = "tracks"
case _:
raise Exception("Incorrect object type received")
self._state[t][obj.id] = downloaded
def write_state(
self, state_file_path: str = CONF["state_dir"] + "state.json"
) -> None:
with open(state_file_path, "w") as f:
json.dump(self._state, f)
def load_state(
self, state_file_path: str = CONF["state_dir"] + "state.json"
) -> None:
with open(state_file_path, "r") as f:
self._state = json.load(f)