mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-07-24 19:10:04 +00:00
Remove stop_event
This commit is contained in:
parent
4677deea14
commit
d2a9165f7d
@ -71,8 +71,8 @@ def get_video_duration(file_path: str) -> float:
|
||||
return float(probe_result['format']['duration'])
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error: {e}")
|
||||
return None
|
||||
logging.error(f"Error get video duration: {e}")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def format_duration(seconds: float) -> Tuple[int, int, int]:
|
||||
|
@ -5,7 +5,6 @@ import sys
|
||||
import time
|
||||
import queue
|
||||
import threading
|
||||
import signal
|
||||
import logging
|
||||
import binascii
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
@ -53,18 +52,20 @@ class M3U8_Segments:
|
||||
- tmp_folder (str): The temporary folder to store downloaded segments.
|
||||
"""
|
||||
self.url = url
|
||||
self.fake_proxy = False
|
||||
self.tmp_folder = tmp_folder
|
||||
self.decryption: M3U8_Decryption = None # Initialize decryption as None
|
||||
self.segment_queue = queue.PriorityQueue() # Priority queue to maintain the order of segments
|
||||
self.current_index = 0 # Index of the current segment to be written
|
||||
self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts") # Path to the temporary file
|
||||
self.condition = threading.Condition() # Condition variable for thread synchronization
|
||||
self.ctrl_c_detected = False # Global variable to track Ctrl+C detection
|
||||
self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts")
|
||||
os.makedirs(self.tmp_folder, exist_ok=True)
|
||||
|
||||
os.makedirs(self.tmp_folder, exist_ok=True) # Create the temporary folder if it does not exist
|
||||
# Util class
|
||||
self.decryption: M3U8_Decryption = None
|
||||
self.class_ts_estimator = M3U8_Ts_Estimator(0)
|
||||
self.class_url_fixer = M3U8_UrlFix(url)
|
||||
self.fake_proxy = False
|
||||
|
||||
# Sync
|
||||
self.current_index = 0 # Index of the current segment to be written
|
||||
self.segment_queue = queue.PriorityQueue() # Priority queue to maintain the order of segments
|
||||
self.condition = threading.Condition() # Condition variable for thread synchronization
|
||||
|
||||
def add_server_ip(self, list_ip):
|
||||
"""
|
||||
@ -201,21 +202,16 @@ class M3U8_Segments:
|
||||
|
||||
return urlunparse(parsed_url)
|
||||
|
||||
def make_requests_stream(self, ts_url: str, index: int, stop_event: threading.Event, progress_bar: tqdm) -> None:
|
||||
def make_requests_stream(self, ts_url: str, index: int, progress_bar: tqdm) -> None:
|
||||
"""
|
||||
Downloads a TS segment and adds it to the segment queue.
|
||||
|
||||
Args:
|
||||
- ts_url (str): The URL of the TS segment.
|
||||
- index (int): The index of the segment.
|
||||
- stop_event (threading.Event): Event to signal the stop of downloading.
|
||||
- progress_bar (tqdm): Progress counter for tracking download progress.
|
||||
- add_desc (str): Additional description for the progress bar.
|
||||
"""
|
||||
|
||||
if stop_event.is_set():
|
||||
return # Exit if the stop event is set
|
||||
|
||||
# Generate new user agent
|
||||
headers_segments['user-agent'] = get_headers()
|
||||
|
||||
@ -225,8 +221,8 @@ class M3U8_Segments:
|
||||
start_time = time.time()
|
||||
response = requests.get(ts_url, headers=headers_segments, verify=REQUEST_VERIFY_SSL, timeout=30)
|
||||
duration = time.time() - start_time
|
||||
logging.info(f"Make request to get segmenet: [{index} - {len(self.segments)}] in: {duration}, len data: {len(response.content)}")
|
||||
|
||||
logging.info(f"Make request to get segment: [{index} - {len(self.segments)}] in: {duration}, len data: {len(response.content)}")
|
||||
|
||||
if response.ok:
|
||||
|
||||
# Get the content of the segment
|
||||
@ -242,7 +238,6 @@ class M3U8_Segments:
|
||||
with self.condition:
|
||||
self.segment_queue.put((index, segment_content)) # Add the segment to the queue
|
||||
self.condition.notify() # Notify the writer thread that a new segment is available
|
||||
|
||||
else:
|
||||
logging.error(f"Failed to download segment: {ts_url}")
|
||||
|
||||
@ -252,22 +247,18 @@ class M3U8_Segments:
|
||||
# Update bar
|
||||
progress_bar.update(1)
|
||||
|
||||
def write_segments_to_file(self, stop_event: threading.Event):
|
||||
def write_segments_to_file(self):
|
||||
"""
|
||||
Writes downloaded segments to a file in the correct order.
|
||||
|
||||
Args:
|
||||
- stop_event (threading.Event): Event to signal the stop of writing.
|
||||
"""
|
||||
|
||||
with open(self.tmp_file_path, 'ab') as f:
|
||||
while not (stop_event.is_set() and self.segment_queue.empty()): # Wait until both stop_event is set and queue is empty
|
||||
while True:
|
||||
with self.condition:
|
||||
while self.segment_queue.empty() and not stop_event.is_set():
|
||||
self.condition.wait() # Wait until a new segment is available or stop_event is set
|
||||
while self.segment_queue.empty() and self.current_index < len(self.segments):
|
||||
self.condition.wait() # Wait until a new segment is available or all segments are downloaded
|
||||
|
||||
if stop_event.is_set() and self.segment_queue.empty(): # Exit loop if both conditions are met
|
||||
break
|
||||
if self.segment_queue.empty() and self.current_index >= len(self.segments):
|
||||
break # Exit loop if all segments have been processed
|
||||
|
||||
if not self.segment_queue.empty():
|
||||
# Get the segment from the queue
|
||||
@ -279,7 +270,7 @@ class M3U8_Segments:
|
||||
self.current_index += 1
|
||||
self.segment_queue.task_done()
|
||||
else:
|
||||
self.segment_queue.put((index, segment_content)) # Requeue the segment if it is not the next to be written
|
||||
self.segment_queue.put((index, segment_content)) # Requeue the segment if it is not the next to be written
|
||||
self.condition.notify()
|
||||
|
||||
def download_streams(self, add_desc):
|
||||
@ -289,8 +280,6 @@ class M3U8_Segments:
|
||||
Args:
|
||||
- add_desc (str): Additional description for the progress bar.
|
||||
"""
|
||||
stop_event = threading.Event() # Event to signal stopping
|
||||
|
||||
if TQDM_USE_LARGE_BAR:
|
||||
bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}| {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}| {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
|
||||
else:
|
||||
@ -300,28 +289,24 @@ class M3U8_Segments:
|
||||
total=len(self.segments),
|
||||
unit='s',
|
||||
ascii='░▒█',
|
||||
bar_format=bar_format,
|
||||
dynamic_ncols=True,
|
||||
ncols=80,
|
||||
mininterval=0.01
|
||||
bar_format=bar_format
|
||||
)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor:
|
||||
|
||||
# Start a separate thread to write segments to the file
|
||||
writer_thread = threading.Thread(target=self.write_segments_to_file, args=(stop_event,))
|
||||
writer_thread = threading.Thread(target=self.write_segments_to_file)
|
||||
writer_thread.start()
|
||||
|
||||
# Delay the start of each worker
|
||||
# Start all workers
|
||||
for index, segment_url in enumerate(self.segments):
|
||||
|
||||
# Submit the download task to the executor
|
||||
executor.submit(self.make_requests_stream, segment_url, index, stop_event, progress_bar)
|
||||
executor.submit(self.make_requests_stream, segment_url, index, progress_bar)
|
||||
|
||||
# Wait for all segments to be downloaded
|
||||
executor.shutdown()
|
||||
stop_event.set() # Set the stop event to halt the writer thread
|
||||
|
||||
with self.condition:
|
||||
self.condition.notify_all() # Wake up the writer thread if it's waiting
|
||||
writer_thread.join() # Wait for the writer thread to finish
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user