swingmusic/app/lib/populate.py
mungai-njoroge 9d4f7af581 rewrite populate.get_image() to extract a
track thumbnail from the first track in an album that has one.

+ rewrite Populate.remove_modified with sets
+ clean the SqliteManager utility class
+ Rewrite ProcessTrackThumbnails to use a process pool instead of a thread pool
+ rewrite track store's remove_tracks_by_filepaths to utilize sets
2023-06-21 09:20:56 +03:00

217 lines
6.4 KiB
Python

from collections import deque
import os
from typing import Generator
from tqdm import tqdm
from requests import ConnectionError as RequestConnectionError
from requests import ReadTimeout
from app import settings
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.lib.artistlib import CheckArtistImages
from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.taglib import extract_thumb, get_tags
from app.lib.trackslib import validate_tracks
from app.logger import log
from app.models import Album, Artist, Track
from app.utils.filesystem import run_fast_scandir
from app.store.albums import AlbumStore
from app.store.tracks import TrackStore
from app.store.artists import ArtistStore
from app.utils.network import Ping
# Module-level shortcuts to the SQLiteTrackMethods helpers used by Populate.
get_all_tracks = SQLiteTrackMethods.get_all_tracks
insert_many_tracks = SQLiteTrackMethods.insert_many_tracks
remove_tracks_by_filepaths = SQLiteTrackMethods.remove_tracks_by_filepaths
# Key of the most recently started Populate run. Populate.__init__ overwrites
# it, and Populate.tag_untagged raises PopulateCancelledError as soon as this
# value no longer equals the key it was started with.
POPULATE_KEY = ""
class PopulateCancelledError(Exception):
    """Raised by ``Populate.tag_untagged`` when ``POPULATE_KEY`` changes mid-scan."""
