add method and route to search across tracks, albums and artists.

+ break models into separate files
+ same for the utils and setup
This commit is contained in:
geoffrey45 2023-03-09 13:08:50 +03:00
parent d39c0ea2f8
commit e3ec9db989
55 changed files with 1113 additions and 1137 deletions

View File

@ -1,8 +1,8 @@
# Swing music
![SWING MUSIC PLAYER BANNER IMAGE](./screenshots/on-readme2.webp)
![SWING MUSIC PLAYER BANNER IMAGE](screenshots/on-readme2.webp)
![SWING MUSIC PLAYER BANNER IMAGE](./screenshots/on-readme1.webp)
![SWING MUSIC PLAYER BANNER IMAGE](screenshots/on-readme1.webp)
---

View File

@ -17,7 +17,6 @@ def create_api():
CORS(app)
with app.app_context():
app.register_blueprint(album.api)
app.register_blueprint(artist.api)
app.register_blueprint(track.api)

View File

@ -6,13 +6,12 @@ from dataclasses import asdict
from flask import Blueprint, request
from app import utils
from app.db.sqlite.albums import SQLiteAlbumMethods as adb
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import FavType, Track
from app.utils.remove_duplicates import remove_duplicates
get_album_by_id = adb.get_album_by_id
get_albums_by_albumartist = adb.get_albums_by_albumartist
check_is_fav = favdb.check_is_favorite
@ -20,8 +19,10 @@ api = Blueprint("album", __name__, url_prefix="")
@api.route("/album", methods=["POST"])
def get_album():
"""Returns all the tracks in the given album."""
def get_album_tracks_and_info():
"""
Returns all the tracks in the given album
"""
data = request.get_json()
error_msg = {"msg": "No hash provided"}
@ -58,7 +59,7 @@ def get_album():
return list(genres)
album.genres = get_album_genres(tracks)
tracks = utils.remove_duplicates(tracks)
tracks = remove_duplicates(tracks)
album.count = len(tracks)
album.get_date_from_tracks(tracks)
@ -83,7 +84,7 @@ def get_album():
@api.route("/album/<albumhash>/tracks", methods=["GET"])
def get_album_tracks(albumhash: str):
"""
Returns all the tracks in the given album.
Returns all the tracks in the given album, sorted by disc and track number.
"""
tracks = Store.get_tracks_by_albumhash(albumhash)
tracks = [asdict(t) for t in tracks]
@ -104,11 +105,11 @@ def get_artist_albums():
if data is None:
return {"msg": "No albumartist provided"}
albumartists: str = data["albumartists"] # type: ignore
albumartists: str = data["albumartists"]
limit: int = data.get("limit")
exclude: str = data.get("exclude")
albumartists: list[str] = albumartists.split(",") # type: ignore
albumartists: list[str] = albumartists.split(",")
albums = [
{
@ -121,22 +122,3 @@ def get_artist_albums():
albums = [a for a in albums if len(a["albums"]) > 0]
return {"data": albums}
# @album_bp.route("/album/bio", methods=["POST"])
# def get_album_bio():
# """Returns the album bio for the given album."""
# data = request.get_json()
# album_hash = data["hash"]
# err_msg = {"bio": "No bio found"}
# album = instances.album_instance.find_album_by_hash(album_hash)
# if album is None:
# return err_msg, 404
# bio = FetchAlbumBio(album["title"], album["artist"])()
# if bio is None:
# return err_msg, 404
# return {"bio": bio}

View File

@ -8,7 +8,7 @@ from flask import Blueprint, request
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import Album, FavType, Track
from app.utils import remove_duplicates
from app.utils.remove_duplicates import remove_duplicates
api = Blueprint("artist", __name__, url_prefix="/")

View File

@ -3,7 +3,7 @@ from flask import Blueprint, request
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import FavType
from app.utils import UseBisection
from app.utils.bisection import UseBisection
api = Blueprint("favorite", __name__, url_prefix="/")

View File

@ -10,8 +10,9 @@ from flask import Blueprint, request
from app import settings
from app.lib.folderslib import GetFilesAndDirs
from app.db.sqlite.settings import SettingsSQLMethods as db
from app.models import Folder
from app.utils import create_folder_hash, is_windows, win_replace_slash
from app.models.folder import Folder
from app.utils.hashing import create_folder_hash
from app.utils.wintools import win_replace_slash, is_windows
api = Blueprint("folder", __name__, url_prefix="/")

View File

@ -11,7 +11,8 @@ from app import models, serializer
from app.db.sqlite.playlists import SQLitePlaylistMethods
from app.db.store import Store
from app.lib import playlistlib
from app.utils import create_new_date, remove_duplicates
from app.utils.generators import create_new_date
from app.utils.remove_duplicates import remove_duplicates
api = Blueprint("playlist", __name__, url_prefix="/")

View File

@ -2,16 +2,15 @@
Contains all the search routes.
"""
from unidecode import unidecode
from flask import Blueprint, request
from app import models, utils
from app import models
from app.db.store import Store
from app.lib import searchlib
from unidecode import unidecode
api = Blueprint("search", __name__, url_prefix="/")
SEARCH_COUNT = 12
"""The max amount of items to return per request"""
@ -28,48 +27,36 @@ class SearchResults:
artists: list[models.Artist] = []
class DoSearch:
"""Class containing the methods that perform searching."""
class Search:
def __init__(self, query: str) -> None:
"""
:param :str:`query`: the search query.
"""
self.tracks: list[models.Track] = []
self.query = unidecode(query)
SearchResults.query = self.query
def search_tracks(self):
"""Calls :class:`SearchTracks` which returns the tracks that fuzzily match
"""
Calls :class:`SearchTracks` which returns the tracks that fuzzily match
the search terms. Then adds them to the `SearchResults` store.
"""
self.tracks = Store.tracks
tracks = searchlib.SearchTracks(self.tracks, self.query)()
tracks = searchlib.SearchTracks(self.query)()
if len(tracks) == 0:
return []
tracks = utils.remove_duplicates(tracks)
SearchResults.tracks = tracks
return tracks
def search_artists(self):
"""Calls :class:`SearchArtists` which returns the artists that fuzzily match
the search term. Then adds them to the `SearchResults` store.
"""
artists = [a.name for a in Store.artists]
artists = searchlib.SearchArtists(Store.artists, self.query)()
artists = searchlib.SearchArtists(self.query)()
SearchResults.artists = artists
return artists
def search_albums(self):
"""Calls :class:`SearchAlbums` which returns the albums that fuzzily match
the search term. Then adds them to the `SearchResults` store.
"""
albums = Store.albums
albums = searchlib.SearchAlbums(albums, self.query)()
albums = searchlib.SearchAlbums(self.query)()
SearchResults.albums = albums
return albums
@ -86,6 +73,10 @@ class DoSearch:
# return playlists
def get_top_results(self):
finder = searchlib.SearchAll()
return finder.search(self.query)
def search_all(self):
"""Calls all the search methods."""
self.search_tracks()
@ -104,7 +95,7 @@ def search_tracks():
if not query:
return {"error": "No query provided"}, 400
tracks = DoSearch(query).search_tracks()
tracks = Search(query).search_tracks()
return {
"tracks": tracks[:SEARCH_COUNT],
@ -122,7 +113,7 @@ def search_albums():
if not query:
return {"error": "No query provided"}, 400
tracks = DoSearch(query).search_albums()
tracks = Search(query).search_albums()
return {
"albums": tracks[:SEARCH_COUNT],
@ -140,7 +131,7 @@ def search_artists():
if not query:
return {"error": "No query provided"}, 400
artists = DoSearch(query).search_artists()
artists = Search(query).search_artists()
return {
"artists": artists[:SEARCH_COUNT],
@ -176,14 +167,17 @@ def get_top_results():
if not query:
return {"error": "No query provided"}, 400
DoSearch(query).search_all()
results = Search(query).get_top_results()
max_results = 2
# max_results = 2
# return {
# "tracks": SearchResults.tracks[:max_results],
# "albums": SearchResults.albums[:max_results],
# "artists": SearchResults.artists[:max_results],
# "playlists": SearchResults.playlists[:max_results],
# }
return {
"tracks": SearchResults.tracks[:max_results],
"albums": SearchResults.albums[:max_results],
"artists": SearchResults.artists[:max_results],
"playlists": SearchResults.playlists[:max_results],
"results": results
}
@ -198,20 +192,20 @@ def search_load_more():
if s_type == "tracks":
t = SearchResults.tracks
return {
"tracks": t[index : index + SEARCH_COUNT],
"tracks": t[index: index + SEARCH_COUNT],
"more": len(t) > index + SEARCH_COUNT,
}
elif s_type == "albums":
a = SearchResults.albums
return {
"albums": a[index : index + SEARCH_COUNT],
"albums": a[index: index + SEARCH_COUNT],
"more": len(a) > index + SEARCH_COUNT,
}
elif s_type == "artists":
a = SearchResults.artists
return {
"artists": a[index : index + SEARCH_COUNT],
"artists": a[index: index + SEARCH_COUNT],
"more": len(a) > index + SEARCH_COUNT,
}

View File

@ -4,9 +4,10 @@ from app import settings
from app.logger import log
from app.lib import populate
from app.db.store import Store
from app.utils import background, get_random_str
from app.lib.watchdogg import Watcher as WatchDog
from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.utils.generators import get_random_str
from app.utils.threading import background
api = Blueprint("settings", __name__, url_prefix="/")

View File

@ -1,7 +1,9 @@
"""
Contains all the track routes.
"""
from flask import Blueprint, send_file
import os
from flask import Blueprint, send_file, request
from app.db.store import Store
@ -15,20 +17,31 @@ def send_track_file(trackhash: str):
Falls back to track hash if id is not found.
"""
msg = {"msg": "File Not Found"}
def get_mime(filename: str) -> str:
ext = filename.rsplit(".", maxsplit=1)[-1]
return f"audio/{ext}"
filepath = request.args.get("filepath")
if filepath is not None and os.path.exists(filepath):
audio_type = get_mime(filepath)
return send_file(filepath, mimetype=audio_type)
if trackhash is None:
return msg, 404
try:
track = Store.get_tracks_by_trackhashes([trackhash])[0]
except IndexError:
track = None
tracks = Store.get_tracks_by_trackhashes([trackhash])
if track is None:
return msg, 404
for track in tracks:
if track is None:
return msg, 404
audio_type = track.filepath.rsplit(".", maxsplit=1)[-1]
audio_type = get_mime(track.filepath)
try:
return send_file(track.filepath, mimetype=f"audio/{audio_type}")
except FileNotFoundError:
return msg, 404
try:
return send_file(track.filepath, mimetype=audio_type)
except FileNotFoundError:
return msg, 404
return msg, 404

View File

@ -9,7 +9,7 @@ import PyInstaller.__main__ as bundler
from app import settings
from app.print_help import HELP_MESSAGE
from app.utils import is_windows
from app.utils.wintools import is_windows
config = ConfigParser()
config.read("pyinstaller.config.ini")

61
app/config.py Normal file
View File

@ -0,0 +1,61 @@
"""
Module for managing the JSON config file.
"""
import json
from enum import Enum
from typing import Type
from app.settings import JSON_CONFIG_PATH
class ConfigKeys(Enum):
ROOT_DIRS = ("root_dirs", list[str])
PLAYLIST_DIRS = ("playlist_dirs", list[str])
USE_ART_COLORS = ("use_art_colors", bool)
DEFAULT_ART_COLOR = ("default_art_color", str)
SHUFFLE_MODE = ("shuffle_mode", str)
REPEAT_MODE = ("repeat_mode", str)
AUTOPLAY_ON_START = ("autoplay_on_start", bool)
VOLUME = ("volume", int)
def __init__(self, key_name: str, data_type: Type):
self.key_name = key_name
self.data_type = data_type
def get_data_type(self) -> Type:
return self.data_type
class ConfigManager:
def __init__(self, config_file_path: str):
self.config_file_path = config_file_path
def read_config(self):
try:
with open(self.config_file_path) as f:
return json.load(f)
except FileNotFoundError:
return {}
# in case of errors, return an empty dict
def write_config(self, config_data):
with open(self.config_file_path, "w") as f:
json.dump(config_data, f, indent=4)
def get_value(self, key: ConfigKeys):
config_data = self.read_config()
value = config_data.get(key.key_name)
if value is not None:
return key.get_data_type()(value)
def set_value(self, key: ConfigKeys, value):
config_data = self.read_config()
config_data[key.key_name] = value
self.write_config(config_data)
settings = ConfigManager(JSON_CONFIG_PATH)
a = settings.get_value(ConfigKeys.ROOT_DIRS)

View File

@ -1,214 +0,0 @@
class AlbumMethods:
"""
Lists all the methods that can be found in the Albums class.
"""
def insert_album():
"""
Inserts a new album object into the database.
"""
pass
def get_all_albums():
"""
Returns all the albums in the database.
"""
pass
def get_album_by_id():
"""
Returns a single album matching the passed id.
"""
pass
def get_album_by_name():
"""
Returns a single album matching the passed name.
"""
pass
def get_album_by_artist():
"""
Returns a single album matching the passed artist name.
"""
pass
class ArtistMethods:
"""
Lists all the methods that can be found in the Artists class.
"""
def insert_artist():
"""
Inserts a new artist object into the database.
"""
pass
def get_all_artists():
"""
Returns all the artists in the database.
"""
pass
def get_artist_by_id():
"""
Returns an artist matching the mongo Id.
"""
pass
def get_artists_by_name():
"""
Returns all the artists matching the query.
"""
pass
class PlaylistMethods:
"""
Lists all the methods that can be found in the Playlists class.
"""
def insert_playlist():
"""
Inserts a new playlist object into the database.
"""
pass
def get_all_playlists():
"""
Returns all the playlists in the database.
"""
pass
def get_playlist_by_id():
"""
Returns a single playlist matching the id in the query params.
"""
pass
def add_track_to_playlist():
"""
Adds a track to a playlist.
"""
pass
def get_playlist_by_name():
"""
Returns a single playlist matching the name in the query params.
"""
pass
def update_playlist():
"""
Updates a playlist.
"""
pass
class TrackMethods:
"""
Lists all the methods that can be found in the Tracks class.
"""
def insert_one_track():
"""
Inserts a new track object into the database.
"""
pass
def drop_db():
"""
Drops the entire database.
"""
pass
def get_all_tracks():
"""
Returns all the tracks in the database.
"""
pass
def get_track_by_id():
"""
Returns a single track matching the id in the query params.
"""
pass
def get_track_by_album():
"""
Returns a single track matching the album in the query params.
"""
pass
def search_tracks_by_album():
"""
Returns all the tracks matching the albums in the query params (using regex).
"""
pass
def search_tracks_by_artist():
"""
Returns all the tracks matching the artists in the query params.
"""
pass
def find_track_by_title():
"""
Finds all the tracks matching the title in the query params.
"""
pass
def find_tracks_by_album():
"""
Finds all the tracks matching the album in the query params.
"""
pass
def find_tracks_by_folder():
"""
Finds all the tracks matching the folder in the query params.
"""
pass
def find_tracks_by_artist():
"""
Finds all the tracks matching the artist in the query params.
"""
pass
def find_tracks_by_albumartist():
"""
Finds all the tracks matching the album artist in the query params.
"""
pass
def get_track_by_path():
"""
Returns a single track matching the path in the query params.
"""
pass
def remove_track_by_path():
"""
Removes a track from the database. Returns a boolean indicating success or failure of the operation.
"""
pass
def remove_track_by_id():
"""
Removes a track from the database. Returns a boolean indicating success or failure of the operation.
"""
pass
def find_tracks_by_albumhash():
"""
Returns all the tracks matching the passed hash.
"""
pass
def get_dir_t_count():
"""
Returns a list of all the tracks matching the path in the query params.
"""
pass

View File

@ -1,11 +1,9 @@
from sqlite3 import Cursor
from app.db import AlbumMethods
from .utils import SQLiteManager, tuple_to_album, tuples_to_albums
class SQLiteAlbumMethods(AlbumMethods):
class SQLiteAlbumMethods:
@classmethod
def insert_one_album(cls, cur: Cursor, albumhash: str, colors: str):
"""

View File

@ -4,7 +4,8 @@ from collections import OrderedDict
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.db.sqlite.utils import SQLiteManager, tuple_to_playlist, tuples_to_playlists
from app.models import Artist
from app.utils import background, create_new_date
from app.utils.generators import create_new_date
from app.utils.threading import background
class SQLitePlaylistMethods:

View File

@ -1,5 +1,5 @@
from app.db.sqlite.utils import SQLiteManager
from app.utils import win_replace_slash
from app.utils.wintools import win_replace_slash
class SettingsSQLMethods:
@ -19,7 +19,7 @@ class SettingsSQLMethods:
cur.execute(sql)
dirs = cur.fetchall()
dirs = [dir[0] for dir in dirs]
dirs = [_dir[0] for _dir in dirs]
return [win_replace_slash(d) for d in dirs]
@staticmethod
@ -31,7 +31,7 @@ class SettingsSQLMethods:
sql = "INSERT INTO settings (root_dirs) VALUES (?)"
existing_dirs = SettingsSQLMethods.get_root_dirs()
dirs = [dir for dir in dirs if dir not in existing_dirs]
dirs = [_dir for _dir in dirs if _dir not in existing_dirs]
if len(dirs) == 0:
return
@ -85,4 +85,4 @@ class SettingsSQLMethods:
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
dirs = cur.fetchall()
return [dir[0] for dir in dirs]
return [_dir[0] for _dir in dirs]

View File

@ -3,7 +3,6 @@ Contains the SQLiteTrackMethods class which contains methods for
interacting with the tracks table.
"""
from collections import OrderedDict
from sqlite3 import Cursor
@ -38,7 +37,8 @@ class SQLiteTrackMethods:
title,
track,
trackhash
) VALUES(:album, :albumartist, :albumhash, :artist, :bitrate, :copyright, :date, :disc, :duration, :filepath, :folder, :genre, :title, :track, :trackhash)
) VALUES(:album, :albumartist, :albumhash, :artist, :bitrate, :copyright,
:date, :disc, :duration, :filepath, :folder, :genre, :title, :track, :trackhash)
"""
track = OrderedDict(sorted(track.items()))

View File

@ -11,14 +11,12 @@ from app.db.sqlite.albums import SQLiteAlbumMethods as aldb
from app.db.sqlite.artists import SQLiteArtistMethods as ardb
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.sqlite.tracks import SQLiteTrackMethods as tdb
from app.lib.artistlib import get_all_artists
from app.models import Album, Artist, Folder, Track
from app.utils import (
UseBisection,
create_folder_hash,
get_all_artists,
remove_duplicates,
win_replace_slash,
)
from app.utils.bisection import UseBisection
from app.utils.hashing import create_folder_hash
from app.utils.remove_duplicates import remove_duplicates
from app.utils.wintools import win_replace_slash
class Store:

View File

@ -5,14 +5,16 @@ import time
from requests import ConnectionError as RequestConnectionError
from requests import ReadTimeout
from app import utils
from app.lib.artistlib import CheckArtistImages
from app.lib.populate import Populate, PopulateCancelledError
from app.lib.trackslib import validate_tracks
from app.logger import log
from app.utils.generators import get_random_str
from app.utils.network import Ping
from app.utils.threading import background
@utils.background
@background
def run_periodic_checks():
"""
Checks for new songs every N minutes.
@ -23,11 +25,11 @@ def run_periodic_checks():
while True:
try:
Populate(key=utils.get_random_str())
Populate(key=get_random_str())
except PopulateCancelledError:
pass
if utils.Ping()():
if Ping()():
try:
CheckArtistImages()
except (RequestConnectionError, ReadTimeout):

View File

@ -9,9 +9,9 @@ from tqdm import tqdm
from requests.exceptions import ConnectionError as ReqConnError, ReadTimeout
from app import settings
from app.models import Artist
from app.db.store import Store
from app.utils import create_hash
from app.models import Artist, Track, Album
from app.db import store
from app.utils.hashing import create_hash
def get_artist_image_link(artist: str):
@ -38,6 +38,7 @@ def get_artist_image_link(artist: str):
return None
# TODO: Move network calls to utils/network.py
class DownloadImage:
def __init__(self, url: str, name: str) -> None:
sm_path = Path(settings.ARTIST_IMG_SM_PATH) / name
@ -71,8 +72,8 @@ class CheckArtistImages:
with ThreadPoolExecutor() as pool:
list(
tqdm(
pool.map(self.download_image, Store.artists),
total=len(Store.artists),
pool.map(self.download_image, store.Store.artists),
total=len(store.Store.artists),
desc="Downloading artist images",
)
)
@ -95,13 +96,9 @@ class CheckArtistImages:
return DownloadImage(url, name=f"{artist.artisthash}.webp")
# def fetch_album_bio(title: str, albumartist: str) -> str | None:
# """
# Returns the album bio for a given album.
# """
# last_fm_url = "http://ws.audioscrobbler.com/2.0/?method=album.getinfo&api_key={}&artist={}&album={}&format=json".format(
# settings.LAST_FM_API_KEY, albumartist, title
# )
# def fetch_album_bio(title: str, albumartist: str) -> str | None: """ Returns the album bio for a given album. """
# last_fm_url = "http://ws.audioscrobbler.com/2.0/?method=album.getinfo&api_key={}&artist={}&album={
# }&format=json".format( settings.LAST_FM_API_KEY, albumartist, title )
# try:
# response = requests.get(last_fm_url)
@ -128,3 +125,42 @@ class CheckArtistImages:
# def __call__(self):
# return fetch_album_bio(self.title, self.albumartist)
def get_artists_from_tracks(tracks: list[Track]) -> set[str]:
"""
Extracts all artists from a list of tracks. Returns a list of Artists.
"""
artists = set()
master_artist_list = [[x.name for x in t.artist] for t in tracks]
artists = artists.union(*master_artist_list)
return artists
def get_albumartists(albums: list[Album]) -> set[str]:
artists = set()
for album in albums:
albumartists = [a.name for a in album.albumartists]
artists.update(albumartists)
return artists
def get_all_artists(
tracks: list[Track], albums: list[Album]
) -> list[Artist]:
artists_from_tracks = get_artists_from_tracks(tracks=tracks)
artist_from_albums = get_albumartists(albums=albums)
artists = list(artists_from_tracks.union(artist_from_albums))
artists = sorted(artists)
lower_artists = set(a.lower().strip() for a in artists)
indices = [[ar.lower().strip() for ar in artists].index(a) for a in lower_artists]
artists = [artists[i] for i in indices]
return [Artist(a) for a in artists]

View File

@ -5,7 +5,7 @@ from app.db.store import Store
from app.models import Folder, Track
from app.settings import SUPPORTED_FILES
from app.logger import log
from app.utils import win_replace_slash
from app.utils.wintools import win_replace_slash
class GetFilesAndDirs:

View File

@ -4,13 +4,11 @@ This library contains all the functions related to playlists.
import os
import random
import string
from datetime import datetime
from typing import Any
from PIL import Image, ImageSequence
from app import settings
from app.logger import log
def create_thumbnail(image: Any, img_path: str) -> str:
@ -80,36 +78,33 @@ def save_p_image(file, pid: str):
return filename
class ValidatePlaylistThumbs:
"""
Removes all unused images in the images/playlists folder.
"""
def __init__(self) -> None:
images = []
playlists = Get.get_all_playlists()
log.info("Validating playlist thumbnails")
for playlist in playlists:
if playlist.image:
img_path = playlist.image.split("/")[-1]
thumb_path = playlist.thumb.split("/")[-1]
images.append(img_path)
images.append(thumb_path)
p_path = os.path.join(settings.APP_DIR, "images", "playlists")
for image in os.listdir(p_path):
if image not in images:
os.remove(os.path.join(p_path, image))
log.info("Validating playlist thumbnails ... ✅")
def create_new_date():
return datetime.now()
#
# class ValidatePlaylistThumbs:
# """
# Removes all unused images in the images/playlists folder.
# """
#
# def __init__(self) -> None:
# images = []
# playlists = Get.get_all_playlists()
#
# log.info("Validating playlist thumbnails")
# for playlist in playlists:
# if playlist.image:
# img_path = playlist.image.split("/")[-1]
# thumb_path = playlist.thumb.split("/")[-1]
#
# images.append(img_path)
# images.append(thumb_path)
#
# p_path = os.path.join(settings.APP_DIR, "images", "playlists")
#
# for image in os.listdir(p_path):
# if image not in images:
# os.remove(os.path.join(p_path, image))
#
# log.info("Validating playlist thumbnails ... ✅")
#
# TODO: Fix ValidatePlaylistThumbs

View File

@ -11,7 +11,7 @@ from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.taglib import extract_thumb, get_tags
from app.logger import log
from app.models import Album, Artist, Track
from app.utils import run_fast_scandir
from app.utils.filesystem import run_fast_scandir
get_all_tracks = SQLiteTrackMethods.get_all_tracks
insert_many_tracks = SQLiteTrackMethods.insert_many_tracks
@ -72,7 +72,6 @@ class Populate:
ProcessAlbumColors()
ProcessArtistColors()
@staticmethod
def filter_untagged(tracks: list[Track], files: list[str]):
tagged_files = [t.filepath for t in tracks]

View File

@ -1,12 +1,15 @@
"""
This library contains all the functions related to the search functionality.
"""
from typing import List
from typing import List, Generator, TypeVar, Any
import itertools
from rapidfuzz import fuzz, process
from unidecode import unidecode
from app import models
from app.db.store import Store
from app.utils.remove_duplicates import remove_duplicates
ratio = fuzz.ratio
wratio = fuzz.WRatio
@ -35,31 +38,32 @@ class Limit:
class SearchTracks:
def __init__(self, tracks: List[models.Track], query: str) -> None:
def __init__(self, query: str) -> None:
self.query = query
self.tracks = tracks
self.tracks = Store.tracks
def __call__(self) -> List[models.Track]:
"""
Gets all songs with a given title.
"""
tracks = [unidecode(track.og_title).lower() for track in self.tracks]
track_titles = [unidecode(track.og_title).lower() for track in self.tracks]
results = process.extract(
self.query,
tracks,
track_titles,
scorer=fuzz.WRatio,
score_cutoff=Cutoff.tracks,
limit=Limit.tracks,
)
return [self.tracks[i[2]] for i in results]
tracks = [self.tracks[i[2]] for i in results]
return remove_duplicates(tracks)
class SearchArtists:
def __init__(self, artists: list[models.Artist], query: str) -> None:
def __init__(self, query: str) -> None:
self.query = query
self.artists = artists
self.artists = Store.artists
def __call__(self) -> list:
"""
@ -75,14 +79,13 @@ class SearchArtists:
limit=Limit.artists,
)
artists = [a[0] for a in results]
return [self.artists[i[2]] for i in results]
class SearchAlbums:
def __init__(self, albums: List[models.Album], query: str) -> None:
def __init__(self, query: str) -> None:
self.query = query
self.albums = albums
self.albums = Store.albums
def __call__(self) -> List[models.Album]:
"""
@ -125,3 +128,90 @@ class SearchPlaylists:
)
return [self.playlists[i[2]] for i in results]
_type = List[models.Track | models.Album | models.Artist]
_S2 = TypeVar("_S2")
_ResultType = int | float
def get_titles(items: _type):
for item in items:
if isinstance(item, models.Track):
text = item.og_title
elif isinstance(item, models.Album):
text = item.title
# print(text)
elif isinstance(item, models.Artist):
text = item.name
else:
text = None
yield text
class SearchAll:
"""
Joins all tracks, albums and artists
then fuzzy searches them as a single unit.
"""
@staticmethod
def collect_all():
all_items: _type = []
all_items.extend(Store.tracks)
all_items.extend(Store.albums)
all_items.extend(Store.artists)
return all_items, get_titles(all_items)
@staticmethod
def get_results(items: Generator[str, Any, None], query: str):
items = list(items)
results = process.extract(
query=query,
choices=items,
scorer=fuzz.WRatio,
score_cutoff=Cutoff.tracks,
limit=20
)
return results
@staticmethod
def sort_results(items: _type):
"""
Separates results into differrent lists using itertools.groupby.
"""
mapped_items = [
{"type": "track", "item": item} if isinstance(item, models.Track) else
{"type": "album", "item": item} if isinstance(item, models.Album) else
{"type": "artist", "item": item} if isinstance(item, models.Artist) else
{"type": "Unknown", "item": item} for item in items
]
mapped_items.sort(key=lambda x: x["type"])
groups = [
list(group) for key, group in
itertools.groupby(mapped_items, lambda x: x["type"])
]
print(len(groups))
# merge items of a group into a dict that looks like: {"albums": [album1, ...]}
groups = [
{f"{group[0]['type']}s": [i['item'] for i in group]} for group in groups
]
return groups
@staticmethod
def search(query: str):
items, titles = SearchAll.collect_all()
results = SearchAll.get_results(titles, query)
results = [items[i[2]] for i in results]
return SearchAll.sort_results(results)

View File

@ -6,12 +6,9 @@ from PIL import Image, UnidentifiedImageError
from tinytag import TinyTag
from app import settings
from app.utils import (
create_hash,
parse_artist_from_filename,
parse_title_from_filename,
win_replace_slash,
)
from app.utils.hashing import create_hash
from app.utils.parsers import parse_title_from_filename, parse_artist_from_filename
from app.utils.wintools import win_replace_slash
def parse_album_art(filepath: str):

View File

@ -15,5 +15,6 @@ def validate_tracks() -> None:
"""
for track in tqdm(Store.tracks, desc="Removing deleted tracks"):
if not os.path.exists(track.filepath):
print(f"Removing {track.filepath}")
Store.tracks.remove(track)
tdb.remove_track_by_filepath(track.filepath)

View File

@ -8,7 +8,6 @@ import time
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from app.logger import log
from app.db.store import Store
from app.lib.taglib import get_tags
@ -91,6 +90,9 @@ class Watcher:
"WatchdogError: Failed to start watchdog, root directories could not be resolved."
)
return
except OSError as e:
log.error('Failed to start watchdog. %s', e)
return
try:
while True:

View File

@ -22,7 +22,7 @@ def run_preinit_migrations():
"""
try:
userdb_version = MigrationManager.get_preinit_version()
except (OperationalError):
except OperationalError:
userdb_version = 0
for migration in all_preinits:

View File

@ -1,258 +0,0 @@
"""
Contains all the models for objects generation and typing.
"""
import dataclasses
import json
from dataclasses import dataclass
from app import utils, settings
@dataclass(slots=True)
class Artist:
"""
Artist class
"""
name: str
artisthash: str = ""
image: str = ""
trackcount: int = 0
albumcount: int = 0
duration: int = 0
colors: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
def __post_init__(self):
self.artisthash = utils.create_hash(self.name, decode=True)
self.image = self.artisthash + ".webp"
self.colors = json.loads(str(self.colors))
@dataclass(slots=True)
class Track:
"""
Track class
"""
album: str
albumartist: str | list[Artist]
albumhash: str
artist: str | list[Artist]
bitrate: int
copyright: str
date: str
disc: int
duration: int
filepath: str
folder: str
genre: str | list[str]
title: str
track: int
trackhash: str
filetype: str = ""
image: str = ""
artist_hashes: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
og_title: str = ""
def __post_init__(self):
self.og_title = self.title
if self.artist is not None:
artists = utils.split_artists(self.artist)
new_title = self.title
if settings.EXTRACT_FEAT:
featured, new_title = utils.parse_feat_from_title(self.title)
original_lower = "-".join([a.lower() for a in artists])
artists.extend([a for a in featured if a.lower() not in original_lower])
if settings.REMOVE_PROD:
new_title = utils.remove_prod(new_title)
# if track is a single
if self.og_title == self.album:
self.album = new_title
self.title = new_title
self.artist_hashes = [utils.create_hash(a, decode=True) for a in artists]
self.artist = [Artist(a) for a in artists]
albumartists = utils.split_artists(self.albumartist)
self.albumartist = [Artist(a) for a in albumartists]
self.filetype = self.filepath.rsplit(".", maxsplit=1)[-1]
self.image = self.albumhash + ".webp"
if self.genre is not None:
self.genre = str(self.genre).replace("/", ",").replace(";", ",")
self.genre = str(self.genre).lower().split(",")
self.genre = [g.strip() for g in self.genre]
@dataclass(slots=True)
class Album:
"""
Creates an album object
"""
albumhash: str
title: str
albumartists: list[Artist]
albumartisthash: str = ""
image: str = ""
count: int = 0
duration: int = 0
colors: list[str] = dataclasses.field(default_factory=list)
date: str = ""
is_soundtrack: bool = False
is_compilation: bool = False
is_single: bool = False
is_EP: bool = False
is_favorite: bool = False
is_live: bool = False
genres: list[str] = dataclasses.field(default_factory=list)
def __post_init__(self):
self.image = self.albumhash + ".webp"
self.albumartisthash = "-".join(a.artisthash for a in self.albumartists)
def set_colors(self, colors: list[str]):
self.colors = colors
def check_type(self):
"""
Runs all the checks to determine the type of album.
"""
self.is_soundtrack = self.check_is_soundtrack()
if self.is_soundtrack:
return
self.is_live = self.check_is_live_album()
if self.is_live:
return
self.is_compilation = self.check_is_compilation()
if self.is_compilation:
return
self.is_EP = self.check_is_ep()
def check_is_soundtrack(self) -> bool:
"""
Checks if the album is a soundtrack.
"""
keywords = ["motion picture", "soundtrack"]
for keyword in keywords:
if keyword in self.title.lower():
return True
return False
def check_is_compilation(self) -> bool:
"""
Checks if the album is a compilation.
"""
artists = [a.name for a in self.albumartists] # type: ignore
artists = "".join(artists).lower()
if "various artists" in artists:
return True
substrings = ["the essential", "best of", "greatest hits", "#1 hits", "number ones", "super hits",
"ultimate collection"]
for substring in substrings:
if substring in self.title.lower():
return True
return False
def check_is_live_album(self):
"""
Checks if the album is a live album.
"""
keywords = ["live from", "live at", "live in"]
for keyword in keywords:
if keyword in self.title.lower():
return True
return False
def check_is_ep(self) -> bool:
"""
Checks if the album is an EP.
"""
return self.title.strip().endswith(" EP")
def check_is_single(self, tracks: list[Track]):
"""
Checks if the album is a single.
"""
if (
len(tracks) == 1
and tracks[0].title == self.title
# and tracks[0].track == 1
# and tracks[0].disc == 1
# Todo: Are the above commented checks necessary?
):
self.is_single = True
def get_date_from_tracks(self, tracks: list[Track]):
for track in tracks:
if track.date != "Unknown":
self.date = track.date
break
@dataclass(slots=True)
class Playlist:
"""Creates playlist objects"""
id: int
artisthashes: str | list[str]
banner_pos: int
has_gif: str | bool
image: str
last_updated: str
name: str
trackhashes: str | list[str]
thumb: str = ""
count: int = 0
duration: int = 0
def __post_init__(self):
self.trackhashes = json.loads(str(self.trackhashes))
self.artisthashes = json.loads(str(self.artisthashes))
self.count = len(self.trackhashes)
self.has_gif = bool(int(self.has_gif))
if self.image is not None:
self.thumb = "thumb_" + self.image
else:
self.image = "None"
self.thumb = "None"
@dataclass(slots=True, frozen=True)
class Folder:
name: str
path: str
has_tracks: bool
is_sym: bool = False
path_hash: str = ""
class FavType:
"""Favorite types enum"""
track = "track"
album = "album"
artist = "artist"

16
app/models/__init__.py Normal file
View File

@ -0,0 +1,16 @@
from .album import Album
from .track import Track
from .artist import Artist, ArtistMinimal
from .enums import FavType
from .playlist import Playlist
from .folder import Folder
__all__ = [
"Album",
"Track",
"Artist",
"ArtistMinimal",
"Playlist",
"Folder",
"FavType",
]

123
app/models/album.py Normal file
View File

@ -0,0 +1,123 @@
import dataclasses
from dataclasses import dataclass
from .track import Track
from .artist import Artist
@dataclass(slots=True)
class Album:
"""
Creates an album object
"""
albumhash: str
title: str
albumartists: list[Artist]
albumartisthash: str = ""
image: str = ""
count: int = 0
duration: int = 0
colors: list[str] = dataclasses.field(default_factory=list)
date: str = ""
is_soundtrack: bool = False
is_compilation: bool = False
is_single: bool = False
is_EP: bool = False
is_favorite: bool = False
is_live: bool = False
genres: list[str] = dataclasses.field(default_factory=list)
def __post_init__(self):
self.image = self.albumhash + ".webp"
self.albumartisthash = "-".join(a.artisthash for a in self.albumartists)
def set_colors(self, colors: list[str]):
self.colors = colors
def check_type(self):
"""
Runs all the checks to determine the type of album.
"""
self.is_soundtrack = self.check_is_soundtrack()
if self.is_soundtrack:
return
self.is_live = self.check_is_live_album()
if self.is_live:
return
self.is_compilation = self.check_is_compilation()
if self.is_compilation:
return
self.is_EP = self.check_is_ep()
def check_is_soundtrack(self) -> bool:
"""
Checks if the album is a soundtrack.
"""
keywords = ["motion picture", "soundtrack"]
for keyword in keywords:
if keyword in self.title.lower():
return True
return False
def check_is_compilation(self) -> bool:
"""
Checks if the album is a compilation.
"""
artists = [a.name for a in self.albumartists] # type: ignore
artists = "".join(artists).lower()
if "various artists" in artists:
return True
substrings = ["the essential", "best of", "greatest hits", "#1 hits", "number ones", "super hits",
"ultimate collection"]
for substring in substrings:
if substring in self.title.lower():
return True
return False
def check_is_live_album(self):
"""
Checks if the album is a live album.
"""
keywords = ["live from", "live at", "live in"]
for keyword in keywords:
if keyword in self.title.lower():
return True
return False
def check_is_ep(self) -> bool:
"""
Checks if the album is an EP.
"""
return self.title.strip().endswith(" EP")
def check_is_single(self, tracks: list[Track]):
"""
Checks if the album is a single.
"""
if (
len(tracks) == 1
and tracks[0].title == self.title
# and tracks[0].track == 1
# and tracks[0].disc == 1
# Todo: Are the above commented checks necessary?
):
self.is_single = True
def get_date_from_tracks(self, tracks: list[Track]):
for track in tracks:
if track.date != "Unknown":
self.date = track.date
break

41
app/models/artist.py Normal file
View File

@ -0,0 +1,41 @@
import dataclasses
import json
from dataclasses import dataclass
from app.utils.hashing import create_hash
@dataclass(slots=True)
class Artist:
"""
Artist class
"""
name: str
artisthash: str = ""
image: str = ""
trackcount: int = 0
albumcount: int = 0
duration: int = 0
colors: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
def __post_init__(self):
self.artisthash = create_hash(self.name, decode=True)
self.image = self.artisthash + ".webp"
self.colors = json.loads(str(self.colors))
@dataclass(slots=True)
class ArtistMinimal:
"""
ArtistMinimal class
"""
name: str
artisthash: str = ""
image: str = ""
def __post_init__(self):
self.artisthash = create_hash(self.name, decode=True)
self.image = self.artisthash + ".webp"

6
app/models/enums.py Normal file
View File

@ -0,0 +1,6 @@
class FavType:
"""Favorite types enum"""
track = "track"
album = "album"
artist = "artist"

10
app/models/folder.py Normal file
View File

@ -0,0 +1,10 @@
from dataclasses import dataclass
@dataclass(slots=True, frozen=True)
class Folder:
name: str
path: str
has_tracks: bool
is_sym: bool = False
path_hash: str = ""

33
app/models/playlist.py Normal file
View File

@ -0,0 +1,33 @@
import json
from dataclasses import dataclass
@dataclass(slots=True)
class Playlist:
"""Creates playlist objects"""
id: int
artisthashes: str | list[str]
banner_pos: int
has_gif: str | bool
image: str
last_updated: str
name: str
trackhashes: str | list[str]
thumb: str = ""
count: int = 0
duration: int = 0
def __post_init__(self):
self.trackhashes = json.loads(str(self.trackhashes))
self.artisthashes = json.loads(str(self.artisthashes))
self.count = len(self.trackhashes)
self.has_gif = bool(int(self.has_gif))
if self.image is not None:
self.thumb = "thumb_" + self.image
else:
self.image = "None"
self.thumb = "None"

70
app/models/track.py Normal file
View File

@ -0,0 +1,70 @@
import dataclasses
from dataclasses import dataclass
from app import settings
from .artist import ArtistMinimal
from app.utils.hashing import create_hash
from app.utils.parsers import split_artists, remove_prod, parse_feat_from_title
@dataclass(slots=True)
class Track:
"""
Track class
"""
album: str
albumartist: str | list[ArtistMinimal]
albumhash: str
artist: str | list[ArtistMinimal]
bitrate: int
copyright: str
date: str
disc: int
duration: int
filepath: str
folder: str
genre: str | list[str]
title: str
track: int
trackhash: str
filetype: str = ""
image: str = ""
artist_hashes: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
og_title: str = ""
def __post_init__(self):
self.og_title = self.title
if self.artist is not None:
artists = split_artists(self.artist)
new_title = self.title
if settings.EXTRACT_FEAT:
featured, new_title = parse_feat_from_title(self.title)
original_lower = "-".join([a.lower() for a in artists])
artists.extend([a for a in featured if a.lower() not in original_lower])
if settings.REMOVE_PROD:
new_title = remove_prod(new_title)
# if track is a single
if self.og_title == self.album:
self.album = new_title
self.title = new_title
self.artist_hashes = [create_hash(a, decode=True) for a in artists]
self.artist = [ArtistMinimal(a) for a in artists]
albumartists = split_artists(self.albumartist)
self.albumartist = [ArtistMinimal(a) for a in albumartists]
self.filetype = self.filepath.rsplit(".", maxsplit=1)[-1]
self.image = self.albumhash + ".webp"
if self.genre is not None:
self.genre = str(self.genre).replace("/", ",").replace(";", ",")
self.genre = str(self.genre).lower().split(",")
self.genre = [g.strip() for g in self.genre]

View File

@ -75,6 +75,7 @@ APP_DB_NAME = "swing.db"
USER_DATA_DB_NAME = "userdata.db"
APP_DB_PATH = os.path.join(APP_DIR, APP_DB_NAME)
USERDATA_DB_PATH = os.path.join(APP_DIR, USER_DATA_DB_NAME)
JSON_CONFIG_PATH = os.path.join(APP_DIR, "config.json")
class FLASKVARS:
@ -122,3 +123,5 @@ class TCOLOR:
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
# credits: https://stackoverflow.com/a/287944

View File

@ -1,136 +1,17 @@
"""
Contains the functions to prepare the server for use.
Prepares the server for use.
"""
import os
import shutil
import time
from configparser import ConfigParser
from app import settings
from app.db.sqlite import create_connection, create_tables, queries
from app.db.store import Store
from app.migrations import apply_migrations, set_postinit_migration_versions
from app.migrations._preinit import (
run_preinit_migrations,
set_preinit_migration_versions,
)
from app.settings import APP_DB_PATH, USERDATA_DB_PATH
from app.utils import get_home_res_path
config = ConfigParser()
config_path = get_home_res_path("pyinstaller.config.ini")
config.read(config_path)
try:
IS_BUILD = config["DEFAULT"]["BUILD"] == "True"
except KeyError:
# If the key doesn't exist, it means that the app is being executed in dev mode.
IS_BUILD = False
class CopyFiles:
"""Copies assets to the app directory."""
def __init__(self) -> None:
assets_dir = "assets"
if IS_BUILD:
assets_dir = get_home_res_path("assets")
files = [
{
"src": assets_dir,
"dest": os.path.join(settings.APP_DIR, "assets"),
"is_dir": True,
}
]
for entry in files:
src = os.path.join(os.getcwd(), entry["src"])
if entry["is_dir"]:
shutil.copytree(
src,
entry["dest"],
ignore=shutil.ignore_patterns(
"*.pyc",
),
copy_function=shutil.copy2,
dirs_exist_ok=True,
)
break
shutil.copy2(src, entry["dest"])
def create_config_dir() -> None:
"""
Creates the config directory if it doesn't exist.
"""
thumb_path = os.path.join("images", "thumbnails")
small_thumb_path = os.path.join(thumb_path, "small")
large_thumb_path = os.path.join(thumb_path, "large")
artist_img_path = os.path.join("images", "artists")
small_artist_img_path = os.path.join(artist_img_path, "small")
large_artist_img_path = os.path.join(artist_img_path, "large")
playlist_img_path = os.path.join("images", "playlists")
dirs = [
"", # creates the config folder
"images",
thumb_path,
small_thumb_path,
large_thumb_path,
artist_img_path,
small_artist_img_path,
large_artist_img_path,
playlist_img_path,
]
for _dir in dirs:
path = os.path.join(settings.APP_DIR, _dir)
exists = os.path.exists(path)
if not exists:
os.makedirs(path)
os.chmod(path, 0o755)
CopyFiles()
def setup_sqlite():
"""
Create Sqlite databases and tables.
"""
# if os.path.exists(DB_PATH):
# os.remove(DB_PATH)
run_preinit_migrations()
app_db_conn = create_connection(APP_DB_PATH)
playlist_db_conn = create_connection(USERDATA_DB_PATH)
create_tables(app_db_conn, queries.CREATE_APPDB_TABLES)
create_tables(playlist_db_conn, queries.CREATE_USERDATA_TABLES)
create_tables(app_db_conn, queries.CREATE_MIGRATIONS_TABLE)
create_tables(playlist_db_conn, queries.CREATE_MIGRATIONS_TABLE)
app_db_conn.close()
playlist_db_conn.close()
apply_migrations()
set_preinit_migration_versions()
set_postinit_migration_versions()
Store.load_all_tracks()
Store.process_folders()
Store.load_albums()
Store.load_artists()
from app.setup.files import create_config_dir
from app.setup.sqlite import setup_sqlite, run_migrations
def run_setup():
create_config_dir()
setup_sqlite()
run_migrations()
Store.load_all_tracks()
Store.process_folders()
Store.load_albums()
Store.load_artists()

93
app/setup/files.py Normal file
View File

@ -0,0 +1,93 @@
"""
This module contains the functions that are used to
create the config directory and copy the assets to the app directory.
"""
import os
import shutil
from configparser import ConfigParser
from app import settings
from app.utils.filesystem import get_home_res_path
config = ConfigParser()
config_path = get_home_res_path("pyinstaller.config.ini")
config.read(config_path)
try:
IS_BUILD = config["DEFAULT"]["BUILD"] == "True"
except KeyError:
# If the key doesn't exist, it means that the app is being executed in dev mode.
IS_BUILD = False
class CopyFiles:
"""Copies assets to the app directory."""
def __init__(self) -> None:
assets_dir = "assets"
if IS_BUILD:
assets_dir = get_home_res_path("assets")
files = [
{
"src": assets_dir,
"dest": os.path.join(settings.APP_DIR, "assets"),
"is_dir": True,
}
]
for entry in files:
src = os.path.join(os.getcwd(), entry["src"])
if entry["is_dir"]:
shutil.copytree(
src,
entry["dest"],
ignore=shutil.ignore_patterns(
"*.pyc",
),
copy_function=shutil.copy2,
dirs_exist_ok=True,
)
break
shutil.copy2(src, entry["dest"])
def create_config_dir() -> None:
"""
Creates the config directory if it doesn't exist.
"""
thumb_path = os.path.join("images", "thumbnails")
small_thumb_path = os.path.join(thumb_path, "small")
large_thumb_path = os.path.join(thumb_path, "large")
artist_img_path = os.path.join("images", "artists")
small_artist_img_path = os.path.join(artist_img_path, "small")
large_artist_img_path = os.path.join(artist_img_path, "large")
playlist_img_path = os.path.join("images", "playlists")
dirs = [
"", # creates the config folder
"images",
thumb_path,
small_thumb_path,
large_thumb_path,
artist_img_path,
small_artist_img_path,
large_artist_img_path,
playlist_img_path,
]
for _dir in dirs:
path = os.path.join(settings.APP_DIR, _dir)
exists = os.path.exists(path)
if not exists:
os.makedirs(path)
os.chmod(path, 0o755)
CopyFiles()

41
app/setup/sqlite.py Normal file
View File

@ -0,0 +1,41 @@
"""
Module to setup Sqlite databases and tables.
Applies migrations.
"""
from app.db.sqlite import create_connection, create_tables, queries
from app.migrations import apply_migrations, set_postinit_migration_versions
from app.migrations.__preinit import run_preinit_migrations, set_preinit_migration_versions
from app.settings import APP_DB_PATH, USERDATA_DB_PATH
def setup_sqlite():
"""
Create Sqlite databases and tables.
"""
# if os.path.exists(DB_PATH):
# os.remove(DB_PATH)
run_preinit_migrations()
app_db_conn = create_connection(APP_DB_PATH)
playlist_db_conn = create_connection(USERDATA_DB_PATH)
create_tables(app_db_conn, queries.CREATE_APPDB_TABLES)
create_tables(playlist_db_conn, queries.CREATE_USERDATA_TABLES)
create_tables(app_db_conn, queries.CREATE_MIGRATIONS_TABLE)
create_tables(playlist_db_conn, queries.CREATE_MIGRATIONS_TABLE)
app_db_conn.close()
playlist_db_conn.close()
def run_migrations():
"""
Run migrations and updates migration version.
"""
apply_migrations()
set_preinit_migration_versions()
set_postinit_migration_versions()

View File

@ -1,8 +1,8 @@
import os
from app.utils import get_ip
from app.settings import TCOLOR, APP_VERSION, FLASKVARS, APP_DIR
from app import settings
from app.utils.network import get_ip
def log_startup_info():

View File

@ -1,364 +0,0 @@
"""
This module contains mini functions for the server.
"""
import hashlib
import os
import platform
import random
import re
import socket as Socket
import string
import threading
from datetime import datetime
from pathlib import Path
from collections import defaultdict
from operator import attrgetter
import requests
from unidecode import unidecode
from app import models
from app.settings import SUPPORTED_FILES
CWD = Path(__file__).parent.resolve()
def background(func):
"""
a threading decorator
use @background above the function you want to run in the background
"""
def background_func(*a, **kw):
threading.Thread(target=func, args=a, kwargs=kw).start()
return background_func
def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]:
"""
Scans a directory for files with a specific extension.
Returns a list of files and folders in the directory.
"""
if _dir == "":
return [], []
subfolders = []
files = []
try:
for _file in os.scandir(_dir):
if _file.is_dir() and not _file.name.startswith("."):
subfolders.append(_file.path)
if _file.is_file():
ext = os.path.splitext(_file.name)[1].lower()
if ext in SUPPORTED_FILES:
files.append(win_replace_slash(_file.path))
if full or len(files) == 0:
for _dir in list(subfolders):
sub_dirs, _file = run_fast_scandir(_dir, full=True)
subfolders.extend(sub_dirs)
files.extend(_file)
except (OSError, PermissionError, FileNotFoundError, ValueError):
return [], []
return subfolders, files
def remove_duplicates(tracks: list[models.Track]) -> list[models.Track]:
"""
Remove duplicates from a list of Track objects based on the trackhash attribute.
Retains objects with the highest bitrate.
"""
hash_to_tracks = defaultdict(list)
for track in tracks:
hash_to_tracks[track.trackhash].append(track)
tracks = []
for track_group in hash_to_tracks.values():
max_bitrate_track = max(track_group, key=attrgetter("bitrate"))
tracks.append(max_bitrate_track)
return tracks
def create_hash(*args: str, decode=False, limit=7) -> str:
"""
Creates a simple hash for an album
"""
str_ = "".join(args)
if decode:
str_ = unidecode(str_)
str_ = str_.lower().strip().replace(" ", "")
str_ = "".join(t for t in str_ if t.isalnum())
str_ = str_.encode("utf-8")
str_ = hashlib.sha256(str_).hexdigest()
return str_[-limit:]
def create_folder_hash(*args: str, limit=7) -> str:
"""
Creates a simple hash for an album
"""
strings = [s.lower().strip().replace(" ", "") for s in args]
strings = ["".join([t for t in s if t.isalnum()]) for s in strings]
strings = [s.encode("utf-8") for s in strings]
strings = [hashlib.sha256(s).hexdigest()[-limit:] for s in strings]
return "".join(strings)
def create_new_date():
"""
It creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS"
:return: A string of the current date and time.
"""
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
class UseBisection:
"""
Uses bisection to find a list of items in another list.
returns a list of found items with `None` items being not found
items.
"""
def __init__(self, source: list, search_from: str, queries: list[str]) -> None:
self.source_list = source
self.queries_list = queries
self.attr = search_from
def find(self, query: str):
left = 0
right = len(self.source_list) - 1
while left <= right:
mid = (left + right) // 2
if self.source_list[mid].__getattribute__(self.attr) == query:
return self.source_list[mid]
elif self.source_list[mid].__getattribute__(self.attr) > query:
right = mid - 1
else:
left = mid + 1
return None
def __call__(self) -> list:
if len(self.source_list) == 0:
return [None]
return [self.find(query) for query in self.queries_list]
class Ping:
"""
Checks if there is a connection to the internet by pinging google.com
"""
@staticmethod
def __call__() -> bool:
try:
requests.get("https://google.com", timeout=10)
return True
except (requests.exceptions.ConnectionError, requests.Timeout):
return False
def get_artists_from_tracks(tracks: list[models.Track]) -> set[str]:
"""
Extracts all artists from a list of tracks. Returns a list of Artists.
"""
artists = set()
master_artist_list = [[x.name for x in t.artist] for t in tracks] # type: ignore
artists = artists.union(*master_artist_list)
return artists
def get_albumartists(albums: list[models.Album]) -> set[str]:
artists = set()
for album in albums:
albumartists = [a.name for a in album.albumartists] # type: ignore
artists.update(albumartists)
return artists
def get_all_artists(
tracks: list[models.Track], albums: list[models.Album]
) -> list[models.Artist]:
artists_from_tracks = get_artists_from_tracks(tracks)
artist_from_albums = get_albumartists(albums)
artists = list(artists_from_tracks.union(artist_from_albums))
artists = sorted(artists)
lower_artists = set(a.lower().strip() for a in artists)
indices = [[ar.lower().strip() for ar in artists].index(a) for a in lower_artists]
artists = [artists[i] for i in indices]
return [models.Artist(a) for a in artists]
def bisection_search_string(strings: list[str], target: str) -> str | None:
"""
Finds a string in a list of strings using bisection search.
"""
if not strings:
return None
strings = sorted(strings)
left = 0
right = len(strings) - 1
while left <= right:
middle = (left + right) // 2
if strings[middle] == target:
return strings[middle]
if strings[middle] < target:
left = middle + 1
else:
right = middle - 1
return None
def get_home_res_path(filename: str):
"""
Returns a path to resources in the home directory of this project.
Used to resolve resources in builds.
"""
try:
return (CWD / ".." / filename).resolve()
except ValueError:
return None
def get_ip():
"""
Returns the IP address of this device.
"""
soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM)
soc.connect(("8.8.8.8", 80))
ip_address = str(soc.getsockname()[0])
soc.close()
return ip_address
def is_windows():
"""
Returns True if the OS is Windows.
"""
return platform.system() == "Windows"
def parse_feat_from_title(title: str) -> tuple[list[str], str]:
"""
Extracts featured artists from a song title using regex.
"""
regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)"
# regex for square brackets 👇
sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]"
match = re.search(regex, title, re.IGNORECASE)
if not match:
match = re.search(sqr_regex, title, re.IGNORECASE)
regex = sqr_regex
if not match:
return [], title
artists = match.group(1)
artists = split_artists(artists, with_and=True)
# remove "feat" group from title
new_title = re.sub(regex, "", title, flags=re.IGNORECASE)
return artists, new_title
def get_random_str(length=5):
"""
Generates a random string of length `length`.
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
def win_replace_slash(path: str):
if is_windows():
return path.replace("\\", "/").replace("//", "/")
return path
def split_artists(src: str, with_and: bool = False):
exp = r"\s*(?: and |&|,|;)\s*" if with_and else r"\s*[,;]\s*"
artists = re.split(exp, src)
return [a.strip() for a in artists]
def parse_artist_from_filename(title: str):
"""
Extracts artist names from a song title using regex.
"""
regex = r"^(.+?)\s*[-–—]\s*(?:.+?)$"
match = re.search(regex, title, re.IGNORECASE)
if not match:
return []
artists = match.group(1)
artists = split_artists(artists)
return artists
def parse_title_from_filename(title: str):
"""
Extracts track title from a song title using regex.
"""
regex = r"^(?:.+?)\s*[-–—]\s*(.+?)$"
match = re.search(regex, title, re.IGNORECASE)
if not match:
return title
res = match.group(1)
# remove text in brackets starting with "official" case-insensitive
res = re.sub(r"\s*\([^)]*official[^)]*\)", "", res, flags=re.IGNORECASE)
return res.strip()
def remove_prod(title: str) -> str:
"""
Removes the producer string in a track title using regex.
"""
# check if title contain title, if not return it.
if not ("prod." in title.lower()):
return title
# check if title has brackets
if re.search(r"[()\[\]]", title):
regex = r"\s?(\(|\[)prod\..*?(\)|\])\s?"
else:
regex = r"\s?\bprod\.\s*\S+"
# remove the producer string
title = re.sub(regex, "", title, flags=re.IGNORECASE)
return title.strip()

0
app/utils/__init__.py Normal file
View File

57
app/utils/bisection.py Normal file
View File

@ -0,0 +1,57 @@
class UseBisection:
"""
Uses bisection to find a list of items in another list.
returns a list of found items with `None` items being not found
items.
"""
def __init__(self, source: list, search_from: str, queries: list[str]) -> None:
self.source_list = source
self.queries_list = queries
self.attr = search_from
def find(self, query: str):
left = 0
right = len(self.source_list) - 1
while left <= right:
mid = (left + right) // 2
if self.source_list[mid].__getattribute__(self.attr) == query:
return self.source_list[mid]
elif self.source_list[mid].__getattribute__(self.attr) > query:
right = mid - 1
else:
left = mid + 1
return None
def __call__(self) -> list:
if len(self.source_list) == 0:
return [None]
return [self.find(query) for query in self.queries_list]
def bisection_search_string(strings: list[str], target: str) -> str | None:
"""
Finds a string in a list of strings using bisection search.
"""
if not strings:
return None
strings = sorted(strings)
left = 0
right = len(strings) - 1
while left <= right:
middle = (left + right) // 2
if strings[middle] == target:
return strings[middle]
if strings[middle] < target:
left = middle + 1
else:
right = middle - 1
return None

50
app/utils/filesystem.py Normal file
View File

@ -0,0 +1,50 @@
import os
from pathlib import Path
from app.settings import SUPPORTED_FILES
from app.utils.wintools import win_replace_slash
CWD = Path(__file__).parent.resolve()
def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]:
"""
Scans a directory for files with a specific extension.
Returns a list of files and folders in the directory.
"""
if _dir == "":
return [], []
subfolders = []
files = []
try:
for _file in os.scandir(_dir):
if _file.is_dir() and not _file.name.startswith("."):
subfolders.append(_file.path)
if _file.is_file():
ext = os.path.splitext(_file.name)[1].lower()
if ext in SUPPORTED_FILES:
files.append(win_replace_slash(_file.path))
if full or len(files) == 0:
for _dir in list(subfolders):
sub_dirs, _file = run_fast_scandir(_dir, full=True)
subfolders.extend(sub_dirs)
files.extend(_file)
except (OSError, PermissionError, FileNotFoundError, ValueError):
return [], []
return subfolders, files
def get_home_res_path(filename: str):
"""
Returns a path to resources in the home directory of this project.
Used to resolve resources in builds.
"""
try:
return (CWD / ".." / filename).resolve()
except ValueError:
return None

19
app/utils/generators.py Normal file
View File

@ -0,0 +1,19 @@
import string
from datetime import datetime
import random
def create_new_date():
"""
It creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS"
:return: A string of the current date and time.
"""
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
def get_random_str(length=5):
"""
Generates a random string of length `length`.
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=length))

