[CORE] Remove unecessary things

This commit is contained in:
Lovi 2025-02-25 18:05:13 +01:00
parent 53790d8c5b
commit 881e36868e
40 changed files with 208 additions and 674 deletions

1
.gitignore vendored
View File

@ -44,7 +44,6 @@ venv.bak/
# Other
Video
note.txt
list_proxy.txt
cmd.txt
bot_config.json
scripts.json

View File

@ -85,4 +85,4 @@ class VideoSource:
logging.error("Failed to retrieve content from the URL.")
except Exception as e:
logging.error(f"An error occurred while parsing the playlist: {e}")
logging.error(f"An error occurred while parsing the playlist: {e}")

View File

@ -16,7 +16,7 @@ from StreamingCommunity.Util.headers import get_userAgent
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
@ -38,9 +38,7 @@ class VideoSource:
Sends a request to the initial URL and extracts the redirect URL.
"""
try:
# Send a GET request to the initial URL
response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=max_timeout)
response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract the redirect URL from the HTML
@ -63,9 +61,7 @@ class VideoSource:
Sends a request to the redirect URL and extracts the Maxstream URL.
"""
try:
# Send a GET request to the redirect URL
response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=max_timeout)
response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract the Maxstream URL from the HTML
@ -89,7 +85,7 @@ class VideoSource:
uprot_url = response.json()['data']['value']
# Retry getting maxtstream url
response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=max_timeout)
response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
maxstream_url = soup.find("a").get("href")
@ -115,9 +111,7 @@ class VideoSource:
Sends a request to the Maxstream URL and extracts the .m3u8 file URL.
"""
try:
# Send a GET request to the Maxstream URL
response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=max_timeout)
response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
@ -148,4 +142,4 @@ class VideoSource:
"""
self.get_redirect_url()
self.get_maxstream_url()
return self.get_m3u8_url()
return self.get_m3u8_url()

View File

@ -16,7 +16,7 @@ from StreamingCommunity.Util.headers import get_userAgent
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
@ -45,40 +45,15 @@ class VideoSource:
Returns:
- str: The response content if successful, None otherwise.
"""
try:
response = self.client.get(
url=url,
headers=self.headers,
follow_redirects=True,
timeout=max_timeout
)
response = self.client.get(url, headers=self.headers, timeout=MAX_TIMEOUT, follow_redirects=True)
response.raise_for_status()
return response.text
except Exception as e:
logging.error(f"Request failed: {e}")
return None
def parse_html(self, html_content: str) -> BeautifulSoup:
"""
Parse the provided HTML content using BeautifulSoup.
Parameters:
- html_content (str): The HTML content to parse.
Returns:
- BeautifulSoup: Parsed HTML content if successful, None otherwise.
"""
try:
soup = BeautifulSoup(html_content, "html.parser")
return soup
except Exception as e:
logging.error(f"Failed to parse HTML content: {e}")
return None
def get_iframe(self, soup):
"""
Extracts the source URL of the second iframe in the provided BeautifulSoup object.
@ -107,7 +82,7 @@ class VideoSource:
"""
content = self.make_request(url)
if content:
return self.parse_html(content)
return BeautifulSoup(content, "html.parser")
return None
@ -140,7 +115,7 @@ class VideoSource:
logging.error("Failed to fetch HTML content.")
return None
soup = self.parse_html(html_content)
soup = BeautifulSoup(html_content, "html.parser")
if not soup:
logging.error("Failed to parse HTML content.")
return None
@ -190,5 +165,4 @@ class VideoSource:
except Exception as e:
logging.error(f"An error occurred: {e}")
return None
return None

View File

@ -19,7 +19,7 @@ from .Helper.Vixcloud.js_parser import JavaScriptParser
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
@ -60,13 +60,7 @@ class VideoSource:
}
try:
# Make a request to get iframe source
response = httpx.get(
url=f"{self.url}/iframe/{self.media_id}",
params=params,
timeout=max_timeout
)
response = httpx.get(f"{self.url}/iframe/{self.media_id}", params=params, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Parse response with BeautifulSoup to get iframe source
@ -108,19 +102,8 @@ class VideoSource:
"""
try:
if self.iframe_src is not None:
# Make a request to get content
try:
response = httpx.get(
url=self.iframe_src,
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
except Exception as e:
logging.error(f"Failed to get vixcloud contente with error: {e}")
sys.exit(0)
response = httpx.get(self.iframe_src, headers=self.headers, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Parse response with BeautifulSoup to get content
soup = BeautifulSoup(response.text, "html.parser")
@ -140,7 +123,6 @@ class VideoSource:
Returns:
str: Fully constructed playlist URL with authentication parameters
"""
# Initialize parameters dictionary
params = {}
# Add 'h' parameter if video quality is 1080p
@ -167,56 +149,6 @@ class VideoSource:
# Construct the new URL with updated query parameters
return urlunparse(parsed_url._replace(query=query_string))
def get_mp4(self, url_to_download: str, scws_id: str) -> list:
"""
Generate download links for the specified resolutions from StreamingCommunity.
Args:
url_to_download (str): URL of the video page.
scws_id (str): SCWS ID of the title.
Returns:
list: A list of video download URLs.
"""
headers = {
'referer': url_to_download,
'user-agent': get_userAgent(),
}
# API request to get video details
video_api_url = f'{self.url}/api/video/{scws_id}'
response = httpx.get(video_api_url, headers=headers)
if response.status_code == 200:
response_json = response.json()
video_tracks = response_json.get('video_tracks', [])
track = video_tracks[-1]
console.print(f"[cyan]Available resolutions: [red]{[str(track['quality']) for track in video_tracks]}")
# Request download link generation for each track
download_response = httpx.post(
url=f'{self.url}/api/download/generate_link?scws_id={track["video_id"]}&rendition={track["quality"]}',
headers={
'referer': url_to_download,
'user-agent': get_userAgent(),
'x-xsrf-token': config_manager.get("SITE", self.base_name)['extra']['x-xsrf-token']
},
cookies={
'streamingcommunity_session': config_manager.get("SITE", self.base_name)['extra']['streamingcommunity_session']
}
)
if download_response.status_code == 200:
return {'url': download_response.text, 'quality': track["quality"]}
else:
logging.error(f"Failed to generate link for resolution {track['quality']} (HTTP {download_response.status_code}).")
else:
logging.error(f"Error fetching video API URL (HTTP {response.status_code}).")
return []
class VideoSourceAnime(VideoSource):
def __init__(self, url: str):
@ -243,12 +175,7 @@ class VideoSourceAnime(VideoSource):
str: Parsed script content
"""
try:
response = httpx.get(
url=f"{self.url}/embed-url/{episode_id}",
headers=self.headers,
timeout=max_timeout
)
response = httpx.get(f"{self.url}/embed-url/{episode_id}", headers=self.headers, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract and clean embed URL

View File

@ -55,13 +55,7 @@ def title_search(word_to_search: str) -> int:
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
response = httpx.get(
url=search_url,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
verify=site_constant.VERIFY,
follow_redirects=True
)
response = httpx.get(search_url, headers={'user-agent': get_userAgent()}, timeout=max_timeout, follow_redirects=True)
response.raise_for_status()
except Exception as e:

View File

@ -144,12 +144,11 @@ def title_search(title: str) -> int:
# Send a POST request to the API endpoint for live search
try:
response = httpx.post(
url=f'{site_constant.FULL_URL}/livesearch',
f'{site_constant.FULL_URL}/livesearch',
cookies=cookies,
headers=headers,
json=json_data,
timeout=max_timeout,
verify=site_constant.VERIFY
timeout=max_timeout
)
response.raise_for_status()
@ -176,6 +175,7 @@ def title_search(title: str) -> int:
})
if site_constant.TELEGRAM_BOT:
# Crea una stringa formattata per ogni scelta con numero
choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}"
choices.append(choice_text)

