2024-04-12 22:50:50 +02:00

970 lines
36 KiB
Python

# 5.01.24 -> 7.01.24 -> 20.02.24 -> 29.03.24
import os
import time
import threading
import logging
import warnings
# Disable specific warnings
from tqdm import TqdmExperimentalWarning
warnings.filterwarnings("ignore", category=TqdmExperimentalWarning)
warnings.filterwarnings("ignore", category=UserWarning, module="cryptography")
# External libraries
import requests
from tqdm.rich import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
# Internal utilities
from Src.Util.console import console
from Src.Util.headers import get_headers
from Src.Util.config import config_manager
from Src.Util.os import (
remove_folder,
remove_file,
format_size,
compute_sha1_hash,
convert_to_hex
)
# Logic class
from .util import (
print_duration_table,
concatenate_and_save,
join_audios,
transcode_with_subtitles
)
from .util import (
M3U8_Decryption,
M3U8_Ts_Files,
M3U8_Parser,
M3U8_Codec,
M3U8_UrlFix
)
# Config
DOWNLOAD_AUDIO = config_manager.get_bool('M3U8_OPTIONS', 'download_audio')
DOWNLOAD_SUBTITLES = config_manager.get_bool('M3U8_OPTIONS', 'download_subtitles')
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_OPTIONS', 'specific_list_audio')
DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_OPTIONS', 'specific_list_subtitles')
MERGE_SUBTITLES = config_manager.get_bool('M3U8_OPTIONS', 'merge_subtitles')
TQDM_MAX_WORKER = config_manager.get_int('M3U8', 'tdqm_workers')
TQDM_PROGRESS_TIMEOUT = config_manager.get_int('M3U8', 'tqdm_progress_timeout')
COMPLETED_PERCENTAGE = config_manager.get_float('M3U8', 'download_percentage')
REQUESTS_TIMEOUT = config_manager.get_int('M3U8', 'requests_timeout')
ENABLE_TIME_TIMEOUT = config_manager.get_bool('M3U8', 'enable_time_quit')
USE_OPENSSL = config_manager.get_bool('M3U8', 'use_openssl')
TQDM_SHOW_PROGRESS = config_manager.get_bool('M3U8', 'tqdm_show_progress')
MIN_TS_FILES_IN_FOLDER = config_manager.get_int('M3U8', 'minimum_ts_files_in_folder')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8', 'cleanup_tmp_folder')
# Variable
config_headers = config_manager.get_dict('M3U8_OPTIONS', 'request')
failed_segments = []
class_urlFix = M3U8_UrlFix()
class M3U8_Segments:
def __init__(self, url, folder, key=None):
"""
Initializes M3U8_Segments with the provided URL and optional decryption key.
Args:
- url (str): The URL of the M3U8 file.
- key (str, optional): The decryption key. Defaults to None.
"""
self.url = url
self.key = key
# Init M3U8_Decryption class if key is present
if self.key is not None:
self.decryption = M3U8_Decryption(key)
# Generate temp base folder based on hash of url
self.downloaded_size = 0
self.temp_folder = folder
os.makedirs(self.temp_folder, exist_ok=True)
# Config
self.enable_timer = ENABLE_TIME_TIMEOUT
self.progress_timeout = TQDM_PROGRESS_TIMEOUT
self.class_ts_files_size = M3U8_Ts_Files()
def parse_data(self, m3u8_content: str) -> None:
"""
Parses the M3U8 content to extract segment information.
Args:
m3u8_content (str): The content of the M3U8 file.
"""
try:
# Parse index m3u8 content from request(m3u8).text
m3u8_parser = M3U8_Parser(DOWNLOAD_SPECIFIC_SUBTITLE)
m3u8_parser.parse_data(m3u8_content)
# Add decryption iv if key has the same byte string
if self.key is not None and m3u8_parser.keys.get('iv') is not None:
iv = m3u8_parser.keys.get('iv')
method = m3u8_parser.keys.get('method')
# Add iv for decryption to M3U8_Decryption
logging.info(f"[M3U8_Segments] Parameter iv => {iv}")
self.decryption.parse_key(iv)
# Add method for decryption to M3U8_Decryption
logging.info(f"[M3U8_Segments] Set method => {method}")
self.decryption.set_method(method)
# Store segments
self.segments = m3u8_parser.segments
logging.info("[M3U8_Segments] Segments extracted successfully.")
except Exception as e:
logging.error(f"[M3U8_Segments] Error parsing M3U8 content: {e}")
def get_info(self) -> None:
"""
Makes a request to the index m3u8 file to get information about segments.
"""
try:
# Add random user agent to config headers
config_headers['index']['user-agent'] = get_headers()
# Send a GET request to retrieve the index m3u8 file
response = requests.get(self.url, headers=config_headers['index'])
response.raise_for_status() # Raise HTTPError for non-2xx status codes
# Parse text from request to m3u8 index
self.parse_data(response.text)
logging.info(f"[M3U8_Segments] Ts segments found: {len(self.segments)}")
except requests.exceptions.RequestException as req_err:
logging.error(f"[M3U8_Segments] Error occurred during request: {req_err}")
raise
except Exception as e:
logging.error(f"[M3U8_Segments] Error occurred: {e}")
def is_valid_ts_url(self, ts_url: str) -> bool:
"""
Check if the given ts URL is valid.
Args:
ts_url (str): The URL of the ts file.
failed_segments (list): List of failed segment URLs.
Returns:
bool: True if the URL is valid, False otherwise.
"""
# Check if the URL exists in the list of segments and is not in the list of failed segments
for failed_seg in failed_segments:
if str(failed_seg) in ts_url:
return False
return True
def make_reqests_stream(self, ts_url: str) -> bytes:
"""
Make a single request to a ts file to get content.
Args:
ts_url (str): The URL of the ts file.
Returns:
bytes or None: The content of the requested ts file if successful, otherwise None.
"""
try:
# Fix URL if it is incomplete (missing 'http')
if "http" not in ts_url:
ts_url = class_urlFix.generate_full_url(ts_url)
logging.info(f"Generated new URL: {ts_url}")
# Check if the ts_url is valid
is_valid_url = self.is_valid_ts_url(ts_url)
if is_valid_url:
# Generate random user agent for segments request
headers = config_headers.get('segments')
headers['user-agent'] = get_headers()
# Make GET request to ts audio or video file with a random user agent
response = requests.get(ts_url, headers=headers, timeout=REQUESTS_TIMEOUT)
# If the response status code is not 200, mark the URL as failed
if response.status_code != 200:
logging.error(f"Failed to fetch content from {ts_url}. Status code: {response.status_code}")
return None
# Return the content if the request is successful
return response.content
else:
logging.warn(f"Skipping invalid URL: {ts_url}")
return None
except requests.exceptions.RequestException as req_err:
logging.warn(f"Error occurred during request to {ts_url}: {req_err}")
return None
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
return None
def save_stream(self, index: int, progress_counter: tqdm, stop_event: threading.Event) -> None:
"""
Save ts file and decrypt if there is an iv present in the decryption class.
Parameters:
- index (int): The index of the ts file in the segments list.
- progress_counter (tqdm): The progress counter object.
- stop_event (threading.Event): The event to signal when to quit.
"""
# Break if stop event is true
if stop_event.is_set():
return
try:
# Get ts url and create a filename based on index
ts_url = self.segments[index]
ts_filename = os.path.join(self.temp_folder, f"{index}.ts")
#logging.info(f"Requesting: {ts_url}, saving to: {ts_filename}")
# If file already exists, skip download
if os.path.exists(ts_filename):
logging.info(f"Skipping download. File already exists: {ts_filename}")
return
# Get bytes of ts data
ts_content = self.make_reqests_stream(ts_url)
# If data is retrieved
if ts_content is not None:
# Create a file to save data
with open(ts_filename, "wb") as ts_file:
# Decrypt if there is an IV in the main M3U8 index
if self.key and self.decryption.iv:
# pycryptodomex, faster using win11
if USE_OPENSSL:
self.decryption.decrypt_openssl(ts_content, ts_filename)
else:
decrypted_data = self.decryption.decrypt(ts_content)
ts_file.write(decrypted_data)
# For no iv and key
else:
ts_file.write(ts_content)
# Update downloaded size
if TQDM_SHOW_PROGRESS:
self.downloaded_size += len(ts_content)
self.class_ts_files_size.add_ts_file_size(len(ts_content) * len(self.segments))
except Exception as e:
logging.error(f"Error saving TS file: {e}")
finally:
# Update progress counter
progress_counter.update(1)
if TQDM_SHOW_PROGRESS:
downloaded_size_str = format_size(self.downloaded_size)
estimate_total_size = self.class_ts_files_size.calculate_total_size()
progress_counter.set_description(f"[yellow]Download [red][{index}] - [{downloaded_size_str} / {estimate_total_size}]")
else:
progress_counter.set_description(f"[yellow]Download")
# Refresh progress bar
progress_counter.refresh()
def donwload_streams(self):
"""
Downloads TS segments in parallel using ThreadPoolExecutor.
"""
try:
# Initialize progress bar
progress_counter = tqdm(total=len(self.segments), unit=" segment", desc="[yellow]Download")
# Event to signal stop condition for progress monitoring
stop_event = threading.Event()
# Start progress monitor thread
progress_thread = threading.Thread(target=self.timer, args=(progress_counter, stop_event))
progress_thread.start()
# Create ThreadPoolExecutor for parallel downloading
with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor:
futures = []
# Submit tasks for downloading segments
for index in range(len(self.segments)):
future = executor.submit(self.save_stream, index, progress_counter, stop_event)
futures.append(future)
try:
# Wait for tasks to complete
for future in as_completed(futures):
future.result()
# Check if progress reached 99%
if progress_counter.n >= len(self.segments) * COMPLETED_PERCENTAGE:
#console.log(f"[yellow]Progress reached {COMPLETED_PERCENTAGE*100}%. Stopping.")
progress_counter.refresh()
break
except KeyboardInterrupt:
console.log("[red]Ctrl+C detected. Exiting gracefully [white]...")
stop_event.set()
except KeyboardInterrupt:
logging.info("Ctrl+C detected. Exiting gracefully...")
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
finally:
# Signal stop event to end progress monitor thread
stop_event.set()
# Wait for progress monitor thread to finish
progress_thread.join()
def timer(self, progress_counter: tqdm, quit_event: threading.Event):
"""
Function to monitor progress and quit if no progress is made within a certain time
Parameters:
- progress_counter (tqdm): The progress counter object.
- quit_event (threading.Event): The event to signal when to quit.
"""
# If timer is disabled, return immediately without starting it, to reduce cpu use
if not self.enable_timer:
return
start_time = time.time()
last_count = 0
# Loop until quit event is set
while not quit_event.is_set():
current_count = progress_counter.n
# Update start time when progress is made
if current_count != last_count:
start_time = time.time()
last_count = current_count
# Calculate elapsed time
elapsed_time = time.time() - start_time
# Check if elapsed time exceeds progress timeout
if elapsed_time > self.progress_timeout:
console.log(f"[red]No progress for {self.progress_timeout} seconds. Stopping.")
# Set quit event to break the loop
quit_event.set()
break
# Calculate remaining time until timeout
remaining_time = max(0, self.progress_timeout - elapsed_time)
# Determine sleep interval dynamically based on remaining time
sleep_interval = min(1, remaining_time)
# Wait for the calculated sleep interval
time.sleep(sleep_interval)
# Refresh progress bar
progress_counter.refresh()
def join(self, output_filename: str):
"""
Join all segments file to a mp4 file name
!! NOT USED IN THIS VERSION
Parameters:
- video_decoding(str): video decoding to use with ffmpeg for only video
- audio_decoding(str): audio decoding to use with ffmpeg for only audio
- output_filename (str): The name of the output mp4 file.
"""
# Print output of failed segments if present
if len(failed_segments) > 0:
logging.error(f"[M3U8_Segments] Failed segments = {failed_segments}")
logging.warning("[M3U8_Segments] Audio and video can be out of sync !!!")
console.log("[red]Audio and video can be out of sync !!!")
# Get current dir and create file_list with path of all ts file
current_dir = os.path.dirname(os.path.realpath(__file__))
file_list_path = os.path.join(current_dir, 'file_list.txt')
# Sort files (1.ts, 2.ts, ...)
ts_files = [f for f in os.listdir(self.temp_folder) if f.endswith(".ts")]
def extract_number(file_name):
return int(''.join(filter(str.isdigit, file_name)))
ts_files.sort(key=extract_number)
# Check if there is file to json
if len(ts_files) < MIN_TS_FILES_IN_FOLDER:
logging.error(f"No .ts file to join in folder: {self.temp_folder}")
raise
# Save files sorted in a txt file with absolute path to fix problem with ( C:\\path (win))
with open(file_list_path, 'w') as file_list:
for ts_file in ts_files:
absolute_path = os.path.abspath(os.path.join(self.temp_folder, ts_file))
file_list.write(f"file '{absolute_path}'\n")
console.log("[cyan]Start joining all files")
# ADD IF
concatenate_and_save(
file_list_path = file_list_path,
output_filename = output_filename
)
class Downloader():
def __init__(self, output_filename: str = None, m3u8_playlist:str = None, m3u8_index:str = None, key: str = None):
"""
Initialize the Downloader object.
Parameters:
- output_filename (str): Output filename for the downloaded content.
- m3u8_playlist (str, optional): URL to the main M3U8 playlist.
- key (str, optional): Hexadecimal representation of the encryption key.
"""
self.m3u8_playlist = m3u8_playlist
self.m3u8_index = m3u8_index
self.key = bytes.fromhex(key) if key is not None else key
self.output_filename = output_filename
# Auto generate out file name if not present
if output_filename == None:
if m3u8_playlist != None:
self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_playlist))
else:
self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_index))
if self.key != None:
hex_data = convert_to_hex(self.key)
console.log(f"[cyan]Key use [white]=> [red]{hex_data}")
logging.info(f"Key use: {self.key}")
# Initialize temp base path
self.base_path = os.path.join(str(self.output_filename).replace(".mp4", ""))
self.video_segments_path = os.path.join(self.base_path, "tmp", "video")
self.audio_segments_path = os.path.join(self.base_path, "tmp", "audio")
self.subtitle_segments_path = os.path.join(self.base_path, "tmp", "subtitle")
logging.info(f"Output base path: {self.base_path}")
# Create temp folder
os.makedirs(self.video_segments_path, exist_ok=True)
os.makedirs(self.audio_segments_path, exist_ok=True)
os.makedirs(self.subtitle_segments_path, exist_ok=True)
# Track subtitle, audio donwload
self.downloaded_audio = []
self.downloaded_subtitle = []
self.downloaded_video = []
def __df_make_req__(self, url: str) -> str:
"""
Make a request to get text from the provided URL to test if index or m3u8 work correcly.
Parameters:
- url (str): The URL to make the request to.
Returns:
- str: The text content of the response.
"""
try:
# Send a GET request to the provided URL
config_headers.get('index')['user-agent'] = get_headers()
response = requests.get(url, headers=config_headers.get('index'))
# Check status response of request
logging.info(f"Test url: {url}")
response.raise_for_status()
if response.ok:
return response.text
else:
logging.error(f"Request to {url} failed with status code: {response.status_code}")
return None
except requests.RequestException as req_err:
logging.error(f"Error occurred during request: {req_err}")
return None
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
return None
def manage_playlist(self, m3u8_playlist_text):
"""
Parses the M3U8 playlist to extract information about keys, playlists, subtitles, etc.
Args:
m3u8_playlist_text (str): The text content of the M3U8 playlist.
"""
global DOWNLOAD_AUDIO, DOWNLOAD_SUBTITLES
# Create an instance of the M3U8_Parser class
parse_class_m3u8 = M3U8_Parser(DOWNLOAD_SPECIFIC_SUBTITLE)
# Extract information about the M3U8 playlist
parse_class_m3u8.parse_data(m3u8_playlist_text)
# Collect available audio tracks and default audio track
self.list_available_audio = parse_class_m3u8.get_track_audios()
self.default_audio = parse_class_m3u8.get_default_track_audio()
# Check if there is some audios, else disable download
if self.list_available_audio != None:
console.log(f"[cyan]Find audios language: [red]{[obj_audio.get('language') for obj_audio in self.list_available_audio]}")
else:
console.log("[red]Cant find a list of audios")
DOWNLOAD_AUDIO = False
# Collect available subtitles and default subtitle
self.list_available_subtitles = parse_class_m3u8.get_subtitles()
self.default_subtitle = parse_class_m3u8.get_default_subtitle()
# Check if there is some subtitles, else disable download
if self.list_available_subtitles != None:
console.log(f"[cyan]Find subtitles language: [red]{[obj_sub.get('language') for obj_sub in self.list_available_subtitles]}")
else:
console.log("[red]Cant find a list of audios")
DOWNLOAD_SUBTITLES = False
# Collect best quality video
m3u8_index_obj = parse_class_m3u8.get_best_quality()
# Get URI of the best quality and codecs parameters
console.log(f"[cyan]Find resolution: [red]{m3u8_index_obj.get('width')}")
m3u8_index = m3u8_index_obj.get('uri')
# Fix URL if it is not complete with http:\\site_name.domain\...
if "http" not in m3u8_index:
# Generate full URL
m3u8_index = class_urlFix.generate_full_url(m3u8_index)
# Check if a valid HTTPS URL is obtained
if m3u8_index is not None and "https" in m3u8_index:
console.log(f"[cyan]Found m3u8 index [white]=> [red]{m3u8_index}")
else:
logging.warning("[download_m3u8] Can't find a valid m3u8 index")
raise
# Set m3u8_index
self.m3u8_index = m3u8_index
# Get obj codec
self.codec: M3U8_Codec = parse_class_m3u8.codec
# If codec exist print log
if self.codec is not None:
logging.info(f"Get codeds: {self.codec}")
console.log(f"[cyan]Find codecs: [red]({self.codec.video_codec_name};{self.codec.audio_codec_name})")
else:
logging.info(f"Codec obj dont exist")
def manage_subtitle(self):
"""
Downloads and manages subtitles.
This method iterates over available subtitles, downloads them if necessary, and updates
the list of downloaded subtitles.
"""
# Iterate over each available subtitle
for obj_subtitle in self.list_available_subtitles:
logging.info(f"(manage_subtitle) Find => {obj_subtitle}")
# Check if there is custom subtitles to download
if len(DOWNLOAD_SPECIFIC_SUBTITLE) > 0:
# Check if language in list
if obj_subtitle.get('language') not in DOWNLOAD_SPECIFIC_SUBTITLE:
continue
# Construct full path for the subtitle file
sub_full_path = os.path.join(self.subtitle_segments_path, obj_subtitle.get('language') + ".vtt")
# Check if the subtitle file already exists
if not os.path.exists(sub_full_path):
console.log(f"[cyan]Download subtitle [white]=> [red]{obj_subtitle.get('language')}.")
# Add the subtitle to the list of downloaded subtitles
self.downloaded_subtitle.append({
'name': obj_subtitle.get('name').split(" ")[0],
'language': obj_subtitle.get('language').upper(),
'path': os.path.abspath(sub_full_path)
})
# If the subtitle file doesn't exist, download it
logging.info(f"Download uri subtitles: {obj_subtitle.get('uri')} => {sub_full_path}")
response = requests.get(obj_subtitle.get('uri'))
open(sub_full_path, "wb").write(response.content)
def manage_audio(self):
"""
Downloads and manages audio segments.
This method iterates over available audio tracks, downloads them if necessary, and updates
the list of downloaded audio tracks.
"""
# Iterate over each available audio track
for obj_audio in self.list_available_audio:
logging.info(f"(manage_audio) Find => {obj_audio}")
# Check if there is custom subtitles to download
if len(DOWNLOAD_SPECIFIC_AUDIO) > 0:
# Check if language in list
if obj_audio.get('language') not in DOWNLOAD_SPECIFIC_AUDIO:
continue
# Construct full path for the audio segment directory
full_path_audio = os.path.join(self.audio_segments_path, obj_audio.get('language'))
self.downloaded_audio.append({
'language': obj_audio.get('language'),
'path': full_path_audio
})
# Check if the audio segment directory already exists
if not os.path.exists(full_path_audio):
# If the audio segment directory doesn't exist, download audio segments
logging.info(f"Download uri audio: {obj_audio.get('uri')} => {full_path_audio}")
audio_m3u8 = M3U8_Segments(obj_audio.get('uri'), full_path_audio, self.key)
console.log(f"[purple]Download audio segments [white]=> [red]{obj_audio.get('language')}.")
# Get information about the audio segments
audio_m3u8.get_info()
# Download the audio segments
audio_m3u8.donwload_streams()
def manage_video(self):
"""
Downloads and manages video segments.
This method downloads video segments if necessary and updates
the list of downloaded video segments.
"""
# Construct full path for the video segment directory
full_path_video = self.video_segments_path
# Create an instance of M3U8_Segments to handle video segments
video_m3u8 = M3U8_Segments(self.m3u8_index, full_path_video, self.key)
console.log("[purple]Download video segments.")
# Add the video segment directory to the list of downloaded video segments
self.downloaded_video.append({
'path': full_path_video
})
# Get information about the video segments
video_m3u8.get_info()
# Download the video segments
video_m3u8.donwload_streams()
@staticmethod
def extract_number(file_name):
return int(''.join(filter(str.isdigit, file_name)))
def join_ts_files(self, full_path: str, out_file_name: str):
"""
Joins the individual .ts files into a single video file.
Args:
full_path (str): The full path to the directory containing the .ts files.
out_file_name (str): The name of the output video file.
Returns:
str: The path to the output video file.
"""
# Get the current directory and create a file_list with the path of all .ts files
file_list_path = os.path.join('file_list.txt')
# Sort files (1.ts, 2.ts, ...) based on their numbers
ts_files = [f for f in os.listdir(full_path) if f.endswith(".ts")]
ts_files.sort(key=Downloader.extract_number)
logging.info(f"Find {len(ts_files)} stream files to join")
# Check if there are enough .ts files to join (at least 10)
if len(ts_files) < 10:
logging.error(f"No .ts file to join in folder: {full_path}")
else:
# Save files sorted in a txt file with absolute path to fix problem with ( C:\\path (win))
with open(file_list_path, 'w') as file_list:
for ts_file in ts_files:
#absolute_path = os.path.abspath(os.path.join(full_path, ts_file))
relative_path = os.path.relpath(os.path.join(full_path, ts_file))
file_list.write(f"file '{relative_path}'\n")
# Concatenate and save the files and return the path to the output filename
return concatenate_and_save(
file_list_path=file_list_path,
output_filename=out_file_name,
v_codec=self.codec.video_codec_name,
a_codec=self.codec.audio_codec_name,
bandwidth=self.codec.bandwidth
)
def download_audios(self):
"""
Downloads audio files and stores their paths.
"""
# Initialize an empty list to store audio tracks paths
self.audio_tracks_path = []
# Check if there are any downloaded audio objects
if len(self.downloaded_audio) > 0:
# Iterate over each downloaded audio object
for obj_downloaded_audio in self.downloaded_audio:
# Create the expected path for the audio file based on its language
obj_audio_path = os.path.join(self.base_path, obj_downloaded_audio.get('language') + ".mp4")
# Check if the audio file already exists
if not os.path.exists(obj_audio_path):
# If the audio file doesn't exist, join the .ts files and save as .mp4
new_audio_path = self.join_ts_files(
obj_downloaded_audio.get('path'),
obj_audio_path
)
console.log(f"[cyan]Join segments: [red]{obj_downloaded_audio.get('language')}")
# Add the joined audio file path to the list
self.audio_tracks_path.append({
'path': new_audio_path
})
def download_videos(self):
"""
Downloads video files and stores their path.
"""
# Construct the expected path for the video file
video_track_path = os.path.join(self.base_path, "video.mp4")
console.log(f"[cyan]Join segments: [red]video")
# Check if the video file already exists
if not os.path.exists(video_track_path):
# If the video file doesn't exist, join the .ts files and save as .mp4
video_track_path = self.join_ts_files(
self.downloaded_video[0].get('path'),
video_track_path
)
# Get info video
print_duration_table(video_track_path)
self.video_track_path = video_track_path
def add_subtitles_audios(self):
"""Add subtitles and audio tracks to the video.
This function checks if there are any audio tracks and adds them to the video if available.
It also adds subtitles to the video if there are any downloaded. If no audio tracks are
available, it uses the original video path. The resulting video with added subtitles is
saved as 'out.mkv' in the base path and rename to .mp4.
"""
# Initialize variables
path_video_and_audio = None
path_join_subtitles = None
# Check if there are any audio tracks
if len(self.audio_tracks_path) > 0:
# Log adding audio tracks
console.log(f"[cyan]Add audios.")
# Join audio tracks with the video
path_video_and_audio = join_audios(
video_path=self.video_track_path,
audio_tracks=self.audio_tracks_path
)
# Check if there are any downloaded subtitles
if len(self.downloaded_subtitle) > 0:
console.log(f"[cyan]Add subtitles.")
if MERGE_SUBTITLES:
# If no audio tracks were joined, use the original video path
if path_video_and_audio is None:
path_video_and_audio = self.video_track_path
# Transcode video with subtitles
path_join_subtitles = transcode_with_subtitles(
path_video_and_audio,
self.downloaded_subtitle,
os.path.join(self.base_path, "out.mkv")
)
else:
console.log("[cyan]Moving subtitle out of tmp folder.")
for obj_sub in self.downloaded_subtitle:
try:
language = obj_sub.get('language').lower()
# Modifica la stringa se contiene "forced-", altrimenti la converte in minuscolo
language = (language.replace("forced-", "") + ".forced") if 'forced-' in language else language
sub_path = self.output_filename.replace(".mp4", f".{language}.vtt")
os.rename(obj_sub.get('path'), sub_path)
except Exception as e:
logging.error(f"Error moving subtitle: {e}. Skipping...")
continue
self.path_video_and_audio = path_video_and_audio
self.path_join_subtitles = path_join_subtitles
def cleanup_tmp(self, is_index = False):
"""Cleanup temporary files.
This function removes temporary audio join files, the starting video file if necessary,
and the temporary folder. It also renames the output file to the desired output filename.
Args:
full_path (str): The full path to the directory containing the .ts files.
is_index (bool): To bypass audio tracks and subtitles tracks
"""
join_output_file = None
console.log("[cyan]Cleanup [white]...")
# Remove audio join files
if not is_index:
for clean_audio_path in self.audio_tracks_path:
remove_file(clean_audio_path.get('path'))
# Determine the output file
if not is_index:
# Determine the output file
if self.path_join_subtitles is not None:
join_output_file = self.path_join_subtitles
remove_file(self.path_video_and_audio)
else:
join_output_file = self.path_video_and_audio
# Remove the starting video if necessary
if self.path_join_subtitles is not None or self.path_video_and_audio is not None:
remove_file(self.video_track_path)
# If no join or video and audio files exist, the final output is the original video
if self.path_join_subtitles is None and self.path_video_and_audio is None:
join_output_file = self.video_track_path
# Rename output file
os.rename(join_output_file, self.output_filename)
# Remove the temporary folder
if not is_index:
remove_folder(self.base_path)
else:
remove_folder(os.path.join(self.base_path, "tmp"))
def download_m3u8(self):
"""
Download content from M3U8 sources including video, audio, and subtitles.
"""
# Check if the M3U8 playlist is valid
if self.m3u8_playlist is not None:
logging.info(f"Download m3u8 from playlist.")
# Fetch the M3U8 playlist content
m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist)
# Add full URL of the M3U8 playlist to fix next .ts without https if necessary
class_urlFix.set_playlist(self.m3u8_playlist)
# Collect information about the playlist
self.manage_playlist(m3u8_playlist_text)
# Download subtitles
if DOWNLOAD_SUBTITLES:
logging.info("Download subtitles ...")
self.manage_subtitle()
# Download segmenets of audio tracks
if DOWNLOAD_AUDIO:
logging.info("Download audios ...")
self.manage_audio()
# Download segements of video segments
logging.info("Download videos ...")
self.manage_video()
# Convert audios segments to mp4
self.download_audios()
# Convert video segments to mp4
self.download_videos()
# Add subtitles and audio to video mp4 if present
self.add_subtitles_audios()
# Clean up folder of all tmp folder and tmp with .ts segments folder
if REMOVE_SEGMENTS_FOLDER:
self.cleanup_tmp()
else:
logging.info(f"Download m3u8 from index.")
# Add full URL of the M3U8 playlist to fix next .ts without https if necessary
class_urlFix.set_playlist(self.m3u8_index)
logging.info("Download videos ...")
self.manage_video()
# Convert video segments to mp4
self.download_videos()
# Clean up folder of all tmp folder and tmp with .ts segments folder
if REMOVE_SEGMENTS_FOLDER:
self.cleanup_tmp(is_index = True)