31
app/utils/hashing.py Normal file
View File

@ -0,0 +1,31 @@
import hashlib
from unidecode import unidecode
def create_hash(*args: str, decode=False, limit=7) -> str:
"""
Creates a simple hash for an album
"""
str_ = "".join(args)
if decode:
str_ = unidecode(str_)
str_ = str_.lower().strip().replace(" ", "")
str_ = "".join(t for t in str_ if t.isalnum())
str_ = str_.encode("utf-8")
str_ = hashlib.sha256(str_).hexdigest()
return str_[-limit:]
def create_folder_hash(*args: str, limit=7) -> str:
"""
Creates a simple hash for an album
"""
strings = [s.lower().strip().replace(" ", "") for s in args]
strings = ["".join([t for t in s if t.isalnum()]) for s in strings]
strings = [s.encode("utf-8") for s in strings]
strings = [hashlib.sha256(s).hexdigest()[-limit:] for s in strings]
return "".join(strings)

28
app/utils/network.py Normal file
View File

@ -0,0 +1,28 @@
import requests
import socket as Socket
class Ping:
"""
Checks if there is a connection to the internet by pinging google.com
"""
@staticmethod
def __call__() -> bool:
try:
requests.get("https://google.com", timeout=10)
return True
except (requests.exceptions.ConnectionError, requests.Timeout):
return False
def get_ip():
"""
Returns the IP address of this device.
"""
soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM)
soc.connect(("8.8.8.8", 80))
ip_address = str(soc.getsockname()[0])
soc.close()
return ip_address

