From d1bfa4b08d1adeda2af9fac164418f5ec15836b4 Mon Sep 17 00:00:00 2001 From: Lovi <62809003+Lovi-0@users.noreply.github.com> Date: Sun, 9 Feb 2025 12:05:11 +0100 Subject: [PATCH] Fix ffmpeg installation linux --- StreamingCommunity/Util/ffmpeg_installer.py | 132 ++++++++++---------- 1 file changed, 69 insertions(+), 63 deletions(-) diff --git a/StreamingCommunity/Util/ffmpeg_installer.py b/StreamingCommunity/Util/ffmpeg_installer.py index 87b1729..3b13eac 100644 --- a/StreamingCommunity/Util/ffmpeg_installer.py +++ b/StreamingCommunity/Util/ffmpeg_installer.py @@ -4,8 +4,6 @@ import os import glob import gzip import shutil -import tarfile -import zipfile import logging import platform import subprocess @@ -179,64 +177,59 @@ class FFMPEGDownloader: size = file.write(chunk) progress.update(download_task, advance=size) return True + except Exception as e: logging.error(f"Download error: {e}") return False - def _extract_and_copy_binaries(self, archive_path: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: + def _extract_file(self, gz_path: str, final_path: str) -> bool: """ - Extract FFmpeg binaries from the downloaded archive and copy them to the base directory. + Extract a gzipped file and set proper permissions. Parameters: - archive_path (str): Path to the downloaded archive file + gz_path (str): Path to the gzipped file + final_path (str): Path where the extracted file should be saved Returns: - Tuple[Optional[str], Optional[str], Optional[str]]: Paths to the extracted ffmpeg, - ffprobe, and ffplay executables. Returns None for each executable that couldn't be extracted. + bool: True if extraction was successful, False otherwise """ try: - extraction_path = os.path.join(self.base_dir, 'temp_extract') - os.makedirs(extraction_path, exist_ok=True) + logging.info(f"Attempting to extract {gz_path} to {final_path}") + + # Check if source file exists and is readable + if not os.path.exists(gz_path): + logging.error(f"Source file {gz_path} does not exist") + return False + + if not os.access(gz_path, os.R_OK): + logging.error(f"Source file {gz_path} is not readable") + return False - if archive_path.endswith('.zip'): - with zipfile.ZipFile(archive_path, 'r') as zip_ref: - zip_ref.extractall(extraction_path) - elif archive_path.endswith('.tar.xz'): - with tarfile.open(archive_path) as tar_ref: - tar_ref.extractall(extraction_path) - elif archive_path.endswith('.gz'): - file_name = os.path.basename(archive_path)[:-3] # Remove extension .gz - output_path = os.path.join(extraction_path, file_name) - with gzip.open(archive_path, 'rb') as f_in, open(output_path, 'wb') as f_out: + # Extract the file + with gzip.open(gz_path, 'rb') as f_in: + # Test if the gzip file is valid + try: + f_in.read(1) + f_in.seek(0) + except Exception as e: + logging.error(f"Invalid gzip file {gz_path}: {e}") + return False + + # Extract the file + with open(final_path, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) - else: - raise ValueError("Unsupported archive format") - config = FFMPEG_CONFIGURATION[self.os_name] - executables = config['executables'] - found_paths = [] - - for executable in executables: - exe_paths = glob.glob(os.path.join(extraction_path, '**', executable), recursive=True) - if exe_paths: - dest_path = os.path.join(self.base_dir, executable) - shutil.copy2(exe_paths[0], dest_path) - - if self.os_name != 'windows': - os.chmod(dest_path, 0o755) - - found_paths.append(dest_path) - else: - found_paths.append(None) - - shutil.rmtree(extraction_path, ignore_errors=True) - os.remove(archive_path) - - return tuple(found_paths) if len(found_paths) == 3 else (None, None, None) + # Set executable permissions + os.chmod(final_path, 0o755) + logging.info(f"Successfully extracted {gz_path} to {final_path}") + + # Remove the gzip file + os.remove(gz_path) + return True except Exception as e: - logging.error(f"Extraction/copy error: {e}") - return None, None, None + logging.error(f"Extraction error for {gz_path}: {e}") + return False def download(self) -> Tuple[Optional[str], Optional[str], Optional[str]]: """ @@ -244,30 +237,43 @@ class FFMPEGDownloader: Returns: Tuple[Optional[str], Optional[str], Optional[str]]: Paths to ffmpeg, ffprobe, and ffplay executables. - Returns None for each executable that couldn't be downloaded or set up. """ config = FFMPEG_CONFIGURATION[self.os_name] executables = [exe.format(arch=self.arch) for exe in config['executables']] - + successful_extractions = [] + for executable in executables: - download_url = f"https://github.com/eugeneware/ffmpeg-static/releases/latest/download/{executable}.gz" - download_path = os.path.join(self.base_dir, f"{executable}.gz") - final_path = os.path.join(self.base_dir, executable) - - console.print(f"[bold blue]Downloading {executable} from GitHub[/]") - if not self._download_file(download_url, download_path): - return None, None, None - - with gzip.open(download_path, 'rb') as f_in, open(final_path, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - - os.chmod(final_path, 0o755) - os.remove(download_path) - + try: + download_url = f"https://github.com/eugeneware/ffmpeg-static/releases/latest/download/{executable}.gz" + download_path = os.path.join(self.base_dir, f"{executable}.gz") + final_path = os.path.join(self.base_dir, executable) + + # Log the current operation + logging.info(f"Processing {executable}") + console.print(f"[bold blue]Downloading {executable} from GitHub[/]") + + # Download the file + if not self._download_file(download_url, download_path): + console.print(f"[bold red]Failed to download {executable}[/]") + continue + + # Extract the file + if self._extract_file(download_path, final_path): + successful_extractions.append(final_path) + console.print(f"[bold green]Successfully installed {executable}[/]") + else: + console.print(f"[bold red]Failed to extract {executable}[/]") + + except Exception as e: + logging.error(f"Error processing {executable}: {e}") + console.print(f"[bold red]Error processing {executable}: {str(e)}[/]") + continue + + # Return the results based on successful extractions return ( - os.path.join(self.base_dir, executables[0]), - os.path.join(self.base_dir, executables[1]), - None + successful_extractions[0] if len(successful_extractions) > 0 else None, + successful_extractions[1] if len(successful_extractions) > 1 else None, + None # ffplay is not included in the current implementation ) def check_ffmpeg() -> Tuple[Optional[str], Optional[str], Optional[str]]: