Add small bar, summary system, fix ffmpeg multiple audios and subtitles.

This commit is contained in:
Ghost 2024-05-28 17:15:20 +02:00
parent 80393c6147
commit 5c209e9bed
30 changed files with 428 additions and 941 deletions

View File

@ -9,14 +9,13 @@ import subprocess
# External libraries
from Src.Lib.Request import requests
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.console import console
from Src.Lib.Request import requests
from Src.Util.headers import get_headers
from Src.Util.node_jjs import run_node_script
from Src.Util.os import run_node_script
class VideoSource:

View File

@ -20,7 +20,7 @@ def main_film():
"""
# Make request to site to get content that corrsisponde to that string
film_search = msg.ask("\n[purple]Insert word to search in all site: ").strip()
film_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(film_search)
if len_database != 0:

View File

@ -6,13 +6,13 @@ import logging
# External libraries
from Src.Lib.Request import requests
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Lib.Request import requests
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
@ -45,6 +45,7 @@ def title_search(title_search: str) -> int:
# Send request to search for titles
response = requests.get(f"https://{AD_SITE_NAME}.{AD_DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3")
response.raise_for_status()
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")

View File

@ -5,8 +5,11 @@ import threading
import logging
# Internal utilities
# Internal libraries
from Src.Lib.Request import requests
# Internal utilities
from Src.Lib.Google import search as google_search

View File

@ -5,12 +5,12 @@ from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
# External libraries
from Src.Lib.Request import requests
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Lib.Request.my_requests import requests
from Src.Util._jsonConfig import config_manager

View File

@ -11,7 +11,7 @@ from .anime import donwload_film, donwload_series
def main_anime():
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site: ").strip()
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(string_to_search)
if len_database != 0:

View File

@ -5,13 +5,13 @@ import logging
# External libraries
from Src.Lib.Request import requests
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Lib.Request import requests
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
@ -46,6 +46,7 @@ def get_token(site_name: str, domain: str) -> dict:
# Send a GET request to the specified URL composed of the site name and domain
response = requests.get(f"https://www.{site_name}.{domain}")
response.raise_for_status()
# Initialize variables to store CSRF token
find_csrf_token = None
@ -166,6 +167,7 @@ def title_search(title: str) -> int:
# Send a POST request to the API endpoint for live search
response = requests.post(f'https://www.{AU_SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json_data=json_data)
response.raise_for_status()
# Process each record returned in the response
for record in response.json()['records']:

View File

@ -5,7 +5,6 @@ import threading
import logging
# Internal utilities
from Src.Lib.Request import requests
from Src.Lib.Google import search as google_search

View File

@ -6,13 +6,12 @@ from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
# External libraries
from Src.Lib.Request import requests
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Lib.Request.my_requests import requests
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console, Panel
@ -61,7 +60,7 @@ class VideoSource:
try:
response = requests.post(f"https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}", headers = self.headers)
response = requests.post(f"https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}", headers=self.headers)
response.raise_for_status()
# Collect all info about preview
@ -84,7 +83,7 @@ class VideoSource:
try:
response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers = self.headers)
response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers)
response.raise_for_status()
# Extract JSON response if available
@ -108,7 +107,7 @@ class VideoSource:
try:
# Make a request to collect information about a specific season
response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers = self.headers)
response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers)
response.raise_for_status()
# Extract JSON response if available
@ -140,12 +139,12 @@ class VideoSource:
try:
# Make a request to get iframe source
response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params = params)
response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params)
response.raise_for_status()
# Parse response with BeautifulSoup to get iframe source
soup = BeautifulSoup(response.text, "html.parser")
self.iframe_src: str = soup.find("iframe").get("src")
self.iframe_src = soup.find("iframe").get("src")
except Exception as e:
logging.error(f"Error getting iframe source: {e}")
@ -182,7 +181,7 @@ class VideoSource:
# Make a request to get content
try:
response = requests.get(self.iframe_src, headers = self.headers)
response = requests.get(self.iframe_src, headers=self.headers)
response.raise_for_status()
except:

View File

@ -25,7 +25,7 @@ def main_film_series():
site_version, domain = get_version_and_domain()
# Make request to site to get content that corrsisponde to that string
film_search = msg.ask("\n[purple]Insert word to search in all site: ").strip()
film_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(film_search, domain)
if len_database != 0:

View File

@ -4,4 +4,4 @@ STREAMING_FOLDER = "streamingcommunity"
MOVIE_FOLDER = "Movie"
SERIES_FOLDER = "Serie"
SERVER_IP = ["57.129.7.85","57.129.7.188","57.129.7.174","57.129.4.77","57.129.16.196","57.129.16.156","57.129.16.139","57.129.16.135","57.129.13.175","51.38.112.237","51.195.107.7","51.195.107.230"]
SERVER_IP = ['162.19.231.20', '162.19.255.224', '162.19.254.232', '162.19.254.230', '51.195.107.230', '162.19.255.36', '162.19.228.128', '51.195.107.7', '162.19.253.242', '141.95.0.248', '57.129.4.77', '57.129.7.85']

View File

@ -13,11 +13,11 @@ from unidecode import unidecode
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Lib.Request import requests
from Src.Util.headers import get_headers
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console
from Src.Util.table import TVShowManager
# Logic class
@ -138,6 +138,7 @@ def title_search(title_search: str, domain: str) -> int:
# Send request to search for titles ( replace à to a and space to "+" )
response = requests.get(f"https://{SC_SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()})
response.raise_for_status()
# Add found titles to media search manager
for dict_title in response.json()['data']:

View File

@ -6,4 +6,3 @@ from .command import (
join_subtitle,
)
from .util import print_duration_table
from .installer import check_ffmpeg

View File

@ -27,10 +27,10 @@ from .capture import capture_ffmpeg_real_time
# Variable
DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug")
DEBUG_FFMPEG = "debug" if DEBUG_MODE else "error"
USE_CODECS = config_manager.get_bool("M3U8_FILTER", "use_codec")
USE_GPU = config_manager.get_bool("M3U8_FILTER", "use_gpu")
FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_FILTER", "default_preset")
CHECK_OUTPUT_CONVERSION = config_manager.get_bool("M3U8_FILTER", "check_output_conversion")
USE_CODECS = config_manager.get_bool("M3U8_CONVERSION", "use_codec")
USE_GPU = config_manager.get_bool("M3U8_CONVERSION", "use_gpu")
FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_CONVERSION", "default_preset")
CHECK_OUTPUT_CONVERSION = config_manager.get_bool("M3U8_CONVERSION", "check_output_after_ffmpeg")
@ -278,6 +278,7 @@ def join_video(video_path: str, out_path: str, vcodec: str = None, acodec: str =
logging.error("Missing input video for ffmpeg conversion.")
sys.exit(0)
# Start command
ffmpeg_cmd = ['ffmpeg']
@ -290,6 +291,7 @@ def join_video(video_path: str, out_path: str, vcodec: str = None, acodec: str =
ffmpeg_cmd.extend(['-f', 'mpegts'])
vcodec = "libx264"
# Insert input video path
ffmpeg_cmd.extend(['-i', video_path])
@ -307,6 +309,7 @@ def join_video(video_path: str, out_path: str, vcodec: str = None, acodec: str =
else:
ffmpeg_cmd.extend(['-preset', 'fast'])
# Overwrite
ffmpeg_cmd += [out_path, "-y"]
logging.info(f"FFmpeg command: {ffmpeg_cmd}")
@ -318,14 +321,20 @@ def join_video(video_path: str, out_path: str, vcodec: str = None, acodec: str =
capture_ffmpeg_real_time(ffmpeg_cmd, "[cyan]Join video")
print()
# Check file
if CHECK_OUTPUT_CONVERSION:
console.log("[red]Check output ffmpeg")
time.sleep(0.5)
check_ffmpeg_input(out_path)
time.sleep(0.5)
if not check_file_existence(out_path):
logging.error("Missing output video for ffmpeg conversion video.")
sys.exit(0)
def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: str, vcodec: str = 'copy', acodec: str = 'aac', bitrate: str = '192k'):
def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: str):
"""
Joins audio tracks with a video file using FFmpeg.
@ -334,29 +343,36 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s
- audio_tracks (list[dict[str, str]]): A list of dictionaries containing information about audio tracks.
Each dictionary should contain the 'path' key with the path to the audio file.
- out_path (str): The path to save the output file.
- vcodec (str): The video codec to use. Defaults to 'copy'.
- acodec (str): The audio codec to use. Defaults to 'aac'.
- bitrate (str): The bitrate for the audio stream. Defaults to '192k'.
- preset (str): The preset for encoding. Defaults to 'ultrafast'.
"""
if not check_file_existence(video_path):
logging.error("Missing input video for ffmpeg conversion.")
sys.exit(0)
# Start command
ffmpeg_cmd = ['ffmpeg', '-i', video_path]
# Add audio track
# Add audio tracks as input
for i, audio_track in enumerate(audio_tracks):
ffmpeg_cmd.extend(['-i', audio_track.get('path')])
if check_file_existence(audio_track.get('path')):
ffmpeg_cmd.extend(['-i', audio_track.get('path')])
else:
logging.error(f"Skip audio join: {audio_track.get('path')} dont exist")
# Map the video and audio streams
ffmpeg_cmd.append('-map')
ffmpeg_cmd.append('0:v') # Map video stream from the first input (video_path)
for i in range(1, len(audio_tracks) + 1):
ffmpeg_cmd.append('-map')
ffmpeg_cmd.append(f'{i}:a') # Map audio streams from subsequent inputs
if not check_file_existence(audio_track.get('path')):
sys.exit(0)
# Add output args
if USE_CODECS:
ffmpeg_cmd.extend(['-c:v', vcodec, '-c:a', acodec, '-b:a', str(bitrate), '-preset', FFMPEG_DEFAULT_PRESET])
ffmpeg_cmd.extend(['-c:v', 'copy', '-c:a', 'copy'])
else:
ffmpeg_cmd.extend(['-c', 'copy'])
@ -378,6 +394,11 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s
time.sleep(0.5)
check_ffmpeg_input(out_path)
time.sleep(0.5)
if not check_file_existence(out_path):
logging.error("Missing output video for ffmpeg conversion audio.")
sys.exit(0)
def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_path: str):
"""
@ -394,26 +415,24 @@ def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_pat
logging.error("Missing input video for ffmpeg conversion.")
sys.exit(0)
# Start command
added_subtitle_names = set() # Remove subtitle with same name
ffmpeg_cmd = ["ffmpeg", "-i", video_path]
# Add subtitle with language
# Add subtitle input files first
for subtitle in subtitles_list:
if check_file_existence(subtitle.get('path')):
ffmpeg_cmd += ["-i", subtitle['path']]
else:
logging.error(f"Skip subtitle join: {subtitle.get('path')} doesn't exist")
# Add maps for video and audio streams
ffmpeg_cmd += ["-map", "0:v", "-map", "0:a"]
# Add subtitle maps and metadata
for idx, subtitle in enumerate(subtitles_list):
if subtitle['name'] in added_subtitle_names:
continue
added_subtitle_names.add(subtitle['name'])
ffmpeg_cmd += ["-i", subtitle['path']]
ffmpeg_cmd += ["-map", "0:v", "-map", "0:a", "-map", f"{idx + 1}:s"]
ffmpeg_cmd += ["-map", f"{idx + 1}:s"]
ffmpeg_cmd += ["-metadata:s:s:{}".format(idx), "title={}".format(subtitle['name'])]
if not check_file_existence(subtitle['path']):
sys.exit(0)
# Add output args
if USE_CODECS:
ffmpeg_cmd.extend(['-c:v', 'copy', '-c:a', 'copy', '-c:s', 'mov_text'])
@ -430,9 +449,15 @@ def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_pat
else:
capture_ffmpeg_real_time(ffmpeg_cmd, "[cyan]Join subtitle")
print()
# Check file
if CHECK_OUTPUT_CONVERSION:
console.log("[red]Check output ffmpeg")
time.sleep(0.5)
check_ffmpeg_input(out_path)
time.sleep(0.5)
if not check_file_existence(out_path):
logging.error("Missing output video for ffmpeg conversion subtitle.")
sys.exit(0)

