diff --git a/app/api/__init__.py b/app/api/__init__.py index 9aa5dc9..2474e46 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -6,6 +6,7 @@ from flask import Flask from flask_compress import Compress from flask_cors import CORS +from .plugins import lyrics as lyrics_plugin from app.api import ( album, artist, @@ -17,7 +18,8 @@ from app.api import ( search, send_file, settings, - lyrics + lyrics, + plugins, ) @@ -46,4 +48,8 @@ def create_api(): app.register_blueprint(colors.api) app.register_blueprint(lyrics.api) + # Plugins + app.register_blueprint(plugins.api) + app.register_blueprint(lyrics_plugin.api) + return app diff --git a/app/api/lyrics.py b/app/api/lyrics.py index c7d6147..7253279 100644 --- a/app/api/lyrics.py +++ b/app/api/lyrics.py @@ -56,3 +56,4 @@ def check_lyrics(): exists = get_lyrics_from_tags(filepath, just_check=True) return {"exists": exists}, 200 + diff --git a/app/api/plugins/__init__.py b/app/api/plugins/__init__.py new file mode 100644 index 0000000..c8f3912 --- /dev/null +++ b/app/api/plugins/__init__.py @@ -0,0 +1,26 @@ +from flask import Blueprint, request + +from app.db.sqlite.plugins import PluginsMethods + + +api = Blueprint("plugins", __name__, url_prefix="/plugins") + + +@api.route("/", methods=["GET"]) +def get_all_plugins(): + plugins = PluginsMethods.get_all_plugins() + + return {"plugins": plugins} + + +@api.route("/setactive", methods=["GET"]) +def activate_deactivate_plugin(): + name = request.args.get("plugin", None) + state = request.args.get("state", None) + + if not name or not state: + return {"error": "Missing plugin or state"}, 400 + + PluginsMethods.plugin_set_active(name, int(state)) + + return {"message": "OK"}, 200 diff --git a/app/api/plugins/lyrics.py b/app/api/plugins/lyrics.py new file mode 100644 index 0000000..e1a2df6 --- /dev/null +++ b/app/api/plugins/lyrics.py @@ -0,0 +1,39 @@ +from flask import Blueprint, request +from app.plugins.lyrics import Lyrics +from app.utils.hashing import create_hash + +api = Blueprint("lyricsplugin", __name__, url_prefix="/plugins/lyrics") + + +@api.route("/search", methods=["POST"]) +def search_lyrics(): + data = request.get_json() + + title = data.get("title", "") + artist = data.get("artist", "") + album = data.get("album", "") + filepath = data.get("filepath", None) + + finder = Lyrics() + + data = finder.search_lyrics_by_title_and_artist(title, artist) + + if not data: + return {"downloaded": False, "all": []}, 404 + + perfect_match = data[0] + + for track in data: + i_title = track["title"] + i_album = track["album"] + + if create_hash(i_title) == create_hash(title) and create_hash( + i_album + ) == create_hash(album): + perfect_match = track + break + + track_id = perfect_match["track_id"] + downloaded = finder.download_lyrics_to_path_by_id(track_id, filepath) + + return {"downloaded": downloaded, "all": data}, 200 diff --git a/app/configs.py b/app/configs.py index a021ec7..f085562 100644 --- a/app/configs.py +++ b/app/configs.py @@ -1,2 +1,4 @@ -LASTFM_API_KEY = '' -POSTHOG_API_KEY = '' +LASTFM_API_KEY = "" +POSTHOG_API_KEY = "" +PLUGIN_LYRICS_AUTHORITY = "" +PLUGIN_LYRICS_ROOT_URL = "" diff --git a/app/db/sqlite/__init__.py b/app/db/sqlite/__init__.py index 9f65c16..43bd79c 100644 --- a/app/db/sqlite/__init__.py +++ b/app/db/sqlite/__init__.py @@ -3,7 +3,6 @@ This module contains the functions to interact with the SQLite database. """ import sqlite3 -from pathlib import Path from sqlite3 import Connection as SqlConn @@ -19,19 +18,4 @@ def create_tables(conn: SqlConn, sql_query: str): """ Executes the specifiend SQL file to create database tables. """ - # with open(sql_query, "r", encoding="utf-8") as sql_file: conn.executescript(sql_query) - - -def setup_search_db(): - """ - Creates the search database. - """ - db = sqlite3.connect(":memory:") - sql_file = "queries/fts5.sql" - - current_path = Path(__file__).parent.resolve() - sql_path = current_path.joinpath(sql_file) - - with open(sql_path, "r", encoding="utf-8") as sql_file: - db.executescript(sql_file.read()) diff --git a/app/db/sqlite/plugins/__init__.py b/app/db/sqlite/plugins/__init__.py new file mode 100644 index 0000000..be764c2 --- /dev/null +++ b/app/db/sqlite/plugins/__init__.py @@ -0,0 +1,91 @@ +import json + +from app.models.plugins import Plugin +from ..utils import SQLiteManager + + +def plugin_tuple_to_obj(plugin_tuple: tuple) -> Plugin: + return Plugin( + name=plugin_tuple[1], + description=plugin_tuple[2], + active=bool(plugin_tuple[3]), + settings=json.loads(plugin_tuple[4]), + ) + + +class PluginsMethods: + @classmethod + def insert_plugin(cls, plugin: Plugin): + """ + Inserts one plugin into the database + """ + + sql = """INSERT OR IGNORE INTO plugins( + name, + description, + active, + settings + ) VALUES(?,?,?,?) + """ + + with SQLiteManager(userdata_db=True) as cur: + cur.execute( + sql, + ( + plugin.name, + plugin.description, + int(plugin.active), + json.dumps(plugin.settings), + ), + ) + lastrowid = cur.lastrowid + + return lastrowid + + @classmethod + def insert_lyrics_plugin(cls): + plugin = Plugin( + name="lyrics_finder", + description="Find lyrics from the internet", + active=False, + settings={}, + ) + cls.insert_plugin(plugin) + + @classmethod + def get_all_plugins(cls): + with SQLiteManager(userdata_db=True) as cur: + cur.execute("SELECT * FROM plugins") + plugins = cur.fetchall() + cur.close() + + if plugins is not None: + return [plugin_tuple_to_obj(plugin) for plugin in plugins] + + return [] + + @classmethod + def plugin_set_active(cls, name: str, state: int): + with SQLiteManager(userdata_db=True) as cur: + cur.execute("UPDATE plugins SET active=? WHERE name=?", (state, name)) + cur.close() + + def update_plugin_settings(self, plugin: Plugin): + with SQLiteManager(userdata_db=True) as cur: + cur.execute( + "UPDATE plugins SET settings=? WHERE name=?", + (json.dumps(plugin.settings), plugin.name), + ) + cur.close() + + @classmethod + def get_plugin_by_name(cls, name: str): + with SQLiteManager(userdata_db=True) as cur: + cur.execute("SELECT * FROM plugins WHERE name=?", (name,)) + plugin = cur.fetchone() + cur.close() + + if plugin is not None: + return plugin_tuple_to_obj(plugin) + + return None diff --git a/app/db/sqlite/queries.py b/app/db/sqlite/queries.py index 317f11a..591af61 100644 --- a/app/db/sqlite/queries.py +++ b/app/db/sqlite/queries.py @@ -41,6 +41,14 @@ CREATE TABLE IF NOT EXISTS lastfm_similar_artists ( similar_artists text NOT NULL, UNIQUE (artisthash) ); + +CREATE TABLE IF NOT EXISTS plugins ( + id integer PRIMARY KEY, + name text NOT NULL, + description text NOT NULL, + active integer NOT NULL DEFAULT 0, + settings text +) """ CREATE_APPDB_TABLES = """ diff --git a/app/models/plugins.py b/app/models/plugins.py new file mode 100644 index 0000000..d3b55f8 --- /dev/null +++ b/app/models/plugins.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + + +@dataclass +class Plugin: + name: str + description: str + active: bool + settings: dict + diff --git a/app/plugins/__init__.py b/app/plugins/__init__.py new file mode 100644 index 0000000..70a03d9 --- /dev/null +++ b/app/plugins/__init__.py @@ -0,0 +1,30 @@ +class Plugin: + """ + Class that all plugins should inherit from + """ + + def __init__(self, name: str, description: str) -> None: + self.enabled = False + self.name = name + self.description = description + + def set_active(self, state: bool): + self.enabled = state + + +def plugin_method(func): + """ + A decorator that prevents execution if the plugin is disabled. + Should be used on all plugin methods + """ + + def wrapper(*args, **kwargs): + plugin: Plugin = args[0] + + if plugin.enabled: + return func(*args, **kwargs) + else: + return + + return wrapper + diff --git a/app/plugins/lyrics.py b/app/plugins/lyrics.py new file mode 100644 index 0000000..aaafa35 --- /dev/null +++ b/app/plugins/lyrics.py @@ -0,0 +1,196 @@ +import json +import os +import time +from pathlib import Path +from typing import List, Optional + +import requests + +from app.db.sqlite.plugins import PluginsMethods +from app.plugins import Plugin, plugin_method +from app.settings import Keys, Paths + +# from .base import LRCProvider +# from ..utils import get_best_match + + +class LRCProvider: + """ + Base class for all of the synced (LRC format) lyrics providers. + """ + + session = requests.Session() + + def __init__(self) -> None: + self.session.headers.update( + { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36" + } + ) + + def get_lrc_by_id(self, track_id: str) -> Optional[str]: + """ + Returns the synced lyrics of the song in lrc. + + ### Arguments + - track_id: The ID of the track defined in the provider database. e.g. Spotify/Deezer track ID + """ + raise NotImplementedError + + def get_lrc(self, search_term: str) -> Optional[str]: + """ + Returns the synced lyrics of the song in lrc. + """ + raise NotImplementedError + + +class LyricsProvider(LRCProvider): + """ + Musixmatch provider class + """ + + ROOT_URL = Keys.PLUGIN_LYRICS_ROOT_URL + + def __init__(self) -> None: + super().__init__() + self.token = None + self.session.headers.update( + { + "authority": Keys.PLUGIN_LYRICS_AUTHORITY, + "cookie": "AWSELBCORS=0; AWSELB=0", + } + ) + + def _get(self, action: str, query: List[tuple]): + if action != "token.get" and self.token is None: + self._get_token() + + query.append(("app_id", "web-desktop-app-v1.0")) + if self.token is not None: + query.append(("usertoken", self.token)) + + t = str(int(time.time() * 1000)) + query.append(("t", t)) + url = self.ROOT_URL + action + + try: + response = self.session.get(url, params=query) + except requests.exceptions.ConnectionError: + return None + + return response + + def _get_token(self): + # Check if token is cached and not expired + plugin_path = Paths.get_lyrics_plugins_path() + token_path = os.path.join(plugin_path, "token.json") + + current_time = int(time.time()) + + if os.path.exists(token_path): + with open(token_path, "r", encoding="utf-8") as token_file: + cached_token_data: dict = json.load(token_file) + + cached_token = cached_token_data.get("token") + expiration_time = cached_token_data.get("expiration_time") + + if cached_token and expiration_time and current_time < expiration_time: + self.token = cached_token + return + + # Token not cached or expired, fetch a new token + d = self._get("token.get", [("user_language", "en")]).json() + if d["message"]["header"]["status_code"] == 401: + time.sleep(10) + return self._get_token() + + new_token = d["message"]["body"]["user_token"] + expiration_time = current_time + 600 # 10 minutes expiration + + # Cache the new token + self.token = new_token + token_data = {"token": new_token, "expiration_time": expiration_time} + + os.makedirs(plugin_path, exist_ok=True) + with open(token_path, "w", encoding="utf-8") as token_file: + json.dump(token_data, token_file) + + def get_lrc_by_id(self, track_id: str) -> Optional[str]: + res = self._get( + "track.subtitle.get", [("track_id", track_id), ("subtitle_format", "lrc")] + ) + + if not res.ok: + return None + + res = res.json() + body = res["message"]["body"] + + if not body: + return None + + return body["subtitle"]["subtitle_body"] + + def get_lrc(self, title: str, artist: str) -> Optional[str]: + r = self._get( + "track.search", + [ + ("q_track_artist", f"{title} {artist}"), + ("q_track", title), + ("q_artist", artist), + ("page_size", "5"), + ("page", "1"), + ("f_has_lyrics", "1"), + ("s_track_rating", "desc"), + ("quorum_factor", "1.0"), + ], + ) + + body = r.json()["message"]["body"] + tracks = body["track_list"] + + return [ + { + "track_id": t["track"]["track_id"], + "title": t["track"]["track_name"], + "artist": t["track"]["artist_name"], + "album": t["track"]["album_name"], + "image": t["track"]["album_coverart_100x100"], + } + for t in tracks + ] + + +class Lyrics(Plugin): + def __init__(self) -> None: + plugin = PluginsMethods.get_plugin_by_name("lyrics_finder") + + if not plugin: + return + + name = plugin.name + description = plugin.description + + super().__init__(name, description) + + self.provider = LyricsProvider() + + if plugin: + self.set_active(bool(int(plugin.active))) + + @plugin_method + def search_lyrics_by_title_and_artist(self, title: str, artist: str): + return self.provider.get_lrc(title, artist) + + @plugin_method + def download_lyrics_to_path_by_id(self, trackid: str, path: str): + lrc = self.provider.get_lrc_by_id(trackid) + + path = Path(path).with_suffix(".lrc") + + if not lrc or lrc.replace("\n", "").strip() == "": + return False + with open(path, "w", encoding="utf-8") as f: + f.write(lrc) + + return True diff --git a/app/plugins/register.py b/app/plugins/register.py new file mode 100644 index 0000000..7ace226 --- /dev/null +++ b/app/plugins/register.py @@ -0,0 +1,5 @@ +from app.db.sqlite.plugins import PluginsMethods + + +def register_plugins(): + PluginsMethods.insert_lyrics_plugin() diff --git a/app/settings.py b/app/settings.py index 228c250..600944c 100644 --- a/app/settings.py +++ b/app/settings.py @@ -83,6 +83,14 @@ class Paths: def get_assets_path(cls): return join(Paths.get_app_dir(), "assets") + @classmethod + def get_plugins_path(cls): + return join(Paths.get_app_dir(), "plugins") + + @classmethod + def get_lyrics_plugins_path(cls): + return join(Paths.get_plugins_path(), "lyrics") + # defaults class Defaults: @@ -233,6 +241,8 @@ class Keys: # get last fm api key from os environment LASTFM_API = os.environ.get("LASTFM_API_KEY") POSTHOG_API_KEY = os.environ.get("POSTHOG_API_KEY") + PLUGIN_LYRICS_AUTHORITY = os.environ.get("PLUGIN_LYRICS_AUTHORITY") + PLUGIN_LYRICS_ROOT_URL = os.environ.get("PLUGIN_LYRICS_ROOT_URL") @classmethod def load(cls): diff --git a/app/setup/files.py b/app/setup/files.py index 2399c2b..9686121 100644 --- a/app/setup/files.py +++ b/app/setup/files.py @@ -65,6 +65,8 @@ def create_config_dir() -> None: dirs = [ "", # creates the config folder "images", + "plugins", + "plugins/lyrics", thumb_path, small_thumb_path, large_thumb_path, diff --git a/manage.py b/manage.py index b2c9af4..a5cffa4 100644 --- a/manage.py +++ b/manage.py @@ -18,6 +18,7 @@ from app.setup import run_setup from app.start_info_logger import log_startup_info from app.utils.filesystem import get_home_res_path from app.utils.threading import background +from app.plugins.register import register_plugins mimetypes.add_type("text/css", ".css") @@ -90,6 +91,8 @@ def run_swingmusic(): HandleArgs() log_startup_info() bg_run_setup() + register_plugins() + start_watchdog() init_telemetry()