From 3836148ff3e0bfb6cae78977ffb2516a29793e03 Mon Sep 17 00:00:00 2001 From: Lovi <62809003+Lovi-0@users.noreply.github.com> Date: Sat, 23 Nov 2024 15:36:30 +0100 Subject: [PATCH] Add path validator --- Src/Api/1337xx/title.py | 26 +- Src/Api/Template/Util/manage_ep.py | 6 +- Src/Api/Template/Util/recall_search.py | 6 +- Src/Api/altadefinizione/film.py | 13 +- Src/Api/animeunity/anime.py | 29 +- Src/Api/bitsearch/title.py | 20 +- Src/Api/cb01new/film.py | 14 +- Src/Api/ddlstreamitaly/series.py | 15 +- Src/Api/guardaserie/series.py | 5 +- Src/Api/mostraguarda/film.py | 11 +- Src/Api/piratebays/title.py | 20 +- Src/Api/streamingcommunity/film.py | 9 +- Src/Api/streamingcommunity/series.py | 5 +- Src/Lib/Downloader/HLS/downloader.py | 29 +- Src/Lib/Downloader/HLS/proxyes.py | 4 +- Src/Lib/Downloader/HLS/segments.py | 4 +- Src/Lib/Downloader/MP4/downloader.py | 4 +- Src/Lib/Downloader/TOR/downloader.py | 8 +- Src/Lib/FFmpeg/capture.py | 4 +- Src/Lib/FFmpeg/command.py | 18 +- Src/Lib/M3U8/estimator.py | 10 +- Src/Lib/M3U8/parser.py | 4 +- Src/Util/console.py | 1 + Src/Util/message.py | 1 - Src/Util/os.py | 713 ++++++++++++------------- config.json | 1 - requirements.txt | 1 + run.py | 4 +- 28 files changed, 488 insertions(+), 497 deletions(-) diff --git a/Src/Api/1337xx/title.py b/Src/Api/1337xx/title.py index 9516cf9..1e2147c 100644 --- a/Src/Api/1337xx/title.py +++ b/Src/Api/1337xx/title.py @@ -1,8 +1,6 @@ # 02.07.24 import os -import sys -import logging # External libraries @@ -12,9 +10,9 @@ from bs4 import BeautifulSoup # Internal utilities from Src.Util.console import console +from Src.Util.os import os_manager from Src.Util.message import start_message from Src.Util.headers import get_headers -from Src.Util.os import create_folder, can_create_file, remove_special_characters from Src.Lib.Downloader import TOR_downloader @@ -39,19 +37,23 @@ def download_title(select_title: MediaItem): print() # Define output path - title_name = remove_special_characters(select_title.name) - mp4_name = title_name.replace("-", "_") + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(title_name.replace(".mp4", ""))) + title_name = os_manager.get_sanitize_file(select_title.name) + mp4_path = os_manager.get_sanitize_path( + os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", "")) + ) - # Check if can create file output - create_folder(mp4_path) - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) + # Create output folder + os_manager.create_path(mp4_path) # Make request to page with magnet full_site_name = f"{SITE_NAME}.{DOMAIN_NOW}" - response = httpx.get("https://" + full_site_name + select_title.url, headers={'user-agent': get_headers()}, follow_redirects=True) + response = httpx.get( + url="https://" + full_site_name + select_title.url, + headers={ + 'user-agent': get_headers() + }, + follow_redirects=True + ) # Create soup and find table soup = BeautifulSoup(response.text, "html.parser") diff --git a/Src/Api/Template/Util/manage_ep.py b/Src/Api/Template/Util/manage_ep.py index 0356b12..b95c5fc 100644 --- a/Src/Api/Template/Util/manage_ep.py +++ b/Src/Api/Template/Util/manage_ep.py @@ -7,7 +7,7 @@ from typing import List # Internal utilities from Src.Util._jsonConfig import config_manager -from Src.Util.os import remove_special_characters +from Src.Util.os import os_manager # Config @@ -85,10 +85,10 @@ def map_episode_title(tv_name: str, number_season: int, episode_number: int, epi str: The mapped episode title. """ map_episode_temp = MAP_EPISODE - map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) + map_episode_temp = map_episode_temp.replace("%(tv_name)", os_manager.get_sanitize_file(tv_name)) map_episode_temp = map_episode_temp.replace("%(season)", dynamic_format_number(number_season)) map_episode_temp = map_episode_temp.replace("%(episode)", dynamic_format_number(episode_number)) - map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name)) + map_episode_temp = map_episode_temp.replace("%(episode_name)", os_manager.get_sanitize_file(episode_name)) logging.info(f"Map episode string return: {map_episode_temp}") return map_episode_temp diff --git a/Src/Api/Template/Util/recall_search.py b/Src/Api/Template/Util/recall_search.py index e93372d..09b5209 100644 --- a/Src/Api/Template/Util/recall_search.py +++ b/Src/Api/Template/Util/recall_search.py @@ -11,10 +11,10 @@ def execute_search(info): info (dict): A dictionary containing the function name, folder, and module information. """ - # Step 1: Define the project path using the folder from the info dictionary + # Define the project path using the folder from the info dictionary project_path = os.path.dirname(info['folder']) # Get the base path for the project - # Step 2: Add the project path to sys.path + # Add the project path to sys.path if project_path not in sys.path: sys.path.append(project_path) @@ -29,7 +29,9 @@ def execute_search(info): except ModuleNotFoundError as e: print(f"ModuleNotFoundError: {e}") + except ImportError as e: print(f"ImportError: {e}") + except Exception as e: print(f"An error occurred: {e}") diff --git a/Src/Api/altadefinizione/film.py b/Src/Api/altadefinizione/film.py index 0b60629..4f85323 100644 --- a/Src/Api/altadefinizione/film.py +++ b/Src/Api/altadefinizione/film.py @@ -6,7 +6,7 @@ import time # Internal utilities from Src.Util.console import console, msg -from Src.Util.os import remove_special_characters +from Src.Util.os import os_manager from Src.Util.message import start_message from Src.Util.call_stack import get_call_stack from Src.Lib.Downloader import HLS_Downloader @@ -39,14 +39,19 @@ def download_film(select_title: MediaItem): video_source = VideoSource(select_title.url) # Define output path - mp4_name = remove_special_characters(select_title.name) + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(select_title.name)) + title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4" + mp4_path = os_manager.get_sanitize_path( + os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", "")) + ) # Get m3u8 master playlist master_playlist = video_source.get_playlist() # Download the film using the m3u8 playlist, and output filename - r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() + r_proc = HLS_Downloader( + m3u8_playlist=master_playlist, + output_filename=os.path.join(mp4_path, title_name) + ).start() if r_proc == 404: time.sleep(2) diff --git a/Src/Api/animeunity/anime.py b/Src/Api/animeunity/anime.py index 749cc42..12cbdfb 100644 --- a/Src/Api/animeunity/anime.py +++ b/Src/Api/animeunity/anime.py @@ -7,8 +7,8 @@ import logging # Internal utilities from Src.Util.console import console, msg +from Src.Util.os import os_manager from Src.Util.message import start_message -from Src.Util.os import create_folder, can_create_file from Src.Lib.Downloader import MP4_downloader @@ -46,23 +46,24 @@ def download_episode(index_select: int): video_source.parse_script(js_script) # Create output path - mp4_path = None - mp4_name = f"{obj_episode.number}.mp4" - if video_source.is_series: - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, video_source.series_name) - else: - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, video_source.series_name) + title_name = f"{obj_episode.number}.mp4" - # Check if can create file output - create_folder(mp4_path) - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) + if video_source.is_series: + mp4_path = os_manager.get_sanitize_path( + os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, video_source.series_name) + ) + else: + mp4_path = os_manager.get_sanitize_path( + os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, video_source.series_name) + ) + + # Create output folder + os_manager.create_path(mp4_path) # Start downloading MP4_downloader( - str(video_source.src_mp4).strip(), - os.path.join(mp4_path, mp4_name) + url = str(video_source.src_mp4).strip(), + path = os.path.join(mp4_path, title_name) ) else: diff --git a/Src/Api/bitsearch/title.py b/Src/Api/bitsearch/title.py index bd8486c..3f42ab2 100644 --- a/Src/Api/bitsearch/title.py +++ b/Src/Api/bitsearch/title.py @@ -1,14 +1,12 @@ # 01.07.24 import os -import sys -import logging # Internal utilities from Src.Util.console import console from Src.Util.message import start_message -from Src.Util.os import create_folder, can_create_file, remove_special_characters +from Src.Util.os import os_manager from Src.Lib.Downloader import TOR_downloader @@ -34,15 +32,13 @@ def download_title(select_title: MediaItem): print() # Define output path - title_name = remove_special_characters(select_title.name) - mp4_name = title_name.replace("-", "_") + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(title_name.replace(".mp4", ""))) - - # Check if can create file output - create_folder(mp4_path) - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) + title_name = os_manager.get_sanitize_file(select_title.name.replace("-", "_") + ".mp4") + mp4_path = os_manager.get_sanitize_path( + os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", "")) + ) + + # Create output folder + os_manager.create_path(mp4_path) # Tor manager manager = TOR_downloader() diff --git a/Src/Api/cb01new/film.py b/Src/Api/cb01new/film.py index e6fad17..a87bcd7 100644 --- a/Src/Api/cb01new/film.py +++ b/Src/Api/cb01new/film.py @@ -8,7 +8,7 @@ import logging # Internal utilities from Src.Util.console import console, msg -from Src.Util.os import remove_special_characters +from Src.Util.os import os_manager from Src.Util.message import start_message from Src.Util.call_stack import get_call_stack from Src.Lib.Downloader import HLS_Downloader @@ -40,15 +40,19 @@ def download_film(select_title: MediaItem): video_source = VideoSource(select_title.url) # Define output path - title_name = remove_special_characters(select_title.name) - mp4_name = title_name +".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + title_name = os_manager.get_sanitize_file(select_title.name) +".mp4" + mp4_path = os_manager.get_sanitize_path( + os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", "")) + ) # Get m3u8 master playlist master_playlist = video_source.get_playlist() # Download the film using the m3u8 playlist, and output filename - r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() + r_proc = HLS_Downloader( + m3u8_playlist=master_playlist, + output_filename=os.path.join(mp4_path, title_name) + ).start() if r_proc == 404: time.sleep(2) diff --git a/Src/Api/ddlstreamitaly/series.py b/Src/Api/ddlstreamitaly/series.py index 88b0847..756d148 100644 --- a/Src/Api/ddlstreamitaly/series.py +++ b/Src/Api/ddlstreamitaly/series.py @@ -9,7 +9,7 @@ from urllib.parse import urlparse # Internal utilities from Src.Util.console import console from Src.Util.message import start_message -from Src.Util.os import create_folder, can_create_file +from Src.Util.os import os_manager from Src.Util.table import TVShowManager from Src.Lib.Downloader import MP4_downloader from ..Template import manage_selection, map_episode_title, validate_episode_selection @@ -45,14 +45,13 @@ def download_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) print() # Define filename and path for the downloaded video - mp4_name = f"{map_episode_title(scape_info_serie.tv_name, None, index_episode_selected, obj_episode.get('name'))}.mp4" + title_name = os_manager.get_sanitize_file( + f"{map_episode_title(scape_info_serie.tv_name, None, index_episode_selected, obj_episode.get('name'))}.mp4" + ) mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name) - # Check if can create file output - create_folder(mp4_path) - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) + # Create output folder + os_manager.create_path(mp4_path) # Setup video source video_source.setup(obj_episode.get('url')) @@ -65,7 +64,7 @@ def download_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) MP4_downloader( url = master_playlist, - path = os.path.join(mp4_path, mp4_name), + path = os.path.join(mp4_path, title_name), referer = f"{parsed_url.scheme}://{parsed_url.netloc}/", ) diff --git a/Src/Api/guardaserie/series.py b/Src/Api/guardaserie/series.py index 57cc28c..26a0fe1 100644 --- a/Src/Api/guardaserie/series.py +++ b/Src/Api/guardaserie/series.py @@ -55,7 +55,10 @@ def download_video(scape_info_serie: GetSerieInfo, index_season_selected: int, i master_playlist = video_source.get_playlist() # Download the film using the m3u8 playlist, and output filename - r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() + r_proc = HLS_Downloader( + m3u8_playlist=master_playlist, + output_filename=os.path.join(mp4_path, mp4_name) + ).start() if r_proc == 404: time.sleep(2) diff --git a/Src/Api/mostraguarda/film.py b/Src/Api/mostraguarda/film.py index 28eaa9c..cc5fa94 100644 --- a/Src/Api/mostraguarda/film.py +++ b/Src/Api/mostraguarda/film.py @@ -13,7 +13,7 @@ from bs4 import BeautifulSoup # Internal utilities from Src.Util.console import console, msg -from Src.Util.os import remove_special_characters +from Src.Util.os import os_manager from Src.Util.message import start_message from Src.Util.call_stack import get_call_stack from Src.Util.headers import get_headers @@ -64,14 +64,17 @@ def download_film(movie_details: Json_film): video_source.setup(supervideo_url) # Define output path - mp4_name = remove_special_characters(movie_details.title) + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(movie_details.title)) + title_name = os_manager.get_sanitize_file(movie_details.title) + ".mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", "")) # Get m3u8 master playlist master_playlist = video_source.get_playlist() # Download the film using the m3u8 playlist, and output filename - r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() + r_proc = HLS_Downloader( + m3u8_playlist=master_playlist, + output_filename=os.path.join(mp4_path, title_name) + ).start() if r_proc == 404: time.sleep(2) diff --git a/Src/Api/piratebays/title.py b/Src/Api/piratebays/title.py index eaab06d..87ec337 100644 --- a/Src/Api/piratebays/title.py +++ b/Src/Api/piratebays/title.py @@ -5,16 +5,10 @@ import sys import logging -# External libraries -import httpx -from bs4 import BeautifulSoup - - # Internal utilities from Src.Util.console import console from Src.Util.message import start_message -from Src.Util.headers import get_headers -from Src.Util.os import create_folder, can_create_file, remove_special_characters +from Src.Util.os import os_manager from Src.Lib.Downloader import TOR_downloader @@ -39,15 +33,11 @@ def download_title(select_title: MediaItem): print() # Define output path - title_name = remove_special_characters(select_title.name) - mp4_name = title_name.replace("-", "_") + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(title_name.replace(".mp4", ""))) + title_name = os_manager.get_sanitize_file(select_title.name.replace("-", "_") + ".mp4") + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", "")) - # Check if can create file output - create_folder(mp4_path) - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) + # Create output folder + os_manager.create_path(mp4_path) # Tor manager manager = TOR_downloader() diff --git a/Src/Api/streamingcommunity/film.py b/Src/Api/streamingcommunity/film.py index 21745fc..30c094a 100644 --- a/Src/Api/streamingcommunity/film.py +++ b/Src/Api/streamingcommunity/film.py @@ -6,7 +6,7 @@ import time # Internal utilities from Src.Util.console import console, msg -from Src.Util.os import remove_special_characters +from Src.Util.os import os_manager from Src.Util.message import start_message from Src.Util.call_stack import get_call_stack from Src.Lib.Downloader import HLS_Downloader @@ -45,11 +45,14 @@ def download_film(select_title: MediaItem, domain: str, version: str): master_playlist = video_source.get_playlist() # Define the filename and path for the downloaded film - mp4_format = remove_special_characters(select_title.slug) + ".mp4" + title_name = os_manager.get_sanitize_file(select_title.slug) + ".mp4" mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, select_title.slug) # Download the film using the m3u8 playlist, and output filename - r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_format)).start() + r_proc = HLS_Downloader( + m3u8_playlist=master_playlist, + output_filename=os.path.join(mp4_path, title_name) + ).start() if r_proc == 404: time.sleep(2) diff --git a/Src/Api/streamingcommunity/series.py b/Src/Api/streamingcommunity/series.py index 4e0c0a9..14aa973 100644 --- a/Src/Api/streamingcommunity/series.py +++ b/Src/Api/streamingcommunity/series.py @@ -53,7 +53,10 @@ def download_video(tv_name: str, index_season_selected: int, index_episode_selec master_playlist = video_source.get_playlist() # Download the episode - r_proc = HLS_Downloader(os.path.join(mp4_path, mp4_name), master_playlist).start() + r_proc = HLS_Downloader( + master_playlist=master_playlist, + output_filename=os.path.join(mp4_path, mp4_name) + ).start() if r_proc == 404: time.sleep(2) diff --git a/Src/Lib/Downloader/HLS/downloader.py b/Src/Lib/Downloader/HLS/downloader.py index b3fc0fa..e53badb 100644 --- a/Src/Lib/Downloader/HLS/downloader.py +++ b/Src/Lib/Downloader/HLS/downloader.py @@ -7,7 +7,6 @@ import logging # External libraries import httpx -from unidecode import unidecode # Internal utilities @@ -15,17 +14,11 @@ from Src.Util._jsonConfig import config_manager from Src.Util.console import console, Panel, Table from Src.Util.color import Colors from Src.Util.os import ( - remove_folder, - delete_files_except_one, compute_sha1_hash, - format_file_size, - create_folder, - reduce_base_name, - remove_special_characters, - can_create_file + os_manager, + internet_manager ) - # Logic class from ...FFmpeg import ( print_duration_table, @@ -744,17 +737,13 @@ class HLS_Downloader: if not folder: folder = new_folder - # Sanitize base name - base_name = reduce_base_name(remove_special_characters(base_name)) - create_folder(folder) - - if not can_create_file(base_name): - logging.error("Invalid mp4 name.") - sys.exit(0) + # Sanitize base name and folder + folder = os_manager.get_sanitize_path(folder) + base_name = os_manager.get_sanitize_file(base_name) + os_manager.create_path(folder) # Parse to only ASCII for compatibility across platforms new_filename = os.path.join(folder, base_name) - new_filename = unidecode(new_filename) logging.info(f"class 'HLS_Downloader'; call _generate_output_filename(); return path: {new_filename}") return new_filename @@ -857,7 +846,7 @@ class HLS_Downloader: end_output_time = print_duration_table(self.output_filename, description=False, return_string=False) # Calculate file size and duration for reporting - formatted_size = format_file_size(os.path.getsize(self.output_filename)) + formatted_size = internet_manager.format_file_size(os.path.getsize(self.output_filename)) formatted_duration = print_duration_table(self.output_filename, description=False, return_string=True) expected_real_seconds = dict_to_seconds(self.content_downloader.expected_real_time) @@ -895,11 +884,11 @@ class HLS_Downloader: os.rename(self.output_filename, self.output_filename.replace(".mp4", "_failed.mp4")) # Delete all temporary files except for the output file - delete_files_except_one(self.path_manager.base_path, os.path.basename(self.output_filename.replace(".mp4", "_failed.mp4"))) + os_manager.remove_files_except_one(self.path_manager.base_path, os.path.basename(self.output_filename.replace(".mp4", "_failed.mp4"))) # Remove the base folder if specified if REMOVE_SEGMENTS_FOLDER: - remove_folder(self.path_manager.base_path) + os_manager.remove_folder(self.path_manager.base_path) else: logging.info("Video file converted already exists.") diff --git a/Src/Lib/Downloader/HLS/proxyes.py b/Src/Lib/Downloader/HLS/proxyes.py index 783d553..ff785cd 100644 --- a/Src/Lib/Downloader/HLS/proxyes.py +++ b/Src/Lib/Downloader/HLS/proxyes.py @@ -13,7 +13,7 @@ import httpx # Internal utilities from Src.Util._jsonConfig import config_manager from Src.Util.headers import get_headers -from Src.Util.os import check_file_existence +from Src.Util.os import os_manager class ProxyManager: @@ -84,7 +84,7 @@ def main_test_proxy(url_test): path_file_proxt_list = "list_proxy.txt" - if check_file_existence(path_file_proxt_list): + if os_manager.check_file(path_file_proxt_list): # Read file with open(path_file_proxt_list, 'r') as file: diff --git a/Src/Lib/Downloader/HLS/segments.py b/Src/Lib/Downloader/HLS/segments.py index ca3a4e4..1069c2f 100644 --- a/Src/Lib/Downloader/HLS/segments.py +++ b/Src/Lib/Downloader/HLS/segments.py @@ -22,7 +22,7 @@ from Src.Util.console import console from Src.Util.headers import get_headers, random_headers from Src.Util.color import Colors from Src.Util._jsonConfig import config_manager -from Src.Util.os import check_file_existence +from Src.Util.os import os_manager from Src.Util.call_stack import get_call_stack @@ -40,7 +40,7 @@ TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay') TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar') REQUEST_MAX_RETRY = config_manager.get_int('REQUESTS', 'max_retry') REQUEST_VERIFY = config_manager.get_bool('REQUESTS', 'verify_ssl') -THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt") +THERE_IS_PROXY_LIST = os_manager.check_file("list_proxy.txt") PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min') PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max') DEFAULT_VIDEO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_video_workser') diff --git a/Src/Lib/Downloader/MP4/downloader.py b/Src/Lib/Downloader/MP4/downloader.py index f596a13..88170ff 100644 --- a/Src/Lib/Downloader/MP4/downloader.py +++ b/Src/Lib/Downloader/MP4/downloader.py @@ -15,7 +15,7 @@ from Src.Util.headers import get_headers from Src.Util.color import Colors from Src.Util.console import console, Panel from Src.Util._jsonConfig import config_manager -from Src.Util.os import format_file_size +from Src.Util.os import internet_manager # Logic class @@ -95,7 +95,7 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: str = Non if total != 0: console.print(Panel( f"[bold green]Download completed![/bold green]\n" - f"[cyan]File size: [bold red]{format_file_size(os.path.getsize(path))}[/bold red]\n" + f"[cyan]File size: [bold red]{internet_manager.format_file_size(os.path.getsize(path))}[/bold red]\n" f"[cyan]Duration: [bold]{print_duration_table(path, description=False, return_string=True)}[/bold]", title=f"{os.path.basename(path.replace('.mp4', ''))}", border_style="green" diff --git a/Src/Lib/Downloader/TOR/downloader.py b/Src/Lib/Downloader/TOR/downloader.py index 6ce3259..f563c06 100644 --- a/Src/Lib/Downloader/TOR/downloader.py +++ b/Src/Lib/Downloader/TOR/downloader.py @@ -9,7 +9,7 @@ import logging # Internal utilities from Src.Util.color import Colors -from Src.Util.os import format_file_size, format_transfer_speed +from Src.Util.os import internet_manager from Src.Util._jsonConfig import config_manager @@ -146,14 +146,14 @@ class TOR_downloader: downloaded_size = torrent_info['total_downloaded'] # Format variable - downloaded_size_str = format_file_size(downloaded_size) + downloaded_size_str = internet_manager.format_file_size(downloaded_size) downloaded_size = downloaded_size_str.split(' ')[0] - total_size_str = format_file_size(total_size) + total_size_str = internet_manager.format_file_size(total_size) total_size = total_size_str.split(' ')[0] total_size_unit = total_size_str.split(' ')[1] - average_internet_str = format_transfer_speed(download_speed) + average_internet_str = internet_manager.format_transfer_speed(download_speed) average_internet = average_internet_str.split(' ')[0] average_internet_unit = average_internet_str.split(' ')[1] diff --git a/Src/Lib/FFmpeg/capture.py b/Src/Lib/FFmpeg/capture.py index ddd5889..fef163a 100644 --- a/Src/Lib/FFmpeg/capture.py +++ b/Src/Lib/FFmpeg/capture.py @@ -8,7 +8,7 @@ import subprocess # Internal utilities from Src.Util.console import console -from Src.Util.os import format_file_size +from Src.Util.os import internet_manager # Variable @@ -61,7 +61,7 @@ def capture_output(process: subprocess.Popen, description: str) -> None: # Construct the progress string with formatted output information progress_string = (f"[yellow][FFmpeg] [white][{description}[white]]: " f"([green]'speed': [yellow]{data.get('speed', 'N/A')}[white], " - f"[green]'size': [yellow]{format_file_size(byte_size)}[white])") + f"[green]'size': [yellow]{internet_manager.format_file_size(byte_size)}[white])") max_length = max(max_length, len(progress_string)) # Print the progress string to the console, overwriting the previous line diff --git a/Src/Lib/FFmpeg/command.py b/Src/Lib/FFmpeg/command.py index aca810a..ad3e188 100644 --- a/Src/Lib/FFmpeg/command.py +++ b/Src/Lib/FFmpeg/command.py @@ -10,7 +10,7 @@ from typing import List, Dict # Internal utilities from Src.Util._jsonConfig import config_manager -from Src.Util.os import check_file_existence, suppress_output +from Src.Util.os import os_manager, suppress_output from Src.Util.console import console from .util import need_to_force_to_ts, check_duration_v_a from .capture import capture_ffmpeg_real_time @@ -46,7 +46,7 @@ def join_video(video_path: str, out_path: str, codec: M3U8_Codec = None): - force_ts (bool): Force video path to be mpegts as input. """ - if not check_file_existence(video_path): + if not os_manager.check_file(video_path): logging.error("Missing input video for ffmpeg conversion.") sys.exit(0) @@ -119,7 +119,7 @@ def join_video(video_path: str, out_path: str, codec: M3U8_Codec = None): print() time.sleep(0.5) - if not check_file_existence(out_path): + if not os_manager.check_file(out_path): logging.error("Missing output video for ffmpeg conversion video.") sys.exit(0) @@ -135,7 +135,7 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s - out_path (str): The path to save the output file. """ - if not check_file_existence(video_path): + if not os_manager.check_file(video_path): logging.error("Missing input video for ffmpeg conversion.") sys.exit(0) @@ -153,7 +153,7 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s # Add audio tracks as input for i, audio_track in enumerate(audio_tracks): - if check_file_existence(audio_track.get('path')): + if os_manager.check_file(audio_track.get('path')): ffmpeg_cmd.extend(['-i', audio_track.get('path')]) else: logging.error(f"Skip audio join: {audio_track.get('path')} dont exist") @@ -223,7 +223,7 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s print() time.sleep(0.5) - if not check_file_existence(out_path): + if not os_manager.check_file(out_path): logging.error("Missing output video for ffmpeg conversion audio.") sys.exit(0) @@ -239,7 +239,7 @@ def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_pat - out_path (str): The path to save the output file. """ - if not check_file_existence(video_path): + if not os_manager.check_file(video_path): logging.error("Missing input video for ffmpeg conversion.") sys.exit(0) @@ -248,7 +248,7 @@ def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_pat # Add subtitle input files first for subtitle in subtitles_list: - if check_file_existence(subtitle.get('path')): + if os_manager.check_file(subtitle.get('path')): ffmpeg_cmd += ["-i", subtitle['path']] else: logging.error(f"Skip subtitle join: {subtitle.get('path')} doesn't exist") @@ -288,6 +288,6 @@ def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_pat print() time.sleep(0.5) - if not check_file_existence(out_path): + if not os_manager.check_file(out_path): logging.error("Missing output video for ffmpeg conversion subtitle.") sys.exit(0) diff --git a/Src/Lib/M3U8/estimator.py b/Src/Lib/M3U8/estimator.py index 2d254dc..7bdfe09 100644 --- a/Src/Lib/M3U8/estimator.py +++ b/Src/Lib/M3U8/estimator.py @@ -14,7 +14,7 @@ from tqdm import tqdm # Internal utilities from Src.Util.color import Colors -from Src.Util.os import format_file_size, format_transfer_speed +from Src.Util.os import internet_manager from Src.Util._jsonConfig import config_manager @@ -86,8 +86,8 @@ class M3U8_Ts_Estimator: download_speed = (new_value.bytes_recv - old_value.bytes_recv) / interval self.speed = { - "upload": format_transfer_speed(upload_speed), - "download": format_transfer_speed(download_speed) + "upload": internet_manager.format_transfer_speed(upload_speed), + "download": internet_manager.format_transfer_speed(download_speed) } def get_average_speed(self) -> float: @@ -115,7 +115,7 @@ class M3U8_Ts_Estimator: mean_size = total_size / len(self.ts_file_sizes) # Return formatted mean size - return format_file_size(mean_size) + return internet_manager.format_file_size(mean_size) except ZeroDivisionError as e: logging.error("Division by zero error occurred: %s", e) @@ -132,7 +132,7 @@ class M3U8_Ts_Estimator: Returns: str: The total downloaded size as a human-readable string. """ - return format_file_size(self.now_downloaded_size) + return internet_manager.format_file_size(self.now_downloaded_size) def update_progress_bar(self, total_downloaded: int, duration: float, progress_counter: tqdm) -> None: """ diff --git a/Src/Lib/M3U8/parser.py b/Src/Lib/M3U8/parser.py index df7d89d..eb3fc50 100644 --- a/Src/Lib/M3U8/parser.py +++ b/Src/Lib/M3U8/parser.py @@ -6,7 +6,7 @@ import logging # Internal utilities from m3u8 import loads -from Src.Util.os import format_file_size +from Src.Util.os import internet_manager # External libraries @@ -250,7 +250,7 @@ class M3U8_Video: result = [] for video in self.video_playlist: - video_size = format_file_size((video['bandwidth'] * duration) / 8) + video_size = internet_manager.format_file_size((video['bandwidth'] * duration) / 8) result.append((video_size)) return result diff --git a/Src/Util/console.py b/Src/Util/console.py index e0d6f6f..ee34a74 100644 --- a/Src/Util/console.py +++ b/Src/Util/console.py @@ -4,6 +4,7 @@ from rich.console import Console from rich.prompt import Prompt, Confirm from rich.panel import Panel from rich.table import Table +from rich.text import Text # Variable diff --git a/Src/Util/message.py b/Src/Util/message.py index 8b36140..8e59fe7 100644 --- a/Src/Util/message.py +++ b/Src/Util/message.py @@ -1,6 +1,5 @@ # 3.12.23 -import warnings import os import platform diff --git a/Src/Util/os.py b/Src/Util/os.py index bbbccb0..7d114ea 100644 --- a/Src/Util/os.py +++ b/Src/Util/os.py @@ -1,268 +1,407 @@ # 24.01.24 -import re import io import os import sys import ssl import time -import errno import shutil import hashlib import logging import platform +import unidecode import importlib import subprocess import contextlib +import pathvalidate import urllib.request import importlib.metadata # External library import httpx -import unicodedata # Internal utilities -from .console import console -from ._jsonConfig import config_manager +from Src.Util.console import console -# --> OS FILE ASCII -special_chars_to_remove = config_manager.get("DEFAULT", "special_chars_to_remove") +# Variable +OS_CONFIGURATIONS = { + 'windows': { + 'max_length': 255, + 'invalid_chars': '<>:"/\\|?*', + 'reserved_names': [ + "CON", "PRN", "AUX", "NUL", + "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", + "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9" + ], + 'max_path': 255 + }, + 'darwin': { + 'max_length': 4096, + 'invalid_chars': '/:', + 'reserved_names': [], + 'hidden_file_restriction': True + }, + 'linux': { + 'max_length': 4096, + 'invalid_chars': '/\0', + 'reserved_names': [] + } + } -def get_max_length_by_os(system: str) -> int: - """ - Determines the maximum length for a base name based on the operating system. - Parameters: - system (str): The operating system name. +class OsManager: + def __init__(self): + self.system = self._detect_system() + self.config = OS_CONFIGURATIONS.get(self.system, {}) - Returns: - int: The maximum length for the base name. - """ - if system == 'windows': - return 255 # NTFS and other common Windows filesystems support 255 characters for filenames - elif system == 'darwin': # macOS - return 255 # HFS+ and APFS support 255 characters for filenames - elif system == 'linux': - return 255 # Most Linux filesystems (e.g., ext4) support 255 characters for filenames - else: + def _detect_system(self) -> str: + """Detect and normalize operating system name.""" + system = platform.system().lower() + + if system in OS_CONFIGURATIONS: + return system + raise ValueError(f"Unsupported operating system: {system}") -def reduce_base_name(base_name: str) -> str: - """ - Splits the file path into folder and base name, and reduces the base name based on the operating system. + def _process_filename(self, filename: str) -> str: + """ + Comprehensively process filename with cross-platform considerations. + + Args: + filename (str): Original filename. + + Returns: + str: Processed filename. + """ + # Preserve file extension + logging.info("_process_filename: ", filename) + name, ext = os.path.splitext(filename) + + # Handle length restrictions + if len(name) > self.config['max_length']: + name = self._truncate_filename(name) + + # Reconstruct filename + processed_filename = name + ext + + return processed_filename - Parameters: - base_name (str): The name of the file. + def _truncate_filename(self, name: str) -> str: + """ + Truncate filename based on OS-specific rules. + + Args: + name (str): Original filename. + + Returns: + str: Truncated filename. + """ + logging.info("_truncate_filename: ", name) - Returns: - str: The reduced base name. - """ + if self.system == 'windows': + return name[:self.config['max_length'] - 3] + '___' + elif self.system == 'darwin': + return name[:self.config['max_length']] + elif self.system == 'linux': + return name[:self.config['max_length'] - 2] + '___' - # Determine the operating system - system = platform.system().lower() + def get_sanitize_file(self, filename: str) -> str: + """ + Sanitize filename using pathvalidate with unidecode. + + Args: + filename (str): Original filename. + + Returns: + str: Sanitized filename. + """ + logging.info("get_sanitize_file: ", filename) + + # Decode unicode characters and sanitize + decoded_filename = unidecode.unidecode(filename) + sanitized_filename = pathvalidate.sanitize_filename(decoded_filename) + + # Truncate if necessary based on OS configuration + name, ext = os.path.splitext(sanitized_filename) + if len(name) > self.config['max_length']: + name = self._truncate_filename(name) + + logging.info("return :", name + ext) + return name + ext + + def get_sanitize_path(self, path: str) -> str: + """ + Sanitize folder path using pathvalidate with unidecode. + + Args: + path (str): Original folder path. + + Returns: + str: Sanitized folder path. + """ + logging.info("get_sanitize_file: ", path) + + # Decode unicode characters and sanitize + decoded_path = unidecode.unidecode(path) + sanitized_path = pathvalidate.sanitize_filepath(decoded_path) + + # Split path and process each component + path_components = os.path.normpath(sanitized_path).split(os.sep) + processed_components = [] + + for component in path_components: + # Truncate component if necessary + if len(component) > self.config['max_length']: + component = self._truncate_filename(component) + processed_components.append(component) + + logging.info("return :", os.path.join(*processed_components)) + return os.path.join(*processed_components) + + def create_path(self, path: str, mode: int = 0o755) -> bool: + """ + Create directory path with specified permissions. + + Args: + path (str): Path to create. + mode (int, optional): Directory permissions. Defaults to 0o755. + + Returns: + bool: True if path created successfully, False otherwise. + """ + try: + # Sanitize path first + sanitized_path = self.get_sanitize_path(path) + + # Create directory with recursive option + os.makedirs(sanitized_path, mode=mode, exist_ok=True) + return True + + except Exception as e: + logging.error(f"Path creation error: {e}") + return False + + def remove_folder(self, folder_path: str) -> bool: + """ + Safely remove a folder. + + Args: + folder_path (str): Path of directory to remove. + + Returns: + bool: Removal status. + """ + try: + shutil.rmtree(folder_path) + return True + except OSError as e: + logging.error(f"Folder removal error: {e}") + return False - # Get the maximum length for the base name based on the operating system - max_length = get_max_length_by_os(system) + def remove_files_except_one(self, folder_path: str, keep_file: str) -> None: + """ + Delete all files in a folder except for one specified file. + + Parameters: + - folder_path (str): The path to the folder containing the files. + - keep_file (str): The filename to keep in the folder. + """ + + try: + # List all files in the folder + files_in_folder = os.listdir(folder_path) + + # Iterate over each file in the folder + for file_name in files_in_folder: + file_path = os.path.join(folder_path, file_name) + + # Check if the file is not the one to keep and is a regular file + if file_name != keep_file and os.path.isfile(file_path): + os.remove(file_path) # Delete the file + + except Exception as e: + logging.error(f"An error occurred: {e}") + raise - # Reduce the base name if necessary - if len(base_name) > max_length: - if system == 'windows': - # For Windows, truncate and add a suffix if needed - base_name = base_name[:max_length - 3] + '___' - elif system == 'darwin': # macOS - # For macOS, truncate without adding suffix - base_name = base_name[:max_length] - elif system == 'linux': - # For Linux, truncate and add a numeric suffix if needed - base_name = base_name[:max_length - 2] + '___' - - return base_name + def check_file(self, file_path: str) -> bool: + """ + Check if a file exists at the given file path. -def remove_special_characters(input_string): - """ - Remove specified special characters from a string. + Parameters: + file_path (str): The path to the file. - Parameters: - - input_string (str): The input string containing special characters. + Returns: + bool: True if the file exists, False otherwise. + """ + try: + logging.info(f"Check if file exists: {file_path}") + if os.path.exists(file_path): + logging.info(f"The file '{file_path}' exists.") + return True + + else: + return False + + except Exception as e: + logging.error(f"An error occurred while checking file existence: {e}") + return False - Returns: - str: A new string with specified special characters removed. - """ - if input_string is None: - return "None" - # Check if the string ends with '.mp4' - # If it does, we temporarily remove the '.mp4' extension for processing - ends_with_mp4 = input_string.endswith('.mp4') - if ends_with_mp4: - input_string = input_string[:-4] # Remove the last 4 characters ('.mp4') +class InternManager(): - # Compile regular expression pattern to match special characters - pattern = re.compile('[' + re.escape(special_chars_to_remove) + ']') + def format_file_size(self, size_bytes: float) -> str: + """ + Formats a file size from bytes into a human-readable string representation. - # Use compiled pattern to replace special characters with an empty string - cleaned_string = pattern.sub('', input_string) + Parameters: + size_bytes (float): Size in bytes to be formatted. - # If the original string had the '.mp4' extension, re-add it to the cleaned string - if ends_with_mp4: - cleaned_string += '.mp4' + Returns: + str: Formatted string representing the file size with appropriate unit (B, KB, MB, GB, TB). + """ + if size_bytes <= 0: + return "0B" - return cleaned_string.strip() + units = ['B', 'KB', 'MB', 'GB', 'TB'] + unit_index = 0 + + while size_bytes >= 1024 and unit_index < len(units) - 1: + size_bytes /= 1024 + unit_index += 1 + + return f"{size_bytes:.2f} {units[unit_index]}" + + def format_transfer_speed(self, bytes: float) -> str: + """ + Formats a transfer speed from bytes per second into a human-readable string representation. + + Parameters: + bytes (float): Speed in bytes per second to be formatted. + + Returns: + str: Formatted string representing the transfer speed with appropriate unit (Bytes/s, KB/s, MB/s). + """ + if bytes < 1024: + return f"{bytes:.2f} Bytes/s" + elif bytes < 1024 * 1024: + return f"{bytes / 1024:.2f} KB/s" + else: + return f"{bytes / (1024 * 1024):.2f} MB/s" + + @staticmethod + def check_internet(): + while True: + try: + httpx.get("https://www.google.com") + console.log("[bold green]Internet is available![/bold green]") + break + + except urllib.error.URLError: + console.log("[bold red]Internet is not available. Waiting...[/bold red]") + time.sleep(5) + + print() + + +class OsSummary(): + + def get_executable_version(self, command: list): + """ + Get the version of a given command-line executable. + + Args: + command (list): The command to run, e.g., `['ffmpeg', '-version']`. + + Returns: + str: The version string of the executable. + + Raises: + SystemExit: If the command is not found or fails to execute. + """ + + try: + version_output = subprocess.check_output(command, stderr=subprocess.STDOUT).decode().split('\n')[0] + return version_output.split(" ")[2] + + except (FileNotFoundError, subprocess.CalledProcessError): + print(f"{command[0]} not found") + sys.exit(0) + + def get_library_version(self, lib_name: str): + """ + Retrieve the version of a Python library. + + Args: + lib_name (str): The name of the Python library. + + Returns: + str: The library name followed by its version, or `-not installed` if not found. + """ + + try: + version = importlib.metadata.version(lib_name) + return f"{lib_name}-{version}" + + except importlib.metadata.PackageNotFoundError: + return f"{lib_name}-not installed" + + def get_system_summary(self): + """ + Generate a summary of the system environment. + + Includes: + - Python version and implementation details. + - Operating system and architecture. + - OpenSSL and glibc versions. + - Versions of `ffmpeg` and `ffprobe` executables. + - Installed Python libraries as listed in `requirements.txt`. + """ + + # Check internet connectivity + InternManager().check_internet() + console.print("[bold blue]System Summary[/bold blue][white]:") + + # Python version and platform + python_version = sys.version.split()[0] + python_implementation = platform.python_implementation() + arch = platform.machine() + os_info = platform.platform() + openssl_version = ssl.OPENSSL_VERSION + glibc_version = 'glibc ' + '.'.join(map(str, platform.libc_ver()[1])) + + console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})[/bold red]") + logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})") + + # ffmpeg and ffprobe versions + ffmpeg_version = self.get_executable_version(['ffmpeg', '-version']) + ffprobe_version = self.get_executable_version(['ffprobe', '-version']) + + console.print(f"[cyan]Exe versions[white]: [bold red]ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}[/bold red]") + logging.info(f"Dependencies: ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}") + + # Optional libraries versions + optional_libraries = [line.strip() for line in open('requirements.txt', 'r', encoding='utf-8-sig')] + optional_libs_versions = [self.get_library_version(lib) for lib in optional_libraries] + + console.print(f"[cyan]Libraries[white]: [bold red]{', '.join(optional_libs_versions)}[/bold red]\n") + logging.info(f"Libraries: {', '.join(optional_libs_versions)}") -# --> OS MANAGE OUTPUT +# OTHER +os_manager = OsManager() +internet_manager = InternManager() +os_summary = OsSummary() + @contextlib.contextmanager def suppress_output(): with contextlib.redirect_stdout(io.StringIO()): yield - - -# --> OS MANAGE FOLDER -def create_folder(folder_name: str) -> None: - """ - Create a directory if it does not exist, and log the result. - - Parameters: - folder_name (str): The path of the directory to be created. - """ - - if platform.system() == 'Windows': - max_path_length = 260 - else: - max_path_length = 4096 - - try: - logging.info(f"Try create folder: {folder_name}") - - # Check if path length exceeds the maximum allowed - if len(folder_name) > max_path_length: - logging.error(f"Path length exceeds the maximum allowed limit: {len(folder_name)} characters (Max: {max_path_length})") - raise OSError(f"Path length exceeds the maximum allowed limit: {len(folder_name)} characters (Max: {max_path_length})") - - os.makedirs(folder_name, exist_ok=True) - - if os.path.exists(folder_name) and os.path.isdir(folder_name): - logging.info(f"Directory successfully created or already exists: {folder_name}") - else: - logging.error(f"Failed to create directory: {folder_name}") - - except Exception as e: - logging.error(f"An unexpected error occurred while creating the directory {folder_name}: {e}") - raise - -def check_file_existence(file_path): - """ - Check if a file exists at the given file path. - - Parameters: - file_path (str): The path to the file. - - Returns: - bool: True if the file exists, False otherwise. - """ - try: - logging.info(f"Check if file exists: {file_path}") - if os.path.exists(file_path): - logging.info(f"The file '{file_path}' exists.") - return True - - else: - return False - - except Exception as e: - logging.error(f"An error occurred while checking file existence: {e}") - return False - -def remove_folder(folder_path: str) -> None: - """ - Remove a folder if it exists. - - Parameters: - - folder_path (str): The path to the folder to be removed. - """ - - if os.path.exists(folder_path): - try: - shutil.rmtree(folder_path) - except OSError as e: - print(f"Error removing folder '{folder_path}': {e}") - -def delete_files_except_one(folder_path: str, keep_file: str) -> None: - """ - Delete all files in a folder except for one specified file. - - Parameters: - - folder_path (str): The path to the folder containing the files. - - keep_file (str): The filename to keep in the folder. - """ - - try: - - # List all files in the folder - files_in_folder = os.listdir(folder_path) - - # Iterate over each file in the folder - for file_name in files_in_folder: - file_path = os.path.join(folder_path, file_name) - - # Check if the file is not the one to keep and is a regular file - if file_name != keep_file and os.path.isfile(file_path): - os.remove(file_path) # Delete the file - - except Exception as e: - logging.error(f"An error occurred: {e}") - - - -# --> OS MANAGE SIZE FILE AND INTERNET SPEED -def format_file_size(size_bytes: float) -> str: - """ - Formats a file size from bytes into a human-readable string representation. - - Parameters: - size_bytes (float): Size in bytes to be formatted. - - Returns: - str: Formatted string representing the file size with appropriate unit (B, KB, MB, GB, TB). - """ - if size_bytes <= 0: - return "0B" - - units = ['B', 'KB', 'MB', 'GB', 'TB'] - unit_index = 0 - - while size_bytes >= 1024 and unit_index < len(units) - 1: - size_bytes /= 1024 - unit_index += 1 - - return f"{size_bytes:.2f} {units[unit_index]}" - -def format_transfer_speed(bytes: float) -> str: - """ - Formats a transfer speed from bytes per second into a human-readable string representation. - - Parameters: - bytes (float): Speed in bytes per second to be formatted. - - Returns: - str: Formatted string representing the transfer speed with appropriate unit (Bytes/s, KB/s, MB/s). - """ - if bytes < 1024: - return f"{bytes:.2f} Bytes/s" - elif bytes < 1024 * 1024: - return f"{bytes / 1024:.2f} KB/s" - else: - return f"{bytes / (1024 * 1024):.2f} MB/s" - - - -# --> OS MANAGE KEY AND IV HEX def compute_sha1_hash(input_string: str) -> str: """ Computes the SHA-1 hash of the input string. @@ -278,151 +417,3 @@ def compute_sha1_hash(input_string: str) -> str: # Return the hashed string return hashed_string - - - -# --> OS GET SUMMARY -def check_internet(): - while True: - try: - httpx.get("https://www.google.com") - console.log("[bold green]Internet is available![/bold green]") - break - - except urllib.error.URLError: - console.log("[bold red]Internet is not available. Waiting...[/bold red]") - time.sleep(5) - - print() - -def get_executable_version(command): - try: - version_output = subprocess.check_output(command, stderr=subprocess.STDOUT).decode().split('\n')[0] - return version_output.split(" ")[2] - except (FileNotFoundError, subprocess.CalledProcessError): - print(f"{command[0]} not found") - sys.exit(0) - -def get_library_version(lib_name): - try: - version = importlib.metadata.version(lib_name) - return f"{lib_name}-{version}" - except importlib.metadata.PackageNotFoundError: - return f"{lib_name}-not installed" - -def get_system_summary(): - - check_internet() - console.print("[bold blue]System Summary[/bold blue][white]:") - - # Python version and platform - python_version = sys.version.split()[0] - python_implementation = platform.python_implementation() - arch = platform.machine() - os_info = platform.platform() - openssl_version = ssl.OPENSSL_VERSION - glibc_version = 'glibc ' + '.'.join(map(str, platform.libc_ver()[1])) - - console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})[/bold red]") - logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})") - - # ffmpeg and ffprobe versions - ffmpeg_version = get_executable_version(['ffmpeg', '-version']) - ffprobe_version = get_executable_version(['ffprobe', '-version']) - - console.print(f"[cyan]Exe versions[white]: [bold red]ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}[/bold red]") - logging.info(f"Dependencies: ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}") - - # Optional libraries versions - optional_libraries = [line.strip() for line in open('requirements.txt', 'r', encoding='utf-8-sig')] - optional_libs_versions = [get_library_version(lib) for lib in optional_libraries] - - console.print(f"[cyan]Libraries[white]: [bold red]{', '.join(optional_libs_versions)}[/bold red]\n") - logging.info(f"Libraries: {', '.join(optional_libs_versions)}") - - - -# --> OS FILE VALIDATOR - -# List of invalid characters for Windows filenames -WINDOWS_INVALID_CHARS = '<>:"/\\|?*' -WINDOWS_RESERVED_NAMES = [ - "CON", "PRN", "AUX", "NUL", - "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", - "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9" -] - -# Invalid characters for macOS filenames -MACOS_INVALID_CHARS = '/:' - -# Invalid characters for Linux/Android filenames -LINUX_INVALID_CHARS = '/\0' - -# Maximum path length for Windows -WINDOWS_MAX_PATH = 260 - -def is_valid_filename(filename, system): - """ - Validates if the given filename is valid for the specified system. - - Parameters: - - filename (str): The filename to validate. - - system (str): The operating system, e.g., 'Windows', 'Darwin' (macOS), or others for Linux/Android. - - Returns: - bool: True if the filename is valid, False otherwise. - """ - # Normalize Unicode - filename = unicodedata.normalize('NFC', filename) - - # Common checks across all systems - if filename.endswith(' ') or filename.endswith('.') or filename.endswith('/'): - return False - - if filename.startswith('.') and system == "Darwin": - return False - - # System-specific checks - if system == "Windows": - if len(filename) > WINDOWS_MAX_PATH: - return False - if any(char in filename for char in WINDOWS_INVALID_CHARS): - return False - name, ext = os.path.splitext(filename) - if name.upper() in WINDOWS_RESERVED_NAMES: - return False - elif system == "Darwin": # macOS - if any(char in filename for char in MACOS_INVALID_CHARS): - return False - else: # Linux and Android - if any(char in filename for char in LINUX_INVALID_CHARS): - return False - - return True - -def can_create_file(file_path): - """ - Checks if a file can be created at the given file path. - - Parameters: - - file_path (str): The path where the file is to be created. - - Returns: - bool: True if the file can be created, False otherwise. - """ - current_system = platform.system() - - if not is_valid_filename(os.path.basename(file_path), current_system): - return False - - try: - with open(file_path, 'w') as file: - pass - - os.remove(file_path) # Cleanup if the file was created - return True - - except OSError as e: - if e.errno in (errno.EACCES, errno.ENOENT, errno.EEXIST, errno.ENOTDIR): - return False - raise diff --git a/config.json b/config.json index 4abfceb..e2d7fca 100644 --- a/config.json +++ b/config.json @@ -7,7 +7,6 @@ "clean_console": true, "root_path": "Video", "map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)", - "special_chars_to_remove": ".-!@#$%^&*()[]{}<>|`~'\";:,?=+\u00e2\u20ac\u00a6", "config_qbit_tor": { "host": "192.168.1.59", "port": "8080", diff --git a/requirements.txt b/requirements.txt index e365501..0be135e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ m3u8 psutil unidecode jsbeautifier +pathvalidate fake-useragent qbittorrent-api python-qbittorrent diff --git a/run.py b/run.py index 6e37621..5dbd5b1 100644 --- a/run.py +++ b/run.py @@ -16,7 +16,7 @@ from Src.Util.message import start_message from Src.Util.console import console, msg from Src.Util._jsonConfig import config_manager from Src.Upload.update import update as git_update -from Src.Util.os import get_system_summary +from Src.Util.os import os_summary from Src.Lib.TMBD import tmdb from Src.Util.logger import Logger @@ -108,7 +108,7 @@ def initialize(): log_not = Logger() # Get system info - get_system_summary() + os_summary.get_system_summary() # Set terminal size for win 7 if platform.system() == "Windows" and "7" in platform.version():