From 2542b5c6d441c4cdb98bf0e6f91600aa9be8d9f5 Mon Sep 17 00:00:00 2001 From: Ghost <62809003+Ghost6446@users.noreply.github.com> Date: Sun, 26 May 2024 11:08:46 +0200 Subject: [PATCH] Add altadefinizione --- README.md | 8 +- .../Altadefinizione/Core/Class/SearchType.py | 62 ++ .../Altadefinizione/Core/Player/supervideo.py | 182 ++++++ Src/Api/Altadefinizione/__init__.py | 38 ++ Src/Api/Altadefinizione/film.py | 58 ++ Src/Api/Altadefinizione/site.py | 140 +++++ Src/Api/Animeunity/Core/Vix_player/player.py | 11 +- Src/Api/Animeunity/__init__.py | 2 +- Src/Api/Animeunity/anime.py | 11 +- Src/Api/Animeunity/site.py | 5 +- .../Streamingcommunity/Core/Util/manage_ep.py | 3 - .../Core/Vix_player/player.py | 41 +- Src/Api/Streamingcommunity/film.py | 15 +- Src/Api/Streamingcommunity/series.py | 9 +- Src/Api/Streamingcommunity/site.py | 8 +- Src/Lib/E_Table/sql_table.py | 2 +- Src/Lib/FFmpeg/command.py | 28 +- Src/Lib/Hls/M3U8/__init__.py | 6 - Src/Lib/Hls/M3U8/decryption.py | 127 ---- Src/Lib/Hls/M3U8/lib_parser/__init__.py | 38 -- Src/Lib/Hls/M3U8/lib_parser/_util.py | 28 - Src/Lib/Hls/M3U8/lib_parser/model.py | 359 ------------ Src/Lib/Hls/M3U8/lib_parser/parser.py | 338 ----------- Src/Lib/Hls/M3U8/lib_parser/protocol.py | 17 - Src/Lib/Hls/M3U8/math_calc.py | 41 -- Src/Lib/Hls/M3U8/parser.py | 547 ------------------ Src/Lib/Hls/downloader.py | 55 +- Src/Lib/Hls/segments.py | 208 +++---- Src/Lib/M3U8/__init__.py | 4 +- Src/Lib/M3U8/estimator.py | 81 +++ Src/Lib/M3U8/math_calc.py | 41 -- Src/Lib/M3U8/url_fix.py | 54 -- .../M3U8/url_fix.py => M3U8/url_fixer.py} | 4 +- Src/Lib/Request/my_requests.py | 10 +- Src/Util/file_validator.py | 2 + Src/Util/message.py | 15 +- Src/Util/node_jjs.py | 56 ++ Src/Util/os.py | 108 +++- Src/Util/table.py | 5 +- config.json | 32 +- job_series.py | 205 ------- run.py | 74 ++- update.py | 152 ----- 43 files changed, 980 insertions(+), 2250 deletions(-) create mode 100644 Src/Api/Altadefinizione/Core/Class/SearchType.py create mode 100644 Src/Api/Altadefinizione/Core/Player/supervideo.py create mode 100644 Src/Api/Altadefinizione/__init__.py create mode 100644 Src/Api/Altadefinizione/film.py create mode 100644 Src/Api/Altadefinizione/site.py delete mode 100644 Src/Lib/Hls/M3U8/__init__.py delete mode 100644 Src/Lib/Hls/M3U8/decryption.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/__init__.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/_util.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/model.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/parser.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/protocol.py delete mode 100644 Src/Lib/Hls/M3U8/math_calc.py delete mode 100644 Src/Lib/Hls/M3U8/parser.py create mode 100644 Src/Lib/M3U8/estimator.py delete mode 100644 Src/Lib/M3U8/math_calc.py delete mode 100644 Src/Lib/M3U8/url_fix.py rename Src/Lib/{Hls/M3U8/url_fix.py => M3U8/url_fixer.py} (97%) create mode 100644 Src/Util/node_jjs.py delete mode 100644 job_series.py delete mode 100644 update.py diff --git a/README.md b/README.md index fd97d67..dcb0bf0 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ You can change some behaviors by tweaking the configuration file. - Example Value: %(tv_name) [S%(season)] [E%(episode)] %(episode_name) -### Options (M3U8) +### Options (M3U8_DOWNLOAD) * tdqm_workers: The number of workers that will cooperate to download .ts files. **A high value may slow down your PC** - Default Value: 20 @@ -73,10 +73,6 @@ You can change some behaviors by tweaking the configuration file. * tqdm_show_progress: Whether to show progress during downloads or not. - Default Value: true -* save_m3u8_content: Enabling this feature saves various playlists and indexes in the temporary folder during the download process, ensuring all necessary files are retained for playback or further processing. - - Default Value: true - - * fake_proxy: Speed up download for streaming film and series. **Dont work for anime, need to set to FALSE** - Default Value: true @@ -84,7 +80,7 @@ You can change some behaviors by tweaking the configuration file. - Default Value: false -### Options (M3U8_OPTIONS) +### Options (M3U8_FILTER) * cleanup_tmp_folder: Upon final conversion, this option ensures the removal of all unformatted audio, video tracks, and subtitles from the temporary folder, thereby maintaining cleanliness and efficiency. - Default Value: true diff --git a/Src/Api/Altadefinizione/Core/Class/SearchType.py b/Src/Api/Altadefinizione/Core/Class/SearchType.py new file mode 100644 index 0000000..8e58566 --- /dev/null +++ b/Src/Api/Altadefinizione/Core/Class/SearchType.py @@ -0,0 +1,62 @@ +# 26.05.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('name') + self.type: str = "film" + self.score: str = data.get('score') + self.url: int = data.get('url') + + def __str__(self): + return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" + diff --git a/Src/Api/Altadefinizione/Core/Player/supervideo.py b/Src/Api/Altadefinizione/Core/Player/supervideo.py new file mode 100644 index 0000000..206e359 --- /dev/null +++ b/Src/Api/Altadefinizione/Core/Player/supervideo.py @@ -0,0 +1,182 @@ +# 26.05.24 + +import re +import os +import sys +import time +import logging +import subprocess + + +# External libraries +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.console import console +from Src.Lib.Request import requests +from Src.Util.headers import get_headers +from Src.Util.node_jjs import run_node_script + + +class VideoSource: + + def __init__(self) -> None: + """ + Initializes the VideoSource object with default values. + + Attributes: + headers (dict): An empty dictionary to store HTTP headers. + """ + self.headers = {'user-agent': get_headers()} + + def setup(self, url: str) -> None: + """ + Sets up the video source with the provided URL. + + Args: + url (str): The URL of the video source. + """ + self.url = url + + def make_request(self, url: str) -> str: + """ + Make an HTTP GET request to the provided URL. + + Args: + url (str): The URL to make the request to. + + Returns: + str: The response content if successful, None otherwise. + """ + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.text + + except Exception as e: + logging.error(f"Request failed: {e}") + return None + + def parse_html(self, html_content: str) -> BeautifulSoup: + """ + Parse the provided HTML content using BeautifulSoup. + + Args: + html_content (str): The HTML content to parse. + + Returns: + BeautifulSoup: Parsed HTML content if successful, None otherwise. + """ + + try: + soup = BeautifulSoup(html_content, "html.parser") + return soup + + except Exception as e: + logging.error(f"Failed to parse HTML content: {e}") + return None + + def get_iframe(self, soup): + """ + Extracts the source URL of the second iframe in the provided BeautifulSoup object. + + Args: + soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML. + + Returns: + str: The source URL of the second iframe, or None if not found. + """ + iframes = soup.find_all("iframe") + if iframes and len(iframes) > 1: + return iframes[1].get("src") + + return None + + def find_content(self, url): + """ + Makes a request to the specified URL and parses the HTML content. + + Args: + url (str): The URL to fetch content from. + + Returns: + BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails. + """ + content = self.make_request(url) + if content: + return self.parse_html(content) + + return None + + def get_result_node_js(self, soup): + """ + Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL. + + Args: + soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content. + + Returns: + str: The output from the Node.js script, or None if the script cannot be found or executed. + """ + for script in soup.find_all("script"): + if "eval" in str(script): + new_script = str(script.text).replace("eval", "var a = ") + new_script = new_script.replace(")))", ")));console.log(a);") + return run_node_script(new_script) + + return None + + def get_playlist(self) -> str: + """ + Download a video from the provided URL. + + Returns: + str: The URL of the downloaded video if successful, None otherwise. + """ + try: + html_content = self.make_request(self.url) + if not html_content: + logging.error("Failed to fetch HTML content.") + return None + + soup = self.parse_html(html_content) + if not soup: + logging.error("Failed to parse HTML content.") + return None + + iframe_src = self.get_iframe(soup) + if not iframe_src: + logging.error("No iframe found.") + return None + + down_page_soup = self.find_content(iframe_src) + if not down_page_soup: + logging.error("Failed to fetch down page content.") + return None + + pattern = r'data-link="(//supervideo[^"]+)"' + match = re.search(pattern, str(down_page_soup)) + if not match: + logging.error("No match found for supervideo URL.") + return None + + supervideo_url = "https:" + match.group(1) + supervideo_soup = self.find_content(supervideo_url) + if not supervideo_soup: + logging.error("Failed to fetch supervideo content.") + return None + + result = self.get_result_node_js(supervideo_soup) + if not result: + logging.error("No video URL found in script.") + return None + + master_playlist = str(result).split(":")[3].split('"}')[0] + return f"https:{master_playlist}" + + except Exception as e: + logging.error(f"An error occurred: {e}") + return None + diff --git a/Src/Api/Altadefinizione/__init__.py b/Src/Api/Altadefinizione/__init__.py new file mode 100644 index 0000000..c64423d --- /dev/null +++ b/Src/Api/Altadefinizione/__init__.py @@ -0,0 +1,38 @@ +# 26.05.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import ( + title_search, + get_select_title, + manager_clear +) + +from .film import download_film + + +def main_film(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + film_search = msg.ask("\n[purple]Insert word to search in all site: ").strip() + len_database = title_search(film_search) + + if len_database != 0: + + # Select title from list + select_title = get_select_title() + + # Download only film + download_film( + title_name=select_title.name, + url=select_title.url + ) + + # End + console.print("\n[red]Done") diff --git a/Src/Api/Altadefinizione/film.py b/Src/Api/Altadefinizione/film.py new file mode 100644 index 0000000..e35648a --- /dev/null +++ b/Src/Api/Altadefinizione/film.py @@ -0,0 +1,58 @@ +# 26.05.24 + +import os +import sys +import logging + + +# Internal utilities +from Src.Util.console import console +from Src.Util._jsonConfig import config_manager +from Src.Lib.Hls.downloader import Downloader +from Src.Util.message import start_message + + +# Logic class +from .Core.Player.supervideo import VideoSource + + +# Config +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +STREAMING_FOLDER = "altadefinizione" +MOVIE_FOLDER = "Movie" + + +# Variable +video_source = VideoSource() + + +def download_film(title_name: str, url: str): + """ + Downloads a film using the provided film ID, title name, and domain. + + Args: + - title_name (str): The name of the film title. + - url (str): The url of the video + """ + + # Start message and display film information + start_message() + console.print(f"[yellow]Download: [red]{title_name} \n") + + # Set domain and media ID for the video source + video_source.setup( + url = url + ) + + # Define output path + mp4_name = str(title_name).replace("-", "_") + ".mp4" + mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name) + + # Get m3u8 master playlist + master_playlist = video_source.get_playlist() + + # Download the film using the m3u8 playlist, key, and output filename + Downloader( + m3u8_playlist = master_playlist, + output_filename = os.path.join(mp4_path, mp4_name) + ).start() \ No newline at end of file diff --git a/Src/Api/Altadefinizione/site.py b/Src/Api/Altadefinizione/site.py new file mode 100644 index 0000000..725992b --- /dev/null +++ b/Src/Api/Altadefinizione/site.py @@ -0,0 +1,140 @@ +# 26.05.24 + +import sys +import json +import logging + + +# External libraries +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.table import TVShowManager +from Src.Lib.Request import requests +from Src.Util.headers import get_headers +from Src.Util.console import console +from Src.Util._jsonConfig import config_manager +from Src.Lib.Unidecode import transliterate + + +# Logic class +from .Core.Class.SearchType import MediaManager, MediaItem + + +# Config +AD_SITE_NAME = "altadefinizione" +AD_DOMAIN_NOW = config_manager.get('SITE', AD_SITE_NAME) + + +# Variable +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(title_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + - domain (str): The domain to search on. + + Returns: + int: The number of titles found. + """ + + # Send request to search for titles + response = requests.get(f"https://{AD_SITE_NAME}.{AD_DOMAIN_NOW}/page/1/?story={transliterate(title_search).replace(' ', '+')}&do=search&subaction=search&titleonly=3") + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + table_content = soup.find('div', id="dle-content") + + # Scrape div film in table on single page + for film_div in table_content.find_all('div', class_='col-lg-3'): + title = film_div.find('h2', class_='titleFilm').get_text(strip=True) + link = film_div.find('h2', class_='titleFilm').find('a')['href'] + imdb_rating = film_div.find('div', class_='imdb-rate').get_text(strip=True).split(":")[-1] + + film_info = { + 'name': title, + 'url': link, + 'score': imdb_rating + } + + media_search_manager.add_media(film_info) + + # Return the number of titles found + return media_search_manager.get_length() + + +def get_select_title(type_filter: list = None) -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Args: + - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + "Score": {'color': 'cyan'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + + # Filter for only a list of category + if type_filter is not None: + if str(media.type) not in type_filter: + continue + + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + 'Score': media.score, + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + table_show_manager.clear() + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) + + +def manager_clear(): + """ + Clears the data lists managed by media_search_manager and table_show_manager. + + This function clears the data lists managed by global variables media_search_manager + and table_show_manager. It removes all the items from these lists, effectively + resetting them to empty lists. + """ + global media_search_manager, table_show_manager + + # Clear list of data + media_search_manager.clear() + table_show_manager.clear() diff --git a/Src/Api/Animeunity/Core/Vix_player/player.py b/Src/Api/Animeunity/Core/Vix_player/player.py index aefafe4..576a344 100644 --- a/Src/Api/Animeunity/Core/Vix_player/player.py +++ b/Src/Api/Animeunity/Core/Vix_player/player.py @@ -56,14 +56,11 @@ class VideoSource: try: - # Make a request to collect information about preview of the title response = requests.post(f"https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}", headers = self.headers) response.raise_for_status() - if response.ok: - - # Collect all info about preview - self.obj_preview = PreviewManager(response.json()) + # Collect all info about preview + self.obj_preview = PreviewManager(response.json()) except Exception as e: logging.error(f"Error collecting preview info: {e}") @@ -78,7 +75,6 @@ class VideoSource: """ try: - # Fetch episode count from API endpoint response = requests.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/") response.raise_for_status() @@ -101,13 +97,11 @@ class VideoSource: """ try: - # Define parameters for API request params = { "start_range": index_ep, "end_range": index_ep + 1 } - # Fetch episode information from API endpoint response = requests.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params) response.raise_for_status() @@ -131,7 +125,6 @@ class VideoSource: """ try: - # Fetch embed URL from API endpoint response = requests.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}") response.raise_for_status() diff --git a/Src/Api/Animeunity/__init__.py b/Src/Api/Animeunity/__init__.py index a0ca70e..7e8407a 100644 --- a/Src/Api/Animeunity/__init__.py +++ b/Src/Api/Animeunity/__init__.py @@ -17,7 +17,7 @@ def main_anime(): if len_database != 0: # Select title from list - select_title = get_select_title(True) + select_title = get_select_title() if select_title.type == 'TV': donwload_series( diff --git a/Src/Api/Animeunity/anime.py b/Src/Api/Animeunity/anime.py index d430004..936b2fa 100644 --- a/Src/Api/Animeunity/anime.py +++ b/Src/Api/Animeunity/anime.py @@ -49,16 +49,17 @@ def download_episode(index_select: int): video_source.parse_script(embed_url) # Create output path - out_path = None + mp4_path = None + mp4_name = f"{index_select}.mp4" if video_source.is_series: - out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, SERIES_FOLDER, video_source.series_name, f"{index_select+1}.mp4") + mp4_path = os.path.join(ROOT_PATH, ANIME_FOLDER, SERIES_FOLDER, video_source.series_name) else: - out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, MOVIE_FOLDER, video_source.series_name, f"{index_select}.mp4") - + mp4_path = os.path.join(ROOT_PATH, ANIME_FOLDER, MOVIE_FOLDER, video_source.series_name) + # Crete downloader obj_download = Downloader( m3u8_playlist = video_source.get_playlist(), - output_filename = out_path + output_filename = os.path.join(mp4_path, mp4_name) ) # Start downloading diff --git a/Src/Api/Animeunity/site.py b/Src/Api/Animeunity/site.py index e374456..cefbae1 100644 --- a/Src/Api/Animeunity/site.py +++ b/Src/Api/Animeunity/site.py @@ -181,12 +181,11 @@ def title_search(title: str) -> int: -def get_select_title(switch: bool = False, type_filter: list = None) -> MediaItem: +def get_select_title(type_filter: list = None) -> MediaItem: """ Display a selection of titles and prompt the user to choose one. Args: - - switch (bool): switch from film to anime - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] Returns: @@ -223,7 +222,7 @@ def get_select_title(switch: bool = False, type_filter: list = None) -> MediaIte }) # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list), switch=switch) + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) table_show_manager.clear() # Handle user's quit command diff --git a/Src/Api/Streamingcommunity/Core/Util/manage_ep.py b/Src/Api/Streamingcommunity/Core/Util/manage_ep.py index 3ab51d7..331771c 100644 --- a/Src/Api/Streamingcommunity/Core/Util/manage_ep.py +++ b/Src/Api/Streamingcommunity/Core/Util/manage_ep.py @@ -7,7 +7,6 @@ from typing import List # Internal utilities from Src.Util._jsonConfig import config_manager -from Src.Lib.Unidecode import transliterate # Logic class @@ -70,8 +69,6 @@ def map_episode_title(tv_name: str, episode: Episode, number_season: int): # Additional fix map_episode_temp = map_episode_temp.replace(".", "_") - #map_episode_temp = map_episode_temp.replace(" ", "_") - map_episode_temp = transliterate(map_episode_temp) logging.info(f"Map episode string return: {map_episode_temp}") return map_episode_temp diff --git a/Src/Api/Streamingcommunity/Core/Vix_player/player.py b/Src/Api/Streamingcommunity/Core/Vix_player/player.py index bab894a..396828e 100644 --- a/Src/Api/Streamingcommunity/Core/Vix_player/player.py +++ b/Src/Api/Streamingcommunity/Core/Vix_player/player.py @@ -61,14 +61,11 @@ class VideoSource: try: - # Make a request to collect information about preview of the title response = requests.post(f"https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}", headers = self.headers) response.raise_for_status() - if response.ok: - - # Collect all info about preview - self.obj_preview = PreviewManager(response.json()) + # Collect all info about preview + self.obj_preview = PreviewManager(response.json()) except Exception as e: logging.error(f"Error collecting preview info: {e}") @@ -78,6 +75,7 @@ class VideoSource: """ Collect information about seasons. """ + self.headers = { 'user-agent': get_headers(), 'x-inertia': 'true', @@ -86,18 +84,15 @@ class VideoSource: try: - # Make a request to collect information about seasons response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers = self.headers) response.raise_for_status() - if response.ok: - - # Extract JSON response if available - json_response = response.json().get('props', {}).get('title', {}).get('seasons', []) + # Extract JSON response if available + json_response = response.json().get('props', {}).get('title', {}).get('seasons', []) - # Iterate over JSON data and add titles to the manager - for dict_season in json_response: - self.obj_title_manager.add_title(dict_season) + # Iterate over JSON data and add titles to the manager + for dict_season in json_response: + self.obj_title_manager.add_title(dict_season) except Exception as e: logging.error(f"Error collecting season info: {e}") @@ -116,14 +111,12 @@ class VideoSource: response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers = self.headers) response.raise_for_status() - if response.ok: - - # Extract JSON response if available - json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', []) + # Extract JSON response if available + json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', []) - # Iterate over JSON data and add episodes to the manager - for dict_episode in json_response: - self.obj_episode_manager.add_episode(dict_episode) + # Iterate over JSON data and add episodes to the manager + for dict_episode in json_response: + self.obj_episode_manager.add_episode(dict_episode) except Exception as e: logging.error(f"Error collecting title season info: {e}") @@ -150,11 +143,9 @@ class VideoSource: response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params = params) response.raise_for_status() - if response.ok: - - # Parse response with BeautifulSoup to get iframe source - soup = BeautifulSoup(response.text, "html.parser") - self.iframe_src: str = soup.find("iframe").get("src") + # Parse response with BeautifulSoup to get iframe source + soup = BeautifulSoup(response.text, "html.parser") + self.iframe_src: str = soup.find("iframe").get("src") except Exception as e: logging.error(f"Error getting iframe source: {e}") diff --git a/Src/Api/Streamingcommunity/film.py b/Src/Api/Streamingcommunity/film.py index 44b653a..f9b971d 100644 --- a/Src/Api/Streamingcommunity/film.py +++ b/Src/Api/Streamingcommunity/film.py @@ -9,10 +9,7 @@ import logging from Src.Util.console import console from Src.Util._jsonConfig import config_manager from Src.Lib.Hls.downloader import Downloader -from Src.Util.file_validator import can_create_file from Src.Util.message import start_message -from Src.Util.os import remove_special_characters -from Src.Lib.Unidecode import transliterate # Logic class @@ -21,7 +18,7 @@ from .Core.Vix_player.player import VideoSource # Config ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -STREAMING_FOLDER = config_manager.get('SITE', 'streamingcommunity') +STREAMING_FOLDER = "streamingcommunity" MOVIE_FOLDER = "Movie" @@ -54,17 +51,13 @@ def download_film(id_film: str, title_name: str, domain: str): video_source.get_content() master_playlist = video_source.get_playlist() - # Define the filename and path for the downloaded film mp4_name = title_name.replace("-", "_") - mp4_format = remove_special_characters(transliterate(mp4_name) + ".mp4") - - if not can_create_file(mp4_format): - logging.error("Invalid mp4 name.") - sys.exit(0) + mp4_format = (mp4_name) + ".mp4" + mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name) # Download the film using the m3u8 playlist, key, and output filename Downloader( m3u8_playlist = master_playlist, - output_filename = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name, mp4_format) + output_filename = os.path.join(mp4_path, mp4_format) ).start() \ No newline at end of file diff --git a/Src/Api/Streamingcommunity/series.py b/Src/Api/Streamingcommunity/series.py index 2a6c542..5040798 100644 --- a/Src/Api/Streamingcommunity/series.py +++ b/Src/Api/Streamingcommunity/series.py @@ -10,8 +10,6 @@ from Src.Util.console import console, msg from Src.Util._jsonConfig import config_manager from Src.Util.table import TVShowManager from Src.Util.message import start_message -from Src.Util.os import remove_special_characters -from Src.Util.file_validator import can_create_file from Src.Lib.Hls.downloader import Downloader @@ -87,13 +85,8 @@ def donwload_video(tv_name: str, index_season_selected: int, index_episode_selec print() # Define filename and path for the downloaded video - mp4_name = remove_special_characters(f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4") + mp4_name = f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4" mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, SERIES_FOLDER, tv_name, f"S{index_season_selected}") - os.makedirs(mp4_path, exist_ok=True) - - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) # Retrieve scws and if available master playlist video_source.get_iframe(obj_episode.id) diff --git a/Src/Api/Streamingcommunity/site.py b/Src/Api/Streamingcommunity/site.py index 3537d49..048f708 100644 --- a/Src/Api/Streamingcommunity/site.py +++ b/Src/Api/Streamingcommunity/site.py @@ -58,8 +58,6 @@ def get_version(text: str) -> tuple[str, list]: version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version'] sliders = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['props']['sliders'] - title_trending = sliders[0] - title_lates = sliders[1] title_top_10 = sliders[2] # Collect info about only top 10 title @@ -149,12 +147,11 @@ def title_search(title_search: str, domain: str) -> int: return media_search_manager.get_length() -def get_select_title(switch: bool = False, type_filter: list = None) -> MediaItem: +def get_select_title(type_filter: list = None) -> MediaItem: """ Display a selection of titles and prompt the user to choose one. Args: - - switch (bool): switch from film to anime - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] Returns: @@ -191,7 +188,7 @@ def get_select_title(switch: bool = False, type_filter: list = None) -> MediaIte }) # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list), switch=switch) + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) table_show_manager.clear() # Handle user's quit command @@ -220,4 +217,3 @@ def manager_clear(): # Clear list of data media_search_manager.clear() table_show_manager.clear() - diff --git a/Src/Lib/E_Table/sql_table.py b/Src/Lib/E_Table/sql_table.py index 93dd8e0..45c5d03 100644 --- a/Src/Lib/E_Table/sql_table.py +++ b/Src/Lib/E_Table/sql_table.py @@ -11,7 +11,7 @@ from Src.Util._jsonConfig import config_manager # Variable -CREATE_REPORT = config_manager.get_bool('M3U8', 'create_report') +CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report') CREATE_JOB_DB = config_manager.get_bool('DEFAULT', 'create_job_database') diff --git a/Src/Lib/FFmpeg/command.py b/Src/Lib/FFmpeg/command.py index 4cbce3a..5841996 100644 --- a/Src/Lib/FFmpeg/command.py +++ b/Src/Lib/FFmpeg/command.py @@ -16,6 +16,7 @@ except: pass # Internal utilities +from Src.Util.os import check_file_existence from Src.Util._jsonConfig import config_manager from .util import has_audio_stream from .capture import capture_ffmpeg_real_time @@ -24,10 +25,9 @@ from .capture import capture_ffmpeg_real_time # Variable DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug") DEBUG_FFMPEG = "debug" if DEBUG_MODE else "error" -terminate_flag = threading.Event() -USE_CODECS = config_manager.get_bool("M3U8_OPTIONS", "use_codec") -USE_GPU = config_manager.get_bool("M3U8_OPTIONS", "use_gpu") -FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_OPTIONS", "default_preset") +USE_CODECS = config_manager.get_bool("M3U8_FILTER", "use_codec") +USE_GPU = config_manager.get_bool("M3U8_FILTER", "use_gpu") +FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_FILTER", "default_preset") @@ -271,6 +271,9 @@ def join_video(video_path: str, out_path: str, vcodec: str = None, acodec: str = - force_ts (bool): Force video path to be mpegts as input. """ + if not check_file_existence(video_path): + sys.exit(0) + # Start command ffmpeg_cmd = ['ffmpeg'] @@ -324,6 +327,9 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s - preset (str): The preset for encoding. Defaults to 'ultrafast'. """ + if not check_file_existence(video_path): + sys.exit(0) + # Start command ffmpeg_cmd = ['ffmpeg', '-i', video_path] @@ -331,6 +337,9 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s for i, audio_track in enumerate(audio_tracks): ffmpeg_cmd.extend(['-i', audio_track.get('path')]) + if not check_file_existence(audio_track.get('path')): + sys.exit(0) + # Add output args if USE_CODECS: ffmpeg_cmd.extend(['-c:v', vcodec, '-c:a', acodec, '-b:a', str(bitrate), '-preset', FFMPEG_DEFAULT_PRESET]) @@ -348,7 +357,7 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s capture_ffmpeg_real_time(ffmpeg_cmd, "[cyan]Join audio") print() -def join_subtitle(video: str, subtitles_list: List[Dict[str, str]], output_file: str): +def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], output_file: str): """ Joins subtitles with a video file using FFmpeg. @@ -359,9 +368,13 @@ def join_subtitle(video: str, subtitles_list: List[Dict[str, str]], output_file: - output_file (str): The path to save the output file. """ + if not check_file_existence(video_path): + sys.exit(0) + + # Start command added_subtitle_names = set() # Remove subtitle with same name - ffmpeg_cmd = ["ffmpeg", "-i", video] + ffmpeg_cmd = ["ffmpeg", "-i", video_path] # Add subtitle with language for idx, subtitle in enumerate(subtitles_list): @@ -375,6 +388,9 @@ def join_subtitle(video: str, subtitles_list: List[Dict[str, str]], output_file: ffmpeg_cmd += ["-map", "0:v", "-map", "0:a", "-map", f"{idx + 1}:s"] ffmpeg_cmd += ["-metadata:s:s:{}".format(idx), "title={}".format(subtitle['name'])] + if not check_file_existence(subtitle['path']): + sys.exit(0) + # Add output args if USE_CODECS: ffmpeg_cmd.extend(['-c:v', 'copy', '-c:a', 'copy', '-c:s', 'mov_text']) diff --git a/Src/Lib/Hls/M3U8/__init__.py b/Src/Lib/Hls/M3U8/__init__.py deleted file mode 100644 index a0bacd5..0000000 --- a/Src/Lib/Hls/M3U8/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# 02.04.24 - -from .decryption import M3U8_Decryption -from .math_calc import M3U8_Ts_Files -from .parser import M3U8_Parser, M3U8_Codec -from .url_fix import m3u8_url_fix \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/decryption.py b/Src/Lib/Hls/M3U8/decryption.py deleted file mode 100644 index b02946c..0000000 --- a/Src/Lib/Hls/M3U8/decryption.py +++ /dev/null @@ -1,127 +0,0 @@ -# 03.04.24 - -import sys -import logging -import subprocess -import importlib.util - - -# Internal utilities -from Src.Util.console import console - - -# Check if Crypto module is installed -crypto_spec = importlib.util.find_spec("Crypto") -crypto_installed = crypto_spec is not None - - -if crypto_installed: - logging.info("Decrypy use: Crypto") - from Crypto.Cipher import AES # type: ignore - from Crypto.Util.Padding import unpad # type: ignore - - class M3U8_Decryption: - """ - Class for decrypting M3U8 playlist content using AES encryption when the Crypto module is available. - """ - def __init__(self, key: bytes, iv: bytes, method: str) -> None: - """ - Initialize the M3U8_Decryption object. - - Args: - - key (bytes): The encryption key. - - iv (bytes): The initialization vector (IV). - - method (str): The encryption method. - """ - self.key = key - if "0x" in str(iv): - self.iv = bytes.fromhex(iv.replace("0x", "")) - else: - self.iv = iv - self.method = method - logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})") - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt the ciphertext using the specified encryption method. - - Args: - - ciphertext (bytes): The encrypted content to decrypt. - - Returns: - bytes: The decrypted content. - """ - if self.method == "AES": - cipher = AES.new(self.key, AES.MODE_ECB) - decrypted_data = cipher.decrypt(ciphertext) - return unpad(decrypted_data, AES.block_size) - - elif self.method == "AES-128": - cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv) - decrypted_data = cipher.decrypt(ciphertext) - return unpad(decrypted_data, AES.block_size) - - elif self.method == "AES-128-CTR": - cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv) - return cipher.decrypt(ciphertext) - - else: - raise ValueError("Invalid or unsupported method") - -else: - - # Check if openssl command is available - openssl_available = subprocess.run(["openssl", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0 - logging.info("Decrypy use: OPENSSL") - - if not openssl_available: - console.log("[red]Neither Crypto nor openssl is installed. Please install either one of them.") - sys.exit(0) - - class M3U8_Decryption: - """ - Class for decrypting M3U8 playlist content using OpenSSL when the Crypto module is not available. - """ - def __init__(self, key: bytes, iv: bytes, method: str) -> None: - """ - Initialize the M3U8_Decryption object. - - Args: - - key (bytes): The encryption key. - - iv (bytes): The initialization vector (IV). - - method (str): The encryption method. - """ - self.key = key - if "0x" in str(iv): - self.iv = bytes.fromhex(iv.replace("0x", "")) - else: - self.iv = iv - self.method = method - logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})") - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt the ciphertext using the specified encryption method. - - Args: - - ciphertext (bytes): The encrypted content to decrypt. - - Returns: - bytes: The decrypted content. - """ - if self.method == "AES": - openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt' - decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) - - elif self.method == "AES-128": - openssl_cmd = f'openssl enc -d -aes-128-cbc -K {self.key[:16].hex()} -iv {self.iv.hex()}' - decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) - - elif self.method == "AES-128-CTR": - openssl_cmd = f'openssl enc -d -aes-128-ctr -K {self.key[:16].hex()} -iv {self.iv.hex()}' - decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) - - else: - raise ValueError("Invalid or unsupported method") - - return decrypted_data diff --git a/Src/Lib/Hls/M3U8/lib_parser/__init__.py b/Src/Lib/Hls/M3U8/lib_parser/__init__.py deleted file mode 100644 index 1d99d3d..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -# 15.04.24 - -import os - - -# Internal utilities -from .model import M3U8 - - -def load(raw_content, uri): - """ - Parses the content of an M3U8 playlist and returns an M3U8 object. - - Args: - raw_content (str): The content of the M3U8 playlist as a string. - uri (str): The URI of the M3U8 playlist file or stream. - - Returns: - M3U8: An object representing the parsed M3U8 playlist. - - Raises: - IOError: If the raw_content is empty or if the URI cannot be accessed. - ValueError: If the raw_content is not a valid M3U8 playlist format. - - Example: - >>> m3u8_content = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:10\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:10.0,\nhttp://example.com/segment0.ts\n#EXTINF:10.0,\nhttp://example.com/segment1.ts\n" - >>> uri = "http://example.com/playlist.m3u8" - >>> playlist = load(m3u8_content, uri) - """ - - if not raw_content: - raise IOError("Empty content provided.") - - if not uri: - raise IOError("Empty URI provided.") - - base_uri = os.path.dirname(uri) - return M3U8(raw_content, base_uri=base_uri) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/lib_parser/_util.py b/Src/Lib/Hls/M3U8/lib_parser/_util.py deleted file mode 100644 index 48b5b0a..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/_util.py +++ /dev/null @@ -1,28 +0,0 @@ -# 19.04.24 - -import itertools - - -def remove_quotes_parser(*attrs): - """ - Returns a dictionary mapping attribute names to a function that removes quotes from their values. - """ - return dict(zip(attrs, itertools.repeat(remove_quotes))) - - -def remove_quotes(string): - """ - Removes quotes from a string. - """ - quotes = ('"', "'") - if string and string[0] in quotes and string[-1] in quotes: - return string[1:-1] - return string - - -def normalize_attribute(attribute): - """ - Normalizes an attribute name by converting hyphens to underscores and converting to lowercase. - """ - return attribute.replace('-', '_').lower().strip() - diff --git a/Src/Lib/Hls/M3U8/lib_parser/model.py b/Src/Lib/Hls/M3U8/lib_parser/model.py deleted file mode 100644 index c1a89ef..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/model.py +++ /dev/null @@ -1,359 +0,0 @@ -# 15.04.24 - -import os -from collections import namedtuple - - -# Internal utilities -from ..lib_parser import parser - - -# Variable -StreamInfo = namedtuple('StreamInfo', ['bandwidth', 'program_id', 'resolution', 'codecs']) -Media = namedtuple('Media', ['uri', 'type', 'group_id', 'language', 'name','default', 'autoselect', 'forced', 'characteristics']) - - - -class M3U8: - """ - Represents a single M3U8 playlist. Should be instantiated with the content as string. - - Args: - - content: the m3u8 content as string - - base_path: all urls (key and segments url) will be updated with this base_path, - ex: base_path = "http://videoserver.com/hls" - - base_uri: uri the playlist comes from. it is propagated to SegmentList and Key - ex: http://example.com/path/to - - Attribute: - - key: it's a `Key` object, the EXT-X-KEY from m3u8. Or None - - segments: a `SegmentList` object, represents the list of `Segment`s from this playlist - - is_variant: Returns true if this M3U8 is a variant playlist, with links to other M3U8s with different bitrates. - If true, `playlists` is a list of the playlists available, and `iframe_playlists` is a list of the i-frame playlists available. - - is_endlist: Returns true if EXT-X-ENDLIST tag present in M3U8. - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8 - - playlists: If this is a variant playlist (`is_variant` is True), returns a list of Playlist objects - - iframe_playlists: If this is a variant playlist (`is_variant` is True), returns a list of IFramePlaylist objects - - playlist_type: A lower-case string representing the type of the playlist, which can be one of VOD (video on demand) or EVENT. - - media: If this is a variant playlist (`is_variant` is True), returns a list of Media objects - - target_duration: Returns the EXT-X-TARGETDURATION as an integer - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2 - - media_sequence: Returns the EXT-X-MEDIA-SEQUENCE as an integer - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3 - - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a string - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5 - - version: Return the EXT-X-VERSION as is - - allow_cache: Return the EXT-X-ALLOW-CACHE as is - - files: Returns an iterable with all files from playlist, in order. This includes segments and key uri, if present. - - base_uri: It is a property (getter and setter) used by SegmentList and Key to have absolute URIs. - - is_i_frames_only: Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8. - Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12 - - """ - - # Mapping of simple attributes (obj attribute, parser attribute) - SIMPLE_ATTRIBUTES = ( - ('is_variant', 'is_variant'), - ('is_endlist', 'is_endlist'), - ('is_i_frames_only', 'is_i_frames_only'), - ('target_duration', 'targetduration'), - ('media_sequence', 'media_sequence'), - ('program_date_time', 'program_date_time'), - ('version', 'version'), - ('allow_cache', 'allow_cache'), - ('playlist_type', 'playlist_type') - ) - - def __init__(self, content=None, base_path=None, base_uri=None): - """ - Initialize the M3U8 object. - - Parameters: - - content: M3U8 content (string). - - base_path: Base path for relative URIs (string). - - base_uri: Base URI for absolute URIs (string). - """ - if content is not None: - self.data = parser.parse(content) - else: - self.data = {} - self._base_uri = base_uri - self.base_path = base_path - self._initialize_attributes() - - def _initialize_attributes(self): - """ - Initialize attributes based on parsed data. - """ - # Initialize key and segments - self.key = Key(base_uri=self.base_uri, **self.data.get('key', {})) if 'key' in self.data else None - self.segments = SegmentList([Segment(base_uri=self.base_uri, **params) for params in self.data.get('segments', [])]) - - # Initialize simple attributes - for attr, param in self.SIMPLE_ATTRIBUTES: - setattr(self, attr, self.data.get(param)) - - # Initialize files, media, playlists, and iframe_playlists - self.files = [] - if self.key: - self.files.append(self.key.uri) - self.files.extend(self.segments.uri) - - self.media = [Media( - uri = media.get('uri'), - type = media.get('type'), - group_id = media.get('group_id'), - language = media.get('language'), - name = media.get('name'), - default = media.get('default'), - autoselect = media.get('autoselect'), - forced = media.get('forced'), - characteristics = media.get('characteristics')) - for media in self.data.get('media', []) - ] - self.playlists = PlaylistList([Playlist( - base_uri = self.base_uri, - media = self.media, - **playlist - )for playlist in self.data.get('playlists', []) - ]) - self.iframe_playlists = PlaylistList() - for ifr_pl in self.data.get('iframe_playlists', []): - self.iframe_playlists.append( - IFramePlaylist( - base_uri = self.base_uri, - uri = ifr_pl['uri'], - iframe_stream_info=ifr_pl['iframe_stream_info']) - ) - - @property - def base_uri(self): - """ - Get the base URI. - """ - return self._base_uri - - @base_uri.setter - def base_uri(self, new_base_uri): - """ - Set the base URI. - """ - self._base_uri = new_base_uri - self.segments.base_uri = new_base_uri - - -class BasePathMixin: - """ - Mixin class for managing base paths. - """ - @property - def base_path(self): - """ - Get the base path. - """ - return os.path.dirname(self.uri) - - @base_path.setter - def base_path(self, newbase_path): - """ - Set the base path. - """ - if not self.base_path: - self.uri = "%s/%s" % (newbase_path, self.uri) - self.uri = self.uri.replace(self.base_path, newbase_path) - - -class GroupedBasePathMixin: - """ - Mixin class for managing base paths across a group of items. - """ - - def _set_base_uri(self, new_base_uri): - """ - Set the base URI for each item in the group. - """ - for item in self: - item.base_uri = new_base_uri - - base_uri = property(None, _set_base_uri) - - def _set_base_path(self, new_base_path): - """ - Set the base path for each item in the group. - """ - for item in self: - item.base_path = new_base_path - - base_path = property(None, _set_base_path) - - -class Segment(BasePathMixin): - """ - Class representing a segment in an M3U8 playlist. - Inherits from BasePathMixin for managing base paths. - """ - - def __init__(self, uri, base_uri, program_date_time=None, duration=None, - title=None, byterange=None, discontinuity=False, key=None): - """ - Initialize a Segment object. - - Args: - - uri: URI of the segment. - - base_uri: Base URI for the segment. - - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a datetime - Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5 - - duration: Duration of the segment (optional). - - title: Title attribute from EXTINF parameter - - byterange: Byterange information of the segment (optional). - - discontinuity: Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists - Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11 - - key: Key for encryption (optional). - """ - self.uri = uri - self.duration = duration - self.title = title - self.base_uri = base_uri - self.byterange = byterange - self.program_date_time = program_date_time - self.discontinuity = discontinuity - #self.key = key - - -class SegmentList(list, GroupedBasePathMixin): - """ - Class representing a list of segments in an M3U8 playlist. - Inherits from list and GroupedBasePathMixin for managing base paths across a group of items. - """ - - @property - def uri(self): - """ - Get the URI of each segment in the SegmentList. - - Returns: - - List of URIs of segments in the SegmentList. - """ - return [seg.uri for seg in self] - - -class Key(BasePathMixin): - """ - Class representing a key used for encryption in an M3U8 playlist. - Inherits from BasePathMixin for managing base paths. - """ - - def __init__(self, method, uri, base_uri, iv=None): - """ - Initialize a Key object. - - Args: - - method: Encryption method. - ex: "AES-128" - - uri: URI of the key. - ex: "https://priv.example.com/key.php?r=52" - - base_uri: Base URI for the key. - ex: http://example.com/path/to - - iv: Initialization vector (optional). - ex: 0X12A - """ - self.method = method - self.uri = uri - self.iv = iv - self.base_uri = base_uri - - -class Playlist(BasePathMixin): - """ - Playlist object representing a link to a variant M3U8 with a specific bitrate. - - More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10 - """ - - def __init__(self, uri, stream_info, media, base_uri): - """ - Initialize a Playlist object. - - Args: - - uri: URI of the playlist. - - stream_info: is a named tuple containing the attributes: `program_id`, - - media: List of Media objects associated with the playlist. - - base_uri: Base URI for the playlist. - """ - self.uri = uri - self.base_uri = base_uri - - # Extract resolution information from stream_info - resolution = stream_info.get('resolution') - if resolution is not None: - values = resolution.split('x') - resolution_pair = (int(values[0]), int(values[1])) - else: - resolution_pair = None - - # Create StreamInfo object - self.stream_info = StreamInfo( - bandwidth = stream_info['bandwidth'], - program_id = stream_info.get('program_id'), - resolution = resolution_pair, - codecs = stream_info.get('codecs') - ) - - # Filter media based on group ID and media type - self.media = [] - for media_type in ('audio', 'video', 'subtitles'): - group_id = stream_info.get(media_type) - if group_id: - self.media += filter(lambda m: m.group_id == group_id, media) - - -class IFramePlaylist(BasePathMixin): - """ - Class representing an I-Frame playlist in an M3U8 playlist. - Inherits from BasePathMixin for managing base paths. - """ - - def __init__(self, base_uri, uri, iframe_stream_info): - """ - Initialize an IFramePlaylist object. - - Args: - - base_uri: Base URI for the I-Frame playlist. - - uri: URI of the I-Frame playlist. - - iframe_stream_info, is a named tuple containing the attributes: - `program_id`, `bandwidth`, `codecs` and `resolution` which is a tuple (w, h) of integers - """ - self.uri = uri - self.base_uri = base_uri - - # Extract resolution information from iframe_stream_info - resolution = iframe_stream_info.get('resolution') - if resolution is not None: - values = resolution.split('x') - resolution_pair = (int(values[0]), int(values[1])) - else: - resolution_pair = None - - # Create StreamInfo object for I-Frame playlist - self.iframe_stream_info = StreamInfo( - bandwidth = iframe_stream_info.get('bandwidth'), - program_id = iframe_stream_info.get('program_id'), - resolution = resolution_pair, - codecs = iframe_stream_info.get('codecs') - ) - -class PlaylistList(list, GroupedBasePathMixin): - """ - Class representing a list of playlists in an M3U8 playlist. - Inherits from list and GroupedBasePathMixin for managing base paths across a group of items. - """ - - def __str__(self): - """ - Return a string representation of the PlaylistList. - - Returns: - - String representation of the PlaylistList. - """ - output = [str(playlist) for playlist in self] - return '\n'.join(output) diff --git a/Src/Lib/Hls/M3U8/lib_parser/parser.py b/Src/Lib/Hls/M3U8/lib_parser/parser.py deleted file mode 100644 index dd49f96..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/parser.py +++ /dev/null @@ -1,338 +0,0 @@ -# 15.04.24 - -import re -import logging -import datetime - - -# Internal utilities -from ..lib_parser import protocol -from ._util import ( - remove_quotes, - remove_quotes_parser, - normalize_attribute -) - - -# External utilities -from Src.Util._jsonConfig import config_manager - - -# Variable -REMOVE_EMPTY_ROW = config_manager.get_bool('M3U8_PARSER', 'skip_empty_row_playlist') -ATTRIBUTELISTPATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''') - - -def parse(content): - """ - Given an M3U8 playlist content, parses the content and extracts metadata. - - Args: - content (str): The M3U8 playlist content. - - Returns: - dict: A dictionary containing the parsed metadata. - """ - - # Initialize data dictionary with default values - data = { - 'is_variant': False, - 'is_endlist': False, - 'is_i_frames_only': False, - 'playlist_type': None, - 'playlists': [], - 'iframe_playlists': [], - 'segments': [], - 'media': [], - } - - # Initialize state dictionary for tracking parsing state - state = { - 'expect_segment': False, - 'expect_playlist': False, - } - - # Iterate over lines in the content - content = content.split("\n") - content_length = len(content) - i = 0 - - while i < content_length: - line = content[i] - line_stripped = line.strip() - is_end = i + 1 == content_length - 2 - - if REMOVE_EMPTY_ROW: - if i < content_length - 2: - actual_row = extract_params(line_stripped) - next_row = extract_params(content[i + 2].strip()) - - if actual_row is not None and next_row is None and not is_end: - logging.info(f"Skip row: {line_stripped}") - i += 1 - continue - - i += 1 - - if line.startswith(protocol.ext_x_byterange): - _parse_byterange(line, state) - state['expect_segment'] = True - - elif state['expect_segment']: - _parse_ts_chunk(line, data, state) - state['expect_segment'] = False - - elif state['expect_playlist']: - _parse_variant_playlist(line, data, state) - state['expect_playlist'] = False - - elif line.startswith(protocol.ext_x_targetduration): - _parse_simple_parameter(line, data, float) - elif line.startswith(protocol.ext_x_media_sequence): - _parse_simple_parameter(line, data, int) - elif line.startswith(protocol.ext_x_discontinuity): - state['discontinuity'] = True - elif line.startswith(protocol.ext_x_version): - _parse_simple_parameter(line, data) - elif line.startswith(protocol.ext_x_allow_cache): - _parse_simple_parameter(line, data) - - elif line.startswith(protocol.ext_x_key): - state['current_key'] = _parse_key(line) - data['key'] = data.get('key', state['current_key']) - - elif line.startswith(protocol.extinf): - _parse_extinf(line, data, state) - state['expect_segment'] = True - - elif line.startswith(protocol.ext_x_stream_inf): - state['expect_playlist'] = True - _parse_stream_inf(line, data, state) - - elif line.startswith(protocol.ext_x_i_frame_stream_inf): - _parse_i_frame_stream_inf(line, data) - - elif line.startswith(protocol.ext_x_media): - _parse_media(line, data, state) - - elif line.startswith(protocol.ext_x_playlist_type): - _parse_simple_parameter(line, data) - - elif line.startswith(protocol.ext_i_frames_only): - data['is_i_frames_only'] = True - - elif line.startswith(protocol.ext_x_endlist): - data['is_endlist'] = True - - return data - - -def extract_params(line): - """ - Extracts parameters from a formatted input string. - - Args: - - line (str): The string containing the parameters to extract. - - Returns: - dict or None: A dictionary containing the extracted parameters with their respective values. - """ - params = {} - matches = re.findall(r'([A-Z\-]+)=("[^"]*"|[^",\s]*)', line) - if not matches: - return None - for match in matches: - param, value = match - params[param] = value.strip('"') - return params - -def _parse_key(line): - """ - Parses the #EXT-X-KEY line and extracts key attributes. - - Args: - - line (str): The #EXT-X-KEY line from the playlist. - - Returns: - dict: A dictionary containing the key attributes. - """ - params = ATTRIBUTELISTPATTERN.split(line.replace(protocol.ext_x_key + ':', ''))[1::2] - key = {} - for param in params: - name, value = param.split('=', 1) - key[normalize_attribute(name)] = remove_quotes(value) - return key - -def _parse_extinf(line, data, state): - """ - Parses the #EXTINF line and extracts segment duration and title. - - Args: - - line (str): The #EXTINF line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - duration, title = line.replace(protocol.extinf + ':', '').split(',') - state['segment'] = {'duration': float(duration), 'title': remove_quotes(title)} - -def _parse_ts_chunk(line, data, state): - """ - Parses a segment URI line and adds it to the segment list. - - Args: - line (str): The segment URI line from the playlist. - data (dict): The dictionary to store the parsed data. - state (dict): The parsing state. - """ - segment = state.pop('segment') - if state.get('current_program_date_time'): - segment['program_date_time'] = state['current_program_date_time'] - state['current_program_date_time'] += datetime.timedelta(seconds=segment['duration']) - segment['uri'] = line - segment['discontinuity'] = state.pop('discontinuity', False) - if state.get('current_key'): - segment['key'] = state['current_key'] - data['segments'].append(segment) - -def _parse_attribute_list(prefix, line, atribute_parser): - """ - Parses a line containing a list of attributes and their values. - - Args: - - prefix (str): The prefix to identify the line. - - line (str): The line containing the attributes. - - atribute_parser (dict): A dictionary mapping attribute names to parsing functions. - - Returns: - dict: A dictionary containing the parsed attributes. - """ - params = ATTRIBUTELISTPATTERN.split(line.replace(prefix + ':', ''))[1::2] - - attributes = {} - for param in params: - name, value = param.split('=', 1) - name = normalize_attribute(name) - - if name in atribute_parser: - value = atribute_parser[name](value) - - attributes[name] = value - - return attributes - -def _parse_stream_inf(line, data, state): - """ - Parses the #EXT-X-STREAM-INF line and extracts stream information. - - Args: - - line (str): The #EXT-X-STREAM-INF line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - data['is_variant'] = True - atribute_parser = remove_quotes_parser('codecs', 'audio', 'video', 'subtitles') - atribute_parser["program_id"] = int - atribute_parser["bandwidth"] = int - state['stream_info'] = _parse_attribute_list(protocol.ext_x_stream_inf, line, atribute_parser) - -def _parse_i_frame_stream_inf(line, data): - """ - Parses the #EXT-X-I-FRAME-STREAM-INF line and extracts I-frame stream information. - - Args: - - line (str): The #EXT-X-I-FRAME-STREAM-INF line from the playlist. - - data (dict): The dictionary to store the parsed data. - """ - atribute_parser = remove_quotes_parser('codecs', 'uri') - atribute_parser["program_id"] = int - atribute_parser["bandwidth"] = int - iframe_stream_info = _parse_attribute_list(protocol.ext_x_i_frame_stream_inf, line, atribute_parser) - iframe_playlist = {'uri': iframe_stream_info.pop('uri'), - 'iframe_stream_info': iframe_stream_info} - - data['iframe_playlists'].append(iframe_playlist) - -def _parse_media(line, data, state): - """ - Parses the #EXT-X-MEDIA line and extracts media attributes. - - Args: - - line (str): The #EXT-X-MEDIA line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - quoted = remove_quotes_parser('uri', 'group_id', 'language', 'name', 'characteristics') - media = _parse_attribute_list(protocol.ext_x_media, line, quoted) - data['media'].append(media) - -def _parse_variant_playlist(line, data, state): - """ - Parses a variant playlist line and extracts playlist information. - - Args: - - line (str): The variant playlist line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - playlist = {'uri': line, 'stream_info': state.pop('stream_info')} - - data['playlists'].append(playlist) - -def _parse_byterange(line, state): - """ - Parses the #EXT-X-BYTERANGE line and extracts byte range information. - - Args: - - line (str): The #EXT-X-BYTERANGE line from the playlist. - - state (dict): The parsing state. - """ - state['segment']['byterange'] = line.replace(protocol.ext_x_byterange + ':', '') - -def _parse_simple_parameter_raw_value(line, cast_to=str, normalize=False): - """ - Parses a line containing a simple parameter and its value. - - Args: - - line (str): The line containing the parameter and its value. - - cast_to (type): The type to which the value should be cast. - - normalize (bool): Whether to normalize the parameter name. - - Returns: - tuple: A tuple containing the parameter name and its value. - """ - param, value = line.split(':', 1) - param = normalize_attribute(param.replace('#EXT-X-', '')) - if normalize: - value = normalize_attribute(value) - return param, cast_to(value) - -def _parse_and_set_simple_parameter_raw_value(line, data, cast_to=str, normalize=False): - """ - Parses a line containing a simple parameter and its value, and sets it in the data dictionary. - - Args: - - line (str): The line containing the parameter and its value. - - data (dict): The dictionary to store the parsed data. - - cast_to (type): The type to which the value should be cast. - - normalize (bool): Whether to normalize the parameter name. - - Returns: - The parsed value. - """ - param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize) - data[param] = value - return data[param] - -def _parse_simple_parameter(line, data, cast_to=str): - """ - Parses a line containing a simple parameter and its value, and sets it in the data dictionary. - - Args: - line (str): The line containing the parameter and its value. - data (dict): The dictionary to store the parsed data. - cast_to (type): The type to which the value should be cast. - - Returns: - The parsed value. - """ - return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/lib_parser/protocol.py b/Src/Lib/Hls/M3U8/lib_parser/protocol.py deleted file mode 100644 index 7fcf5a5..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/protocol.py +++ /dev/null @@ -1,17 +0,0 @@ -# 15.04.24 - -ext_x_targetduration = '#EXT-X-TARGETDURATION' -ext_x_media_sequence = '#EXT-X-MEDIA-SEQUENCE' -ext_x_program_date_time = '#EXT-X-PROGRAM-DATE-TIME' -ext_x_media = '#EXT-X-MEDIA' -ext_x_playlist_type = '#EXT-X-PLAYLIST-TYPE' -ext_x_key = '#EXT-X-KEY' -ext_x_stream_inf = '#EXT-X-STREAM-INF' -ext_x_version = '#EXT-X-VERSION' -ext_x_allow_cache = '#EXT-X-ALLOW-CACHE' -ext_x_endlist = '#EXT-X-ENDLIST' -extinf = '#EXTINF' -ext_i_frames_only = '#EXT-X-I-FRAMES-ONLY' -ext_x_byterange = '#EXT-X-BYTERANGE' -ext_x_i_frame_stream_inf = '#EXT-X-I-FRAME-STREAM-INF' -ext_x_discontinuity = '#EXT-X-DISCONTINUITY' diff --git a/Src/Lib/Hls/M3U8/math_calc.py b/Src/Lib/Hls/M3U8/math_calc.py deleted file mode 100644 index 46aaccb..0000000 --- a/Src/Lib/Hls/M3U8/math_calc.py +++ /dev/null @@ -1,41 +0,0 @@ -# 20.02.24 - -# Internal utilities -from Src.Util.os import format_size - - -class M3U8_Ts_Files: - def __init__(self): - """ - Initialize the TSFileSizeCalculator object. - - Args: - - num_segments (int): The number of segments. - """ - self.ts_file_sizes = [] - - def add_ts_file_size(self, size: int): - """ - Add a file size to the list of file sizes. - - Args: - - size (float): The size of the ts file to be added. - """ - self.ts_file_sizes.append(size) - - def calculate_total_size(self): - """ - Calculate the total size of the files. - - Returns: - float: The mean size of the files in a human-readable format. - """ - - if len(self.ts_file_sizes) == 0: - return 0 - - total_size = sum(self.ts_file_sizes) - mean_size = total_size / len(self.ts_file_sizes) - - # Return format mean - return format_size(mean_size) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/parser.py b/Src/Lib/Hls/M3U8/parser.py deleted file mode 100644 index b387891..0000000 --- a/Src/Lib/Hls/M3U8/parser.py +++ /dev/null @@ -1,547 +0,0 @@ -# 20.04.25 - -import logging - - -# Internal utilities -from .lib_parser import load - - -# External libraries -from Src.Lib.Request.my_requests import requests - - -# Costant -CODEC_MAPPINGS = { - "video": { - "avc1": "libx264", - "avc2": "libx264", - "avc3": "libx264", - "avc4": "libx264", - "hev1": "libx265", - "hev2": "libx265", - "hvc1": "libx265", - "hvc2": "libx265", - "vp8": "libvpx", - "vp9": "libvpx-vp9", - "vp10": "libvpx-vp9" - }, - "audio": { - "mp4a": "aac", - "mp3": "libmp3lame", - "ac-3": "ac3", - "ec-3": "eac3", - "opus": "libopus", - "vorbis": "libvorbis" - } -} - -RESOLUTIONS = [ - (7680, 4320), - (3840, 2160), - (2560, 1440), - (1920, 1080), - (1280, 720), - (640, 480) -] - - - -class M3U8_Codec: - """ - Represents codec information for an M3U8 playlist. - """ - - def __init__(self, bandwidth, resolution, codecs): - """ - Initializes the M3U8Codec object with the provided parameters. - - Args: - - bandwidth (int): Bandwidth of the codec. - - resolution (str): Resolution of the codec. - - codecs (str): Codecs information in the format "avc1.xxxxxx,mp4a.xx". - """ - self.bandwidth = bandwidth - self.resolution = resolution - self.codecs = codecs - self.audio_codec = None - self.video_codec = None - self.extract_codecs() - self.parse_codecs() - - def extract_codecs(self): - """ - Parses the codecs information to extract audio and video codecs. - Extracted codecs are set as attributes: audio_codec and video_codec. - """ - - # Split the codecs string by comma - codecs_list = self.codecs.split(',') - - # Separate audio and video codecs - for codec in codecs_list: - if codec.startswith('avc'): - self.video_codec = codec - elif codec.startswith('mp4a'): - self.audio_codec = codec - - def convert_video_codec(self, video_codec_identifier) -> str: - - """ - Convert video codec identifier to codec name. - - Args: - - video_codec_identifier (str): Identifier of the video codec. - - Returns: - str: Codec name corresponding to the identifier. - """ - - # Extract codec type from the identifier - codec_type = video_codec_identifier.split('.')[0] - - # Retrieve codec mapping from the provided mappings or fallback to static mappings - video_codec_mapping = CODEC_MAPPINGS.get('video', {}) - codec_name = video_codec_mapping.get(codec_type) - - if codec_name: - return codec_name - - else: - logging.warning(f"No corresponding video codec found for {video_codec_identifier}. Using default codec libx264.") - return "libx264" # Default - - def convert_audio_codec(self, audio_codec_identifier) -> str: - - """ - Convert audio codec identifier to codec name. - - Args: - - audio_codec_identifier (str): Identifier of the audio codec. - - Returns: - str: Codec name corresponding to the identifier. - """ - - # Extract codec type from the identifier - codec_type = audio_codec_identifier.split('.')[0] - - # Retrieve codec mapping from the provided mappings or fallback to static mappings - audio_codec_mapping = CODEC_MAPPINGS.get('audio', {}) - codec_name = audio_codec_mapping.get(codec_type) - - if codec_name: - return codec_name - - else: - logging.warning(f"No corresponding audio codec found for {audio_codec_identifier}. Using default codec aac.") - return "aac" # Default - - def parse_codecs(self): - """ - Parse video and audio codecs. - This method updates `video_codec_name` and `audio_codec_name` attributes. - """ - - self.video_codec_name = self.convert_video_codec(self.video_codec) - self.audio_codec_name = self.convert_audio_codec(self.audio_codec) - - def __str__(self): - """ - Returns a string representation of the M3U8Codec object. - """ - return f"BANDWIDTH={self.bandwidth},RESOLUTION={self.resolution},CODECS=\"{self.codecs}\"" - - -class M3U8_Video: - def __init__(self, video_playlist) -> None: - """ - Initializes an M3U8_Video object with the provided video playlist. - - Args: - - video_playlist (M3U8): An M3U8 object representing the video playlist. - """ - self.video_playlist = video_playlist - - def get_best_uri(self): - """ - Returns the URI with the highest resolution from the video playlist. - - Returns: - tuple or None: A tuple containing the URI with the highest resolution and its resolution value, or None if the video list is empty. - """ - if not self.video_playlist: - return None - - best_uri = max(self.video_playlist, key=lambda x: x['resolution']) - return best_uri['uri'], best_uri['resolution'] - - def get_worst_uri(self): - """ - Returns the URI with the lowest resolution from the video playlist. - - Returns: - - tuple or None: A tuple containing the URI with the lowest resolution and its resolution value, or None if the video list is empty. - """ - if not self.video_playlist: - return None - - worst_uri = min(self.video_playlist, key=lambda x: x['resolution']) - return worst_uri['uri'], worst_uri['resolution'] - - def get_custom_uri(self, y_resolution): - """ - Returns the URI corresponding to a custom resolution from the video list. - - Args: - - video_list (list): A list of dictionaries containing video URIs and resolutions. - - custom_resolution (tuple): A tuple representing the custom resolution. - - Returns: - str or None: The URI corresponding to the custom resolution, or None if not found. - """ - for video in self.video_playlist: - logging.info(f"Check resolution from playlist: {int(video['resolution'][1])}, with input: {int(y_resolution)}") - - if int(video['resolution'][1]) == int(y_resolution): - return video['uri'], video['resolution'] - - return None, None - - def get_list_resolution(self): - """ - Retrieve a list of resolutions from the video playlist. - - Returns: - list: A list of resolutions extracted from the video playlist. - """ - return [video['resolution'] for video in self.video_playlist] - - -class M3U8_Audio: - def __init__(self, audio_playlist) -> None: - """ - Initializes an M3U8_Audio object with the provided audio playlist. - - Args: - - audio_playlist (M3U8): An M3U8 object representing the audio playlist. - """ - self.audio_playlist = audio_playlist - - def get_uri_by_language(self, language): - """ - Returns a dictionary with 'name' and 'uri' given a specific language. - - Args: - - audio_list (list): List of dictionaries containing audio information. - - language (str): The desired language. - - Returns: - dict or None: Dictionary with 'name', 'language', and 'uri' for the specified language, or None if not found. - """ - for audio in self.audio_playlist: - if audio['language'] == language: - return {'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} - return None - - def get_all_uris_and_names(self): - """ - Returns a list of dictionaries containing all URIs and names. - - Args: - - audio_list (list): List of dictionaries containing audio information. - - Returns: - list: List of dictionaries containing 'name', 'language', and 'uri' for all audio in the list. - """ - return [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist] - - def get_default_uri(self): - """ - Returns the dictionary with 'default' equal to 'YES'. - - Args: - - audio_list (list): List of dictionaries containing audio information. - - Returns: - dict or None: Dictionary with 'default' equal to 'YES', or None if not found. - """ - for audio in self.audio_playlist: - if audio['default'] == 'YES': - return audio.get('uri') - return None - - -class M3U8_Subtitle: - def __init__(self, subtitle_playlist) -> None: - """ - Initializes an M3U8_Subtitle object with the provided subtitle playlist. - - Args: - - subtitle_playlist (M3U8): An M3U8 object representing the subtitle playlist. - """ - self.subtitle_playlist = subtitle_playlist - - def get_uri_by_language(self, language): - """ - Returns a dictionary with 'name' and 'uri' given a specific language for subtitles. - - Args: - - subtitle_list (list): List of dictionaries containing subtitle information. - - language (str): The desired language. - - Returns: - dict or None: Dictionary with 'name' and 'uri' for the specified language for subtitles, or None if not found. - """ - for subtitle in self.subtitle_playlist: - if subtitle['language'] == language: - return {'name': subtitle['name'], 'uri': subtitle['uri']} - return None - - def get_all_uris_and_names(self): - """ - Returns a list of dictionaries containing all URIs and names of subtitles. - - Args: - - subtitle_list (list): List of dictionaries containing subtitle information. - - Returns: - list: List of dictionaries containing 'name' and 'uri' for all subtitles in the list. - """ - return [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist] - - def get_default_uri(self): - """ - Returns the dictionary with 'default' equal to 'YES' for subtitles. - - Args: - - subtitle_list (list): List of dictionaries containing subtitle information. - - Returns: - dict or None: Dictionary with 'default' equal to 'YES' for subtitles, or None if not found. - """ - for subtitle in self.subtitle_playlist: - if subtitle['default'] == 'YES': - return subtitle - return None - - def download_all(self, custom_subtitle): - """ - Download all subtitles listed in the object's attributes, filtering based on a provided list of custom subtitles. - - Args: - - custom_subtitle (list): A list of custom subtitles to download. - - Returns: - list: A list containing dictionaries with subtitle information including name, language, and URI. - """ - - output = [] # Initialize an empty list to store subtitle information - - # Iterate through all available subtitles - for obj_subtitle in self.subtitle_get_all_uris_and_names(): - - # Check if the subtitle name is not in the list of custom subtitles, and skip if not found - if obj_subtitle.get('name') not in custom_subtitle: - continue - - # Send a request to retrieve the subtitle content - logging.info(f"Download subtitle: {obj_subtitle.get('name')}") - response_subitle = requests.get(obj_subtitle.get('uri')) - - try: - # Try to extract the VTT URL from the subtitle content - sub_parse = M3U8_Parser() - sub_parse.parse_data(obj_subtitle.get('uri'), response_subitle.text) - url_subititle = sub_parse.subtitle[0] - - output.append({ - 'name': obj_subtitle.get('name'), - 'language': obj_subtitle.get('language'), - 'uri': url_subititle - }) - - except Exception as e: - logging.error(f"Cant donwload: {obj_subtitle.get('name')}, error: {e}") - - return output - - -class M3U8_Parser: - def __init__(self): - self.segments = [] - self.video_playlist = [] - self.keys = None - self.subtitle_playlist = [] - self.subtitle = [] - self.audio_playlist = [] - self.codec: M3U8_Codec = None - self._video: M3U8_Video = None - self._audio: M3U8_Audio = None - self._subtitle: M3U8_Subtitle = None - - self.__create_variable__() - - def parse_data(self, uri, raw_content) -> None: - """ - Extracts all information present in the provided M3U8 content. - - Args: - - m3u8_content (str): The content of the M3U8 file. - """ - - - # Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles - m3u8_obj = load(raw_content, uri) - - self.__parse_video_info__(m3u8_obj) - self.__parse_encryption_keys__(m3u8_obj) - self.__parse_subtitles_and_audio__(m3u8_obj) - self.__parse_segments__(m3u8_obj) - - @staticmethod - def extract_resolution(uri: str) -> int: - """ - Extracts the video resolution from the given URI. - - Args: - - uri (str): The URI containing video information. - - Returns: - int: The video resolution if found, otherwise 0. - """ - - # Log - logging.info(f"Try extract resolution from: {uri}") - - for resolution in RESOLUTIONS: - if "http" in str(uri): - if str(resolution[1]) in uri: - return resolution - - # Default resolution return (not best) - logging.error("No resolution found with custom parsing.") - logging.warning("Try set remove duplicate line to TRUE.") - return (0, 0) - - def __parse_video_info__(self, m3u8_obj) -> None: - """ - Extracts video information from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing video playlists. - """ - - try: - for playlist in m3u8_obj.playlists: - - # Direct access resolutions in m3u8 obj - if playlist.stream_info.resolution is not None: - - self.video_playlist.append({ - "uri": playlist.uri, - "resolution": playlist.stream_info.resolution - }) - - # Find resolutions in uri - else: - - self.video_playlist.append({ - "uri": playlist.uri, - "resolution": M3U8_Parser.extract_resolution(playlist.uri) - }) - - # Dont stop - continue - - # Check if all key is present to create codec - try: - self.codec = M3U8_Codec( - playlist.stream_info.bandwidth, - playlist.stream_info.resolution, - playlist.stream_info.codecs - ) - except: - logging.error(f"Error parsing codec: {e}") - - except Exception as e: - logging.error(f"Error parsing video info: {e}") - - def __parse_encryption_keys__(self, m3u8_obj) -> None: - """ - Extracts encryption keys from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing encryption keys. - """ - try: - - if m3u8_obj.key is not None: - if self.keys is None: - self.keys = { - 'method': m3u8_obj.key.method, - 'iv': m3u8_obj.key.iv, - 'uri': m3u8_obj.key.uri - } - - - except Exception as e: - logging.error(f"Error parsing encryption keys: {e}") - pass - - def __parse_subtitles_and_audio__(self, m3u8_obj) -> None: - """ - Extracts subtitles and audio information from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing subtitles and audio data. - """ - try: - for media in m3u8_obj.media: - if media.type == "SUBTITLES": - self.subtitle_playlist.append({ - "type": media.type, - "name": media.name, - "default": media.default, - "language": media.language, - "uri": media.uri - }) - - if media.type == "AUDIO": - self.audio_playlist.append({ - "type": media.type, - "name": media.name, - "default": media.default, - "language": media.language, - "uri": media.uri - }) - - except Exception as e: - logging.error(f"Error parsing subtitles and audio: {e}") - - def __parse_segments__(self, m3u8_obj) -> None: - """ - Extracts segment information from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing segment data. - """ - - try: - for segment in m3u8_obj.segments: - if "vtt" not in segment.uri: - self.segments.append(segment.uri) - else: - self.subtitle.append(segment.uri) - - except Exception as e: - logging.error(f"Error parsing segments: {e}") - - def __create_variable__(self): - """ - Initialize variables for video, audio, and subtitle playlists. - """ - - self._video = M3U8_Video(self.video_playlist) - self._audio = M3U8_Audio(self.audio_playlist) - self._subtitle = M3U8_Subtitle(self.subtitle_playlist) diff --git a/Src/Lib/Hls/downloader.py b/Src/Lib/Hls/downloader.py index c22278a..6392a5b 100644 --- a/Src/Lib/Hls/downloader.py +++ b/Src/Lib/Hls/downloader.py @@ -1,23 +1,29 @@ # 5.01.24 import os +import sys import logging -from concurrent.futures import ThreadPoolExecutor from datetime import datetime +from concurrent.futures import ThreadPoolExecutor # Internal utilities -from Src.Util.console import console, Panel from Src.Lib.Request.my_requests import requests from Src.Util.headers import get_headers -from Src.Util.color import Colors from Src.Util._jsonConfig import config_manager +from Src.Util.console import console, Panel +from Src.Util.color import Colors from Src.Util.os import ( remove_folder, delete_files_except_one, compute_sha1_hash, - format_size + format_size, + create_folder, + reduce_base_name, + remove_special_characters ) +from Src.Lib.Unidecode import transliterate +from Src.Util.file_validator import can_create_file # Logic class @@ -30,23 +36,23 @@ from ..FFmpeg import ( from ..M3U8 import ( M3U8_Parser, M3U8_Codec, - m3u8_url_fix + M3U8_UrlFix ) from .segments import M3U8_Segments from ..E_Table import report_table # Config -DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_OPTIONS', 'specific_list_audio') -DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_OPTIONS', 'specific_list_subtitles') -REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_OPTIONS', 'cleanup_tmp_folder') +DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_FILTER', 'specific_list_audio') +DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_FILTER', 'specific_list_subtitles') +REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_FILTER', 'cleanup_tmp_folder') +FORCE_TS = config_manager.get_dict('M3U8_FILTER', 'force_ts') FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution') -CREATE_REPORT = config_manager.get_bool('M3U8', 'create_report') +CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report') # Variable headers_index = config_manager.get_dict('M3U8_REQUESTS', 'index') -FORCE_TS = config_manager.get_dict('M3U8_OPTIONS', 'force_ts') class Downloader(): @@ -71,6 +77,17 @@ class Downloader(): self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_playlist)) else: self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_index)) + + else: + folder, base_name = os.path.split(self.output_filename) # Split file_folder output + base_name = reduce_base_name(remove_special_characters(transliterate(base_name))) # Remove special char + create_folder(folder) # Create folder and check if exist + if not can_create_file(base_name): # Check if folder file name can be create + logging.error("Invalid mp4 name.") + sys.exit(0) + + self.output_filename = os.path.join(folder, base_name) + logging.info(f"Output filename: {self.output_filename}") # Initialize temp base path @@ -93,6 +110,9 @@ class Downloader(): # Path converted ts files self.path_video_audio = None self.path_video_subtitle = None + + # Class + self.m3u8_url_fixer = M3U8_UrlFix() def __df_make_req__(self, url: str) -> str: """ @@ -112,18 +132,15 @@ class Downloader(): headers_index['user-agent'] = get_headers() response = requests.get(url, headers=headers_index) - # Check status response of request - response.raise_for_status() - if response.ok: return response.text else: - logging.error(f"Request to {url} failed with status code: {response.status_code}") + logging.error(f"Test request to {url} failed with status code: {response.status_code}") return None except Exception as e: - logging.error(f"An unexpected error occurred: {e}") + logging.error(f"An unexpected error occurred with test request: {e}") return None def __manage_playlist__(self, m3u8_playlist_text): @@ -188,7 +205,7 @@ class Downloader(): if "http" not in self.m3u8_index: # Generate full URL - self.m3u8_index = m3u8_url_fix.generate_full_url(self.m3u8_index) + self.m3u8_index = self.m3u8_url_fixer.generate_full_url(self.m3u8_index) logging.info(f"Generate index url: {self.m3u8_index}") # Check if a valid HTTPS URL is obtained @@ -457,11 +474,11 @@ class Downloader(): logging.info("Download from PLAYLIST") # Fetch the M3U8 playlist content - if not len(str(self.m3u8_playlist).split("\n")) > 2: + if not len(str(self.m3u8_playlist).split("\n")) > 2: # Is a single link m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist) # Add full URL of the M3U8 playlist to fix next .ts without https if necessary - m3u8_url_fix.set_playlist(self.m3u8_playlist) # !!!!!!!!!!!!!!!!!! to fix for playlist with text + self.m3u8_url_fixer.set_playlist(self.m3u8_playlist) # !!!!!!!!!!!!!!!!!! to fix for playlist with text else: logging.warning("M3U8 master url not set.") # TO DO @@ -506,7 +523,7 @@ class Downloader(): logging.info("Download from INDEX") # Add full URL of the M3U8 playlist to fix next .ts without https if necessary - m3u8_url_fix.set_playlist(self.m3u8_index) + self.m3u8_url_fixer.set_playlist(self.m3u8_index) # Start all download ... self.__donwload_video__() diff --git a/Src/Lib/Hls/segments.py b/Src/Lib/Hls/segments.py index 0c4647e..507041a 100644 --- a/Src/Lib/Hls/segments.py +++ b/Src/Lib/Hls/segments.py @@ -22,31 +22,24 @@ from Src.Util.headers import get_headers from Src.Util.color import Colors from Src.Lib.Request.my_requests import requests from Src.Util._jsonConfig import config_manager -from Src.Util.os import ( - format_size -) - # Logic class from ..M3U8 import ( M3U8_Decryption, - M3U8_Ts_Files, + M3U8_Ts_Estimator, M3U8_Parser, - m3u8_url_fix + M3U8_UrlFix ) # Config -TQDM_MAX_WORKER = config_manager.get_int('M3U8', 'tdqm_workers') -DELAY_START_WORKER = config_manager.get_float('M3U8', 'delay_start_workers') -TQDM_PROGRESS_TIMEOUT = config_manager.get_int('M3U8', 'tqdm_progress_timeout') -REQUESTS_TIMEOUT = config_manager.get_int('M3U8', 'requests_timeout') -ENABLE_TIME_TIMEOUT = config_manager.get_bool('M3U8', 'enable_time_quit') -TQDM_SHOW_PROGRESS = config_manager.get_bool('M3U8', 'tqdm_show_progress') -LIMIT_DONWLOAD_PERCENTAGE = config_manager.get_float('M3U8', 'download_percentage') -SAVE_M3U8_FILE = config_manager.get_float('M3U8', 'save_m3u8_content') -FAKE_PROXY = config_manager.get_float('M3U8', 'fake_proxy') -FAKE_PROXY_IP = config_manager.get_list('M3U8', 'fake_proxy_ip') +TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers') +TQDM_SHOW_PROGRESS = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_show_progress') +FAKE_PROXY = config_manager.get_float('M3U8_DOWNLOAD', 'fake_proxy') +FAKE_PROXY_IP = config_manager.get_list('M3U8_DOWNLOAD', 'fake_proxy_ip') +REQUEST_TIMEOUT = config_manager.get_int('M3U8_REQUESTS', 'timeout') +REQUEST_VERIFY_SSL = config_manager.get_bool('M3U8_REQUESTS', 'verify_ssl') +REQUEST_DISABLE_ERROR = config_manager.get_bool('M3U8_REQUESTS', 'disable_error') # Variable @@ -66,9 +59,7 @@ class M3U8_Segments: """ self.url = url self.tmp_folder = tmp_folder - self.downloaded_size = 0 self.decryption: M3U8_Decryption = None # Initialize decryption as None - self.class_ts_files_size = M3U8_Ts_Files() # Initialize the TS files size class self.segment_queue = queue.PriorityQueue() # Priority queue to maintain the order of segments self.current_index = 0 # Index of the current segment to be written self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts") # Path to the temporary file @@ -76,8 +67,8 @@ class M3U8_Segments: self.ctrl_c_detected = False # Global variable to track Ctrl+C detection os.makedirs(self.tmp_folder, exist_ok=True) # Create the temporary folder if it does not exist - self.list_speeds = [] - self.average_over = int(TQDM_MAX_WORKER / 3) + self.class_ts_estimator = M3U8_Ts_Estimator(TQDM_MAX_WORKER) + self.class_url_fixer = M3U8_UrlFix(url) def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes: """ @@ -143,6 +134,27 @@ class M3U8_Segments: # Store the segment information parsed from the playlist self.segments = m3u8_parser.segments + # Fix URL if it is incomplete (missing 'http') + for i in range(len(self.segments)): + segment_url = self.segments[i] + + if "http" not in segment_url: + self.segments[i] = self.class_url_fixer.generate_full_url(segment_url) + logging.info(f"Generated new URL: {self.segments[i]}, from: {segment_url}") + + # Change IP address of server + if FAKE_PROXY: + for i in range(len(self.segments)): + segment_url = self.segments[i] + + self.segments[i] = self.__gen_proxy__(segment_url, self.segments.index(segment_url)) + + # Save new playlist of segment + path_m3u8_file = os.path.join(self.tmp_folder, "playlist_fix.m3u8") + with open(path_m3u8_file, "w") as file: + for item in self.segments: + file.write(f"{item}\n") + def get_info(self) -> None: """ Makes a request to the index M3U8 file to get information about segments. @@ -154,9 +166,8 @@ class M3U8_Segments: response.raise_for_status() # Raise an exception for HTTP errors # Save the M3U8 file to the temporary folder - if SAVE_M3U8_FILE: - path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8") - open(path_m3u8_file, "w+").write(response.text) + path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8") + open(path_m3u8_file, "w+").write(response.text) # Parse the text from the M3U8 index file self.parse_data(response.text) @@ -176,9 +187,39 @@ class M3U8_Segments: # Parse the original URL and replace the hostname with the new IP address parsed_url = urlparse(url)._replace(netloc=new_ip_address) + return urlunparse(parsed_url) - def make_requests_stream(self, ts_url: str, index: int, stop_event: threading.Event, progress_counter: tqdm, add_desc: str) -> None: + def update_progress_bar(self, segment_content: bytes, duration: float, progress_counter: tqdm) -> None: + """ + Updates the progress bar with information about the TS segment download. + + Args: + segment_content (bytes): The content of the downloaded TS segment. + duration (float): The duration of the segment download in seconds. + progress_counter (tqdm): The tqdm object representing the progress bar. + """ + if TQDM_SHOW_PROGRESS: + total_downloaded = len(segment_content) + + # Add the size of the downloaded segment to the estimator + self.class_ts_estimator.add_ts_file(total_downloaded * len(self.segments), total_downloaded, duration) + + # Get downloaded size and total estimated size + downloaded_file_size_str = self.class_ts_estimator.get_downloaded_size().split(' ')[0] + file_total_size = self.class_ts_estimator.calculate_total_size() + number_file_total_size = file_total_size.split(' ')[0] + units_file_total_size = file_total_size.split(' ')[1] + + average_internet_speed = self.class_ts_estimator.get_average_speed() + + # Update the progress bar's postfix + progress_counter.set_postfix_str( + f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_file_size_str} {Colors.WHITE}< {Colors.GREEN}{number_file_total_size} {Colors.RED}{units_file_total_size} " + f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s" + ) + + def make_requests_stream(self, ts_url: str, index: int, stop_event: threading.Event, progress_bar: tqdm) -> None: """ Downloads a TS segment and adds it to the segment queue. @@ -186,72 +227,47 @@ class M3U8_Segments: - ts_url (str): The URL of the TS segment. - index (int): The index of the segment. - stop_event (threading.Event): Event to signal the stop of downloading. - - progress_counter (tqdm): Progress counter for tracking download progress. + - progress_bar (tqdm): Progress counter for tracking download progress. - add_desc (str): Additional description for the progress bar. """ if stop_event.is_set(): return # Exit if the stop event is set - - headers_segments['user-agent'] = get_headers() - # Fix URL if it is incomplete (missing 'http') - if "http" not in ts_url: - ts_url = m3u8_url_fix.generate_full_url(ts_url) - logging.info(f"Generated new URL: {ts_url}") + # Generate new user agent + headers_segments['user-agent'] = get_headers() try: - # Change IP address if FAKE_PROXY is enabled - if FAKE_PROXY: - ts_url = self.__gen_proxy__(ts_url, self.segments.index(ts_url)) - # Make request and calculate time duration start_time = time.time() - response = requests.get(ts_url, headers=headers_segments, timeout=REQUESTS_TIMEOUT, verify_ssl=False) # Send GET request for the segment + response = requests.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, verify_ssl=REQUEST_VERIFY_SSL) duration = time.time() - start_time if response.ok: # Get the content of the segment segment_content = response.content - total_downloaded = len(response.content) - - # Calculate mbps - speed_mbps = (total_downloaded * 8) / (duration * 1_000_000) * TQDM_MAX_WORKER - self.list_speeds.append(speed_mbps) - - # Get average speed after (average_over) - if len(self.list_speeds) > self.average_over: - self.list_speeds.pop(0) - average_speed = ( sum(self.list_speeds) / len(self.list_speeds) ) / 10 # MB/s - #print(f"{average_speed:.2f} MB/s") - #progress_counter.set_postfix_str(f"{average_speed:.2f} MB/s") - - - if TQDM_SHOW_PROGRESS: - self.downloaded_size += len(response.content) # Update the downloaded size - self.class_ts_files_size.add_ts_file_size(len(response.content) * len(self.segments)) # Update the TS file size class - downloaded_size_str = format_size(self.downloaded_size) # Format the downloaded size - estimate_total_size = self.class_ts_files_size.calculate_total_size() # Calculate the estimated total size - progress_counter.set_postfix_str(f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size_str.split(' ')[0]} {Colors.WHITE}< {Colors.GREEN}{estimate_total_size.split(' ')[0]} {Colors.RED}MB {Colors.WHITE}| {Colors.CYAN}{average_speed:.2f} {Colors.RED}MB/s") + self.update_progress_bar(segment_content, duration, progress_bar) # Decrypt the segment content if decryption is needed if self.decryption is not None: segment_content = self.decryption.decrypt(segment_content) with self.condition: - self.segment_queue.put((index, segment_content)) # Add the segment to the queue self.condition.notify() # Notify the writer thread that a new segment is available - progress_counter.update(1) # Update the progress counter - else: - logging.warning(f"Failed to download segment: {ts_url}") + if not REQUEST_DISABLE_ERROR: + logging.error(f"Failed to download segment: {ts_url}") except Exception as e: - logging.error(f"Exception while downloading segment: {e}") + if not REQUEST_DISABLE_ERROR: + logging.error(f"Exception while downloading segment: {e}") + + # Update bar + progress_bar.update(1) def write_segments_to_file(self, stop_event: threading.Event): """ @@ -265,7 +281,10 @@ class M3U8_Segments: while not stop_event.is_set() or not self.segment_queue.empty(): with self.condition: while self.segment_queue.empty() and not stop_event.is_set(): - self.condition.wait() # Wait until a new segment is available or stop_event is set + self.condition.wait(timeout=1) # Wait until a new segment is available or stop_event is set + + if stop_event.is_set(): + break if not self.segment_queue.empty(): @@ -280,7 +299,7 @@ class M3U8_Segments: else: self.segment_queue.put((index, segment_content)) # Requeue the segment if it is not the next to be written - self.condition.notify() # Notify that a segment has been requeued # Notify that a segment has been requeued + self.condition.notify() # Notify that a segment has been requeued def download_streams(self, add_desc): """ @@ -316,21 +335,8 @@ class M3U8_Segments: writer_thread = threading.Thread(target=self.write_segments_to_file, args=(stop_event,)) writer_thread.start() - # Start progress monitor thread - progress_thread = threading.Thread(target=self.timer, args=(progress_bar, stop_event)) - progress_thread.start() - # Delay the start of each worker for index, segment_url in enumerate(self.segments): - time.sleep(DELAY_START_WORKER) - - # da 0.0 a 100.00 - if int(LIMIT_DONWLOAD_PERCENTAGE) != 0: - score_percentage = (progress_bar.n / progress_bar.total) * 100 - if score_percentage>= LIMIT_DONWLOAD_PERCENTAGE: - #progress_bar.refresh() - break - # Check for Ctrl+C before starting each download task time.sleep(0.025) @@ -345,7 +351,7 @@ class M3U8_Segments: break # Submit the download task to the executor - executor.submit(self.make_requests_stream, segment_url, index, stop_event, progress_bar, add_desc) + executor.submit(self.make_requests_stream, segment_url, index, stop_event, progress_bar) # Wait for all segments to be downloaded executor.shutdown(wait=True) @@ -353,51 +359,3 @@ class M3U8_Segments: with self.condition: self.condition.notify_all() # Wake up the writer thread if it's waiting writer_thread.join() # Wait for the writer thread to finish - - def timer(self, progress_counter: tqdm, quit_event: threading.Event): - """ - Function to monitor progress and quit if no progress is made within a certain time - - Args: - - progress_counter (tqdm): The progress counter object. - - quit_event (threading.Event): The event to signal when to quit. - """ - - # If timer is disabled, return immediately without starting it, to reduce cpu use - if not ENABLE_TIME_TIMEOUT: - return - - start_time = time.time() - last_count = 0 - - # Loop until quit event is set - while not quit_event.is_set(): - current_count = progress_counter.n - - # Update start time when progress is made - if current_count != last_count: - start_time = time.time() - last_count = current_count - - # Calculate elapsed time - elapsed_time = time.time() - start_time - - # Check if elapsed time exceeds progress timeout - if elapsed_time > TQDM_PROGRESS_TIMEOUT: - console.log(f"[red]No progress for {TQDM_PROGRESS_TIMEOUT} seconds. Stopping.") - - # Set quit event to break the loop - quit_event.set() - break - - # Calculate remaining time until timeout - remaining_time = max(0, TQDM_PROGRESS_TIMEOUT - elapsed_time) - - # Determine sleep interval dynamically based on remaining time - sleep_interval = min(1, remaining_time) - - # Wait for the calculated sleep interval - time.sleep(sleep_interval) - - # Refresh progress bar - #progress_counter.refresh() diff --git a/Src/Lib/M3U8/__init__.py b/Src/Lib/M3U8/__init__.py index a0bacd5..08a541b 100644 --- a/Src/Lib/M3U8/__init__.py +++ b/Src/Lib/M3U8/__init__.py @@ -1,6 +1,6 @@ # 02.04.24 from .decryption import M3U8_Decryption -from .math_calc import M3U8_Ts_Files +from .estimator import M3U8_Ts_Estimator from .parser import M3U8_Parser, M3U8_Codec -from .url_fix import m3u8_url_fix \ No newline at end of file +from .url_fixer import M3U8_UrlFix \ No newline at end of file diff --git a/Src/Lib/M3U8/estimator.py b/Src/Lib/M3U8/estimator.py new file mode 100644 index 0000000..429bc85 --- /dev/null +++ b/Src/Lib/M3U8/estimator.py @@ -0,0 +1,81 @@ +# 20.02.24 + +from collections import deque + +# Internal utilities +from Src.Util.os import format_size + + +class M3U8_Ts_Estimator: + def __init__(self, workers: int): + """ + Initialize the TSFileSizeCalculator object. + + Args: + - workers (int): The number of workers using with ThreadPool. + """ + self.ts_file_sizes = [] + self.now_downloaded_size = 0 + self.average_over = 5 + self.list_speeds = deque(maxlen=self.average_over) + self.smoothed_speeds = [] + self.tqdm_workers = workers + + def add_ts_file(self, size: int, size_download: int, duration: float): + """ + Add a file size to the list of file sizes. + + Args: + - size (float): The size of the ts file to be added. + - size_download (int): Single size of the ts file. + - duration (float): Time to download segment file. + """ + self.ts_file_sizes.append(size) + self.now_downloaded_size += size_download + + # Calculate mbps + speed_mbps = (size_download * 8) / (duration * 1_000_000) * self.tqdm_workers + self.list_speeds.append(speed_mbps) + + # Calculate moving average + smoothed_speed = sum(self.list_speeds) / len(self.list_speeds) + self.smoothed_speeds.append(smoothed_speed) + + # Update smooth speeds + if len(self.smoothed_speeds) > self.average_over: + self.smoothed_speeds.pop(0) + + def calculate_total_size(self) -> str: + """ + Calculate the total size of the files. + + Returns: + float: The mean size of the files in a human-readable format. + """ + + if len(self.ts_file_sizes) == 0: + return 0 + + total_size = sum(self.ts_file_sizes) + mean_size = total_size / len(self.ts_file_sizes) + + # Return format mean + return format_size(mean_size) + + def get_average_speed(self) -> float: + """ + Calculate the average speed from a list of speeds and convert it to megabytes per second (MB/s). + + Returns: + float: The average speed in megabytes per second (MB/s). + """ + return (sum(self.smoothed_speeds) / len(self.smoothed_speeds)) / 10 # MB/s + + def get_downloaded_size(self) -> str: + """ + Get the total downloaded size formatted as a human-readable string. + + Returns: + str: The total downloaded size as a human-readable string. + """ + return format_size(self.now_downloaded_size) \ No newline at end of file diff --git a/Src/Lib/M3U8/math_calc.py b/Src/Lib/M3U8/math_calc.py deleted file mode 100644 index 46aaccb..0000000 --- a/Src/Lib/M3U8/math_calc.py +++ /dev/null @@ -1,41 +0,0 @@ -# 20.02.24 - -# Internal utilities -from Src.Util.os import format_size - - -class M3U8_Ts_Files: - def __init__(self): - """ - Initialize the TSFileSizeCalculator object. - - Args: - - num_segments (int): The number of segments. - """ - self.ts_file_sizes = [] - - def add_ts_file_size(self, size: int): - """ - Add a file size to the list of file sizes. - - Args: - - size (float): The size of the ts file to be added. - """ - self.ts_file_sizes.append(size) - - def calculate_total_size(self): - """ - Calculate the total size of the files. - - Returns: - float: The mean size of the files in a human-readable format. - """ - - if len(self.ts_file_sizes) == 0: - return 0 - - total_size = sum(self.ts_file_sizes) - mean_size = total_size / len(self.ts_file_sizes) - - # Return format mean - return format_size(mean_size) \ No newline at end of file diff --git a/Src/Lib/M3U8/url_fix.py b/Src/Lib/M3U8/url_fix.py deleted file mode 100644 index 17f6f7d..0000000 --- a/Src/Lib/M3U8/url_fix.py +++ /dev/null @@ -1,54 +0,0 @@ -# 20.03.24 - -import logging -from urllib.parse import urlparse, urljoin - - -class M3U8_UrlFix: - def __init__(self, url: str = None) -> None: - """ - Initializes an M3U8_UrlFix object with the provided playlist URL. - - Args: - - url (str, optional): The URL of the playlist. Defaults to None. - """ - self.url_playlist: str = url - - def set_playlist(self, url: str) -> None: - """ - Set the M3U8 playlist URL. - - Args: - - url (str): The M3U8 playlist URL. - """ - self.url_playlist = url - - def generate_full_url(self, url_resource: str) -> str: - """ - Generate a full URL for a given resource using the base URL from the playlist. - - Args: - - url_resource (str): The relative URL of the resource within the playlist. - - Returns: - str: The full URL for the specified resource. - """ - - # Check if m3u8 url playlist is present - if self.url_playlist == None: - logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present") - raise - - # Parse the playlist URL to extract the base URL components - parsed_playlist_url = urlparse(self.url_playlist) - - # Construct the base URL using the scheme, netloc, and path from the playlist URL - base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}" - - # Join the base URL with the relative resource URL to get the full URL - full_url = urljoin(base_url, url_resource) - - return full_url - -# Output -m3u8_url_fix = M3U8_UrlFix() \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/url_fix.py b/Src/Lib/M3U8/url_fixer.py similarity index 97% rename from Src/Lib/Hls/M3U8/url_fix.py rename to Src/Lib/M3U8/url_fixer.py index 17f6f7d..037d5b5 100644 --- a/Src/Lib/Hls/M3U8/url_fix.py +++ b/Src/Lib/M3U8/url_fixer.py @@ -49,6 +49,4 @@ class M3U8_UrlFix: full_url = urljoin(base_url, url_resource) return full_url - -# Output -m3u8_url_fix = M3U8_UrlFix() \ No newline at end of file + \ No newline at end of file diff --git a/Src/Lib/Request/my_requests.py b/Src/Lib/Request/my_requests.py index 18da38c..9b3e41b 100644 --- a/Src/Lib/Request/my_requests.py +++ b/Src/Lib/Request/my_requests.py @@ -31,10 +31,15 @@ except ImportError: from bs4 import BeautifulSoup +# Internal utilities +from Src.Util._jsonConfig import config_manager + + # Default settings HTTP_TIMEOUT = 5 HTTP_RETRIES = 1 HTTP_DELAY = 1 +HTTP_DISABLE_ERROR = config_manager.get_bool('M3U8_REQUESTS', 'disable_error') @@ -325,7 +330,7 @@ class ManageRequests: response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl_context) else: - response = urllib.request.urlopen(req, timeout=self.timeout) + response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl.create_default_context()) return response @@ -374,7 +379,8 @@ class ManageRequests: """ Handle request error. """ - logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}") + if not HTTP_DISABLE_ERROR: + logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}") if self.attempt < self.retries: logging.info(f"Retrying request for URL '{self.url}' (attempt {self.attempt}/{self.retries})") diff --git a/Src/Util/file_validator.py b/Src/Util/file_validator.py index 95255f4..f0bb515 100644 --- a/Src/Util/file_validator.py +++ b/Src/Util/file_validator.py @@ -85,8 +85,10 @@ def can_create_file(file_path): try: with open(file_path, 'w') as file: pass + os.remove(file_path) # Cleanup if the file was created return True + except OSError as e: if e.errno in (errno.EACCES, errno.ENOENT, errno.EEXIST, errno.ENOTDIR): return False diff --git a/Src/Util/message.py b/Src/Util/message.py index eafe180..8576feb 100644 --- a/Src/Util/message.py +++ b/Src/Util/message.py @@ -15,7 +15,7 @@ CLEAN = config_manager.get_bool('DEFAULT', 'clean_console') SHOW = config_manager.get_bool('DEFAULT', 'show_message') -def start_message(switch = False): +def start_message(): """ Display a start message. """ @@ -33,18 +33,6 @@ def start_message(switch = False): ''' - if switch: - msg = ''' - - _ _ _ _ - / \ _ __ (_)_ __ ___ ___ _ _ _ __ (_) |_ _ _ - / _ \ | '_ \| | '_ ` _ \ / _ \ | | | '_ \| | __| | | | - / ___ \| | | | | | | | | | __/ |_| | | | | | |_| |_| | - /_/ \_\_| |_|_|_| |_| |_|\___|\__,_|_| |_|_|\__|\__, | - |___/ - - ''' - if CLEAN: if platform.system() == 'Windows': os.system("cls") @@ -52,7 +40,6 @@ def start_message(switch = False): os.system("clear") if SHOW: - console.print(f"[bold yellow]{msg}") console.print(f"[magenta]Created by: Ghost6446\n") diff --git a/Src/Util/node_jjs.py b/Src/Util/node_jjs.py new file mode 100644 index 0000000..17098f1 --- /dev/null +++ b/Src/Util/node_jjs.py @@ -0,0 +1,56 @@ +# 26.05.24 + +import subprocess + +def is_node_installed() -> bool: + """ + Checks if Node.js is installed on the system. + + Returns: + bool: True if Node.js is installed, False otherwise. + """ + try: + # Run the command 'node -v' to get the Node.js version + result = subprocess.run(['node', '-v'], capture_output=True, text=True, check=True) + + # If the command runs successfully and returns a version number, Node.js is installed + if result.stdout.startswith('v'): + return True + + except (subprocess.CalledProcessError, FileNotFoundError): + # If there is an error running the command or the command is not found, Node.js is not installed + return False + + return False + +def run_node_script(script_content: str) -> str: + """ + Runs a Node.js script and returns its output. + + Args: + script_content (str): The content of the Node.js script to run. + + Returns: + str: The output of the Node.js script. + """ + + # Check if Node.js is installed + if not is_node_installed(): + raise EnvironmentError("Node.js is not installed on the system.") + + # Write the script content to a temporary file + with open('script.js', 'w') as file: + file.write(script_content) + + try: + # Run the Node.js script using subprocess and capture the output + result = subprocess.run(['node', 'script.js'], capture_output=True, text=True, check=True) + return result.stdout + + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Error running Node.js script: {e.stderr}") + + finally: + # Clean up the temporary script file + import os + os.remove('script.js') diff --git a/Src/Util/os.py b/Src/Util/os.py index 1fbd6fc..9e17c12 100644 --- a/Src/Util/os.py +++ b/Src/Util/os.py @@ -1,13 +1,15 @@ # 24.01.24 -import shutil +import re import os import time import json +import shutil import hashlib import logging -import re import zipfile +import platform + from typing import List @@ -42,6 +44,108 @@ special_chars_to_remove = [ ] +def get_max_length_by_os(system: str) -> int: + """ + Determines the maximum length for a base name based on the operating system. + + Args: + system (str): The operating system name. + + Returns: + int: The maximum length for the base name. + """ + if system == 'windows': + return 255 # NTFS and other common Windows filesystems support 255 characters for filenames + elif system == 'darwin': # macOS + return 255 # HFS+ and APFS support 255 characters for filenames + elif system == 'linux': + return 255 # Most Linux filesystems (e.g., ext4) support 255 characters for filenames + else: + raise ValueError(f"Unsupported operating system: {system}") + +def reduce_base_name(base_name: str) -> str: + """ + Splits the file path into folder and base name, and reduces the base name based on the operating system. + + Args: + base_name (str): The name of the file. + + Returns: + str: The reduced base name. + """ + + + # Determine the operating system + system = platform.system().lower() + + # Get the maximum length for the base name based on the operating system + max_length = get_max_length_by_os(system) + + # Reduce the base name if necessary + if len(base_name) > max_length: + if system == 'windows': + # For Windows, truncate and add a suffix if needed + base_name = base_name[:max_length - 3] + '___' + elif system == 'darwin': # macOS + # For macOS, truncate without adding suffix + base_name = base_name[:max_length] + elif system == 'linux': + # For Linux, truncate and add a numeric suffix if needed + base_name = base_name[:max_length - 2] + '___' + + return base_name + + +def create_folder(folder_name: str) -> None: + """ + Create a directory if it does not exist, and log the result. + + Args: + folder_name (str): The path of the directory to be created. + + """ + try: + + logging.info(f"Try create folder: {folder_name}") + os.makedirs(folder_name, exist_ok=True) + + if os.path.exists(folder_name) and os.path.isdir(folder_name): + logging.info(f"Directory successfully created or already exists: {folder_name}") + else: + logging.error(f"Failed to create directory: {folder_name}") + + except OSError as e: + logging.error(f"OS error occurred while creating the directory {folder_name}: {e}") + raise + + except Exception as e: + logging.error(f"An unexpected error occurred while creating the directory {folder_name}: {e}") + raise + +def check_file_existence(file_path): + """ + Check if a file exists at the given file path. + + Args: + file_path (str): The path to the file. + + Returns: + bool: True if the file exists, False otherwise. + """ + try: + logging.info(f"Check if file exists: {file_path}") + if os.path.exists(file_path): + logging.info(f"The file '{file_path}' exists.") + return True + + else: + logging.warning(f"The file '{file_path}' does not exist.") + return False + + except Exception as e: + logging.error(f"An error occurred while checking file existence: {e}") + return False + def remove_folder(folder_path: str) -> None: """ Remove a folder if it exists. diff --git a/Src/Util/table.py b/Src/Util/table.py index b4c2f0b..2ffd968 100644 --- a/Src/Util/table.py +++ b/Src/Util/table.py @@ -80,14 +80,13 @@ class TVShowManager: self.console.print(table) # Use self.console.print instead of print - def run(self, force_int_input: bool = False, max_int_input: int = 0, switch: bool = False) -> str: + def run(self, force_int_input: bool = False, max_int_input: int = 0) -> str: """ Run the TV show manager application. Args: - force_int_input(bool): If True, only accept integer inputs from 0 to max_int_input - max_int_input (int): range of row to show - - switch (bool): switch from film to anime Returns: str: Last command executed before breaking out of the loop. @@ -96,7 +95,7 @@ class TVShowManager: last_command = "" # Variable to store the last command executed while True: - start_message(switch) + start_message() # Display table self.display_data(self.tv_shows[self.slice_start:self.slice_end]) diff --git a/config.json b/config.json index f7a3086..daa1ff5 100644 --- a/config.json +++ b/config.json @@ -3,8 +3,8 @@ "debug": false, "log_file": "app.log", "log_to_file": true, - "show_message": true, - "clean_console": true, + "show_message": false, + "clean_console": false, "root_path": "Video", "map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)", "create_job_database": false, @@ -12,26 +12,17 @@ }, "SITE": { "streamingcommunity": "foo", - "animeunity": "epic" + "animeunity": "to", + "altadefinizione": "food" }, - "M3U8": { + "M3U8_DOWNLOAD": { "tdqm_workers": 30, - "delay_start_workers": 0, - "requests_timeout": 10, - "enable_time_quit": false, - "tqdm_progress_timeout": 10, - "download_percentage": 0, "tqdm_show_progress": true, - "save_m3u8_content": true, - "fake_proxy": true, - "fake_proxy_ip": ["57.129.7.85", "57.129.7.188", "57.129.7.174", "57.129.4.77", "57.129.16.196", "57.129.16.156", "57.129.16.139", "57.129.16.135", "57.129.13.175", "57.129.13.157", "51.38.112.237", "51.195.107.7", "51.195.107.230"], + "fake_proxy": false, + "fake_proxy_ip": ["57.129.7.85","57.129.7.188","57.129.7.174","57.129.4.77","57.129.16.196","57.129.16.156","57.129.16.139","57.129.16.135","57.129.13.175","57.129.13.157","51.38.112.237","51.195.107.7","51.195.107.230"], "create_report": false }, - "M3U8_PARSER": { - "skip_empty_row_playlist": false, - "force_resolution": -1 - }, - "M3U8_OPTIONS": { + "M3U8_FILTER": { "use_codec": false, "use_gpu": false, "force_ts": false, @@ -41,7 +32,14 @@ "specific_list_subtitles": ["eng"] }, "M3U8_REQUESTS": { + "disable_error": false, + "timeout": 10, + "verify_ssl": false, "index": {"user-agent": ""}, "segments": {"user-agent": ""} + }, + "M3U8_PARSER": { + "skip_empty_row_playlist": false, + "force_resolution": -1 } } \ No newline at end of file diff --git a/job_series.py b/job_series.py deleted file mode 100644 index 3b1bf71..0000000 --- a/job_series.py +++ /dev/null @@ -1,205 +0,0 @@ -# 08.05.24 - -import logging - - -# Internal utilities -from Src.Api.Streamingcommunity import ( - get_version_and_domain, - title_search, - manager_clear, - get_select_title -) - -from Src.Util.message import start_message -from Src.Util._jsonConfig import config_manager -from Src.Util.console import console, msg -from Src.Lib.E_Table import job_database -from Src.Api.Streamingcommunity.Core.Vix_player.player import VideoSource - - -# Config -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -SERIES_FOLDER = config_manager.get('DEFAULT', 'series_folder_name') -STREAM_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') -DOMAIN_SITE_NAME = config_manager.get('SITE', 'streaming_domain') -CREATE_JOB_DB = config_manager.get_bool('DEFAULT', 'create_job_database') - - -class SuppressedConsolePrint: - def __enter__(self): - self.original_print = console.print - console.print = lambda *args, **kwargs: None - - def __exit__(self, exc_type, exc_value, traceback): - console.print = self.original_print - - -class SeriesManager: - def __init__(self): - """ - Initialize SeriesManager object. - """ - with SuppressedConsolePrint(): - self.version, self.domain = get_version_and_domain() - self.video_source = VideoSource() - - def add_series(self): - """ - Add a new series to the database. - """ - try: - - # Ask the user to input the search term - input_title_search = msg.ask("\n[cyan]Insert word to search in all site: [/cyan]") - - if input_title_search: - - # Perform streaming search based on the search term - len_database = title_search(input_title_search, self.domain) - - if len_database != 0: - - # Get the selected title from the search results - select_title = get_select_title(type_filter=['tv']) - - if select_title.type == 'tv': - - # Set series name and media ID for the selected title - self.video_source.setup( - domain = DOMAIN_SITE_NAME, - series_name = select_title.slug, - media_id = select_title.id - ) - - # Collect info about the season - self.video_source.get_preview() - seasons_count = self.video_source.obj_preview.seasons_count - - # Add the series to the database - console.print("[green]Series '[/green][bold cyan]" + select_title.slug + "[/bold cyan][green]' added successfully.[/green]") - job_database.add_row_to_database(select_title.id, select_title.slug, seasons_count) - job_database.save_database() - - # Clear old data added - manager_clear() - - else: - console.print("[red]No series found for the given search term.[/red]") - else: - console.print("[red]Invalid choice. Please select a valid option.[/red]") - - except Exception as e: - logging.error(f"Error occurred while adding series: {str(e)}") - - def check_series(self): - """ - Check for new seasons in existing series. - """ - try: - - # Loop through each series in the database - for data_series in job_database.db[1:]: - self.video_source.setup( - domain = DOMAIN_SITE_NAME, - series_name = data_series[1], - media_id = data_series[0] - ) - - # Collect information about seasons for the series - self.video_source.get_preview() - seasons_count = self.video_source.obj_preview.seasons_count - - if int(data_series[2]) < seasons_count: - - # Notify if a new season is found for the series - console.print("[bold yellow]Series '[/bold yellow][bold cyan]" + data_series[1] + "[/bold cyan][bold yellow]' found new season.[/bold yellow]") - else: - - # Notify if no new seasons are found for the series - console.print("[bold red]Series '[/bold red][bold cyan]" + data_series[1] + "[/bold cyan][bold red]' has no new seasons.[/bold red]") - - except Exception as e: - logging.error(f"Error occurred while checking series: {str(e)}") - - def list_series(self): - """ - Print the list of series in the database. - """ - try: - - # Print the header for the series list - console.print("\n[bold cyan]Series List:[/bold cyan]\n") - job_database.print_database_as_sql() - - except Exception as e: - logging.error(f"Error occurred while listing series: {str(e)}") - - def remove_series(self): - """ - Remove a series from the database. - """ - - if len(job_database.db) > 1: - - # Ask the user to input the index of the series to remove - index_to_remove = msg.ask("\n[cyan]Insert [bold red]ID [cyan]to remove").strip() - - if index_to_remove != 0: - - # Remove the series from the database - data_row_remove = job_database.remove_row_from_database(0, index_to_remove) - job_database.save_database() - - if data_row_remove: - console.print("[bold green]Series '[/bold green][bold cyan]" + data_row_remove[1] + "[/bold cyan][bold green]' removed successfully.[/bold green]") - - else: - console.print("[bold red]Cannot remove columns from the database.[/bold red]") - - else: - console.print("[bold yellow]No data to remove[/bold yellow]") - - def run(self): - """ - Run the SeriesManager application. - """ - while True: - - # Reload all database all time - start_message() - job_database.load_database() - - # Prompt the user for action choice - action = msg.ask("\n[green]What would you like to do?", choices=["add", "check", "remove", "print", "quit"]) - - if action == "add": - self.add_series() - elif action == "check": - self.check_series() - elif action == "remove": - self.remove_series() - elif action == "print": - self.list_series() - elif action == "quit": - console.print("\n[bold magenta]Exiting Series Manager. Goodbye![/bold magenta]") - break - else: - console.print("[red]Invalid action. Please try again.[/red]") - - confirmation = msg.ask("\n[blue]Press 'y' to continue, or 'n' to quit[/blue]") - if confirmation.lower() == "n": - console.print("\n[bold magenta]Exiting Series Manager. Goodbye![/bold magenta]") - break - - -def main(): - - if CREATE_JOB_DB: - manager = SeriesManager() - manager.run() - else: - console.print("[red]Set to true 'create_job_database' on config.json file.") - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/run.py b/run.py index 0fa118a..cfb87aa 100644 --- a/run.py +++ b/run.py @@ -5,6 +5,7 @@ import os import platform import argparse import logging + from typing import Callable @@ -18,15 +19,16 @@ from Src.Lib.FFmpeg import check_ffmpeg from Src.Util.logger import Logger # Internal api -from Src.Api.Streamingcommunity import main_film_series -from Src.Api.Animeunity import main_anime +from Src.Api.Streamingcommunity import main_film_series as streamingcommunity_film_serie +from Src.Api.Animeunity import main_anime as streamingcommunity_anime +from Src.Api.Altadefinizione import main_film as altadefinizione_film # Config CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close') -def initialize(switch = False): +def initialize(): """ Initialize the application. Checks Python version, removes temporary folder, and displays start message. @@ -44,7 +46,7 @@ def initialize(switch = False): # Removing temporary folder - start_message(switch) + start_message() # Attempting GitHub update @@ -88,36 +90,56 @@ def run_function(func: Callable[..., None], close_console: bool = False) -> None def main(): - + log_not = Logger() # Parse command line arguments - parser = argparse.ArgumentParser(description='Script to download film and series from internet.') - parser.add_argument('-a', '--anime', action='store_true', help='Check into anime category') - parser.add_argument('-f', '--film', action='store_true', help='Check into film/tv series category') + parser = argparse.ArgumentParser(description='Script to download film and series from the internet.') + parser.add_argument('-sa', '--streaming_anime', action='store_true', help='Check into anime category') + parser.add_argument('-sf', '--streaming_film', action='store_true', help='Check into film/tv series category') + parser.add_argument('-af', '--altadefinizione_film', action='store_true', help='Check into film/tv series category') args = parser.parse_args() - if args.anime: - run_function(main_anime, CLOSE_CONSOLE) + # Mapping command-line arguments to functions + arg_to_function = { + 'streaming_anime': streamingcommunity_anime, + 'streaming_film': streamingcommunity_film_serie, + 'altadefinizione_film': altadefinizione_film, + } - elif args.film: - run_function(main_film_series, CLOSE_CONSOLE) + # Check which argument is provided and run the corresponding function + for arg, func in arg_to_function.items(): + if getattr(args, arg): + run_function(func, CLOSE_CONSOLE) + return + # Mapping user input to functions + input_to_function = { + '0': streamingcommunity_film_serie, + '1': streamingcommunity_anime, + '2': altadefinizione_film, + } + + # Create dynamic prompt message and choices + choices = list(input_to_function.keys()) + choice_labels = { + '0': "Film/Series", + '1': "Anime", + '2': "Altadefinizione" + } + prompt_message = "[cyan]Insert category [white](" + ", ".join( + f"[red]{key}[white]: [bold magenta]{label}[white]" for key, label in choice_labels.items() + ) + ")[white]:[/cyan]" + + # Ask the user for input + category = msg.ask(prompt_message, choices=choices, default="0") + + # Run the corresponding function based on user input + if category in input_to_function: + run_function(input_to_function[category], CLOSE_CONSOLE) else: - - # If no arguments are provided, ask the user to input the category, if nothing insert return 0 - category = msg.ask("[cyan]Insert category [white]([red]0[white]: [bold magenta]Film/Series[white], [red]1[white]: [bold magenta]Anime[white])[white]:[/cyan]", choices={"0": "", "1": ""}, default="0") - - if category == '0': - run_function(main_film_series, CLOSE_CONSOLE) - - elif category == '1': - run_function(main_anime, CLOSE_CONSOLE) - - else: - console.print("[red]Invalid category, you need to insert 0 or 1.") - sys.exit(0) - + console.print("[red]Invalid category, you need to insert 0, 1, or 2.") + sys.exit(0) if __name__ == '__main__': main() \ No newline at end of file diff --git a/update.py b/update.py deleted file mode 100644 index 142af56..0000000 --- a/update.py +++ /dev/null @@ -1,152 +0,0 @@ -# 10.12.24 - -import os -import shutil -from io import BytesIO -from zipfile import ZipFile - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -# External libraries -import requests -from rich.console import Console - - -# Variable -console = Console() -local_path = os.path.join(".") -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') - - -def move_content(source: str, destination: str) : - """ - Move all content from the source folder to the destination folder. - - Args: - source (str): The path to the source folder. - destination (str): The path to the destination folder. - """ - - os.makedirs(destination, exist_ok=True) - - # Iterate through all elements in the source folder - for element in os.listdir(source): - source_path = os.path.join(source, element) - destination_path = os.path.join(destination, element) - - # If it's a directory, recursively call the function - if os.path.isdir(source_path): - move_content(source_path, destination_path) - - # Otherwise, move the file, replacing if it already exists - else: - shutil.move(source_path, destination_path) - - -def keep_specific_items(directory: str, keep_folder: str, keep_file: str): - """ - Delete all items in the directory except for the specified folder and file. - - Args: - directory (str): The path to the directory. - keep_folder (str): The name of the folder to keep. - keep_file (str): The name of the file to keep. - """ - - try: - if not os.path.exists(directory) or not os.path.isdir(directory): - raise ValueError(f"Error: '{directory}' is not a valid directory.") - - # Iterate through items in the directory - for item in os.listdir(directory): - item_path = os.path.join(directory, item) - - # Check if the item is the specified folder or file - if os.path.isdir(item_path) and item != keep_folder: - shutil.rmtree(item_path) - elif os.path.isfile(item_path) and item != keep_file: - os.remove(item_path) - - except PermissionError as pe: - print(f"PermissionError: {pe}. Check permissions and try running the script with admin privileges.") - - except Exception as e: - print(f"Error: {e}") - - -def download_and_extract_latest_commit(author: str, repo_name: str): - """ - Download and extract the latest commit from a GitHub repository. - - Args: - author (str): The owner of the GitHub repository. - repo_name (str): The name of the GitHub repository. - """ - - # Get the latest commit information using GitHub API - api_url = f'https://api.github.com/repos/{author}/{repo_name}/commits?per_page=1' - response = requests.get(api_url) - console.log("[green]Making a request to GitHub repository...") - - if response.ok: - commit_info = response.json()[0] - commit_sha = commit_info['sha'] - zipball_url = f'https://github.com/{author}/{repo_name}/archive/{commit_sha}.zip' - console.log("[green]Getting zip file from repository...") - - # Download the zipball - response = requests.get(zipball_url) - - # Extract the content of the zipball into a temporary folder - temp_path = os.path.join(os.path.dirname(os.getcwd()), 'temp_extracted') - with ZipFile(BytesIO(response.content)) as zip_ref: - zip_ref.extractall(temp_path) - console.log("[green]Extracting file ...") - - # Move files from the temporary folder to the current folder - for item in os.listdir(temp_path): - item_path = os.path.join(temp_path, item) - destination_path = os.path.join(local_path, item) - shutil.move(item_path, destination_path) - - # Remove the temporary folder - shutil.rmtree(temp_path) - - # Move all folder to main folder - new_folder_name = f"{repo_name}-{commit_sha}" - move_content(new_folder_name, ".") - - # Remove old temp folder - shutil.rmtree(new_folder_name) - - console.log(f"[cyan]Latest commit downloaded and extracted successfully.") - else: - console.log(f"[red]Failed to fetch commit information. Status code: {response.status_code}") - - -def main_upload(): - """ - Main function to upload the latest commit of a GitHub repository. - """ - - repository_owner = 'Ghost6446' - repository_name = 'StreamingCommunity_api' - - cmd_insert = input("Are you sure you want to delete all files? (Only videos folder will remain) [yes/no]: ") - - if cmd_insert == "yes": - - # Remove all old file - keep_specific_items(".", ROOT_PATH, "upload.py") - - download_and_extract_latest_commit(repository_owner, repository_name) - - -main_upload() - -# win -# pyinstaller --upx-dir="C:\Program Files\upx" --onefile run.py -