diff --git a/app/api/album.py b/app/api/album.py index 6d2c7cf..4c085c9 100644 --- a/app/api/album.py +++ b/app/api/album.py @@ -8,10 +8,12 @@ from flask import Blueprint, request from app.db.sqlite.albums import SQLiteAlbumMethods as adb from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb -from app.db.store import Store from app.models import FavType, Track from app.utils.remove_duplicates import remove_duplicates +from app.store.tracks import TrackStore +from app.store.albums import AlbumStore + get_albums_by_albumartist = adb.get_albums_by_albumartist check_is_fav = favdb.check_is_favorite @@ -36,12 +38,12 @@ def get_album_tracks_and_info(): return error_msg, 400 error_msg = {"error": "Album not created yet."} - album = Store.get_album_by_hash(albumhash) + album = AlbumStore.get_album_by_hash(albumhash) if album is None: return error_msg, 204 - tracks = Store.get_tracks_by_albumhash(albumhash) + tracks = TrackStore.get_tracks_by_albumhash(albumhash) if tracks is None: return error_msg, 404 @@ -84,7 +86,7 @@ def get_album_tracks(albumhash: str): """ Returns all the tracks in the given album, sorted by disc and track number. """ - tracks = Store.get_tracks_by_albumhash(albumhash) + tracks = TrackStore.get_tracks_by_albumhash(albumhash) tracks = [asdict(t) for t in tracks] for t in tracks: @@ -112,7 +114,7 @@ def get_artist_albums(): albums = [ { "artisthash": a, - "albums": Store.get_albums_by_albumartist(a, limit, exclude=exclude), + "albums": AlbumStore.get_albums_by_albumartist(a, limit, exclude=exclude), } for a in albumartists ] diff --git a/app/api/artist.py b/app/api/artist.py index a183a4b..3c8b3f7 100644 --- a/app/api/artist.py +++ b/app/api/artist.py @@ -7,11 +7,14 @@ from collections import deque from flask import Blueprint, request from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb -from app.db.store import Store from app.models import Album, FavType, Track from app.utils.remove_duplicates import remove_duplicates from app.requests.artists import fetch_similar_artists +from app.store.albums import AlbumStore +from app.store.tracks import TrackStore +from app.store.artists import ArtistStore + api = Blueprint("artist", __name__, url_prefix="/") @@ -107,10 +110,10 @@ class ArtistsCache: """ entry = [a for a in cls.artists if a.artisthash == artisthash][0] - albums = [Store.get_album_by_hash(h) for h in entry.albumhashes] + albums = [AlbumStore.get_album_by_hash(h) for h in entry.albumhashes] entry.albums = [album for album in albums if album is not None] - store_albums = Store.get_albums_by_artisthash(artisthash) + store_albums = AlbumStore.get_albums_by_artisthash(artisthash) all_albums_hash = "-".join([a.albumhash for a in entry.albums]) @@ -130,7 +133,7 @@ class ArtistsCache: for album in entry.albums: album.check_type() - album_tracks = Store.get_tracks_by_albumhash(album.albumhash) + album_tracks = TrackStore.get_tracks_by_albumhash(album.albumhash) album_tracks = remove_duplicates(album_tracks) album.get_date_from_tracks(album_tracks) @@ -143,7 +146,7 @@ def add_albums_to_cache(artisthash: str): """ Fetches albums and adds them to the cache. """ - tracks = Store.get_tracks_by_artist(artisthash) + tracks = TrackStore.get_tracks_by_artist(artisthash) if len(tracks) == 0: return False @@ -171,7 +174,7 @@ def get_artist(artisthash: str): limit = int(limit) - artist = Store.get_artist_by_hash(artisthash) + artist = ArtistStore.get_artist_by_hash(artisthash) if artist is None: return {"error": "Artist not found"}, 404 @@ -181,17 +184,17 @@ def get_artist(artisthash: str): if tracks_cached: tracks = ArtistsCache.get_tracks(artisthash) else: - tracks = Store.get_tracks_by_artist(artisthash) + tracks = TrackStore.get_tracks_by_artist(artisthash) albumhashes = set(t.albumhash for t in tracks) hashes_from_albums = set( - a.albumhash for a in Store.get_albums_by_artisthash(artisthash) + a.albumhash for a in AlbumStore.get_albums_by_artisthash(artisthash) ) albumhashes = albumhashes.union(hashes_from_albums) ArtistsCache.add_entry(artisthash, albumhashes, tracks) tcount = len(tracks) - acount = Store.count_albums_by_artisthash(artisthash) + acount = AlbumStore.count_albums_by_artisthash(artisthash) if acount == 0 and tcount < 10: limit = tcount @@ -253,7 +256,7 @@ def get_artist_albums(artisthash: str): appearances = remove_EPs_and_singles(appearances) - artist = Store.get_artist_by_hash(artisthash) + artist = ArtistStore.get_artist_by_hash(artisthash) if return_all is not None: limit = len(all_albums) @@ -273,7 +276,7 @@ def get_all_artist_tracks(artisthash: str): """ Returns all artists by a given artist. """ - tracks = Store.get_tracks_by_artist(artisthash) + tracks = TrackStore.get_tracks_by_artist(artisthash) return {"tracks": tracks} @@ -290,13 +293,13 @@ def get_similar_artists(artisthash: str): limit = int(limit) - artist = Store.get_artist_by_hash(artisthash) + artist = ArtistStore.get_artist_by_hash(artisthash) if artist is None: return {"error": "Artist not found"}, 404 similar_hashes = fetch_similar_artists(artist.name) - similar = Store.get_artists_by_hashes(similar_hashes) + similar = ArtistStore.get_artists_by_hashes(similar_hashes) if len(similar) > limit: similar = random.sample(similar, limit) diff --git a/app/api/colors.py b/app/api/colors.py index 20826ad..f6de2c5 100644 --- a/app/api/colors.py +++ b/app/api/colors.py @@ -1,5 +1,5 @@ from flask import Blueprint -from app.db.store import Store +from app.store.albums import AlbumStore as Store api = Blueprint("colors", __name__, url_prefix="/colors") diff --git a/app/api/favorites.py b/app/api/favorites.py index b039777..7883cbc 100644 --- a/app/api/favorites.py +++ b/app/api/favorites.py @@ -1,10 +1,13 @@ from flask import Blueprint, request from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb -from app.db.store import Store from app.models import FavType from app.utils.bisection import UseBisection +from app.store.artists import ArtistStore +from app.store.albums import AlbumStore +from app.store.tracks import TrackStore + api = Blueprint("favorite", __name__, url_prefix="/") @@ -28,7 +31,7 @@ def add_favorite(): favdb.insert_one_favorite(itemtype, itemhash) if itemtype == FavType.track: - Store.add_fav_track(itemhash) + TrackStore.make_track_fav(itemhash) return {"msg": "Added to favorites"} @@ -49,7 +52,7 @@ def remove_favorite(): favdb.delete_favorite(itemtype, itemhash) if itemtype == FavType.track: - Store.remove_fav_track(itemhash) + TrackStore.remove_track_from_fav(itemhash) return {"msg": "Removed from favorites"} @@ -67,7 +70,7 @@ def get_favorite_albums(): albumhashes = [a[1] for a in albums] albumhashes.reverse() - src_albums = sorted(Store.albums, key=lambda x: x.albumhash) + src_albums = sorted(AlbumStore.albums, key=lambda x: x.albumhash) fav_albums = UseBisection(src_albums, "albumhash", albumhashes)() fav_albums = remove_none(fav_albums) @@ -90,7 +93,7 @@ def get_favorite_tracks(): tracks = favdb.get_fav_tracks() trackhashes = [t[1] for t in tracks] trackhashes.reverse() - src_tracks = sorted(Store.tracks, key=lambda x: x.trackhash) + src_tracks = sorted(TrackStore.tracks, key=lambda x: x.trackhash) tracks = UseBisection(src_tracks, "trackhash", trackhashes)() tracks = remove_none(tracks) @@ -114,7 +117,7 @@ def get_favorite_artists(): artisthashes = [a[1] for a in artists] artisthashes.reverse() - src_artists = sorted(Store.artists, key=lambda x: x.artisthash) + src_artists = sorted(ArtistStore.artists, key=lambda x: x.artisthash) artists = UseBisection(src_artists, "artisthash", artisthashes)() artists = remove_none(artists) @@ -176,9 +179,9 @@ def get_all_favorites(): if fav[2] == FavType.artist: artists.append(fav[1]) - src_tracks = sorted(Store.tracks, key=lambda x: x.trackhash) - src_albums = sorted(Store.albums, key=lambda x: x.albumhash) - src_artists = sorted(Store.artists, key=lambda x: x.artisthash) + src_tracks = sorted(TrackStore.tracks, key=lambda x: x.trackhash) + src_albums = sorted(AlbumStore.albums, key=lambda x: x.albumhash) + src_artists = sorted(ArtistStore.artists, key=lambda x: x.artisthash) tracks = UseBisection(src_tracks, "trackhash", tracks)() albums = UseBisection(src_albums, "albumhash", albums)() diff --git a/app/api/playlist.py b/app/api/playlist.py index 1485250..325bcd5 100644 --- a/app/api/playlist.py +++ b/app/api/playlist.py @@ -7,13 +7,15 @@ from datetime import datetime from flask import Blueprint, request from PIL import UnidentifiedImageError -from app import models, serializer +from app import models from app.db.sqlite.playlists import SQLitePlaylistMethods -from app.db.store import Store from app.lib import playlistlib -from app.utils.generators import create_new_date +from app.utils.dates import date_string_to_time_passed, create_new_date from app.utils.remove_duplicates import remove_duplicates +from app.store.tracks import TrackStore +from app.store.albums import AlbumStore + api = Blueprint("playlist", __name__, url_prefix="/") PL = SQLitePlaylistMethods @@ -42,7 +44,7 @@ def duplicate_images(images: list): def get_first_4_images(trackhashes: list[str]) -> list[dict['str', str]]: - tracks = Store.get_tracks_by_trackhashes(trackhashes) + tracks = TrackStore.get_tracks_by_trackhashes(trackhashes) albums = [] for track in tracks: @@ -51,7 +53,7 @@ def get_first_4_images(trackhashes: list[str]) -> list[dict['str', str]]: if len(albums) == 4: break - albums = Store.get_albums_by_hashes(albums) + albums = AlbumStore.get_albums_by_hashes(albums) images = [ { 'image': album.image, @@ -155,11 +157,11 @@ def get_playlist(playlistid: str): if playlist is None: return {"msg": "Playlist not found"}, 404 - tracks = Store.get_tracks_by_trackhashes(list(playlist.trackhashes)) + tracks = TrackStore.get_tracks_by_trackhashes(list(playlist.trackhashes)) tracks = remove_duplicates(tracks) duration = sum(t.duration for t in tracks) - playlist.last_updated = serializer.date_string_to_time_passed(playlist.last_updated) + playlist.last_updated = date_string_to_time_passed(playlist.last_updated) playlist.duration = duration @@ -223,7 +225,7 @@ def update_playlist_info(playlistid: str): update_playlist(int(playlistid), playlist) playlist = models.Playlist(*p_tuple) - playlist.last_updated = serializer.date_string_to_time_passed(playlist.last_updated) + playlist.last_updated = date_string_to_time_passed(playlist.last_updated) return { "data": playlist, diff --git a/app/api/search.py b/app/api/search.py index a1a3b1f..aafe787 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -6,9 +6,11 @@ from unidecode import unidecode from flask import Blueprint, request from app import models -from app.db.store import Store from app.lib import searchlib + +from app.store.tracks import TrackStore + api = Blueprint("search", __name__, url_prefix="/") SEARCH_COUNT = 12 @@ -38,7 +40,7 @@ class Search: Calls :class:`SearchTracks` which returns the tracks that fuzzily match the search terms. Then adds them to the `SearchResults` store. """ - self.tracks = Store.tracks + self.tracks = TrackStore.tracks tracks = searchlib.SearchTracks(self.query)() SearchResults.tracks = tracks diff --git a/app/api/settings.py b/app/api/settings.py index 86ce98f..d82ce52 100644 --- a/app/api/settings.py +++ b/app/api/settings.py @@ -3,12 +3,16 @@ from app import settings from app.logger import log from app.lib import populate -from app.db.store import Store from app.lib.watchdogg import Watcher as WatchDog from app.db.sqlite.settings import SettingsSQLMethods as sdb from app.utils.generators import get_random_str from app.utils.threading import background +from app.store.store import FolderStore +from app.store.albums import AlbumStore +from app.store.tracks import TrackStore +from app.store.artists import ArtistStore + api = Blueprint("settings", __name__, url_prefix="/") @@ -22,10 +26,10 @@ def reload_everything(): """ Reloads all stores using the current database items """ - Store.load_all_tracks() - Store.process_folders() - Store.load_albums() - Store.load_artists() + TrackStore.load_all_tracks() + FolderStore.process_folders() + AlbumStore.load_albums() + ArtistStore.load_artists() @background @@ -34,7 +38,7 @@ def rebuild_store(db_dirs: list[str]): Restarts the watchdog and rebuilds the music library. """ log.info("Rebuilding library...") - Store.remove_tracks_by_dir_except(db_dirs) + TrackStore.remove_tracks_by_dir_except(db_dirs) reload_everything() key = get_random_str() diff --git a/app/api/track.py b/app/api/track.py index 708edf0..129bba9 100644 --- a/app/api/track.py +++ b/app/api/track.py @@ -5,7 +5,7 @@ import os from flask import Blueprint, send_file, request -from app.db.store import Store +from app.store.tracks import TrackStore api = Blueprint("track", __name__, url_prefix="/") @@ -31,7 +31,7 @@ def send_track_file(trackhash: str): if trackhash is None: return msg, 404 - tracks = Store.get_tracks_by_trackhashes([trackhash]) + tracks = TrackStore.get_tracks_by_trackhashes([trackhash]) for track in tracks: if track is None: diff --git a/app/db/sqlite/playlists.py b/app/db/sqlite/playlists.py index 2182faf..52f7d28 100644 --- a/app/db/sqlite/playlists.py +++ b/app/db/sqlite/playlists.py @@ -4,7 +4,7 @@ from collections import OrderedDict from app.db.sqlite.tracks import SQLiteTrackMethods from app.db.sqlite.utils import SQLiteManager, tuple_to_playlist, tuples_to_playlists from app.models import Artist -from app.utils.generators import create_new_date +from app.utils.dates import create_new_date from app.utils.threading import background @@ -151,12 +151,12 @@ class SQLitePlaylistMethods: cur.execute(sql, params) @staticmethod - def update_last_updated(playlist_id: int, date=create_new_date()): + def update_last_updated(playlist_id: int): """Updates the last updated date of a playlist.""" sql = """UPDATE playlists SET last_updated = ? WHERE id = ?""" with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (date, playlist_id)) + cur.execute(sql, (create_new_date(), playlist_id)) @staticmethod def delete_playlist(pid: str): diff --git a/app/db/store.py b/app/db/store.py deleted file mode 100644 index 56d7399..0000000 --- a/app/db/store.py +++ /dev/null @@ -1,516 +0,0 @@ -""" -In memory store. -""" -import json -import random -from pathlib import Path - -from tqdm import tqdm - -from app.db.sqlite.albums import SQLiteAlbumMethods as aldb -from app.db.sqlite.artists import SQLiteArtistMethods as ardb -from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb -from app.db.sqlite.tracks import SQLiteTrackMethods as tdb -from app.lib.artistlib import get_all_artists -from app.models import Album, Artist, Folder, Track -from app.utils.bisection import UseBisection -from app.utils.hashing import create_folder_hash -from app.utils.remove_duplicates import remove_duplicates -from app.utils.wintools import win_replace_slash - - -class Store: - """ - This class holds all tracks in memory and provides methods for - interacting with them. - """ - - tracks: list[Track] = [] - folders: list[Folder] = [] - albums: list[Album] = [] - artists: list[Artist] = [] - - @classmethod - def load_all_tracks(cls): - """ - Loads all tracks from the database into the store. - """ - - cls.tracks = list(tdb.get_all_tracks()) - - fav_hashes = favdb.get_fav_tracks() - fav_hashes = " ".join([t[1] for t in fav_hashes]) - - for track in tqdm(cls.tracks, desc="Loading tracks"): - if track.trackhash in fav_hashes: - track.is_favorite = True - - @classmethod - def add_track(cls, track: Track): - """ - Adds a single track to the store. - """ - - cls.tracks.append(track) - - @classmethod - def add_tracks(cls, tracks: list[Track]): - """ - Adds multiple tracks to the store. - """ - - cls.tracks.extend(tracks) - - @classmethod - def get_tracks_by_trackhashes(cls, trackhashes: list[str]) -> list[Track]: - """ - Returns a list of tracks by their hashes. - """ - - trackhashes = " ".join(trackhashes) - tracks = [track for track in cls.tracks if track.trackhash in trackhashes] - - tracks.sort(key=lambda t: trackhashes.index(t.trackhash)) - return tracks - - @classmethod - def remove_track_by_filepath(cls, filepath: str): - """ - Removes a track from the store by its filepath. - """ - - for track in cls.tracks: - if track.filepath == filepath: - cls.tracks.remove(track) - break - - @classmethod - def remove_tracks_by_dir_except(cls, dirs: list[str]): - """Removes all tracks not in the root directories.""" - to_remove = set() - - for track in cls.tracks: - if not track.folder.startswith(tuple(dirs)): - to_remove.add(track.folder) - - tdb.remove_tracks_by_folders(to_remove) - - @classmethod - def count_tracks_by_hash(cls, trackhash: str) -> int: - """ - Counts the number of tracks with a specific hash. - """ - - count = 0 - - for track in cls.tracks: - if track.trackhash == trackhash: - count += 1 - - return count - - # ==================================================== - # =================== FAVORITES ====================== - # ==================================================== - - @classmethod - def add_fav_track(cls, trackhash: str): - """ - Adds a track to the favorites. - """ - - for track in cls.tracks: - if track.trackhash == trackhash: - track.is_favorite = True - - @classmethod - def remove_fav_track(cls, trackhash: str): - """ - Removes a track from the favorites. - """ - - for track in cls.tracks: - if track.trackhash == trackhash: - track.is_favorite = False - - # ==================================================== - # ==================== FOLDERS ======================= - # ==================================================== - - @classmethod - def check_has_tracks(cls, path: str): # type: ignore - """ - Checks if a folder has tracks. - """ - path_hashes = "".join(f.path_hash for f in cls.folders) - path_hash = create_folder_hash(*Path(path).parts[1:]) - - return path_hash in path_hashes - - @classmethod - def is_empty_folder(cls, path: str): - """ - Checks if a folder has tracks using tracks in the store. - """ - - all_folders = set(track.folder for track in cls.tracks) - folder_hashes = "".join( - create_folder_hash(*Path(f).parts[1:]) for f in all_folders - ) - - path_hash = create_folder_hash(*Path(path).parts[1:]) - return path_hash in folder_hashes - - @staticmethod - def create_folder(path: str) -> Folder: - """ - Creates a folder object from a path. - """ - folder = Path(path) - - return Folder( - name=folder.name, - path=win_replace_slash(str(folder)), - is_sym=folder.is_symlink(), - has_tracks=True, - path_hash=create_folder_hash(*folder.parts[1:]), - ) - - @classmethod - def add_folder(cls, path: str): - """ - Adds a folder to the store. - """ - - if cls.check_has_tracks(path): - return - - folder = cls.create_folder(path) - cls.folders.append(folder) - - @classmethod - def remove_folder(cls, path: str): - """ - Removes a folder from the store. - """ - - for folder in cls.folders: - if folder.path == path: - cls.folders.remove(folder) - break - - @classmethod - def process_folders(cls): - """ - Creates a list of folders from the tracks in the store. - """ - cls.folders.clear() - - all_folders = [track.folder for track in cls.tracks] - all_folders = set(all_folders) - - all_folders = [ - folder for folder in all_folders if not cls.check_has_tracks(folder) - ] - - all_folders = [Path(f) for f in all_folders] - # all_folders = [f for f in all_folders if f.exists()] - - valid_folders = [] - - for folder in all_folders: - try: - if folder.exists(): - valid_folders.append(folder) - except PermissionError: - pass - - for path in tqdm(valid_folders, desc="Processing folders"): - folder = cls.create_folder(str(path)) - - cls.folders.append(folder) - - @classmethod - def get_folder(cls, path: str): # type: ignore - """ - Returns a folder object by its path. - """ - # TODO: Modify this method to accept a list of paths, sorting is computationally expensive. - folders = sorted(cls.folders, key=lambda x: x.path) - folder = UseBisection(folders, "path", [path])()[0] - - if folder is not None: - return folder - - has_tracks = cls.check_has_tracks(path) - - if not has_tracks: - return None - - folder = cls.create_folder(path) - cls.folders.append(folder) - return folder - - @classmethod - def get_folders_count(cls, paths: list[str]) -> list[dict[str, int]]: - count_dict = {path: 0 for path in paths} - - for track in cls.tracks: - for path in paths: - if track.filepath.startswith(path): - count_dict[path] += 1 - - result = [{"path": path, "count": count_dict[path]} for path in paths] - - # TODO: Modify this method to return Folder objects with - # track count mapped. Keep an eye on function complexity. - return result - - @classmethod - def get_tracks_by_filepaths(cls, paths: list[str]) -> list[Track]: - """ - Returns all tracks matching the given paths. - """ - tracks = sorted(cls.tracks, key=lambda x: x.filepath) - tracks = UseBisection(tracks, "filepath", paths)() - return [track for track in tracks if track is not None] - - @classmethod - def get_tracks_by_albumhash(cls, album_hash: str) -> list[Track]: - """ - Returns all tracks matching the given album hash. - """ - return [t for t in cls.tracks if t.albumhash == album_hash] - - @classmethod - def get_tracks_by_artist(cls, artisthash: str) -> list[Track]: - """ - Returns all tracks matching the given artist. Duplicate tracks are removed. - """ - tracks = [t for t in cls.tracks if artisthash in t.artist_hashes] - return remove_duplicates(tracks) - - # ==================================================== - # ==================== ALBUMS ======================== - # ==================================================== - - @staticmethod - def create_album(track: Track): - """ - Creates album object from a track - """ - return Album( - albumhash=track.albumhash, - albumartists=track.albumartist, # type: ignore - title=track.album, - ) - - @classmethod - def load_albums(cls): - """ - Loads all albums from the database into the store. - """ - - cls.albums = [] - - albumhashes = set(t.albumhash for t in cls.tracks) - - for albumhash in tqdm(albumhashes, desc="Loading albums"): - for track in cls.tracks: - if track.albumhash == albumhash: - cls.albums.append(cls.create_album(track)) - break - - db_albums: list[tuple] = aldb.get_all_albums() - - for album in tqdm(db_albums, desc="Mapping album colors"): - albumhash = album[1] - colors = json.loads(album[2]) - - for _al in cls.albums: - if _al.albumhash == albumhash: - _al.set_colors(colors) - break - - @classmethod - def add_album(cls, album: Album): - """ - Adds an album to the store. - """ - cls.albums.append(album) - - @classmethod - def add_albums(cls, albums: list[Album]): - """ - Adds multiple albums to the store. - """ - cls.albums.extend(albums) - - @classmethod - def get_albums_by_albumartist( - cls, artisthash: str, limit: int, exclude: str - ) -> list[Album]: - """ - Returns N albums by the given albumartist, excluding the specified album. - """ - - albums = [album for album in cls.albums if artisthash in album.albumartisthash] - - albums = [album for album in albums if album.albumhash != exclude] - - if len(albums) > limit: - random.shuffle(albums) - - # TODO: Merge this with `cls.get_albums_by_artisthash()` - return albums[:limit] - - @classmethod - def get_album_by_hash(cls, albumhash: str) -> Album | None: - """ - Returns an album by its hash. - """ - try: - return [a for a in cls.albums if a.albumhash == albumhash][0] - except IndexError: - return None - - @classmethod - def get_albums_by_hashes(cls, albumhashes: list[str]) -> list[Album]: - """ - Returns albums by their hashes. - """ - albums_str = "-".join(albumhashes) - albums = [a for a in cls.albums if a.albumhash in albums_str] - - # sort albums by the order of the hashes - albums.sort(key=lambda x: albumhashes.index(x.albumhash)) - return albums - - @classmethod - def get_albums_by_artisthash(cls, artisthash: str) -> list[Album]: - """ - Returns all albums by the given artist. - """ - return [album for album in cls.albums if artisthash in album.albumartisthash] - - @classmethod - def count_albums_by_artisthash(cls, artisthash: str): - """ - Count albums for the given artisthash. - """ - albumartists = [a.albumartists for a in cls.albums] - artisthashes = [] - - for artist in albumartists: - artisthashes.extend([a.artisthash for a in artist]) # type: ignore - - master_string = "-".join(artisthashes) - - return master_string.count(artisthash) - - @classmethod - def album_exists(cls, albumhash: str) -> bool: - """ - Checks if an album exists. - """ - return albumhash in "-".join([a.albumhash for a in cls.albums]) - - @classmethod - def remove_album_by_hash(cls, albumhash: str): - """ - Removes an album from the store. - """ - cls.albums = [a for a in cls.albums if a.albumhash != albumhash] - - # ==================================================== - # ==================== ARTISTS ======================= - # ==================================================== - - @classmethod - def load_artists(cls): - """ - Loads all artists from the database into the store. - """ - cls.artists = get_all_artists(cls.tracks, cls.albums) - - db_artists: list[tuple] = list(ardb.get_all_artists()) - - for art in tqdm(db_artists, desc="Loading artists"): - cls.map_artist_color(art) - - @classmethod - def map_artist_color(cls, artist_tuple: tuple): - """ - Maps a color to the corresponding artist. - """ - - artisthash = artist_tuple[1] - color = json.loads(artist_tuple[2]) - - for artist in cls.artists: - if artist.artisthash == artisthash: - artist.colors = color - break - - @classmethod - def add_artist(cls, artist: Artist): - """ - Adds an artist to the store. - """ - cls.artists.append(artist) - - @classmethod - def add_artists(cls, artists: list[Artist]): - """ - Adds multiple artists to the store. - """ - for artist in artists: - if artist not in cls.artists: - cls.artists.append(artist) - - @classmethod - def get_artist_by_hash(cls, artisthash: str) -> Artist: - """ - Returns an artist by its hash.P - """ - artists = sorted(cls.artists, key=lambda x: x.artisthash) - artist = UseBisection(artists, "artisthash", [artisthash])()[0] - return artist - - @classmethod - def get_artists_by_hashes(cls, artisthashes: list[str]) -> list[Artist]: - """ - Returns artists by their hashes. - """ - artists = sorted(cls.artists, key=lambda x: x.artisthash) - artists = UseBisection(artists, "artisthash", artisthashes)() - return [a for a in artists if a is not None] - - @classmethod - def artist_exists(cls, artisthash: str) -> bool: - """ - Checks if an artist exists. - """ - return artisthash in "-".join([a.artisthash for a in cls.artists]) - - @classmethod - def artist_has_tracks(cls, artisthash: str) -> bool: - """ - Checks if an artist has tracks. - """ - artists: set[str] = set() - - for track in cls.tracks: - artists.update(track.artist_hashes) - album_artists: list[str] = [a.artisthash for a in track.albumartist] - artists.update(album_artists) - - master_hash = "-".join(artists) - return artisthash in master_hash - - @classmethod - def remove_artist_by_hash(cls, artisthash: str): - """ - Removes an artist from the store. - """ - cls.artists = [a for a in cls.artists if a.artisthash != artisthash] diff --git a/app/lib/artistlib.py b/app/lib/artistlib.py index 3c60ef0..2011585 100644 --- a/app/lib/artistlib.py +++ b/app/lib/artistlib.py @@ -10,9 +10,10 @@ from requests.exceptions import ConnectionError as ReqConnError, ReadTimeout from app import settings from app.models import Artist, Track, Album -from app.db import store from app.utils.hashing import create_hash +from app.store import artists as artist_store + def get_artist_image_link(artist: str): """ @@ -72,8 +73,8 @@ class CheckArtistImages: with ThreadPoolExecutor() as pool: list( tqdm( - pool.map(self.download_image, store.Store.artists), - total=len(store.Store.artists), + pool.map(self.download_image, artist_store.ArtistStore.artists), + total=len(artist_store.ArtistStore.artists), desc="Downloading artist images", ) ) diff --git a/app/lib/colorlib.py b/app/lib/colorlib.py index abfebda..cd1ede4 100644 --- a/app/lib/colorlib.py +++ b/app/lib/colorlib.py @@ -12,9 +12,11 @@ from app import settings from app.db.sqlite.albums import SQLiteAlbumMethods as db from app.db.sqlite.artists import SQLiteArtistMethods as adb from app.db.sqlite.utils import SQLiteManager -from app.db.store import Store from app.models import Album, Artist +from app.store.artists import ArtistStore +from app.store.albums import AlbumStore + def get_image_colors(image: str) -> list[str]: """Extracts 2 of the most dominant colors from an image.""" @@ -38,7 +40,7 @@ class ProcessAlbumColors: """ def __init__(self) -> None: - albums = [a for a in Store.albums if len(a.colors) == 0] + albums = [a for a in AlbumStore.albums if len(a.colors) == 0] with SQLiteManager() as cur: for album in tqdm(albums, desc="Processing missing album colors"): @@ -69,7 +71,7 @@ class ProcessArtistColors: """ def __init__(self) -> None: - all_artists = [a for a in Store.artists if len(a.colors) == 0] + all_artists = [a for a in ArtistStore.artists if len(a.colors) == 0] for artist in tqdm(all_artists, desc="Processing missing artist colors"): self.process_color(artist) @@ -85,7 +87,7 @@ class ProcessArtistColors: if len(colors) > 0: adb.insert_one_artist(artisthash=artist.artisthash, colors=colors) - Store.map_artist_color((0, artist.artisthash, json.dumps(colors))) + ArtistStore.map_artist_color((0, artist.artisthash, json.dumps(colors))) # TODO: If item color is in db, get it, assign it to the item and continue. # - Format all colors in the format: rgb(123, 123, 123) diff --git a/app/lib/folderslib.py b/app/lib/folderslib.py index f2e63b3..e229f33 100644 --- a/app/lib/folderslib.py +++ b/app/lib/folderslib.py @@ -1,13 +1,14 @@ import os from concurrent.futures import ThreadPoolExecutor -from pprint import pprint -from app.db.store import Store from app.models import Folder, Track from app.settings import SUPPORTED_FILES from app.logger import log from app.utils.wintools import win_replace_slash +from app.store.store import FolderStore +from app.store.tracks import TrackStore + class GetFilesAndDirs: """ @@ -49,12 +50,12 @@ class GetFilesAndDirs: files_.sort(key=lambda f: f["time"]) files = [f["path"] for f in files_] - tracks = Store.get_tracks_by_filepaths(files) + tracks = TrackStore.get_tracks_by_filepaths(files) # TODO: Remove this threadpool and modify the get_folder store # method to accept a list of paths. with ThreadPoolExecutor() as pool: - iterable = pool.map(Store.get_folder, dirs) + iterable = pool.map(FolderStore.get_folder, dirs) folders = [i for i in iterable if i is not None] folders = filter(lambda f: f.has_tracks, folders) diff --git a/app/lib/populate.py b/app/lib/populate.py index 00da0cd..5e9bfef 100644 --- a/app/lib/populate.py +++ b/app/lib/populate.py @@ -5,7 +5,6 @@ from app import settings from app.db.sqlite.tracks import SQLiteTrackMethods from app.db.sqlite.settings import SettingsSQLMethods as sdb from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb -from app.db.store import Store from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors from app.lib.taglib import extract_thumb, get_tags @@ -13,6 +12,11 @@ from app.logger import log from app.models import Album, Artist, Track from app.utils.filesystem import run_fast_scandir +from app.store.store import FolderStore +from app.store.albums import AlbumStore +from app.store.tracks import TrackStore +from app.store.artists import ArtistStore + get_all_tracks = SQLiteTrackMethods.get_all_tracks insert_many_tracks = SQLiteTrackMethods.insert_many_tracks @@ -97,19 +101,19 @@ class Populate: track = Track(**tags) track.is_favorite = track.trackhash in fav_tracks - Store.add_track(track) - Store.add_folder(track.folder) + TrackStore.add_track(track) + FolderStore.add_folder(track.folder) - if not Store.album_exists(track.albumhash): - Store.add_album(Store.create_album(track)) + if not AlbumStore.album_exists(track.albumhash): + AlbumStore.add_album(AlbumStore.create_album(track)) for artist in track.artist: - if not Store.artist_exists(artist.artisthash): - Store.add_artist(Artist(artist.name)) + if not ArtistStore.artist_exists(artist.artisthash): + ArtistStore.add_artist(Artist(artist.name)) for artist in track.albumartist: - if not Store.artist_exists(artist.artisthash): - Store.add_artist(Artist(artist.name)) + if not ArtistStore.artist_exists(artist.artisthash): + ArtistStore.add_artist(Artist(artist.name)) tagged_count += 1 else: @@ -122,7 +126,7 @@ class Populate: def get_image(album: Album): - for track in Store.tracks: + for track in TrackStore.tracks: if track.albumhash == album.albumhash: extract_thumb(track.filepath, track.image) break @@ -133,8 +137,8 @@ class ProcessTrackThumbnails: with ThreadPoolExecutor(max_workers=4) as pool: results = list( tqdm( - pool.map(get_image, Store.albums), - total=len(Store.albums), + pool.map(get_image, AlbumStore.albums), + total=len(AlbumStore.albums), desc="Extracting track images", ) ) diff --git a/app/lib/searchlib.py b/app/lib/searchlib.py index 2cb2883..50d1757 100644 --- a/app/lib/searchlib.py +++ b/app/lib/searchlib.py @@ -8,9 +8,12 @@ from rapidfuzz import fuzz, process from unidecode import unidecode from app import models -from app.db.store import Store from app.utils.remove_duplicates import remove_duplicates +from app.store.albums import AlbumStore +from app.store.artists import ArtistStore +from app.store.tracks import TrackStore + ratio = fuzz.ratio wratio = fuzz.WRatio @@ -40,7 +43,7 @@ class Limit: class SearchTracks: def __init__(self, query: str) -> None: self.query = query - self.tracks = Store.tracks + self.tracks = TrackStore.tracks def __call__(self) -> List[models.Track]: """ @@ -63,7 +66,7 @@ class SearchTracks: class SearchArtists: def __init__(self, query: str) -> None: self.query = query - self.artists = Store.artists + self.artists = ArtistStore.artists def __call__(self) -> list: """ @@ -85,7 +88,7 @@ class SearchArtists: class SearchAlbums: def __init__(self, query: str) -> None: self.query = query - self.albums = Store.albums + self.albums = AlbumStore.albums def __call__(self) -> List[models.Album]: """ @@ -160,9 +163,9 @@ class SearchAll: def collect_all(): all_items: _type = [] - all_items.extend(Store.tracks) - all_items.extend(Store.albums) - all_items.extend(Store.artists) + all_items.extend(TrackStore.tracks) + all_items.extend(AlbumStore.albums) + all_items.extend(ArtistStore.artists) return all_items, get_titles(all_items) diff --git a/app/lib/trackslib.py b/app/lib/trackslib.py index 08138d6..fcc5ce5 100644 --- a/app/lib/trackslib.py +++ b/app/lib/trackslib.py @@ -5,16 +5,16 @@ import os from tqdm import tqdm -from app.db.store import Store from app.db.sqlite.tracks import SQLiteTrackMethods as tdb +from app.store.tracks import TrackStore def validate_tracks() -> None: """ Gets all songs under the ~/ directory. """ - for track in tqdm(Store.tracks, desc="Removing deleted tracks"): + for track in tqdm(TrackStore.tracks, desc="Removing deleted tracks"): if not os.path.exists(track.filepath): print(f"Removing {track.filepath}") - Store.tracks.remove(track) + TrackStore.tracks.remove(track) tdb.remove_track_by_filepath(track.filepath) diff --git a/app/lib/watchdogg.py b/app/lib/watchdogg.py index a5313f1..f31e976 100644 --- a/app/lib/watchdogg.py +++ b/app/lib/watchdogg.py @@ -9,7 +9,6 @@ from watchdog.events import PatternMatchingEventHandler from watchdog.observers import Observer from app.logger import log -from app.db.store import Store from app.lib.taglib import get_tags from app.models import Artist, Track from app import settings @@ -18,6 +17,11 @@ from app.db.sqlite.tracks import SQLiteManager from app.db.sqlite.tracks import SQLiteTrackMethods as db from app.db.sqlite.settings import SettingsSQLMethods as sdb +from app.store.store import FolderStore +from app.store.tracks import TrackStore +from app.store.albums import AlbumStore +from app.store.artists import ArtistStore + class Watcher: """ @@ -138,19 +142,19 @@ def add_track(filepath: str) -> None: db.insert_one_track(tags, cur) track = Track(**tags) - Store().add_track(track) + TrackStore.add_track(track) - Store.add_folder(track.folder) + FolderStore.add_folder(track.folder) - if not Store.album_exists(track.albumhash): - album = Store.create_album(track) - Store.add_album(album) + if not AlbumStore.album_exists(track.albumhash): + album = AlbumStore.create_album(track) + AlbumStore.add_album(album) artists: list[Artist] = track.artist + track.albumartist # type: ignore for artist in artists: - if not Store.artist_exists(artist.artisthash): - Store.add_artist(Artist(artist.name)) + if not ArtistStore.artist_exists(artist.artisthash): + ArtistStore.add_artist(Artist(artist.name)) def remove_track(filepath: str) -> None: @@ -158,30 +162,30 @@ def remove_track(filepath: str) -> None: Removes a track from the music dict. """ try: - track = Store.get_tracks_by_filepaths([filepath])[0] + track = TrackStore.get_tracks_by_filepaths([filepath])[0] except IndexError: return db.remove_track_by_filepath(filepath) - Store.remove_track_by_filepath(filepath) + TrackStore.remove_track_by_filepath(filepath) - empty_album = Store.count_tracks_by_hash(track.albumhash) > 0 + empty_album = TrackStore.count_tracks_by_hash(track.albumhash) > 0 if empty_album: - Store.remove_album_by_hash(track.albumhash) + AlbumStore.remove_album_by_hash(track.albumhash) artists: list[Artist] = track.artist + track.albumartist # type: ignore for artist in artists: - empty_artist = not Store.artist_has_tracks(artist.artisthash) + empty_artist = not ArtistStore.artist_has_tracks(artist.artisthash) if empty_artist: - Store.remove_artist_by_hash(artist.artisthash) + ArtistStore.remove_artist_by_hash(artist.artisthash) - empty_folder = Store.is_empty_folder(track.folder) + empty_folder = FolderStore.is_empty_folder(track.folder) if empty_folder: - Store.remove_folder(track.folder) + FolderStore.remove_folder(track.folder) class Handler(PatternMatchingEventHandler): diff --git a/app/setup/__init__.py b/app/setup/__init__.py index b731df6..21cdacd 100644 --- a/app/setup/__init__.py +++ b/app/setup/__init__.py @@ -1,17 +1,21 @@ """ Prepares the server for use. """ -from app.db.store import Store +from app.store.store import FolderStore from app.setup.files import create_config_dir from app.setup.sqlite import setup_sqlite, run_migrations +from app.store.albums import AlbumStore +from app.store.tracks import TrackStore +from app.store.artists import ArtistStore + def run_setup(): create_config_dir() setup_sqlite() run_migrations() - Store.load_all_tracks() - Store.process_folders() - Store.load_albums() - Store.load_artists() + TrackStore.load_all_tracks() + FolderStore.process_folders() + AlbumStore.load_albums() + ArtistStore.load_artists() diff --git a/app/store/__init__.py b/app/store/__init__.py new file mode 100644 index 0000000..56bf319 --- /dev/null +++ b/app/store/__init__.py @@ -0,0 +1,4 @@ +""" +This module contains classes and methods for working with +data loaded in memory. +""" diff --git a/app/store/albums.py b/app/store/albums.py new file mode 100644 index 0000000..743a341 --- /dev/null +++ b/app/store/albums.py @@ -0,0 +1,140 @@ +import json +import random + +from tqdm import tqdm + +from app.models import Album, Track +from app.db.sqlite.albums import SQLiteAlbumMethods as aldb +from .tracks import TrackStore + + +class AlbumStore: + albums: list[Album] = [] + + @staticmethod + def create_album(track: Track): + """ + Creates album object from a track + """ + return Album( + albumhash=track.albumhash, + albumartists=track.albumartist, # type: ignore + title=track.album, + ) + + @classmethod + def load_albums(cls): + """ + Loads all albums from the database into the store. + """ + + cls.albums = [] + + albumhashes = set(t.albumhash for t in TrackStore.tracks) + + for albumhash in tqdm(albumhashes, desc="Loading albums"): + for track in TrackStore.tracks: + if track.albumhash == albumhash: + cls.albums.append(cls.create_album(track)) + break + + db_albums: list[tuple] = aldb.get_all_albums() + + for album in tqdm(db_albums, desc="Mapping album colors"): + albumhash = album[1] + colors = json.loads(album[2]) + + for _al in cls.albums: + if _al.albumhash == albumhash: + _al.set_colors(colors) + break + + @classmethod + def add_album(cls, album: Album): + """ + Adds an album to the store. + """ + cls.albums.append(album) + + @classmethod + def add_albums(cls, albums: list[Album]): + """ + Adds multiple albums to the store. + """ + cls.albums.extend(albums) + + @classmethod + def get_albums_by_albumartist( + cls, artisthash: str, limit: int, exclude: str + ) -> list[Album]: + """ + Returns N albums by the given albumartist, excluding the specified album. + """ + + albums = [album for album in cls.albums if artisthash in album.albumartisthash] + + albums = [album for album in albums if album.albumhash != exclude] + + if len(albums) > limit: + random.shuffle(albums) + + # TODO: Merge this with `cls.get_albums_by_artisthash()` + return albums[:limit] + + @classmethod + def get_album_by_hash(cls, albumhash: str) -> Album | None: + """ + Returns an album by its hash. + """ + try: + return [a for a in cls.albums if a.albumhash == albumhash][0] + except IndexError: + return None + + @classmethod + def get_albums_by_hashes(cls, albumhashes: list[str]) -> list[Album]: + """ + Returns albums by their hashes. + """ + albums_str = "-".join(albumhashes) + albums = [a for a in cls.albums if a.albumhash in albums_str] + + # sort albums by the order of the hashes + albums.sort(key=lambda x: albumhashes.index(x.albumhash)) + return albums + + @classmethod + def get_albums_by_artisthash(cls, artisthash: str) -> list[Album]: + """ + Returns all albums by the given artist. + """ + return [album for album in cls.albums if artisthash in album.albumartisthash] + + @classmethod + def count_albums_by_artisthash(cls, artisthash: str): + """ + Count albums for the given artisthash. + """ + albumartists = [a.albumartists for a in cls.albums] + artisthashes = [] + + for artist in albumartists: + artisthashes.extend([a.artisthash for a in artist]) # type: ignore + + master_string = "-".join(artisthashes) + + return master_string.count(artisthash) + + @classmethod + def album_exists(cls, albumhash: str) -> bool: + """ + Checks if an album exists. + """ + return albumhash in "-".join([a.albumhash for a in cls.albums]) + + @classmethod + def remove_album_by_hash(cls, albumhash: str): + """ + Removes an album from the store. + """ + cls.albums = [a for a in cls.albums if a.albumhash != albumhash] diff --git a/app/store/artists.py b/app/store/artists.py new file mode 100644 index 0000000..b817770 --- /dev/null +++ b/app/store/artists.py @@ -0,0 +1,103 @@ +import json + +from tqdm import tqdm + +from app.db.sqlite.artists import SQLiteArtistMethods as ardb +from app.lib.artistlib import get_all_artists +from app.models import Artist +from app.utils.bisection import UseBisection +from .tracks import TrackStore +from .albums import AlbumStore + + +class ArtistStore: + artists: list[Artist] = [] + + @classmethod + def load_artists(cls): + """ + Loads all artists from the database into the store. + """ + cls.artists = get_all_artists(TrackStore.tracks, AlbumStore.albums) + + db_artists: list[tuple] = list(ardb.get_all_artists()) + + for art in tqdm(db_artists, desc="Loading artists"): + cls.map_artist_color(art) + + @classmethod + def map_artist_color(cls, artist_tuple: tuple): + """ + Maps a color to the corresponding artist. + """ + + artisthash = artist_tuple[1] + color = json.loads(artist_tuple[2]) + + for artist in cls.artists: + if artist.artisthash == artisthash: + artist.colors = color + break + + @classmethod + def add_artist(cls, artist: Artist): + """ + Adds an artist to the store. + """ + cls.artists.append(artist) + + @classmethod + def add_artists(cls, artists: list[Artist]): + """ + Adds multiple artists to the store. + """ + for artist in artists: + if artist not in cls.artists: + cls.artists.append(artist) + + @classmethod + def get_artist_by_hash(cls, artisthash: str) -> Artist: + """ + Returns an artist by its hash.P + """ + artists = sorted(cls.artists, key=lambda x: x.artisthash) + artist = UseBisection(artists, "artisthash", [artisthash])()[0] + return artist + + @classmethod + def get_artists_by_hashes(cls, artisthashes: list[str]) -> list[Artist]: + """ + Returns artists by their hashes. + """ + artists = sorted(cls.artists, key=lambda x: x.artisthash) + artists = UseBisection(artists, "artisthash", artisthashes)() + return [a for a in artists if a is not None] + + @classmethod + def artist_exists(cls, artisthash: str) -> bool: + """ + Checks if an artist exists. + """ + return artisthash in "-".join([a.artisthash for a in cls.artists]) + + @classmethod + def artist_has_tracks(cls, artisthash: str) -> bool: + """ + Checks if an artist has tracks. + """ + artists: set[str] = set() + + for track in TrackStore.tracks: + artists.update(track.artist_hashes) + album_artists: list[str] = [a.artisthash for a in track.albumartist] + artists.update(album_artists) + + master_hash = "-".join(artists) + return artisthash in master_hash + + @classmethod + def remove_artist_by_hash(cls, artisthash: str): + """ + Removes an artist from the store. + """ + cls.artists = [a for a in cls.artists if a.artisthash != artisthash] diff --git a/app/store/store.py b/app/store/store.py new file mode 100644 index 0000000..0a9d64c --- /dev/null +++ b/app/store/store.py @@ -0,0 +1,150 @@ +""" +In memory store. +""" +from pathlib import Path + +from tqdm import tqdm + +from app.models import Folder +from app.utils.bisection import UseBisection +from app.utils.hashing import create_folder_hash +from app.utils.wintools import win_replace_slash +from .tracks import TrackStore + + +class FolderStore: + """ + This class holds all tracks in memory and provides methods for + interacting with them. + """ + + folders: list[Folder] = [] + + @classmethod + def check_has_tracks(cls, path: str): # type: ignore + """ + Checks if a folder has tracks. + """ + path_hashes = "".join(f.path_hash for f in cls.folders) + path_hash = create_folder_hash(*Path(path).parts[1:]) + + return path_hash in path_hashes + + @classmethod + def is_empty_folder(cls, path: str): + """ + Checks if a folder has tracks using tracks in the store. + """ + + all_folders = set(track.folder for track in TrackStore.tracks) + folder_hashes = "".join( + create_folder_hash(*Path(f).parts[1:]) for f in all_folders + ) + + path_hash = create_folder_hash(*Path(path).parts[1:]) + return path_hash in folder_hashes + + @staticmethod + def create_folder(path: str) -> Folder: + """ + Creates a folder object from a path. + """ + folder = Path(path) + + return Folder( + name=folder.name, + path=win_replace_slash(str(folder)), + is_sym=folder.is_symlink(), + has_tracks=True, + path_hash=create_folder_hash(*folder.parts[1:]), + ) + + @classmethod + def add_folder(cls, path: str): + """ + Adds a folder to the store. + """ + + if cls.check_has_tracks(path): + return + + folder = cls.create_folder(path) + cls.folders.append(folder) + + @classmethod + def remove_folder(cls, path: str): + """ + Removes a folder from the store. + """ + + for folder in cls.folders: + if folder.path == path: + cls.folders.remove(folder) + break + + @classmethod + def process_folders(cls): + """ + Creates a list of folders from the tracks in the store. + """ + cls.folders.clear() + + all_folders = [track.folder for track in TrackStore.tracks] + all_folders = set(all_folders) + + all_folders = [ + folder for folder in all_folders if not cls.check_has_tracks(folder) + ] + + all_folders = [Path(f) for f in all_folders] + # all_folders = [f for f in all_folders if f.exists()] + + valid_folders = [] + + for folder in all_folders: + try: + if folder.exists(): + valid_folders.append(folder) + except PermissionError: + pass + + for path in tqdm(valid_folders, desc="Processing folders"): + folder = cls.create_folder(str(path)) + + cls.folders.append(folder) + + @classmethod + def get_folder(cls, path: str): # type: ignore + """ + Returns a folder object by its path. + """ + # TODO: Modify this method to accept a list of paths, sorting is computationally expensive. + folders = sorted(cls.folders, key=lambda x: x.path) + folder = UseBisection(folders, "path", [path])()[0] + + if folder is not None: + return folder + + has_tracks = cls.check_has_tracks(path) + + if not has_tracks: + return None + + folder = cls.create_folder(path) + cls.folders.append(folder) + return folder + + @classmethod + def get_folders_count(cls, paths: list[str]) -> list[dict[str, int]]: + count_dict = {path: 0 for path in paths} + + for track in TrackStore.tracks: + for path in paths: + if track.filepath.startswith(path): + count_dict[path] += 1 + + result = [{"path": path, "count": count_dict[path]} for path in paths] + + # TODO: Modify this method to return Folder objects with + # track count mapped. Keep an eye on function complexity. + return result diff --git a/app/store/tracks.py b/app/store/tracks.py new file mode 100644 index 0000000..d6c412f --- /dev/null +++ b/app/store/tracks.py @@ -0,0 +1,134 @@ +from tqdm import tqdm + +from app.models import Track +from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb +from app.db.sqlite.tracks import SQLiteTrackMethods as tdb +from app.utils.bisection import UseBisection +from app.utils.remove_duplicates import remove_duplicates + + +class TrackStore: + tracks: list[Track] = [] + + @classmethod + def load_all_tracks(cls): + """ + Loads all tracks from the database into the store. + """ + + cls.tracks = list(tdb.get_all_tracks()) + + fav_hashes = favdb.get_fav_tracks() + fav_hashes = " ".join([t[1] for t in fav_hashes]) + + for track in tqdm(cls.tracks, desc="Loading tracks"): + if track.trackhash in fav_hashes: + track.is_favorite = True + + @classmethod + def add_track(cls, track: Track): + """ + Adds a single track to the store. + """ + + cls.tracks.append(track) + + @classmethod + def add_tracks(cls, tracks: list[Track]): + """ + Adds multiple tracks to the store. + """ + + cls.tracks.extend(tracks) + + @classmethod + def get_tracks_by_trackhashes(cls, trackhashes: list[str]) -> list[Track]: + """ + Returns a list of tracks by their hashes. + """ + + trackhashes = " ".join(trackhashes) + tracks = [track for track in cls.tracks if track.trackhash in trackhashes] + + tracks.sort(key=lambda t: trackhashes.index(t.trackhash)) + return tracks + + @classmethod + def remove_track_by_filepath(cls, filepath: str): + """ + Removes a track from the store by its filepath. + """ + + for track in cls.tracks: + if track.filepath == filepath: + cls.tracks.remove(track) + break + + @classmethod + def remove_tracks_by_dir_except(cls, dirs: list[str]): + """Removes all tracks not in the root directories.""" + to_remove = set() + + for track in cls.tracks: + if not track.folder.startswith(tuple(dirs)): + to_remove.add(track.folder) + + tdb.remove_tracks_by_folders(to_remove) + + @classmethod + def count_tracks_by_hash(cls, trackhash: str) -> int: + """ + Counts the number of tracks with a specific hash. + """ + + count = 0 + + for track in cls.tracks: + if track.trackhash == trackhash: + count += 1 + + return count + + @classmethod + def make_track_fav(cls, trackhash: str): + """ + Adds a track to the favorites. + """ + + for track in cls.tracks: + if track.trackhash == trackhash: + track.is_favorite = True + + @classmethod + def remove_track_from_fav(cls, trackhash: str): + """ + Removes a track from the favorites. + """ + + for track in cls.tracks: + if track.trackhash == trackhash: + track.is_favorite = False + + @classmethod + def get_tracks_by_filepaths(cls, paths: list[str]) -> list[Track]: + """ + Returns all tracks matching the given paths. + """ + tracks = sorted(cls.tracks, key=lambda x: x.filepath) + tracks = UseBisection(tracks, "filepath", paths)() + return [track for track in tracks if track is not None] + + @classmethod + def get_tracks_by_albumhash(cls, album_hash: str) -> list[Track]: + """ + Returns all tracks matching the given album hash. + """ + return [t for t in cls.tracks if t.albumhash == album_hash] + + @classmethod + def get_tracks_by_artist(cls, artisthash: str) -> list[Track]: + """ + Returns all tracks matching the given artist. Duplicate tracks are removed. + """ + tracks = [t for t in cls.tracks if artisthash in t.artist_hashes] + return remove_duplicates(tracks) diff --git a/app/serializer.py b/app/utils/dates.py similarity index 61% rename from app/serializer.py rename to app/utils/dates.py index b73c584..b86e296 100644 --- a/app/serializer.py +++ b/app/utils/dates.py @@ -1,20 +1,30 @@ -from datetime import datetime, timezone +from datetime import datetime + +_format = "%Y-%m-%d %H:%M:%S" + + +def create_new_date(): + """ + Creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS" + :return: A string of the current date and time. + """ + now = datetime.now() + return now.strftime(_format) def date_string_to_time_passed(prev_date: str) -> str: """ - Converts a date string to time passed. eg. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc. + Converts a date string to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc. """ - now = datetime.now(timezone.utc) - then = datetime.strptime(prev_date, "%Y-%m-%d %H:%M:%S").replace( - tzinfo=timezone.utc - ) + now = datetime.now() + then = datetime.strptime(prev_date, _format) diff = now - then - seconds = diff.total_seconds() + seconds = diff.seconds + print(seconds) if seconds < 0: - return "-from a time machine 🛸" + return "from the future 🛸" if seconds < 15: return "now" diff --git a/app/utils/generators.py b/app/utils/generators.py index 4a6b324..fa3b17b 100644 --- a/app/utils/generators.py +++ b/app/utils/generators.py @@ -3,14 +3,6 @@ from datetime import datetime import random -def create_new_date(): - """ - It creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS" - :return: A string of the current date and time. - """ - now = datetime.now() - return now.strftime("%Y-%m-%d %H:%M:%S") - def get_random_str(length=5): """ diff --git a/app/utils/remove_duplicates.py b/app/utils/remove_duplicates.py index 93ae5ac..461dd21 100644 --- a/app/utils/remove_duplicates.py +++ b/app/utils/remove_duplicates.py @@ -7,7 +7,7 @@ from app.models import Track def remove_duplicates(tracks: list[Track]) -> list[Track]: """ Remove duplicates from a list of Track objects based on the trackhash attribute. - Retains objects with the highest bitrate. + Retain objects with the highest bitrate. """ hash_to_tracks = defaultdict(list)