class Populate:
    """
    Populates the database with all songs in the configured root directories.

    Instantiating the class runs the whole scan: tracks already in the
    database whose files changed on disk are dropped, every file that is not
    known (or was just dropped) is tagged and stored, and afterwards track
    thumbnails, album colors, artist colors and artist images are processed.
    """

    def __init__(self, key: str) -> None:
        """
        Run a full populate pass.

        :param key: Identifier of this run. It is stored in the module-level
            ``POPULATE_KEY``; ``tag_untagged`` aborts with
            ``PopulateCancelledError`` if that global changes while it runs.
        """
        global POPULATE_KEY
        POPULATE_KEY = key

        validate_tracks()
        tracks = get_all_tracks()

        dirs_to_scan = sdb.get_root_dirs()

        if not dirs_to_scan:
            log.warning(
                "The root directory is not configured. "
                "Open the app in your webbrowser to configure."
            )
            return

        # "$home" as the first entry is a placeholder for the user's home
        # directory and replaces the whole list. The list is known to be
        # non-empty here, so indexing cannot fail.
        if dirs_to_scan[0] == "$home":
            dirs_to_scan = [settings.Paths.USER_HOME_DIR]

        files = set()

        for _dir in dirs_to_scan:
            # Index 1 of the scan result is presumably the list of file
            # paths (index 0 the folders) -- confirm in run_fast_scandir.
            files.update(run_fast_scandir(_dir, full=True)[1])

        unmodified = self.remove_modified(tracks)
        untagged = files - unmodified

        if untagged:
            self.tag_untagged(untagged, key)

        ProcessTrackThumbnails()
        ProcessAlbumColors()
        ProcessArtistColors()

        tried_to_download_new_images = False

        if Ping()():
            tried_to_download_new_images = True

            try:
                CheckArtistImages()
            except (RequestConnectionError, ReadTimeout):
                log.error(
                    "Internet connection lost. Downloading artist images stopped."
                )
        else:
            log.warning(
                f"No internet connection. Downloading artist images halted for {settings.get_scan_sleep_time()} seconds."
            )

        # Re-process the new artist images.
        if tried_to_download_new_images:
            ProcessArtistColors()

    @staticmethod
    def remove_modified(tracks: Generator[Track, None, None]):
        """
        Removes tracks from the database that have been modified
        since they were added to the database.

        A track whose file can no longer be stat'ed (deleted, permission
        error, ...) is treated as modified, so its stale entry is removed
        instead of aborting the whole scan.

        :param tracks: Tracks currently stored in the database.
        :return: Set of file paths whose stored ``last_mod`` still equals the
            file's modification time on disk.
        """
        unmodified = set()
        modified = set()

        for track in tracks:
            try:
                is_unmodified = track.last_mod == os.path.getmtime(track.filepath)
            except OSError:
                # getmtime raises OSError when the file is missing or
                # inaccessible; purge the entry rather than crash.
                log.warning("Could not stat file: %s", track.filepath)
                is_unmodified = False

            if is_unmodified:
                unmodified.add(track.filepath)
            else:
                modified.add(track.filepath)

        # Drop the modified tracks from both the in-memory store and the
        # database.
        TrackStore.remove_tracks_by_filepaths(modified)
        remove_tracks_by_filepaths(modified)

        return unmodified

    @staticmethod
    def tag_untagged(untagged: set[str], key: str):
        """
        Read the tags of every file in ``untagged`` and register the results.

        Each readable file becomes a ``Track`` that is added to ``TrackStore``;
        its album and artists are added to their stores when not yet present.
        All collected tag dicts are written to the database in one batch at
        the end.

        :param untagged: File paths that are not (or no longer) in the database.
        :param key: Key of the populate run that started this call.
        :raises PopulateCancelledError: If ``POPULATE_KEY`` no longer equals
            ``key`` when the next file is about to be processed.
        """
        log.info("Found %s new tracks", len(untagged))
        tagged_tracks: deque[dict] = deque()
        tagged_count = 0

        # Column 1 of each favorite row is presumably the track hash --
        # confirm in SQLiteFavoriteMethods. A set gives exact, O(1)
        # membership tests instead of substring search in a joined string.
        favorite_hashes = {row[1] for row in favdb.get_fav_tracks()}

        for file in tqdm(untagged, desc="Reading files"):
            if POPULATE_KEY != key:
                raise PopulateCancelledError("Populate key changed")

            tags = get_tags(file)

            if tags is None:
                log.warning("Could not read file: %s", file)
                continue

            tagged_tracks.append(tags)
            track = Track(**tags)
            track.is_favorite = track.trackhash in favorite_hashes

            TrackStore.add_track(track)

            if not AlbumStore.album_exists(track.albumhash):
                AlbumStore.add_album(AlbumStore.create_album(track))

            # Track artists first, then album artists, as before.
            for artist in (*track.artist, *track.albumartist):
                if not ArtistStore.artist_exists(artist.artisthash):
                    ArtistStore.add_artist(Artist(artist.name))

            tagged_count += 1

        if tagged_tracks:
            log.info("Adding %s tracks to database", len(tagged_tracks))
            insert_many_tracks(tagged_tracks)

        log.info("Added %s/%s tracks", tagged_count, len(untagged))
def get_image(album: Album):
    """
    Extract a thumbnail for ``album`` from the first of its tracks that has one.

    The tracks in ``TrackStore.tracks`` whose ``albumhash`` equals the album's
    are visited in store order and ``extract_thumb`` is called on each one;
    the search stops at the first truthy result. Nothing happens when the
    album has no tracks in the store or none of them yields a thumbnail.

    :param album: The album to extract the image for.
    :type album: Album
    :return: None
    """
    album_tracks = (
        candidate
        for candidate in TrackStore.tracks
        if candidate.albumhash == album.albumhash
    )

    for candidate in album_tracks:
        # Stop as soon as one track produced a thumbnail.
        if extract_thumb(candidate.filepath, candidate.image):
            return

    return
from multiprocessing import Pool, cpu_count
class ProcessTrackThumbnails:
    """
    Extracts the album art from all albums in album store.

    Instantiating the class does the work: ``get_image`` is mapped over
    ``AlbumStore.albums`` in a process pool with one worker per CPU, with a
    progress bar over the results.
    """

    def __init__(self) -> None:
        albums = AlbumStore.albums

        with Pool(processes=cpu_count()) as pool:
            progress = tqdm(
                pool.imap_unordered(get_image, albums),
                total=len(albums),
                desc="Extracting track images",
            )

            # get_image returns nothing useful, so just drain the iterator
            # (inside the pool context) instead of collecting the results.
            for _ in progress:
                pass