View File

@ -55,13 +55,7 @@ def title_search(word_to_search: str) -> int:
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
response = httpx.get(
url=search_url,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
verify=site_constant.VERIFY,
follow_redirects=True
)
response = httpx.get(url=search_url, headers={'user-agent': get_userAgent()}, timeout=max_timeout, follow_redirects=True)
response.raise_for_status()
except Exception as e:

View File

@ -57,13 +57,7 @@ def title_search(word_to_search: str) -> int:
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
response = httpx.get(
url=search_url,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
verify=site_constant.VERIFY,
follow_redirects=True
)
response = httpx.get(search_url, headers={'user-agent': get_userAgent()}, timeout=max_timeout, follow_redirects=True)
response.raise_for_status()
except Exception as e:

View File

@ -55,13 +55,7 @@ def title_search(word_to_search: str) -> int:
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
response = httpx.get(
url=search_url,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
verify=site_constant.VERIFY,
follow_redirects=True
)
response = httpx.get(search_url, headers={'user-agent': get_userAgent()}, timeout=max_timeout, follow_redirects=True)
response.raise_for_status()
except Exception as e:

View File

@ -59,13 +59,7 @@ def title_search(title_search: str) -> int:
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
response = httpx.get(
url=search_url,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
verify=site_constant.VERIFY,
follow_redirects=True
)
response = httpx.get(search_url, headers={'user-agent': get_userAgent()}, timeout=max_timeout, follow_redirects=True)
response.raise_for_status()
except Exception as e:

View File