View File

@ -1,242 +0,0 @@
# 24.01.2023
import logging
import os
import shutil
import subprocess
# External libraries
from tqdm.rich import tqdm
# Internal utilities
from Src.Util.os import decompress_file
from Src.Util._win32 import set_env_path
from Src.Util.console import console
from Src.Lib.Request.my_requests import requests
# Constants
FFMPEG_BUILDS = {
'release-full': {
'7z': ('release-full', 'full_build'),
'zip': (None, 'full_build')
}
}
INSTALL_DIR = os.path.expanduser("~")
show_version = True
def get_version():
"""
Get the version of FFmpeg installed on the system.
This function runs the 'ffmpeg -version' command to retrieve version information
about the installed FFmpeg binary.
"""
try:
# Run the FFmpeg command to get version information
output = subprocess.check_output(['ffmpeg', '-version'], stderr=subprocess.STDOUT, universal_newlines=True)
# Extract version information from the output
version_lines = [line for line in output.split('\n') if line.startswith('ffmpeg version')]
if version_lines:
# Extract version number from the version line
version = version_lines[0].split(' ')[2]
console.print(f"[cyan]FFmpeg version: [red]{version}")
except subprocess.CalledProcessError as e:
# If there's an error executing the FFmpeg command
logging.error("Error executing FFmpeg command:", e.output.strip())
raise e
def get_ffmpeg_download_url(build: str = 'release-full', format: str = 'zip') -> str:
"""
Construct the URL for downloading FFMPEG build.
Args:
- build (str): The type of FFMPEG build.
- format (str): The format of the build (e.g., zip, 7z).
Returns:
str: The URL for downloading the FFMPEG build.
"""
for ffbuild_name, formats in FFMPEG_BUILDS.items():
for ffbuild_format, names in formats.items():
if not (format is None or format == ffbuild_format):
continue
if names[0]:
return f'https://gyan.dev/ffmpeg/builds/ffmpeg-{names[0]}.{ffbuild_format}'
if names[1]:
github_version = requests.get('https://www.gyan.dev/ffmpeg/builds/release-version').text
assert github_version, 'failed to retreive latest version from github'
return (
'https://github.com/GyanD/codexffmpeg/releases/download/'
f'{github_version}/ffmpeg-{github_version}-{names[1]}.{ffbuild_format}'
)
raise ValueError(f'{build} as format {format} does not exist')
class FFMPEGDownloader:
def __init__(self, url: str, destination: str, hash_url: str = None) -> None:
"""
Initialize the FFMPEGDownloader object.
Args:
- url (str): The URL to download the file from.
- destination (str): The path where the downloaded file will be saved.
- hash_url (str): The URL containing the file's expected hash.
"""
self.url = url
self.destination = destination
self.expected_hash = requests.get(hash_url).text if hash_url else None
self.file_size = len(requests.get(self.url).content)
def download(self) -> None:
"""
Download the file from the provided URL.
"""
try:
with requests.get(self.url) as response, open(self.destination, 'wb') as out_file:
with tqdm(total=self.file_size, unit='B', unit_scale=True, unit_divisor=1024, desc='[yellow]Downloading') as pbar:
while True:
data = response.read(4096)
if not data:
break
out_file.write(data)
pbar.update(len(data))
except Exception as e:
logging.error(f"Error downloading file: {e}")
raise
def move_ffmpeg_exe_to_top_level(install_dir: str) -> None:
"""
Move the FFMPEG executable to the top-level directory.
Args:
- install_dir (str): The directory to search for the executable.
"""
try:
for root, _, files in os.walk(install_dir):
for file in files:
if file == 'ffmpeg.exe':
base_path = os.path.abspath(os.path.join(root, '..'))
to_remove = os.listdir(install_dir)
# Move ffmpeg.exe to the top level
for item in os.listdir(base_path):
shutil.move(os.path.join(base_path, item), install_dir)
# Remove other files from the top level
for item in to_remove:
item = os.path.join(install_dir, item)
if os.path.isdir(item):
shutil.rmtree(item)
else:
os.remove(item)
break
except Exception as e:
logging.error(f"Error moving ffmpeg executable: {e}")
raise
def add_install_dir_to_environment_path(install_dir: str) -> None:
"""
Add the install directory to the environment PATH variable.
Args:
- install_dir (str): The directory to be added to the environment PATH variable.
"""
install_dir = os.path.abspath(os.path.join(install_dir, 'bin'))
set_env_path(install_dir)
def download_ffmpeg():
"""
Main function to donwload ffmpeg and add to win path
"""
# Get FFMPEG download URL
ffmpeg_url = get_ffmpeg_download_url()
# Generate install directory path
install_dir = os.path.join(INSTALL_DIR, 'FFMPEG')
console.print(f"[cyan]Making install directory: [red]{install_dir!r}")
logging.info(f'Making install directory {install_dir!r}')
os.makedirs(install_dir, exist_ok=True)
# Download FFMPEG
console.print(f'[cyan]Downloading: [red]{ffmpeg_url!r} [cyan]to [red]{os.path.join(install_dir, os.path.basename(ffmpeg_url))!r}')
logging.info(f'Downloading {ffmpeg_url!r} to {os.path.join(install_dir, os.path.basename(ffmpeg_url))!r}')
downloader = FFMPEGDownloader(ffmpeg_url, os.path.join(install_dir, os.path.basename(ffmpeg_url)))
downloader.download()
# Decompress downloaded file
console.print(f'[cyan]Decompressing downloaded file to: [red]{install_dir!r}')
logging.info(f'Decompressing downloaded file to {install_dir!r}')
decompress_file(os.path.join(install_dir, os.path.basename(ffmpeg_url)), install_dir)
# Move ffmpeg executable to top level
console.print(f'[cyan]Moving ffmpeg executable to top level of [red]{install_dir!r}')
logging.info(f'Moving ffmpeg executable to top level of {install_dir!r}')
move_ffmpeg_exe_to_top_level(install_dir)
# Add install directory to environment PATH variable
console.print(f'[cyan]Adding [red]{install_dir} [cyan]to environment PATH variable')
logging.info(f'Adding {install_dir} to environment PATH variable')
add_install_dir_to_environment_path(install_dir)
def check_ffmpeg() -> bool:
"""
Check if FFmpeg is installed and available on the system PATH.
This function checks if FFmpeg is installed and available on the system PATH.
If FFmpeg is found, it prints its version. If not found, it attempts to download
FFmpeg and add it to the system PATH.
Returns:
bool: If ffmpeg is present or not
"""
console.print("[cyan]Checking FFmpeg[white]...")
try:
# Try running the FFmpeg command to check if it exists
subprocess.run(["ffmpeg"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Get and print FFmpeg version
if show_version:
get_version()
return True
except:
try:
# If FFmpeg is not found, attempt to download and add it to the PATH
console.print("[cyan]FFmpeg is not found in the PATH. Downloading and adding to the PATH...[/cyan]")
# Download FFmpeg and add it to the PATH
download_ffmpeg()
raise
except Exception as e:
# If unable to download or add FFmpeg to the PATH
console.print("[red]Unable to download or add FFmpeg to the PATH.[/red]")
console.print(f"Error: {e}")
print()
return False

View File

@ -7,12 +7,9 @@ from urllib.parse import quote_plus, urlparse, parse_qs
from typing import Generator, Optional
# External library
from bs4 import BeautifulSoup
# Internal utilities
# External libraries
from Src.Lib.Request import requests
from bs4 import BeautifulSoup
def filter_result(link: str) -> Optional[str]:

View File

@ -7,12 +7,12 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
# External library
# External libraries
from Src.Lib.Request import requests
from unidecode import unidecode
# Internal utilities
from Src.Lib.Request.my_requests import requests
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console, Panel
@ -24,9 +24,9 @@ from Src.Util.os import (
format_size,
create_folder,
reduce_base_name,
remove_special_characters
remove_special_characters,
can_create_file
)
from Src.Util.file_validator import can_create_file
# Logic class
@ -46,15 +46,19 @@ from ..E_Table import report_table
# Config
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_FILTER', 'specific_list_audio')
DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_FILTER', 'specific_list_subtitles')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_FILTER', 'cleanup_tmp_folder')
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_DOWNLOAD', 'specific_list_audio')
DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_DOWNLOAD', 'specific_list_subtitles')
DOWNLOAD_VIDEO = config_manager.get_bool('M3U8_DOWNLOAD', 'download_video')
DOWNLOAD_AUDIO = config_manager.get_bool('M3U8_DOWNLOAD', 'download_audio')
DOWNLOAD_SUB = config_manager.get_bool('M3U8_DOWNLOAD', 'download_sub')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_DOWNLOAD', 'cleanup_tmp_folder')
FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution')
CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report')
# Variable
headers_index = config_manager.get_dict('M3U8_REQUESTS', 'index')
headers_index = config_manager.get_dict('REQUESTS', 'index')
class Downloader():
@ -454,8 +458,8 @@ class Downloader():
# Check if file to rename exist
logging.info(f"Check if end file converted exist: {out_path}")
if not os.path.exists(out_path):
logging.info("Video file converted not exist.")
if out_path is None or not os.path.isfile(out_path):
logging.error("Video file converted not exist.")
sys.exit(0)
# Rename the output file to the desired output filename if not exist
@ -513,12 +517,16 @@ class Downloader():
self.__manage_playlist__(m3u8_playlist_text)
# Start all download ...
self.__donwload_video__(server_ip)
self.__donwload_audio__(server_ip)
self.__download_subtitle__()
if DOWNLOAD_VIDEO:
self.__donwload_video__(server_ip)
if DOWNLOAD_AUDIO:
self.__donwload_audio__(server_ip)
if DOWNLOAD_SUB:
self.__download_subtitle__()
# Check file to convert
converted_out_path = None
there_is_video: bool = (len(self.downloaded_video) > 0)
there_is_audio: bool = (len(self.downloaded_audio) > 0)
there_is_subtitle: bool = (len(self.downloaded_subtitle) > 0)
console.log(f"[cyan]Conversion [white]=> ([green]Audio: [yellow]{there_is_audio}[white], [green]Subtitle: [yellow]{there_is_subtitle}[white])")
@ -529,7 +537,8 @@ class Downloader():
# Join only video ( audio is present in the same ts files )
else:
converted_out_path = self.__join_video__()
if there_is_video:
converted_out_path = self.__join_video__()
# Join subtitle
if there_is_subtitle:

View File

@ -13,6 +13,7 @@ from urllib.parse import urljoin, urlparse, urlunparse
# External libraries
from Src.Lib.Request import requests
from tqdm import tqdm
@ -20,7 +21,6 @@ from tqdm import tqdm
from Src.Util.console import console
from Src.Util.headers import get_headers
from Src.Util.color import Colors
from Src.Lib.Request.my_requests import requests
from Src.Util._jsonConfig import config_manager
# Logic class
@ -34,15 +34,14 @@ from ..M3U8 import (
# Config
TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
TQDM_SHOW_PROGRESS = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_show_progress')
REQUEST_TIMEOUT = config_manager.get_int('M3U8_REQUESTS', 'timeout')
REQUEST_VERIFY_SSL = config_manager.get_bool('M3U8_REQUESTS', 'verify_ssl')
REQUEST_DISABLE_ERROR = config_manager.get_bool('M3U8_REQUESTS', 'disable_error')
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
REQUEST_VERIFY_SSL = config_manager.get_bool('REQUESTS', 'verify_ssl')
REQUEST_DISABLE_ERROR = config_manager.get_bool('REQUESTS', 'disable_error')
# Variable
headers_index = config_manager.get_dict('M3U8_REQUESTS', 'index')
headers_segments = config_manager.get_dict('M3U8_REQUESTS', 'segments')
headers_index = config_manager.get_dict('REQUESTS', 'index')
headers_segments = config_manager.get_dict('REQUESTS', 'segments')
@ -225,15 +224,16 @@ class M3U8_Segments:
# Make request and calculate time duration
start_time = time.time()
response = requests.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, verify_ssl=REQUEST_VERIFY_SSL)
response = requests.get(ts_url, headers=headers_segments, verify_ssl=REQUEST_VERIFY_SSL)
duration = time.time() - start_time
if response.ok:
# Get the content of the segment
segment_content = response.content
if TQDM_SHOW_PROGRESS:
self.class_ts_estimator.update_progress_bar(segment_content, duration, progress_bar)
# Update bar
self.class_ts_estimator.update_progress_bar(segment_content, duration, progress_bar)
# Decrypt the segment content if decryption is needed
if self.decryption is not None:
@ -295,12 +295,16 @@ class M3U8_Segments:
"""
stop_event = threading.Event() # Event to signal stopping
# bar_format="{desc}: {percentage:.0f}% | {bar} | {n_fmt}/{total_fmt} [ {elapsed}<{remaining}, {rate_fmt}{postfix} ]"
if TQDM_USE_LARGE_BAR:
bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
else:
bar_format=f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
progress_bar = tqdm(
total=len(self.segments),
unit='s',
ascii=' #',
bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]",
bar_format=bar_format,
dynamic_ncols=True,
ncols=80,
mininterval=0.01
@ -326,7 +330,7 @@ class M3U8_Segments:
for index, segment_url in enumerate(self.segments):
# Check for Ctrl+C before starting each download task
time.sleep(0.025)
time.sleep(0.03)
if self.ctrl_c_detected:
console.log("[red]Ctrl+C detected. Stopping further downloads.")

View File

@ -12,6 +12,12 @@ from tqdm import tqdm
# Internal utilities
from Src.Util.color import Colors
from Src.Util.os import format_size
from Src.Util._jsonConfig import config_manager
# Variable
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
class M3U8_Ts_Estimator:
@ -127,16 +133,25 @@ class M3U8_Ts_Estimator:
self.add_ts_file(total_downloaded * self.total_segments, total_downloaded, duration)
# Get downloaded size and total estimated size
downloaded_file_size_str = self.get_downloaded_size().split(' ')[0]
downloaded_file_size_str = self.get_downloaded_size()
file_total_size = self.calculate_total_size()
# Fix parameter for prefix
number_file_downloaded = downloaded_file_size_str.split(' ')[0]
number_file_total_size = file_total_size.split(' ')[0]
units_file_downloaded = downloaded_file_size_str.split(' ')[1]
units_file_total_size = file_total_size.split(' ')[1]
average_internet_speed = self.get_average_speed()
# Update the progress bar's postfix
progress_counter.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_file_size_str} {Colors.WHITE}< {Colors.GREEN}{number_file_total_size} {Colors.RED}{units_file_total_size} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s"
)
if TQDM_USE_LARGE_BAR:
progress_counter.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{number_file_downloaded} {Colors.WHITE}< {Colors.GREEN}{number_file_total_size} {Colors.RED}{units_file_total_size} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s"
)
else:
progress_counter.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{number_file_downloaded}{Colors.RED} {units_file_downloaded} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s"
)

View File

@ -8,7 +8,7 @@ from .lib_parser import load
# External libraries
from Src.Lib.Request.my_requests import requests
from Src.Lib.Request import requests
# Costant
@ -254,7 +254,14 @@ class M3U8_Audio:
Returns:
list: List of dictionaries containing 'name', 'language', and 'uri' for all audio in the list.
"""
return [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist]
audios_list = [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist]
unique_audios_dict = {}
# Remove duplicate
for audio in audios_list:
unique_audios_dict[audio['language']] = audio
return list(unique_audios_dict.values())
def get_default_uri(self):
"""
@ -308,7 +315,14 @@ class M3U8_Subtitle:
Returns:
list: List of dictionaries containing 'name' and 'uri' for all subtitles in the list.
"""
return [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist]
subtitles_list = [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist]
unique_subtitles_dict = {}
# Remove duplicate
for subtitle in subtitles_list:
unique_subtitles_dict[subtitle['language']] = subtitle
return list(unique_subtitles_dict.values())
def get_default_uri(self):
"""

View File

@ -36,10 +36,10 @@ from Src.Util._jsonConfig import config_manager
# Default settings
HTTP_TIMEOUT = 5
HTTP_RETRIES = 1
HTTP_TIMEOUT = config_manager.get_int('REQUESTS', 'timeout')
HTTP_RETRIES = config_manager.get_int('REQUESTS', 'max_retry')
HTTP_DELAY = 1
HTTP_DISABLE_ERROR = config_manager.get_bool('M3U8_REQUESTS', 'disable_error')
HTTP_DISABLE_ERROR = config_manager.get_bool('REQUESTS', 'disable_error')
@ -383,7 +383,7 @@ class ManageRequests:
logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}")
if self.attempt < self.retries:
logging.info(f"Retrying request for URL '{self.url}' (attempt {self.attempt}/{self.retries})")
logging.error(f"Retry request for URL '{self.url}' (attempt {self.attempt}/{self.retries})")
time.sleep(HTTP_DELAY)
else:

View File

@ -11,8 +11,8 @@ import tempfile
from typing import Dict, List
# Internal utilities
from ..Request import requests
# Internal libraries
from Src.Lib.Request import requests

View File

@ -7,6 +7,9 @@ import time
# Internal utilities
from .version import __version__
from Src.Util.console import console
# External library
from Src.Lib.Request import requests
@ -53,7 +56,7 @@ def update():
if __version__ != last_version:
console.print(f"[red]New version available: [yellow]{last_version}")
else:
console.print(f"[green]Everything is up to date")
console.print(f"[red]Everything is up to date")
console.print("\n")
console.print(f"[red]{repo_name} has been downloaded [yellow]{total_download_count} [red]times, but only [yellow]{percentual_stars}% [red]of users have starred it.\n\

View File

@ -1,217 +0,0 @@
# 11.04.24
import os
import sys
import datetime
import tempfile
import configparser
import logging
import json
from typing import Union, List
# Variable
repo_name = "StreamingCommunity_api"
config_file_name = f"{repo_name}_config.ini"
class ConfigError(Exception):
"""
Exception raised for errors related to configuration management.
"""
def __init__(self, message: str):
"""
Initialize ConfigError with the given error message.
Args:
message (str): The error message.
"""
self.message = message
super().__init__(self.message)
logging.error(self.message)
class ConfigManager:
"""
Class to manage configuration settings using a config file.
"""
def __init__(self, defaults: dict = None):
"""
Initialize ConfigManager.
Args:
- defaults (dict, optional): A dictionary containing default values for variables. Default is None.
"""
self.config_file_path = os.path.join(tempfile.gettempdir(), config_file_name)
logging.info(f"Read file: {self.config_file_path}")
self.defaults = defaults
self.config = configparser.ConfigParser()
self._check_config_file()
def _check_config_file(self):
"""
Checks if the configuration file exists and contains all the default values.
"""
if os.path.exists(self.config_file_path):
# If the configuration file exists, check if default values are present
self.config.read(self.config_file_path)
if self.defaults:
for section, options in self.defaults.items():
if not self.config.has_section(section):
# If section is missing, rewrite default values
logging.info(f"Writing default values for section: {section}")
self._write_defaults()
return
for key, value in options.items():
if not self.config.has_option(section, key):
# If key is missing, rewrite default values
logging.info(f"Writing default value for key: {key} in section: {section}")
self._write_defaults()
return
else:
logging.info("Configuration file does not exist. Writing default values.")
self._write_defaults()
def _write_defaults(self):
"""
Writes the default values to the configuration file.
"""
with open(self.config_file_path, 'w') as config_file:
if self.defaults:
for section, options in self.defaults.items():
if not self.config.has_section(section):
self.config.add_section(section)
for key, value in options.items():
self.config.set(section, key, str(value))
self.config.write(config_file)
logging.info(f"Created config file: {self.config_file_path}")
def _check_section_and_key(self, section: str, key: str) -> None:
"""
Check if the given section and key exist in the configuration file.
Args:
- section (str): The section in the config file.
- key (str): The key of the variable.
Raises:
ConfigError: If the section or key does not exist.
"""
logging.info(f"Check section: {section}, key: {key}")
if not self.config.has_section(section):
raise ConfigError(f"Section '{section}' does not exist in the configuration file.")
if not self.config.has_option(section, key):
raise ConfigError(f"Key '{key}' does not exist in section '{section}'.")
def get_int(self, section: str, key: str, default: Union[int, None] = None) -> Union[int, None]:
"""
Get the value of a variable from the config file as an integer.
Args:
- section (str): The section in the config file.
- key (str): The key of the variable.
- default (int, optional): Default value if the variable doesn't exist. Default is None.
Returns:
int or None: Value of the variable as an integer or default value.
"""
try:
self._check_section_and_key(section, key)
return int(self.config.get(section, key))
except (ConfigError, ValueError):
return default
def get_string(self, section: str, key: str, default: Union[str, None] = None) -> Union[str, None]:
"""
Get the value of a variable from the config file as a string.
Args:
- section (str): The section in the config file.
- key (str): The key of the variable.
- default (str, optional): Default value if the variable doesn't exist. Default is None.
Returns:
str or None: Value of the variable as a string or default value.
"""
try:
self._check_section_and_key(section, key)
return self.config.get(section, key)
except ConfigError:
return default
def get_bool(self, section: str, key: str, default: Union[bool, None] = None) -> Union[bool, None]:
"""
Get the value of a variable from the config file as a boolean.
Args:
- section (str): The section in the config file.
- key (str): The key of the variable.
- default (bool, optional): Default value if the variable doesn't exist. Default is None.
Returns:
bool or None: Value of the variable as a boolean or default value.
"""
try:
self._check_section_and_key(section, key)
return self.config.getboolean(section, key)
except ConfigError:
return default
def get_list(self, section: str, key: str, default: Union[List, None] = None) -> Union[List, None]:
"""
Get the value of a variable from the config file as a list.
Args:
- section (str): The section in the config file.
- key (str): The key of the variable.
- default (List, optional): Default value if the variable doesn't exist. Default is None.
Returns:
List or None: Value of the variable as a list or default value.
"""
try:
self._check_section_and_key(section, key)
value = self.config.get(section, key)
return json.loads(value)
except (ConfigError, json.JSONDecodeError):
return default
def add_variable(self, section: str, key: str, value: Union[int, str, bool, List]) -> None:
"""
Add or update a variable in the config file.
Args:
- section (str): The section in the config file.
- key (str): The key of the variable.
- value (int, str, bool, List): The value of the variable.
"""
if not self.config.has_section(section):
self.config.add_section(section)
self.config.set(section, key, str(value))
with open(self.config_file_path, 'w') as config_file:
self.config.write(config_file)
logging.info(f"Added or updated variable '{key}' in section '{section}'")
# Output
defaults = {
'Setting': {
'ffmpeg': False, # Ffmpeg is present
'path': False, # Backup path for win
'date' : str(datetime.date.today()) # Date time now
}
}
temp_config_manager = ConfigManager(defaults=defaults)

View File

@ -1,140 +0,0 @@
# 07.04.24
import os
import platform
import logging
# Winreg only work for windows
if platform.system() == "Windows":
# Winreg only work for windows
import winreg
# Define Windows registry key for user environment variables
env_keys = winreg.HKEY_CURRENT_USER, "Environment"
else:
env_keys = None
def get_env(name: str) -> str:
"""
Retrieve the value of the specified environment variable from the Windows registry.
Args:
- name (str): The name of the environment variable to retrieve.
Returns:
str: The value of the specified environment variable.
"""
logging.info("Get enviroment key")
try:
with winreg.OpenKey(*env_keys, 0, winreg.KEY_READ) as key:
return winreg.QueryValueEx(key, name)[0]
except FileNotFoundError:
return ""
def set_env_path(dir: str) -> None:
"""
Add a directory to the user's PATH environment variable.
Args:
- dir (str): The directory to add to the PATH environment variable.
"""
user_path = get_env("Path")
if dir not in user_path:
new_path = user_path + os.pathsep + dir
try:
with winreg.OpenKey(*env_keys, 0, winreg.KEY_WRITE) as key:
winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ, new_path)
logging.info(f"Added {dir} to PATH.")
except Exception as e:
logging.error(f"Failed to set PATH: {e}")
else:
logging.info("Directory already exists in the Path for set new env path.")
def remove_from_path(dir) -> None:
"""
Remove a directory from the user's PATH environment variable.
Args:
- dir (str): The directory to remove from the PATH environment variable.
"""
user_path = get_env("Path")
if dir in user_path:
new_path = user_path.replace(dir + os.pathsep, "").replace(os.pathsep + dir, "")
try:
with winreg.OpenKey(*env_keys, 0, winreg.KEY_WRITE) as key:
winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ, new_path)
logging.info(f"Removed {dir} from PATH.")
except Exception as e:
logging.error(f"Failed to remove directory from PATH: {e}")
else:
logging.info("Directory does not exist in the Path.")
def backup_path():
"""
Backup the original state of the PATH environment variable.
"""
original_path = get_env("Path")
try:
# Create backup dir
script_dir = os.path.join(os.path.expanduser("~"), "Backup")
os.makedirs(script_dir, exist_ok=True)
backup_file = os.path.join(script_dir, "path_backup.txt")
logging.info(f"Crete file: {backup_file}")
# Check if backup file exist
if not os.path.exists(backup_file):
with open(backup_file, "w") as f:
for path in original_path.split("\n"):
if len(path) > 3:
f.write(f"{path}; \n")
logging.info("Backup of PATH variable created.")
print("Backup of PATH variable created.")
except Exception as e:
logging.error(f"Failed to create backup of PATH variable: {e}")
print(f"Failed to create backup of PATH variable: {e}")
def restore_path():
"""
Restore the original state of the PATH environment variable.
"""
try:
backup_file = "path_backup.txt"
logging.info(f"Read file: {backup_file}")
if os.path.isfile(backup_file):
with open(backup_file, "r") as f:
new_path = f.read()
with winreg.OpenKey(*env_keys, 0, winreg.KEY_WRITE) as key:
winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ, new_path)
logging.info("Restored original PATH variable.")
os.remove(backup_file)
else:
logging.error("No backup file found.")
except Exception as e:
logging.error(f"Failed to restore PATH variable: {e}")

View File

@ -1,95 +0,0 @@
# 16.05.24
import os
import errno
import platform
import unicodedata
# List of invalid characters for Windows filenames
WINDOWS_INVALID_CHARS = '<>:"/\\|?*'
WINDOWS_RESERVED_NAMES = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"
]
# Invalid characters for macOS filenames
MACOS_INVALID_CHARS = '/:'
# Invalid characters for Linux/Android filenames
LINUX_INVALID_CHARS = '/\0'
# Maximum path length for Windows
WINDOWS_MAX_PATH = 260
def is_valid_filename(filename, system):
"""
Validates if the given filename is valid for the specified system.
Args:
- filename (str): The filename to validate.
- system (str): The operating system, e.g., 'Windows', 'Darwin' (macOS), or others for Linux/Android.
Returns:
bool: True if the filename is valid, False otherwise.
"""
# Normalize Unicode
filename = unicodedata.normalize('NFC', filename)
# Common checks across all systems
if filename.endswith(' ') or filename.endswith('.') or filename.endswith('/'):
return False
if filename.startswith('.') and system == "Darwin":
return False
# System-specific checks
if system == "Windows":
if len(filename) > WINDOWS_MAX_PATH:
return False
if any(char in filename for char in WINDOWS_INVALID_CHARS):
return False
name, ext = os.path.splitext(filename)
if name.upper() in WINDOWS_RESERVED_NAMES:
return False
elif system == "Darwin": # macOS
if any(char in filename for char in MACOS_INVALID_CHARS):
return False
else: # Linux and Android
if any(char in filename for char in LINUX_INVALID_CHARS):
return False
return True
def can_create_file(file_path):
"""
Checks if a file can be created at the given file path.
Args:
- file_path (str): The path where the file is to be created.
Returns:
bool: True if the file can be created, False otherwise.
"""
current_system = platform.system()
if not is_valid_filename(os.path.basename(file_path), current_system):
return False
try:
with open(file_path, 'w') as file:
pass
os.remove(file_path) # Cleanup if the file was created
return True
except OSError as e:
if e.errno in (errno.EACCES, errno.ENOENT, errno.EEXIST, errno.ENOTDIR):
return False
raise

View File

@ -1,56 +0,0 @@
# 26.05.24
import subprocess
def is_node_installed() -> bool:
"""
Checks if Node.js is installed on the system.
Returns:
bool: True if Node.js is installed, False otherwise.
"""
try:
# Run the command 'node -v' to get the Node.js version
result = subprocess.run(['node', '-v'], capture_output=True, text=True, check=True)
# If the command runs successfully and returns a version number, Node.js is installed
if result.stdout.startswith('v'):
return True
except (subprocess.CalledProcessError, FileNotFoundError):
# If there is an error running the command or the command is not found, Node.js is not installed
return False
return False
def run_node_script(script_content: str) -> str:
"""
Runs a Node.js script and returns its output.
Args:
script_content (str): The content of the Node.js script to run.
Returns:
str: The output of the Node.js script.
"""
# Check if Node.js is installed
if not is_node_installed():
raise EnvironmentError("Node.js is not installed on the system.")
# Write the script content to a temporary file
with open('script.js', 'w') as file:
file.write(script_content)
try:
# Run the Node.js script using subprocess and capture the output
result = subprocess.run(['node', 'script.js'], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Error running Node.js script: {e.stderr}")
finally:
# Clean up the temporary script file
import os
os.remove('script.js')

View File

@ -2,48 +2,35 @@
import re
import os
import sys
import ssl
import time
import json
import errno
import shutil
import hashlib
import logging
import zipfile
import platform
import importlib
import subprocess
import importlib.metadata
from typing import List
# Variable
special_chars_to_remove = [
'!',
'@',
'#',
'$',
'%',
'^',
'&',
'*',
'(',
')',
'[',
']',
'{',
'}',
'<',
'|',
'`',
'~',
"'",
'"',
';',
':',
',',
'?',
"\\",
"/"
]
# External library
import unicodedata
# Internal utilities
from .console import console
# --> OS FILE ASCII
special_chars_to_remove = ['!','@','#','$','%','^','&','*','(',')','[',']','{','}','<','|','`','~',"'",'"',';',':',',','?',"\\","/"]
def get_max_length_by_os(system: str) -> int:
"""
Determines the maximum length for a base name based on the operating system.
@ -95,7 +82,28 @@ def reduce_base_name(base_name: str) -> str:
return base_name
def remove_special_characters(input_string):
"""
Remove specified special characters from a string.
Args:
- input_string (str): The input string containing special characters.
- special_chars (list): List of special characters to be removed.
Returns:
str: A new string with specified special characters removed.
"""
# Compile regular expression pattern to match special characters
pattern = re.compile('[' + re.escape(''.join(special_chars_to_remove)) + ']')
# Use compiled pattern to replace special characters with an empty string
cleaned_string = pattern.sub('', input_string)
return cleaned_string
# --> OS MANAGE FOLDER
def create_folder(folder_name: str) -> None:
"""
Create a directory if it does not exist, and log the result.
@ -160,7 +168,6 @@ def remove_folder(folder_path: str) -> None:
except OSError as e:
print(f"Error removing folder '{folder_path}': {e}")
def remove_file(file_path: str) -> None:
"""
Remove a file if it exists
@ -175,27 +182,6 @@ def remove_file(file_path: str) -> None:
except OSError as e:
print(f"Error removing file '{file_path}': {e}")
def remove_special_characters(input_string):
"""
Remove specified special characters from a string.
Args:
- input_string (str): The input string containing special characters.
- special_chars (list): List of special characters to be removed.
Returns:
str: A new string with specified special characters removed.
"""
# Compile regular expression pattern to match special characters
pattern = re.compile('[' + re.escape(''.join(special_chars_to_remove)) + ']')
# Use compiled pattern to replace special characters with an empty string
cleaned_string = pattern.sub('', input_string)
return cleaned_string
def move_file_one_folder_up(file_path) -> None:
"""
Move a file one folder up from its current location.
@ -219,7 +205,6 @@ def move_file_one_folder_up(file_path) -> None:
# Move the file
os.rename(file_path, new_path)
def delete_files_except_one(folder_path: str, keep_file: str) -> None:
"""
Delete all files in a folder except for one specified file.
@ -245,7 +230,6 @@ def delete_files_except_one(folder_path: str, keep_file: str) -> None:
except Exception as e:
logging.error(f"An error occurred: {e}")
def decompress_file(downloaded_file_path: str, destination: str) -> None:
"""
Decompress one file.
@ -262,6 +246,8 @@ def decompress_file(downloaded_file_path: str, destination: str) -> None:
raise
# --> OS MANAGE JSON
def read_json(path: str):
"""Reads JSON file and returns its content.
@ -277,7 +263,6 @@ def read_json(path: str):
return config
def save_json(json_obj, path: str) -> None:
"""Saves JSON object to the specified file path.
@ -289,7 +274,6 @@ def save_json(json_obj, path: str) -> None:
with open(path, 'w') as file:
json.dump(json_obj, file, indent=4) # Adjust the indentation as needed
def clean_json(path: str) -> None:
"""Reads JSON data from the file, cleans it, and saves it back.
@ -314,6 +298,8 @@ def clean_json(path: str) -> None:
save_json(modified_data, path)
# --> OS MANAGE SIZE FILE
def format_size(size_bytes: float) -> str:
"""
Format the size in bytes into a human-readable format.
@ -340,6 +326,9 @@ def format_size(size_bytes: float) -> str:
return f"{size_bytes:.2f} {units[unit_index]}"
# --> OS MANAGE KEY AND IV HEX
def compute_sha1_hash(input_string: str) -> str:
"""
Computes the SHA-1 hash of the input string.
@ -356,7 +345,6 @@ def compute_sha1_hash(input_string: str) -> str:
# Return the hashed string
return hashed_string
def decode_bytes(bytes_data: bytes, encodings_to_try: List[str] = None) -> str:
"""
Decode a byte sequence using a list of encodings and return the decoded string.
@ -387,7 +375,6 @@ def decode_bytes(bytes_data: bytes, encodings_to_try: List[str] = None) -> str:
logging.info("Raw byte data: %s", bytes_data)
return None
def convert_to_hex(bytes_data: bytes) -> str:
"""
Convert a byte sequence to its hexadecimal representation.
@ -400,4 +387,195 @@ def convert_to_hex(bytes_data: bytes) -> str:
"""
hex_data = ''.join(['{:02x}'.format(char) for char in bytes_data])
logging.info("Hexadecimal representation of the data: %s", hex_data)
return hex_data
return hex_data
# --> OS GET SUMMARY
def get_executable_version(command):
try:
version_output = subprocess.check_output(command, stderr=subprocess.STDOUT).decode().split('\n')[0]
return version_output.split(" ")[2]
except (FileNotFoundError, subprocess.CalledProcessError):
print(f"{command[0]} not found")
sys.exit(0)
def get_library_version(lib_name):
try:
version = importlib.metadata.version(lib_name)
return f"{lib_name}-{version}"
except importlib.metadata.PackageNotFoundError:
return f"{lib_name}-not installed"
def get_system_summary():
console.print("[bold blue]System Summary[/bold blue][white]:")
# Python version and platform
python_version = sys.version.split()[0]
python_implementation = platform.python_implementation()
arch = platform.machine()
os_info = platform.platform()
openssl_version = ssl.OPENSSL_VERSION
glibc_version = 'glibc ' + '.'.join(map(str, platform.libc_ver()[1]))
console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})[/bold red]")
logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})")
# ffmpeg and ffprobe versions
ffmpeg_version = get_executable_version(['ffmpeg', '-version'])
ffprobe_version = get_executable_version(['ffprobe', '-version'])
console.print(f"[cyan]Exe versions[white]: [bold red]ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}[/bold red]")
logging.info(f"Exe versions: ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}")
# Optional libraries versions
optional_libraries = ['bs4', 'certifi', 'tqdm', 'rich', 'unidecode']
optional_libs_versions = [get_library_version(lib) for lib in optional_libraries]
console.print(f"[cyan]Libraries[white]: [bold red]{', '.join(optional_libs_versions)}[/bold red]\n")
logging.info(f"Libraries: {', '.join(optional_libs_versions)}")
# --> OS MANAGE NODE JS
def is_node_installed() -> bool:
"""
Checks if Node.js is installed on the system.
Returns:
bool: True if Node.js is installed, False otherwise.
"""
try:
# Run the command 'node -v' to get the Node.js version
result = subprocess.run(['node', '-v'], capture_output=True, text=True, check=True)
# If the command runs successfully and returns a version number, Node.js is installed
if result.stdout.startswith('v'):
return True
except (subprocess.CalledProcessError, FileNotFoundError):
# If there is an error running the command or the command is not found, Node.js is not installed
return False
return False
def run_node_script(script_content: str) -> str:
"""
Runs a Node.js script and returns its output.
Args:
script_content (str): The content of the Node.js script to run.
Returns:
str: The output of the Node.js script.
"""
# Check if Node.js is installed
if not is_node_installed():
raise EnvironmentError("Node.js is not installed on the system.")
# Write the script content to a temporary file
with open('script.js', 'w') as file:
file.write(script_content)
try:
# Run the Node.js script using subprocess and capture the output
result = subprocess.run(['node', 'script.js'], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Error running Node.js script: {e.stderr}")
finally:
# Clean up the temporary script file
import os
os.remove('script.js')
# --> OS FILE VALIDATOR
# List of invalid characters for Windows filenames
WINDOWS_INVALID_CHARS = '<>:"/\\|?*'
WINDOWS_RESERVED_NAMES = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"
]
# Invalid characters for macOS filenames
MACOS_INVALID_CHARS = '/:'
# Invalid characters for Linux/Android filenames
LINUX_INVALID_CHARS = '/\0'
# Maximum path length for Windows
WINDOWS_MAX_PATH = 260
def is_valid_filename(filename, system):
"""
Validates if the given filename is valid for the specified system.
Args:
- filename (str): The filename to validate.
- system (str): The operating system, e.g., 'Windows', 'Darwin' (macOS), or others for Linux/Android.
Returns:
bool: True if the filename is valid, False otherwise.
"""
# Normalize Unicode
filename = unicodedata.normalize('NFC', filename)
# Common checks across all systems
if filename.endswith(' ') or filename.endswith('.') or filename.endswith('/'):
return False
if filename.startswith('.') and system == "Darwin":
return False
# System-specific checks
if system == "Windows":
if len(filename) > WINDOWS_MAX_PATH:
return False
if any(char in filename for char in WINDOWS_INVALID_CHARS):
return False
name, ext = os.path.splitext(filename)
if name.upper() in WINDOWS_RESERVED_NAMES:
return False
elif system == "Darwin": # macOS
if any(char in filename for char in MACOS_INVALID_CHARS):
return False
else: # Linux and Android
if any(char in filename for char in LINUX_INVALID_CHARS):
return False
return True
def can_create_file(file_path):
"""
Checks if a file can be created at the given file path.
Args:
- file_path (str): The path where the file is to be created.
Returns:
bool: True if the file can be created, False otherwise.
"""
current_system = platform.system()
if not is_valid_filename(os.path.basename(file_path), current_system):
return False
try:
with open(file_path, 'w') as file:
pass
os.remove(file_path) # Cleanup if the file was created
return True
except OSError as e:
if e.errno in (errno.EACCES, errno.ENOENT, errno.EEXIST, errno.ENOTDIR):
return False
raise

View File

@ -3,32 +3,36 @@
"debug": false,
"log_file": "app.log",
"log_to_file": true,
"show_message": true,
"clean_console": true,
"show_message": false,
"clean_console": false,
"root_path": "Video",
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
"not_close": false
},
"REQUESTS": {
"disable_error": false,
"timeout": 10,
"max_retry": 3,
"verify_ssl": false,
"index": {"user-agent": ""},
"segments": { "user-agent": ""}
},
"M3U8_DOWNLOAD": {
"tdqm_workers": 30,
"tqdm_show_progress": true,
"tqdm_use_large_bar": true,
"download_video": true,
"download_audio": true,
"download_sub": true,
"specific_list_audio": ["ita"],
"specific_list_subtitles": ["eng"],
"cleanup_tmp_folder": false,
"create_report": false
},
"M3U8_FILTER": {
"M3U8_CONVERSION": {
"use_codec": false,
"use_gpu": false,
"default_preset": "ultrafast",
"check_output_conversion": false,
"cleanup_tmp_folder": true,
"specific_list_audio": ["ita"],
"specific_list_subtitles": ["eng"]
},
"M3U8_REQUESTS": {
"disable_error": false,
"timeout": 10,
"verify_ssl": false,
"index": {"user-agent": ""},
"segments": {"user-agent": ""}
"check_output_after_ffmpeg": false
},
"M3U8_PARSER": {
"skip_empty_row_playlist": false,

25
run.py
View File

@ -4,7 +4,6 @@ import sys
import os
import platform
import argparse
import logging
from typing import Callable
@ -13,9 +12,8 @@ from typing import Callable
from Src.Util.message import start_message
from Src.Util.console import console, msg
from Src.Util._jsonConfig import config_manager
from Src.Util._tmpConfig import temp_config_manager
from Src.Upload.update import update as git_update
from Src.Lib.FFmpeg import check_ffmpeg
from Src.Util.os import get_system_summary
from Src.Util.logger import Logger
@ -51,25 +49,10 @@ def initialize():
# Attempting GitHub update
try:
"""try:
git_update()
except Exception as e:
console.print(f"[blue]Req github [white]=> [red]Failed: {e}")
# Check if tmp config ffmpeg is present
if not temp_config_manager.get_bool('Setting', 'ffmpeg'):
output_ffmpeg = check_ffmpeg()
# If ffmpeg is present is win systems change config
if output_ffmpeg:
temp_config_manager.add_variable('Setting', 'ffmpeg', True)
else:
logging.error("FFmpeg not exist")
else:
logging.info("FFmpeg exist")
console.print(f"[blue]Req github [white]=> [red]Failed: {e}")"""
def run_function(func: Callable[..., None], close_console: bool = False) -> None:
@ -93,6 +76,7 @@ def run_function(func: Callable[..., None], close_console: bool = False) -> None
def main():
log_not = Logger()
get_system_summary()
# Parse command line arguments
parser = argparse.ArgumentParser(description='Script to download film and series from the internet.')
@ -142,5 +126,6 @@ def main():
console.print("[red]Invalid category, you need to insert 0, 1, or 2.")
sys.exit(0)
if __name__ == '__main__':
main()