Release v1.4.0 #157

Lyrics Support
This commit is contained in:
Mungai Njoroge 2023-11-14 03:36:00 -08:00 committed by GitHub
commit 52173d4c7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 824 additions and 232 deletions

25
.github/changelog.md vendored
View File

@ -1,10 +1,23 @@
`v1.3.1` is a patch release to fix issue #149
This version adds a few new features and minor bug fixes.
# Bug fix
`ValueError: Decompressed Data Too Large` error when processing images with large album covers.
# What's new?
---
1. Synced lyrics support #126
2. Lyrics finder plugin (experimental)
3. Context option to search artist or album on streaming platforms
_See [changelog for `v1.3.0`](https://github.com/swing-opensource/swingmusic/releases/tag/v1.3.0) for all main changes since `v1.2.0`._
### Lyrics support
Have fun!
You can now sing along your music with the new lyrics feature. Click the lyrics button in the bottom bar to view lyrics.
### Lyrics finder plugin
This experimental will find synced lyrics for you. Go to `settings > plugins` to start using it. When lyrics are found, they will be saved to a lrc file in the same directory as the playing track.
# Bug fixes
1. Blank page on safari #155
2. Telemetry has been removed #153
3. Seeking before playing will maintain the position when playback starts
4. A forgotten few
_PS: I also attempted to add a cool fullscreen standby view, but I couldn't animate the images when going to the next/prev track, so I ditched it_

View File

@ -72,8 +72,9 @@ jobs:
run: |
python -m poetry run python manage.py --build
env:
POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }}
LASTFM_API_KEY: ${{ secrets.LASTFM_API_KEY }}
PLUGIN_LYRICS_AUTHORITY: ${{ secrets.PLUGIN_LYRICS_AUTHORITY }}
PLUGIN_LYRICS_ROOT_URL: ${{ secrets.PLUGIN_LYRICS_ROOT_URL }}
- name: Verify Linux build success
if: matrix.os == 'ubuntu-20.04'
run: |
@ -102,7 +103,7 @@ jobs:
name: win32
path: dist/swingmusic.exe
retention-days: 1
release:
name: Create New Release
runs-on: ubuntu-latest

View File

