diff --git a/.gitignore b/.gitignore index 0b57306..a91cdd8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ -auth.json \ No newline at end of file +auth.json +__pycache__ diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..82b8ee1 --- /dev/null +++ b/__init__.py @@ -0,0 +1,5 @@ +from scraper import scraper +import os + +authfile = os.environ["XDG_CACHE_HOME"] + "/auth.json" +s = scraper(authfile) diff --git a/objects.py b/objects.py new file mode 100644 index 0000000..4122ac5 --- /dev/null +++ b/objects.py @@ -0,0 +1,90 @@ +from dataclasses import dataclass + + +@dataclass +class Login: + deviceCode: str + userCode: str + verificationUrl: str + timeout: int + interval: int + + +@dataclass +class Auth: + userId: str + countryCode: str + accessToken: str + refreshToken: str + expiresIn: str + + +@dataclass +class Artist: + id: int | None + name: str | None + type: str | None + picture: str | None + + +@dataclass +class Album: + id: int | None + title: str | None + duration: int | None + numberOfTracks: int | None + numberOfVideos: int | None + numberOfVolumes: int | None + releaseDate: str | None + type: str | None + version: str | None + cover: str | None + explicit: bool | None + audioQuality: str | None + audioModes: str | None + artist: Artist | None + artists: Artist | None + + +@dataclass +class Playlist: + uuid: str | None + title: str | None + numberOfTracks: int | None + numberOfVideos: int | None + description: str | None + duration: int | None + image: str | None + squareImage: str | None + + +@dataclass +class Track: + id: int | None + title: str | None + duration: int | None + trackNumber: int | None + volumeNumber: int | None + trackNumberOnPlaylist: int | None + version: str | None + isrc: str | None + explicit: bool | None + audioQuality: str | None + copyRight: str | None + artist: Artist | None + artists: Artist | None + album: Album | None + allowStreaming: bool | None + playlist: Playlist | None + + +@dataclass +class StreamResponse: + trackid: int | None + streamType: str | None + assetPresentation: str | None + audioMode: str | None + audioQuality: str | None + videoQuality: str | None + manifestMimeType: str | None + manifest: str | None diff --git a/scrape.py b/scrape.py deleted file mode 100755 index c24c722..0000000 --- a/scrape.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/bin/python3 - -CLIENT_ID = "zU4XHVVkc2tDPo4t" -CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4" -URL_BASE = "https://auth.tidal.com/v1/oauth2" - -import requests -import time -import json -from dataclasses import dataclass - - -@dataclass -class Login: - deviceCode: str | None - userCode: str | None - verificationUrl: str | None - timeout: int | None - interval: int | None - - -@dataclass -class Auth: - userId: str | None - countryCode: str | None - accessToken: str | None - refreshToken: str | None - expiresIn: str | None - - -def post(path, data, auth=None) -> dict: - return requests.post(URL_BASE + path, data=data, auth=auth).json() - - -def getLogin() -> Login: - data = {"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"} - - result = post("/device_authorization", data) - - if "status" in result and result["status"] != 200: - raise Exception("Client ID not accepted by Tidal") - - return Login( - deviceCode=result["deviceCode"], - userCode=result["userCode"], - verificationUrl=result["verificationUri"], - timeout=result["expiresIn"], - interval=result["interval"], - ) - - -def getAuth(login: Login) -> Auth | None: - data = { - "client_id": CLIENT_ID, - "device_code": login.deviceCode, - "grant_type": "urn:ietf:params:oauth:grant-type:device_code", - "scope": "r_usr+w_usr+w_sub", - } - - result = post("/token", data, (CLIENT_ID, CLIENT_SECRET)) - - if "status" in result and result["status"] != 200: - if result["status"] == 400 and result["sub_status"] == 1002: - return None # Not logged in yet - else: - raise Exception("Failed to check authorization status") - - return Auth( - result["user"]["userId"], - result["user"]["countryCode"], - result["access_token"], - result["refresh_token"], - result["expires_in"], - ) - - -def loginByWeb() -> Auth: - login = getLogin() - url = f"http://{login.verificationUrl}/{login.userCode}" - print(f"Log in at {url}") - - start = time.time() - elapsed = 0 - timeout = login.timeout if login.timeout else 300 - interval = login.interval if login.interval else 2 - - while elapsed < timeout: - elapsed = time.time() - start - auth = getAuth(login) - if not auth: - time.sleep(interval) - else: - return auth - - raise Exception("Failed to log in") - - -if __name__ == "__main__": - try: - with open("auth.json", "rb") as f: - a = json.load(f) - auth = Auth(a['userId'], a['countryCode'], a['accessToken'], a['refreshToken'], a['expiresIn']) - except (OSError, IndexError): - auth = loginByWeb() - with open("auth.json", "w") as f: - json.dump(auth.__dict__, f) diff --git a/scraper.py b/scraper.py new file mode 100644 index 0000000..c9002d9 --- /dev/null +++ b/scraper.py @@ -0,0 +1,141 @@ +CLIENT_ID = "zU4XHVVkc2tDPo4t" +CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4" +API_URL_BASE = "https://api.tidalhifi.com/v1" +AUTH_URL_BASE = "https://auth.tidal.com/v1/oauth2" + + +from objects import * +import requests +import time +import random +import json + + +class scraper: + def __init__(self, authfile: str): + try: + with open(authfile, "rb") as f: + a = json.load(f) + self.auth = Auth( + a["userId"], + a["countryCode"], + a["accessToken"], + a["refreshToken"], + a["expiresIn"], + ) + except (OSError, IndexError): + self.auth = self.loginByWeb() + with open(authfile, "w") as f: + json.dump(self.auth.__dict__, f) + + def post(self, url: str, data: dict) -> dict: + return requests.post(url, data=data, auth=(CLIENT_ID, CLIENT_SECRET)).json() + + def get(self, url: str, params: dict = {}) -> dict: + headers = {"authorization": f"Bearer {self.auth.accessToken}"} + params["countryCode"] = self.auth.countryCode + err = f"Failed getting {url} " + for i in range(0, 3): + try: + response = requests.get(url, headers=headers, params=params) + if response.url.find("playbackinfopostpaywall") != -1: + sleep_time = random.randint(1, 5) + print(f"Pretending to be human, sleeping for {sleep_time}") + time.sleep(sleep_time) + if response.status_code == 429: + print("Rate limited, sleeping for 20 seconds") + time.sleep(20) + continue + response = response.json() + if "status" not in response: + return response + if "userMessage" in response and response["userMessage"] is not None: + err += f" : {response['userMessage']}" + break + except: + if i >= 3: + err += "after 3 tries" + raise Exception(err) + + def getItems(self, url: str, params: dict = {}) -> list: + step = 50 + params["limit"] = step + params["offset"] = 0 + total = 0 + items = [] + while True: + response = self.get(url, params) + if "totalNumberOfItems" in response: + total = response["totalNumberOfItems"] + if total > 0 and total <= len(items): + return items + items += response["items"] + num = len(response["items"]) + if num < step: + break + params["offset"] += step + return items + + def loginByWeb(self) -> Auth: + result = self.post( + f"{AUTH_URL_BASE}/device_authorization", + {"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"}, + ) + if "status" in result and result["status"] != 200: + raise Exception("Client ID not accepted by Tidal") + login = Login( + deviceCode=result["deviceCode"], + userCode=result["userCode"], + verificationUrl=result["verificationUri"], + timeout=result["expiresIn"], + interval=result["interval"], + ) + elapsed = 0 + timeout = login.timeout if login.timeout else 300 + interval = login.interval if login.interval else 2 + print(f"Log in at https://{login.verificationUrl}/{login.userCode}") + start = time.time() + auth = False + while elapsed < timeout and not auth: + elapsed = time.time() - start + result = self.post( + f"{AUTH_URL_BASE}/token", + { + "client_id": CLIENT_ID, + "device_code": login.deviceCode, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "scope": "r_usr+w_usr+w_sub", + }, + ) + if "status" in result and result["status"] != 200: + if result["status"] == 400 and result["sub_status"] == 1002: + auth = False # Not logged in yet + continue + else: + raise Exception("Failed to check authorization status") + auth = Auth( + result["user"]["userId"], + result["user"]["countryCode"], + result["access_token"], + result["refresh_token"], + result["expires_in"], + ) + if not auth: + time.sleep(interval) + else: + return auth + raise Exception("Failed to log in") + + def getTracks(self, obj) -> list: + url = API_URL_BASE + if type(obj) is Album: + url += f"/albums/{str(obj.id)}/items" + elif type(obj) is Playlist: + url += f"/playlists/{obj.uuid}/items" + else: + raise Exception("Tried to get tracks from incorrect object") + return self.getItems(url) + + def downloadAlbum(self, album: Album): + tracks = self.getTracks(album) + # TODO: Continue working here