mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-06-06 19:45:24 +00:00
429 lines
13 KiB
Python
429 lines
13 KiB
Python
# 24.01.24
|
|
|
|
import re
|
|
import io
|
|
import os
|
|
import sys
|
|
import ssl
|
|
import time
|
|
import errno
|
|
import shutil
|
|
import hashlib
|
|
import logging
|
|
import platform
|
|
import importlib
|
|
import subprocess
|
|
import contextlib
|
|
import urllib.request
|
|
import importlib.metadata
|
|
|
|
|
|
# External library
|
|
import httpx
|
|
import unicodedata
|
|
|
|
|
|
# Internal utilities
|
|
from .console import console
|
|
from ._jsonConfig import config_manager
|
|
|
|
|
|
# --> OS FILE ASCII
|
|
special_chars_to_remove = config_manager.get("DEFAULT", "special_chars_to_remove")
|
|
|
|
|
|
def get_max_length_by_os(system: str) -> int:
|
|
"""
|
|
Determines the maximum length for a base name based on the operating system.
|
|
|
|
Parameters:
|
|
system (str): The operating system name.
|
|
|
|
Returns:
|
|
int: The maximum length for the base name.
|
|
"""
|
|
if system == 'windows':
|
|
return 255 # NTFS and other common Windows filesystems support 255 characters for filenames
|
|
elif system == 'darwin': # macOS
|
|
return 255 # HFS+ and APFS support 255 characters for filenames
|
|
elif system == 'linux':
|
|
return 255 # Most Linux filesystems (e.g., ext4) support 255 characters for filenames
|
|
else:
|
|
raise ValueError(f"Unsupported operating system: {system}")
|
|
|
|
def reduce_base_name(base_name: str) -> str:
|
|
"""
|
|
Splits the file path into folder and base name, and reduces the base name based on the operating system.
|
|
|
|
Parameters:
|
|
base_name (str): The name of the file.
|
|
|
|
Returns:
|
|
str: The reduced base name.
|
|
"""
|
|
|
|
# Determine the operating system
|
|
system = platform.system().lower()
|
|
|
|
# Get the maximum length for the base name based on the operating system
|
|
max_length = get_max_length_by_os(system)
|
|
|
|
# Reduce the base name if necessary
|
|
if len(base_name) > max_length:
|
|
if system == 'windows':
|
|
# For Windows, truncate and add a suffix if needed
|
|
base_name = base_name[:max_length - 3] + '___'
|
|
elif system == 'darwin': # macOS
|
|
# For macOS, truncate without adding suffix
|
|
base_name = base_name[:max_length]
|
|
elif system == 'linux':
|
|
# For Linux, truncate and add a numeric suffix if needed
|
|
base_name = base_name[:max_length - 2] + '___'
|
|
|
|
return base_name
|
|
|
|
def remove_special_characters(input_string):
|
|
"""
|
|
Remove specified special characters from a string.
|
|
|
|
Parameters:
|
|
- input_string (str): The input string containing special characters.
|
|
|
|
Returns:
|
|
str: A new string with specified special characters removed.
|
|
"""
|
|
if input_string is None:
|
|
return "None"
|
|
|
|
# Check if the string ends with '.mp4'
|
|
# If it does, we temporarily remove the '.mp4' extension for processing
|
|
ends_with_mp4 = input_string.endswith('.mp4')
|
|
if ends_with_mp4:
|
|
input_string = input_string[:-4] # Remove the last 4 characters ('.mp4')
|
|
|
|
# Compile regular expression pattern to match special characters
|
|
pattern = re.compile('[' + re.escape(special_chars_to_remove) + ']')
|
|
|
|
# Use compiled pattern to replace special characters with an empty string
|
|
cleaned_string = pattern.sub('', input_string)
|
|
|
|
# If the original string had the '.mp4' extension, re-add it to the cleaned string
|
|
if ends_with_mp4:
|
|
cleaned_string += '.mp4'
|
|
|
|
return cleaned_string.strip()
|
|
|
|
|
|
|
|
# --> OS MANAGE OUTPUT
|
|
@contextlib.contextmanager
|
|
def suppress_output():
|
|
with contextlib.redirect_stdout(io.StringIO()):
|
|
yield
|
|
|
|
|
|
|
|
# --> OS MANAGE FOLDER
|
|
def create_folder(folder_name: str) -> None:
|
|
"""
|
|
Create a directory if it does not exist, and log the result.
|
|
|
|
Parameters:
|
|
folder_name (str): The path of the directory to be created.
|
|
"""
|
|
|
|
if platform.system() == 'Windows':
|
|
max_path_length = 260
|
|
else:
|
|
max_path_length = 4096
|
|
|
|
try:
|
|
logging.info(f"Try create folder: {folder_name}")
|
|
|
|
# Check if path length exceeds the maximum allowed
|
|
if len(folder_name) > max_path_length:
|
|
logging.error(f"Path length exceeds the maximum allowed limit: {len(folder_name)} characters (Max: {max_path_length})")
|
|
raise OSError(f"Path length exceeds the maximum allowed limit: {len(folder_name)} characters (Max: {max_path_length})")
|
|
|
|
os.makedirs(folder_name, exist_ok=True)
|
|
|
|
if os.path.exists(folder_name) and os.path.isdir(folder_name):
|
|
logging.info(f"Directory successfully created or already exists: {folder_name}")
|
|
else:
|
|
logging.error(f"Failed to create directory: {folder_name}")
|
|
|
|
except Exception as e:
|
|
logging.error(f"An unexpected error occurred while creating the directory {folder_name}: {e}")
|
|
raise
|
|
|
|
def check_file_existence(file_path):
|
|
"""
|
|
Check if a file exists at the given file path.
|
|
|
|
Parameters:
|
|
file_path (str): The path to the file.
|
|
|
|
Returns:
|
|
bool: True if the file exists, False otherwise.
|
|
"""
|
|
try:
|
|
logging.info(f"Check if file exists: {file_path}")
|
|
if os.path.exists(file_path):
|
|
logging.info(f"The file '{file_path}' exists.")
|
|
return True
|
|
|
|
else:
|
|
return False
|
|
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while checking file existence: {e}")
|
|
return False
|
|
|
|
def remove_folder(folder_path: str) -> None:
|
|
"""
|
|
Remove a folder if it exists.
|
|
|
|
Parameters:
|
|
- folder_path (str): The path to the folder to be removed.
|
|
"""
|
|
|
|
if os.path.exists(folder_path):
|
|
try:
|
|
shutil.rmtree(folder_path)
|
|
except OSError as e:
|
|
print(f"Error removing folder '{folder_path}': {e}")
|
|
|
|
def delete_files_except_one(folder_path: str, keep_file: str) -> None:
|
|
"""
|
|
Delete all files in a folder except for one specified file.
|
|
|
|
Parameters:
|
|
- folder_path (str): The path to the folder containing the files.
|
|
- keep_file (str): The filename to keep in the folder.
|
|
"""
|
|
|
|
try:
|
|
|
|
# List all files in the folder
|
|
files_in_folder = os.listdir(folder_path)
|
|
|
|
# Iterate over each file in the folder
|
|
for file_name in files_in_folder:
|
|
file_path = os.path.join(folder_path, file_name)
|
|
|
|
# Check if the file is not the one to keep and is a regular file
|
|
if file_name != keep_file and os.path.isfile(file_path):
|
|
os.remove(file_path) # Delete the file
|
|
|
|
except Exception as e:
|
|
logging.error(f"An error occurred: {e}")
|
|
|
|
|
|
|
|
# --> OS MANAGE SIZE FILE AND INTERNET SPEED
|
|
def format_file_size(size_bytes: float) -> str:
|
|
"""
|
|
Formats a file size from bytes into a human-readable string representation.
|
|
|
|
Parameters:
|
|
size_bytes (float): Size in bytes to be formatted.
|
|
|
|
Returns:
|
|
str: Formatted string representing the file size with appropriate unit (B, KB, MB, GB, TB).
|
|
"""
|
|
if size_bytes <= 0:
|
|
return "0B"
|
|
|
|
units = ['B', 'KB', 'MB', 'GB', 'TB']
|
|
unit_index = 0
|
|
|
|
while size_bytes >= 1024 and unit_index < len(units) - 1:
|
|
size_bytes /= 1024
|
|
unit_index += 1
|
|
|
|
return f"{size_bytes:.2f} {units[unit_index]}"
|
|
|
|
def format_transfer_speed(bytes: float) -> str:
|
|
"""
|
|
Formats a transfer speed from bytes per second into a human-readable string representation.
|
|
|
|
Parameters:
|
|
bytes (float): Speed in bytes per second to be formatted.
|
|
|
|
Returns:
|
|
str: Formatted string representing the transfer speed with appropriate unit (Bytes/s, KB/s, MB/s).
|
|
"""
|
|
if bytes < 1024:
|
|
return f"{bytes:.2f} Bytes/s"
|
|
elif bytes < 1024 * 1024:
|
|
return f"{bytes / 1024:.2f} KB/s"
|
|
else:
|
|
return f"{bytes / (1024 * 1024):.2f} MB/s"
|
|
|
|
|
|
|
|
# --> OS MANAGE KEY AND IV HEX
|
|
def compute_sha1_hash(input_string: str) -> str:
|
|
"""
|
|
Computes the SHA-1 hash of the input string.
|
|
|
|
Parameters:
|
|
- input_string (str): The string to be hashed.
|
|
|
|
Returns:
|
|
str: The SHA-1 hash of the input string.
|
|
"""
|
|
# Compute the SHA-1 hash
|
|
hashed_string = hashlib.sha1(input_string.encode()).hexdigest()
|
|
|
|
# Return the hashed string
|
|
return hashed_string
|
|
|
|
|
|
|
|
# --> OS GET SUMMARY
|
|
def check_internet():
|
|
while True:
|
|
try:
|
|
httpx.get("https://www.google.com")
|
|
console.log("[bold green]Internet is available![/bold green]")
|
|
break
|
|
|
|
except urllib.error.URLError:
|
|
console.log("[bold red]Internet is not available. Waiting...[/bold red]")
|
|
time.sleep(5)
|
|
|
|
print()
|
|
|
|
def get_executable_version(command):
|
|
try:
|
|
version_output = subprocess.check_output(command, stderr=subprocess.STDOUT).decode().split('\n')[0]
|
|
return version_output.split(" ")[2]
|
|
except (FileNotFoundError, subprocess.CalledProcessError):
|
|
print(f"{command[0]} not found")
|
|
sys.exit(0)
|
|
|
|
def get_library_version(lib_name):
|
|
try:
|
|
version = importlib.metadata.version(lib_name)
|
|
return f"{lib_name}-{version}"
|
|
except importlib.metadata.PackageNotFoundError:
|
|
return f"{lib_name}-not installed"
|
|
|
|
def get_system_summary():
|
|
|
|
check_internet()
|
|
console.print("[bold blue]System Summary[/bold blue][white]:")
|
|
|
|
# Python version and platform
|
|
python_version = sys.version.split()[0]
|
|
python_implementation = platform.python_implementation()
|
|
arch = platform.machine()
|
|
os_info = platform.platform()
|
|
openssl_version = ssl.OPENSSL_VERSION
|
|
glibc_version = 'glibc ' + '.'.join(map(str, platform.libc_ver()[1]))
|
|
|
|
console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})[/bold red]")
|
|
logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})")
|
|
|
|
# ffmpeg and ffprobe versions
|
|
ffmpeg_version = get_executable_version(['ffmpeg', '-version'])
|
|
ffprobe_version = get_executable_version(['ffprobe', '-version'])
|
|
|
|
console.print(f"[cyan]Exe versions[white]: [bold red]ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}[/bold red]")
|
|
logging.info(f"Dependencies: ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}")
|
|
|
|
# Optional libraries versions
|
|
optional_libraries = [line.strip() for line in open('requirements.txt', 'r', encoding='utf-8-sig')]
|
|
optional_libs_versions = [get_library_version(lib) for lib in optional_libraries]
|
|
|
|
console.print(f"[cyan]Libraries[white]: [bold red]{', '.join(optional_libs_versions)}[/bold red]\n")
|
|
logging.info(f"Libraries: {', '.join(optional_libs_versions)}")
|
|
|
|
|
|
|
|
# --> OS FILE VALIDATOR
|
|
|
|
# List of invalid characters for Windows filenames
|
|
WINDOWS_INVALID_CHARS = '<>:"/\\|?*'
|
|
WINDOWS_RESERVED_NAMES = [
|
|
"CON", "PRN", "AUX", "NUL",
|
|
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
|
|
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"
|
|
]
|
|
|
|
# Invalid characters for macOS filenames
|
|
MACOS_INVALID_CHARS = '/:'
|
|
|
|
# Invalid characters for Linux/Android filenames
|
|
LINUX_INVALID_CHARS = '/\0'
|
|
|
|
# Maximum path length for Windows
|
|
WINDOWS_MAX_PATH = 260
|
|
|
|
def is_valid_filename(filename, system):
|
|
"""
|
|
Validates if the given filename is valid for the specified system.
|
|
|
|
Parameters:
|
|
- filename (str): The filename to validate.
|
|
- system (str): The operating system, e.g., 'Windows', 'Darwin' (macOS), or others for Linux/Android.
|
|
|
|
Returns:
|
|
bool: True if the filename is valid, False otherwise.
|
|
"""
|
|
# Normalize Unicode
|
|
filename = unicodedata.normalize('NFC', filename)
|
|
|
|
# Common checks across all systems
|
|
if filename.endswith(' ') or filename.endswith('.') or filename.endswith('/'):
|
|
return False
|
|
|
|
if filename.startswith('.') and system == "Darwin":
|
|
return False
|
|
|
|
# System-specific checks
|
|
if system == "Windows":
|
|
if len(filename) > WINDOWS_MAX_PATH:
|
|
return False
|
|
if any(char in filename for char in WINDOWS_INVALID_CHARS):
|
|
return False
|
|
name, ext = os.path.splitext(filename)
|
|
if name.upper() in WINDOWS_RESERVED_NAMES:
|
|
return False
|
|
elif system == "Darwin": # macOS
|
|
if any(char in filename for char in MACOS_INVALID_CHARS):
|
|
return False
|
|
else: # Linux and Android
|
|
if any(char in filename for char in LINUX_INVALID_CHARS):
|
|
return False
|
|
|
|
return True
|
|
|
|
def can_create_file(file_path):
|
|
"""
|
|
Checks if a file can be created at the given file path.
|
|
|
|
Parameters:
|
|
- file_path (str): The path where the file is to be created.
|
|
|
|
Returns:
|
|
bool: True if the file can be created, False otherwise.
|
|
"""
|
|
current_system = platform.system()
|
|
|
|
if not is_valid_filename(os.path.basename(file_path), current_system):
|
|
return False
|
|
|
|
try:
|
|
with open(file_path, 'w') as file:
|
|
pass
|
|
|
|
os.remove(file_path) # Cleanup if the file was created
|
|
return True
|
|
|
|
except OSError as e:
|
|
if e.errno in (errno.EACCES, errno.ENOENT, errno.EEXIST, errno.ENOTDIR):
|
|
return False
|
|
raise
|