2
0
Fork 0

change to aigpy and tidalapi

This commit is contained in:
Luca Bilke 2023-06-26 20:54:02 +02:00
parent 1371b2f88f
commit 307da06b17
6 changed files with 162 additions and 291 deletions

View file

@ -1,8 +0,0 @@
from scraper import scraper
NORMAL = "LOW"
HIGH = "HIGH"
HIFI = "LOSSLESS"
MASTER = "HI_RES"
s = scraper(quality = HIFI)

View file

@ -1,72 +0,0 @@
from dataclasses import dataclass
@dataclass
class Login:
deviceCode: str
userCode: str
verificationUrl: str
timeout: int
interval: int
@dataclass
class Auth:
userId: str
countryCode: str
accessToken: str
refreshToken: str
expiresIn: str
@dataclass
class Artist:
id: int
name: str
type: str
picture: str
@dataclass
class Album:
id: int
title: str
duration: int
numberOfTracks: int
numberOfVolumes: int
releaseDate: str
type: str
version: str
cover: str
explicit: bool
audioQuality: str
audioModes: str
artist: Artist
artists: list
@dataclass
class Track:
id: int
title: str
duration: int
number: int
volumeNumber: int
version: str
isrc: str
explicit: bool
audioQuality: str
copyRight: str
artist: Artist
artists: Artist
album: Album
allowStreaming: bool
@dataclass
class StreamInfo:
trackId: int
audioQuality: str
codecs: str
encryptionKey: str
url: str

2
requirements.txt Normal file
View file

@ -0,0 +1,2 @@
git+https://github.com/tamland/python-tidal
aigpy

View file

