From 307da06b175436f19f4d445d33875c48c50ecadc Mon Sep 17 00:00:00 2001
From: Luca Bilke <luca@snaile.de>
Date: Mon, 26 Jun 2023 20:54:02 +0200
Subject: [PATCH] change to aigpy and tidalapi

---
 __init__.py      |   8 --
 objects.py       |  72 -----------------
 requirements.txt |   2 +
 scraper.py       | 195 -----------------------------------------------
 tidal_cleanup.py |  53 +++++++++++++
 tidal_scrape.py  | 123 ++++++++++++++++++++++++++----
 6 files changed, 162 insertions(+), 291 deletions(-)
 delete mode 100644 __init__.py
 delete mode 100644 objects.py
 create mode 100644 requirements.txt
 delete mode 100644 scraper.py
 create mode 100755 tidal_cleanup.py

diff --git a/__init__.py b/__init__.py
deleted file mode 100644
index 98c6fbf..0000000
--- a/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from scraper import scraper
-
-NORMAL = "LOW"
-HIGH = "HIGH"
-HIFI = "LOSSLESS"
-MASTER = "HI_RES"
-
-s = scraper(quality = HIFI)
diff --git a/objects.py b/objects.py
deleted file mode 100644
index ccc3d69..0000000
--- a/objects.py
+++ /dev/null
@@ -1,72 +0,0 @@
-from dataclasses import dataclass
-
-
-@dataclass
-class Login:
-    deviceCode: str
-    userCode: str
-    verificationUrl: str
-    timeout: int
-    interval: int
-
-
-@dataclass
-class Auth:
-    userId: str
-    countryCode: str
-    accessToken: str
-    refreshToken: str
-    expiresIn: str
-
-
-@dataclass
-class Artist:
-    id: int
-    name: str
-    type: str
-    picture: str
-
-
-@dataclass
-class Album:
-    id: int
-    title: str
-    duration: int
-    numberOfTracks: int
-    numberOfVolumes: int
-    releaseDate: str
-    type: str
-    version: str
-    cover: str
-    explicit: bool
-    audioQuality: str
-    audioModes: str
-    artist: Artist
-    artists: list
-
-
-@dataclass
-class Track:
-    id: int
-    title: str
-    duration: int
-    number: int
-    volumeNumber: int
-    version: str
-    isrc: str
-    explicit: bool
-    audioQuality: str
-    copyRight: str
-    artist: Artist
-    artists: Artist
-    album: Album
-    allowStreaming: bool
-
-
-@dataclass
-class StreamInfo:
-    trackId: int
-    audioQuality: str
-    codecs: str
-    encryptionKey: str
-    url: str
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..7d88ead
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+git+https://github.com/tamland/python-tidal
+aigpy
diff --git a/scraper.py b/scraper.py
deleted file mode 100644
index 7e1af74..0000000
--- a/scraper.py
+++ /dev/null
@@ -1,195 +0,0 @@
-CLIENT_ID = "zU4XHVVkc2tDPo4t"
-CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4"
-
-from objects import *
-from typing import Tuple
-import requests
-import time
-import random
-import json
-import base64
-import os
-
-
-class scraper:
-    def __init__(
-        self,
-        quality: str,
-        redownload: bool = False,
-        authUrlBase: str = "https://auth.tidal.com/v1/oauth2",
-        apiUrlBase: str = "https://api.tidalhifi.com/v1",
-        clientToken: tuple = (CLIENT_ID, CLIENT_SECRET),
-        downloadPath: str = "~/Downloads",
-        cachePath: str = "~/.cache",
-    ):
-        self.quality = quality
-        self.redownload = redownload
-        self.authUrlBase = authUrlBase
-        self.apiUrlBase = apiUrlBase
-        self.apiUrlBase = apiUrlBase
-        self.clientToken = clientToken
-        self.downloadPath = downloadPath
-        self.cachePath = cachePath
-        authfile = self.cachePath + "/auth.json"
-        try:
-            with open(authfile, "rb") as f:
-                a = json.load(f)
-                self.auth = Auth(
-                    a["userId"],
-                    a["countryCode"],
-                    a["accessToken"],
-                    a["refreshToken"],
-                    a["expiresIn"],
-                )
-        except (OSError, IndexError):
-            self.auth = self.loginByWeb()
-            with open(authfile, "w") as f:
-                json.dump(self.auth.__dict__, f)
-
-    def post(self, url: str, data: dict) -> dict:
-        return requests.post(url, data=data, auth=self.clientToken).json()
-
-    def retrieve(self, url: str, path: str) -> None:
-        # TODO: Write function to retrieve stream
-
-    def get(self, url: str, params: dict = {}) -> dict:
-        headers = {"authorization": f"Bearer {self.auth.accessToken}"}
-        params["countryCode"] = self.auth.countryCode
-        err = f"Failed getting {url} "
-        for i in range(0, 3):
-            try:
-                response = requests.get(url, headers=headers, params=params)
-                if response.url.find("playbackinfopostpaywall") != -1:
-                    sleep_time = random.randint(1, 5)
-                    print(f"Pretending to be human, sleeping for {sleep_time}")
-                    time.sleep(sleep_time)
-                if response.status_code == 429:
-                    print("Rate limited, sleeping for 20 seconds")
-                    time.sleep(20)
-                    continue
-                response = response.json()
-                if "status" not in response:
-                    return response
-                if "userMessage" in response and response["userMessage"] is not None:
-                    err += f" : {response['userMessage']}"
-                break
-            except:
-                if i >= 3:
-                    err += "after 3 tries"
-        raise Exception(err)
-
-    def getItems(self, url: str, params: dict = {}) -> list:
-        step = 50
-        params["limit"] = step
-        params["offset"] = 0
-        total = 0
-        items = []
-        while True:
-            response = self.get(url, params)
-            if "totalNumberOfItems" in response:
-                total = response["totalNumberOfItems"]
-            if total > 0 and total <= len(items):
-                return items
-            items += response["items"]
-            num = len(response["items"])
-            if num < step:
-                break
-            params["offset"] += step
-        return items
-
-    def loginByWeb(self) -> Auth:
-        result = self.post(
-            f"{self.authUrlBase}/device_authorization",
-            {"client_id": self.clientToken[0], "scope": "r_usr+w_usr+w_sub"},
-        )
-        if "status" in result and result["status"] != 200:
-            raise Exception("Client ID not accepted by Tidal")
-        login = Login(
-            deviceCode=result["deviceCode"],
-            userCode=result["userCode"],
-            verificationUrl=result["verificationUri"],
-            timeout=result["expiresIn"],
-            interval=result["interval"],
-        )
-        elapsed = 0
-        timeout = login.timeout if login.timeout else 300
-        interval = login.interval if login.interval else 2
-        print(f"Log in at https://{login.verificationUrl}/{login.userCode}")
-        start = time.time()
-        auth = False
-        while elapsed < timeout and not auth:
-            elapsed = time.time() - start
-            result = self.post(
-                f"{self.authUrlBase}/token",
-                {
-                    "client_id": self.clientToken[0],
-                    "device_code": login.deviceCode,
-                    "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
-                    "scope": "r_usr+w_usr+w_sub",
-                },
-            )
-            if "status" in result and result["status"] != 200:
-                if result["status"] == 400 and result["sub_status"] == 1002:
-                    auth = False  # Not logged in yet
-                    continue
-                else:
-                    raise Exception("Failed to check authorization status")
-            auth = Auth(
-                result["user"]["userId"],
-                result["user"]["countryCode"],
-                result["access_token"],
-                result["refresh_token"],
-                result["expires_in"],
-            )
-            if not auth:
-                time.sleep(interval)
-            else:
-                return auth
-        raise Exception("Failed to log in")
-
-    def getTracks(self, album: Album) -> list:
-        return self.getItems(f"{self.apiUrlBase}/albums/{str(album.id)}/items")
-
-    def getAlbumFsPath(self, album: Album) -> str:
-        return self.downloadPath + album.title
-
-    def getTrackFsPath(self, track) -> str:
-        return f"{self.downloadPath}/{self.getAlbumFsPath(track.album)}/[{track.number}] {track.title}"
-
-    def getStreamInfo(self, track: Track) -> StreamInfo:
-        response = self.get(
-            f"tracks/{str(track.id)}/playbackinfopostpaywall",
-            {
-                "audioquality": self.quality,
-                "playbackmode": "STREAM",
-                "assetpresentation": "FULL",
-            },
-        )
-        if "vnd.tidal.bt" in response["manifestMimeType"]:
-            manifest = json.loads(
-                base64.b64decode(response["manifest"]).decode("utf-8")
-            )
-            return StreamInfo(
-                response["trackid"],
-                response["audioQuality"],
-                manifest["codecs"],
-                manifest["keyId"] if "keyId" in manifest else "",
-                manifest["urls"][0],
-            )
-        raise Exception("Can't read manifest of type {response['manifestMimeType']}")
-
-    def downloadTrack(self, track, partSize=1048576) -> Tuple[bool, str]:
-        try:
-            stream = self.getStreamInfo(track.id)
-            path = self.getTrackFsPath(track)
-            print(f"Starting download of track \"{track.title}\"")
-            if not self.redownload and os.path.exists(path):
-                print(f"Skipping download, \"{track.title}\" already exists")
-                return True, "exists"
-            
-
-
-    def downloadAlbum(self, album: Album):
-        tracks = self.getTracks(album)
-        for i, track in enumerate(tracks):
-            self.downloadTrack(track)
diff --git a/tidal_cleanup.py b/tidal_cleanup.py
new file mode 100755
index 0000000..6d78378
--- /dev/null
+++ b/tidal_cleanup.py
@@ -0,0 +1,53 @@
+#!/bin/env python
+import tidalapi
+
+USER_ID = 188721652
+
+def idExists(objects: list, id: int) -> bool:
+    for obj in objects:
+        if obj.id == id:
+            return True
+    return False
+
+
+config = tidalapi.Config(quality=tidalapi.Quality.lossless)
+session = tidalapi.Session(config)
+user = session.get_user(USER_ID)
+favorites = tidalapi.user.Favorites(session, user.id)
+
+existing_artists = favorites.artists()
+existing_albums = favorites.albums()
+existing_tracks = favorites.tracks()
+for album in existing_albums:
+    # ADD ARTIST
+    print(album.artist.name, end=" ")
+    if not idExists(existing_artists, album.artist.id):
+        if favorites.add_artist(album.artist.id):
+            print("added!")
+        else:
+            print("failed!")
+        pass
+    else:
+        print("skipped!")
+    # ADD TRACKS
+    for track in album.tracks():
+        print(track.name, end=" ")
+        if not idExists(existing_tracks, track.id):
+            if favorites.add_track(track.id):
+                print("added!")
+            else:
+                print("failed!")
+            pass
+        else:
+            print("skipped!")
+
+existing_tracks = favorites.tracks()
+for track in existing_tracks:
+    print(track.album.name, end=" ")
+    if not idExists(existing_albums, track.album.id):
+        if favorites.add_album(track.album.id):
+            print("added!")
+        else:
+            print("failed!")
+    else:
+        print("skipped!")
diff --git a/tidal_scrape.py b/tidal_scrape.py
index 3e4bb74..74100e3 100755
--- a/tidal_scrape.py
+++ b/tidal_scrape.py
@@ -1,18 +1,111 @@
-#!/bin/python3
+#!/bin/env python
 import tidalapi
