2
0
Fork 0

functions to get tracks from albums/playlists

This commit is contained in:
Luca Bilke 2023-06-22 00:07:40 +02:00
parent e2b781c3a2
commit 688f02c885
5 changed files with 238 additions and 107 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
auth.json
auth.json
__pycache__

5
__init__.py Normal file
View File

@ -0,0 +1,5 @@
from scraper import scraper
import os
authfile = os.environ["XDG_CACHE_HOME"] + "/auth.json"
s = scraper(authfile)

90
objects.py Normal file
View File

@ -0,0 +1,90 @@
from dataclasses import dataclass
@dataclass
class Login:
deviceCode: str
userCode: str
verificationUrl: str
timeout: int
interval: int
@dataclass
class Auth:
userId: str
countryCode: str
accessToken: str
refreshToken: str
expiresIn: str
@dataclass
class Artist:
id: int | None
name: str | None
type: str | None
picture: str | None
@dataclass
class Album:
id: int | None
title: str | None
duration: int | None
numberOfTracks: int | None
numberOfVideos: int | None
numberOfVolumes: int | None
releaseDate: str | None
type: str | None
version: str | None
cover: str | None
explicit: bool | None
audioQuality: str | None
audioModes: str | None
artist: Artist | None
artists: Artist | None
@dataclass
class Playlist:
uuid: str | None
title: str | None
numberOfTracks: int | None
numberOfVideos: int | None
description: str | None
duration: int | None
image: str | None
squareImage: str | None
@dataclass
class Track:
id: int | None
title: str | None
duration: int | None
trackNumber: int | None
volumeNumber: int | None
trackNumberOnPlaylist: int | None
version: str | None
isrc: str | None
explicit: bool | None
audioQuality: str | None
copyRight: str | None
artist: Artist | None
artists: Artist | None
album: Album | None
allowStreaming: bool | None
playlist: Playlist | None
@dataclass
class StreamResponse:
trackid: int | None
streamType: str | None
assetPresentation: str | None
audioMode: str | None
audioQuality: str | None
videoQuality: str | None
manifestMimeType: str | None
manifest: str | None

106
scrape.py
View File

@ -1,106 +0,0 @@
#!/bin/python3
CLIENT_ID = "zU4XHVVkc2tDPo4t"
CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4"
URL_BASE = "https://auth.tidal.com/v1/oauth2"
import requests
import time
import json
from dataclasses import dataclass
@dataclass
class Login:
deviceCode: str | None
userCode: str | None
verificationUrl: str | None
timeout: int | None
interval: int | None
@dataclass
class Auth:
userId: str | None
countryCode: str | None
accessToken: str | None
refreshToken: str | None
expiresIn: str | None
def post(path, data, auth=None) -> dict:
return requests.post(URL_BASE + path, data=data, auth=auth).json()
def getLogin() -> Login:
data = {"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"}
result = post("/device_authorization", data)
if "status" in result and result["status"] != 200:
raise Exception("Client ID not accepted by Tidal")
return Login(
deviceCode=result["deviceCode"],
userCode=result["userCode"],
verificationUrl=result["verificationUri"],
timeout=result["expiresIn"],
interval=result["interval"],
)
def getAuth(login: Login) -> Auth | None:
data = {
"client_id": CLIENT_ID,
"device_code": login.deviceCode,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
}
result = post("/token", data, (CLIENT_ID, CLIENT_SECRET))
if "status" in result and result["status"] != 200:
if result["status"] == 400 and result["sub_status"] == 1002:
return None # Not logged in yet
else:
raise Exception("Failed to check authorization status")
return Auth(
result["user"]["userId"],
result["user"]["countryCode"],
result["access_token"],
result["refresh_token"],
result["expires_in"],
)
def loginByWeb() -> Auth:
login = getLogin()
url = f"http://{login.verificationUrl}/{login.userCode}"
print(f"Log in at {url}")
start = time.time()
elapsed = 0
timeout = login.timeout if login.timeout else 300
interval = login.interval if login.interval else 2
while elapsed < timeout:
elapsed = time.time() - start
auth = getAuth(login)
if not auth:
time.sleep(interval)
else:
return auth
raise Exception("Failed to log in")
if __name__ == "__main__":
try:
with open("auth.json", "rb") as f:
a = json.load(f)
auth = Auth(a['userId'], a['countryCode'], a['accessToken'], a['refreshToken'], a['expiresIn'])
except (OSError, IndexError):
auth = loginByWeb()
with open("auth.json", "w") as f:
json.dump(auth.__dict__, f)