86
app/utils/parsers.py Normal file
View File

@ -0,0 +1,86 @@
import re
def split_artists(src: str, with_and: bool = False):
exp = r"\s*(?: and |&|,|;)\s*" if with_and else r"\s*[,;]\s*"
artists = re.split(exp, src)
return [a.strip() for a in artists]
def parse_artist_from_filename(title: str):
"""
Extracts artist names from a song title using regex.
"""
regex = r"^(.+?)\s*[-–—]\s*(?:.+?)$"
match = re.search(regex, title, re.IGNORECASE)
if not match:
return []
artists = match.group(1)
artists = split_artists(artists)
return artists
def parse_title_from_filename(title: str):
"""
Extracts track title from a song title using regex.
"""
regex = r"^(?:.+?)\s*[-–—]\s*(.+?)$"
match = re.search(regex, title, re.IGNORECASE)
if not match:
return title
res = match.group(1)
# remove text in brackets starting with "official" case-insensitive
res = re.sub(r"\s*\([^)]*official[^)]*\)", "", res, flags=re.IGNORECASE)
return res.strip()
def remove_prod(title: str) -> str:
"""
Removes the producer string in a track title using regex.
"""
# check if title contain title, if not return it.
if not ("prod." in title.lower()):
return title
# check if title has brackets
if re.search(r"[()\[\]]", title):
regex = r"\s?(\(|\[)prod\..*?(\)|\])\s?"
else:
regex = r"\s?\bprod\.\s*\S+"
# remove the producer string
title = re.sub(regex, "", title, flags=re.IGNORECASE)
return title.strip()
def parse_feat_from_title(title: str) -> tuple[list[str], str]:
"""
Extracts featured artists from a song title using regex.
"""
regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)"
# regex for square brackets 👇
sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]"
match = re.search(regex, title, re.IGNORECASE)
if not match:
match = re.search(sqr_regex, title, re.IGNORECASE)
regex = sqr_regex
if not match:
return [], title
artists = match.group(1)
artists = split_artists(artists, with_and=True)
# remove "feat" group from title
new_title = re.sub(regex, "", title, flags=re.IGNORECASE)
return artists, new_title

