mirror of
https://github.com/tcsenpai/swingmusic.git
synced 2025-06-06 03:05:35 +00:00
rewrite api/artist.py to remove artist cache
+ remove processing taylor's version
This commit is contained in:
parent
ad88ab4adb
commit
f2addf4d0f
3
.gitignore
vendored
3
.gitignore
vendored
@ -27,4 +27,5 @@ client
|
||||
logs.txt
|
||||
*.spec
|
||||
|
||||
TODO.md
|
||||
TODO.md
|
||||
testdata.py
|
@ -3,174 +3,22 @@ Contains all the artist(s) routes.
|
||||
"""
|
||||
import random
|
||||
import math
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
|
||||
from flask import Blueprint, request
|
||||
|
||||
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
|
||||
from app.db.sqlite.lastfm.similar_artists import SQLiteLastFMSimilarArtists as fmdb
|
||||
from app.models import Album, FavType, Track
|
||||
from app.models import Album, FavType
|
||||
from app.serializers.album import serialize_for_card_many
|
||||
from app.serializers.track import serialize_tracks
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.bisection import UseBisection
|
||||
from app.utils.remove_duplicates import remove_duplicates
|
||||
|
||||
api = Blueprint("artist", __name__, url_prefix="/")
|
||||
|
||||
|
||||
class CacheEntry:
|
||||
"""
|
||||
The cache entry class for the artists cache.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, artisthash: str, albumhashes: set[str], tracks: list[Track]
|
||||
) -> None:
|
||||
self.albums: list[Album] = []
|
||||
self.tracks: list[Track] = []
|
||||
|
||||
self.artisthash: str = artisthash
|
||||
self.albumhashes: set[str] = albumhashes
|
||||
|
||||
if len(tracks) > 0:
|
||||
self.tracks: list[Track] = tracks
|
||||
|
||||
self.type_checked = False
|
||||
self.albums_fetched = False
|
||||
|
||||
|
||||
class ArtistsCache:
|
||||
"""
|
||||
Holds artist page cache.
|
||||
"""
|
||||
|
||||
artists: deque[CacheEntry] = deque(maxlen=1)
|
||||
# THE ABOVE IS SET TO MAXLEN=1 TO AVOID A BUG THAT I WAS TOO LAZY TO INVESTIGATE
|
||||
# ARTIST TRACKS SOMEHOW DISAPPEARED FOR SOME REASON I COULDN'T UNDERSTAND. BY
|
||||
# DISAPPEARING I MEAN AN ARTIST YOU ARE SURE HAS 150 TRACKS ONLY SHOWING
|
||||
# LIKE 3 IN THE ARTIST PAGE. 🤷🏿 (TODO: MAYBE FIX THIS BUG?)
|
||||
|
||||
@classmethod
|
||||
def get_albums_by_artisthash(cls, artisthash: str) -> tuple[list[Album], int]:
|
||||
"""
|
||||
Returns the cached albums for the given artisthash.
|
||||
"""
|
||||
for index, albums in enumerate(cls.artists):
|
||||
if albums.artisthash == artisthash:
|
||||
return albums.albums, index
|
||||
|
||||
return [], -1
|
||||
|
||||
@classmethod
|
||||
def albums_cached(cls, artisthash: str) -> bool:
|
||||
"""
|
||||
Returns True if the artist is in the cache.
|
||||
"""
|
||||
for entry in cls.artists:
|
||||
if entry.artisthash == artisthash and len(entry.albums) > 0:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def albums_fetched(cls, artisthash: str):
|
||||
"""
|
||||
Checks if the albums have been fetched for the given artisthash.
|
||||
"""
|
||||
for entry in cls.artists:
|
||||
if entry.artisthash == artisthash:
|
||||
return entry.albums_fetched
|
||||
|
||||
@classmethod
|
||||
def tracks_cached(cls, artisthash: str) -> bool:
|
||||
"""
|
||||
Checks if the tracks have been cached for the given artisthash.
|
||||
"""
|
||||
for entry in cls.artists:
|
||||
if entry.artisthash == artisthash and len(entry.tracks) > 0:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def add_entry(cls, artisthash: str, albumhashes: set[str], tracks: list[Track]):
|
||||
"""
|
||||
Adds a new entry to the cache.
|
||||
"""
|
||||
cls.artists.append(CacheEntry(artisthash, albumhashes, tracks))
|
||||
|
||||
@classmethod
|
||||
def get_tracks(cls, artisthash: str):
|
||||
"""
|
||||
Returns the cached tracks for the given artisthash.
|
||||
"""
|
||||
entry = [a for a in cls.artists if a.artisthash == artisthash][0]
|
||||
return entry.tracks
|
||||
|
||||
@classmethod
|
||||
def get_albums(cls, artisthash: str):
|
||||
"""
|
||||
Returns the cached albums for the given artisthash.
|
||||
"""
|
||||
entry = [a for a in cls.artists if a.artisthash == artisthash][0]
|
||||
|
||||
src_albums = sorted(AlbumStore.albums, key=lambda x: x.albumhash)
|
||||
albums = UseBisection(
|
||||
source=src_albums, search_from="albumhash", queries=entry.albumhashes
|
||||
)()
|
||||
|
||||
entry.albums = [album for album in albums if album is not None]
|
||||
entry.albums_fetched = True
|
||||
|
||||
@classmethod
|
||||
def process_album_type(cls, artisthash: str):
|
||||
"""
|
||||
Checks the cached albums type for the given artisthash.
|
||||
"""
|
||||
entry = [a for a in cls.artists if a.artisthash == artisthash][0]
|
||||
|
||||
for album in entry.albums:
|
||||
album.check_type()
|
||||
|
||||
album_tracks = TrackStore.get_tracks_by_albumhash(album.albumhash)
|
||||
album_tracks = remove_duplicates(album_tracks)
|
||||
|
||||
album.get_date_from_tracks(album_tracks)
|
||||
|
||||
if album.date == 0:
|
||||
AlbumStore.remove_album_by_hash(album.albumhash)
|
||||
continue
|
||||
|
||||
album.check_is_single(album_tracks)
|
||||
|
||||
entry.type_checked = True
|
||||
entry.albums.sort(key=lambda a: a.date, reverse=True)
|
||||
|
||||
|
||||
def add_albums_to_cache(artisthash: str):
|
||||
"""
|
||||
Fetches albums and adds them to the cache.
|
||||
"""
|
||||
tracks = TrackStore.get_tracks_by_artisthash(artisthash)
|
||||
|
||||
if len(tracks) == 0:
|
||||
return False
|
||||
|
||||
albumhashes = set(t.albumhash for t in tracks)
|
||||
ArtistsCache.add_entry(artisthash, albumhashes, [])
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# =======================================================
|
||||
# ===================== ROUTES ==========================
|
||||
# =======================================================
|
||||
|
||||
|
||||
@api.route("/artist/<artisthash>", methods=["GET"])
|
||||
def get_artist(artisthash: str):
|
||||
"""
|
||||
@ -188,20 +36,7 @@ def get_artist(artisthash: str):
|
||||
if artist is None:
|
||||
return {"error": "Artist not found"}, 404
|
||||
|
||||
tracks_cached = ArtistsCache.tracks_cached(artisthash)
|
||||
|
||||
if tracks_cached:
|
||||
tracks = ArtistsCache.get_tracks(artisthash)
|
||||
else:
|
||||
tracks = TrackStore.get_tracks_by_artisthash(artisthash)
|
||||
albumhashes = set(t.albumhash for t in tracks)
|
||||
hashes_from_albums = set(
|
||||
a.albumhash for a in AlbumStore.get_albums_by_artisthash(artisthash)
|
||||
)
|
||||
|
||||
albumhashes = albumhashes.union(hashes_from_albums)
|
||||
ArtistsCache.add_entry(artisthash, albumhashes, tracks)
|
||||
|
||||
tracks = TrackStore.get_tracks_by_artisthash(artisthash)
|
||||
tcount = len(tracks)
|
||||
acount = AlbumStore.count_albums_by_artisthash(artisthash)
|
||||
|
||||
@ -246,7 +81,6 @@ def get_artist(artisthash: str):
|
||||
|
||||
@api.route("/artist/<artisthash>/albums", methods=["GET"])
|
||||
def get_artist_albums(artisthash: str):
|
||||
# TODO: Remove the artist cache and only process the required albums ie. not all albums. because that means processing all 355 albums for juice wrld while only less than 20 are shown in the artist page.
|
||||
limit = request.args.get("limit")
|
||||
|
||||
if limit is None:
|
||||
@ -256,20 +90,36 @@ def get_artist_albums(artisthash: str):
|
||||
|
||||
limit = int(limit)
|
||||
|
||||
is_cached = ArtistsCache.albums_cached(artisthash)
|
||||
all_albums = AlbumStore.get_albums_by_artisthash(artisthash)
|
||||
all_tracks = TrackStore.get_tracks_by_artisthash(artisthash)
|
||||
|
||||
if not is_cached:
|
||||
add_albums_to_cache(artisthash)
|
||||
track_albums = set(t.albumhash for t in all_tracks)
|
||||
missing_album_hashes = track_albums.difference(set(a.albumhash for a in all_albums))
|
||||
|
||||
albums_fetched = ArtistsCache.albums_fetched(artisthash)
|
||||
if len(missing_album_hashes) > 0:
|
||||
missing_albums = AlbumStore.get_albums_by_hashes(list(missing_album_hashes))
|
||||
all_albums.extend(missing_albums)
|
||||
|
||||
if not albums_fetched:
|
||||
ArtistsCache.get_albums(artisthash)
|
||||
def get_album_tracks(albumhash: str):
|
||||
return [t for t in all_tracks if t.albumhash == albumhash]
|
||||
|
||||
all_albums, index = ArtistsCache.get_albums_by_artisthash(artisthash)
|
||||
for a in all_albums:
|
||||
a.check_type()
|
||||
|
||||
if not ArtistsCache.artists[index].type_checked:
|
||||
ArtistsCache.process_album_type(artisthash)
|
||||
album_tracks = get_album_tracks(a.albumhash)
|
||||
|
||||
if len(album_tracks) == 0:
|
||||
continue
|
||||
|
||||
a.get_date_from_tracks(album_tracks)
|
||||
|
||||
if a.date == 0:
|
||||
AlbumStore.remove_album_by_hash(a.albumhash)
|
||||
continue
|
||||
|
||||
a.check_is_single(album_tracks)
|
||||
|
||||
all_albums = sorted(all_albums, key=lambda a: str(a.date), reverse=True)
|
||||
|
||||
singles = [a for a in all_albums if a.is_single]
|
||||
eps = [a for a in all_albums if a.is_EP]
|
||||
@ -300,11 +150,12 @@ def get_artist_albums(artisthash: str):
|
||||
if return_all is not None and return_all == "true":
|
||||
limit = len(all_albums)
|
||||
|
||||
singles_and_eps = singles + eps
|
||||
|
||||
return {
|
||||
"artistname": artist.name,
|
||||
"albums": serialize_for_card_many(albums[:limit]),
|
||||
"singles": serialize_for_card_many(singles[:limit]),
|
||||
"eps": serialize_for_card_many(eps[:limit]),
|
||||
"singles_and_eps": serialize_for_card_many(singles_and_eps[:limit]),
|
||||
"appearances": serialize_for_card_many(appearances[:limit]),
|
||||
"compilations": serialize_for_card_many(compilations[:limit]),
|
||||
}
|
||||
|
@ -4,13 +4,11 @@ interacting with the tracks table.
|
||||
"""
|
||||
|
||||
from collections import OrderedDict
|
||||
import random
|
||||
from sqlite3 import Cursor
|
||||
|
||||
from app.db.sqlite.utils import tuple_to_track, tuples_to_tracks
|
||||
|
||||
from .utils import SQLiteManager
|
||||
from app.logger import log
|
||||
from pprint import pprint
|
||||
|
||||
|
||||
@ -59,10 +57,6 @@ class SQLiteTrackMethods:
|
||||
# def force_surrogatepass(string: str):
|
||||
# return string.encode("utf-16", "surrogatepass").decode("utf-16")
|
||||
|
||||
# for key, value in track.items():
|
||||
# if isinstance(value, str):
|
||||
# track[key] = force_surrogatepass(value)
|
||||
|
||||
track["artist"] = track["artists"]
|
||||
track["albumartist"] = track["albumartists"]
|
||||
|
||||
@ -77,7 +71,6 @@ class SQLiteTrackMethods:
|
||||
Inserts a list of tracks into the database.
|
||||
"""
|
||||
|
||||
log.info("Send me the JSON below for debugging.")
|
||||
|
||||
with SQLiteManager() as cur:
|
||||
for track in tracks:
|
||||
@ -85,7 +78,6 @@ class SQLiteTrackMethods:
|
||||
cls.insert_one_track(track, cur)
|
||||
except Exception:
|
||||
pprint(track, indent=4)
|
||||
log.error("Send me the JSON above for debugging. Use https://pastebin.com and share the link.")
|
||||
|
||||
@staticmethod
|
||||
def get_all_tracks():
|
||||
|
@ -56,7 +56,6 @@ class AlbumVersionEnum(Enum):
|
||||
RE_RECORDED = ("re-recorded", "rerecorded")
|
||||
REISSUE = ("reissue",)
|
||||
REMASTERED = ("remaster",)
|
||||
TAYLORS_VERSION = ("taylor's version",)
|
||||
|
||||
|
||||
def get_all_keywords():
|
||||
|
@ -19,7 +19,7 @@ def create_folder(path: str, count=0) -> Folder:
|
||||
name=folder.name,
|
||||
path=win_replace_slash(str(folder)),
|
||||
is_sym=folder.is_symlink(),
|
||||
count=count
|
||||
count=count,
|
||||
)
|
||||
|
||||
|
||||
@ -36,7 +36,7 @@ def get_folders(paths: list[str]):
|
||||
count_dict[path] += 1
|
||||
|
||||
folders = [{"path": path, "count": count_dict[path]} for path in paths]
|
||||
return [create_folder(f['path'], f['count']) for f in folders if f['count'] > 0]
|
||||
return [create_folder(f["path"], f["count"]) for f in folders if f["count"] > 0]
|
||||
|
||||
|
||||
class GetFilesAndDirs:
|
||||
|
@ -69,11 +69,6 @@ class Album:
|
||||
if "super_deluxe" in self.versions:
|
||||
self.versions.remove("deluxe")
|
||||
|
||||
t_ = "taylors_version"
|
||||
if t_ in self.versions:
|
||||
self.versions.remove(t_)
|
||||
self.versions.insert(0, "taylor's version")
|
||||
|
||||
self.versions = [v.replace("_", " ") for v in self.versions]
|
||||
else:
|
||||
self.base_title = get_base_title_and_versions(
|
||||
@ -167,9 +162,7 @@ class Album:
|
||||
"""
|
||||
keywords = ["single version", "- single"]
|
||||
|
||||
show_albums_as_singles = get_flag(
|
||||
SessionVarKeys.SHOW_ALBUMS_AS_SINGLES
|
||||
)
|
||||
show_albums_as_singles = get_flag(SessionVarKeys.SHOW_ALBUMS_AS_SINGLES)
|
||||
|
||||
for keyword in keywords:
|
||||
if keyword in self.title.lower():
|
||||
@ -203,9 +196,4 @@ class Album:
|
||||
return
|
||||
|
||||
dates = (int(t.date) for t in tracks if t.date)
|
||||
|
||||
# if len(dates) == 0:
|
||||
# self.date = 0
|
||||
# return
|
||||
|
||||
self.date = datetime.datetime.fromtimestamp(min(dates)).year
|
||||
|
@ -1,3 +1,6 @@
|
||||
from app.models.track import Track
|
||||
|
||||
|
||||
class UseBisection:
|
||||
"""
|
||||
Uses bisection to find a list of items in another list.
|
||||
@ -29,11 +32,11 @@ class UseBisection:
|
||||
|
||||
return None
|
||||
|
||||
def __call__(self) -> list:
|
||||
def __call__(self):
|
||||
if len(self.source_list) == 0:
|
||||
return [None]
|
||||
return []
|
||||
|
||||
results = []
|
||||
results: list[Track] = []
|
||||
|
||||
for query in self.queries_list:
|
||||
res = self.find(query)
|
||||
|
8
app/utils/unicode.py
Normal file
8
app/utils/unicode.py
Normal file
@ -0,0 +1,8 @@
|
||||
def handle_unicode(string: str):
|
||||
"""
|
||||
Try resolving unicode characters, else escape them.
|
||||
"""
|
||||
return string.encode("utf-16", "replace").decode("utf-16")
|
||||
# try:
|
||||
# except:
|
||||
# return string.encode("unicode_escape").decode("utf-8")
|
Loading…
x
Reference in New Issue
Block a user