diff --git a/__init__.py b/__init__.py deleted file mode 100644 index 98c6fbf..0000000 --- a/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from scraper import scraper - -NORMAL = "LOW" -HIGH = "HIGH" -HIFI = "LOSSLESS" -MASTER = "HI_RES" - -s = scraper(quality = HIFI) diff --git a/objects.py b/objects.py deleted file mode 100644 index ccc3d69..0000000 --- a/objects.py +++ /dev/null @@ -1,72 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class Login: - deviceCode: str - userCode: str - verificationUrl: str - timeout: int - interval: int - - -@dataclass -class Auth: - userId: str - countryCode: str - accessToken: str - refreshToken: str - expiresIn: str - - -@dataclass -class Artist: - id: int - name: str - type: str - picture: str - - -@dataclass -class Album: - id: int - title: str - duration: int - numberOfTracks: int - numberOfVolumes: int - releaseDate: str - type: str - version: str - cover: str - explicit: bool - audioQuality: str - audioModes: str - artist: Artist - artists: list - - -@dataclass -class Track: - id: int - title: str - duration: int - number: int - volumeNumber: int - version: str - isrc: str - explicit: bool - audioQuality: str - copyRight: str - artist: Artist - artists: Artist - album: Album - allowStreaming: bool - - -@dataclass -class StreamInfo: - trackId: int - audioQuality: str - codecs: str - encryptionKey: str - url: str diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7d88ead --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +git+https://github.com/tamland/python-tidal +aigpy diff --git a/scraper.py b/scraper.py deleted file mode 100644 index 7e1af74..0000000 --- a/scraper.py +++ /dev/null @@ -1,195 +0,0 @@ -CLIENT_ID = "zU4XHVVkc2tDPo4t" -CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4" - -from objects import * -from typing import Tuple -import requests -import time -import random -import json -import base64 -import os - - -class scraper: - def __init__( - self, - quality: str, - redownload: bool = False, - authUrlBase: str = "https://auth.tidal.com/v1/oauth2", - apiUrlBase: str = "https://api.tidalhifi.com/v1", - clientToken: tuple = (CLIENT_ID, CLIENT_SECRET), - downloadPath: str = "~/Downloads", - cachePath: str = "~/.cache", - ): - self.quality = quality - self.redownload = redownload - self.authUrlBase = authUrlBase - self.apiUrlBase = apiUrlBase - self.apiUrlBase = apiUrlBase - self.clientToken = clientToken - self.downloadPath = downloadPath - self.cachePath = cachePath - authfile = self.cachePath + "/auth.json" - try: - with open(authfile, "rb") as f: - a = json.load(f) - self.auth = Auth( - a["userId"], - a["countryCode"], - a["accessToken"], - a["refreshToken"], - a["expiresIn"], - ) - except (OSError, IndexError): - self.auth = self.loginByWeb() - with open(authfile, "w") as f: - json.dump(self.auth.__dict__, f) - - def post(self, url: str, data: dict) -> dict: - return requests.post(url, data=data, auth=self.clientToken).json() - - def retrieve(self, url: str, path: str) -> None: - # TODO: Write function to retrieve stream - - def get(self, url: str, params: dict = {}) -> dict: - headers = {"authorization": f"Bearer {self.auth.accessToken}"} - params["countryCode"] = self.auth.countryCode - err = f"Failed getting {url} " - for i in range(0, 3): - try: - response = requests.get(url, headers=headers, params=params) - if response.url.find("playbackinfopostpaywall") != -1: - sleep_time = random.randint(1, 5) - print(f"Pretending to be human, sleeping for {sleep_time}") - time.sleep(sleep_time) - if response.status_code == 429: - print("Rate limited, sleeping for 20 seconds") - time.sleep(20) - continue - response = response.json() - if "status" not in response: - return response - if "userMessage" in response and response["userMessage"] is not None: - err += f" : {response['userMessage']}" - break - except: - if i >= 3: - err += "after 3 tries" - raise Exception(err) - - def getItems(self, url: str, params: dict = {}) -> list: - step = 50 - params["limit"] = step - params["offset"] = 0 - total = 0 - items = [] - while True: - response = self.get(url, params) - if "totalNumberOfItems" in response: - total = response["totalNumberOfItems"] - if total > 0 and total <= len(items): - return items - items += response["items"] - num = len(response["items"]) - if num < step: - break - params["offset"] += step - return items - - def loginByWeb(self) -> Auth: - result = self.post( - f"{self.authUrlBase}/device_authorization", - {"client_id": self.clientToken[0], "scope": "r_usr+w_usr+w_sub"}, - ) - if "status" in result and result["status"] != 200: - raise Exception("Client ID not accepted by Tidal") - login = Login( - deviceCode=result["deviceCode"], - userCode=result["userCode"], - verificationUrl=result["verificationUri"], - timeout=result["expiresIn"], - interval=result["interval"], - ) - elapsed = 0 - timeout = login.timeout if login.timeout else 300 - interval = login.interval if login.interval else 2 - print(f"Log in at https://{login.verificationUrl}/{login.userCode}") - start = time.time() - auth = False - while elapsed < timeout and not auth: - elapsed = time.time() - start - result = self.post( - f"{self.authUrlBase}/token", - { - "client_id": self.clientToken[0], - "device_code": login.deviceCode, - "grant_type": "urn:ietf:params:oauth:grant-type:device_code", - "scope": "r_usr+w_usr+w_sub", - }, - ) - if "status" in result and result["status"] != 200: - if result["status"] == 400 and result["sub_status"] == 1002: - auth = False # Not logged in yet - continue - else: - raise Exception("Failed to check authorization status") - auth = Auth( - result["user"]["userId"], - result["user"]["countryCode"], - result["access_token"], - result["refresh_token"], - result["expires_in"], - ) - if not auth: - time.sleep(interval) - else: - return auth - raise Exception("Failed to log in") - - def getTracks(self, album: Album) -> list: - return self.getItems(f"{self.apiUrlBase}/albums/{str(album.id)}/items") - - def getAlbumFsPath(self, album: Album) -> str: - return self.downloadPath + album.title - - def getTrackFsPath(self, track) -> str: - return f"{self.downloadPath}/{self.getAlbumFsPath(track.album)}/[{track.number}] {track.title}" - - def getStreamInfo(self, track: Track) -> StreamInfo: - response = self.get( - f"tracks/{str(track.id)}/playbackinfopostpaywall", - { - "audioquality": self.quality, - "playbackmode": "STREAM", - "assetpresentation": "FULL", - }, - ) - if "vnd.tidal.bt" in response["manifestMimeType"]: - manifest = json.loads( - base64.b64decode(response["manifest"]).decode("utf-8") - ) - return StreamInfo( - response["trackid"], - response["audioQuality"], - manifest["codecs"], - manifest["keyId"] if "keyId" in manifest else "", - manifest["urls"][0], - ) - raise Exception("Can't read manifest of type {response['manifestMimeType']}") - - def downloadTrack(self, track, partSize=1048576) -> Tuple[bool, str]: - try: - stream = self.getStreamInfo(track.id) - path = self.getTrackFsPath(track) - print(f"Starting download of track \"{track.title}\"") - if not self.redownload and os.path.exists(path): - print(f"Skipping download, \"{track.title}\" already exists") - return True, "exists" - - - - def downloadAlbum(self, album: Album): - tracks = self.getTracks(album) - for i, track in enumerate(tracks): - self.downloadTrack(track) diff --git a/tidal_cleanup.py b/tidal_cleanup.py new file mode 100755 index 0000000..6d78378 --- /dev/null +++ b/tidal_cleanup.py @@ -0,0 +1,53 @@ +#!/bin/env python +import tidalapi + +USER_ID = 188721652 + +def idExists(objects: list, id: int) -> bool: + for obj in objects: + if obj.id == id: + return True + return False + + +config = tidalapi.Config(quality=tidalapi.Quality.lossless) +session = tidalapi.Session(config) +user = session.get_user(USER_ID) +favorites = tidalapi.user.Favorites(session, user.id) + +existing_artists = favorites.artists() +existing_albums = favorites.albums() +existing_tracks = favorites.tracks() +for album in existing_albums: + # ADD ARTIST + print(album.artist.name, end=" ") + if not idExists(existing_artists, album.artist.id): + if favorites.add_artist(album.artist.id): + print("added!") + else: + print("failed!") + pass + else: + print("skipped!") + # ADD TRACKS + for track in album.tracks(): + print(track.name, end=" ") + if not idExists(existing_tracks, track.id): + if favorites.add_track(track.id): + print("added!") + else: + print("failed!") + pass + else: + print("skipped!") + +existing_tracks = favorites.tracks() +for track in existing_tracks: + print(track.album.name, end=" ") + if not idExists(existing_albums, track.album.id): + if favorites.add_album(track.album.id): + print("added!") + else: + print("failed!") + else: + print("skipped!") diff --git a/tidal_scrape.py b/tidal_scrape.py index 3e4bb74..74100e3 100755 --- a/tidal_scrape.py +++ b/tidal_scrape.py @@ -1,18 +1,111 @@ -#!/bin/python3 +#!/bin/env python import tidalapi +import aigpy +import aigpy.downloadHelper import json import sys +import base64 +from Crypto.Cipher import AES +from Crypto.Util import Counter +from typing import Tuple from datetime import datetime +USER_ID = 188721652 +DL_PATH = "/home/luca/.cache/tidal_scrape" +DEST_PATH = "/home/luca/Music" + config = tidalapi.Config(quality=tidalapi.Quality.lossless) session = tidalapi.Session(config) + +def decrypt_token(token) -> Tuple[bytes, bytes]: + master_key = "UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754=" + master_key = base64.b64decode(master_key) + security_token = base64.b64decode(token) + iv = security_token[:16] + encrypted_st = security_token[16:] + + decryptor = AES.new(master_key, AES.MODE_CBC, iv) + decrypted_st = decryptor.decrypt(encrypted_st) + + key = decrypted_st[:16] + nonce = decrypted_st[16:24] + + return key, nonce + + +def decrypt_file(input_file, output_file, key, nonce) -> None: + counter = Counter.new(64, prefix=nonce, initial_value=0) + decryptor = AES.new(key, AES.MODE_CTR, counter=counter) + + with open(input_file, "rb") as i: + data = decryptor.decrypt(i.read()) + + with open(output_file, "wb") as o: + o.write(data) + + +def set_metadata(track: tidalapi.Track, file: str): + # This function could be more fleshed out (lyrics, covers) but I will leave that to external programs + tagger = aigpy.tag.TagTool(file) + + tagger.title = track.name + tagger.artist = list(map(lambda artist: artist.name, track.artists)) # type: ignore[reportOptionalMemberAccess] + tagger.copyright = track.copyright + tagger.tracknumber = track.track_num + tagger.discnumber = track.volume_num + + tagger.album = track.album.name # type: ignore[reportOptionalMemberAccess] + tagger.albumartist = list(map(lambda artist: artist.name, track.album.artists)) # type: ignore[reportOptionalMemberAccess] + tagger.date = track.album.available_release_date # type: ignore[reportOptionalMemberAccess] + tagger.totaldisc = track.album.num_volumes or 0 # type: ignore[reportOptionalMemberAccess] + if tagger.totaldisc <= 1: + tagger.totaltrack = track.album.num_tracks # type: ignore[reportOptionalMemberAccess] + + tagger.save() + + +def download_track( + track: tidalapi.Track, + partSize: int = 1048576, +) -> Tuple[bool, str]: + try: + dl_path = f"{DL_PATH}/{track.album.name}/{track.name}.part" # type: ignore[reportOptionalMemberAccess] + dest_path = f"{DEST_PATH}/{track.album.name}/{track.name}" # type: ignore[reportOptionalMemberAccess] + + stream = track.stream() + + stream.manifest = json.loads(base64.b64decode(stream.manifest)) + url = stream.manifest["urls"][0] + try: + key = stream.manifest["keyId"] + except KeyError: + key = None + tool = aigpy.downloadHelper.DownloadTool(dl_path, [url]) + + tool.setPartSize(partSize) + check, err = tool.start(True, 1) + if not check: + return False, str(err) + + if key: + key, nonce = decrypt_token(key) + decrypt_file(dl_path, dest_path, key, nonce) + + set_metadata(track, dest_path) + + return True, "" + except Exception as err: + return False, str(err) + + try: with open("auth.json", "rb") as f: a = json.load(f) - a.expiry_time = datetime.strptime(a.expiry_time, "%y-%m-%d %H:%M:%S") + expiry_time = a["expiry_time"].split(".", 1)[0] + expiry_time = datetime.strptime(expiry_time, "%Y-%m-%d %H:%M:%S") session.load_oauth_session( - a.token_type, a.access_token, a.refresh_token, a.expiry_time + a["token_type"], a["access_token"], a["refresh_token"], expiry_time ) except (OSError, IndexError): session.login_oauth_simple() @@ -24,22 +117,20 @@ if session.check_login(): "token_type": session.token_type, "access_token": session.access_token, "refresh_token": session.refresh_token, - "expiry_time": session.expiry_time, + "expiry_time": str(session.expiry_time), }, f, ) else: sys.exit("Failed to log in") -user = session.get_user() -# albums = user.Favorites.albums() -# tracks = user.Favorites.tracks() -# artists = user.Favorites.artists() -# -# for album in albums: -# if album.artist not in artists: -# user.Favorites.add_artist(album.artist.id) -# -# for track in tracks: -# if track.album not in albums: -# user.Favorites.add_album(track.album.id) + +user = session.get_user(USER_ID) +favorites = tidalapi.user.Favorites(session, user.id) +tracks = favorites.tracks() + +for track in tracks: + print(f"Downloading {track.album.name} by {track.artist}") + check, err = download_track(track) + if not check: + print(err)