@ -3,7 +3,7 @@
</div>
<div align="center" style="font-size: 2rem"><b>Swing Music</b></div>
<div align="center"><b><sub><code>v1.3.1</code></sub></b></div>
<div align="center"><b><sub><code>v1.4.0</code></sub></b></div>
**<div align="center" style="padding-top: 1.25rem">[Download](https://swingmusic.vercel.app/downloads) • <a href="https://swingmusic.vercel.app/support-us.html" target="_blank">Support Development</a> • [Browse Docs](https://swingmusic.vercel.app/guide/introduction.html) • [Screenshots](https://swingmusic.vercel.app)</div>**

View File

@ -6,6 +6,7 @@ from flask import Flask
from flask_compress import Compress
from flask_cors import CORS
from .plugins import lyrics as lyrics_plugin
from app.api import (
album,
artist,
@ -17,6 +18,8 @@ from app.api import (
search,
send_file,
settings,
lyrics,
plugins,
)
@ -43,5 +46,10 @@ def create_api():
app.register_blueprint(imgserver.api)
app.register_blueprint(settings.api)
app.register_blueprint(colors.api)
app.register_blueprint(lyrics.api)
# Plugins
app.register_blueprint(plugins.api)
app.register_blueprint(lyrics_plugin.api)
return app

View File

@ -44,7 +44,7 @@ def get_album_tracks_and_info():
album = AlbumStore.get_album_by_hash(albumhash)
if album is None:
return error_msg, 204
return error_msg, 404
tracks = TrackStore.get_tracks_by_albumhash(albumhash)
@ -52,7 +52,7 @@ def get_album_tracks_and_info():
return error_msg, 404
if len(tracks) == 0:
return error_msg, 204
return error_msg, 404
def get_album_genres(tracks: list[Track]):
genres = set()

View File

@ -1,8 +1,8 @@
"""
Contains all the artist(s) routes.
"""
import random
import math
import random
from datetime import datetime
from flask import Blueprint, request
@ -15,29 +15,15 @@ from app.serializers.track import serialize_tracks
from app.store.albums import AlbumStore
from app.store.artists import ArtistStore
from app.store.tracks import TrackStore
from app.telemetry import Telemetry
from app.utils.threading import background
api = Blueprint("artist", __name__, url_prefix="/")
ARTIST_VISIT_COUNT = 0
@background
def send_event():
global ARTIST_VISIT_COUNT
ARTIST_VISIT_COUNT += 1
if ARTIST_VISIT_COUNT % 5 == 0:
Telemetry.send_artist_visited()
@api.route("/artist/<artisthash>", methods=["GET"])
def get_artist(artisthash: str):
"""
Get artist data.
"""
send_event()
limit = request.args.get("limit")
if limit is None:
@ -211,7 +197,6 @@ def get_similar_artists(artisthash: str):
if artist is None:
return {"error": "Artist not found"}, 404
# result = LastFMStore.get_similar_artists_for(artist.artisthash)
result = fmdb.get_similar_artists_for(artist.artisthash)
if result is None:

View File

@ -68,14 +68,24 @@ def get_all_drives(is_win: bool = False):
"""
Returns a list of all the drives on a Windows machine.
"""
drives = psutil.disk_partitions()
drives = psutil.disk_partitions(all=True)
drives = [d.mountpoint for d in drives]
if is_win:
drives = [win_replace_slash(d) for d in drives]
else:
remove = ["/boot", "/boot/efi", "/tmp"]
drives = [d for d in drives if d not in remove]
remove = (
"/boot",
"/tmp",
"/snap",
"/var",
"/sys",
"/proc",
"/etc",
"/run",
"/dev",
)
drives = [d for d in drives if not d.startswith(remove)]
return drives
@ -94,8 +104,6 @@ def list_folders():
req_dir = "$root"
if req_dir == "$root":
# req_dir = settings.USER_HOME_DIR
# if is_win:
return {
"folders": [{"name": d, "path": d} for d in get_all_drives(is_win=is_win)]
}

59
app/api/lyrics.py Normal file
View File

@ -0,0 +1,59 @@
from flask import Blueprint, request
from app.lib.lyrics import (
get_lyrics,
check_lyrics_file,
get_lyrics_from_duplicates,
get_lyrics_from_tags,
)
api = Blueprint("lyrics", __name__, url_prefix="")
@api.route("/lyrics", methods=["POST"])
def send_lyrics():
"""
Returns the lyrics for a track
"""
data = request.get_json()
filepath = data.get("filepath", None)
trackhash = data.get("trackhash", None)
if filepath is None or trackhash is None:
return {"error": "No filepath or trackhash provided"}, 400
is_synced = True
lyrics, copyright = get_lyrics(filepath)
if not lyrics:
lyrics, copyright = get_lyrics_from_duplicates(trackhash, filepath)
if not lyrics:
lyrics, is_synced, copyright = get_lyrics_from_tags(filepath)
if not lyrics:
return {"error": "No lyrics found"}
return {"lyrics": lyrics, "synced": is_synced, "copyright": copyright}, 200
@api.route("/lyrics/check", methods=["POST"])
def check_lyrics():
data = request.get_json()
filepath = data.get("filepath", None)
trackhash = data.get("trackhash", None)
if filepath is None or trackhash is None:
return {"error": "No filepath or trackhash provided"}, 400
exists = check_lyrics_file(filepath, trackhash)
if exists:
return {"exists": exists}, 200
exists = get_lyrics_from_tags(filepath, just_check=True)
return {"exists": exists}, 200

View File

@ -0,0 +1,42 @@
from flask import Blueprint, request
from app.db.sqlite.plugins import PluginsMethods
api = Blueprint("plugins", __name__, url_prefix="/plugins")
@api.route("/", methods=["GET"])
def get_all_plugins():
plugins = PluginsMethods.get_all_plugins()
return {"plugins": plugins}
@api.route("/setactive", methods=["GET"])
def activate_deactivate_plugin():
name = request.args.get("plugin", None)
state = request.args.get("state", None)
if not name or not state:
return {"error": "Missing plugin or state"}, 400
PluginsMethods.plugin_set_active(name, int(state))
return {"message": "OK"}, 200
@api.route("/settings", methods=["POST"])
def update_plugin_settings():
data = request.get_json()
plugin = data.get("plugin", None)
settings = data.get("settings", None)
if not plugin or not settings:
return {"error": "Missing plugin or settings"}, 400
PluginsMethods.update_plugin_settings(plugin_name=plugin, settings=settings)
plugin = PluginsMethods.get_plugin_by_name(plugin)
return {"status": "success", "settings": plugin.settings}

47
app/api/plugins/lyrics.py Normal file
View File

@ -0,0 +1,47 @@
from flask import Blueprint, request
from app.lib.lyrics import format_synced_lyrics
from app.plugins.lyrics import Lyrics
from app.utils.hashing import create_hash
api = Blueprint("lyricsplugin", __name__, url_prefix="/plugins/lyrics")
@api.route("/search", methods=["POST"])
def search_lyrics():
data = request.get_json()
trackhash = data.get("trackhash", "")
title = data.get("title", "")
artist = data.get("artist", "")
album = data.get("album", "")
filepath = data.get("filepath", None)
finder = Lyrics()
data = finder.search_lyrics_by_title_and_artist(title, artist)
if not data:
return {"trackhash": trackhash, "lyrics": None}
perfect_match = data[0]
for track in data:
i_title = track["title"]
i_album = track["album"]
if create_hash(i_title) == create_hash(title) and create_hash(
i_album
) == create_hash(album):
perfect_match = track
track_id = perfect_match["track_id"]
lrc = finder.download_lyrics(track_id, filepath)
if lrc is not None:
lines = lrc.split("\n")
lyrics = format_synced_lyrics(lines)
return {"trackhash": trackhash, "lyrics": lyrics}, 200
return {"trackhash": trackhash, "lyrics": lrc}, 200

View File

@ -1,5 +1,6 @@
from flask import Blueprint, request
from app.db.sqlite.plugins import PluginsMethods as pdb
from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.lib import populate
from app.lib.watchdogg import Watcher as WatchDog
@ -11,7 +12,7 @@ from app.store.tracks import TrackStore
from app.utils.generators import get_random_str
from app.utils.threading import background
api = Blueprint("settings", __name__, url_prefix="/")
api = Blueprint("settings", __name__, url_prefix="")
def get_child_dirs(parent: str, children: list[str]):
@ -160,6 +161,7 @@ def get_all_settings():
"""
settings = sdb.get_all_settings()
plugins = pdb.get_all_plugins()
key_list = list(mapp.keys())
s = {}
@ -180,6 +182,7 @@ def get_all_settings():
root_dirs = sdb.get_root_dirs()
s["root_dirs"] = root_dirs
s['plugins'] = plugins
return {
"settings": s,

View File

@ -43,24 +43,27 @@ class HandleArgs:
print("https://www.youtube.com/watch?v=wZv62ShoStY")
sys.exit(0)
lastfm_key = settings.Keys.LASTFM_API
posthog_key = settings.Keys.POSTHOG_API_KEY
config_keys = [
"LASTFM_API_KEY",
"PLUGIN_LYRICS_AUTHORITY",
"PLUGIN_LYRICS_ROOT_URL",
]
if not lastfm_key:
log.error("ERROR: LASTFM_API_KEY not set in environment")
sys.exit(0)
lines = []
if not posthog_key:
log.error("ERROR: POSTHOG_API_KEY not set in environment")
sys.exit(0)
for key in config_keys:
value = settings.Keys.get(key)
if not value:
log.error(f"ERROR: {key} not set in environment")
sys.exit(0)
lines.append(f'{key} = "{value}"\n')
try:
with open("./app/configs.py", "w", encoding="utf-8") as file:
# copy the api keys to the config file
line1 = f'LASTFM_API_KEY = "{lastfm_key}"\n'
line2 = f'POSTHOG_API_KEY = "{posthog_key}"\n'
file.write(line1)
file.write(line2)
file.writelines(lines)
_s = ";" if is_windows() else ":"
@ -80,10 +83,8 @@ class HandleArgs:
finally:
# revert and remove the api keys for dev mode
with open("./app/configs.py", "w", encoding="utf-8") as file:
line1 = "LASTFM_API_KEY = ''\n"
line2 = "POSTHOG_API_KEY = ''\n"
file.write(line1)
file.write(line2)
lines = [f'{key} = ""\n' for key in config_keys]
file.writelines(lines)
sys.exit(0)

View File

@ -1,2 +1,3 @@
LASTFM_API_KEY = ''
POSTHOG_API_KEY = ''
LASTFM_API_KEY = ""
PLUGIN_LYRICS_AUTHORITY = ""
PLUGIN_LYRICS_ROOT_URL = ""

View File

@ -3,7 +3,6 @@ This module contains the functions to interact with the SQLite database.
"""
import sqlite3
from pathlib import Path
from sqlite3 import Connection as SqlConn
@ -19,19 +18,4 @@ def create_tables(conn: SqlConn, sql_query: str):
"""
Executes the specifiend SQL file to create database tables.
"""
# with open(sql_query, "r", encoding="utf-8") as sql_file:
conn.executescript(sql_query)
def setup_search_db():
"""
Creates the search database.
"""
db = sqlite3.connect(":memory:")
sql_file = "queries/fts5.sql"
current_path = Path(__file__).parent.resolve()
sql_path = current_path.joinpath(sql_file)
with open(sql_path, "r", encoding="utf-8") as sql_file:
db.executescript(sql_file.read())

View File

@ -0,0 +1,93 @@
import json
from app.models.plugins import Plugin
from ..utils import SQLiteManager
def plugin_tuple_to_obj(plugin_tuple: tuple) -> Plugin:
return Plugin(
name=plugin_tuple[1],
description=plugin_tuple[2],
active=bool(plugin_tuple[3]),
settings=json.loads(plugin_tuple[4]),
)
class PluginsMethods:
@classmethod
def insert_plugin(cls, plugin: Plugin):
"""
Inserts one plugin into the database
"""
sql = """INSERT OR IGNORE INTO plugins(
name,
description,
active,
settings
) VALUES(?,?,?,?)
"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(
sql,
(
plugin.name,
plugin.description,
int(plugin.active),
json.dumps(plugin.settings),
),
)
lastrowid = cur.lastrowid
return lastrowid
@classmethod
def insert_lyrics_plugin(cls):
plugin = Plugin(
name="lyrics_finder",
description="Find lyrics from the internet",
active=False,
settings={"auto_download": False},
)
cls.insert_plugin(plugin)
@classmethod
def get_all_plugins(cls):
with SQLiteManager(userdata_db=True) as cur:
cur.execute("SELECT * FROM plugins")
plugins = cur.fetchall()
cur.close()
if plugins is not None:
return [plugin_tuple_to_obj(plugin) for plugin in plugins]
return []
@classmethod
def plugin_set_active(cls, name: str, state: int):
with SQLiteManager(userdata_db=True) as cur:
cur.execute("UPDATE plugins SET active=? WHERE name=?", (state, name))
cur.close()
@classmethod
def update_plugin_settings(cls, plugin_name: str, settings: dict):
with SQLiteManager(userdata_db=True) as cur:
cur.execute(
"UPDATE plugins SET settings=? WHERE name=?",
(json.dumps(settings), plugin_name),
)
cur.close()
@classmethod
def get_plugin_by_name(cls, name: str):
with SQLiteManager(userdata_db=True) as cur:
cur.execute("SELECT * FROM plugins WHERE name=?", (name,))
plugin = cur.fetchone()
cur.close()
if plugin is not None:
return plugin_tuple_to_obj(plugin)
return None

View File

@ -41,6 +41,14 @@ CREATE TABLE IF NOT EXISTS lastfm_similar_artists (
similar_artists text NOT NULL,
UNIQUE (artisthash)
);
CREATE TABLE IF NOT EXISTS plugins (
id integer PRIMARY KEY,
name text NOT NULL UNIQUE,
description text NOT NULL,
active integer NOT NULL DEFAULT 0,
settings text
)
"""
CREATE_APPDB_TABLES = """

View File

@ -3,7 +3,7 @@ from enum import Enum
class AlbumVersionEnum(Enum):
"""
Enum for album versions.
Enum that registers supported album versions.
"""
Explicit = ("explicit",)
@ -40,7 +40,7 @@ class AlbumVersionEnum(Enum):
BONUS_EDITION = ("bonus",)
BONUS_TRACK = ("bonus track",)
ORIGINAL = ("original",)
ORIGINAL = ("original", "og")
INTL_VERSION = ("international",)
UK_VERSION = ("uk version",)
US_VERSION = ("us version",)
@ -59,4 +59,7 @@ class AlbumVersionEnum(Enum):
def get_all_keywords():
"""
Returns a joint string of all album versions.
"""
return "|".join("|".join(i.value) for i in AlbumVersionEnum)

176
app/lib/lyrics.py Normal file
View File

@ -0,0 +1,176 @@
from pathlib import Path
from tinytag import TinyTag
from app.store.tracks import TrackStore
def split_line(line: str):
"""
Split a lyrics line into time and lyrics
"""
items = line.split("]")
time = items[0].removeprefix("[")
lyric = items[1] if len(items) > 1 else ""
return (time, lyric.strip())
def convert_to_milliseconds(time: str):
"""
Converts a lyrics time string into milliseconds.
"""
try:
minutes, seconds = time.split(":")
except ValueError:
return 0
milliseconds = int(minutes) * 60 * 1000 + float(seconds) * 1000
return int(milliseconds)
def format_synced_lyrics(lines: list[str]):
"""
Formats synced lyrics into a list of dicts
"""
lyrics = []
for line in lines:
# if line starts with [ and ends with ] .ie. ID3 tag, skip it
if line.startswith("[") and line.endswith("]"):
continue
# if line does not start with [ skip it
if not line.startswith("["):
continue
time, lyric = split_line(line)
milliseconds = convert_to_milliseconds(time)
lyrics.append({"time": milliseconds, "text": lyric})
return lyrics
def get_lyrics_from_lrc(filepath: str):
with open(filepath, mode="r") as file:
lines = (f.removesuffix("\n") for f in file.readlines())
return format_synced_lyrics(lines)
def get_lyrics_file_rel_to_track(filepath: str):
"""
Finds the lyrics file relative to the track file
"""
lyrics_path = Path(filepath).with_suffix(".lrc")
if lyrics_path.exists():
return lyrics_path
def check_lyrics_file_rel_to_track(filepath: str):
"""
Checks if the lyrics file exists relative to the track file
"""
lyrics_path = Path(filepath).with_suffix(".lrc")
if lyrics_path.exists():
return True
else:
return False
def get_lyrics(track_path: str):
"""
Gets the lyrics for a track
"""
lyrics_path = get_lyrics_file_rel_to_track(track_path)
if lyrics_path:
lyrics = get_lyrics_from_lrc(lyrics_path)
copyright = get_extras(track_path, ["copyright"])
return lyrics, copyright[0]
else:
return None, ""
def get_lyrics_from_duplicates(trackhash: str, filepath: str):
"""
Finds the lyrics from other duplicate tracks
"""
for track in TrackStore.tracks:
if track.trackhash == trackhash and track.filepath != filepath:
lyrics, copyright = get_lyrics(track.filepath)
if lyrics:
return lyrics, copyright
return None, ""
def check_lyrics_file(filepath: str, trackhash: str):
lyrics_exists = check_lyrics_file_rel_to_track(filepath)
if lyrics_exists:
return True
for track in TrackStore.tracks:
if track.trackhash == trackhash and track.filepath != filepath:
lyrics_exists = check_lyrics_file_rel_to_track(track.filepath)
if lyrics_exists:
return True
return False
def test_is_synced(lyrics: list[str]):
"""
Tests if the lyric lines passed are synced.
"""
for line in lyrics:
time, _ = split_line(line)
milliseconds = convert_to_milliseconds(time)
if milliseconds != 0:
return True
return False
def get_extras(filepath: str, keys: list[str]):
"""
Get extra tags from an audio file.
"""
try:
tags = TinyTag.get(filepath)
except Exception:
return [""] * len(keys)
extras = tags.extra
return [extras.get(key, "").strip() for key in keys]
def get_lyrics_from_tags(filepath: str, just_check: bool = False):
"""
Gets the lyrics from the tags of the track
"""
lyrics, copyright = get_extras(filepath, ["lyrics", "copyright"])
lyrics = lyrics.replace("engdesc", "")
exists = bool(lyrics.replace("\n", "").strip())
if just_check:
return exists
if not exists:
return None, False, ""
lines = lyrics.split("\n")
synced = test_is_synced(lines[:15])
if synced:
return format_synced_lyrics(lines), synced, copyright
return lines, synced, copyright

View File

@ -11,8 +11,6 @@ from app.migrations.base import Migration
from app.settings import Paths
from app.utils.decorators import coroutine
from app.utils.hashing import create_hash
from app.telemetry import Telemetry
from app.utils.threading import background
# playlists table
# ---------------
@ -25,11 +23,6 @@ from app.utils.threading import background
# 6: trackhashes
@background
def send_telemetry():
Telemetry.send_app_installed()
class RemoveSmallThumbnailFolder(Migration):
"""
Removes the small thumbnail folder.
@ -41,7 +34,6 @@ class RemoveSmallThumbnailFolder(Migration):
@staticmethod
def migrate():
send_telemetry()
thumbs_sm_path = Paths.get_sm_thumb_path()
thumbs_lg_path = Paths.get_lg_thumb_path()

View File

@ -159,6 +159,8 @@ class Album:
"""
return self.title.strip().endswith(" EP")
# TODO: check against number of tracks
def check_is_single(self, tracks: list[Track]):
"""
Checks if the album is a single.

10
app/models/plugins.py Normal file
View File

@ -0,0 +1,10 @@
from dataclasses import dataclass
@dataclass
class Plugin:
name: str
description: str
active: bool
settings: dict

30
app/plugins/__init__.py Normal file
View File

@ -0,0 +1,30 @@
class Plugin:
"""
Class that all plugins should inherit from
"""
def __init__(self, name: str, description: str) -> None:
self.enabled = False
self.name = name
self.description = description
def set_active(self, state: bool):
self.enabled = state
def plugin_method(func):
"""
A decorator that prevents execution if the plugin is disabled.
Should be used on all plugin methods
"""
def wrapper(*args, **kwargs):
plugin: Plugin = args[0]
if plugin.enabled:
return func(*args, **kwargs)
else:
return
return wrapper

215
app/plugins/lyrics.py Normal file
View File

@ -0,0 +1,215 @@
import json
import os
import time
from pathlib import Path
from typing import List, Optional
import requests
from app.db.sqlite.plugins import PluginsMethods
from app.plugins import Plugin, plugin_method
from app.settings import Keys, Paths
class LRCProvider:
"""
Base class for all of the synced (LRC format) lyrics providers.
"""
session = requests.Session()
def __init__(self) -> None:
self.session.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
}
)
def get_lrc_by_id(self, track_id: str) -> Optional[str]:
"""
Returns the synced lyrics of the song in lrc.
### Arguments
- track_id: The ID of the track defined in the provider database. e.g. Spotify/Deezer track ID
"""
raise NotImplementedError
def get_lrc(self, search_term: str) -> Optional[str]:
"""
Returns the synced lyrics of the song in lrc.
"""
raise NotImplementedError
class LyricsProvider(LRCProvider):
"""
Musixmatch provider class
"""
ROOT_URL = Keys.PLUGIN_LYRICS_ROOT_URL
def __init__(self) -> None:
super().__init__()
self.token = None
self.session.headers.update(
{
"authority": Keys.PLUGIN_LYRICS_AUTHORITY,
"cookie": "AWSELBCORS=0; AWSELB=0",
}
)
def _get(self, action: str, query: List[tuple]):
if action != "token.get" and self.token is None:
self._get_token()
query.append(("app_id", "web-desktop-app-v1.0"))
if self.token is not None:
query.append(("usertoken", self.token))
t = str(int(time.time() * 1000))
query.append(("t", t))
try:
url = self.ROOT_URL + action
except TypeError:
return None
try:
response = self.session.get(url, params=query)
except requests.exceptions.ConnectionError:
return None
if response is not None and response.ok:
return response
return None
def _get_token(self):
# Check if token is cached and not expired
plugin_path = Paths.get_lyrics_plugins_path()
token_path = os.path.join(plugin_path, "token.json")
current_time = int(time.time())
if os.path.exists(token_path):
with open(token_path, "r", encoding="utf-8") as token_file:
cached_token_data: dict = json.load(token_file)
cached_token = cached_token_data.get("token")
expiration_time = cached_token_data.get("expiration_time")
if cached_token and expiration_time and current_time < expiration_time:
self.token = cached_token
return
# Token not cached or expired, fetch a new token
res = self._get("token.get", [("user_language", "en")])
if res is None:
return
res = res.json()
if res["message"]["header"]["status_code"] == 401:
time.sleep(10)
return self._get_token()
new_token = res["message"]["body"]["user_token"]
expiration_time = current_time + 600 # 10 minutes expiration
# Cache the new token
self.token = new_token
token_data = {"token": new_token, "expiration_time": expiration_time}
os.makedirs(plugin_path, exist_ok=True)
with open(token_path, "w", encoding="utf-8") as token_file:
json.dump(token_data, token_file)
def get_lrc_by_id(self, track_id: str) -> Optional[str]:
res = self._get(
"track.subtitle.get", [("track_id", track_id), ("subtitle_format", "lrc")]
)
try:
res = res.json()
body = res["message"]["body"]
except AttributeError:
return None
if not body:
return None
return body["subtitle"]["subtitle_body"]
def get_lrc(self, title: str, artist: str) -> Optional[str]:
res = self._get(
"track.search",
[
("q_track", title),
("q_artist", artist),
("page_size", "5"),
("page", "1"),
("f_has_lyrics", "1"),
("s_track_rating", "desc"),
("quorum_factor", "1.0"),
],
)
try:
body = res.json()["message"]["body"]
except AttributeError:
return []
try:
tracks = body["track_list"]
except TypeError:
return []
return [
{
"track_id": t["track"]["track_id"],
"title": t["track"]["track_name"],
"artist": t["track"]["artist_name"],
"album": t["track"]["album_name"],
"image": t["track"]["album_coverart_100x100"],
}
for t in tracks
]
class Lyrics(Plugin):
def __init__(self) -> None:
plugin = PluginsMethods.get_plugin_by_name("lyrics_finder")
if not plugin:
return
name = plugin.name
description = plugin.description
super().__init__(name, description)
self.provider = LyricsProvider()
if plugin:
self.set_active(bool(int(plugin.active)))
@plugin_method
def search_lyrics_by_title_and_artist(self, title: str, artist: str):
return self.provider.get_lrc(title, artist)
@plugin_method
def download_lyrics(self, trackid: str, path: str):
lrc = self.provider.get_lrc_by_id(trackid)
is_valid = lrc is not None and lrc.replace("\n", "").strip() != ""
if not is_valid:
return None
path = Path(path).with_suffix(".lrc")
try:
with open(path, "w", encoding="utf-8") as f:
f.write(lrc)
return lrc
except:
return lrc

5
app/plugins/register.py Normal file
View File

@ -0,0 +1,5 @@
from app.db.sqlite.plugins import PluginsMethods
def register_plugins():
PluginsMethods.insert_lyrics_plugin()

View File

@ -1,19 +1,20 @@
"""
Requests related to artists
"""
import urllib.parse
import requests
from requests import ConnectionError, HTTPError, ReadTimeout
from app import settings
from app.utils.hashing import create_hash
from requests import ConnectionError, HTTPError, ReadTimeout
import urllib.parse
def fetch_similar_artists(name: str):
"""
Fetches similar artists from Last.fm
"""
url = f"https://ws.audioscrobbler.com/2.0/?method=artist.getsimilar&artist={urllib.parse.quote_plus(name, safe='')}&api_key={settings.Keys.LASTFM_API}&format=json&limit=250"
url = f"https://ws.audioscrobbler.com/2.0/?method=artist.getsimilar&artist={urllib.parse.quote_plus(name, safe='')}&api_key={settings.Keys.LASTFM_API_KEY}&format=json&limit=250"
try:
response = requests.get(url, timeout=10)

View File

@ -16,7 +16,7 @@ else:
class Release:
APP_VERSION = "1.3.0"
APP_VERSION = "1.4.0"
class Paths:
@ -83,6 +83,14 @@ class Paths:
def get_assets_path(cls):
return join(Paths.get_app_dir(), "assets")
@classmethod
def get_plugins_path(cls):
return join(Paths.get_app_dir(), "plugins")
@classmethod
def get_lyrics_plugins_path(cls):
return join(Paths.get_plugins_path(), "lyrics")
# defaults
class Defaults:
@ -231,19 +239,25 @@ class TCOLOR:
class Keys:
# get last fm api key from os environment
LASTFM_API = os.environ.get("LASTFM_API_KEY")
POSTHOG_API_KEY = os.environ.get("POSTHOG_API_KEY")
LASTFM_API_KEY = os.environ.get("LASTFM_API_KEY")
PLUGIN_LYRICS_AUTHORITY = os.environ.get("PLUGIN_LYRICS_AUTHORITY")
PLUGIN_LYRICS_ROOT_URL = os.environ.get("PLUGIN_LYRICS_ROOT_URL")
@classmethod
def load(cls):
if IS_BUILD:
cls.LASTFM_API = configs.LASTFM_API_KEY
cls.POSTHOG_API_KEY = configs.POSTHOG_API_KEY
cls.LASTFM_API_KEY = configs.LASTFM_API_KEY
cls.PLUGIN_LYRICS_AUTHORITY = configs.PLUGIN_LYRICS_AUTHORITY
cls.PLUGIN_LYRICS_ROOT_URL = configs.PLUGIN_LYRICS_ROOT_URL
cls.verify_keys()
@classmethod
def verify_keys(cls):
if not cls.LASTFM_API:
if not cls.LASTFM_API_KEY:
print("ERROR: LASTFM_API_KEY not set in environment")
sys.exit(0)
@classmethod
def get(cls, key: str):
return getattr(cls, key, None)

