From 48f39042aed0e6e63e5a88285fb20e4a1531e93f Mon Sep 17 00:00:00 2001 From: Ghost <62809003+Ghost6446@users.noreply.github.com> Date: Sat, 8 Jun 2024 20:26:54 +0200 Subject: [PATCH] Fix struck at 100% --- Src/Lib/Hls/segments.py | 133 +++++++++++++++++--------------------- Src/Lib/M3U8/estimator.py | 65 +++++++++---------- 2 files changed, 91 insertions(+), 107 deletions(-) diff --git a/Src/Lib/Hls/segments.py b/Src/Lib/Hls/segments.py index 03b2d7d..2685a87 100644 --- a/Src/Lib/Hls/segments.py +++ b/Src/Lib/Hls/segments.py @@ -7,8 +7,9 @@ import queue import threading import logging import binascii +from queue import PriorityQueue +from urllib.parse import urljoin from concurrent.futures import ThreadPoolExecutor -from urllib.parse import urljoin, urlparse, urlunparse # External libraries @@ -72,10 +73,9 @@ class M3U8_Segments: self.class_url_fixer = M3U8_UrlFix(url) # Sync - self.current_index = 0 # Index of the current segment to be written - self.segment_queue = queue.PriorityQueue() # Priority queue to maintain the order of segments - self.condition = threading.Condition() # Condition variable for thread synchronization - + self.queue = PriorityQueue() + self.stop_event = threading.Event() + def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes: """ Retrieves the encryption key from the M3U8 playlist. @@ -210,83 +210,71 @@ class M3U8_Segments: - index (int): The index of the segment. - progress_bar (tqdm): Progress counter for tracking download progress. """ - - # Generate new user agent - headers_segments['user-agent'] = get_headers() - try: + # Generate headers start_time = time.time() + headers_segments['user-agent'] = get_headers() - # Generate proxy + # Make request to get content if len(PROXY_LIST) > 0: - - # Make request proxy = self.get_proxy(index) response = session.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, proxies=proxy) - response.raise_for_status() - else: - - # Make request response = session.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT) - response.raise_for_status() - # Calculate duration + # Get response content + response.raise_for_status() + segment_content = response.content + + # Update bar duration = time.time() - start_time - logging.info(f"Make request to get segment: [{index} - {len(self.segments)}] in: {duration}, len data: {len(response.content)}") + response_size = int(response.headers.get('Content-Length', 0)) + self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar) + + # Decrypt the segment content if decryption is needed + if self.decryption is not None: + segment_content = self.decryption.decrypt(segment_content) - if response.ok: - - # Get the content of the segment - segment_content = response.content - - # Update bar - self.class_ts_estimator.update_progress_bar(int(response.headers.get('Content-Length', 0)), duration, progress_bar) - - # Decrypt the segment content if decryption is needed - if self.decryption is not None: - segment_content = self.decryption.decrypt(segment_content) - - with self.condition: - self.segment_queue.put((index, segment_content)) # Add the segment to the queue - self.condition.notify() # Notify the writer thread that a new segment is available - else: - logging.error(f"Failed to download segment: {ts_url}") + # Add the segment to the queue + self.queue.put((index, segment_content)) + progress_bar.update(1) except (HTTPError, ConnectionError, Timeout, RequestException) as e: + progress_bar.update(1) logging.error(f"Request-related exception while downloading segment: {e}") - except Exception as e: - logging.error(f"An unexpected exception occurred while download segment: {e}") - # Update bar - progress_bar.update(1) + except Exception as e: + progress_bar.update(1) + logging.error(f"An unexpected exception occurred while download segment: {e}") def write_segments_to_file(self): """ Writes downloaded segments to a file in the correct order. """ - with open(self.tmp_file_path, 'ab') as f: - while True: - with self.condition: - while self.segment_queue.empty() and self.current_index < len(self.segments): - self.condition.wait() # Wait until a new segment is available or all segments are downloaded + with open(self.tmp_file_path, 'wb') as f: + expected_index = 0 + buffer = {} - if self.segment_queue.empty() and self.current_index >= len(self.segments): - break # Exit loop if all segments have been processed + while not self.stop_event.is_set() or not self.queue.empty(): + try: + index, segment_content = self.queue.get(timeout=1) - if not self.segment_queue.empty(): - # Get the segment from the queue - index, segment_content = self.segment_queue.get() + if index == expected_index: + f.write(segment_content) + f.flush() + expected_index += 1 - # Write the segment to the file - if index == self.current_index: - f.write(segment_content) - self.current_index += 1 - self.segment_queue.task_done() - else: - self.segment_queue.put((index, segment_content)) # Requeue the segment if it is not the next to be written - self.condition.notify() + # Write any buffered segments in order + while expected_index in buffer: + f.write(buffer.pop(expected_index)) + f.flush() + expected_index += 1 + else: + buffer[index] = segment_content + + except queue.Empty: + continue def download_streams(self, add_desc): """ @@ -296,10 +284,11 @@ class M3U8_Segments: - add_desc (str): Additional description for the progress bar. """ if TQDM_USE_LARGE_BAR: - bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}| {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}| {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]" + bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]" else: bar_format=f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]" - + + # Create progress bar progress_bar = tqdm( total=len(self.segments), unit='s', @@ -307,21 +296,17 @@ class M3U8_Segments: bar_format=bar_format ) + # Start a separate thread to write segments to the file + writer_thread = threading.Thread(target=self.write_segments_to_file) + writer_thread.start() + + # Start all workers with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor: - - # Start a separate thread to write segments to the file - writer_thread = threading.Thread(target=self.write_segments_to_file) - writer_thread.start() - - # Start all workers for index, segment_url in enumerate(self.segments): - - # Submit the download task to the executor executor.submit(self.make_requests_stream, segment_url, index, progress_bar) - # Wait for all segments to be downloaded - executor.shutdown() - - with self.condition: - self.condition.notify_all() # Wake up the writer thread if it's waiting - writer_thread.join() # Wait for the writer thread to finish + # Wait for all tasks to complete + executor.shutdown(wait=True) + self.stop_event.set() + writer_thread.join() + progress_bar.close() diff --git a/Src/Lib/M3U8/estimator.py b/Src/Lib/M3U8/estimator.py index 5fd41a7..8e5b8ed 100644 --- a/Src/Lib/M3U8/estimator.py +++ b/Src/Lib/M3U8/estimator.py @@ -22,13 +22,20 @@ TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar class M3U8_Ts_Estimator: def __init__(self, total_segments: int): + """ + Initialize the TSFileSizeCalculator object. + + Args: + - workers (int): The number of workers using with ThreadPool. + - total_segments (int): Len of total segments to download + """ self.ts_file_sizes = [] self.now_downloaded_size = 0 - self.average_over = 5 + self.average_over = 3 self.list_speeds = deque(maxlen=self.average_over) + self.smoothed_speeds = [] self.total_segments = total_segments - self.last_segment_duration = 1 - self.last_segment_size = 0 + self.lock = threading.Lock() def add_ts_file(self, size: int, size_download: int, duration: float): """ @@ -43,24 +50,31 @@ class M3U8_Ts_Estimator: logging.error("Invalid input values: size=%d, size_download=%d, duration=%f", size, size_download, duration) return - # Calibrazione dinamica del tempo - self.last_segment_duration = duration - - # Considerazione della variazione di dimensione del segmento - self.last_segment_size = size_download - - # Calcolo velocità - try: - speed_mbps = (size_download * 8) / (duration * 1024 * 1024) - - except ZeroDivisionError as e: - logging.error("Division by zero error while calculating speed: %s", e) - return + # Calculate speed outside of the lock + speed_mbps = (size_download * 4) / (duration * (1024 * 1024)) + # Add total size bytes self.ts_file_sizes.append(size) self.now_downloaded_size += size_download self.list_speeds.append(speed_mbps) + # Calculate moving average + smoothed_speed = sum(self.list_speeds) / len(self.list_speeds) + self.smoothed_speeds.append(smoothed_speed) + + # Update smooth speeds + if len(self.smoothed_speeds) > self.average_over: + self.smoothed_speeds.pop(0) + + def get_average_speed(self) -> float: + """ + Calculate the average speed from a list of speeds and convert it to megabytes per second (MB/s). + + Returns: + float: The average speed in megabytes per second (MB/s). + """ + return (sum(self.smoothed_speeds) / len(self.smoothed_speeds)) + def calculate_total_size(self) -> str: """ Calculate the total size of the files. @@ -86,21 +100,6 @@ class M3U8_Ts_Estimator: logging.error("An unexpected error occurred: %s", e) return "Error" - def get_average_speed(self) -> float: - """ - Calculate the average speed from a list of speeds and convert it to megabytes per second (MB/s). - - Returns: - float: The average speed in megabytes per second (MB/s). - """ - - # Smooth the speeds for better accuracy using the window defined by average_over - smoothed_speed = sum(self.list_speeds) / min(len(self.list_speeds), self.average_over) - predicted_speed = smoothed_speed * (self.last_segment_size / (1024 * 1024)) / self.last_segment_duration - - # Convert to mb/s - return predicted_speed / 8 - def get_downloaded_size(self) -> str: """ Get the total downloaded size formatted as a human-readable string. @@ -144,5 +143,5 @@ class M3U8_Ts_Estimator: else: progress_counter.set_postfix_str( f"{Colors.WHITE}[ {Colors.GREEN}{number_file_downloaded}{Colors.RED} {units_file_downloaded} " - f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s" - ) \ No newline at end of file + f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}Mbps" + )