+import aigpy
+import aigpy.downloadHelper
 import json
 import sys
+import base64
+from Crypto.Cipher import AES
+from Crypto.Util import Counter
+from typing import Tuple
 from datetime import datetime
 
+USER_ID = 188721652
+DL_PATH = "/home/luca/.cache/tidal_scrape"
+DEST_PATH = "/home/luca/Music"
+
 config = tidalapi.Config(quality=tidalapi.Quality.lossless)
 session = tidalapi.Session(config)
 
+
+def decrypt_token(token) -> Tuple[bytes, bytes]:
+    master_key = "UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754="
+    master_key = base64.b64decode(master_key)
+    security_token = base64.b64decode(token)
+    iv = security_token[:16]
+    encrypted_st = security_token[16:]
+
+    decryptor = AES.new(master_key, AES.MODE_CBC, iv)
+    decrypted_st = decryptor.decrypt(encrypted_st)
+
+    key = decrypted_st[:16]
+    nonce = decrypted_st[16:24]
+
+    return key, nonce
+
+
+def decrypt_file(input_file, output_file, key, nonce) -> None:
+    counter = Counter.new(64, prefix=nonce, initial_value=0)
+    decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
+
+    with open(input_file, "rb") as i:
+        data = decryptor.decrypt(i.read())
+
+        with open(output_file, "wb") as o:
+            o.write(data)
+
+
+def set_metadata(track: tidalapi.Track, file: str):
+    # This function could be more fleshed out (lyrics, covers) but I will leave that to external programs
+    tagger = aigpy.tag.TagTool(file)
+
+    tagger.title = track.name
+    tagger.artist = list(map(lambda artist: artist.name, track.artists))  # type: ignore[reportOptionalMemberAccess]
+    tagger.copyright = track.copyright
+    tagger.tracknumber = track.track_num
+    tagger.discnumber = track.volume_num
+
+    tagger.album = track.album.name  # type: ignore[reportOptionalMemberAccess]
+    tagger.albumartist = list(map(lambda artist: artist.name, track.album.artists))  # type: ignore[reportOptionalMemberAccess]
+    tagger.date = track.album.available_release_date  # type: ignore[reportOptionalMemberAccess]
+    tagger.totaldisc = track.album.num_volumes or 0  # type: ignore[reportOptionalMemberAccess]
+    if tagger.totaldisc <= 1:
+        tagger.totaltrack = track.album.num_tracks  # type: ignore[reportOptionalMemberAccess]
+
+    tagger.save()
+
+
+def download_track(
+    track: tidalapi.Track,
+    partSize: int = 1048576,
+) -> Tuple[bool, str]:
+    try:
+        dl_path = f"{DL_PATH}/{track.album.name}/{track.name}.part"  # type: ignore[reportOptionalMemberAccess]
+        dest_path = f"{DEST_PATH}/{track.album.name}/{track.name}"  # type: ignore[reportOptionalMemberAccess]
+
+        stream = track.stream()
+
+        stream.manifest = json.loads(base64.b64decode(stream.manifest))
+        url = stream.manifest["urls"][0]
+        try:
+            key = stream.manifest["keyId"]
+        except KeyError:
+            key = None
+        tool = aigpy.downloadHelper.DownloadTool(dl_path, [url])
+
+        tool.setPartSize(partSize)
+        check, err = tool.start(True, 1)
+        if not check:
+            return False, str(err)
+
+        if key:
+            key, nonce = decrypt_token(key)
+            decrypt_file(dl_path, dest_path, key, nonce)
+
+        set_metadata(track, dest_path)
+
+        return True, ""
+    except Exception as err:
+        return False, str(err)
+
+
 try:
     with open("auth.json", "rb") as f:
         a = json.load(f)
