Fix struck at 100%

This commit is contained in:
Ghost 2024-06-08 20:26:54 +02:00
parent 93a594beef
commit 48f39042ae
2 changed files with 91 additions and 107 deletions

View File

@ -7,8 +7,9 @@ import queue
import threading
import logging
import binascii
from queue import PriorityQueue
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse, urlunparse
# External libraries
@ -72,9 +73,8 @@ class M3U8_Segments:
self.class_url_fixer = M3U8_UrlFix(url)
# Sync
self.current_index = 0 # Index of the current segment to be written
self.segment_queue = queue.PriorityQueue() # Priority queue to maintain the order of segments
self.condition = threading.Condition() # Condition variable for thread synchronization
self.queue = PriorityQueue()
self.stop_event = threading.Event()
def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes:
"""
@ -210,83 +210,71 @@ class M3U8_Segments:
- index (int): The index of the segment.
- progress_bar (tqdm): Progress counter for tracking download progress.
"""
# Generate new user agent
headers_segments['user-agent'] = get_headers()
try:
# Generate headers
start_time = time.time()
headers_segments['user-agent'] = get_headers()
# Generate proxy
# Make request to get content
if len(PROXY_LIST) > 0:
# Make request
proxy = self.get_proxy(index)
response = session.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, proxies=proxy)
response.raise_for_status()
else:
# Make request
response = session.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
# Calculate duration
# Get response content
response.raise_for_status()
segment_content = response.content
# Update bar
duration = time.time() - start_time
logging.info(f"Make request to get segment: [{index} - {len(self.segments)}] in: {duration}, len data: {len(response.content)}")
response_size = int(response.headers.get('Content-Length', 0))
self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar)
if response.ok:
# Decrypt the segment content if decryption is needed
if self.decryption is not None:
segment_content = self.decryption.decrypt(segment_content)
# Get the content of the segment
segment_content = response.content
# Update bar
self.class_ts_estimator.update_progress_bar(int(response.headers.get('Content-Length', 0)), duration, progress_bar)
# Decrypt the segment content if decryption is needed
if self.decryption is not None:
segment_content = self.decryption.decrypt(segment_content)
with self.condition:
self.segment_queue.put((index, segment_content)) # Add the segment to the queue
self.condition.notify() # Notify the writer thread that a new segment is available
else:
logging.error(f"Failed to download segment: {ts_url}")
# Add the segment to the queue
self.queue.put((index, segment_content))
progress_bar.update(1)
except (HTTPError, ConnectionError, Timeout, RequestException) as e:
progress_bar.update(1)
logging.error(f"Request-related exception while downloading segment: {e}")
except Exception as e:
logging.error(f"An unexpected exception occurred while download segment: {e}")
# Update bar
progress_bar.update(1)
except Exception as e:
progress_bar.update(1)
logging.error(f"An unexpected exception occurred while download segment: {e}")
def write_segments_to_file(self):
"""
Writes downloaded segments to a file in the correct order.
"""
with open(self.tmp_file_path, 'ab') as f:
while True:
with self.condition:
while self.segment_queue.empty() and self.current_index < len(self.segments):
self.condition.wait() # Wait until a new segment is available or all segments are downloaded
with open(self.tmp_file_path, 'wb') as f:
expected_index = 0
buffer = {}
if self.segment_queue.empty() and self.current_index >= len(self.segments):
break # Exit loop if all segments have been processed
while not self.stop_event.is_set() or not self.queue.empty():
try:
index, segment_content = self.queue.get(timeout=1)
if not self.segment_queue.empty():
# Get the segment from the queue
index, segment_content = self.segment_queue.get()
if index == expected_index:
f.write(segment_content)
f.flush()
expected_index += 1
# Write the segment to the file
if index == self.current_index:
f.write(segment_content)
self.current_index += 1
self.segment_queue.task_done()
else:
self.segment_queue.put((index, segment_content)) # Requeue the segment if it is not the next to be written
self.condition.notify()
# Write any buffered segments in order
while expected_index in buffer:
f.write(buffer.pop(expected_index))
f.flush()
expected_index += 1
else:
buffer[index] = segment_content
except queue.Empty:
continue
def download_streams(self, add_desc):
"""
@ -296,10 +284,11 @@ class M3U8_Segments:
- add_desc (str): Additional description for the progress bar.
"""
if TQDM_USE_LARGE_BAR:
bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}| {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}| {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
else:
bar_format=f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
# Create progress bar
progress_bar = tqdm(
total=len(self.segments),
unit='s',
@ -307,21 +296,17 @@ class M3U8_Segments:
bar_format=bar_format
)
# Start a separate thread to write segments to the file
writer_thread = threading.Thread(target=self.write_segments_to_file)
writer_thread.start()
# Start all workers
with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor:
# Start a separate thread to write segments to the file
writer_thread = threading.Thread(target=self.write_segments_to_file)
writer_thread.start()
# Start all workers
for index, segment_url in enumerate(self.segments):
# Submit the download task to the executor
executor.submit(self.make_requests_stream, segment_url, index, progress_bar)
# Wait for all segments to be downloaded
executor.shutdown()
with self.condition:
self.condition.notify_all() # Wake up the writer thread if it's waiting
writer_thread.join() # Wait for the writer thread to finish
# Wait for all tasks to complete
executor.shutdown(wait=True)
self.stop_event.set()
writer_thread.join()
progress_bar.close()

View File

@ -22,13 +22,20 @@ TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar
class M3U8_Ts_Estimator:
def __init__(self, total_segments: int):
"""
Initialize the TSFileSizeCalculator object.
Args:
- workers (int): The number of workers using with ThreadPool.
- total_segments (int): Len of total segments to download
"""
self.ts_file_sizes = []
self.now_downloaded_size = 0
self.average_over = 5
self.average_over = 3
self.list_speeds = deque(maxlen=self.average_over)
self.smoothed_speeds = []
self.total_segments = total_segments
self.last_segment_duration = 1
self.last_segment_size = 0
self.lock = threading.Lock()
def add_ts_file(self, size: int, size_download: int, duration: float):
"""
@ -43,24 +50,31 @@ class M3U8_Ts_Estimator:
logging.error("Invalid input values: size=%d, size_download=%d, duration=%f", size, size_download, duration)
return
# Calibrazione dinamica del tempo
self.last_segment_duration = duration
# Considerazione della variazione di dimensione del segmento
self.last_segment_size = size_download
# Calcolo velocità
try:
speed_mbps = (size_download * 8) / (duration * 1024 * 1024)
except ZeroDivisionError as e:
logging.error("Division by zero error while calculating speed: %s", e)
return
# Calculate speed outside of the lock
speed_mbps = (size_download * 4) / (duration * (1024 * 1024))
# Add total size bytes
self.ts_file_sizes.append(size)
self.now_downloaded_size += size_download
self.list_speeds.append(speed_mbps)
# Calculate moving average
smoothed_speed = sum(self.list_speeds) / len(self.list_speeds)
self.smoothed_speeds.append(smoothed_speed)
# Update smooth speeds
if len(self.smoothed_speeds) > self.average_over:
self.smoothed_speeds.pop(0)
def get_average_speed(self) -> float:
"""
Calculate the average speed from a list of speeds and convert it to megabytes per second (MB/s).
Returns:
float: The average speed in megabytes per second (MB/s).
"""
return (sum(self.smoothed_speeds) / len(self.smoothed_speeds))
def calculate_total_size(self) -> str:
"""
Calculate the total size of the files.
@ -86,21 +100,6 @@ class M3U8_Ts_Estimator:
logging.error("An unexpected error occurred: %s", e)
return "Error"
def get_average_speed(self) -> float:
"""
Calculate the average speed from a list of speeds and convert it to megabytes per second (MB/s).
Returns:
float: The average speed in megabytes per second (MB/s).
"""
# Smooth the speeds for better accuracy using the window defined by average_over
smoothed_speed = sum(self.list_speeds) / min(len(self.list_speeds), self.average_over)
predicted_speed = smoothed_speed * (self.last_segment_size / (1024 * 1024)) / self.last_segment_duration
# Convert to mb/s
return predicted_speed / 8
def get_downloaded_size(self) -> str:
"""
Get the total downloaded size formatted as a human-readable string.
@ -144,5 +143,5 @@ class M3U8_Ts_Estimator:
else:
progress_counter.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{number_file_downloaded}{Colors.RED} {units_file_downloaded} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s"
f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}Mbps"
)