diff --git a/.gitignore b/.gitignore index c4bcffc..be44e50 100644 --- a/.gitignore +++ b/.gitignore @@ -14,8 +14,6 @@ dist/ downloads/ eggs/ .eggs/ -lib/ -lib64/ parts/ sdist/ var/ @@ -34,21 +32,6 @@ MANIFEST pip-log.txt pip-delete-this-directory.txt -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - # Translations *.mo *.pot @@ -59,38 +42,9 @@ local_settings.py db.sqlite3 db.sqlite3-journal -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - # Jupyter Notebook .ipynb_checkpoints -# IPython -profile_default/ -ipython_config.py -.pdm.toml - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - # Environments .env .venv @@ -100,29 +54,5 @@ ENV/ env.bak/ venv.bak/ -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - # Other Video \ No newline at end of file diff --git a/Src/Assets/Un_giorno....png b/Src/Assets/Un_giorno....png new file mode 100644 index 0000000..88e5f00 Binary files /dev/null and b/Src/Assets/Un_giorno....png differ diff --git a/Src/Lib/FFmpeg/__init__.py b/Src/Lib/FFmpeg/__init__.py new file mode 100644 index 0000000..9f8bd06 --- /dev/null +++ b/Src/Lib/FFmpeg/__init__.py @@ -0,0 +1,4 @@ +# 20.02.24 + +from .util.installer import check_ffmpeg +from .my_m3u8 import Downloader \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/__init__.py b/Src/Lib/FFmpeg/util/__init__.py new file mode 100644 index 0000000..373a9eb --- /dev/null +++ b/Src/Lib/FFmpeg/util/__init__.py @@ -0,0 +1 @@ +# TO DO \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/decryption.py b/Src/Lib/FFmpeg/util/decryption.py new file mode 100644 index 0000000..3bef125 --- /dev/null +++ b/Src/Lib/FFmpeg/util/decryption.py @@ -0,0 +1,113 @@ +# 29.04.24 + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import cmac +from cryptography.hazmat.primitives.asymmetric import rsa as RSA + +class M3U8_Decryption: + def __init__(self, key: bytes, iv: bytes = None) -> None: + """ + Initialize the M3U8_Decryption class. + + Args: + - key (bytes): Encryption key. + - method (str): Encryption method (e.g., "AES", "Blowfish"). + - iv (bytes): Initialization Vector (bytes), default is None. + """ + + self.key = key + self.iv = iv + + def set_method(self, method: str): + """ + Set the encryption method. + + Args: + - method (str): Encryption method (e.g., "AES", "Blowfish"). + """ + + self.method = method + + def parse_key(self, raw_iv: str) -> None: + """ + Parse the raw IV string and set the IV. + + Args: + - raw_iv (str): Raw IV string in hexadecimal format (e.g., "43A6D967D5C17290D98322F5C8F6660B"). + """ + + if "0x" in str(raw_iv): + self.iv = bytes.fromhex(raw_iv.replace("0x", "")) + else: + self.iv = raw_iv + + def _check_iv_size(self, expected_size: int) -> None: + """ + Check the size of the initialization vector (IV). + + Args: + - expected_size (int): The expected size of the IV. + """ + + if self.iv is not None and len(self.iv) != expected_size: + raise ValueError(f"Invalid IV size ({len(self.iv)}) for {self.method}. Expected size: {expected_size}") + + def generate_cmac(self, data: bytes) -> bytes: + """ + Generate CMAC (Cipher-based Message Authentication Code). + + Args: + - data (bytes): The data to generate CMAC for. + + Returns: + - bytes: The CMAC digest. + """ + + if self.method == "AES-CMAC": + cipher = Cipher(algorithms.AES(self.key), modes.ECB(), backend=default_backend()) + encryptor = cipher.encryptor() + cmac_obj = cmac.CMAC(encryptor) + cmac_obj.update(data) + return cmac_obj.finalize() + else: + raise ValueError("Invalid method") + + def decrypt(self, ciphertext: bytes) -> bytes: + """ + Decrypt the ciphertext using the specified encryption method. + + Args: + - ciphertext (bytes): The ciphertext to decrypt. + + Returns: + - bytes: The decrypted data. + """ + + if self.method == "AES": + self._check_iv_size(16) + cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()) + + elif self.method == "AES-128": + self._check_iv_size(16) + cipher = Cipher(algorithms.AES(self.key[:16]), modes.CBC(self.iv), backend=default_backend()) + + elif self.method == "AES-128-CTR": + self._check_iv_size(16) + cipher = Cipher(algorithms.AES(self.key[:16]), modes.CTR(self.iv), backend=default_backend()) + + elif self.method == "Blowfish": + self._check_iv_size(8) + cipher = Cipher(algorithms.Blowfish(self.key), modes.CBC(self.iv), backend=default_backend()) + + elif self.method == "RSA": + private_key = RSA.import_key(self.key) + cipher = Cipher(algorithms.RSA(private_key), backend=default_backend()) + + else: + raise ValueError("Invalid or unsupported method") + + decryptor = cipher.decryptor() + decrypted_data = decryptor.update(ciphertext) + decryptor.finalize() + + return decrypted_data \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/helper.py b/Src/Lib/FFmpeg/util/helper.py new file mode 100644 index 0000000..5867ed1 --- /dev/null +++ b/Src/Lib/FFmpeg/util/helper.py @@ -0,0 +1,389 @@ +# 31.01.24 + +# Class +from Src.Util.console import console + +# Import +import ffmpeg +import hashlib +import os +import logging +import shutil + + +def has_audio_stream(video_path: str) -> bool: + """ + Check if the input video has an audio stream. + + Parameters: + - video_path (str): Path to the input video file. + + Returns: + - has_audio (bool): True if the input video has an audio stream, False otherwise. + """ + try: + probe_result = ffmpeg.probe(video_path, select_streams='a') + return bool(probe_result['streams']) + except ffmpeg.Error: + return None + +def get_video_duration(file_path: str) -> (float): + """ + Get the duration of a video file. + + Args: + file_path (str): The path to the video file. + + Returns: + (float): The duration of the video in seconds if successful, + None if there's an error. + """ + + try: + + # Use FFmpeg probe to get video information + probe = ffmpeg.probe(file_path) + + # Extract duration from the video information + return float(probe['format']['duration']) + + except ffmpeg.Error as e: + + # Handle FFmpeg errors + print(f"Error: {e.stderr}") + return None + +def format_duration(seconds: float) -> list[int, int, int]: + """ + Format duration in seconds into hours, minutes, and seconds. + + Args: + seconds (float): Duration in seconds. + + Returns: + list[int, int, int]: List containing hours, minutes, and seconds. + """ + + hours, remainder = divmod(seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + return int(hours), int(minutes), int(seconds) + +def print_duration_table(file_path: str) -> None: + """ + Print duration of a video file in hours, minutes, and seconds. + + Args: + file_path (str): The path to the video file. + """ + + video_duration = get_video_duration(file_path) + + if video_duration is not None: + + # Format duration into hours, minutes, and seconds + hours, minutes, seconds = format_duration(video_duration) + + # Print the formatted duration + console.log(f"[cyan]Info [green]'{file_path}': [purple]{int(hours)}[red]h [purple]{int(minutes)}[red]m [purple]{int(seconds)}[red]s") + +def compute_sha1_hash(input_string: str) -> (str): + """ + Computes the SHA-1 hash of the input string. + + Args: + input_string (str): The string to be hashed. + + Returns: + str: The SHA-1 hash of the input string. + """ + # Compute the SHA-1 hash + hashed_string = hashlib.sha1(input_string.encode()).hexdigest() + + # Return the hashed string + return hashed_string + +# SINGLE SUBTITLE +def add_subtitle(input_video_path: str, input_subtitle_path: str, output_video_path: str, subtitle_language: str = 'ita', prefix: str = "single_sub") -> str: + """ + Convert a video with a single subtitle. + + Args: + input_video_path (str): Path to the input video file. + input_subtitle_path (str): Path to the input subtitle file. + output_video_path (str): Path to save the output video file. + subtitle_language (str, optional): Language of the subtitle. Defaults to 'ita'. + prefix (str, optional): Prefix to add at the beginning of the output filename. Defaults to "subtitled". + + Returns: + output_video_path (str): Path to the saved output video file. + """ + + # Check if input_video_path and input_subtitle_path exist + if not os.path.exists(input_video_path): + raise FileNotFoundError(f"Input video file '{input_video_path}' not found.") + + if not os.path.exists(input_subtitle_path): + raise FileNotFoundError(f"Input subtitle file '{input_subtitle_path}' not found.") + + # Set up the output file name by modifying the video file name + output_filename = os.path.splitext(os.path.basename(input_video_path))[0] + output_file_name = f"{prefix}_{output_filename}.mp4" + output_video_path = os.path.join(os.path.dirname(output_video_path), output_file_name) + + # Input settings + input_video = ffmpeg.input(input_video_path) + input_subtitle = ffmpeg.input(input_subtitle_path) + + # Output settings + output_args = { + 'c:s': 'mov_text', + 'c:v': 'copy', + 'c:a': 'copy', + 'metadata:s:s:0': 'language=' + subtitle_language, + } + + # Combine inputs, map subtitle stream, and output + ffmpeg.output( + input_video, + input_subtitle, + output_video_path, + **output_args + ).global_args( + '-map', '0:v', + '-map', '0:a', + '-map', '1:s' + ).run() + + # Return + return output_video_path + +# SEGMENTS +def concatenate_and_save(file_list_path: str, output_filename: str, video_decoding: str = None, audio_decoding: str = None, prefix: str = "segments", output_directory: str = None) -> str: + """ + Concatenate input files and save the output with specified decoding parameters. + + Parameters: + - file_list_path (str): Path to the file list containing the segments. + - output_filename (str): Output filename for the concatenated video. + - video_decoding (str): Video decoding parameter (optional). + - audio_decoding (str): Audio decoding parameter (optional). + - prefix (str): Prefix to add at the end of output file name (default is "segments"). + - output_directory (str): Directory to save the output file. If not provided, defaults to the current directory. + + Returns: + - output_file_path (str): Path to the saved output file. + """ + + try: + # Input and output arguments + input_args = { + 'format': 'concat', + 'safe': 0 + } + + output_args = { + 'c': 'copy', + 'loglevel': 'error', + 'y': None + } + + # Add encoding parameter for video and audio + global_args = [] + if video_decoding: + global_args.extend(['-c:v', video_decoding]) + if audio_decoding: + global_args.extend(['-c:a', audio_decoding]) + + # Set up the output file name by modifying the video file name + output_file_name = os.path.splitext(output_filename)[0] + f"_{prefix}.mp4" + + # Determine output directory + if output_directory: + output_file_path = os.path.join(output_directory, output_file_name) + else: + output_file_path = output_file_name + + # Concatenate input files and output + output = ( + ffmpeg.input(file_list_path, **input_args) + .output(output_file_path, **output_args) + .global_args(*global_args) + ) + + # Execute the process + process = output.run() + + except ffmpeg.Error as ffmpeg_error: + + logging.error(f"Error saving MP4: {ffmpeg_error.stdout}") + return "" + + # Remove the temporary file list and folder and completely remove tmp folder + logging.info("Cleanup...") + os.remove(file_list_path) + shutil.rmtree("tmp", ignore_errors=True) + + # Return + return output_file_path + +# AUDIOS +def join_audios(video_path: str, audio_tracks: list[dict[str, str]], prefix: str = "merged") -> str: + """ + Join video with multiple audio tracks and sync them if there are matching segments. + + Parameters: + - video_path (str): Path to the video file. + - audio_tracks (List[Dict[str, str]]): A list of dictionaries, where each dictionary contains 'audio_path'. + - prefix (str, optional): Prefix to add at the beginning of the output filename. Defaults to "merged". + + Returns: + - out_path (str): Path to the saved output video file. + """ + + try: + + # Check if video_path exists + if not os.path.exists(video_path): + raise FileNotFoundError(f"Video file '{video_path}' not found.") + + # Create input streams for video and audio using ffmpeg's. + video_stream = ffmpeg.input(video_path) + + # Create a list to store audio streams and map arguments + audio_streams = [] + map_arguments = [] + + # Iterate through audio tracks + for i, audio_track in enumerate(audio_tracks): + audio_path = audio_track.get('path', '') + + # Check if audio_path exists + if audio_path: + if not os.path.exists(audio_path): + logging.warning(f"Audio file '{audio_path}' not found.") + continue + + audio_stream = ffmpeg.input(audio_path) + audio_streams.append(audio_stream) + map_arguments.extend(['-map', f'{i + 1}:a:0']) + + # Set up a process to combine the video and audio streams and create an output file with .mp4 extension. + output_file_name = f"{prefix}_{os.path.splitext(os.path.basename(video_path))[0]}.mp4" + out_path = os.path.join(os.path.dirname(video_path), output_file_name) + + # Output arguments + output_args = { + 'vcodec': 'copy', + 'acodec': 'copy', + 'loglevel': 'error' + } + + # Combine inputs, map audio streams, and set output + process = ( + ffmpeg.output( + video_stream, + *audio_streams, + out_path, + **output_args + ) + .global_args( + '-map', '0:v:0', + *map_arguments, + '-shortest', + '-strict', 'experimental', + ) + .run(overwrite_output=True) + ) + + logging.info("[M3U8_Downloader] Merge completed successfully.") + + # Return + return out_path + + except ffmpeg.Error as ffmpeg_error: + logging.error("[M3U8_Downloader] Ffmpeg error: %s", ffmpeg_error) + return "" + +# SUBTITLES +def transcode_with_subtitles(video: str, subtitles_list: list[dict[str, str]], output_file: str, prefix: str = "transcoded") -> str: + + """ + Transcode a video with subtitles. + + Args: + - video (str): Path to the input video file. + - subtitles_list (list[dict[str, str]]): List of dictionaries containing subtitles information. + - output_file (str): Path to the output transcoded video file. + - prefix (str): Prefix to add to the output file name. Default is "transcoded". + + Returns: + - str: Path to the transcoded video file. + """ + + try: + + # Check if the input video file exists + if not os.path.exists(video): + raise FileNotFoundError(f"Video file '{video}' not found.") + + # Get input video from video path + input_ffmpeg = ffmpeg.input(video) + input_video = input_ffmpeg['v'] + input_audio = input_ffmpeg['a'] + + # List with subtitles path and metadata + input_subtitles = [] + metadata = {} + + # Iterate through subtitle tracks + for idx, sub_dict in enumerate(subtitles_list): + # Get path and name of subtitles + sub_file = sub_dict.get('path') + title = sub_dict.get('name') + + # Check if the subtitle file exists + if not os.path.exists(sub_file): + raise FileNotFoundError(f"Subtitle file '{sub_file}' not found.") + + # Append ffmpeg input to list + input_ffmpeg_sub = ffmpeg.input(sub_file) + input_subtitles.append(input_ffmpeg_sub['s']) + + # Add metadata for title + metadata[f'metadata:s:s:{idx}'] = f"title={title}" + + # Check if the input video has an audio stream + logging.info(f"There is audio: {has_audio_stream(video)}") + + # Set up the output file name by adding the prefix + output_filename = f"{prefix}_{os.path.splitext(os.path.basename(video))[0]}.mkv" + output_file = os.path.join(os.path.dirname(output_file), output_filename) + + # Configure ffmpeg output + output_ffmpeg = ffmpeg.output( + input_video, + *(input_audio,) if has_audio_stream(video) else (), # If there is no audio stream + *input_subtitles, + output_file, + vcodec='copy', + acodec='copy' if has_audio_stream(video) else (), # If there is no audio stream + **metadata, + loglevel='error' + ) + + # Overwrite output file if exists + output_ffmpeg = ffmpeg.overwrite_output(output_ffmpeg) + + # Run ffmpeg command + ffmpeg.run(output_ffmpeg, overwrite_output=True) + + # Rename video from mkv -> mp4 + output_filename_mp4 = output_file.replace("mkv", "mp4") + os.rename(output_file, output_filename_mp4) + + return output_filename_mp4 + + except ffmpeg.Error as ffmpeg_error: + print(f"Error: {ffmpeg_error}") + return "" \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/installer.py b/Src/Lib/FFmpeg/util/installer.py new file mode 100644 index 0000000..07a8f03 --- /dev/null +++ b/Src/Lib/FFmpeg/util/installer.py @@ -0,0 +1,143 @@ +# 24.01.2023 + +# Class +from Src.Util.console import console + +# Import +import subprocess +import os +import requests +import zipfile +import sys +import ctypes + + +# [ func ] +def isAdmin() -> (bool): + """ + Check if the current user has administrative privileges. + + Returns: + bool: True if the user is an administrator, False otherwise. + """ + + try: + is_admin = (os.getuid() == 0) + + except AttributeError: + is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 + + return is_admin + +def get_version(): + """ + Get the version of FFmpeg installed on the system. + + This function runs the 'ffmpeg -version' command to retrieve version information + about the installed FFmpeg binary. + """ + + try: + # Run the FFmpeg command to get version information + output = subprocess.check_output(['ffmpeg', '-version'], stderr=subprocess.STDOUT, universal_newlines=True) + + # Extract version information from the output + version_lines = [line for line in output.split('\n') if line.startswith('ffmpeg version')] + + if version_lines: + + # Extract version number from the version line + version = version_lines[0].split(' ')[2] + console.print(f"[blue]FFmpeg version: [red]{version}") + + except subprocess.CalledProcessError as e: + # If there's an error executing the FFmpeg command + print("Error executing FFmpeg command:", e.output.strip()) + raise e + +def download_ffmpeg(): + """ + Download FFmpeg binary for Windows and add it to the system PATH. + + This function downloads the FFmpeg binary zip file from the specified URL, + extracts it to a directory named 'ffmpeg', and adds the 'bin' directory of + FFmpeg to the system PATH so that it can be accessed from the command line. + """ + + # SInizializate start variable + ffmpeg_url = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-git-full.7z" + ffmpeg_dir = "ffmpeg" + + print("[yellow]Downloading FFmpeg...[/yellow]") + + try: + response = requests.get(ffmpeg_url) + + # Create the directory to extract FFmpeg if it doesn't exist + os.makedirs(ffmpeg_dir, exist_ok=True) + + # Save the zip file + zip_file_path = os.path.join(ffmpeg_dir, "ffmpeg.zip") + with open(zip_file_path, "wb") as zip_file: + zip_file.write(response.content) + + # Extract the zip file + with zipfile.ZipFile(zip_file_path, "r") as zip_ref: + zip_ref.extractall(ffmpeg_dir) + + # Add the FFmpeg directory to the system PATH + ffmpeg_bin_dir = os.path.join(os.getcwd(), ffmpeg_dir, "bin") + os.environ["PATH"] += os.pathsep + ffmpeg_bin_dir + + # Remove the downloaded zip file + os.remove(zip_file_path) + + except requests.RequestException as e: + # If there's an issue with downloading FFmpeg + print(f"Failed to download FFmpeg: {e}") + raise e + + except zipfile.BadZipFile as e: + # If the downloaded file is not a valid zip file + print(f"Failed to extract FFmpeg zip file: {e}") + raise e + +def check_ffmpeg(): + """ + Check if FFmpeg is installed and available on the system PATH. + + This function checks if FFmpeg is installed and available on the system PATH. + If FFmpeg is found, it prints its version. If not found, it attempts to download + FFmpeg and add it to the system PATH. + """ + + console.print("[green]Checking FFmpeg...") + + try: + # Try running the FFmpeg command to check if it exists + subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + console.print("[blue]FFmpeg is installed. \n") + + # Get and print FFmpeg version + #get_version() + + except subprocess.CalledProcessError: + try: + # If FFmpeg is not found, attempt to download and add it to the PATH + console.print("[cyan]FFmpeg is not found in the PATH. Downloading and adding to the PATH...[/cyan]") + + # Check if user has admin privileges + if not isAdmin(): + console.log("[red]You need to be admin to proceed!") + sys.exit(0) + + # Download FFmpeg and add it to the PATH + download_ffmpeg() + sys.exit(0) + + except Exception as e: + + # If unable to download or add FFmpeg to the PATH + console.print("[red]Unable to download or add FFmpeg to the PATH.[/red]") + console.print(f"Error: {e}") + sys.exit(0) diff --git a/Src/Lib/FFmpeg/util/math_calc.py b/Src/Lib/FFmpeg/util/math_calc.py new file mode 100644 index 0000000..1254275 --- /dev/null +++ b/Src/Lib/FFmpeg/util/math_calc.py @@ -0,0 +1,39 @@ +# 29.02.24 + +from Src.Util.os import format_size + +class TSFileSizeCalculator: + def __init__(self): + """ + Initialize the TSFileSizeCalculator object. + + Args: + num_segments (int): The number of segments. + """ + self.ts_file_sizes = [] + + def add_ts_file_size(self, size: int): + """ + Add a file size to the list of file sizes. + + Args: + size (float): The size of the ts file to be added. + """ + self.ts_file_sizes.append(size) + + def calculate_total_size(self): + """ + Calculate the total size of the files. + + Returns: + float: The mean size of the files in a human-readable format. + """ + + if len(self.ts_file_sizes) == 0: + return 0 + + total_size = sum(self.ts_file_sizes) + mean_size = total_size / len(self.ts_file_sizes) + + # Return format mean + return format_size(mean_size) \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/parser.py b/Src/Lib/FFmpeg/util/parser.py new file mode 100644 index 0000000..a69c04d --- /dev/null +++ b/Src/Lib/FFmpeg/util/parser.py @@ -0,0 +1,288 @@ +# 29.04.25 + +# Class import +from Src.Util.headers import get_headers + +# Import +from m3u8 import M3U8 +import logging +import requests + +class M3U8_Parser: + def __init__(self, DOWNLOAD_SPECIFIC_SUBTITLE = None): + """ + Initializes M3U8_Parser with empty lists for segments, playlists, keys, and subtitles. + """ + + self.segments = [] + self.video_playlist = [] + self.keys = {} + self.subtitle_playlist = [] # No vvt ma url a vvt + self.subtitle = [] # Url a vvt + self.audio_ts = [] + self.DOWNLOAD_SPECIFIC_SUBTITLE = DOWNLOAD_SPECIFIC_SUBTITLE + + def parse_data(self, m3u8_content: str) -> (None): + """ + Extracts all information present in the provided M3U8 content. + + Args: + - m3u8_content (str): The content of the M3U8 file. + """ + + try: + + # Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles + m3u8_obj = M3U8(m3u8_content) + + # Collect video info with url, resolution and codecs + for playlist in m3u8_obj.playlists: + self.video_playlist.append({ + "uri": playlist.uri, + "width": playlist.stream_info.resolution, + "codecs": playlist.stream_info.codecs + }) + + # Collect info of encryption if present, method, uri and iv + for key in m3u8_obj.keys: + if key is not None: + self.keys = ({ + "method": key.method, + "uri": key.uri, + "iv": key.iv + }) + + # Collect info of subtitles, type, name, language and uri + # for audio and subtitles + for media in m3u8_obj.media: + if media.type == "SUBTITLES": + self.subtitle_playlist.append({ + "type": media.type, + "name": media.name, + "default": media.default, + "language": media.language, + "uri": media.uri + }) + + + if media.type == "AUDIO": + self.audio_ts.append({ + "type": media.type, + "name": media.name, + "default": media.default, + "language": media.language, + "uri": media.uri + }) + + # Collect info about url of subtitles or segmenets + # m3u8 playlist + # m3u8 index + for segment in m3u8_obj.segments: + + # Collect uri of request to vtt + if "vtt" not in segment.uri: + self.segments.append(segment.uri) + + # Collect info of subtitle + else: + self.subtitle.append(segment.uri) + + except Exception as e: + logging.error(f"[M3U8_Parser] Error parsing M3U8 content: {e}") + + def get_resolution(self, uri: str) -> (int): + """ + Gets the resolution from the provided URI. + + Args: + - uri (str): The URI to extract resolution from. + + Returns: + - int: The resolution if found, otherwise 0. + """ + + if '1080' in uri: + return 1080 + elif '720' in uri: + return 720 + elif '480' in uri: + return 480 + else: + return 0 + + def get_best_quality(self) -> (dict): + """ + Returns the URI of the M3U8 playlist with the best quality. + + Returns: + - str: The URI of the M3U8 playlist with the best quality and decoding if present, otherwise return None + """ + + if self.video_playlist: + + try: + + # Sort the list of video playlist items based on the 'width' attribute in descending order. + # The 'width' attribute is extracted using the lambda function as the sorting key. + sorted_uris = sorted(self.video_playlist, key=lambda x: x['width'], reverse=True) + + # And get the first with best resolution + return sorted_uris[0] + + except: + logging.error("[M3U8_Parser] Error: Can't find M3U8 resolution by width...") + logging.info("[M3U8_Parser] Try searching in URI") + + # Sort the list of video playlist items based on the 'width' attribute if present, + # otherwise, use the resolution obtained from the 'uri' attribute as a fallback. + # Sorting is done in descending order (reverse=True). + sorted_uris = sorted(self.video_playlist, key=lambda x: x.get('width') if x.get('width') is not None else self.get_resolution(x.get('uri')), reverse=True) + + # And get the first with best resolution + return sorted_uris[0] + else: + + logging.info("[M3U8_Parser] No video playlists found.") + return None + + def get_subtitles(self): + """ + Download all subtitles if present. + + Return: + - list: list of subtitle with [name_language, uri] or None if there is no subtitle + """ + + # Create full path where store data of subtitle + logging.info("Download subtitle ...") + + if self.subtitle_playlist: + output = [] + + # For all subtitle find + for sub_info in self.subtitle_playlist: + + # Get language name + name_language = sub_info.get("language") + logging.info(f"[M3U8_Parser] Find subtitle: {name_language}") + + # Check if there is custom subtitles to download + if len(self.DOWNLOAD_SPECIFIC_SUBTITLE) > 0: + + # Check if language in list + if name_language not in self.DOWNLOAD_SPECIFIC_SUBTITLE: + continue + + # Make request to m3u8 subtitle to extract vtt + logging.info(f"[M3U8_Parser] Download subtitle: {name_language}") + req_sub_content = requests.get(sub_info.get("uri"), headers={'user-agent': get_headers()}) + + try: + + # Try extract vtt url + sub_parse = M3U8_Parser() + sub_parse.parse_data(req_sub_content.text) + url_subititle = sub_parse.subtitle[0] + + # Add name and url to output list + output.append({ + 'name': sub_info.get('name'), + 'language': name_language, + 'uri': url_subititle + }) + + except Exception as e: + logging.error(f"[M3U8_Parser] Cant donwload: {name_language}, error: {e}") + + # Return + return output + + else: + logging.info("[M3U8_Parser] No subtitle find") + return None + + def get_track_audios(self) -> list: + """ + Return a list of available audio files with dictionaries {'language': xx, 'uri: xx} + + Returns: + list: A list of dictionaries containing language and URI information for audio tracks, or None if no audio tracks are found. + """ + + logging.info(f"[M3U8_Parser] Finding {len(self.audio_ts)} playlist(s) with audio.") + + if self.audio_ts: + logging.info("[M3U8_Parser] Getting list of available audio names") + list_output = [] + + # For all languages present in m3u8 + for obj_audio in self.audio_ts: + + # Add language and URI + list_output.append({ + 'language': obj_audio.get('language'), + 'uri': obj_audio.get('uri') + }) + + # Return + return list_output + + else: + logging.info("[M3U8_Parser] No audio tracks found") + return None + + def get_default_subtitle(self): + """ + Retrieves the default subtitle information from the subtitle playlist. + + Returns: + dict: A dictionary containing the name and URI of the default subtitle, or None if no default subtitle is found. + """ + + dict_default_sub = None + + # Check if there are subtitles in the playlist + if self.subtitle_playlist: + + # Iterate through each subtitle in the playlist + for sub_info in self.subtitle_playlist: + + # Check if the subtitle is marked as default + is_default = sub_info.get("default") + + if is_default == "YES": + dict_default_sub = { + 'name': sub_info.get('name'), + 'uri': sub_info.get('uri'), + } + + # Return the default subtitle dictionary + return dict_default_sub + + def get_default_track_audio(self): + """ + Retrieves the default audio track information from the audio_ts list. + + Returns: + dict: A dictionary containing the name and URI of the default audio track, or None if no default audio track is found. + """ + + dict_default_audio = None + + # Check if there are audio tracks in the list + if self.audio_ts: + + # Iterate through each audio track object in the list + for obj_audio in self.audio_ts: + + # Check if the audio track is marked as default + is_default = obj_audio.get("default") + + if is_default == "YES": + dict_default_audio = { + 'name': obj_audio.get('name'), + 'uri': obj_audio.get('uri'), + } + + # Return the default audio track dictionary + return dict_default_audio \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/url_fix.py b/Src/Lib/FFmpeg/util/url_fix.py new file mode 100644 index 0000000..d5febc7 --- /dev/null +++ b/Src/Lib/FFmpeg/util/url_fix.py @@ -0,0 +1,48 @@ +# 29.03.24 + +# Import +import logging +import sys +from urllib.parse import urlparse, urljoin + +class M3U8_UrlFix: + def __init__(self) -> None: + + # Initialize the class with an empty playlist URL + self.url_playlist: str = "" + + def set_playlist(self, url: str) -> None: + """ + Set the M3U8 playlist URL. + + Parameters: + - url (str): The M3U8 playlist URL. + """ + self.url_playlist = url + + def generate_full_url(self, url_resource: str) -> str: + """ + Generate a full URL for a given resource using the base URL from the playlist. + + Parameters: + - url_resource (str): The relative URL of the resource within the playlist. + + Returns: + - str: The full URL for the specified resource. + """ + + # Check if m3u8 url playlist is present + if self.url_playlist == None: + logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present") + sys.exit(0) + + # Parse the playlist URL to extract the base URL components + parsed_playlist_url = urlparse(self.url_playlist) + + # Construct the base URL using the scheme, netloc, and path from the playlist URL + base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}" + + # Join the base URL with the relative resource URL to get the full URL + full_url = urljoin(base_url, url_resource) + + return full_url \ No newline at end of file