-        a.expiry_time = datetime.strptime(a.expiry_time, "%y-%m-%d %H:%M:%S")
+        expiry_time = a["expiry_time"].split(".", 1)[0]
+        expiry_time = datetime.strptime(expiry_time, "%Y-%m-%d %H:%M:%S")
         session.load_oauth_session(
-            a.token_type, a.access_token, a.refresh_token, a.expiry_time
+            a["token_type"], a["access_token"], a["refresh_token"], expiry_time
         )
 except (OSError, IndexError):
     session.login_oauth_simple()
@@ -24,22 +117,20 @@ if session.check_login():
                 "token_type": session.token_type,
                 "access_token": session.access_token,
                 "refresh_token": session.refresh_token,
-                "expiry_time": session.expiry_time,
+                "expiry_time": str(session.expiry_time),
             },
             f,
         )
 else:
     sys.exit("Failed to log in")
 
-user = session.get_user()
-# albums = user.Favorites.albums()
-# tracks = user.Favorites.tracks()
-# artists = user.Favorites.artists()
-#
-# for album in albums:
-#     if album.artist not in artists:
-#         user.Favorites.add_artist(album.artist.id)
-#
-# for track in tracks:
-#     if track.album not in albums:
-#         user.Favorites.add_album(track.album.id)
+
+user = session.get_user(USER_ID)
+favorites = tidalapi.user.Favorites(session, user.id)
+tracks = favorites.tracks()
+
+for track in tracks:
+    print(f"Downloading {track.album.name} by {track.artist}")
+    check, err = download_track(track)
+    if not check:
+        print(err)