From 762970c0ed00427d1cbf163930786dfda53f83b8 Mon Sep 17 00:00:00 2001 From: Ghost <62809003+Ghost6446@users.noreply.github.com> Date: Fri, 24 May 2024 12:02:13 +0200 Subject: [PATCH] Add download speed --- README.md | 18 +- Src/Api/Animeunity/Core/Class/EpisodeType.py | 6 +- Src/Api/Animeunity/Core/Class/SearchType.py | 6 +- Src/Api/Animeunity/Core/Vix_player/player.py | 4 +- Src/Api/Animeunity/anime.py | 8 +- Src/Api/Animeunity/site.py | 19 +- .../Core/Class/EpisodeType.py | 4 +- .../Core/Class/SearchType.py | 4 +- .../Core/Vix_player/player.py | 2 +- Src/Api/Streamingcommunity/film.py | 5 +- Src/Api/Streamingcommunity/series.py | 5 +- Src/Api/Streamingcommunity/site.py | 25 +- Src/Lib/Hls/M3U8/__init__.py | 6 + Src/Lib/Hls/M3U8/decryption.py | 127 ++++ Src/Lib/Hls/M3U8/lib_parser/__init__.py | 38 ++ Src/Lib/Hls/M3U8/lib_parser/_util.py | 28 + Src/Lib/Hls/M3U8/lib_parser/model.py | 359 ++++++++++++ Src/Lib/Hls/M3U8/lib_parser/parser.py | 338 +++++++++++ Src/Lib/Hls/M3U8/lib_parser/protocol.py | 17 + Src/Lib/Hls/M3U8/math_calc.py | 41 ++ Src/Lib/Hls/M3U8/parser.py | 547 ++++++++++++++++++ Src/Lib/Hls/M3U8/url_fix.py | 54 ++ Src/Lib/Hls/downloader.py | 5 +- Src/Lib/Hls/segments.py | 44 +- Src/Lib/Instagram/core/_util.py | 27 - Src/Lib/Instagram/core/get_profile.py | 40 -- Src/Lib/Instagram/model/__init__.py | 3 - Src/Lib/Instagram/model/profile.py | 138 ----- Src/Lib/Request/my_requests.py | 5 +- Src/Util/color.py | 20 + config.json | 17 +- 31 files changed, 1666 insertions(+), 294 deletions(-) create mode 100644 Src/Lib/Hls/M3U8/__init__.py create mode 100644 Src/Lib/Hls/M3U8/decryption.py create mode 100644 Src/Lib/Hls/M3U8/lib_parser/__init__.py create mode 100644 Src/Lib/Hls/M3U8/lib_parser/_util.py create mode 100644 Src/Lib/Hls/M3U8/lib_parser/model.py create mode 100644 Src/Lib/Hls/M3U8/lib_parser/parser.py create mode 100644 Src/Lib/Hls/M3U8/lib_parser/protocol.py create mode 100644 Src/Lib/Hls/M3U8/math_calc.py create mode 100644 Src/Lib/Hls/M3U8/parser.py create mode 100644 Src/Lib/Hls/M3U8/url_fix.py delete mode 100644 Src/Lib/Instagram/core/_util.py delete mode 100644 Src/Lib/Instagram/core/get_profile.py delete mode 100644 Src/Lib/Instagram/model/__init__.py delete mode 100644 Src/Lib/Instagram/model/profile.py create mode 100644 Src/Util/color.py diff --git a/README.md b/README.md index a3cf8b0..fd97d67 100644 --- a/README.md +++ b/README.md @@ -54,13 +54,10 @@ You can change some behaviors by tweaking the configuration file. ### Options (DEFAULT) -* get_moment_title: Whether to fetch the title of the moment or not. - - Default Value: false - * root_path: Path where the script will add movies and TV series folders (see [Path Examples](#Path-examples)). - Default Value: media/streamingcommunity -* not_close: Whether to keep the application running after completion or not. +* not_close: This option, when activated, prevents the script from closing after its initial execution, allowing it to restart automatically after completing the first run. - Default Value: false * map_episode_name: Mapping to choose the name of all episodes of TV Shows (see [Episode Name Usage](#Episode-name-usage)). @@ -73,20 +70,25 @@ You can change some behaviors by tweaking the configuration file. * tdqm_workers: The number of workers that will cooperate to download .ts files. **A high value may slow down your PC** - Default Value: 20 -* enable_time_quit: Whether to enable quitting the download after a certain time period. - - Default Value: false - * tqdm_show_progress: Whether to show progress during downloads or not. - Default Value: true -* cleanup_tmp_folder: Whether to clean up temporary folders after processing or not. +* save_m3u8_content: Enabling this feature saves various playlists and indexes in the temporary folder during the download process, ensuring all necessary files are retained for playback or further processing. - Default Value: true + * fake_proxy: Speed up download for streaming film and series. **Dont work for anime, need to set to FALSE** - Default Value: true +* create_report: When enabled, this option saves the name of the series or movie being downloaded along with the date and file size in a CSV file, providing a log of downloaded content. + - Default Value: false + + ### Options (M3U8_OPTIONS) +* cleanup_tmp_folder: Upon final conversion, this option ensures the removal of all unformatted audio, video tracks, and subtitles from the temporary folder, thereby maintaining cleanliness and efficiency. + - Default Value: true + * specific_list_audio: A list of specific audio languages to download. - Example Value: ['ara', 'baq', 'cat', 'chi', 'cze', 'dan', 'dut', 'eng', 'fil', 'fin', 'forced-ita', 'fre', 'ger', 'glg', 'gre', 'heb', 'hin', 'hun', 'ind', 'ita', 'jpn', 'kan', 'kor', 'mal', 'may', 'nob', 'nor', 'pol', 'por', 'rum', 'rus', 'spa', 'swe', 'tam', 'tel', 'tha', 'tur', 'ukr', 'vie'] diff --git a/Src/Api/Animeunity/Core/Class/EpisodeType.py b/Src/Api/Animeunity/Core/Class/EpisodeType.py index 90ffc2a..14b9694 100644 --- a/Src/Api/Animeunity/Core/Class/EpisodeType.py +++ b/Src/Api/Animeunity/Core/Class/EpisodeType.py @@ -8,8 +8,8 @@ from Src.Util._jsonConfig import config_manager # Config -SC_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -SC_DOMAIN_NOW = config_manager.get('SITE', 'streaming_domain') +AU_SITE_NAME = "animeunity" +AU_DOMAIN_NOW = config_manager.get('SITE', AU_SITE_NAME) @@ -23,7 +23,7 @@ class Image: self.created_at: str = image_data.get('created_at', '') self.updated_at: str = image_data.get('updated_at', '') self.original_url_field: str = image_data.get('original_url_field', '') - self.url: str = f"https://cdn.{SC_SITE_NAME}.{SC_DOMAIN_NOW}/images/{self.filename}" + self.url: str = f"https://cdn.{AU_SITE_NAME}.{AU_DOMAIN_NOW}/images/{self.filename}" def __str__(self): return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')" diff --git a/Src/Api/Animeunity/Core/Class/SearchType.py b/Src/Api/Animeunity/Core/Class/SearchType.py index ff7e7ec..6c492bf 100644 --- a/Src/Api/Animeunity/Core/Class/SearchType.py +++ b/Src/Api/Animeunity/Core/Class/SearchType.py @@ -8,8 +8,8 @@ from Src.Util._jsonConfig import config_manager # Config -SC_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -SC_DOMAIN_NOW = config_manager.get('SITE', 'streaming_domain') +AU_SITE_NAME = "animeunity" +AU_DOMAIN_NOW = config_manager.get('SITE', AU_SITE_NAME) @@ -20,7 +20,7 @@ class Image: self.filename: str = data.get('filename') self.type: str = data.get('type') self.original_url_field: str = data.get('original_url_field') - self.url: str = f"https://cdn.{SC_SITE_NAME}.{SC_DOMAIN_NOW}/images/{self.filename}" + self.url: str = f"https://cdn.{AU_SITE_NAME}.{AU_DOMAIN_NOW}/images/{self.filename}" def __str__(self): return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')" diff --git a/Src/Api/Animeunity/Core/Vix_player/player.py b/Src/Api/Animeunity/Core/Vix_player/player.py index 62604d2..aefafe4 100644 --- a/Src/Api/Animeunity/Core/Vix_player/player.py +++ b/Src/Api/Animeunity/Core/Vix_player/player.py @@ -30,8 +30,8 @@ class VideoSource: 'user-agent': get_headers() } self.is_series = False - self.base_name = config_manager.get('SITE', 'anime_site_name') - self.domain = config_manager.get('SITE', 'anime_domain') + self.base_name = "animeunity" + self.domain = config_manager.get('SITE', self.base_name) def setup(self, media_id: int = None, series_name: str = None): """ diff --git a/Src/Api/Animeunity/anime.py b/Src/Api/Animeunity/anime.py index a6f1519..d430004 100644 --- a/Src/Api/Animeunity/anime.py +++ b/Src/Api/Animeunity/anime.py @@ -18,7 +18,9 @@ from .Core.Util import manage_selection # Config ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -ANIME_FOLDER = config_manager.get('SITE', 'anime_site_name') +ANIME_FOLDER = "animeunity" +SERIES_FOLDER= "Serie" +MOVIE_FOLDER = "Movie" # Variable @@ -49,9 +51,9 @@ def download_episode(index_select: int): # Create output path out_path = None if video_source.is_series: - out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, "Serie", video_source.series_name, f"{index_select+1}.mp4") + out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, SERIES_FOLDER, video_source.series_name, f"{index_select+1}.mp4") else: - out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, "Movie", video_source.series_name, f"{index_select}.mp4") + out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, MOVIE_FOLDER, video_source.series_name, f"{index_select}.mp4") # Crete downloader obj_download = Downloader( diff --git a/Src/Api/Animeunity/site.py b/Src/Api/Animeunity/site.py index 3edf557..e374456 100644 --- a/Src/Api/Animeunity/site.py +++ b/Src/Api/Animeunity/site.py @@ -21,11 +21,8 @@ from .Core.Class.SearchType import MediaManager, MediaItem # Config -GET_TITLES_OF_MOMENT = config_manager.get_bool('DEFAULT', 'get_moment_title') -SC_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -SC_DOMAIN_NOW = config_manager.get('SITE', 'streaming_domain') -AU_SITE_NAME = config_manager.get('SITE', 'anime_site_name') -AU_DOMAIN_NOW = config_manager.get('SITE', 'anime_domain') +AU_SITE_NAME = "animeunity" +AU_DOMAIN_NOW = config_manager.get('SITE', AU_SITE_NAME) # Variable @@ -88,16 +85,17 @@ def update_domain(): response.status_code # If the current site is inaccessible, try to obtain a new domain - except Exception as e: + except: # Get new domain + console.print("[red]\nExtract new DOMAIN from TLD list.") new_domain = extract_domain(method="light") console.log(f"[cyan]Extract new domain: [red]{new_domain}") if new_domain: # Update configuration with the new domain - config_manager.set_key('SITE', 'anime_domain', new_domain) + config_manager.set_key('SITE', AU_SITE_NAME, new_domain) config_manager.write_config() else: @@ -144,9 +142,8 @@ def title_search(title: str) -> int: update_domain() # Get token and session value from configuration - url_site_name = config_manager.get('SITE', 'anime_site_name') - url_domain = config_manager.get('SITE', 'anime_domain') - data = get_token(url_site_name, url_domain) + url_domain = config_manager.get('SITE', AU_SITE_NAME) + data = get_token(AU_SITE_NAME, url_domain) # Prepare cookies to be used in the request cookies = { @@ -167,7 +164,7 @@ def title_search(title: str) -> int: } # Send a POST request to the API endpoint for live search - response = requests.post(f'https://www.{url_site_name}.{url_domain}/livesearch', cookies=cookies, headers=headers, json_data=json_data) + response = requests.post(f'https://www.{AU_SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json_data=json_data) # Process each record returned in the response for record in response.json()['records']: diff --git a/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py b/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py index 90ffc2a..468e98b 100644 --- a/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py +++ b/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py @@ -8,8 +8,8 @@ from Src.Util._jsonConfig import config_manager # Config -SC_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -SC_DOMAIN_NOW = config_manager.get('SITE', 'streaming_domain') +SC_SITE_NAME = "streamingcommunity" +SC_DOMAIN_NOW = config_manager.get('SITE', SC_SITE_NAME) diff --git a/Src/Api/Streamingcommunity/Core/Class/SearchType.py b/Src/Api/Streamingcommunity/Core/Class/SearchType.py index ff7e7ec..115677d 100644 --- a/Src/Api/Streamingcommunity/Core/Class/SearchType.py +++ b/Src/Api/Streamingcommunity/Core/Class/SearchType.py @@ -8,8 +8,8 @@ from Src.Util._jsonConfig import config_manager # Config -SC_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -SC_DOMAIN_NOW = config_manager.get('SITE', 'streaming_domain') +SC_SITE_NAME = "streamingcommunity" +SC_DOMAIN_NOW = config_manager.get('SITE', SC_SITE_NAME) diff --git a/Src/Api/Streamingcommunity/Core/Vix_player/player.py b/Src/Api/Streamingcommunity/Core/Vix_player/player.py index ff33ca8..bab894a 100644 --- a/Src/Api/Streamingcommunity/Core/Vix_player/player.py +++ b/Src/Api/Streamingcommunity/Core/Vix_player/player.py @@ -32,7 +32,7 @@ class VideoSource: 'user-agent': get_headers() } self.is_series = False - self.base_name = config_manager.get('SITE', 'streaming_site_name') + self.base_name = "streamingcommunity" def setup(self, version: str = None, domain: str = None, media_id: int = None, series_name: str = None): """ diff --git a/Src/Api/Streamingcommunity/film.py b/Src/Api/Streamingcommunity/film.py index 5fd73ed..44b653a 100644 --- a/Src/Api/Streamingcommunity/film.py +++ b/Src/Api/Streamingcommunity/film.py @@ -21,7 +21,8 @@ from .Core.Vix_player.player import VideoSource # Config ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -STREAMING_FOLDER = config_manager.get('SITE', 'streaming_site_name') +STREAMING_FOLDER = config_manager.get('SITE', 'streamingcommunity') +MOVIE_FOLDER = "Movie" # Variable @@ -65,5 +66,5 @@ def download_film(id_film: str, title_name: str, domain: str): # Download the film using the m3u8 playlist, key, and output filename Downloader( m3u8_playlist = master_playlist, - output_filename = os.path.join(ROOT_PATH, STREAMING_FOLDER, "Movie", title_name, mp4_format) + output_filename = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name, mp4_format) ).start() \ No newline at end of file diff --git a/Src/Api/Streamingcommunity/series.py b/Src/Api/Streamingcommunity/series.py index fdd568d..2a6c542 100644 --- a/Src/Api/Streamingcommunity/series.py +++ b/Src/Api/Streamingcommunity/series.py @@ -22,7 +22,8 @@ from .Core.Util import manage_selection, map_episode_title # Config ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -STREAMING_FOLDER = config_manager.get('SITE', 'streaming_site_name') +STREAMING_FOLDER = "streamingcommunity" +SERIES_FOLDER = "Serie" # Variable @@ -87,7 +88,7 @@ def donwload_video(tv_name: str, index_season_selected: int, index_episode_selec # Define filename and path for the downloaded video mp4_name = remove_special_characters(f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4") - mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, "Serie", tv_name, f"S{index_season_selected}") + mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, SERIES_FOLDER, tv_name, f"S{index_season_selected}") os.makedirs(mp4_path, exist_ok=True) if not can_create_file(mp4_name): diff --git a/Src/Api/Streamingcommunity/site.py b/Src/Api/Streamingcommunity/site.py index e00d667..3537d49 100644 --- a/Src/Api/Streamingcommunity/site.py +++ b/Src/Api/Streamingcommunity/site.py @@ -26,11 +26,8 @@ from .Core.Class.SearchType import MediaManager, MediaItem # Config -GET_TITLES_OF_MOMENT = config_manager.get_bool('DEFAULT', 'get_moment_title') -SC_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -SC_DOMAIN_NOW = config_manager.get('SITE', 'streaming_domain') -AU_SITE_NAME = config_manager.get('SITE', 'anime_site_name') -AU_DOMAIN_NOW = config_manager.get('SITE', 'anime_domain') +SC_SITE_NAME = "streamingcommunity" +SC_DOMAIN_NOW = config_manager.get('SITE', SC_SITE_NAME) # Variable @@ -98,7 +95,7 @@ def get_version_and_domain(new_domain = None) -> Tuple[str, str]: # Get the current domain from the configuration if new_domain is None: - config_domain = config_manager.get('SITE', 'streaming_domain') + config_domain = config_manager.get('SITE', SC_SITE_NAME) else: config_domain = new_domain @@ -113,26 +110,16 @@ def get_version_and_domain(new_domain = None) -> Tuple[str, str]: # Extract version from the response version, list_title_top_10 = get_version(response.text) - # Get titles in the moment - if GET_TITLES_OF_MOMENT: - console.print("[cyan]Scrape information (Top 10 titoli di oggi) [white]...") - table_top_10 = TVShowManager() - table_top_10.set_slice_end(10) - table_top_10.add_column({"Index": {'color': 'red'}, "Name": {'color': 'magenta'}, "Type": {'color': 'yellow'}}) - - for i, obj_title in enumerate(list_title_top_10): - table_top_10.add_tv_show({'Index': str(i), 'Name': obj_title.get('name'), 'Type': obj_title.get('type')}) - table_top_10.display_data(table_top_10.tv_shows) - return version, config_domain - except Exception as e: + except: + console.print("[red]\nExtract new DOMAIN from TLD list.") new_domain = extract_domain(method="light") console.log(f"[cyan]Extract new domain: [red]{new_domain}") # Update the domain in the configuration file - config_manager.set_key('SITE', 'streaming_domain', str(new_domain)) + config_manager.set_key('SITE', SC_SITE_NAME, str(new_domain)) config_manager.write_config() # Retry to get the version and domain diff --git a/Src/Lib/Hls/M3U8/__init__.py b/Src/Lib/Hls/M3U8/__init__.py new file mode 100644 index 0000000..a0bacd5 --- /dev/null +++ b/Src/Lib/Hls/M3U8/__init__.py @@ -0,0 +1,6 @@ +# 02.04.24 + +from .decryption import M3U8_Decryption +from .math_calc import M3U8_Ts_Files +from .parser import M3U8_Parser, M3U8_Codec +from .url_fix import m3u8_url_fix \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/decryption.py b/Src/Lib/Hls/M3U8/decryption.py new file mode 100644 index 0000000..b02946c --- /dev/null +++ b/Src/Lib/Hls/M3U8/decryption.py @@ -0,0 +1,127 @@ +# 03.04.24 + +import sys +import logging +import subprocess +import importlib.util + + +# Internal utilities +from Src.Util.console import console + + +# Check if Crypto module is installed +crypto_spec = importlib.util.find_spec("Crypto") +crypto_installed = crypto_spec is not None + + +if crypto_installed: + logging.info("Decrypy use: Crypto") + from Crypto.Cipher import AES # type: ignore + from Crypto.Util.Padding import unpad # type: ignore + + class M3U8_Decryption: + """ + Class for decrypting M3U8 playlist content using AES encryption when the Crypto module is available. + """ + def __init__(self, key: bytes, iv: bytes, method: str) -> None: + """ + Initialize the M3U8_Decryption object. + + Args: + - key (bytes): The encryption key. + - iv (bytes): The initialization vector (IV). + - method (str): The encryption method. + """ + self.key = key + if "0x" in str(iv): + self.iv = bytes.fromhex(iv.replace("0x", "")) + else: + self.iv = iv + self.method = method + logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})") + + def decrypt(self, ciphertext: bytes) -> bytes: + """ + Decrypt the ciphertext using the specified encryption method. + + Args: + - ciphertext (bytes): The encrypted content to decrypt. + + Returns: + bytes: The decrypted content. + """ + if self.method == "AES": + cipher = AES.new(self.key, AES.MODE_ECB) + decrypted_data = cipher.decrypt(ciphertext) + return unpad(decrypted_data, AES.block_size) + + elif self.method == "AES-128": + cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv) + decrypted_data = cipher.decrypt(ciphertext) + return unpad(decrypted_data, AES.block_size) + + elif self.method == "AES-128-CTR": + cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv) + return cipher.decrypt(ciphertext) + + else: + raise ValueError("Invalid or unsupported method") + +else: + + # Check if openssl command is available + openssl_available = subprocess.run(["openssl", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0 + logging.info("Decrypy use: OPENSSL") + + if not openssl_available: + console.log("[red]Neither Crypto nor openssl is installed. Please install either one of them.") + sys.exit(0) + + class M3U8_Decryption: + """ + Class for decrypting M3U8 playlist content using OpenSSL when the Crypto module is not available. + """ + def __init__(self, key: bytes, iv: bytes, method: str) -> None: + """ + Initialize the M3U8_Decryption object. + + Args: + - key (bytes): The encryption key. + - iv (bytes): The initialization vector (IV). + - method (str): The encryption method. + """ + self.key = key + if "0x" in str(iv): + self.iv = bytes.fromhex(iv.replace("0x", "")) + else: + self.iv = iv + self.method = method + logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})") + + def decrypt(self, ciphertext: bytes) -> bytes: + """ + Decrypt the ciphertext using the specified encryption method. + + Args: + - ciphertext (bytes): The encrypted content to decrypt. + + Returns: + bytes: The decrypted content. + """ + if self.method == "AES": + openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt' + decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) + + elif self.method == "AES-128": + openssl_cmd = f'openssl enc -d -aes-128-cbc -K {self.key[:16].hex()} -iv {self.iv.hex()}' + decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) + + elif self.method == "AES-128-CTR": + openssl_cmd = f'openssl enc -d -aes-128-ctr -K {self.key[:16].hex()} -iv {self.iv.hex()}' + decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) + + else: + raise ValueError("Invalid or unsupported method") + + return decrypted_data diff --git a/Src/Lib/Hls/M3U8/lib_parser/__init__.py b/Src/Lib/Hls/M3U8/lib_parser/__init__.py new file mode 100644 index 0000000..1d99d3d --- /dev/null +++ b/Src/Lib/Hls/M3U8/lib_parser/__init__.py @@ -0,0 +1,38 @@ +# 15.04.24 + +import os + + +# Internal utilities +from .model import M3U8 + + +def load(raw_content, uri): + """ + Parses the content of an M3U8 playlist and returns an M3U8 object. + + Args: + raw_content (str): The content of the M3U8 playlist as a string. + uri (str): The URI of the M3U8 playlist file or stream. + + Returns: + M3U8: An object representing the parsed M3U8 playlist. + + Raises: + IOError: If the raw_content is empty or if the URI cannot be accessed. + ValueError: If the raw_content is not a valid M3U8 playlist format. + + Example: + >>> m3u8_content = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:10\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:10.0,\nhttp://example.com/segment0.ts\n#EXTINF:10.0,\nhttp://example.com/segment1.ts\n" + >>> uri = "http://example.com/playlist.m3u8" + >>> playlist = load(m3u8_content, uri) + """ + + if not raw_content: + raise IOError("Empty content provided.") + + if not uri: + raise IOError("Empty URI provided.") + + base_uri = os.path.dirname(uri) + return M3U8(raw_content, base_uri=base_uri) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/lib_parser/_util.py b/Src/Lib/Hls/M3U8/lib_parser/_util.py new file mode 100644 index 0000000..48b5b0a --- /dev/null +++ b/Src/Lib/Hls/M3U8/lib_parser/_util.py @@ -0,0 +1,28 @@ +# 19.04.24 + +import itertools + + +def remove_quotes_parser(*attrs): + """ + Returns a dictionary mapping attribute names to a function that removes quotes from their values. + """ + return dict(zip(attrs, itertools.repeat(remove_quotes))) + + +def remove_quotes(string): + """ + Removes quotes from a string. + """ + quotes = ('"', "'") + if string and string[0] in quotes and string[-1] in quotes: + return string[1:-1] + return string + + +def normalize_attribute(attribute): + """ + Normalizes an attribute name by converting hyphens to underscores and converting to lowercase. + """ + return attribute.replace('-', '_').lower().strip() + diff --git a/Src/Lib/Hls/M3U8/lib_parser/model.py b/Src/Lib/Hls/M3U8/lib_parser/model.py new file mode 100644 index 0000000..c1a89ef --- /dev/null +++ b/Src/Lib/Hls/M3U8/lib_parser/model.py @@ -0,0 +1,359 @@ +# 15.04.24 + +import os +from collections import namedtuple + + +# Internal utilities +from ..lib_parser import parser + + +# Variable +StreamInfo = namedtuple('StreamInfo', ['bandwidth', 'program_id', 'resolution', 'codecs']) +Media = namedtuple('Media', ['uri', 'type', 'group_id', 'language', 'name','default', 'autoselect', 'forced', 'characteristics']) + + + +class M3U8: + """ + Represents a single M3U8 playlist. Should be instantiated with the content as string. + + Args: + - content: the m3u8 content as string + - base_path: all urls (key and segments url) will be updated with this base_path, + ex: base_path = "http://videoserver.com/hls" + - base_uri: uri the playlist comes from. it is propagated to SegmentList and Key + ex: http://example.com/path/to + + Attribute: + - key: it's a `Key` object, the EXT-X-KEY from m3u8. Or None + - segments: a `SegmentList` object, represents the list of `Segment`s from this playlist + - is_variant: Returns true if this M3U8 is a variant playlist, with links to other M3U8s with different bitrates. + If true, `playlists` is a list of the playlists available, and `iframe_playlists` is a list of the i-frame playlists available. + - is_endlist: Returns true if EXT-X-ENDLIST tag present in M3U8. + Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8 + - playlists: If this is a variant playlist (`is_variant` is True), returns a list of Playlist objects + - iframe_playlists: If this is a variant playlist (`is_variant` is True), returns a list of IFramePlaylist objects + - playlist_type: A lower-case string representing the type of the playlist, which can be one of VOD (video on demand) or EVENT. + - media: If this is a variant playlist (`is_variant` is True), returns a list of Media objects + - target_duration: Returns the EXT-X-TARGETDURATION as an integer + Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2 + - media_sequence: Returns the EXT-X-MEDIA-SEQUENCE as an integer + Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3 + - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a string + Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5 + - version: Return the EXT-X-VERSION as is + - allow_cache: Return the EXT-X-ALLOW-CACHE as is + - files: Returns an iterable with all files from playlist, in order. This includes segments and key uri, if present. + - base_uri: It is a property (getter and setter) used by SegmentList and Key to have absolute URIs. + - is_i_frames_only: Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8. + Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12 + + """ + + # Mapping of simple attributes (obj attribute, parser attribute) + SIMPLE_ATTRIBUTES = ( + ('is_variant', 'is_variant'), + ('is_endlist', 'is_endlist'), + ('is_i_frames_only', 'is_i_frames_only'), + ('target_duration', 'targetduration'), + ('media_sequence', 'media_sequence'), + ('program_date_time', 'program_date_time'), + ('version', 'version'), + ('allow_cache', 'allow_cache'), + ('playlist_type', 'playlist_type') + ) + + def __init__(self, content=None, base_path=None, base_uri=None): + """ + Initialize the M3U8 object. + + Parameters: + - content: M3U8 content (string). + - base_path: Base path for relative URIs (string). + - base_uri: Base URI for absolute URIs (string). + """ + if content is not None: + self.data = parser.parse(content) + else: + self.data = {} + self._base_uri = base_uri + self.base_path = base_path + self._initialize_attributes() + + def _initialize_attributes(self): + """ + Initialize attributes based on parsed data. + """ + # Initialize key and segments + self.key = Key(base_uri=self.base_uri, **self.data.get('key', {})) if 'key' in self.data else None + self.segments = SegmentList([Segment(base_uri=self.base_uri, **params) for params in self.data.get('segments', [])]) + + # Initialize simple attributes + for attr, param in self.SIMPLE_ATTRIBUTES: + setattr(self, attr, self.data.get(param)) + + # Initialize files, media, playlists, and iframe_playlists + self.files = [] + if self.key: + self.files.append(self.key.uri) + self.files.extend(self.segments.uri) + + self.media = [Media( + uri = media.get('uri'), + type = media.get('type'), + group_id = media.get('group_id'), + language = media.get('language'), + name = media.get('name'), + default = media.get('default'), + autoselect = media.get('autoselect'), + forced = media.get('forced'), + characteristics = media.get('characteristics')) + for media in self.data.get('media', []) + ] + self.playlists = PlaylistList([Playlist( + base_uri = self.base_uri, + media = self.media, + **playlist + )for playlist in self.data.get('playlists', []) + ]) + self.iframe_playlists = PlaylistList() + for ifr_pl in self.data.get('iframe_playlists', []): + self.iframe_playlists.append( + IFramePlaylist( + base_uri = self.base_uri, + uri = ifr_pl['uri'], + iframe_stream_info=ifr_pl['iframe_stream_info']) + ) + + @property + def base_uri(self): + """ + Get the base URI. + """ + return self._base_uri + + @base_uri.setter + def base_uri(self, new_base_uri): + """ + Set the base URI. + """ + self._base_uri = new_base_uri + self.segments.base_uri = new_base_uri + + +class BasePathMixin: + """ + Mixin class for managing base paths. + """ + @property + def base_path(self): + """ + Get the base path. + """ + return os.path.dirname(self.uri) + + @base_path.setter + def base_path(self, newbase_path): + """ + Set the base path. + """ + if not self.base_path: + self.uri = "%s/%s" % (newbase_path, self.uri) + self.uri = self.uri.replace(self.base_path, newbase_path) + + +class GroupedBasePathMixin: + """ + Mixin class for managing base paths across a group of items. + """ + + def _set_base_uri(self, new_base_uri): + """ + Set the base URI for each item in the group. + """ + for item in self: + item.base_uri = new_base_uri + + base_uri = property(None, _set_base_uri) + + def _set_base_path(self, new_base_path): + """ + Set the base path for each item in the group. + """ + for item in self: + item.base_path = new_base_path + + base_path = property(None, _set_base_path) + + +class Segment(BasePathMixin): + """ + Class representing a segment in an M3U8 playlist. + Inherits from BasePathMixin for managing base paths. + """ + + def __init__(self, uri, base_uri, program_date_time=None, duration=None, + title=None, byterange=None, discontinuity=False, key=None): + """ + Initialize a Segment object. + + Args: + - uri: URI of the segment. + - base_uri: Base URI for the segment. + - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a datetime + Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5 + - duration: Duration of the segment (optional). + - title: Title attribute from EXTINF parameter + - byterange: Byterange information of the segment (optional). + - discontinuity: Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists + Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11 + - key: Key for encryption (optional). + """ + self.uri = uri + self.duration = duration + self.title = title + self.base_uri = base_uri + self.byterange = byterange + self.program_date_time = program_date_time + self.discontinuity = discontinuity + #self.key = key + + +class SegmentList(list, GroupedBasePathMixin): + """ + Class representing a list of segments in an M3U8 playlist. + Inherits from list and GroupedBasePathMixin for managing base paths across a group of items. + """ + + @property + def uri(self): + """ + Get the URI of each segment in the SegmentList. + + Returns: + - List of URIs of segments in the SegmentList. + """ + return [seg.uri for seg in self] + + +class Key(BasePathMixin): + """ + Class representing a key used for encryption in an M3U8 playlist. + Inherits from BasePathMixin for managing base paths. + """ + + def __init__(self, method, uri, base_uri, iv=None): + """ + Initialize a Key object. + + Args: + - method: Encryption method. + ex: "AES-128" + - uri: URI of the key. + ex: "https://priv.example.com/key.php?r=52" + - base_uri: Base URI for the key. + ex: http://example.com/path/to + - iv: Initialization vector (optional). + ex: 0X12A + """ + self.method = method + self.uri = uri + self.iv = iv + self.base_uri = base_uri + + +class Playlist(BasePathMixin): + """ + Playlist object representing a link to a variant M3U8 with a specific bitrate. + + More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10 + """ + + def __init__(self, uri, stream_info, media, base_uri): + """ + Initialize a Playlist object. + + Args: + - uri: URI of the playlist. + - stream_info: is a named tuple containing the attributes: `program_id`, + - media: List of Media objects associated with the playlist. + - base_uri: Base URI for the playlist. + """ + self.uri = uri + self.base_uri = base_uri + + # Extract resolution information from stream_info + resolution = stream_info.get('resolution') + if resolution is not None: + values = resolution.split('x') + resolution_pair = (int(values[0]), int(values[1])) + else: + resolution_pair = None + + # Create StreamInfo object + self.stream_info = StreamInfo( + bandwidth = stream_info['bandwidth'], + program_id = stream_info.get('program_id'), + resolution = resolution_pair, + codecs = stream_info.get('codecs') + ) + + # Filter media based on group ID and media type + self.media = [] + for media_type in ('audio', 'video', 'subtitles'): + group_id = stream_info.get(media_type) + if group_id: + self.media += filter(lambda m: m.group_id == group_id, media) + + +class IFramePlaylist(BasePathMixin): + """ + Class representing an I-Frame playlist in an M3U8 playlist. + Inherits from BasePathMixin for managing base paths. + """ + + def __init__(self, base_uri, uri, iframe_stream_info): + """ + Initialize an IFramePlaylist object. + + Args: + - base_uri: Base URI for the I-Frame playlist. + - uri: URI of the I-Frame playlist. + - iframe_stream_info, is a named tuple containing the attributes: + `program_id`, `bandwidth`, `codecs` and `resolution` which is a tuple (w, h) of integers + """ + self.uri = uri + self.base_uri = base_uri + + # Extract resolution information from iframe_stream_info + resolution = iframe_stream_info.get('resolution') + if resolution is not None: + values = resolution.split('x') + resolution_pair = (int(values[0]), int(values[1])) + else: + resolution_pair = None + + # Create StreamInfo object for I-Frame playlist + self.iframe_stream_info = StreamInfo( + bandwidth = iframe_stream_info.get('bandwidth'), + program_id = iframe_stream_info.get('program_id'), + resolution = resolution_pair, + codecs = iframe_stream_info.get('codecs') + ) + +class PlaylistList(list, GroupedBasePathMixin): + """ + Class representing a list of playlists in an M3U8 playlist. + Inherits from list and GroupedBasePathMixin for managing base paths across a group of items. + """ + + def __str__(self): + """ + Return a string representation of the PlaylistList. + + Returns: + - String representation of the PlaylistList. + """ + output = [str(playlist) for playlist in self] + return '\n'.join(output) diff --git a/Src/Lib/Hls/M3U8/lib_parser/parser.py b/Src/Lib/Hls/M3U8/lib_parser/parser.py new file mode 100644 index 0000000..dd49f96 --- /dev/null +++ b/Src/Lib/Hls/M3U8/lib_parser/parser.py @@ -0,0 +1,338 @@ +# 15.04.24 + +import re +import logging +import datetime + + +# Internal utilities +from ..lib_parser import protocol +from ._util import ( + remove_quotes, + remove_quotes_parser, + normalize_attribute +) + + +# External utilities +from Src.Util._jsonConfig import config_manager + + +# Variable +REMOVE_EMPTY_ROW = config_manager.get_bool('M3U8_PARSER', 'skip_empty_row_playlist') +ATTRIBUTELISTPATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''') + + +def parse(content): + """ + Given an M3U8 playlist content, parses the content and extracts metadata. + + Args: + content (str): The M3U8 playlist content. + + Returns: + dict: A dictionary containing the parsed metadata. + """ + + # Initialize data dictionary with default values + data = { + 'is_variant': False, + 'is_endlist': False, + 'is_i_frames_only': False, + 'playlist_type': None, + 'playlists': [], + 'iframe_playlists': [], + 'segments': [], + 'media': [], + } + + # Initialize state dictionary for tracking parsing state + state = { + 'expect_segment': False, + 'expect_playlist': False, + } + + # Iterate over lines in the content + content = content.split("\n") + content_length = len(content) + i = 0 + + while i < content_length: + line = content[i] + line_stripped = line.strip() + is_end = i + 1 == content_length - 2 + + if REMOVE_EMPTY_ROW: + if i < content_length - 2: + actual_row = extract_params(line_stripped) + next_row = extract_params(content[i + 2].strip()) + + if actual_row is not None and next_row is None and not is_end: + logging.info(f"Skip row: {line_stripped}") + i += 1 + continue + + i += 1 + + if line.startswith(protocol.ext_x_byterange): + _parse_byterange(line, state) + state['expect_segment'] = True + + elif state['expect_segment']: + _parse_ts_chunk(line, data, state) + state['expect_segment'] = False + + elif state['expect_playlist']: + _parse_variant_playlist(line, data, state) + state['expect_playlist'] = False + + elif line.startswith(protocol.ext_x_targetduration): + _parse_simple_parameter(line, data, float) + elif line.startswith(protocol.ext_x_media_sequence): + _parse_simple_parameter(line, data, int) + elif line.startswith(protocol.ext_x_discontinuity): + state['discontinuity'] = True + elif line.startswith(protocol.ext_x_version): + _parse_simple_parameter(line, data) + elif line.startswith(protocol.ext_x_allow_cache): + _parse_simple_parameter(line, data) + + elif line.startswith(protocol.ext_x_key): + state['current_key'] = _parse_key(line) + data['key'] = data.get('key', state['current_key']) + + elif line.startswith(protocol.extinf): + _parse_extinf(line, data, state) + state['expect_segment'] = True + + elif line.startswith(protocol.ext_x_stream_inf): + state['expect_playlist'] = True + _parse_stream_inf(line, data, state) + + elif line.startswith(protocol.ext_x_i_frame_stream_inf): + _parse_i_frame_stream_inf(line, data) + + elif line.startswith(protocol.ext_x_media): + _parse_media(line, data, state) + + elif line.startswith(protocol.ext_x_playlist_type): + _parse_simple_parameter(line, data) + + elif line.startswith(protocol.ext_i_frames_only): + data['is_i_frames_only'] = True + + elif line.startswith(protocol.ext_x_endlist): + data['is_endlist'] = True + + return data + + +def extract_params(line): + """ + Extracts parameters from a formatted input string. + + Args: + - line (str): The string containing the parameters to extract. + + Returns: + dict or None: A dictionary containing the extracted parameters with their respective values. + """ + params = {} + matches = re.findall(r'([A-Z\-]+)=("[^"]*"|[^",\s]*)', line) + if not matches: + return None + for match in matches: + param, value = match + params[param] = value.strip('"') + return params + +def _parse_key(line): + """ + Parses the #EXT-X-KEY line and extracts key attributes. + + Args: + - line (str): The #EXT-X-KEY line from the playlist. + + Returns: + dict: A dictionary containing the key attributes. + """ + params = ATTRIBUTELISTPATTERN.split(line.replace(protocol.ext_x_key + ':', ''))[1::2] + key = {} + for param in params: + name, value = param.split('=', 1) + key[normalize_attribute(name)] = remove_quotes(value) + return key + +def _parse_extinf(line, data, state): + """ + Parses the #EXTINF line and extracts segment duration and title. + + Args: + - line (str): The #EXTINF line from the playlist. + - data (dict): The dictionary to store the parsed data. + - state (dict): The parsing state. + """ + duration, title = line.replace(protocol.extinf + ':', '').split(',') + state['segment'] = {'duration': float(duration), 'title': remove_quotes(title)} + +def _parse_ts_chunk(line, data, state): + """ + Parses a segment URI line and adds it to the segment list. + + Args: + line (str): The segment URI line from the playlist. + data (dict): The dictionary to store the parsed data. + state (dict): The parsing state. + """ + segment = state.pop('segment') + if state.get('current_program_date_time'): + segment['program_date_time'] = state['current_program_date_time'] + state['current_program_date_time'] += datetime.timedelta(seconds=segment['duration']) + segment['uri'] = line + segment['discontinuity'] = state.pop('discontinuity', False) + if state.get('current_key'): + segment['key'] = state['current_key'] + data['segments'].append(segment) + +def _parse_attribute_list(prefix, line, atribute_parser): + """ + Parses a line containing a list of attributes and their values. + + Args: + - prefix (str): The prefix to identify the line. + - line (str): The line containing the attributes. + - atribute_parser (dict): A dictionary mapping attribute names to parsing functions. + + Returns: + dict: A dictionary containing the parsed attributes. + """ + params = ATTRIBUTELISTPATTERN.split(line.replace(prefix + ':', ''))[1::2] + + attributes = {} + for param in params: + name, value = param.split('=', 1) + name = normalize_attribute(name) + + if name in atribute_parser: + value = atribute_parser[name](value) + + attributes[name] = value + + return attributes + +def _parse_stream_inf(line, data, state): + """ + Parses the #EXT-X-STREAM-INF line and extracts stream information. + + Args: + - line (str): The #EXT-X-STREAM-INF line from the playlist. + - data (dict): The dictionary to store the parsed data. + - state (dict): The parsing state. + """ + data['is_variant'] = True + atribute_parser = remove_quotes_parser('codecs', 'audio', 'video', 'subtitles') + atribute_parser["program_id"] = int + atribute_parser["bandwidth"] = int + state['stream_info'] = _parse_attribute_list(protocol.ext_x_stream_inf, line, atribute_parser) + +def _parse_i_frame_stream_inf(line, data): + """ + Parses the #EXT-X-I-FRAME-STREAM-INF line and extracts I-frame stream information. + + Args: + - line (str): The #EXT-X-I-FRAME-STREAM-INF line from the playlist. + - data (dict): The dictionary to store the parsed data. + """ + atribute_parser = remove_quotes_parser('codecs', 'uri') + atribute_parser["program_id"] = int + atribute_parser["bandwidth"] = int + iframe_stream_info = _parse_attribute_list(protocol.ext_x_i_frame_stream_inf, line, atribute_parser) + iframe_playlist = {'uri': iframe_stream_info.pop('uri'), + 'iframe_stream_info': iframe_stream_info} + + data['iframe_playlists'].append(iframe_playlist) + +def _parse_media(line, data, state): + """ + Parses the #EXT-X-MEDIA line and extracts media attributes. + + Args: + - line (str): The #EXT-X-MEDIA line from the playlist. + - data (dict): The dictionary to store the parsed data. + - state (dict): The parsing state. + """ + quoted = remove_quotes_parser('uri', 'group_id', 'language', 'name', 'characteristics') + media = _parse_attribute_list(protocol.ext_x_media, line, quoted) + data['media'].append(media) + +def _parse_variant_playlist(line, data, state): + """ + Parses a variant playlist line and extracts playlist information. + + Args: + - line (str): The variant playlist line from the playlist. + - data (dict): The dictionary to store the parsed data. + - state (dict): The parsing state. + """ + playlist = {'uri': line, 'stream_info': state.pop('stream_info')} + + data['playlists'].append(playlist) + +def _parse_byterange(line, state): + """ + Parses the #EXT-X-BYTERANGE line and extracts byte range information. + + Args: + - line (str): The #EXT-X-BYTERANGE line from the playlist. + - state (dict): The parsing state. + """ + state['segment']['byterange'] = line.replace(protocol.ext_x_byterange + ':', '') + +def _parse_simple_parameter_raw_value(line, cast_to=str, normalize=False): + """ + Parses a line containing a simple parameter and its value. + + Args: + - line (str): The line containing the parameter and its value. + - cast_to (type): The type to which the value should be cast. + - normalize (bool): Whether to normalize the parameter name. + + Returns: + tuple: A tuple containing the parameter name and its value. + """ + param, value = line.split(':', 1) + param = normalize_attribute(param.replace('#EXT-X-', '')) + if normalize: + value = normalize_attribute(value) + return param, cast_to(value) + +def _parse_and_set_simple_parameter_raw_value(line, data, cast_to=str, normalize=False): + """ + Parses a line containing a simple parameter and its value, and sets it in the data dictionary. + + Args: + - line (str): The line containing the parameter and its value. + - data (dict): The dictionary to store the parsed data. + - cast_to (type): The type to which the value should be cast. + - normalize (bool): Whether to normalize the parameter name. + + Returns: + The parsed value. + """ + param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize) + data[param] = value + return data[param] + +def _parse_simple_parameter(line, data, cast_to=str): + """ + Parses a line containing a simple parameter and its value, and sets it in the data dictionary. + + Args: + line (str): The line containing the parameter and its value. + data (dict): The dictionary to store the parsed data. + cast_to (type): The type to which the value should be cast. + + Returns: + The parsed value. + """ + return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/lib_parser/protocol.py b/Src/Lib/Hls/M3U8/lib_parser/protocol.py new file mode 100644 index 0000000..7fcf5a5 --- /dev/null +++ b/Src/Lib/Hls/M3U8/lib_parser/protocol.py @@ -0,0 +1,17 @@ +# 15.04.24 + +ext_x_targetduration = '#EXT-X-TARGETDURATION' +ext_x_media_sequence = '#EXT-X-MEDIA-SEQUENCE' +ext_x_program_date_time = '#EXT-X-PROGRAM-DATE-TIME' +ext_x_media = '#EXT-X-MEDIA' +ext_x_playlist_type = '#EXT-X-PLAYLIST-TYPE' +ext_x_key = '#EXT-X-KEY' +ext_x_stream_inf = '#EXT-X-STREAM-INF' +ext_x_version = '#EXT-X-VERSION' +ext_x_allow_cache = '#EXT-X-ALLOW-CACHE' +ext_x_endlist = '#EXT-X-ENDLIST' +extinf = '#EXTINF' +ext_i_frames_only = '#EXT-X-I-FRAMES-ONLY' +ext_x_byterange = '#EXT-X-BYTERANGE' +ext_x_i_frame_stream_inf = '#EXT-X-I-FRAME-STREAM-INF' +ext_x_discontinuity = '#EXT-X-DISCONTINUITY' diff --git a/Src/Lib/Hls/M3U8/math_calc.py b/Src/Lib/Hls/M3U8/math_calc.py new file mode 100644 index 0000000..46aaccb --- /dev/null +++ b/Src/Lib/Hls/M3U8/math_calc.py @@ -0,0 +1,41 @@ +# 20.02.24 + +# Internal utilities +from Src.Util.os import format_size + + +class M3U8_Ts_Files: + def __init__(self): + """ + Initialize the TSFileSizeCalculator object. + + Args: + - num_segments (int): The number of segments. + """ + self.ts_file_sizes = [] + + def add_ts_file_size(self, size: int): + """ + Add a file size to the list of file sizes. + + Args: + - size (float): The size of the ts file to be added. + """ + self.ts_file_sizes.append(size) + + def calculate_total_size(self): + """ + Calculate the total size of the files. + + Returns: + float: The mean size of the files in a human-readable format. + """ + + if len(self.ts_file_sizes) == 0: + return 0 + + total_size = sum(self.ts_file_sizes) + mean_size = total_size / len(self.ts_file_sizes) + + # Return format mean + return format_size(mean_size) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/parser.py b/Src/Lib/Hls/M3U8/parser.py new file mode 100644 index 0000000..b387891 --- /dev/null +++ b/Src/Lib/Hls/M3U8/parser.py @@ -0,0 +1,547 @@ +# 20.04.25 + +import logging + + +# Internal utilities +from .lib_parser import load + + +# External libraries +from Src.Lib.Request.my_requests import requests + + +# Costant +CODEC_MAPPINGS = { + "video": { + "avc1": "libx264", + "avc2": "libx264", + "avc3": "libx264", + "avc4": "libx264", + "hev1": "libx265", + "hev2": "libx265", + "hvc1": "libx265", + "hvc2": "libx265", + "vp8": "libvpx", + "vp9": "libvpx-vp9", + "vp10": "libvpx-vp9" + }, + "audio": { + "mp4a": "aac", + "mp3": "libmp3lame", + "ac-3": "ac3", + "ec-3": "eac3", + "opus": "libopus", + "vorbis": "libvorbis" + } +} + +RESOLUTIONS = [ + (7680, 4320), + (3840, 2160), + (2560, 1440), + (1920, 1080), + (1280, 720), + (640, 480) +] + + + +class M3U8_Codec: + """ + Represents codec information for an M3U8 playlist. + """ + + def __init__(self, bandwidth, resolution, codecs): + """ + Initializes the M3U8Codec object with the provided parameters. + + Args: + - bandwidth (int): Bandwidth of the codec. + - resolution (str): Resolution of the codec. + - codecs (str): Codecs information in the format "avc1.xxxxxx,mp4a.xx". + """ + self.bandwidth = bandwidth + self.resolution = resolution + self.codecs = codecs + self.audio_codec = None + self.video_codec = None + self.extract_codecs() + self.parse_codecs() + + def extract_codecs(self): + """ + Parses the codecs information to extract audio and video codecs. + Extracted codecs are set as attributes: audio_codec and video_codec. + """ + + # Split the codecs string by comma + codecs_list = self.codecs.split(',') + + # Separate audio and video codecs + for codec in codecs_list: + if codec.startswith('avc'): + self.video_codec = codec + elif codec.startswith('mp4a'): + self.audio_codec = codec + + def convert_video_codec(self, video_codec_identifier) -> str: + + """ + Convert video codec identifier to codec name. + + Args: + - video_codec_identifier (str): Identifier of the video codec. + + Returns: + str: Codec name corresponding to the identifier. + """ + + # Extract codec type from the identifier + codec_type = video_codec_identifier.split('.')[0] + + # Retrieve codec mapping from the provided mappings or fallback to static mappings + video_codec_mapping = CODEC_MAPPINGS.get('video', {}) + codec_name = video_codec_mapping.get(codec_type) + + if codec_name: + return codec_name + + else: + logging.warning(f"No corresponding video codec found for {video_codec_identifier}. Using default codec libx264.") + return "libx264" # Default + + def convert_audio_codec(self, audio_codec_identifier) -> str: + + """ + Convert audio codec identifier to codec name. + + Args: + - audio_codec_identifier (str): Identifier of the audio codec. + + Returns: + str: Codec name corresponding to the identifier. + """ + + # Extract codec type from the identifier + codec_type = audio_codec_identifier.split('.')[0] + + # Retrieve codec mapping from the provided mappings or fallback to static mappings + audio_codec_mapping = CODEC_MAPPINGS.get('audio', {}) + codec_name = audio_codec_mapping.get(codec_type) + + if codec_name: + return codec_name + + else: + logging.warning(f"No corresponding audio codec found for {audio_codec_identifier}. Using default codec aac.") + return "aac" # Default + + def parse_codecs(self): + """ + Parse video and audio codecs. + This method updates `video_codec_name` and `audio_codec_name` attributes. + """ + + self.video_codec_name = self.convert_video_codec(self.video_codec) + self.audio_codec_name = self.convert_audio_codec(self.audio_codec) + + def __str__(self): + """ + Returns a string representation of the M3U8Codec object. + """ + return f"BANDWIDTH={self.bandwidth},RESOLUTION={self.resolution},CODECS=\"{self.codecs}\"" + + +class M3U8_Video: + def __init__(self, video_playlist) -> None: + """ + Initializes an M3U8_Video object with the provided video playlist. + + Args: + - video_playlist (M3U8): An M3U8 object representing the video playlist. + """ + self.video_playlist = video_playlist + + def get_best_uri(self): + """ + Returns the URI with the highest resolution from the video playlist. + + Returns: + tuple or None: A tuple containing the URI with the highest resolution and its resolution value, or None if the video list is empty. + """ + if not self.video_playlist: + return None + + best_uri = max(self.video_playlist, key=lambda x: x['resolution']) + return best_uri['uri'], best_uri['resolution'] + + def get_worst_uri(self): + """ + Returns the URI with the lowest resolution from the video playlist. + + Returns: + - tuple or None: A tuple containing the URI with the lowest resolution and its resolution value, or None if the video list is empty. + """ + if not self.video_playlist: + return None + + worst_uri = min(self.video_playlist, key=lambda x: x['resolution']) + return worst_uri['uri'], worst_uri['resolution'] + + def get_custom_uri(self, y_resolution): + """ + Returns the URI corresponding to a custom resolution from the video list. + + Args: + - video_list (list): A list of dictionaries containing video URIs and resolutions. + - custom_resolution (tuple): A tuple representing the custom resolution. + + Returns: + str or None: The URI corresponding to the custom resolution, or None if not found. + """ + for video in self.video_playlist: + logging.info(f"Check resolution from playlist: {int(video['resolution'][1])}, with input: {int(y_resolution)}") + + if int(video['resolution'][1]) == int(y_resolution): + return video['uri'], video['resolution'] + + return None, None + + def get_list_resolution(self): + """ + Retrieve a list of resolutions from the video playlist. + + Returns: + list: A list of resolutions extracted from the video playlist. + """ + return [video['resolution'] for video in self.video_playlist] + + +class M3U8_Audio: + def __init__(self, audio_playlist) -> None: + """ + Initializes an M3U8_Audio object with the provided audio playlist. + + Args: + - audio_playlist (M3U8): An M3U8 object representing the audio playlist. + """ + self.audio_playlist = audio_playlist + + def get_uri_by_language(self, language): + """ + Returns a dictionary with 'name' and 'uri' given a specific language. + + Args: + - audio_list (list): List of dictionaries containing audio information. + - language (str): The desired language. + + Returns: + dict or None: Dictionary with 'name', 'language', and 'uri' for the specified language, or None if not found. + """ + for audio in self.audio_playlist: + if audio['language'] == language: + return {'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} + return None + + def get_all_uris_and_names(self): + """ + Returns a list of dictionaries containing all URIs and names. + + Args: + - audio_list (list): List of dictionaries containing audio information. + + Returns: + list: List of dictionaries containing 'name', 'language', and 'uri' for all audio in the list. + """ + return [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist] + + def get_default_uri(self): + """ + Returns the dictionary with 'default' equal to 'YES'. + + Args: + - audio_list (list): List of dictionaries containing audio information. + + Returns: + dict or None: Dictionary with 'default' equal to 'YES', or None if not found. + """ + for audio in self.audio_playlist: + if audio['default'] == 'YES': + return audio.get('uri') + return None + + +class M3U8_Subtitle: + def __init__(self, subtitle_playlist) -> None: + """ + Initializes an M3U8_Subtitle object with the provided subtitle playlist. + + Args: + - subtitle_playlist (M3U8): An M3U8 object representing the subtitle playlist. + """ + self.subtitle_playlist = subtitle_playlist + + def get_uri_by_language(self, language): + """ + Returns a dictionary with 'name' and 'uri' given a specific language for subtitles. + + Args: + - subtitle_list (list): List of dictionaries containing subtitle information. + - language (str): The desired language. + + Returns: + dict or None: Dictionary with 'name' and 'uri' for the specified language for subtitles, or None if not found. + """ + for subtitle in self.subtitle_playlist: + if subtitle['language'] == language: + return {'name': subtitle['name'], 'uri': subtitle['uri']} + return None + + def get_all_uris_and_names(self): + """ + Returns a list of dictionaries containing all URIs and names of subtitles. + + Args: + - subtitle_list (list): List of dictionaries containing subtitle information. + + Returns: + list: List of dictionaries containing 'name' and 'uri' for all subtitles in the list. + """ + return [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist] + + def get_default_uri(self): + """ + Returns the dictionary with 'default' equal to 'YES' for subtitles. + + Args: + - subtitle_list (list): List of dictionaries containing subtitle information. + + Returns: + dict or None: Dictionary with 'default' equal to 'YES' for subtitles, or None if not found. + """ + for subtitle in self.subtitle_playlist: + if subtitle['default'] == 'YES': + return subtitle + return None + + def download_all(self, custom_subtitle): + """ + Download all subtitles listed in the object's attributes, filtering based on a provided list of custom subtitles. + + Args: + - custom_subtitle (list): A list of custom subtitles to download. + + Returns: + list: A list containing dictionaries with subtitle information including name, language, and URI. + """ + + output = [] # Initialize an empty list to store subtitle information + + # Iterate through all available subtitles + for obj_subtitle in self.subtitle_get_all_uris_and_names(): + + # Check if the subtitle name is not in the list of custom subtitles, and skip if not found + if obj_subtitle.get('name') not in custom_subtitle: + continue + + # Send a request to retrieve the subtitle content + logging.info(f"Download subtitle: {obj_subtitle.get('name')}") + response_subitle = requests.get(obj_subtitle.get('uri')) + + try: + # Try to extract the VTT URL from the subtitle content + sub_parse = M3U8_Parser() + sub_parse.parse_data(obj_subtitle.get('uri'), response_subitle.text) + url_subititle = sub_parse.subtitle[0] + + output.append({ + 'name': obj_subtitle.get('name'), + 'language': obj_subtitle.get('language'), + 'uri': url_subititle + }) + + except Exception as e: + logging.error(f"Cant donwload: {obj_subtitle.get('name')}, error: {e}") + + return output + + +class M3U8_Parser: + def __init__(self): + self.segments = [] + self.video_playlist = [] + self.keys = None + self.subtitle_playlist = [] + self.subtitle = [] + self.audio_playlist = [] + self.codec: M3U8_Codec = None + self._video: M3U8_Video = None + self._audio: M3U8_Audio = None + self._subtitle: M3U8_Subtitle = None + + self.__create_variable__() + + def parse_data(self, uri, raw_content) -> None: + """ + Extracts all information present in the provided M3U8 content. + + Args: + - m3u8_content (str): The content of the M3U8 file. + """ + + + # Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles + m3u8_obj = load(raw_content, uri) + + self.__parse_video_info__(m3u8_obj) + self.__parse_encryption_keys__(m3u8_obj) + self.__parse_subtitles_and_audio__(m3u8_obj) + self.__parse_segments__(m3u8_obj) + + @staticmethod + def extract_resolution(uri: str) -> int: + """ + Extracts the video resolution from the given URI. + + Args: + - uri (str): The URI containing video information. + + Returns: + int: The video resolution if found, otherwise 0. + """ + + # Log + logging.info(f"Try extract resolution from: {uri}") + + for resolution in RESOLUTIONS: + if "http" in str(uri): + if str(resolution[1]) in uri: + return resolution + + # Default resolution return (not best) + logging.error("No resolution found with custom parsing.") + logging.warning("Try set remove duplicate line to TRUE.") + return (0, 0) + + def __parse_video_info__(self, m3u8_obj) -> None: + """ + Extracts video information from the M3U8 object. + + Args: + - m3u8_obj: The M3U8 object containing video playlists. + """ + + try: + for playlist in m3u8_obj.playlists: + + # Direct access resolutions in m3u8 obj + if playlist.stream_info.resolution is not None: + + self.video_playlist.append({ + "uri": playlist.uri, + "resolution": playlist.stream_info.resolution + }) + + # Find resolutions in uri + else: + + self.video_playlist.append({ + "uri": playlist.uri, + "resolution": M3U8_Parser.extract_resolution(playlist.uri) + }) + + # Dont stop + continue + + # Check if all key is present to create codec + try: + self.codec = M3U8_Codec( + playlist.stream_info.bandwidth, + playlist.stream_info.resolution, + playlist.stream_info.codecs + ) + except: + logging.error(f"Error parsing codec: {e}") + + except Exception as e: + logging.error(f"Error parsing video info: {e}") + + def __parse_encryption_keys__(self, m3u8_obj) -> None: + """ + Extracts encryption keys from the M3U8 object. + + Args: + - m3u8_obj: The M3U8 object containing encryption keys. + """ + try: + + if m3u8_obj.key is not None: + if self.keys is None: + self.keys = { + 'method': m3u8_obj.key.method, + 'iv': m3u8_obj.key.iv, + 'uri': m3u8_obj.key.uri + } + + + except Exception as e: + logging.error(f"Error parsing encryption keys: {e}") + pass + + def __parse_subtitles_and_audio__(self, m3u8_obj) -> None: + """ + Extracts subtitles and audio information from the M3U8 object. + + Args: + - m3u8_obj: The M3U8 object containing subtitles and audio data. + """ + try: + for media in m3u8_obj.media: + if media.type == "SUBTITLES": + self.subtitle_playlist.append({ + "type": media.type, + "name": media.name, + "default": media.default, + "language": media.language, + "uri": media.uri + }) + + if media.type == "AUDIO": + self.audio_playlist.append({ + "type": media.type, + "name": media.name, + "default": media.default, + "language": media.language, + "uri": media.uri + }) + + except Exception as e: + logging.error(f"Error parsing subtitles and audio: {e}") + + def __parse_segments__(self, m3u8_obj) -> None: + """ + Extracts segment information from the M3U8 object. + + Args: + - m3u8_obj: The M3U8 object containing segment data. + """ + + try: + for segment in m3u8_obj.segments: + if "vtt" not in segment.uri: + self.segments.append(segment.uri) + else: + self.subtitle.append(segment.uri) + + except Exception as e: + logging.error(f"Error parsing segments: {e}") + + def __create_variable__(self): + """ + Initialize variables for video, audio, and subtitle playlists. + """ + + self._video = M3U8_Video(self.video_playlist) + self._audio = M3U8_Audio(self.audio_playlist) + self._subtitle = M3U8_Subtitle(self.subtitle_playlist) diff --git a/Src/Lib/Hls/M3U8/url_fix.py b/Src/Lib/Hls/M3U8/url_fix.py new file mode 100644 index 0000000..17f6f7d --- /dev/null +++ b/Src/Lib/Hls/M3U8/url_fix.py @@ -0,0 +1,54 @@ +# 20.03.24 + +import logging +from urllib.parse import urlparse, urljoin + + +class M3U8_UrlFix: + def __init__(self, url: str = None) -> None: + """ + Initializes an M3U8_UrlFix object with the provided playlist URL. + + Args: + - url (str, optional): The URL of the playlist. Defaults to None. + """ + self.url_playlist: str = url + + def set_playlist(self, url: str) -> None: + """ + Set the M3U8 playlist URL. + + Args: + - url (str): The M3U8 playlist URL. + """ + self.url_playlist = url + + def generate_full_url(self, url_resource: str) -> str: + """ + Generate a full URL for a given resource using the base URL from the playlist. + + Args: + - url_resource (str): The relative URL of the resource within the playlist. + + Returns: + str: The full URL for the specified resource. + """ + + # Check if m3u8 url playlist is present + if self.url_playlist == None: + logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present") + raise + + # Parse the playlist URL to extract the base URL components + parsed_playlist_url = urlparse(self.url_playlist) + + # Construct the base URL using the scheme, netloc, and path from the playlist URL + base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}" + + # Join the base URL with the relative resource URL to get the full URL + full_url = urljoin(base_url, url_resource) + + return full_url + +# Output +m3u8_url_fix = M3U8_UrlFix() \ No newline at end of file diff --git a/Src/Lib/Hls/downloader.py b/Src/Lib/Hls/downloader.py index 72c7d06..c22278a 100644 --- a/Src/Lib/Hls/downloader.py +++ b/Src/Lib/Hls/downloader.py @@ -10,6 +10,7 @@ from datetime import datetime from Src.Util.console import console, Panel from Src.Lib.Request.my_requests import requests from Src.Util.headers import get_headers +from Src.Util.color import Colors from Src.Util._jsonConfig import config_manager from Src.Util.os import ( remove_folder, @@ -230,7 +231,7 @@ class Downloader(): video_m3u8.get_info() # Download the video segments - video_m3u8.download_streams("[purple]video") + video_m3u8.download_streams(f"{Colors.MAGENTA}video") else: console.log("[cyan]Video [red]already exists.") @@ -272,7 +273,7 @@ class Downloader(): audio_m3u8.get_info() # Download the audio segments - audio_m3u8.download_streams(f"[purple]audio [red]{obj_audio.get('language')}") + audio_m3u8.download_streams(f"{Colors.MAGENTA}audio {Colors.RED}{obj_audio.get('language')}") else: console.log(f"[cyan]Audio [white]([green]{obj_audio.get('language')}[white]) [red]already exists.") diff --git a/Src/Lib/Hls/segments.py b/Src/Lib/Hls/segments.py index 6327a24..0c4647e 100644 --- a/Src/Lib/Hls/segments.py +++ b/Src/Lib/Hls/segments.py @@ -7,25 +7,19 @@ import queue import threading import signal import logging -import warnings import binascii from concurrent.futures import ThreadPoolExecutor from urllib.parse import urljoin, urlparse, urlunparse - -# Disable specific warnings -from tqdm import TqdmExperimentalWarning -warnings.filterwarnings("ignore", category=TqdmExperimentalWarning) - - # External libraries -from tqdm.rich import tqdm +from tqdm import tqdm # Internal utilities from Src.Util.console import console from Src.Util.headers import get_headers +from Src.Util.color import Colors from Src.Lib.Request.my_requests import requests from Src.Util._jsonConfig import config_manager from Src.Util.os import ( @@ -82,6 +76,8 @@ class M3U8_Segments: self.ctrl_c_detected = False # Global variable to track Ctrl+C detection os.makedirs(self.tmp_folder, exist_ok=True) # Create the temporary folder if it does not exist + self.list_speeds = [] + self.average_over = int(TQDM_MAX_WORKER / 3) def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes: """ @@ -210,19 +206,35 @@ class M3U8_Segments: if FAKE_PROXY: ts_url = self.__gen_proxy__(ts_url, self.segments.index(ts_url)) + # Make request and calculate time duration + start_time = time.time() response = requests.get(ts_url, headers=headers_segments, timeout=REQUESTS_TIMEOUT, verify_ssl=False) # Send GET request for the segment - + duration = time.time() - start_time + if response.ok: # Get the content of the segment segment_content = response.content + total_downloaded = len(response.content) + + # Calculate mbps + speed_mbps = (total_downloaded * 8) / (duration * 1_000_000) * TQDM_MAX_WORKER + self.list_speeds.append(speed_mbps) + + # Get average speed after (average_over) + if len(self.list_speeds) > self.average_over: + self.list_speeds.pop(0) + average_speed = ( sum(self.list_speeds) / len(self.list_speeds) ) / 10 # MB/s + #print(f"{average_speed:.2f} MB/s") + #progress_counter.set_postfix_str(f"{average_speed:.2f} MB/s") + if TQDM_SHOW_PROGRESS: self.downloaded_size += len(response.content) # Update the downloaded size self.class_ts_files_size.add_ts_file_size(len(response.content) * len(self.segments)) # Update the TS file size class downloaded_size_str = format_size(self.downloaded_size) # Format the downloaded size estimate_total_size = self.class_ts_files_size.calculate_total_size() # Calculate the estimated total size - progress_counter.set_description(f"[yellow]Downloading [white]({add_desc}[white]) [[green]{downloaded_size_str} [white]/ [green]{estimate_total_size}[white]]") + progress_counter.set_postfix_str(f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size_str.split(' ')[0]} {Colors.WHITE}< {Colors.GREEN}{estimate_total_size.split(' ')[0]} {Colors.RED}MB {Colors.WHITE}| {Colors.CYAN}{average_speed:.2f} {Colors.RED}MB/s") # Decrypt the segment content if decryption is needed if self.decryption is not None: @@ -239,7 +251,7 @@ class M3U8_Segments: logging.warning(f"Failed to download segment: {ts_url}") except Exception as e: - logging.warning(f"Exception while downloading segment: {e}") + logging.error(f"Exception while downloading segment: {e}") def write_segments_to_file(self, stop_event: threading.Event): """ @@ -278,7 +290,15 @@ class M3U8_Segments: - add_desc (str): Additional description for the progress bar. """ stop_event = threading.Event() # Event to signal stopping - progress_bar = tqdm(desc=f"[yellow]Downloading [white]({add_desc}[white])", unit="MB", total=len(self.segments)) + + # bar_format="{desc}: {percentage:.0f}% | {bar} | {n_fmt}/{total_fmt} [ {elapsed}<{remaining}, {rate_fmt}{postfix} ]" + progress_bar = tqdm( + total=len(self.segments), + unit='s', + ascii=' #', + bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.0f}}% {Colors.MAGENTA}{{bar}} {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]", + dynamic_ncols=True + ) def signal_handler(sig, frame): self.ctrl_c_detected = True # Set global variable to indicate Ctrl+C detection diff --git a/Src/Lib/Instagram/core/_util.py b/Src/Lib/Instagram/core/_util.py deleted file mode 100644 index 340b465..0000000 --- a/Src/Lib/Instagram/core/_util.py +++ /dev/null @@ -1,27 +0,0 @@ -# 24.03.24 - -import random -import uuid - - -# Internal logic -from Src.Util._jsonConfig import config_manager -from Src.Lib.UserAgent import ua - - -def get_headers(profile_name): - - return { - 'accept': '*/*', - 'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7', - 'referer': f'https://www.instagram.com/{profile_name}/', - 'x-ig-app-id': '936619743392459', - 'user-agent': ua.get_random_user_agent('chrome') - } - - -def get_cookies(): - - return { - 'sessionid': config_manager.get('DEFAULT', 'instagram_session') - } \ No newline at end of file diff --git a/Src/Lib/Instagram/core/get_profile.py b/Src/Lib/Instagram/core/get_profile.py deleted file mode 100644 index 6e4abc3..0000000 --- a/Src/Lib/Instagram/core/get_profile.py +++ /dev/null @@ -1,40 +0,0 @@ -# 24.03.24 - -import logging - - -# External utilities -from Src.Lib.Request import requests - - -# Internal utilities -from ._util import get_headers, get_cookies -from ..model import InstaProfile - - -def get_data(profile_name) -> InstaProfile: - - # Prepare url - url = f"https://i.instagram.com/api/v1/users/web_profile_info/?username={profile_name}" - - # Get response from request - response = requests.get(url, headers=get_headers(profile_name), cookies=get_cookies()) - - if response.ok: - - # Get json response - data_json = response.json() - - if data_json is not None: - - # Parse json - obj_InstaProfile = InstaProfile(data_json) - - return obj_InstaProfile - - else: - logging.error(f"Cant fetch data for this profile: {profile_name}, empty json response.") - - else: - logging.error(f"Cant fetch data for this profile: {profile_name}.") - diff --git a/Src/Lib/Instagram/model/__init__.py b/Src/Lib/Instagram/model/__init__.py deleted file mode 100644 index e013861..0000000 --- a/Src/Lib/Instagram/model/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# 24.03.24 - -from .profile import InstaProfile \ No newline at end of file diff --git a/Src/Lib/Instagram/model/profile.py b/Src/Lib/Instagram/model/profile.py deleted file mode 100644 index 7cfd581..0000000 --- a/Src/Lib/Instagram/model/profile.py +++ /dev/null @@ -1,138 +0,0 @@ -# 24.03.24 - -from typing import List - - -class BioLink: - def __init__(self, title: str, lynx_url: str, url: str, link_type: str): - """ - Initialize a BioLink object. - - Args: - title (str): The title of the link. - lynx_url (str): The Lynx URL of the link. - url (str): The URL of the link. - link_type (str): The type of the link. - """ - self.title = title - self.lynx_url = lynx_url - self.url = url - self.link_type = link_type - - def __str__(self) -> str: - """ - Return a string representation of the BioLink object. - - Returns: - str: String representation of the BioLink object. - """ - return f"Title: {self.title}, Lynx URL: {self.lynx_url}, URL: {self.url}, Link Type: {self.link_type}" - - -class InstaProfile: - def __init__(self, data: dict): - """ - Initialize an InstaProfile object. - - Args: - data (dict): Data representing the Instagram profile. - """ - self.data = data - - def get_username(self) -> str: - """ - Get the username of the Instagram profile. - - Returns: - str: Username of the Instagram profile. - """ - try: - return self.data['data']['user']['username'] - except KeyError: - raise KeyError("Username not found in profile data.") - - def get_full_name(self) -> str: - """ - Get the full name of the Instagram profile. - - Returns: - str: Full name of the Instagram profile. - """ - try: - return self.data['data']['user']['full_name'] - except KeyError: - raise KeyError("Full name not found in profile data.") - - def get_biography(self) -> str: - """ - Get the biography of the Instagram profile. - - Returns: - str: Biography of the Instagram profile. - """ - try: - return self.data['data']['user']['biography'] - except KeyError: - raise KeyError("Biography not found in profile data.") - - def get_bio_links(self) -> List[BioLink]: - """ - Get the bio links associated with the Instagram profile. - - Returns: - List[BioLink]: List of BioLink objects representing bio links. - """ - try: - bio_links_data = self.data['data']['user']['bio_links'] - return [BioLink(link['title'], link['lynx_url'], link['url'], link['link_type']) for link in bio_links_data] - except KeyError: - raise KeyError("Bio links not found in profile data.") - - def get_external_url(self) -> str: - """ - Get the external URL of the Instagram profile. - - Returns: - str: External URL of the Instagram profile. - """ - try: - return self.data['data']['user']['external_url'] - except KeyError: - raise KeyError("External URL not found in profile data.") - - def get_followers_count(self) -> int: - """ - Get the number of followers of the Instagram profile. - - Returns: - int: Number of followers of the Instagram profile. - """ - try: - return self.data['data']['user']['edge_followed_by']['count'] - except KeyError: - raise KeyError("Followers count not found in profile data.") - - def get_following_count(self) -> int: - """ - Get the number of accounts the Instagram profile is following. - - Returns: - int: Number of accounts the Instagram profile is following. - """ - try: - return self.data['data']['user']['edge_follow']['count'] - except KeyError: - raise KeyError("Following count not found in profile data.") - - def is_private(self) -> bool: - """ - Check if the Instagram profile is private. - - Returns: - bool: True if the profile is private, False otherwise. - """ - try: - return self.data['data']['user']['is_private'] - except KeyError: - raise KeyError("Private status not found in profile data.") - diff --git a/Src/Lib/Request/my_requests.py b/Src/Lib/Request/my_requests.py index b36fc17..18da38c 100644 --- a/Src/Lib/Request/my_requests.py +++ b/Src/Lib/Request/my_requests.py @@ -73,12 +73,13 @@ def parse_http_error(error_string: str): # Regular expression to match the error pattern error_pattern = re.compile(r"HTTP Error (\d{3}): (.+)") - match = error_pattern.search(error_string) + if match: error_code = match.group(1) message = match.group(2) return {'error_code': error_code, 'message': message} + else: logging.error(f"Error string does not match expected format: {error_string}") return None @@ -375,8 +376,6 @@ class ManageRequests: """ logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}") - print("=> ", e) - if self.attempt < self.retries: logging.info(f"Retrying request for URL '{self.url}' (attempt {self.attempt}/{self.retries})") time.sleep(HTTP_DELAY) diff --git a/Src/Util/color.py b/Src/Util/color.py new file mode 100644 index 0000000..d909d03 --- /dev/null +++ b/Src/Util/color.py @@ -0,0 +1,20 @@ +# 24.05.24 + +class Colors: + BLACK = "\033[30m" + RED = "\033[31m" + GREEN = "\033[32m" + YELLOW = "\033[33m" + BLUE = "\033[34m" + MAGENTA = "\033[35m" + CYAN = "\033[36m" + LIGHT_GRAY = "\033[37m" + DARK_GRAY = "\033[90m" + LIGHT_RED = "\033[91m" + LIGHT_GREEN = "\033[92m" + LIGHT_YELLOW = "\033[93m" + LIGHT_BLUE = "\033[94m" + LIGHT_MAGENTA = "\033[95m" + LIGHT_CYAN = "\033[96m" + WHITE = "\033[97m" + RESET = "\033[0m" \ No newline at end of file diff --git a/config.json b/config.json index e069b38..f7a3086 100644 --- a/config.json +++ b/config.json @@ -5,23 +5,18 @@ "log_to_file": true, "show_message": true, "clean_console": true, - "get_moment_title": false, "root_path": "Video", - "not_close": false, "map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)", - "instagram_session": "", - "create_job_database": false + "create_job_database": false, + "not_close": false }, "SITE": { - "streaming_site_name": "streamingcommunity", - "streaming_domain": "foo", - "anime_site_name": "animeunity", - "anime_domain": "to" + "streamingcommunity": "foo", + "animeunity": "epic" }, "M3U8": { - "tdqm_workers": 20, + "tdqm_workers": 30, "delay_start_workers": 0, - "hide_request_error": false, "requests_timeout": 10, "enable_time_quit": false, "tqdm_progress_timeout": 10, @@ -29,7 +24,7 @@ "tqdm_show_progress": true, "save_m3u8_content": true, "fake_proxy": true, - "fake_proxy_ip": ["162.19.255.78", "162.19.255.36", "162.19.255.224", "162.19.255.223", "162.19.254.244", "162.19.254.232", "162.19.254.230", "162.19.228.127", "162.19.228.105"], + "fake_proxy_ip": ["57.129.7.85", "57.129.7.188", "57.129.7.174", "57.129.4.77", "57.129.16.196", "57.129.16.156", "57.129.16.139", "57.129.16.135", "57.129.13.175", "57.129.13.157", "51.38.112.237", "51.195.107.7", "51.195.107.230"], "create_report": false }, "M3U8_PARSER": {