From e3ec9db9894a2b65e9d1301189e89c7ac660b491 Mon Sep 17 00:00:00 2001 From: geoffrey45 Date: Thu, 9 Mar 2023 13:08:50 +0300 Subject: [PATCH] add method and route to search across tracks, albums and artists. + break models into separate files + same for the utils and setup --- README.md | 4 +- app/api/__init__.py | 1 - app/api/album.py | 36 +- app/api/artist.py | 2 +- app/api/favorites.py | 2 +- app/api/folder.py | 5 +- app/api/playlist.py | 3 +- app/api/search.py | 60 ++- app/api/settings.py | 3 +- app/api/track.py | 37 +- app/arg_handler.py | 2 +- app/config.py | 61 +++ app/db/__init__.py | 214 ---------- app/db/sqlite/albums.py | 4 +- app/db/sqlite/playlists.py | 3 +- app/db/sqlite/settings.py | 8 +- app/db/sqlite/tracks.py | 4 +- app/db/store.py | 12 +- app/functions.py | 10 +- app/lib/artistlib.py | 60 ++- app/lib/folderslib.py | 2 +- app/lib/playlistlib.py | 59 ++- app/lib/populate.py | 3 +- app/lib/searchlib.py | 112 +++++- app/lib/taglib.py | 9 +- app/lib/trackslib.py | 1 + app/lib/watchdogg.py | 4 +- .../{_preinit => __preinit}/__init__.py | 2 +- .../move_to_xdg_folder.py | 0 app/models.py | 258 ------------- app/models/__init__.py | 16 + app/models/album.py | 123 ++++++ app/models/artist.py | 41 ++ app/models/enums.py | 6 + app/models/folder.py | 10 + app/models/playlist.py | 33 ++ app/models/track.py | 70 ++++ app/settings.py | 3 + app/setup/__init__.py | 137 +------ app/setup/files.py | 93 +++++ app/setup/sqlite.py | 41 ++ app/start_info_logger.py | 2 +- app/utils.py | 364 ------------------ app/utils/__init__.py | 0 app/utils/bisection.py | 57 +++ app/utils/filesystem.py | 50 +++ app/utils/generators.py | 19 + app/utils/hashing.py | 31 ++ app/utils/network.py | 28 ++ app/utils/parsers.py | 86 +++++ app/utils/remove_duplicates.py | 23 ++ app/utils/threading.py | 13 + app/utils/wintools.py | 16 + manage.py | 3 +- tests/test_utils.py | 4 +- 55 files changed, 1113 insertions(+), 1137 deletions(-) create mode 100644 app/config.py rename app/migrations/{_preinit => __preinit}/__init__.py (97%) rename app/migrations/{_preinit => __preinit}/move_to_xdg_folder.py (100%) delete mode 100644 app/models.py create mode 100644 app/models/__init__.py create mode 100644 app/models/album.py create mode 100644 app/models/artist.py create mode 100644 app/models/enums.py create mode 100644 app/models/folder.py create mode 100644 app/models/playlist.py create mode 100644 app/models/track.py create mode 100644 app/setup/files.py create mode 100644 app/setup/sqlite.py delete mode 100644 app/utils.py create mode 100644 app/utils/__init__.py create mode 100644 app/utils/bisection.py create mode 100644 app/utils/filesystem.py create mode 100644 app/utils/generators.py create mode 100644 app/utils/hashing.py create mode 100644 app/utils/network.py create mode 100644 app/utils/parsers.py create mode 100644 app/utils/remove_duplicates.py create mode 100644 app/utils/threading.py create mode 100644 app/utils/wintools.py diff --git a/README.md b/README.md index a329da5..deb9ca4 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Swing music -![SWING MUSIC PLAYER BANNER IMAGE](./screenshots/on-readme2.webp) +![SWING MUSIC PLAYER BANNER IMAGE](screenshots/on-readme2.webp) -![SWING MUSIC PLAYER BANNER IMAGE](./screenshots/on-readme1.webp) +![SWING MUSIC PLAYER BANNER IMAGE](screenshots/on-readme1.webp) --- diff --git a/app/api/__init__.py b/app/api/__init__.py index 8fb2011..b8291fd 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -17,7 +17,6 @@ def create_api(): CORS(app) with app.app_context(): - app.register_blueprint(album.api) app.register_blueprint(artist.api) app.register_blueprint(track.api) diff --git a/app/api/album.py b/app/api/album.py index 4b0e826..cdc74ad 100644 --- a/app/api/album.py +++ b/app/api/album.py @@ -6,13 +6,12 @@ from dataclasses import asdict from flask import Blueprint, request -from app import utils from app.db.sqlite.albums import SQLiteAlbumMethods as adb from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb from app.db.store import Store from app.models import FavType, Track +from app.utils.remove_duplicates import remove_duplicates -get_album_by_id = adb.get_album_by_id get_albums_by_albumartist = adb.get_albums_by_albumartist check_is_fav = favdb.check_is_favorite @@ -20,8 +19,10 @@ api = Blueprint("album", __name__, url_prefix="") @api.route("/album", methods=["POST"]) -def get_album(): - """Returns all the tracks in the given album.""" +def get_album_tracks_and_info(): + """ + Returns all the tracks in the given album + """ data = request.get_json() error_msg = {"msg": "No hash provided"} @@ -58,7 +59,7 @@ def get_album(): return list(genres) album.genres = get_album_genres(tracks) - tracks = utils.remove_duplicates(tracks) + tracks = remove_duplicates(tracks) album.count = len(tracks) album.get_date_from_tracks(tracks) @@ -83,7 +84,7 @@ def get_album(): @api.route("/album//tracks", methods=["GET"]) def get_album_tracks(albumhash: str): """ - Returns all the tracks in the given album. + Returns all the tracks in the given album, sorted by disc and track number. """ tracks = Store.get_tracks_by_albumhash(albumhash) tracks = [asdict(t) for t in tracks] @@ -104,11 +105,11 @@ def get_artist_albums(): if data is None: return {"msg": "No albumartist provided"} - albumartists: str = data["albumartists"] # type: ignore + albumartists: str = data["albumartists"] limit: int = data.get("limit") exclude: str = data.get("exclude") - albumartists: list[str] = albumartists.split(",") # type: ignore + albumartists: list[str] = albumartists.split(",") albums = [ { @@ -121,22 +122,3 @@ def get_artist_albums(): albums = [a for a in albums if len(a["albums"]) > 0] return {"data": albums} - -# @album_bp.route("/album/bio", methods=["POST"]) -# def get_album_bio(): -# """Returns the album bio for the given album.""" -# data = request.get_json() -# album_hash = data["hash"] -# err_msg = {"bio": "No bio found"} - -# album = instances.album_instance.find_album_by_hash(album_hash) - -# if album is None: -# return err_msg, 404 - -# bio = FetchAlbumBio(album["title"], album["artist"])() - -# if bio is None: -# return err_msg, 404 - -# return {"bio": bio} diff --git a/app/api/artist.py b/app/api/artist.py index 486c044..002a2df 100644 --- a/app/api/artist.py +++ b/app/api/artist.py @@ -8,7 +8,7 @@ from flask import Blueprint, request from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb from app.db.store import Store from app.models import Album, FavType, Track -from app.utils import remove_duplicates +from app.utils.remove_duplicates import remove_duplicates api = Blueprint("artist", __name__, url_prefix="/") diff --git a/app/api/favorites.py b/app/api/favorites.py index 26a84ea..b039777 100644 --- a/app/api/favorites.py +++ b/app/api/favorites.py @@ -3,7 +3,7 @@ from flask import Blueprint, request from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb from app.db.store import Store from app.models import FavType -from app.utils import UseBisection +from app.utils.bisection import UseBisection api = Blueprint("favorite", __name__, url_prefix="/") diff --git a/app/api/folder.py b/app/api/folder.py index dd20bbf..78ed7fd 100644 --- a/app/api/folder.py +++ b/app/api/folder.py @@ -10,8 +10,9 @@ from flask import Blueprint, request from app import settings from app.lib.folderslib import GetFilesAndDirs from app.db.sqlite.settings import SettingsSQLMethods as db -from app.models import Folder -from app.utils import create_folder_hash, is_windows, win_replace_slash +from app.models.folder import Folder +from app.utils.hashing import create_folder_hash +from app.utils.wintools import win_replace_slash, is_windows api = Blueprint("folder", __name__, url_prefix="/") diff --git a/app/api/playlist.py b/app/api/playlist.py index c9100ed..32b49a4 100644 --- a/app/api/playlist.py +++ b/app/api/playlist.py @@ -11,7 +11,8 @@ from app import models, serializer from app.db.sqlite.playlists import SQLitePlaylistMethods from app.db.store import Store from app.lib import playlistlib -from app.utils import create_new_date, remove_duplicates +from app.utils.generators import create_new_date +from app.utils.remove_duplicates import remove_duplicates api = Blueprint("playlist", __name__, url_prefix="/") diff --git a/app/api/search.py b/app/api/search.py index ee0c2b9..a1a3b1f 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -2,16 +2,15 @@ Contains all the search routes. """ +from unidecode import unidecode from flask import Blueprint, request -from app import models, utils +from app import models from app.db.store import Store from app.lib import searchlib -from unidecode import unidecode api = Blueprint("search", __name__, url_prefix="/") - SEARCH_COUNT = 12 """The max amount of items to return per request""" @@ -28,48 +27,36 @@ class SearchResults: artists: list[models.Artist] = [] -class DoSearch: - """Class containing the methods that perform searching.""" - +class Search: def __init__(self, query: str) -> None: - """ - :param :str:`query`: the search query. - """ self.tracks: list[models.Track] = [] self.query = unidecode(query) SearchResults.query = self.query def search_tracks(self): - """Calls :class:`SearchTracks` which returns the tracks that fuzzily match + """ + Calls :class:`SearchTracks` which returns the tracks that fuzzily match the search terms. Then adds them to the `SearchResults` store. """ self.tracks = Store.tracks - tracks = searchlib.SearchTracks(self.tracks, self.query)() + tracks = searchlib.SearchTracks(self.query)() - if len(tracks) == 0: - return [] - - tracks = utils.remove_duplicates(tracks) SearchResults.tracks = tracks - return tracks def search_artists(self): """Calls :class:`SearchArtists` which returns the artists that fuzzily match the search term. Then adds them to the `SearchResults` store. """ - artists = [a.name for a in Store.artists] - artists = searchlib.SearchArtists(Store.artists, self.query)() + artists = searchlib.SearchArtists(self.query)() SearchResults.artists = artists - return artists def search_albums(self): """Calls :class:`SearchAlbums` which returns the albums that fuzzily match the search term. Then adds them to the `SearchResults` store. """ - albums = Store.albums - albums = searchlib.SearchAlbums(albums, self.query)() + albums = searchlib.SearchAlbums(self.query)() SearchResults.albums = albums return albums @@ -86,6 +73,10 @@ class DoSearch: # return playlists + def get_top_results(self): + finder = searchlib.SearchAll() + return finder.search(self.query) + def search_all(self): """Calls all the search methods.""" self.search_tracks() @@ -104,7 +95,7 @@ def search_tracks(): if not query: return {"error": "No query provided"}, 400 - tracks = DoSearch(query).search_tracks() + tracks = Search(query).search_tracks() return { "tracks": tracks[:SEARCH_COUNT], @@ -122,7 +113,7 @@ def search_albums(): if not query: return {"error": "No query provided"}, 400 - tracks = DoSearch(query).search_albums() + tracks = Search(query).search_albums() return { "albums": tracks[:SEARCH_COUNT], @@ -140,7 +131,7 @@ def search_artists(): if not query: return {"error": "No query provided"}, 400 - artists = DoSearch(query).search_artists() + artists = Search(query).search_artists() return { "artists": artists[:SEARCH_COUNT], @@ -176,14 +167,17 @@ def get_top_results(): if not query: return {"error": "No query provided"}, 400 - DoSearch(query).search_all() + results = Search(query).get_top_results() - max_results = 2 + # max_results = 2 + # return { + # "tracks": SearchResults.tracks[:max_results], + # "albums": SearchResults.albums[:max_results], + # "artists": SearchResults.artists[:max_results], + # "playlists": SearchResults.playlists[:max_results], + # } return { - "tracks": SearchResults.tracks[:max_results], - "albums": SearchResults.albums[:max_results], - "artists": SearchResults.artists[:max_results], - "playlists": SearchResults.playlists[:max_results], + "results": results } @@ -198,20 +192,20 @@ def search_load_more(): if s_type == "tracks": t = SearchResults.tracks return { - "tracks": t[index : index + SEARCH_COUNT], + "tracks": t[index: index + SEARCH_COUNT], "more": len(t) > index + SEARCH_COUNT, } elif s_type == "albums": a = SearchResults.albums return { - "albums": a[index : index + SEARCH_COUNT], + "albums": a[index: index + SEARCH_COUNT], "more": len(a) > index + SEARCH_COUNT, } elif s_type == "artists": a = SearchResults.artists return { - "artists": a[index : index + SEARCH_COUNT], + "artists": a[index: index + SEARCH_COUNT], "more": len(a) > index + SEARCH_COUNT, } diff --git a/app/api/settings.py b/app/api/settings.py index 6942706..86ce98f 100644 --- a/app/api/settings.py +++ b/app/api/settings.py @@ -4,9 +4,10 @@ from app import settings from app.logger import log from app.lib import populate from app.db.store import Store -from app.utils import background, get_random_str from app.lib.watchdogg import Watcher as WatchDog from app.db.sqlite.settings import SettingsSQLMethods as sdb +from app.utils.generators import get_random_str +from app.utils.threading import background api = Blueprint("settings", __name__, url_prefix="/") diff --git a/app/api/track.py b/app/api/track.py index a7eb33b..708edf0 100644 --- a/app/api/track.py +++ b/app/api/track.py @@ -1,7 +1,9 @@ """ Contains all the track routes. """ -from flask import Blueprint, send_file +import os + +from flask import Blueprint, send_file, request from app.db.store import Store @@ -15,20 +17,31 @@ def send_track_file(trackhash: str): Falls back to track hash if id is not found. """ msg = {"msg": "File Not Found"} + + def get_mime(filename: str) -> str: + ext = filename.rsplit(".", maxsplit=1)[-1] + return f"audio/{ext}" + + filepath = request.args.get("filepath") + + if filepath is not None and os.path.exists(filepath): + audio_type = get_mime(filepath) + return send_file(filepath, mimetype=audio_type) + if trackhash is None: return msg, 404 - try: - track = Store.get_tracks_by_trackhashes([trackhash])[0] - except IndexError: - track = None + tracks = Store.get_tracks_by_trackhashes([trackhash]) - if track is None: - return msg, 404 + for track in tracks: + if track is None: + return msg, 404 - audio_type = track.filepath.rsplit(".", maxsplit=1)[-1] + audio_type = get_mime(track.filepath) - try: - return send_file(track.filepath, mimetype=f"audio/{audio_type}") - except FileNotFoundError: - return msg, 404 + try: + return send_file(track.filepath, mimetype=audio_type) + except FileNotFoundError: + return msg, 404 + + return msg, 404 diff --git a/app/arg_handler.py b/app/arg_handler.py index 0f14b5b..5af4155 100644 --- a/app/arg_handler.py +++ b/app/arg_handler.py @@ -9,7 +9,7 @@ import PyInstaller.__main__ as bundler from app import settings from app.print_help import HELP_MESSAGE -from app.utils import is_windows +from app.utils.wintools import is_windows config = ConfigParser() config.read("pyinstaller.config.ini") diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..cb1ace6 --- /dev/null +++ b/app/config.py @@ -0,0 +1,61 @@ +""" +Module for managing the JSON config file. +""" + +import json +from enum import Enum +from typing import Type + +from app.settings import JSON_CONFIG_PATH + + +class ConfigKeys(Enum): + ROOT_DIRS = ("root_dirs", list[str]) + PLAYLIST_DIRS = ("playlist_dirs", list[str]) + USE_ART_COLORS = ("use_art_colors", bool) + DEFAULT_ART_COLOR = ("default_art_color", str) + SHUFFLE_MODE = ("shuffle_mode", str) + REPEAT_MODE = ("repeat_mode", str) + AUTOPLAY_ON_START = ("autoplay_on_start", bool) + VOLUME = ("volume", int) + + def __init__(self, key_name: str, data_type: Type): + self.key_name = key_name + self.data_type = data_type + + def get_data_type(self) -> Type: + return self.data_type + + +class ConfigManager: + def __init__(self, config_file_path: str): + self.config_file_path = config_file_path + + def read_config(self): + try: + with open(self.config_file_path) as f: + return json.load(f) + except FileNotFoundError: + return {} + + # in case of errors, return an empty dict + + def write_config(self, config_data): + with open(self.config_file_path, "w") as f: + json.dump(config_data, f, indent=4) + + def get_value(self, key: ConfigKeys): + config_data = self.read_config() + value = config_data.get(key.key_name) + + if value is not None: + return key.get_data_type()(value) + + def set_value(self, key: ConfigKeys, value): + config_data = self.read_config() + config_data[key.key_name] = value + self.write_config(config_data) + + +settings = ConfigManager(JSON_CONFIG_PATH) +a = settings.get_value(ConfigKeys.ROOT_DIRS) diff --git a/app/db/__init__.py b/app/db/__init__.py index e6216ae..e69de29 100644 --- a/app/db/__init__.py +++ b/app/db/__init__.py @@ -1,214 +0,0 @@ -class AlbumMethods: - """ - Lists all the methods that can be found in the Albums class. - """ - - def insert_album(): - """ - Inserts a new album object into the database. - """ - pass - - def get_all_albums(): - """ - Returns all the albums in the database. - """ - pass - - def get_album_by_id(): - """ - Returns a single album matching the passed id. - """ - pass - - def get_album_by_name(): - """ - Returns a single album matching the passed name. - """ - pass - - def get_album_by_artist(): - """ - Returns a single album matching the passed artist name. - """ - pass - - -class ArtistMethods: - """ - Lists all the methods that can be found in the Artists class. - """ - - def insert_artist(): - """ - Inserts a new artist object into the database. - """ - pass - - def get_all_artists(): - """ - Returns all the artists in the database. - """ - pass - - def get_artist_by_id(): - """ - Returns an artist matching the mongo Id. - """ - pass - - def get_artists_by_name(): - """ - Returns all the artists matching the query. - """ - pass - - -class PlaylistMethods: - """ - Lists all the methods that can be found in the Playlists class. - """ - - def insert_playlist(): - """ - Inserts a new playlist object into the database. - """ - pass - - def get_all_playlists(): - """ - Returns all the playlists in the database. - """ - pass - - def get_playlist_by_id(): - """ - Returns a single playlist matching the id in the query params. - """ - pass - - def add_track_to_playlist(): - """ - Adds a track to a playlist. - """ - pass - - def get_playlist_by_name(): - """ - Returns a single playlist matching the name in the query params. - """ - pass - - def update_playlist(): - """ - Updates a playlist. - """ - pass - - -class TrackMethods: - """ - Lists all the methods that can be found in the Tracks class. - """ - - def insert_one_track(): - """ - Inserts a new track object into the database. - """ - pass - - def drop_db(): - """ - Drops the entire database. - """ - pass - - def get_all_tracks(): - """ - Returns all the tracks in the database. - """ - pass - - def get_track_by_id(): - """ - Returns a single track matching the id in the query params. - """ - pass - - def get_track_by_album(): - """ - Returns a single track matching the album in the query params. - """ - pass - - def search_tracks_by_album(): - """ - Returns all the tracks matching the albums in the query params (using regex). - """ - pass - - def search_tracks_by_artist(): - """ - Returns all the tracks matching the artists in the query params. - """ - pass - - def find_track_by_title(): - """ - Finds all the tracks matching the title in the query params. - """ - pass - - def find_tracks_by_album(): - """ - Finds all the tracks matching the album in the query params. - """ - pass - - def find_tracks_by_folder(): - """ - Finds all the tracks matching the folder in the query params. - """ - pass - - def find_tracks_by_artist(): - """ - Finds all the tracks matching the artist in the query params. - """ - pass - - def find_tracks_by_albumartist(): - """ - Finds all the tracks matching the album artist in the query params. - """ - pass - - def get_track_by_path(): - """ - Returns a single track matching the path in the query params. - """ - pass - - def remove_track_by_path(): - """ - Removes a track from the database. Returns a boolean indicating success or failure of the operation. - """ - pass - - def remove_track_by_id(): - """ - Removes a track from the database. Returns a boolean indicating success or failure of the operation. - """ - pass - - def find_tracks_by_albumhash(): - """ - Returns all the tracks matching the passed hash. - """ - pass - - def get_dir_t_count(): - """ - Returns a list of all the tracks matching the path in the query params. - """ - pass diff --git a/app/db/sqlite/albums.py b/app/db/sqlite/albums.py index c016bba..05ebcec 100644 --- a/app/db/sqlite/albums.py +++ b/app/db/sqlite/albums.py @@ -1,11 +1,9 @@ from sqlite3 import Cursor -from app.db import AlbumMethods - from .utils import SQLiteManager, tuple_to_album, tuples_to_albums -class SQLiteAlbumMethods(AlbumMethods): +class SQLiteAlbumMethods: @classmethod def insert_one_album(cls, cur: Cursor, albumhash: str, colors: str): """ diff --git a/app/db/sqlite/playlists.py b/app/db/sqlite/playlists.py index dc55bb9..2182faf 100644 --- a/app/db/sqlite/playlists.py +++ b/app/db/sqlite/playlists.py @@ -4,7 +4,8 @@ from collections import OrderedDict from app.db.sqlite.tracks import SQLiteTrackMethods from app.db.sqlite.utils import SQLiteManager, tuple_to_playlist, tuples_to_playlists from app.models import Artist -from app.utils import background, create_new_date +from app.utils.generators import create_new_date +from app.utils.threading import background class SQLitePlaylistMethods: diff --git a/app/db/sqlite/settings.py b/app/db/sqlite/settings.py index d6e0ea4..c49d07f 100644 --- a/app/db/sqlite/settings.py +++ b/app/db/sqlite/settings.py @@ -1,5 +1,5 @@ from app.db.sqlite.utils import SQLiteManager -from app.utils import win_replace_slash +from app.utils.wintools import win_replace_slash class SettingsSQLMethods: @@ -19,7 +19,7 @@ class SettingsSQLMethods: cur.execute(sql) dirs = cur.fetchall() - dirs = [dir[0] for dir in dirs] + dirs = [_dir[0] for _dir in dirs] return [win_replace_slash(d) for d in dirs] @staticmethod @@ -31,7 +31,7 @@ class SettingsSQLMethods: sql = "INSERT INTO settings (root_dirs) VALUES (?)" existing_dirs = SettingsSQLMethods.get_root_dirs() - dirs = [dir for dir in dirs if dir not in existing_dirs] + dirs = [_dir for _dir in dirs if _dir not in existing_dirs] if len(dirs) == 0: return @@ -85,4 +85,4 @@ class SettingsSQLMethods: with SQLiteManager(userdata_db=True) as cur: cur.execute(sql) dirs = cur.fetchall() - return [dir[0] for dir in dirs] + return [_dir[0] for _dir in dirs] diff --git a/app/db/sqlite/tracks.py b/app/db/sqlite/tracks.py index f7dbbc9..3bd0850 100644 --- a/app/db/sqlite/tracks.py +++ b/app/db/sqlite/tracks.py @@ -3,7 +3,6 @@ Contains the SQLiteTrackMethods class which contains methods for interacting with the tracks table. """ - from collections import OrderedDict from sqlite3 import Cursor @@ -38,7 +37,8 @@ class SQLiteTrackMethods: title, track, trackhash - ) VALUES(:album, :albumartist, :albumhash, :artist, :bitrate, :copyright, :date, :disc, :duration, :filepath, :folder, :genre, :title, :track, :trackhash) + ) VALUES(:album, :albumartist, :albumhash, :artist, :bitrate, :copyright, + :date, :disc, :duration, :filepath, :folder, :genre, :title, :track, :trackhash) """ track = OrderedDict(sorted(track.items())) diff --git a/app/db/store.py b/app/db/store.py index 90c17fc..3b440b5 100644 --- a/app/db/store.py +++ b/app/db/store.py @@ -11,14 +11,12 @@ from app.db.sqlite.albums import SQLiteAlbumMethods as aldb from app.db.sqlite.artists import SQLiteArtistMethods as ardb from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb from app.db.sqlite.tracks import SQLiteTrackMethods as tdb +from app.lib.artistlib import get_all_artists from app.models import Album, Artist, Folder, Track -from app.utils import ( - UseBisection, - create_folder_hash, - get_all_artists, - remove_duplicates, - win_replace_slash, -) +from app.utils.bisection import UseBisection +from app.utils.hashing import create_folder_hash +from app.utils.remove_duplicates import remove_duplicates +from app.utils.wintools import win_replace_slash class Store: diff --git a/app/functions.py b/app/functions.py index d739a3e..428486c 100644 --- a/app/functions.py +++ b/app/functions.py @@ -5,14 +5,16 @@ import time from requests import ConnectionError as RequestConnectionError from requests import ReadTimeout -from app import utils from app.lib.artistlib import CheckArtistImages from app.lib.populate import Populate, PopulateCancelledError from app.lib.trackslib import validate_tracks from app.logger import log +from app.utils.generators import get_random_str +from app.utils.network import Ping +from app.utils.threading import background -@utils.background +@background def run_periodic_checks(): """ Checks for new songs every N minutes. @@ -23,11 +25,11 @@ def run_periodic_checks(): while True: try: - Populate(key=utils.get_random_str()) + Populate(key=get_random_str()) except PopulateCancelledError: pass - if utils.Ping()(): + if Ping()(): try: CheckArtistImages() except (RequestConnectionError, ReadTimeout): diff --git a/app/lib/artistlib.py b/app/lib/artistlib.py index 1e4ac73..3c60ef0 100644 --- a/app/lib/artistlib.py +++ b/app/lib/artistlib.py @@ -9,9 +9,9 @@ from tqdm import tqdm from requests.exceptions import ConnectionError as ReqConnError, ReadTimeout from app import settings -from app.models import Artist -from app.db.store import Store -from app.utils import create_hash +from app.models import Artist, Track, Album +from app.db import store +from app.utils.hashing import create_hash def get_artist_image_link(artist: str): @@ -38,6 +38,7 @@ def get_artist_image_link(artist: str): return None +# TODO: Move network calls to utils/network.py class DownloadImage: def __init__(self, url: str, name: str) -> None: sm_path = Path(settings.ARTIST_IMG_SM_PATH) / name @@ -71,8 +72,8 @@ class CheckArtistImages: with ThreadPoolExecutor() as pool: list( tqdm( - pool.map(self.download_image, Store.artists), - total=len(Store.artists), + pool.map(self.download_image, store.Store.artists), + total=len(store.Store.artists), desc="Downloading artist images", ) ) @@ -95,13 +96,9 @@ class CheckArtistImages: return DownloadImage(url, name=f"{artist.artisthash}.webp") -# def fetch_album_bio(title: str, albumartist: str) -> str | None: -# """ -# Returns the album bio for a given album. -# """ -# last_fm_url = "http://ws.audioscrobbler.com/2.0/?method=album.getinfo&api_key={}&artist={}&album={}&format=json".format( -# settings.LAST_FM_API_KEY, albumartist, title -# ) +# def fetch_album_bio(title: str, albumartist: str) -> str | None: """ Returns the album bio for a given album. """ +# last_fm_url = "http://ws.audioscrobbler.com/2.0/?method=album.getinfo&api_key={}&artist={}&album={ +# }&format=json".format( settings.LAST_FM_API_KEY, albumartist, title ) # try: # response = requests.get(last_fm_url) @@ -128,3 +125,42 @@ class CheckArtistImages: # def __call__(self): # return fetch_album_bio(self.title, self.albumartist) + + +def get_artists_from_tracks(tracks: list[Track]) -> set[str]: + """ + Extracts all artists from a list of tracks. Returns a list of Artists. + """ + artists = set() + + master_artist_list = [[x.name for x in t.artist] for t in tracks] + artists = artists.union(*master_artist_list) + + return artists + + +def get_albumartists(albums: list[Album]) -> set[str]: + artists = set() + + for album in albums: + albumartists = [a.name for a in album.albumartists] + + artists.update(albumartists) + + return artists + + +def get_all_artists( + tracks: list[Track], albums: list[Album] +) -> list[Artist]: + artists_from_tracks = get_artists_from_tracks(tracks=tracks) + artist_from_albums = get_albumartists(albums=albums) + + artists = list(artists_from_tracks.union(artist_from_albums)) + artists = sorted(artists) + + lower_artists = set(a.lower().strip() for a in artists) + indices = [[ar.lower().strip() for ar in artists].index(a) for a in lower_artists] + artists = [artists[i] for i in indices] + + return [Artist(a) for a in artists] diff --git a/app/lib/folderslib.py b/app/lib/folderslib.py index 18ee2f7..0193036 100644 --- a/app/lib/folderslib.py +++ b/app/lib/folderslib.py @@ -5,7 +5,7 @@ from app.db.store import Store from app.models import Folder, Track from app.settings import SUPPORTED_FILES from app.logger import log -from app.utils import win_replace_slash +from app.utils.wintools import win_replace_slash class GetFilesAndDirs: diff --git a/app/lib/playlistlib.py b/app/lib/playlistlib.py index eb002a9..63c6866 100644 --- a/app/lib/playlistlib.py +++ b/app/lib/playlistlib.py @@ -4,13 +4,11 @@ This library contains all the functions related to playlists. import os import random import string -from datetime import datetime from typing import Any from PIL import Image, ImageSequence from app import settings -from app.logger import log def create_thumbnail(image: Any, img_path: str) -> str: @@ -80,36 +78,33 @@ def save_p_image(file, pid: str): return filename - -class ValidatePlaylistThumbs: - """ - Removes all unused images in the images/playlists folder. - """ - - def __init__(self) -> None: - images = [] - playlists = Get.get_all_playlists() - - log.info("Validating playlist thumbnails") - for playlist in playlists: - if playlist.image: - img_path = playlist.image.split("/")[-1] - thumb_path = playlist.thumb.split("/")[-1] - - images.append(img_path) - images.append(thumb_path) - - p_path = os.path.join(settings.APP_DIR, "images", "playlists") - - for image in os.listdir(p_path): - if image not in images: - os.remove(os.path.join(p_path, image)) - - log.info("Validating playlist thumbnails ... ✅") - - -def create_new_date(): - return datetime.now() +# +# class ValidatePlaylistThumbs: +# """ +# Removes all unused images in the images/playlists folder. +# """ +# +# def __init__(self) -> None: +# images = [] +# playlists = Get.get_all_playlists() +# +# log.info("Validating playlist thumbnails") +# for playlist in playlists: +# if playlist.image: +# img_path = playlist.image.split("/")[-1] +# thumb_path = playlist.thumb.split("/")[-1] +# +# images.append(img_path) +# images.append(thumb_path) +# +# p_path = os.path.join(settings.APP_DIR, "images", "playlists") +# +# for image in os.listdir(p_path): +# if image not in images: +# os.remove(os.path.join(p_path, image)) +# +# log.info("Validating playlist thumbnails ... ✅") +# # TODO: Fix ValidatePlaylistThumbs diff --git a/app/lib/populate.py b/app/lib/populate.py index 7349021..00da0cd 100644 --- a/app/lib/populate.py +++ b/app/lib/populate.py @@ -11,7 +11,7 @@ from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors from app.lib.taglib import extract_thumb, get_tags from app.logger import log from app.models import Album, Artist, Track -from app.utils import run_fast_scandir +from app.utils.filesystem import run_fast_scandir get_all_tracks = SQLiteTrackMethods.get_all_tracks insert_many_tracks = SQLiteTrackMethods.insert_many_tracks @@ -72,7 +72,6 @@ class Populate: ProcessAlbumColors() ProcessArtistColors() - @staticmethod def filter_untagged(tracks: list[Track], files: list[str]): tagged_files = [t.filepath for t in tracks] diff --git a/app/lib/searchlib.py b/app/lib/searchlib.py index fc21a07..2cb2883 100644 --- a/app/lib/searchlib.py +++ b/app/lib/searchlib.py @@ -1,12 +1,15 @@ """ This library contains all the functions related to the search functionality. """ -from typing import List +from typing import List, Generator, TypeVar, Any +import itertools from rapidfuzz import fuzz, process from unidecode import unidecode from app import models +from app.db.store import Store +from app.utils.remove_duplicates import remove_duplicates ratio = fuzz.ratio wratio = fuzz.WRatio @@ -35,31 +38,32 @@ class Limit: class SearchTracks: - def __init__(self, tracks: List[models.Track], query: str) -> None: + def __init__(self, query: str) -> None: self.query = query - self.tracks = tracks + self.tracks = Store.tracks def __call__(self) -> List[models.Track]: """ Gets all songs with a given title. """ - tracks = [unidecode(track.og_title).lower() for track in self.tracks] + track_titles = [unidecode(track.og_title).lower() for track in self.tracks] results = process.extract( self.query, - tracks, + track_titles, scorer=fuzz.WRatio, score_cutoff=Cutoff.tracks, limit=Limit.tracks, ) - return [self.tracks[i[2]] for i in results] + tracks = [self.tracks[i[2]] for i in results] + return remove_duplicates(tracks) class SearchArtists: - def __init__(self, artists: list[models.Artist], query: str) -> None: + def __init__(self, query: str) -> None: self.query = query - self.artists = artists + self.artists = Store.artists def __call__(self) -> list: """ @@ -75,14 +79,13 @@ class SearchArtists: limit=Limit.artists, ) - artists = [a[0] for a in results] return [self.artists[i[2]] for i in results] class SearchAlbums: - def __init__(self, albums: List[models.Album], query: str) -> None: + def __init__(self, query: str) -> None: self.query = query - self.albums = albums + self.albums = Store.albums def __call__(self) -> List[models.Album]: """ @@ -125,3 +128,90 @@ class SearchPlaylists: ) return [self.playlists[i[2]] for i in results] + + +_type = List[models.Track | models.Album | models.Artist] +_S2 = TypeVar("_S2") +_ResultType = int | float + + +def get_titles(items: _type): + for item in items: + if isinstance(item, models.Track): + text = item.og_title + elif isinstance(item, models.Album): + text = item.title + # print(text) + elif isinstance(item, models.Artist): + text = item.name + else: + text = None + + yield text + + +class SearchAll: + """ + Joins all tracks, albums and artists + then fuzzy searches them as a single unit. + """ + + @staticmethod + def collect_all(): + all_items: _type = [] + + all_items.extend(Store.tracks) + all_items.extend(Store.albums) + all_items.extend(Store.artists) + + return all_items, get_titles(all_items) + + @staticmethod + def get_results(items: Generator[str, Any, None], query: str): + items = list(items) + + results = process.extract( + query=query, + choices=items, + scorer=fuzz.WRatio, + score_cutoff=Cutoff.tracks, + limit=20 + ) + + return results + + @staticmethod + def sort_results(items: _type): + """ + Separates results into differrent lists using itertools.groupby. + """ + mapped_items = [ + {"type": "track", "item": item} if isinstance(item, models.Track) else + {"type": "album", "item": item} if isinstance(item, models.Album) else + {"type": "artist", "item": item} if isinstance(item, models.Artist) else + {"type": "Unknown", "item": item} for item in items + ] + + mapped_items.sort(key=lambda x: x["type"]) + + groups = [ + list(group) for key, group in + itertools.groupby(mapped_items, lambda x: x["type"]) + ] + + print(len(groups)) + + # merge items of a group into a dict that looks like: {"albums": [album1, ...]} + groups = [ + {f"{group[0]['type']}s": [i['item'] for i in group]} for group in groups + ] + + return groups + + @staticmethod + def search(query: str): + items, titles = SearchAll.collect_all() + results = SearchAll.get_results(titles, query) + results = [items[i[2]] for i in results] + + return SearchAll.sort_results(results) diff --git a/app/lib/taglib.py b/app/lib/taglib.py index cb71be9..f885a87 100644 --- a/app/lib/taglib.py +++ b/app/lib/taglib.py @@ -6,12 +6,9 @@ from PIL import Image, UnidentifiedImageError from tinytag import TinyTag from app import settings -from app.utils import ( - create_hash, - parse_artist_from_filename, - parse_title_from_filename, - win_replace_slash, -) +from app.utils.hashing import create_hash +from app.utils.parsers import parse_title_from_filename, parse_artist_from_filename +from app.utils.wintools import win_replace_slash def parse_album_art(filepath: str): diff --git a/app/lib/trackslib.py b/app/lib/trackslib.py index faa166b..08138d6 100644 --- a/app/lib/trackslib.py +++ b/app/lib/trackslib.py @@ -15,5 +15,6 @@ def validate_tracks() -> None: """ for track in tqdm(Store.tracks, desc="Removing deleted tracks"): if not os.path.exists(track.filepath): + print(f"Removing {track.filepath}") Store.tracks.remove(track) tdb.remove_track_by_filepath(track.filepath) diff --git a/app/lib/watchdogg.py b/app/lib/watchdogg.py index 1fde544..a5313f1 100644 --- a/app/lib/watchdogg.py +++ b/app/lib/watchdogg.py @@ -8,7 +8,6 @@ import time from watchdog.events import PatternMatchingEventHandler from watchdog.observers import Observer - from app.logger import log from app.db.store import Store from app.lib.taglib import get_tags @@ -91,6 +90,9 @@ class Watcher: "WatchdogError: Failed to start watchdog, root directories could not be resolved." ) return + except OSError as e: + log.error('Failed to start watchdog. %s', e) + return try: while True: diff --git a/app/migrations/_preinit/__init__.py b/app/migrations/__preinit/__init__.py similarity index 97% rename from app/migrations/_preinit/__init__.py rename to app/migrations/__preinit/__init__.py index 6b37938..c834a37 100644 --- a/app/migrations/_preinit/__init__.py +++ b/app/migrations/__preinit/__init__.py @@ -22,7 +22,7 @@ def run_preinit_migrations(): """ try: userdb_version = MigrationManager.get_preinit_version() - except (OperationalError): + except OperationalError: userdb_version = 0 for migration in all_preinits: diff --git a/app/migrations/_preinit/move_to_xdg_folder.py b/app/migrations/__preinit/move_to_xdg_folder.py similarity index 100% rename from app/migrations/_preinit/move_to_xdg_folder.py rename to app/migrations/__preinit/move_to_xdg_folder.py diff --git a/app/models.py b/app/models.py deleted file mode 100644 index 03815e0..0000000 --- a/app/models.py +++ /dev/null @@ -1,258 +0,0 @@ -""" -Contains all the models for objects generation and typing. -""" -import dataclasses -import json -from dataclasses import dataclass - -from app import utils, settings - - -@dataclass(slots=True) -class Artist: - """ - Artist class - """ - - name: str - artisthash: str = "" - image: str = "" - trackcount: int = 0 - albumcount: int = 0 - duration: int = 0 - colors: list[str] = dataclasses.field(default_factory=list) - is_favorite: bool = False - - def __post_init__(self): - self.artisthash = utils.create_hash(self.name, decode=True) - self.image = self.artisthash + ".webp" - self.colors = json.loads(str(self.colors)) - - -@dataclass(slots=True) -class Track: - """ - Track class - """ - - album: str - albumartist: str | list[Artist] - albumhash: str - artist: str | list[Artist] - bitrate: int - copyright: str - date: str - disc: int - duration: int - filepath: str - folder: str - genre: str | list[str] - title: str - track: int - trackhash: str - - filetype: str = "" - image: str = "" - artist_hashes: list[str] = dataclasses.field(default_factory=list) - is_favorite: bool = False - og_title: str = "" - - def __post_init__(self): - self.og_title = self.title - if self.artist is not None: - artists = utils.split_artists(self.artist) - new_title = self.title - - if settings.EXTRACT_FEAT: - featured, new_title = utils.parse_feat_from_title(self.title) - original_lower = "-".join([a.lower() for a in artists]) - artists.extend([a for a in featured if a.lower() not in original_lower]) - - if settings.REMOVE_PROD: - new_title = utils.remove_prod(new_title) - - # if track is a single - if self.og_title == self.album: - self.album = new_title - - self.title = new_title - - self.artist_hashes = [utils.create_hash(a, decode=True) for a in artists] - self.artist = [Artist(a) for a in artists] - - albumartists = utils.split_artists(self.albumartist) - self.albumartist = [Artist(a) for a in albumartists] - - self.filetype = self.filepath.rsplit(".", maxsplit=1)[-1] - self.image = self.albumhash + ".webp" - - if self.genre is not None: - self.genre = str(self.genre).replace("/", ",").replace(";", ",") - self.genre = str(self.genre).lower().split(",") - self.genre = [g.strip() for g in self.genre] - - -@dataclass(slots=True) -class Album: - """ - Creates an album object - """ - - albumhash: str - title: str - albumartists: list[Artist] - - albumartisthash: str = "" - image: str = "" - count: int = 0 - duration: int = 0 - colors: list[str] = dataclasses.field(default_factory=list) - date: str = "" - - is_soundtrack: bool = False - is_compilation: bool = False - is_single: bool = False - is_EP: bool = False - is_favorite: bool = False - is_live: bool = False - genres: list[str] = dataclasses.field(default_factory=list) - - def __post_init__(self): - self.image = self.albumhash + ".webp" - self.albumartisthash = "-".join(a.artisthash for a in self.albumartists) - - def set_colors(self, colors: list[str]): - self.colors = colors - - def check_type(self): - """ - Runs all the checks to determine the type of album. - """ - self.is_soundtrack = self.check_is_soundtrack() - if self.is_soundtrack: - return - - self.is_live = self.check_is_live_album() - if self.is_live: - return - - self.is_compilation = self.check_is_compilation() - if self.is_compilation: - return - - self.is_EP = self.check_is_ep() - - def check_is_soundtrack(self) -> bool: - """ - Checks if the album is a soundtrack. - """ - keywords = ["motion picture", "soundtrack"] - for keyword in keywords: - if keyword in self.title.lower(): - return True - - return False - - def check_is_compilation(self) -> bool: - """ - Checks if the album is a compilation. - """ - artists = [a.name for a in self.albumartists] # type: ignore - artists = "".join(artists).lower() - - if "various artists" in artists: - return True - - substrings = ["the essential", "best of", "greatest hits", "#1 hits", "number ones", "super hits", - "ultimate collection"] - - for substring in substrings: - if substring in self.title.lower(): - return True - - return False - - def check_is_live_album(self): - """ - Checks if the album is a live album. - """ - keywords = ["live from", "live at", "live in"] - for keyword in keywords: - if keyword in self.title.lower(): - return True - - return False - - def check_is_ep(self) -> bool: - """ - Checks if the album is an EP. - """ - return self.title.strip().endswith(" EP") - - def check_is_single(self, tracks: list[Track]): - """ - Checks if the album is a single. - """ - if ( - len(tracks) == 1 - and tracks[0].title == self.title - - # and tracks[0].track == 1 - # and tracks[0].disc == 1 - # Todo: Are the above commented checks necessary? - ): - self.is_single = True - - def get_date_from_tracks(self, tracks: list[Track]): - for track in tracks: - if track.date != "Unknown": - self.date = track.date - break - - -@dataclass(slots=True) -class Playlist: - """Creates playlist objects""" - - id: int - artisthashes: str | list[str] - banner_pos: int - has_gif: str | bool - image: str - last_updated: str - name: str - trackhashes: str | list[str] - - thumb: str = "" - count: int = 0 - duration: int = 0 - - def __post_init__(self): - self.trackhashes = json.loads(str(self.trackhashes)) - self.artisthashes = json.loads(str(self.artisthashes)) - - self.count = len(self.trackhashes) - self.has_gif = bool(int(self.has_gif)) - - if self.image is not None: - self.thumb = "thumb_" + self.image - else: - self.image = "None" - self.thumb = "None" - - -@dataclass(slots=True, frozen=True) -class Folder: - name: str - path: str - has_tracks: bool - is_sym: bool = False - path_hash: str = "" - - -class FavType: - """Favorite types enum""" - - track = "track" - album = "album" - artist = "artist" diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..834432c --- /dev/null +++ b/app/models/__init__.py @@ -0,0 +1,16 @@ +from .album import Album +from .track import Track +from .artist import Artist, ArtistMinimal +from .enums import FavType +from .playlist import Playlist +from .folder import Folder + +__all__ = [ + "Album", + "Track", + "Artist", + "ArtistMinimal", + "Playlist", + "Folder", + "FavType", +] diff --git a/app/models/album.py b/app/models/album.py new file mode 100644 index 0000000..5afadbf --- /dev/null +++ b/app/models/album.py @@ -0,0 +1,123 @@ +import dataclasses +from dataclasses import dataclass + +from .track import Track +from .artist import Artist + + +@dataclass(slots=True) +class Album: + """ + Creates an album object + """ + + albumhash: str + title: str + albumartists: list[Artist] + + albumartisthash: str = "" + image: str = "" + count: int = 0 + duration: int = 0 + colors: list[str] = dataclasses.field(default_factory=list) + date: str = "" + + is_soundtrack: bool = False + is_compilation: bool = False + is_single: bool = False + is_EP: bool = False + is_favorite: bool = False + is_live: bool = False + genres: list[str] = dataclasses.field(default_factory=list) + + def __post_init__(self): + self.image = self.albumhash + ".webp" + self.albumartisthash = "-".join(a.artisthash for a in self.albumartists) + + def set_colors(self, colors: list[str]): + self.colors = colors + + def check_type(self): + """ + Runs all the checks to determine the type of album. + """ + self.is_soundtrack = self.check_is_soundtrack() + if self.is_soundtrack: + return + + self.is_live = self.check_is_live_album() + if self.is_live: + return + + self.is_compilation = self.check_is_compilation() + if self.is_compilation: + return + + self.is_EP = self.check_is_ep() + + def check_is_soundtrack(self) -> bool: + """ + Checks if the album is a soundtrack. + """ + keywords = ["motion picture", "soundtrack"] + for keyword in keywords: + if keyword in self.title.lower(): + return True + + return False + + def check_is_compilation(self) -> bool: + """ + Checks if the album is a compilation. + """ + artists = [a.name for a in self.albumartists] # type: ignore + artists = "".join(artists).lower() + + if "various artists" in artists: + return True + + substrings = ["the essential", "best of", "greatest hits", "#1 hits", "number ones", "super hits", + "ultimate collection"] + + for substring in substrings: + if substring in self.title.lower(): + return True + + return False + + def check_is_live_album(self): + """ + Checks if the album is a live album. + """ + keywords = ["live from", "live at", "live in"] + for keyword in keywords: + if keyword in self.title.lower(): + return True + + return False + + def check_is_ep(self) -> bool: + """ + Checks if the album is an EP. + """ + return self.title.strip().endswith(" EP") + + def check_is_single(self, tracks: list[Track]): + """ + Checks if the album is a single. + """ + if ( + len(tracks) == 1 + and tracks[0].title == self.title + + # and tracks[0].track == 1 + # and tracks[0].disc == 1 + # Todo: Are the above commented checks necessary? + ): + self.is_single = True + + def get_date_from_tracks(self, tracks: list[Track]): + for track in tracks: + if track.date != "Unknown": + self.date = track.date + break diff --git a/app/models/artist.py b/app/models/artist.py new file mode 100644 index 0000000..87743dd --- /dev/null +++ b/app/models/artist.py @@ -0,0 +1,41 @@ +import dataclasses +import json +from dataclasses import dataclass + +from app.utils.hashing import create_hash + + +@dataclass(slots=True) +class Artist: + """ + Artist class + """ + + name: str + artisthash: str = "" + image: str = "" + trackcount: int = 0 + albumcount: int = 0 + duration: int = 0 + colors: list[str] = dataclasses.field(default_factory=list) + is_favorite: bool = False + + def __post_init__(self): + self.artisthash = create_hash(self.name, decode=True) + self.image = self.artisthash + ".webp" + self.colors = json.loads(str(self.colors)) + + +@dataclass(slots=True) +class ArtistMinimal: + """ + ArtistMinimal class + """ + + name: str + artisthash: str = "" + image: str = "" + + def __post_init__(self): + self.artisthash = create_hash(self.name, decode=True) + self.image = self.artisthash + ".webp" diff --git a/app/models/enums.py b/app/models/enums.py new file mode 100644 index 0000000..d74550b --- /dev/null +++ b/app/models/enums.py @@ -0,0 +1,6 @@ +class FavType: + """Favorite types enum""" + + track = "track" + album = "album" + artist = "artist" diff --git a/app/models/folder.py b/app/models/folder.py new file mode 100644 index 0000000..7e73da3 --- /dev/null +++ b/app/models/folder.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + + +@dataclass(slots=True, frozen=True) +class Folder: + name: str + path: str + has_tracks: bool + is_sym: bool = False + path_hash: str = "" diff --git a/app/models/playlist.py b/app/models/playlist.py new file mode 100644 index 0000000..8d4ece3 --- /dev/null +++ b/app/models/playlist.py @@ -0,0 +1,33 @@ +import json +from dataclasses import dataclass + + +@dataclass(slots=True) +class Playlist: + """Creates playlist objects""" + + id: int + artisthashes: str | list[str] + banner_pos: int + has_gif: str | bool + image: str + last_updated: str + name: str + trackhashes: str | list[str] + + thumb: str = "" + count: int = 0 + duration: int = 0 + + def __post_init__(self): + self.trackhashes = json.loads(str(self.trackhashes)) + self.artisthashes = json.loads(str(self.artisthashes)) + + self.count = len(self.trackhashes) + self.has_gif = bool(int(self.has_gif)) + + if self.image is not None: + self.thumb = "thumb_" + self.image + else: + self.image = "None" + self.thumb = "None" diff --git a/app/models/track.py b/app/models/track.py new file mode 100644 index 0000000..c85f350 --- /dev/null +++ b/app/models/track.py @@ -0,0 +1,70 @@ +import dataclasses +from dataclasses import dataclass + +from app import settings +from .artist import ArtistMinimal +from app.utils.hashing import create_hash +from app.utils.parsers import split_artists, remove_prod, parse_feat_from_title + + +@dataclass(slots=True) +class Track: + """ + Track class + """ + + album: str + albumartist: str | list[ArtistMinimal] + albumhash: str + artist: str | list[ArtistMinimal] + bitrate: int + copyright: str + date: str + disc: int + duration: int + filepath: str + folder: str + genre: str | list[str] + title: str + track: int + trackhash: str + + filetype: str = "" + image: str = "" + artist_hashes: list[str] = dataclasses.field(default_factory=list) + is_favorite: bool = False + og_title: str = "" + + def __post_init__(self): + self.og_title = self.title + if self.artist is not None: + artists = split_artists(self.artist) + new_title = self.title + + if settings.EXTRACT_FEAT: + featured, new_title = parse_feat_from_title(self.title) + original_lower = "-".join([a.lower() for a in artists]) + artists.extend([a for a in featured if a.lower() not in original_lower]) + + if settings.REMOVE_PROD: + new_title = remove_prod(new_title) + + # if track is a single + if self.og_title == self.album: + self.album = new_title + + self.title = new_title + + self.artist_hashes = [create_hash(a, decode=True) for a in artists] + self.artist = [ArtistMinimal(a) for a in artists] + + albumartists = split_artists(self.albumartist) + self.albumartist = [ArtistMinimal(a) for a in albumartists] + + self.filetype = self.filepath.rsplit(".", maxsplit=1)[-1] + self.image = self.albumhash + ".webp" + + if self.genre is not None: + self.genre = str(self.genre).replace("/", ",").replace(";", ",") + self.genre = str(self.genre).lower().split(",") + self.genre = [g.strip() for g in self.genre] diff --git a/app/settings.py b/app/settings.py index d1a1f23..56cf0fe 100644 --- a/app/settings.py +++ b/app/settings.py @@ -75,6 +75,7 @@ APP_DB_NAME = "swing.db" USER_DATA_DB_NAME = "userdata.db" APP_DB_PATH = os.path.join(APP_DIR, APP_DB_NAME) USERDATA_DB_PATH = os.path.join(APP_DIR, USER_DATA_DB_NAME) +JSON_CONFIG_PATH = os.path.join(APP_DIR, "config.json") class FLASKVARS: @@ -122,3 +123,5 @@ class TCOLOR: BOLD = "\033[1m" UNDERLINE = "\033[4m" # credits: https://stackoverflow.com/a/287944 + + diff --git a/app/setup/__init__.py b/app/setup/__init__.py index 1a6eb7d..b731df6 100644 --- a/app/setup/__init__.py +++ b/app/setup/__init__.py @@ -1,136 +1,17 @@ """ -Contains the functions to prepare the server for use. +Prepares the server for use. """ -import os -import shutil -import time -from configparser import ConfigParser - -from app import settings -from app.db.sqlite import create_connection, create_tables, queries from app.db.store import Store -from app.migrations import apply_migrations, set_postinit_migration_versions -from app.migrations._preinit import ( - run_preinit_migrations, - set_preinit_migration_versions, -) -from app.settings import APP_DB_PATH, USERDATA_DB_PATH -from app.utils import get_home_res_path - -config = ConfigParser() - -config_path = get_home_res_path("pyinstaller.config.ini") -config.read(config_path) - -try: - IS_BUILD = config["DEFAULT"]["BUILD"] == "True" -except KeyError: - # If the key doesn't exist, it means that the app is being executed in dev mode. - IS_BUILD = False - - -class CopyFiles: - """Copies assets to the app directory.""" - - def __init__(self) -> None: - assets_dir = "assets" - - if IS_BUILD: - assets_dir = get_home_res_path("assets") - - files = [ - { - "src": assets_dir, - "dest": os.path.join(settings.APP_DIR, "assets"), - "is_dir": True, - } - ] - - for entry in files: - src = os.path.join(os.getcwd(), entry["src"]) - - if entry["is_dir"]: - shutil.copytree( - src, - entry["dest"], - ignore=shutil.ignore_patterns( - "*.pyc", - ), - copy_function=shutil.copy2, - dirs_exist_ok=True, - ) - break - - shutil.copy2(src, entry["dest"]) - - -def create_config_dir() -> None: - """ - Creates the config directory if it doesn't exist. - """ - thumb_path = os.path.join("images", "thumbnails") - small_thumb_path = os.path.join(thumb_path, "small") - large_thumb_path = os.path.join(thumb_path, "large") - - artist_img_path = os.path.join("images", "artists") - small_artist_img_path = os.path.join(artist_img_path, "small") - large_artist_img_path = os.path.join(artist_img_path, "large") - - playlist_img_path = os.path.join("images", "playlists") - - dirs = [ - "", # creates the config folder - "images", - thumb_path, - small_thumb_path, - large_thumb_path, - artist_img_path, - small_artist_img_path, - large_artist_img_path, - playlist_img_path, - ] - - for _dir in dirs: - path = os.path.join(settings.APP_DIR, _dir) - exists = os.path.exists(path) - - if not exists: - os.makedirs(path) - os.chmod(path, 0o755) - - CopyFiles() - - -def setup_sqlite(): - """ - Create Sqlite databases and tables. - """ - # if os.path.exists(DB_PATH): - # os.remove(DB_PATH) - run_preinit_migrations() - - app_db_conn = create_connection(APP_DB_PATH) - playlist_db_conn = create_connection(USERDATA_DB_PATH) - - create_tables(app_db_conn, queries.CREATE_APPDB_TABLES) - create_tables(playlist_db_conn, queries.CREATE_USERDATA_TABLES) - - create_tables(app_db_conn, queries.CREATE_MIGRATIONS_TABLE) - create_tables(playlist_db_conn, queries.CREATE_MIGRATIONS_TABLE) - - app_db_conn.close() - playlist_db_conn.close() - - apply_migrations() - set_preinit_migration_versions() - set_postinit_migration_versions() - - Store.load_all_tracks() - Store.process_folders() - Store.load_albums() - Store.load_artists() +from app.setup.files import create_config_dir +from app.setup.sqlite import setup_sqlite, run_migrations def run_setup(): create_config_dir() setup_sqlite() + run_migrations() + + Store.load_all_tracks() + Store.process_folders() + Store.load_albums() + Store.load_artists() diff --git a/app/setup/files.py b/app/setup/files.py new file mode 100644 index 0000000..9aafb7e --- /dev/null +++ b/app/setup/files.py @@ -0,0 +1,93 @@ +""" +This module contains the functions that are used to +create the config directory and copy the assets to the app directory. +""" + +import os +import shutil +from configparser import ConfigParser + +from app import settings +from app.utils.filesystem import get_home_res_path + +config = ConfigParser() +config_path = get_home_res_path("pyinstaller.config.ini") +config.read(config_path) + +try: + IS_BUILD = config["DEFAULT"]["BUILD"] == "True" +except KeyError: + # If the key doesn't exist, it means that the app is being executed in dev mode. + IS_BUILD = False + + +class CopyFiles: + """Copies assets to the app directory.""" + + def __init__(self) -> None: + assets_dir = "assets" + + if IS_BUILD: + assets_dir = get_home_res_path("assets") + + files = [ + { + "src": assets_dir, + "dest": os.path.join(settings.APP_DIR, "assets"), + "is_dir": True, + } + ] + + for entry in files: + src = os.path.join(os.getcwd(), entry["src"]) + + if entry["is_dir"]: + shutil.copytree( + src, + entry["dest"], + ignore=shutil.ignore_patterns( + "*.pyc", + ), + copy_function=shutil.copy2, + dirs_exist_ok=True, + ) + break + + shutil.copy2(src, entry["dest"]) + + +def create_config_dir() -> None: + """ + Creates the config directory if it doesn't exist. + """ + thumb_path = os.path.join("images", "thumbnails") + small_thumb_path = os.path.join(thumb_path, "small") + large_thumb_path = os.path.join(thumb_path, "large") + + artist_img_path = os.path.join("images", "artists") + small_artist_img_path = os.path.join(artist_img_path, "small") + large_artist_img_path = os.path.join(artist_img_path, "large") + + playlist_img_path = os.path.join("images", "playlists") + + dirs = [ + "", # creates the config folder + "images", + thumb_path, + small_thumb_path, + large_thumb_path, + artist_img_path, + small_artist_img_path, + large_artist_img_path, + playlist_img_path, + ] + + for _dir in dirs: + path = os.path.join(settings.APP_DIR, _dir) + exists = os.path.exists(path) + + if not exists: + os.makedirs(path) + os.chmod(path, 0o755) + + CopyFiles() diff --git a/app/setup/sqlite.py b/app/setup/sqlite.py new file mode 100644 index 0000000..fc34be8 --- /dev/null +++ b/app/setup/sqlite.py @@ -0,0 +1,41 @@ +""" +Module to setup Sqlite databases and tables. +Applies migrations. +""" + +from app.db.sqlite import create_connection, create_tables, queries +from app.migrations import apply_migrations, set_postinit_migration_versions +from app.migrations.__preinit import run_preinit_migrations, set_preinit_migration_versions + +from app.settings import APP_DB_PATH, USERDATA_DB_PATH + + +def setup_sqlite(): + """ + Create Sqlite databases and tables. + """ + # if os.path.exists(DB_PATH): + # os.remove(DB_PATH) + + run_preinit_migrations() + + app_db_conn = create_connection(APP_DB_PATH) + playlist_db_conn = create_connection(USERDATA_DB_PATH) + + create_tables(app_db_conn, queries.CREATE_APPDB_TABLES) + create_tables(playlist_db_conn, queries.CREATE_USERDATA_TABLES) + + create_tables(app_db_conn, queries.CREATE_MIGRATIONS_TABLE) + create_tables(playlist_db_conn, queries.CREATE_MIGRATIONS_TABLE) + + app_db_conn.close() + playlist_db_conn.close() + + +def run_migrations(): + """ + Run migrations and updates migration version. + """ + apply_migrations() + set_preinit_migration_versions() + set_postinit_migration_versions() diff --git a/app/start_info_logger.py b/app/start_info_logger.py index 559ce1d..24bd7d5 100644 --- a/app/start_info_logger.py +++ b/app/start_info_logger.py @@ -1,8 +1,8 @@ import os -from app.utils import get_ip from app.settings import TCOLOR, APP_VERSION, FLASKVARS, APP_DIR from app import settings +from app.utils.network import get_ip def log_startup_info(): diff --git a/app/utils.py b/app/utils.py deleted file mode 100644 index 7659bb9..0000000 --- a/app/utils.py +++ /dev/null @@ -1,364 +0,0 @@ -""" -This module contains mini functions for the server. -""" -import hashlib -import os -import platform -import random -import re -import socket as Socket -import string -import threading -from datetime import datetime -from pathlib import Path -from collections import defaultdict -from operator import attrgetter - -import requests -from unidecode import unidecode - -from app import models -from app.settings import SUPPORTED_FILES - -CWD = Path(__file__).parent.resolve() - - -def background(func): - """ - a threading decorator - use @background above the function you want to run in the background - """ - - def background_func(*a, **kw): - threading.Thread(target=func, args=a, kwargs=kw).start() - - return background_func - - -def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]: - """ - Scans a directory for files with a specific extension. - Returns a list of files and folders in the directory. - """ - - if _dir == "": - return [], [] - - subfolders = [] - files = [] - - try: - for _file in os.scandir(_dir): - if _file.is_dir() and not _file.name.startswith("."): - subfolders.append(_file.path) - if _file.is_file(): - ext = os.path.splitext(_file.name)[1].lower() - if ext in SUPPORTED_FILES: - files.append(win_replace_slash(_file.path)) - - if full or len(files) == 0: - for _dir in list(subfolders): - sub_dirs, _file = run_fast_scandir(_dir, full=True) - subfolders.extend(sub_dirs) - files.extend(_file) - except (OSError, PermissionError, FileNotFoundError, ValueError): - return [], [] - - return subfolders, files - - -def remove_duplicates(tracks: list[models.Track]) -> list[models.Track]: - """ - Remove duplicates from a list of Track objects based on the trackhash attribute. - Retains objects with the highest bitrate. - """ - hash_to_tracks = defaultdict(list) - - for track in tracks: - hash_to_tracks[track.trackhash].append(track) - - tracks = [] - - for track_group in hash_to_tracks.values(): - max_bitrate_track = max(track_group, key=attrgetter("bitrate")) - tracks.append(max_bitrate_track) - - return tracks - - -def create_hash(*args: str, decode=False, limit=7) -> str: - """ - Creates a simple hash for an album - """ - str_ = "".join(args) - - if decode: - str_ = unidecode(str_) - - str_ = str_.lower().strip().replace(" ", "") - str_ = "".join(t for t in str_ if t.isalnum()) - str_ = str_.encode("utf-8") - str_ = hashlib.sha256(str_).hexdigest() - return str_[-limit:] - - -def create_folder_hash(*args: str, limit=7) -> str: - """ - Creates a simple hash for an album - """ - strings = [s.lower().strip().replace(" ", "") for s in args] - - strings = ["".join([t for t in s if t.isalnum()]) for s in strings] - strings = [s.encode("utf-8") for s in strings] - strings = [hashlib.sha256(s).hexdigest()[-limit:] for s in strings] - return "".join(strings) - - -def create_new_date(): - """ - It creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS" - :return: A string of the current date and time. - """ - now = datetime.now() - return now.strftime("%Y-%m-%d %H:%M:%S") - - -class UseBisection: - """ - Uses bisection to find a list of items in another list. - - returns a list of found items with `None` items being not found - items. - """ - - def __init__(self, source: list, search_from: str, queries: list[str]) -> None: - self.source_list = source - self.queries_list = queries - self.attr = search_from - - def find(self, query: str): - left = 0 - right = len(self.source_list) - 1 - - while left <= right: - mid = (left + right) // 2 - if self.source_list[mid].__getattribute__(self.attr) == query: - return self.source_list[mid] - elif self.source_list[mid].__getattribute__(self.attr) > query: - right = mid - 1 - else: - left = mid + 1 - - return None - - def __call__(self) -> list: - if len(self.source_list) == 0: - return [None] - - return [self.find(query) for query in self.queries_list] - - -class Ping: - """ - Checks if there is a connection to the internet by pinging google.com - """ - - @staticmethod - def __call__() -> bool: - try: - requests.get("https://google.com", timeout=10) - return True - except (requests.exceptions.ConnectionError, requests.Timeout): - return False - - -def get_artists_from_tracks(tracks: list[models.Track]) -> set[str]: - """ - Extracts all artists from a list of tracks. Returns a list of Artists. - """ - artists = set() - - master_artist_list = [[x.name for x in t.artist] for t in tracks] # type: ignore - artists = artists.union(*master_artist_list) - - return artists - - -def get_albumartists(albums: list[models.Album]) -> set[str]: - artists = set() - - for album in albums: - albumartists = [a.name for a in album.albumartists] # type: ignore - - artists.update(albumartists) - - return artists - - -def get_all_artists( - tracks: list[models.Track], albums: list[models.Album] -) -> list[models.Artist]: - artists_from_tracks = get_artists_from_tracks(tracks) - artist_from_albums = get_albumartists(albums) - - artists = list(artists_from_tracks.union(artist_from_albums)) - artists = sorted(artists) - - lower_artists = set(a.lower().strip() for a in artists) - indices = [[ar.lower().strip() for ar in artists].index(a) for a in lower_artists] - artists = [artists[i] for i in indices] - - return [models.Artist(a) for a in artists] - - -def bisection_search_string(strings: list[str], target: str) -> str | None: - """ - Finds a string in a list of strings using bisection search. - """ - if not strings: - return None - - strings = sorted(strings) - - left = 0 - right = len(strings) - 1 - while left <= right: - middle = (left + right) // 2 - if strings[middle] == target: - return strings[middle] - - if strings[middle] < target: - left = middle + 1 - else: - right = middle - 1 - - return None - - -def get_home_res_path(filename: str): - """ - Returns a path to resources in the home directory of this project. - Used to resolve resources in builds. - """ - try: - return (CWD / ".." / filename).resolve() - except ValueError: - return None - - -def get_ip(): - """ - Returns the IP address of this device. - """ - soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM) - soc.connect(("8.8.8.8", 80)) - ip_address = str(soc.getsockname()[0]) - soc.close() - - return ip_address - - -def is_windows(): - """ - Returns True if the OS is Windows. - """ - return platform.system() == "Windows" - - -def parse_feat_from_title(title: str) -> tuple[list[str], str]: - """ - Extracts featured artists from a song title using regex. - """ - regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)" - # regex for square brackets 👇 - sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]" - - match = re.search(regex, title, re.IGNORECASE) - - if not match: - match = re.search(sqr_regex, title, re.IGNORECASE) - regex = sqr_regex - - if not match: - return [], title - - artists = match.group(1) - artists = split_artists(artists, with_and=True) - - # remove "feat" group from title - new_title = re.sub(regex, "", title, flags=re.IGNORECASE) - return artists, new_title - - -def get_random_str(length=5): - """ - Generates a random string of length `length`. - """ - return "".join(random.choices(string.ascii_letters + string.digits, k=length)) - - -def win_replace_slash(path: str): - if is_windows(): - return path.replace("\\", "/").replace("//", "/") - - return path - - -def split_artists(src: str, with_and: bool = False): - exp = r"\s*(?: and |&|,|;)\s*" if with_and else r"\s*[,;]\s*" - - artists = re.split(exp, src) - return [a.strip() for a in artists] - - -def parse_artist_from_filename(title: str): - """ - Extracts artist names from a song title using regex. - """ - - regex = r"^(.+?)\s*[-–—]\s*(?:.+?)$" - match = re.search(regex, title, re.IGNORECASE) - - if not match: - return [] - - artists = match.group(1) - artists = split_artists(artists) - return artists - - -def parse_title_from_filename(title: str): - """ - Extracts track title from a song title using regex. - """ - - regex = r"^(?:.+?)\s*[-–—]\s*(.+?)$" - match = re.search(regex, title, re.IGNORECASE) - - if not match: - return title - - res = match.group(1) - # remove text in brackets starting with "official" case-insensitive - res = re.sub(r"\s*\([^)]*official[^)]*\)", "", res, flags=re.IGNORECASE) - return res.strip() - - -def remove_prod(title: str) -> str: - """ - Removes the producer string in a track title using regex. - """ - - # check if title contain title, if not return it. - if not ("prod." in title.lower()): - return title - - # check if title has brackets - if re.search(r"[()\[\]]", title): - regex = r"\s?(\(|\[)prod\..*?(\)|\])\s?" - else: - regex = r"\s?\bprod\.\s*\S+" - - # remove the producer string - title = re.sub(regex, "", title, flags=re.IGNORECASE) - return title.strip() diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/utils/bisection.py b/app/utils/bisection.py new file mode 100644 index 0000000..ca30aba --- /dev/null +++ b/app/utils/bisection.py @@ -0,0 +1,57 @@ +class UseBisection: + """ + Uses bisection to find a list of items in another list. + + returns a list of found items with `None` items being not found + items. + """ + + def __init__(self, source: list, search_from: str, queries: list[str]) -> None: + self.source_list = source + self.queries_list = queries + self.attr = search_from + + def find(self, query: str): + left = 0 + right = len(self.source_list) - 1 + + while left <= right: + mid = (left + right) // 2 + if self.source_list[mid].__getattribute__(self.attr) == query: + return self.source_list[mid] + elif self.source_list[mid].__getattribute__(self.attr) > query: + right = mid - 1 + else: + left = mid + 1 + + return None + + def __call__(self) -> list: + if len(self.source_list) == 0: + return [None] + + return [self.find(query) for query in self.queries_list] + + +def bisection_search_string(strings: list[str], target: str) -> str | None: + """ + Finds a string in a list of strings using bisection search. + """ + if not strings: + return None + + strings = sorted(strings) + + left = 0 + right = len(strings) - 1 + while left <= right: + middle = (left + right) // 2 + if strings[middle] == target: + return strings[middle] + + if strings[middle] < target: + left = middle + 1 + else: + right = middle - 1 + + return None diff --git a/app/utils/filesystem.py b/app/utils/filesystem.py new file mode 100644 index 0000000..17b9c11 --- /dev/null +++ b/app/utils/filesystem.py @@ -0,0 +1,50 @@ +import os +from pathlib import Path + +from app.settings import SUPPORTED_FILES +from app.utils.wintools import win_replace_slash + +CWD = Path(__file__).parent.resolve() + + +def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]: + """ + Scans a directory for files with a specific extension. + Returns a list of files and folders in the directory. + """ + + if _dir == "": + return [], [] + + subfolders = [] + files = [] + + try: + for _file in os.scandir(_dir): + if _file.is_dir() and not _file.name.startswith("."): + subfolders.append(_file.path) + if _file.is_file(): + ext = os.path.splitext(_file.name)[1].lower() + if ext in SUPPORTED_FILES: + files.append(win_replace_slash(_file.path)) + + if full or len(files) == 0: + for _dir in list(subfolders): + sub_dirs, _file = run_fast_scandir(_dir, full=True) + subfolders.extend(sub_dirs) + files.extend(_file) + except (OSError, PermissionError, FileNotFoundError, ValueError): + return [], [] + + return subfolders, files + + +def get_home_res_path(filename: str): + """ + Returns a path to resources in the home directory of this project. + Used to resolve resources in builds. + """ + try: + return (CWD / ".." / filename).resolve() + except ValueError: + return None diff --git a/app/utils/generators.py b/app/utils/generators.py new file mode 100644 index 0000000..4a6b324 --- /dev/null +++ b/app/utils/generators.py @@ -0,0 +1,19 @@ +import string +from datetime import datetime +import random + + +def create_new_date(): + """ + It creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS" + :return: A string of the current date and time. + """ + now = datetime.now() + return now.strftime("%Y-%m-%d %H:%M:%S") + + +def get_random_str(length=5): + """ + Generates a random string of length `length`. + """ + return "".join(random.choices(string.ascii_letters + string.digits, k=length)) diff --git a/app/utils/hashing.py b/app/utils/hashing.py new file mode 100644 index 0000000..b976e22 --- /dev/null +++ b/app/utils/hashing.py @@ -0,0 +1,31 @@ +import hashlib + +from unidecode import unidecode + + +def create_hash(*args: str, decode=False, limit=7) -> str: + """ + Creates a simple hash for an album + """ + str_ = "".join(args) + + if decode: + str_ = unidecode(str_) + + str_ = str_.lower().strip().replace(" ", "") + str_ = "".join(t for t in str_ if t.isalnum()) + str_ = str_.encode("utf-8") + str_ = hashlib.sha256(str_).hexdigest() + return str_[-limit:] + + +def create_folder_hash(*args: str, limit=7) -> str: + """ + Creates a simple hash for an album + """ + strings = [s.lower().strip().replace(" ", "") for s in args] + + strings = ["".join([t for t in s if t.isalnum()]) for s in strings] + strings = [s.encode("utf-8") for s in strings] + strings = [hashlib.sha256(s).hexdigest()[-limit:] for s in strings] + return "".join(strings) diff --git a/app/utils/network.py b/app/utils/network.py new file mode 100644 index 0000000..3682c71 --- /dev/null +++ b/app/utils/network.py @@ -0,0 +1,28 @@ +import requests +import socket as Socket + + +class Ping: + """ + Checks if there is a connection to the internet by pinging google.com + """ + + @staticmethod + def __call__() -> bool: + try: + requests.get("https://google.com", timeout=10) + return True + except (requests.exceptions.ConnectionError, requests.Timeout): + return False + + +def get_ip(): + """ + Returns the IP address of this device. + """ + soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM) + soc.connect(("8.8.8.8", 80)) + ip_address = str(soc.getsockname()[0]) + soc.close() + + return ip_address diff --git a/app/utils/parsers.py b/app/utils/parsers.py new file mode 100644 index 0000000..d48fc3c --- /dev/null +++ b/app/utils/parsers.py @@ -0,0 +1,86 @@ +import re + + +def split_artists(src: str, with_and: bool = False): + exp = r"\s*(?: and |&|,|;)\s*" if with_and else r"\s*[,;]\s*" + + artists = re.split(exp, src) + return [a.strip() for a in artists] + + +def parse_artist_from_filename(title: str): + """ + Extracts artist names from a song title using regex. + """ + + regex = r"^(.+?)\s*[-–—]\s*(?:.+?)$" + match = re.search(regex, title, re.IGNORECASE) + + if not match: + return [] + + artists = match.group(1) + artists = split_artists(artists) + return artists + + +def parse_title_from_filename(title: str): + """ + Extracts track title from a song title using regex. + """ + + regex = r"^(?:.+?)\s*[-–—]\s*(.+?)$" + match = re.search(regex, title, re.IGNORECASE) + + if not match: + return title + + res = match.group(1) + # remove text in brackets starting with "official" case-insensitive + res = re.sub(r"\s*\([^)]*official[^)]*\)", "", res, flags=re.IGNORECASE) + return res.strip() + + +def remove_prod(title: str) -> str: + """ + Removes the producer string in a track title using regex. + """ + + # check if title contain title, if not return it. + if not ("prod." in title.lower()): + return title + + # check if title has brackets + if re.search(r"[()\[\]]", title): + regex = r"\s?(\(|\[)prod\..*?(\)|\])\s?" + else: + regex = r"\s?\bprod\.\s*\S+" + + # remove the producer string + title = re.sub(regex, "", title, flags=re.IGNORECASE) + return title.strip() + + +def parse_feat_from_title(title: str) -> tuple[list[str], str]: + """ + Extracts featured artists from a song title using regex. + """ + regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)" + # regex for square brackets 👇 + sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]" + + match = re.search(regex, title, re.IGNORECASE) + + if not match: + match = re.search(sqr_regex, title, re.IGNORECASE) + regex = sqr_regex + + if not match: + return [], title + + artists = match.group(1) + artists = split_artists(artists, with_and=True) + + # remove "feat" group from title + new_title = re.sub(regex, "", title, flags=re.IGNORECASE) + return artists, new_title diff --git a/app/utils/remove_duplicates.py b/app/utils/remove_duplicates.py new file mode 100644 index 0000000..93ae5ac --- /dev/null +++ b/app/utils/remove_duplicates.py @@ -0,0 +1,23 @@ +from collections import defaultdict +from operator import attrgetter + +from app.models import Track + + +def remove_duplicates(tracks: list[Track]) -> list[Track]: + """ + Remove duplicates from a list of Track objects based on the trackhash attribute. + Retains objects with the highest bitrate. + """ + hash_to_tracks = defaultdict(list) + + for track in tracks: + hash_to_tracks[track.trackhash].append(track) + + tracks = [] + + for track_group in hash_to_tracks.values(): + max_bitrate_track = max(track_group, key=attrgetter("bitrate")) + tracks.append(max_bitrate_track) + + return tracks diff --git a/app/utils/threading.py b/app/utils/threading.py new file mode 100644 index 0000000..11ec9e0 --- /dev/null +++ b/app/utils/threading.py @@ -0,0 +1,13 @@ +import threading + + +def background(func): + """ + a threading decorator + use @background above the function you want to run in the background + """ + + def background_func(*a, **kw): + threading.Thread(target=func, args=a, kwargs=kw).start() + + return background_func diff --git a/app/utils/wintools.py b/app/utils/wintools.py new file mode 100644 index 0000000..b29f578 --- /dev/null +++ b/app/utils/wintools.py @@ -0,0 +1,16 @@ +import platform + + +# TODO: Check is_windows on app start in settings.py +def is_windows(): + """ + Returns True if the OS is Windows. + """ + return platform.system() == "Windows" + + +def win_replace_slash(path: str): + if is_windows(): + return path.replace("\\", "/").replace("//", "/") + + return path diff --git a/manage.py b/manage.py index 0d68061..ab07b7e 100644 --- a/manage.py +++ b/manage.py @@ -10,7 +10,8 @@ from app.lib.watchdogg import Watcher as WatchDog from app.settings import FLASKVARS from app.setup import run_setup from app.start_info_logger import log_startup_info -from app.utils import background, get_home_res_path +from app.utils.filesystem import get_home_res_path +from app.utils.threading import background werkzeug = logging.getLogger("werkzeug") werkzeug.setLevel(logging.ERROR) diff --git a/tests/test_utils.py b/tests/test_utils.py index 7546f7b..fd8da26 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,5 @@ -from hypothesis import given -from app.utils import parse_feat_from_title +# from hypothesis import given +from app.utils.parsers import parse_feat_from_title def test_extract_featured_artists_from_title():