@ -1,6 +1,5 @@
# 23.11.24
from .recall_search import execute_search
from .get_domain import search_domain
from .manage_ep import (
manage_selection,

View File

@ -1,37 +0,0 @@
# 19.10.24
import os
import sys
def execute_search(info):
"""
Dynamically imports and executes a specified function from a module defined in the info dictionary.
Parameters:
info (dict): A dictionary containing the function name, folder, and module information.
"""
# Define the project path using the folder from the info dictionary
project_path = os.path.dirname(info['folder']) # Get the base path for the project
# Add the project path to sys.path
if project_path not in sys.path:
sys.path.append(project_path)
# Attempt to import the specified function from the module
try:
# Construct the import statement dynamically
module_path = f"StreamingCommunity.Api.Site{info['folder_base']}"
exec(f"from {module_path} import {info['function']}")
# Call the specified function
eval(info['function'])() # Calls the search function
except ModuleNotFoundError as e:
print(f"ModuleNotFoundError: {e}")
except ImportError as e:
print(f"ImportError: {e}")
except Exception as e:
print(f"An error occurred: {e}")

View File

@ -31,10 +31,6 @@ class SiteConstant:
def ROOT_PATH(self):
return config_manager.get('DEFAULT', 'root_path')
@property
def VERIFY(self):
return config_manager.get('REQUESTS', 'verify')
@property
def DOMAIN_NOW(self):
return config_manager.get_site(self.SITE_NAME, 'domain')

View File

@ -47,7 +47,6 @@ FILTER_CUSTOM_REOLUTION = str(config_manager.get('M3U8_PARSER', 'force_resolutio
GET_ONLY_LINK = config_manager.get_bool('M3U8_PARSER', 'get_only_link')
RETRY_LIMIT = config_manager.get_int('REQUESTS', 'max_retry')
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -1,110 +0,0 @@
# 09.06.24
import os
import sys
import logging
from concurrent.futures import ThreadPoolExecutor
# External libraries
import httpx
# Internal utilities
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.headers import get_userAgent
from StreamingCommunity.Util.os import os_manager
class ProxyManager:
def __init__(self, proxy_list=None, url=None):
"""
Initialize ProxyManager with a list of proxies and timeout.
Parameters:
- proxy_list: List of proxy strings
- timeout: Timeout for proxy requests
"""
self.proxy_list = proxy_list or []
self.verified_proxies = []
self.timeout = config_manager.get_float('REQUESTS', 'timeout')
self.url = url
def _check_proxy(self, proxy):
"""
Check if a single proxy is working by making a request to Google.
Parameters:
- proxy: Proxy string to be checked
Returns:
- Proxy string if working, None otherwise
"""
protocol = proxy.split(":")[0].lower()
protocol = f'{protocol}://'
proxy = {protocol: proxy, "https://": proxy}
try:
with httpx.Client(proxies=proxy, verify=False) as client:
response = client.get(self.url, timeout=self.timeout, headers={'user-agent': get_userAgent()})
if response.status_code == 200:
logging.info(f"Proxy {proxy} is working.")
return proxy
except Exception as e:
logging.error(f"Test proxy {proxy} failed: {e}")
return None
def verify_proxies(self):
"""
Verify all proxies in the list and store the working ones.
"""
logging.info("Starting proxy verification...")
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
self.verified_proxies = list(executor.map(self._check_proxy, self.proxy_list))
self.verified_proxies = [proxy for proxy in self.verified_proxies if proxy]
logging.info(f"Verification complete. {len(self.verified_proxies)} proxies are working.")
def get_verified_proxies(self):
"""
Get validate proxies.
"""
if len(self.verified_proxies) > 0:
return self.verified_proxies
else:
logging.error("Cant find valid proxy.")
sys.exit(0)
def main_test_proxy(url_test):
path_file_proxt_list = "list_proxy.txt"
if os_manager.check_file(path_file_proxt_list):
# Read file
with open(path_file_proxt_list, 'r') as file:
ip_addresses = file.readlines()
# Formatt ip
ip_addresses = [ip.strip() for ip in ip_addresses]
formatted_ips = [f"http://{ip}" for ip in ip_addresses]
# Get list of proxy from config.json
proxy_list = formatted_ips
# Verify proxy
manager = ProxyManager(proxy_list, url_test)
manager.verify_proxies()
# Write valid ip in txt file
with open(path_file_proxt_list, 'w') as file:
for ip in ip_addresses:
file.write(f"{ip}\n")
# Return valid proxy
return manager.get_verified_proxies()

View File

@ -22,7 +22,7 @@ from tqdm import tqdm
# Internal utilities
from StreamingCommunity.Util.color import Colors
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.headers import get_userAgent, random_headers
from StreamingCommunity.Util.headers import get_userAgent
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.os import os_manager
@ -34,16 +34,12 @@ from ...M3U8 import (
M3U8_Parser,
M3U8_UrlFix
)
from .proxyes import main_test_proxy
# Config
TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay')
USE_LARGE_BAR = not ("android" in sys.platform or "ios" in sys.platform)
REQUEST_MAX_RETRY = config_manager.get_int('REQUESTS', 'max_retry')
REQUEST_VERIFY = False
THERE_IS_PROXY_LIST = os_manager.check_file("list_proxy.txt")
PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min')
PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')
REQUEST_VERIFY = config_manager.get_int('REQUESTS', 'verify')
DEFAULT_VIDEO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_video_workser')
DEFAULT_AUDIO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_audio_workser')
MAX_TIMEOOUT = config_manager.get_int("REQUESTS", "timeout")
@ -133,15 +129,6 @@ class M3U8_Segments:
]
self.class_ts_estimator.total_segments = len(self.segments)
# Proxy
if THERE_IS_PROXY_LIST:
console.log("[red]Start validation proxy.")
self.valid_proxy = main_test_proxy(self.segments[0])
console.log(f"[cyan]N. Valid ip: [red]{len(self.valid_proxy)}")
if len(self.valid_proxy) == 0:
sys.exit(0)
def get_info(self) -> None:
if self.is_index_url:
try:
@ -184,18 +171,13 @@ class M3U8_Segments:
else:
print("Signal handler must be set in the main thread")
def _get_http_client(self, index: int = None):
def _get_http_client(self):
client_params = {
#'headers': random_headers(self.key_base_url) if hasattr(self, 'key_base_url') else {'User-Agent': get_userAgent()},
'headers': {'User-Agent': get_userAgent()},
'timeout': SEGMENT_MAX_TIMEOUT,
'follow_redirects': True,
'http2': False
}
if THERE_IS_PROXY_LIST and index is not None and hasattr(self, 'valid_proxy'):
client_params['proxies'] = self.valid_proxy[index % len(self.valid_proxy)]
return httpx.Client(**client_params)
def download_segment(self, ts_url: str, index: int, progress_bar: tqdm, backoff_factor: float = 1.1) -> None:
@ -213,7 +195,7 @@ class M3U8_Segments:
return
try:
with self._get_http_client(index) as client:
with self._get_http_client() as client:
start_time = time.time()
response = client.get(ts_url)
@ -350,7 +332,6 @@ class M3U8_Segments:
# Configure workers and delay
max_workers = self._get_worker_count(type)
delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (len(self.valid_proxy) + 1))) if THERE_IS_PROXY_LIST else TQDM_DELAY_WORKER
# Download segments with completion verification
with ThreadPoolExecutor(max_workers=max_workers) as executor:
@ -361,7 +342,7 @@ class M3U8_Segments:
if self.interrupt_flag.is_set():
break
time.sleep(delay)
time.sleep(TQDM_DELAY_WORKER)
futures.append(executor.submit(self.download_segment, segment_url, index, progress_bar))
# Wait for futures with interrupt handling
@ -429,8 +410,6 @@ class M3U8_Segments:
'audio': DEFAULT_AUDIO_WORKERS
}.get(stream_type.lower(), 1)
if THERE_IS_PROXY_LIST:
return min(len(self.valid_proxy), base_workers * 2)
return base_workers
def _generate_results(self, stream_type: str) -> Dict:

View File

@ -27,15 +27,10 @@ from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
from ...FFmpeg import print_duration_table
# Suppress SSL warnings
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Config
REQUEST_VERIFY = config_manager.get_int('REQUESTS', 'verify')
GET_ONLY_LINK = config_manager.get_bool('M3U8_PARSER', 'get_only_link')
REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
@ -47,6 +42,7 @@ class InterruptHandler:
self.kill_download = False
self.force_quit = False
def signal_handler(signum, frame, interrupt_handler, original_handler):
"""Enhanced signal handler for multiple interrupt scenarios"""
current_time = time.time()
@ -67,6 +63,7 @@ def signal_handler(signum, frame, interrupt_handler, original_handler):
console.print("\n[bold red]Force quit activated. Saving partial download...[/bold red]")
signal.signal(signum, original_handler)
def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = None):
"""
Downloads an MP4 video with enhanced interrupt handling.
@ -111,7 +108,7 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = No
original_handler = signal.signal(signal.SIGINT, partial(signal_handler, interrupt_handler=interrupt_handler, original_handler=signal.getsignal(signal.SIGINT)))
try:
transport = httpx.HTTPTransport(verify=False, http2=True)
transport = httpx.HTTPTransport(verify=REQUEST_VERIFY, http2=True)
with httpx.Client(transport=transport, timeout=httpx.Timeout(60)) as client:
with client.stream("GET", url, headers=headers, timeout=REQUEST_TIMEOUT) as response:

View File

@ -22,10 +22,10 @@ import qbittorrentapi
# Tor config
HOST = str(config_manager.get_dict('DEFAULT', 'config_qbit_tor')['host'])
PORT = str(config_manager.get_dict('DEFAULT', 'config_qbit_tor')['port'])
USERNAME = str(config_manager.get_dict('DEFAULT', 'config_qbit_tor')['user'])
PASSWORD = str(config_manager.get_dict('DEFAULT', 'config_qbit_tor')['pass'])
HOST = config_manager.get_dict('QBIT_CONFIG', 'host')
PORT = config_manager.get_dict('QBIT_CONFIG', 'port')
USERNAME = config_manager.get_dict('QBIT_CONFIG', 'user')
PASSWORD = config_manager.get_dict('QBIT_CONFIG', 'pass')
# Config

View File

@ -1,4 +1,4 @@
# 18.04.24
from .command import join_video, join_audios, join_subtitle
from .util import print_duration_table, get_video_duration
from .util import print_duration_table, get_video_duration

View File

@ -24,8 +24,6 @@ def capture_output(process: subprocess.Popen, description: str) -> None:
- description (str): Description of the command being executed.
"""
try:
# Variable to store the length of the longest progress string
max_length = 0
for line in iter(process.stdout.readline, ''):
@ -94,10 +92,7 @@ def parse_output_line(line: str) -> dict:
dict: A dictionary containing parsed information.
"""
try:
data = {}
# Split the line by whitespace and extract key-value pairs
parts = line.replace(" ", "").replace("= ", "=").split()
for part in parts:
@ -123,7 +118,7 @@ def terminate_process(process):
- process (subprocess.Popen): The subprocess to terminate.
"""
try:
if process.poll() is None: # Check if the process is still running
if process.poll() is None:
process.kill()
except Exception as e:
logging.error(f"Failed to terminate process: {e}")
@ -137,7 +132,6 @@ def capture_ffmpeg_real_time(ffmpeg_command: list, description: str) -> None:
- ffmpeg_command (list): The command to execute ffmpeg.
- description (str): Description of the command being executed.
"""
global terminate_flag
# Clear the terminate_flag before starting a new capture
@ -163,8 +157,8 @@ def capture_ffmpeg_real_time(ffmpeg_command: list, description: str) -> None:
logging.error(f"Error in ffmpeg process: {e}")
finally:
terminate_flag.set() # Signal the output capture thread to terminate
output_thread.join() # Wait for the output capture thread to complete
terminate_flag.set()
output_thread.join()
except Exception as e:
logging.error(f"Failed to start ffmpeg process: {e}")
logging.error(f"Failed to start ffmpeg process: {e}")

View File

@ -83,37 +83,6 @@ def get_video_duration(file_path: str) -> float:
sys.exit(0)
def get_video_duration_s(filename):
"""
Get the duration of a video file using ffprobe.
Parameters:
- filename (str): Path to the video file (e.g., 'sim.mp4')
Returns:
- duration (float): Duration of the video in seconds, or None if an error occurs.
"""
ffprobe_cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', filename]
try:
# Run ffprobe command and capture output
result = subprocess.run(ffprobe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True)
# Extract duration from the output
duration_str = result.stdout.strip()
duration = float(duration_str) # Convert duration to float
return int(duration)
except subprocess.CalledProcessError as e:
print(f"Error running ffprobe: {e}")
return None
except ValueError as e:
print(f"Error converting duration to float: {e}")
return None
def format_duration(seconds: float) -> Tuple[int, int, int]:
"""
Format duration in seconds into hours, minutes, and seconds.