141
scraper.py Normal file
View File

@ -0,0 +1,141 @@
CLIENT_ID = "zU4XHVVkc2tDPo4t"
CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4"
API_URL_BASE = "https://api.tidalhifi.com/v1"
AUTH_URL_BASE = "https://auth.tidal.com/v1/oauth2"
from objects import *
import requests
import time
import random
import json
class scraper:
def __init__(self, authfile: str):
try:
with open(authfile, "rb") as f:
a = json.load(f)
self.auth = Auth(
a["userId"],
a["countryCode"],
a["accessToken"],
a["refreshToken"],
a["expiresIn"],
)
except (OSError, IndexError):
self.auth = self.loginByWeb()
with open(authfile, "w") as f:
json.dump(self.auth.__dict__, f)
def post(self, url: str, data: dict) -> dict:
return requests.post(url, data=data, auth=(CLIENT_ID, CLIENT_SECRET)).json()
def get(self, url: str, params: dict = {}) -> dict:
headers = {"authorization": f"Bearer {self.auth.accessToken}"}
params["countryCode"] = self.auth.countryCode
err = f"Failed getting {url} "
for i in range(0, 3):
try:
response = requests.get(url, headers=headers, params=params)
if response.url.find("playbackinfopostpaywall") != -1:
sleep_time = random.randint(1, 5)
print(f"Pretending to be human, sleeping for {sleep_time}")
time.sleep(sleep_time)
if response.status_code == 429:
print("Rate limited, sleeping for 20 seconds")
time.sleep(20)
continue
response = response.json()
if "status" not in response:
return response
if "userMessage" in response and response["userMessage"] is not None:
err += f" : {response['userMessage']}"
break
except:
if i >= 3:
err += "after 3 tries"
raise Exception(err)
def getItems(self, url: str, params: dict = {}) -> list:
step = 50
params["limit"] = step
params["offset"] = 0
total = 0
items = []
while True:
response = self.get(url, params)
if "totalNumberOfItems" in response:
total = response["totalNumberOfItems"]
if total > 0 and total <= len(items):
return items
items += response["items"]
num = len(response["items"])
if num < step:
break
params["offset"] += step
return items
def loginByWeb(self) -> Auth:
result = self.post(
f"{AUTH_URL_BASE}/device_authorization",
{"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"},
)
if "status" in result and result["status"] != 200:
raise Exception("Client ID not accepted by Tidal")
login = Login(
deviceCode=result["deviceCode"],
userCode=result["userCode"],
verificationUrl=result["verificationUri"],
timeout=result["expiresIn"],
interval=result["interval"],
)
elapsed = 0
timeout = login.timeout if login.timeout else 300
interval = login.interval if login.interval else 2
print(f"Log in at https://{login.verificationUrl}/{login.userCode}")
start = time.time()
auth = False
while elapsed < timeout and not auth:
elapsed = time.time() - start
result = self.post(
f"{AUTH_URL_BASE}/token",
{
"client_id": CLIENT_ID,
"device_code": login.deviceCode,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
},
)
if "status" in result and result["status"] != 200:
if result["status"] == 400 and result["sub_status"] == 1002:
auth = False # Not logged in yet
continue
else:
raise Exception("Failed to check authorization status")
auth = Auth(
result["user"]["userId"],
result["user"]["countryCode"],
result["access_token"],
result["refresh_token"],
result["expires_in"],
)
if not auth:
time.sleep(interval)
else:
return auth
raise Exception("Failed to log in")
def getTracks(self, obj) -> list:
url = API_URL_BASE
if type(obj) is Album:
url += f"/albums/{str(obj.id)}/items"
elif type(obj) is Playlist:
url += f"/playlists/{obj.uuid}/items"
else:
raise Exception("Tried to get tracks from incorrect object")
return self.getItems(url)
def downloadAlbum(self, album: Album):
tracks = self.getTracks(album)
# TODO: Continue working here