View File

@ -0,0 +1,23 @@
from collections import defaultdict
from operator import attrgetter
from app.models import Track
def remove_duplicates(tracks: list[Track]) -> list[Track]:
"""
Remove duplicates from a list of Track objects based on the trackhash attribute.
Retains objects with the highest bitrate.
"""
hash_to_tracks = defaultdict(list)
for track in tracks:
hash_to_tracks[track.trackhash].append(track)
tracks = []
for track_group in hash_to_tracks.values():
max_bitrate_track = max(track_group, key=attrgetter("bitrate"))
tracks.append(max_bitrate_track)
return tracks

13
app/utils/threading.py Normal file
View File

@ -0,0 +1,13 @@
import threading
def background(func):
"""
a threading decorator
use @background above the function you want to run in the background
"""
def background_func(*a, **kw):
threading.Thread(target=func, args=a, kwargs=kw).start()
return background_func

16
app/utils/wintools.py Normal file
View File

@ -0,0 +1,16 @@
import platform
# TODO: Check is_windows on app start in settings.py
def is_windows():
"""
Returns True if the OS is Windows.
"""
return platform.system() == "Windows"
def win_replace_slash(path: str):
if is_windows():
return path.replace("\\", "/").replace("//", "/")
return path

View File

@ -10,7 +10,8 @@ from app.lib.watchdogg import Watcher as WatchDog
from app.settings import FLASKVARS
from app.setup import run_setup
from app.start_info_logger import log_startup_info
from app.utils import background, get_home_res_path
from app.utils.filesystem import get_home_res_path
from app.utils.threading import background
werkzeug = logging.getLogger("werkzeug")
werkzeug.setLevel(logging.ERROR)

View File

@ -1,5 +1,5 @@
from hypothesis import given
from app.utils import parse_feat_from_title
# from hypothesis import given
from app.utils.parsers import parse_feat_from_title
def test_extract_featured_artists_from_title():