View File

@ -3,7 +3,6 @@
import sys
import time
import logging
import subprocess
import importlib.util
@ -11,155 +10,81 @@ import importlib.util
from StreamingCommunity.Util.console import console
# Check if Crypto module is installed
# Check if Cryptodome module is installed
crypto_spec = importlib.util.find_spec("Cryptodome")
crypto_installed = crypto_spec is not None
if not crypto_installed:
console.log("[red]pycryptodomex non è installato. Per favore installalo. Leggi readme.md [Requirement].")
sys.exit(0)
if crypto_installed:
logging.info("[cyan]Decrypy use: Cryptodomex")
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
logging.info("[cyan]Decrypy use: Cryptodomex")
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
class M3U8_Decryption:
class M3U8_Decryption:
"""
Class for decrypting M3U8 playlist content using AES with pycryptodomex.
"""
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
"""
Class for decrypting M3U8 playlist content using AES encryption when the Crypto module is available.
Initialize the M3U8_Decryption object.
Parameters:
key (bytes): The encryption key.
iv (bytes): The initialization vector (IV).
method (str): The encryption method.
"""
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
"""
Initialize the M3U8_Decryption object.
self.key = key
self.iv = iv
if "0x" in str(iv):
self.iv = bytes.fromhex(iv.replace("0x", ""))
self.method = method
Parameters:
- key (bytes): The encryption key.
- iv (bytes): The initialization vector (IV).
- method (str): The encryption method.
"""
self.key = key
self.iv = iv
if "0x" in str(iv):
self.iv = bytes.fromhex(iv.replace("0x", ""))
self.method = method
# Pre-create the cipher based on the encryption method
if self.method == "AES":
self.cipher = AES.new(self.key, AES.MODE_ECB)
elif self.method == "AES-128":
self.cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv)
elif self.method == "AES-128-CTR":
self.cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv)
else:
raise ValueError("Invalid or unsupported method")
# Precreate cipher based on encryption method
if self.method == "AES":
self.cipher = AES.new(self.key, AES.MODE_ECB)
elif self.method == "AES-128":
self.cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv)
elif self.method == "AES-128-CTR":
self.cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv)
else:
raise ValueError("Invalid or unsupported method")
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt the ciphertext using the specified encryption method.
Parameters:
- ciphertext (bytes): The encrypted content to decrypt.
Returns:
bytes: The decrypted content.
"""
start = time.perf_counter_ns()
#logging.info(f"Ciphertext: {ciphertext}")
# Decrypt based on encryption method
if self.method in {"AES", "AES-128"}:
decrypted_data = self.cipher.decrypt(ciphertext)
decrypted_content = unpad(decrypted_data, AES.block_size)
elif self.method == "AES-128-CTR":
decrypted_content = self.cipher.decrypt(ciphertext)
else:
raise ValueError("Invalid or unsupported method")
end = time.perf_counter_ns()
# Calculate elapsed time with high precision
elapsed_nanoseconds = end - start
elapsed_milliseconds = elapsed_nanoseconds / 1_000_000
elapsed_seconds = elapsed_nanoseconds / 1_000_000_000
# Print performance metrics
logging.info(f"[Crypto Decryption Performance]")
logging.info(f"Method: {self.method}")
logging.info(f"Decryption Time: {elapsed_milliseconds:.4f} ms ({elapsed_seconds:.6f} s)")
logging.info(f"Decrypted Content Length: {len(decrypted_content)} bytes")
return decrypted_content
else:
# Check if openssl command is available
try:
openssl_available = subprocess.run(["openssl", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
logging.info("[cyan]Decrypy use: OPENSSL")
except:
openssl_available = False
if not openssl_available:
console.log("[red]Neither python library: pycryptodomex nor openssl software is installed. Please install either one of them. Read readme.md [Requirement].")
sys.exit(0)
class M3U8_Decryption:
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Class for decrypting M3U8 playlist content using OpenSSL when the Crypto module is not available.
Decrypt the ciphertext using the specified encryption method.
Parameters:
ciphertext (bytes): The encrypted content to decrypt.
Returns:
bytes: The decrypted content.
"""
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
"""
Initialize the M3U8_Decryption object.
#start = time.perf_counter_ns()
Parameters:
- key (bytes): The encryption key.
- iv (bytes): The initialization vector (IV).
- method (str): The encryption method.
"""
self.key = key
self.iv = iv
if "0x" in str(iv):
self.iv = bytes.fromhex(iv.replace("0x", ""))
self.method = method
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
if self.method in {"AES", "AES-128"}:
decrypted_data = self.cipher.decrypt(ciphertext)
decrypted_content = unpad(decrypted_data, AES.block_size)
elif self.method == "AES-128-CTR":
decrypted_content = self.cipher.decrypt(ciphertext)
else:
raise ValueError("Invalid or unsupported method")
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt the ciphertext using the specified encryption method.
"""
end = time.perf_counter_ns()
Parameters:
- ciphertext (bytes): The encrypted content to decrypt.
# Calculate the elapsed time with high precision
elapsed_nanoseconds = end - start
elapsed_milliseconds = elapsed_nanoseconds / 1_000_000
elapsed_seconds = elapsed_nanoseconds / 1_000_000_000
Returns:
bytes: The decrypted content.
"""
start = time.perf_counter_ns()
# Construct OpenSSL command based on encryption method
if self.method == "AES":
openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt'
elif self.method == "AES-128":
openssl_cmd = f'openssl enc -d -aes-128-cbc -K {self.key[:16].hex()} -iv {self.iv.hex()}'
elif self.method == "AES-128-CTR":
openssl_cmd = f'openssl enc -d -aes-128-ctr -K {self.key[:16].hex()} -iv {self.iv.hex()}'
else:
raise ValueError("Invalid or unsupported method")
try:
decrypted_content = subprocess.check_output(openssl_cmd.split(), input=ciphertext, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise ValueError(f"Decryption failed: {e.output.decode()}")
end = time.perf_counter_ns()
# Calculate elapsed time with high precision
elapsed_nanoseconds = end - start
elapsed_milliseconds = elapsed_nanoseconds / 1_000_000
elapsed_seconds = elapsed_nanoseconds / 1_000_000_000
# Print performance metrics
logging.info(f"[OpenSSL Decryption Performance]")
logging.info(f"Method: {self.method}")
logging.info(f"Decryption Time: {elapsed_milliseconds:.4f} ms ({elapsed_seconds:.6f} s)")
logging.info(f"Decrypted Content Length: {len(decrypted_content)} bytes")
return decrypted_content
# Log performance metrics
logging.info("[Crypto Decryption Performance]")
logging.info(f"Method: {self.method}")
logging.info(f"Decryption Time: {elapsed_milliseconds:.4f} ms ({elapsed_seconds:.6f} s)")
logging.info(f"Decrypted Content Length: {len(decrypted_content)} bytes")
"""
return decrypted_content

View File

@ -106,12 +106,8 @@ class M3U8_Ts_Estimator:
try:
self.add_ts_file(total_downloaded * self.total_segments, total_downloaded, duration)
#downloaded_file_size_str = internet_manager.format_file_size(self.now_downloaded_size)
file_total_size = self.calculate_total_size()
#number_file_downloaded = downloaded_file_size_str.split(' ')[0]
number_file_total_size = file_total_size.split(' ')[0]
#units_file_downloaded = downloaded_file_size_str.split(' ')[1]
units_file_total_size = file_total_size.split(' ')[1]
if USE_LARGE_BAR:
@ -126,15 +122,14 @@ class M3U8_Ts_Estimator:
retry_count = self.segments_instance.active_retries if self.segments_instance else 0
progress_str = (
#f"{Colors.WHITE}[ {Colors.GREEN}{number_file_downloaded} {Colors.WHITE}< "
f"{Colors.GREEN}{number_file_total_size} {Colors.RED}{units_file_total_size}"
f"{Colors.WHITE} {Colors.CYAN}{average_internet_speed} {Colors.RED}{average_internet_unit}"
f"{Colors.WHITE} {Colors.GREEN}CRR {Colors.RED}{retry_count} "
)
else:
retry_count = self.segments_instance.active_retries if self.segments_instance else 0
progress_str = (
#f"{Colors.WHITE}[ {Colors.GREEN}{number_file_downloaded} {Colors.WHITE}< "
f"{Colors.GREEN}{number_file_total_size} {Colors.RED}{units_file_total_size}"
f"{Colors.WHITE} {Colors.GREEN}CRR {Colors.RED}{retry_count} "
)

View File

@ -9,10 +9,6 @@ from m3u8 import loads
from StreamingCommunity.Util.os import internet_manager
# External libraries
import httpx
# Costant
CODEC_MAPPINGS = {
"video": {
@ -79,7 +75,6 @@ class M3U8_Codec:
Extracted codecs are set as attributes: audio_codec and video_codec.
"""
try:
# Split the codecs string by comma
codecs_list = self.codecs.split(',')
except Exception as e:
logging.error(f"Can't split codec list: {self.codecs} with error {e}")
@ -407,10 +402,8 @@ class M3U8_Parser:
Parameters:
- m3u8_content (str): The content of the M3U8 file.
"""
# Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles
m3u8_obj = loads(raw_content, uri)
self.__parse_video_info__(m3u8_obj)
self.__parse_subtitles_and_audio__(m3u8_obj)
self.__parse_segments__(m3u8_obj)
@ -469,7 +462,6 @@ class M3U8_Parser:
Parameters:
- m3u8_obj: The M3U8 object containing video playlists.
"""
try:
for playlist in m3u8_obj.playlists:
@ -569,7 +561,6 @@ class M3U8_Parser:
Parameters:
- m3u8_obj: The M3U8 object containing segment data.
"""
try:
for segment in m3u8_obj.segments:
@ -606,13 +597,6 @@ class M3U8_Parser:
Returns:
- formatted_duration (str): Formatted duration string with hours, minutes, and seconds if return_string is True.
- duration_dict (dict): Dictionary with keys 'h', 'm', 's' representing hours, minutes, and seconds respectively if return_string is False.
Example usage:
>>> obj = YourClass(duration=3661)
>>> obj.get_duration()
'[yellow]1[red]h [yellow]1[red]m [yellow]1[red]s'
>>> obj.get_duration(return_string=False)
{'h': 1, 'm': 1, 's': 1}
"""
# Calculate hours, minutes, and remaining seconds

View File

@ -33,8 +33,6 @@ class M3U8_UrlFix:
Returns:
str: The full URL for the specified resource.
"""
# Check if m3u8 url playlist is present
if self.url_playlist == None:
logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present")
raise
@ -54,5 +52,4 @@ class M3U8_UrlFix:
"""
Reset the M3U8 playlist URL to its default state (None).
"""
self.url_playlist = None
self.url_playlist = None

View File

@ -1,2 +1,4 @@
# 17.09.24
from .tmdb import tmdb
from .obj_tmbd import Json_film

View File

@ -5,35 +5,21 @@ from typing import Dict
class Json_film:
def __init__(self, data: Dict):
self.adult = data.get('adult', False)
self.backdrop_path = data.get('backdrop_path')
self.budget = data.get('budget', 0)
self.homepage = data.get('homepage')
self.id = data.get('id', 0)
self.imdb_id = data.get('imdb_id')
self.origin_country = data.get('origin_country', [])
self.original_language = data.get('original_language')
self.original_title = data.get('original_title')
self.overview = data.get('overview')
self.popularity = data.get('popularity', 0.0)
self.poster_path = data.get('poster_path')
self.release_date = data.get('release_date')
self.revenue = data.get('revenue', 0)
self.runtime = data.get('runtime', 0)
self.status = data.get('status')
self.tagline = data.get('tagline')
self.title = data.get('title')
self.video = data.get('video', False)
self.vote_average = data.get('vote_average', 0.0)
self.vote_count = data.get('vote_count', 0)
def __repr__(self):
return (f"Film(adult={self.adult}, backdrop_path='{self.backdrop_path}', "
f"budget={self.budget}, "
f"homepage='{self.homepage}', id={self.id}, "
f"imdb_id='{self.imdb_id}', origin_country={self.origin_country}, "
return (f"Json_film(id={self.id}, imdb_id='{self.imdb_id}', origin_country={self.origin_country}, "
f"original_language='{self.original_language}', original_title='{self.original_title}', "
f"overview='{self.overview}', popularity={self.popularity}, poster_path='{self.poster_path}', "
f"release_date='{self.release_date}', revenue={self.revenue}, runtime={self.runtime}, "
f"status='{self.status}', tagline='{self.tagline}', "
f"title='{self.title}', video={self.video}, vote_average={self.vote_average}, vote_count={self.vote_count})")
f"popularity={self.popularity}, poster_path='{self.poster_path}', release_date='{self.release_date}', "
f"status='{self.status}', title='{self.title}', vote_average={self.vote_average}, vote_count={self.vote_count})")

View File

@ -336,6 +336,5 @@ class ConfigManager:
print(f"Error writing configuration file: {e}")
# Initialize
config_manager = ConfigManager()
config_manager.read_config()
config_manager.read_config()

View File

@ -39,4 +39,4 @@ def get_call_stack():
"line": lineno
})
return call_stack
return call_stack

View File

@ -17,4 +17,4 @@ class Colors:
LIGHT_MAGENTA = "\033[95m"
LIGHT_CYAN = "\033[96m"
WHITE = "\033[97m"
RESET = "\033[0m"
RESET = "\033[0m"

View File

@ -9,4 +9,4 @@ from rich.text import Text
# Variable
msg = Prompt()
console = Console()
console = Console()

View File

@ -10,47 +10,11 @@ import ua_generator
# Variable
ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'))
def get_userAgent() -> str:
"""
Generate a random user agent to use in HTTP requests.
Returns:
- str: A random user agent string.
"""
# Get a random user agent string from the user agent rotator
def get_userAgent() -> str:
user_agent = ua_generator.generate().text
return user_agent
def get_headers() -> dict:
return ua.headers.get()
def random_headers(referer: str = None):
"""
Generate random HTTP headers to simulate human-like behavior.
Returns:
dict: Generated HTTP headers.
"""
ua = ua_generator.generate()
headers = {
'User-Agent': ua.text,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': random.choice(['en-US', 'en-GB', 'fr-FR', 'es-ES', 'de-DE']),
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
}
if referer:
headers['Origin'] = referer
headers['Referer'] = referer
return headers
return ua.headers.get()

View File

@ -10,53 +10,83 @@ from StreamingCommunity.Util._jsonConfig import config_manager
class Logger:
_instance = None
def __new__(cls):
# Singleton pattern to avoid multiple logger instances
if cls._instance is None:
cls._instance = super(Logger, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
# Fetching configuration values
self.DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug")
self.log_to_file = config_manager.get_bool("DEFAULT", "log_to_file")
self.log_file = config_manager.get("DEFAULT", "log_file") if self.log_to_file else None
# Initialize only once
if getattr(self, '_initialized', False):
return
# Fetch only the debug setting from config
self.debug_mode = config_manager.get_bool("DEFAULT", "debug")
# Configure root logger
self.logger = logging.getLogger('')
# Remove any existing handlers to avoid duplication
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
# Reduce logging level for external libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
# Setting logging level based on DEBUG_MODE
if self.DEBUG_MODE:
logging.getLogger('root').setLevel(logging.DEBUG)
# Set logging level based on debug_mode
if self.debug_mode:
self.logger.setLevel(logging.DEBUG)
self._configure_console_log_file()
# Configure file logging if debug mode and logging to file are both enabled
if self.log_to_file:
self.remove_existing_log_file()
self.configure_file_logging()
else:
# If DEBUG_MODE is False, set logging level to ERROR
logging.getLogger('root').setLevel(logging.ERROR)
# Configure console logging
self.configure_logging()
def configure_logging(self):
"""
Configure console logging.
"""
logging.basicConfig(level=logging.DEBUG, format='[%(filename)s:%(lineno)s - %(funcName)20s() ] %(asctime)s - %(levelname)s - %(message)s')
def configure_file_logging(self):
"""
Configure file logging if enabled.
"""
file_handler = RotatingFileHandler(self.log_file, maxBytes=10*1024*1024, backupCount=5)
file_handler.setLevel(logging.DEBUG)
self.logger.setLevel(logging.ERROR)
# Configure console logging (terminal output) regardless of debug mode
self._configure_console_logging()
self._initialized = True
def _configure_console_logging(self):
"""Configure console logging output to terminal."""
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG if self.debug_mode else logging.ERROR)
formatter = logging.Formatter('[%(filename)s:%(lineno)s - %(funcName)20s() ] %(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logging.getLogger('').addHandler(file_handler)
def remove_existing_log_file(self):
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
def _configure_console_log_file(self):
"""Create a console.log file only when debug mode is enabled."""
console_log_path = "console.log"
try:
# Remove existing file if present
if os.path.exists(console_log_path):
os.remove(console_log_path)
# Create handler for console.log
console_file_handler = RotatingFileHandler(
console_log_path,
maxBytes=5*1024*1024, # 5 MB
backupCount=3
)
console_file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(filename)s:%(lineno)s - %(funcName)20s() ] %(asctime)s - %(levelname)s - %(message)s')
console_file_handler.setFormatter(formatter)
self.logger.addHandler(console_file_handler)
except Exception as e:
print(f"Error creating console.log: {e}")
@staticmethod
def get_logger(name=None):
"""
Remove the log file if it already exists.
Get a specific logger for a module/component.
If name is None, returns the root logger.
"""
if os.path.exists(self.log_file):
os.remove(self.log_file)
# Ensure Logger instance is initialized
Logger()
return logging.getLogger(name)

View File

@ -33,4 +33,4 @@ def start_message():
# Print a decorative separator line using asterisks
separator = "_" * (console.width - 2) # Ridotto di 2 per il padding
console.print(f"[cyan]{separator}[/cyan]\n")
console.print(f"[cyan]{separator}[/cyan]\n")

View File

@ -11,7 +11,6 @@ import logging
import platform
import subprocess
import contextlib
import urllib.request
import importlib.metadata
from pathlib import Path
@ -243,7 +242,6 @@ class OsManager:
class InternManager():
def format_file_size(self, size_bytes: float) -> str:
"""
Formats a file size from bytes into a human-readable string representation.
@ -296,7 +294,6 @@ class InternManager():
class OsSummary:
def __init__(self):
self.ffmpeg_path = None
self.ffprobe_path = None
@ -506,4 +503,4 @@ def compute_sha1_hash(input_string: str) -> str:
hashed_string = hashlib.sha1(input_string.encode()).hexdigest()
# Return the hashed string
return hashed_string
return hashed_string

View File

@ -19,6 +19,7 @@ from rich.style import Style
from .message import start_message
from .call_stack import get_call_stack
# Telegram bot instance
from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager

View File

@ -1,8 +1,6 @@
{
"DEFAULT": {
"debug": false,
"log_file": "app.log",
"log_to_file": true,
"show_message": true,
"clean_console": true,
"show_trending": true,
@ -11,12 +9,6 @@
"serie_folder_name": "Serie",
"anime_folder_name": "Anime",
"map_episode_name": "E%(episode)_%(episode_name)",
"config_qbit_tor": {
"host": "192.168.1.51",
"port": "6666",
"user": "admin",
"pass": "adminadmin"
},
"use_api": true,
"add_siteName": false,
"disable_searchDomain": false,
@ -66,5 +58,11 @@
"ips4_member_id": "",
"ips4_login_key": ""
}
},
"QBIT_CONFIG": {
"host": "192.168.1.51",
"port": "6666",
"user": "admin",
"pass": "adminadmin"
}
}

View File

@ -1,13 +1,18 @@
# 26.11.24
import sys
# Internal utilities
from StreamingCommunity.run import main
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.TelegramHelp.telegram_bot import TelegramRequestManager, TelegramSession
# Svuoto il file
# Variable
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
if TELEGRAM_BOT:
request_manager = TelegramRequestManager()
request_manager.clear_file()

View File

@ -116,6 +116,7 @@ def print_commit_info(commit_info: dict):
# Print the table in a panel
console.print(Panel.fit(table))
def download_and_extract_latest_commit():
"""
Download and extract the latest commit from a GitHub repository.