@ -1,195 +0,0 @@
CLIENT_ID = "zU4XHVVkc2tDPo4t"
CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4"
from objects import *
from typing import Tuple
import requests
import time
import random
import json
import base64
import os
class scraper:
def __init__(
self,
quality: str,
redownload: bool = False,
authUrlBase: str = "https://auth.tidal.com/v1/oauth2",
apiUrlBase: str = "https://api.tidalhifi.com/v1",
clientToken: tuple = (CLIENT_ID, CLIENT_SECRET),
downloadPath: str = "~/Downloads",
cachePath: str = "~/.cache",
):
self.quality = quality
self.redownload = redownload
self.authUrlBase = authUrlBase
self.apiUrlBase = apiUrlBase
self.apiUrlBase = apiUrlBase
self.clientToken = clientToken
self.downloadPath = downloadPath
self.cachePath = cachePath
authfile = self.cachePath + "/auth.json"
try:
with open(authfile, "rb") as f:
a = json.load(f)
self.auth = Auth(
a["userId"],
a["countryCode"],
a["accessToken"],
a["refreshToken"],
a["expiresIn"],
)
except (OSError, IndexError):
self.auth = self.loginByWeb()
with open(authfile, "w") as f:
json.dump(self.auth.__dict__, f)
def post(self, url: str, data: dict) -> dict:
return requests.post(url, data=data, auth=self.clientToken).json()
def retrieve(self, url: str, path: str) -> None:
# TODO: Write function to retrieve stream
def get(self, url: str, params: dict = {}) -> dict:
headers = {"authorization": f"Bearer {self.auth.accessToken}"}
params["countryCode"] = self.auth.countryCode
err = f"Failed getting {url} "
for i in range(0, 3):
try:
response = requests.get(url, headers=headers, params=params)
if response.url.find("playbackinfopostpaywall") != -1:
sleep_time = random.randint(1, 5)
print(f"Pretending to be human, sleeping for {sleep_time}")
time.sleep(sleep_time)
if response.status_code == 429:
print("Rate limited, sleeping for 20 seconds")
time.sleep(20)
continue
response = response.json()
if "status" not in response:
return response
if "userMessage" in response and response["userMessage"] is not None:
err += f" : {response['userMessage']}"
break
except:
if i >= 3:
err += "after 3 tries"
raise Exception(err)
def getItems(self, url: str, params: dict = {}) -> list:
step = 50
params["limit"] = step
params["offset"] = 0
total = 0
items = []
while True:
response = self.get(url, params)
if "totalNumberOfItems" in response:
total = response["totalNumberOfItems"]
if total > 0 and total <= len(items):
return items
items += response["items"]
num = len(response["items"])
if num < step:
break
params["offset"] += step
return items
def loginByWeb(self) -> Auth:
result = self.post(
f"{self.authUrlBase}/device_authorization",
{"client_id": self.clientToken[0], "scope": "r_usr+w_usr+w_sub"},
)
if "status" in result and result["status"] != 200:
raise Exception("Client ID not accepted by Tidal")
login = Login(
deviceCode=result["deviceCode"],
userCode=result["userCode"],
verificationUrl=result["verificationUri"],
timeout=result["expiresIn"],
interval=result["interval"],
)
elapsed = 0
timeout = login.timeout if login.timeout else 300
interval = login.interval if login.interval else 2
print(f"Log in at https://{login.verificationUrl}/{login.userCode}")
start = time.time()
auth = False
while elapsed < timeout and not auth:
elapsed = time.time() - start
result = self.post(
f"{self.authUrlBase}/token",
{
"client_id": self.clientToken[0],
"device_code": login.deviceCode,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
},
)
if "status" in result and result["status"] != 200:
if result["status"] == 400 and result["sub_status"] == 1002:
auth = False # Not logged in yet
continue
else:
raise Exception("Failed to check authorization status")
auth = Auth(
result["user"]["userId"],
result["user"]["countryCode"],
result["access_token"],
result["refresh_token"],
result["expires_in"],
)
if not auth:
time.sleep(interval)
else:
return auth
raise Exception("Failed to log in")
def getTracks(self, album: Album) -> list:
return self.getItems(f"{self.apiUrlBase}/albums/{str(album.id)}/items")
def getAlbumFsPath(self, album: Album) -> str:
return self.downloadPath + album.title
def getTrackFsPath(self, track) -> str:
return f"{self.downloadPath}/{self.getAlbumFsPath(track.album)}/[{track.number}] {track.title}"
def getStreamInfo(self, track: Track) -> StreamInfo:
response = self.get(
f"tracks/{str(track.id)}/playbackinfopostpaywall",
{
"audioquality": self.quality,
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
)
if "vnd.tidal.bt" in response["manifestMimeType"]:
manifest = json.loads(
base64.b64decode(response["manifest"]).decode("utf-8")
)
return StreamInfo(
response["trackid"],
response["audioQuality"],
manifest["codecs"],
manifest["keyId"] if "keyId" in manifest else "",
manifest["urls"][0],
)
raise Exception("Can't read manifest of type {response['manifestMimeType']}")
def downloadTrack(self, track, partSize=1048576) -> Tuple[bool, str]:
try:
stream = self.getStreamInfo(track.id)
path = self.getTrackFsPath(track)
print(f"Starting download of track \"{track.title}\"")
if not self.redownload and os.path.exists(path):
print(f"Skipping download, \"{track.title}\" already exists")
return True, "exists"
def downloadAlbum(self, album: Album):
tracks = self.getTracks(album)
for i, track in enumerate(tracks):
self.downloadTrack(track)

53
tidal_cleanup.py Executable file
View file

@ -0,0 +1,53 @@
#!/bin/env python
import tidalapi
USER_ID = 188721652
def idExists(objects: list, id: int) -> bool:
for obj in objects:
if obj.id == id:
return True
return False
config = tidalapi.Config(quality=tidalapi.Quality.lossless)
session = tidalapi.Session(config)
user = session.get_user(USER_ID)
favorites = tidalapi.user.Favorites(session, user.id)
existing_artists = favorites.artists()
existing_albums = favorites.albums()
existing_tracks = favorites.tracks()
for album in existing_albums:
# ADD ARTIST
print(album.artist.name, end=" ")
if not idExists(existing_artists, album.artist.id):
if favorites.add_artist(album.artist.id):
print("added!")
else:
print("failed!")
pass
else:
print("skipped!")
# ADD TRACKS
for track in album.tracks():
print(track.name, end=" ")
if not idExists(existing_tracks, track.id):
if favorites.add_track(track.id):
print("added!")
else:
print("failed!")
pass
else:
print("skipped!")
existing_tracks = favorites.tracks()
for track in existing_tracks:
print(track.album.name, end=" ")
if not idExists(existing_albums, track.album.id):
if favorites.add_album(track.album.id):
print("added!")
else:
print("failed!")
else:
print("skipped!")

View file

@ -1,18 +1,111 @@
#!/bin/python3
#!/bin/env python
import tidalapi
import aigpy
import aigpy.downloadHelper
import json
import sys
import base64
from Crypto.Cipher import AES
from Crypto.Util import Counter
from typing import Tuple
from datetime import datetime
USER_ID = 188721652
DL_PATH = "/home/luca/.cache/tidal_scrape"
DEST_PATH = "/home/luca/Music"
config = tidalapi.Config(quality=tidalapi.Quality.lossless)
session = tidalapi.Session(config)
def decrypt_token(token) -> Tuple[bytes, bytes]:
master_key = "UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754="
master_key = base64.b64decode(master_key)
security_token = base64.b64decode(token)
iv = security_token[:16]
encrypted_st = security_token[16:]
decryptor = AES.new(master_key, AES.MODE_CBC, iv)
decrypted_st = decryptor.decrypt(encrypted_st)
key = decrypted_st[:16]
nonce = decrypted_st[16:24]
return key, nonce
def decrypt_file(input_file, output_file, key, nonce) -> None:
counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
with open(input_file, "rb") as i:
data = decryptor.decrypt(i.read())
with open(output_file, "wb") as o:
o.write(data)
def set_metadata(track: tidalapi.Track, file: str):
# This function could be more fleshed out (lyrics, covers) but I will leave that to external programs
tagger = aigpy.tag.TagTool(file)
tagger.title = track.name
tagger.artist = list(map(lambda artist: artist.name, track.artists)) # type: ignore[reportOptionalMemberAccess]
tagger.copyright = track.copyright
tagger.tracknumber = track.track_num
tagger.discnumber = track.volume_num
tagger.album = track.album.name # type: ignore[reportOptionalMemberAccess]
tagger.albumartist = list(map(lambda artist: artist.name, track.album.artists)) # type: ignore[reportOptionalMemberAccess]
tagger.date = track.album.available_release_date # type: ignore[reportOptionalMemberAccess]
tagger.totaldisc = track.album.num_volumes or 0 # type: ignore[reportOptionalMemberAccess]
if tagger.totaldisc <= 1:
tagger.totaltrack = track.album.num_tracks # type: ignore[reportOptionalMemberAccess]
tagger.save()
def download_track(
track: tidalapi.Track,
partSize: int = 1048576,
) -> Tuple[bool, str]:
try:
dl_path = f"{DL_PATH}/{track.album.name}/{track.name}.part" # type: ignore[reportOptionalMemberAccess]
dest_path = f"{DEST_PATH}/{track.album.name}/{track.name}" # type: ignore[reportOptionalMemberAccess]
stream = track.stream()
stream.manifest = json.loads(base64.b64decode(stream.manifest))
url = stream.manifest["urls"][0]
try:
key = stream.manifest["keyId"]
except KeyError:
key = None
tool = aigpy.downloadHelper.DownloadTool(dl_path, [url])
tool.setPartSize(partSize)
check, err = tool.start(True, 1)
if not check:
return False, str(err)
if key:
key, nonce = decrypt_token(key)
decrypt_file(dl_path, dest_path, key, nonce)
set_metadata(track, dest_path)
return True, ""
except Exception as err:
return False, str(err)
try:
with open("auth.json", "rb") as f:
a = json.load(f)
a.expiry_time = datetime.strptime(a.expiry_time, "%y-%m-%d %H:%M:%S")
expiry_time = a["expiry_time"].split(".", 1)[0]
expiry_time = datetime.strptime(expiry_time, "%Y-%m-%d %H:%M:%S")
session.load_oauth_session(
a.token_type, a.access_token, a.refresh_token, a.expiry_time
a["token_type"], a["access_token"], a["refresh_token"], expiry_time
)
except (OSError, IndexError):
session.login_oauth_simple()
@ -24,22 +117,20 @@ if session.check_login():
"token_type": session.token_type,
"access_token": session.access_token,
"refresh_token": session.refresh_token,
"expiry_time": session.expiry_time,
"expiry_time": str(session.expiry_time),
},
f,
)
else:
sys.exit("Failed to log in")
user = session.get_user()
# albums = user.Favorites.albums()
# tracks = user.Favorites.tracks()
# artists = user.Favorites.artists()
#
# for album in albums:
# if album.artist not in artists:
# user.Favorites.add_artist(album.artist.id)
#
# for track in tracks:
# if track.album not in albums:
# user.Favorites.add_album(track.album.id)
user = session.get_user(USER_ID)
favorites = tidalapi.user.Favorites(session, user.id)
tracks = favorites.tracks()
for track in tracks:
print(f"Downloading {track.album.name} by {track.artist}")
check, err = download_track(track)
if not check:
print(err)