View File

@ -65,6 +65,8 @@ def create_config_dir() -> None:
dirs = [
"", # creates the config folder
"images",
"plugins",
"plugins/lyrics",
thumb_path,
small_thumb_path,
large_thumb_path,

View File

@ -1,76 +0,0 @@
import uuid as UUID
from posthog import Posthog
from app.logger import log
from app.settings import Keys, Paths, Release
from app.utils.hashing import create_hash
from app.utils.network import has_connection
class Telemetry:
"""
Handles sending telemetry data to posthog.
"""
user_id = ""
off = False
@classmethod
def init(cls) -> None:
try:
cls.posthog = Posthog(
project_api_key=Keys.POSTHOG_API_KEY,
host="https://app.posthog.com",
disable_geoip=False,
)
cls.create_userid()
except AssertionError:
cls.disable_telemetry()
@classmethod
def create_userid(cls):
"""
Creates a unique user id for the user and saves it to a file.
"""
uuid_path = Paths.get_app_dir() + "/userid.txt"
try:
with open(uuid_path, "r") as f:
cls.user_id = f.read().strip()
except FileNotFoundError:
uuid = str(UUID.uuid4())
cls.user_id = "user_" + create_hash(uuid, limit=15)
with open(uuid_path, "w") as f:
f.write(cls.user_id)
@classmethod
def disable_telemetry(cls):
cls.off = True
@classmethod
def send_event(cls, event: str):
"""
Sends an event to posthog.
"""
if cls.off:
return
if has_connection():
cls.posthog.capture(cls.user_id, event=f"v{Release.APP_VERSION}-{event}")
@classmethod
def send_app_installed(cls):
"""
Sends an event to posthog when the app is installed.
"""
cls.send_event("app-installed")
@classmethod
def send_artist_visited(cls):
"""
Sends an event to posthog when an artist page is visited.
"""
cls.send_event("artist-page-visited")

9
build.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/zsh
# builds the latest version of the client and server
cd ../swingmusic-client
yarn build --outDir ../swingmusic/client
cd ../swingmusic
poetry run python manage.py --build

View File

@ -4,15 +4,15 @@ This file is used to run the application.
import logging
import mimetypes
import os
import setproctitle
from flask import request
from app.telemetry import Telemetry
import setproctitle
from app.api import create_api
from app.arg_handler import HandleArgs
from app.lib.watchdogg import Watcher as WatchDog
from app.periodic_scan import run_periodic_scans
from app.plugins.register import register_plugins
from app.settings import FLASKVARS, Keys
from app.setup import run_setup
from app.start_info_logger import log_startup_info
@ -39,25 +39,31 @@ app = create_api()
app.static_folder = get_home_res_path("client")
# @app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_client_files(path: str):
"""
Serves the static files in the client folder.
"""
# js_or_css = path.endswith(".js") or path.endswith(".css")
# if not js_or_css:
# return app.send_static_file(path)
js_or_css = path.endswith(".js") or path.endswith(".css")
if not js_or_css:
return app.send_static_file(path)
# gzipped_path = path + ".gz"
gzipped_path = path + ".gz"
user_agent = request.headers.get("User-Agent")
is_safari = user_agent.find("Safari") >= 0 and user_agent.find("Chrome") < 0
if is_safari:
return app.send_static_file(path)
accepts_gzip = request.headers.get("Accept-Encoding", "").find("gzip") >= 0
if accepts_gzip:
if os.path.exists(os.path.join(app.static_folder, gzipped_path)):
response = app.make_response(app.send_static_file(gzipped_path))
response.headers["Content-Encoding"] = "gzip"
return response
# if request.headers.get("Accept-Encoding", "").find("gzip") >= 0:
# if os.path.exists(os.path.join(app.static_folder, gzipped_path)):
# response = app.make_response(app.send_static_file(gzipped_path))
# response.headers["Content-Encoding"] = "gzip"
# return response
# else:
# return app.send_static_file(path)
return app.send_static_file(path)
@ -71,7 +77,6 @@ def serve_client():
@background
def bg_run_setup() -> None:
run_setup()
run_periodic_scans()
@ -80,18 +85,15 @@ def start_watchdog():
WatchDog().run()
@background
def init_telemetry():
Telemetry.init()
def run_swingmusic():
Keys.load()
HandleArgs()
log_startup_info()
run_setup()
bg_run_setup()
register_plugins()
start_watchdog()
init_telemetry()
setproctitle.setproctitle(
f"swingmusic - {FLASKVARS.FLASK_HOST}:{FLASKVARS.FLASK_PORT}"

49
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
[[package]]
name = "altgraph"
@ -48,17 +48,6 @@ docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-
tests = ["attrs[tests-no-zope]", "zope-interface"]
tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"]
[[package]]
name = "backoff"
version = "2.2.1"
description = "Function decoration for backoff and retry"
optional = false
python-versions = ">=3.7,<4.0"
files = [
{file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
{file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
]
[[package]]
name = "black"
version = "22.12.0"
@ -777,17 +766,6 @@ files = [
{file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
]
[[package]]
name = "monotonic"
version = "1.6"
description = "An implementation of time.monotonic() for Python 2 & < 3.3"
optional = false
python-versions = "*"
files = [
{file = "monotonic-1.6-py2.py3-none-any.whl", hash = "sha256:68687e19a14f11f26d140dd5c86f3dba4bf5df58003000ed467e0e2a69bca96c"},
{file = "monotonic-1.6.tar.gz", hash = "sha256:3a55207bcfed53ddd5c5bae174524062935efed17792e9de2ad0205ce9ad63f7"},
]
[[package]]
name = "mypy-extensions"
version = "1.0.0"
@ -975,29 +953,6 @@ files = [
dev = ["pre-commit", "tox"]
testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "posthog"
version = "3.0.2"
description = "Integrate PostHog into any python application."
optional = false
python-versions = "*"
files = [
{file = "posthog-3.0.2-py2.py3-none-any.whl", hash = "sha256:a8c0af6f2401fbe50f90e68c4143d0824b54e872de036b1c2f23b5abb39d88ce"},
{file = "posthog-3.0.2.tar.gz", hash = "sha256:701fba6e446a4de687c6e861b587e7b7741955ad624bf34fe013c06a0fec6fb3"},
]
[package.dependencies]
backoff = ">=1.10.0"
monotonic = ">=1.5"
python-dateutil = ">2.1"
requests = ">=2.7,<3.0"
six = ">=1.5"
[package.extras]
dev = ["black", "flake8", "flake8-print", "isort", "pre-commit"]
sentry = ["django", "sentry-sdk"]
test = ["coverage", "flake8", "freezegun (==0.3.15)", "mock (>=2.0.0)", "pylint", "pytest"]
[[package]]
name = "psutil"
version = "5.9.5"
@ -1745,4 +1700,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.12"
content-hash = "52427f2a27236efb5bcafec3d7db6d2e926dd908593bd595aae5446dfc75ea70"
content-hash = "6b0eebfb7c29b88c87c31f6efc13229d17148c9643b6d9e37576e5a23e6c967c"

View File

@ -23,7 +23,6 @@ pendulum = "^2.1.2"
flask-compress = "^1.13"
tabulate = "^0.9.0"
setproctitle = "^1.3.2"
posthog = "^3.0.2"
[tool.poetry.dev-dependencies]
pylint = "^2.15.5"