diff --git a/Src/Lib/Hls/_segments_to_fix.py b/Src/Lib/Hls/_segments_to_fix.py
deleted file mode 100644
index 3782434..0000000
--- a/Src/Lib/Hls/_segments_to_fix.py
+++ /dev/null
@@ -1,340 +0,0 @@
-# 18.04.24
-
-import os
-import sys
-import time
-import queue
-import threading
-import logging
-import binascii
-from queue import PriorityQueue
-from urllib.parse import urljoin, urlparse
-from concurrent.futures import ThreadPoolExecutor
-
-
-# External libraries
-import httpx
-from tqdm import tqdm
-
-
-# Internal utilities
-from Src.Util.console import console
-from Src.Util.headers import get_headers, random_headers
-from Src.Util.color import Colors
-from Src.Util._jsonConfig import config_manager
-from Src.Util.os import check_file_existence
-
-
-# Logic class
-from ..M3U8 import (
-    M3U8_Decryption,
-    M3U8_Ts_Estimator,
-    M3U8_Parser,
-    M3U8_UrlFix
-)
-from .proxyes import main_test_proxy
-
-
-# Warning
-import urllib3
-urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-
-
-# Config
-TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
-TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay')
-TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
-REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
-THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
-PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min')
-PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')
-
-
-# Variable
-headers_index = config_manager.get_dict('REQUESTS', 'index')
-
-
-
-class RetryTransport(httpx.BaseTransport):
-    def __init__(self, transport, retries=3, backoff_factor=0.3):
-        self.transport = transport
-        self.retries = retries
-        self.backoff_factor = backoff_factor
-
-    def handle_request(self, request: httpx.Request) -> httpx.Response:
-        url = request.url
-
-        for attempt in range(1, self.retries + 1):
-            try:
-                response = self.transport.handle_request(request)
-                response.raise_for_status()
-                return response
-
-            except (httpx.RequestError, httpx.HTTPStatusError) as e:
-                if attempt == self.retries:
-                    raise
-                else:
-                    wait = self.backoff_factor * (2 ** (attempt - 1))
-                    print(f"Attempt {attempt} for URL {url} failed: {e}. Retrying in {wait} seconds...")
-                    time.sleep(wait)
-
-
-transport = RetryTransport(httpx.HTTPTransport())
-client = httpx.Client(transport=transport, verify=False, timeout=REQUEST_TIMEOUT)
-
-
-class M3U8_Segments:
-    def __init__(self, url: str, tmp_folder: str):
-        """
-        Initializes the M3U8_Segments object.
-
-        Args:
-            - url (str): The URL of the M3U8 playlist.
-            - tmp_folder (str): The temporary folder to store downloaded segments.
-        """
-        self.url = url
-        self.tmp_folder = tmp_folder
-        self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts")
-        os.makedirs(self.tmp_folder, exist_ok=True)
-
-        # Util class
-        self.decryption: M3U8_Decryption = None
-        self.class_ts_estimator = M3U8_Ts_Estimator(0)
-        self.class_url_fixer = M3U8_UrlFix(url)
-
-        # Sync
-        self.queue = PriorityQueue()
-        self.stop_event = threading.Event()
-
-    def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes:
-        """
-        Retrieves the encryption key from the M3U8 playlist.
-
-        Args:
-            - m3u8_parser (M3U8_Parser): The parser object containing M3U8 playlist information.
-
-        Returns:
-            bytes: The encryption key in bytes.
- """ - headers_index['user-agent'] = get_headers() - - # Construct the full URL of the key - key_uri = urljoin(self.url, m3u8_parser.keys.get('uri')) - parsed_url = urlparse(key_uri) - self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/" - logging.info(f"Uri key: {key_uri}") - - try: - response = httpx.get(key_uri, headers=headers_index) - response.raise_for_status() - - except Exception as e: - raise Exception(f"Failed to fetch key from {key_uri}: {e}") - - # Convert the content of the response to hexadecimal and then to bytes - hex_content = binascii.hexlify(response.content).decode('utf-8') - byte_content = bytes.fromhex(hex_content) - - logging.info(f"Key: ('hex': {hex_content}, 'byte': {byte_content})") - return byte_content - - def parse_data(self, m3u8_content: str) -> None: - """ - Parses the M3U8 content to extract segment information. - - Args: - - m3u8_content (str): The content of the M3U8 file. - """ - m3u8_parser = M3U8_Parser() - m3u8_parser.parse_data(uri=self.url, raw_content=m3u8_content) - - console.log(f"[red]Expected duration after download: {m3u8_parser.get_duration()}") - console.log(f"[red]There is key: [yellow]{m3u8_parser.keys is not None}") - - # Check if there is an encryption key in the playlis - if m3u8_parser.keys is not None: - try: - - # Extract byte from the key - key = self.__get_key__(m3u8_parser) - - except Exception as e: - raise Exception(f"Failed to retrieve encryption key {e}.") - - iv = m3u8_parser.keys.get('iv') - method = m3u8_parser.keys.get('method') - - # Create a decryption object with the key and set the method - self.decryption = M3U8_Decryption(key, iv, method) - - # Store the segment information parsed from the playlist - self.segments = m3u8_parser.segments - - # Fix URL if it is incomplete (missing 'http') - for i in range(len(self.segments)): - segment_url = self.segments[i] - - if "http" not in segment_url: - self.segments[i] = self.class_url_fixer.generate_full_url(segment_url) - logging.info(f"Generated new URL: {self.segments[i]}, from: {segment_url}") - - # Update segments for estimator - self.class_ts_estimator.total_segments = len(self.segments) - logging.info(f"Segmnets to donwload: [{len(self.segments)}]") - - # Proxy - if THERE_IS_PROXY_LIST: - console.log("[red]Validate proxy.") - self.valid_proxy = main_test_proxy(self.segments[0]) - console.log(f"[cyan]N. Valid ip: [red]{len(self.valid_proxy)}") - - if len(self.valid_proxy) == 0: - sys.exit(0) - - def get_info(self) -> None: - """ - Makes a request to the index M3U8 file to get information about segments. - """ - headers_index['user-agent'] = get_headers() - - # Send a GET request to retrieve the index M3U8 file - response = httpx.get(self.url, headers=headers_index) - response.raise_for_status() - - # Save the M3U8 file to the temporary folder - if response.status_code == 200: - path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8") - open(path_m3u8_file, "w+").write(response.text) - - # Parse the text from the M3U8 index file - self.parse_data(response.text) - - def make_requests_stream(self, ts_url: str, index: int, progress_bar: tqdm) -> None: - """ - Downloads a TS segment and adds it to the segment queue. - - Args: - - ts_url (str): The URL of the TS segment. - - index (int): The index of the segment. - - progress_bar (tqdm): Progress counter for tracking download progress. 
- """ - - - # Generate headers - start_time = time.time() - - # Make request to get content - if THERE_IS_PROXY_LIST: - proxy = self.valid_proxy[index % len(self.valid_proxy)] - logging.info(f"Use proxy: {proxy}") - - if 'key_base_url' in self.__dict__: - response = client.get(ts_url, headers=random_headers(self.key_base_url), proxy=proxy) - else: - response = client.get(ts_url, headers={'user-agent': get_headers()}, proxy=proxy) - else: - if 'key_base_url' in self.__dict__: - response = client.get(ts_url, headers=random_headers(self.key_base_url)) - else: - response = client.get(ts_url, headers={'user-agent': get_headers()}) - - # Get response content - response.raise_for_status() - segment_content = response.content - - # Update bar - duration = time.time() - start_time - response_size = int(response.headers.get('Content-Length', 0)) - self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar) - - # Decrypt the segment content if decryption is needed - if self.decryption is not None: - segment_content = self.decryption.decrypt(segment_content) - - # Add the segment to the queue - self.queue.put((index, segment_content)) - progress_bar.update(1) - - - def write_segments_to_file(self): - """ - Writes downloaded segments to a file in the correct order. - """ - with open(self.tmp_file_path, 'wb') as f: - expected_index = 0 - buffer = {} - - while not self.stop_event.is_set() or not self.queue.empty(): - try: - index, segment_content = self.queue.get(timeout=1) - - if index == expected_index: - f.write(segment_content) - f.flush() - expected_index += 1 - - # Write any buffered segments in order - while expected_index in buffer: - f.write(buffer.pop(expected_index)) - f.flush() - expected_index += 1 - else: - buffer[index] = segment_content - - except queue.Empty: - continue - - def download_streams(self, add_desc): - """ - Downloads all TS segments in parallel and writes them to a file. - - Args: - - add_desc (str): Additional description for the progress bar. 
- """ - if TQDM_USE_LARGE_BAR: - bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]" - else: - bar_format=f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]" - - # Create progress bar - progress_bar = tqdm( - total=len(self.segments), - unit='s', - ascii='░▒█', - bar_format=bar_format - ) - - # Start a separate thread to write segments to the file - writer_thread = threading.Thread(target=self.write_segments_to_file) - writer_thread.start() - - # Ff proxy avaiable set max_workers to number of proxy - # else set max_workers to TQDM_MAX_WORKER - max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER - - # if proxy avaiable set timeout to variable time - # else set timeout to TDQM_DELAY_WORKER - if THERE_IS_PROXY_LIST: - num_proxies = len(self.valid_proxy) - self.working_proxy_list = self.valid_proxy - - if num_proxies > 0: - # calculate delay based on number of proxies - # dalay should be between 0.5 and 1 - delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (num_proxies + 1))) - else: - delay = TQDM_DELAY_WORKER - else: - delay = TQDM_DELAY_WORKER - - # Start all workers - with ThreadPoolExecutor(max_workers=max_workers) as executor: - for index, segment_url in enumerate(self.segments): - time.sleep(delay) - executor.submit(self.make_requests_stream, segment_url, index, progress_bar) - - # Wait for all tasks to complete - executor.shutdown(wait=True) - self.stop_event.set() - writer_thread.join() - progress_bar.close() diff --git a/Src/Lib/Hls/segments.py b/Src/Lib/Hls/segments.py index b1f08a8..433f117 100644 --- a/Src/Lib/Hls/segments.py +++ b/Src/Lib/Hls/segments.py @@ -8,13 +8,12 @@ import threading import logging import binascii from queue import PriorityQueue -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse from concurrent.futures import ThreadPoolExecutor # External libraries -import requests -from requests.exceptions import HTTPError, ConnectionError, Timeout, RequestException +import httpx from tqdm import tqdm @@ -23,6 +22,7 @@ from Src.Util.console import console from Src.Util.headers import get_headers, random_headers from Src.Util.color import Colors from Src.Util._jsonConfig import config_manager +from Src.Util.os import check_file_existence # Logic class @@ -45,13 +45,44 @@ TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers') TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay') TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar') REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout') -THERE_IS_PROXY_LIST = False +THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt") +PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min') +PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max') # Variable headers_index = config_manager.get_dict('REQUESTS', 'index') + +class RetryTransport(httpx.BaseTransport): + def __init__(self, transport, retries=3, backoff_factor=0.3): + self.transport = transport + self.retries = retries + self.backoff_factor = backoff_factor + + def handle_request(self, request: httpx.Request) -> httpx.Response: + url = 
request.url + + for attempt in range(1, self.retries + 1): + try: + response = self.transport.handle_request(request) + response.raise_for_status() + return response + + except (httpx.RequestError, httpx.HTTPStatusError) as e: + if attempt == self.retries: + raise + else: + wait = self.backoff_factor * (2 ** (attempt - 1)) + print(f"Attempt {attempt} for URL {url} failed: {e}. Retrying in {wait} seconds...") + time.sleep(wait) + + +transport = RetryTransport(httpx.HTTPTransport()) +client = httpx.Client(transport=transport) + + class M3U8_Segments: def __init__(self, url: str, tmp_folder: str): """ @@ -87,13 +118,14 @@ class M3U8_Segments: """ headers_index['user-agent'] = get_headers() - # Construct the full URL of the key - key_uri = urljoin(self.url, m3u8_parser.keys.get('uri')) + key_uri = urljoin(self.url, m3u8_parser.keys.get('uri')) + parsed_url = urlparse(key_uri) + self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/" logging.info(f"Uri key: {key_uri}") try: - response = requests.get(key_uri, headers=headers_index) + response = httpx.get(key_uri, headers=headers_index) response.raise_for_status() except Exception as e: @@ -166,11 +198,11 @@ class M3U8_Segments: headers_index['user-agent'] = get_headers() # Send a GET request to retrieve the index M3U8 file - response = requests.get(self.url, headers=headers_index) + response = httpx.get(self.url, headers=headers_index) response.raise_for_status() # Save the M3U8 file to the temporary folder - if response.ok: + if response.status_code == 200: path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8") open(path_m3u8_file, "w+").write(response.text) @@ -186,43 +218,42 @@ class M3U8_Segments: - index (int): The index of the segment. - progress_bar (tqdm): Progress counter for tracking download progress. 
""" - try: - # Generate headers - start_time = time.time() - # Make request to get content - if THERE_IS_PROXY_LIST: - proxy = self.valid_proxy[index % len(self.valid_proxy)] - logging.info(f"Use proxy: {proxy}") - response = requests.get(ts_url, headers=random_headers(), timeout=REQUEST_TIMEOUT, proxies=proxy) + # Generate headers + start_time = time.time() + + # Make request to get content + if THERE_IS_PROXY_LIST: + proxy = self.valid_proxy[index % len(self.valid_proxy)] + logging.info(f"Use proxy: {proxy}") + + if 'key_base_url' in self.__dict__: + response = httpx.get(ts_url, headers=random_headers(self.key_base_url), proxy=proxy, verify=False, timeout=REQUEST_TIMEOUT) else: - response = requests.get(ts_url, headers=random_headers(), timeout=REQUEST_TIMEOUT) + response = httpx.get(ts_url, headers={'user-agent': get_headers()}, proxy=proxy, verify=False, timeout=REQUEST_TIMEOUT) + else: + if 'key_base_url' in self.__dict__: + response = httpx.get(ts_url, headers=random_headers(self.key_base_url), verify=False, timeout=REQUEST_TIMEOUT) + else: + response = httpx.get(ts_url, headers={'user-agent': get_headers()}, verify=False, timeout=REQUEST_TIMEOUT) - # Get response content - response.raise_for_status() - segment_content = response.content + # Get response content + response.raise_for_status() + segment_content = response.content - # Update bar - duration = time.time() - start_time - response_size = int(response.headers.get('Content-Length', 0)) - self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar) + # Update bar + duration = time.time() - start_time + response_size = int(response.headers.get('Content-Length', 0)) + self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar) - # Decrypt the segment content if decryption is needed - if self.decryption is not None: - segment_content = self.decryption.decrypt(segment_content) + # Decrypt the segment content if decryption is needed + if self.decryption is not None: + segment_content = self.decryption.decrypt(segment_content) - # Add the segment to the queue - self.queue.put((index, segment_content)) - progress_bar.update(1) - - except (HTTPError, ConnectionError, Timeout, RequestException) as e: - progress_bar.update(1) - logging.error(f"Request-related exception while downloading segment: {e}") - - except Exception as e: - progress_bar.update(1) - logging.error(f"An unexpected exception occurred while download segment: {e}") + # Add the segment to the queue + self.queue.put((index, segment_content)) + progress_bar.update(1) def write_segments_to_file(self): """ @@ -276,10 +307,29 @@ class M3U8_Segments: writer_thread = threading.Thread(target=self.write_segments_to_file) writer_thread.start() + # Ff proxy avaiable set max_workers to number of proxy + # else set max_workers to TQDM_MAX_WORKER + max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER + + # if proxy avaiable set timeout to variable time + # else set timeout to TDQM_DELAY_WORKER + if THERE_IS_PROXY_LIST: + num_proxies = len(self.valid_proxy) + self.working_proxy_list = self.valid_proxy + + if num_proxies > 0: + # calculate delay based on number of proxies + # dalay should be between 0.5 and 1 + delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (num_proxies + 1))) + else: + delay = TQDM_DELAY_WORKER + else: + delay = TQDM_DELAY_WORKER + # Start all workers - with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor: + with ThreadPoolExecutor(max_workers=max_workers) as executor: for index, 
segment_url in enumerate(self.segments): - time.sleep(TQDM_DELAY_WORKER) + time.sleep(delay) executor.submit(self.make_requests_stream, segment_url, index, progress_bar) # Wait for all tasks to complete