2024-10-17 18:21:59 +02:00

871 lines
33 KiB
Python

# 17.10.24
import os
import sys
import logging
# External libraries
import httpx
from unidecode import unidecode
# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console, Panel, Table
from Src.Util.color import Colors
from Src.Util.os import (
remove_folder,
delete_files_except_one,
compute_sha1_hash,
format_file_size,
create_folder,
reduce_base_name,
remove_special_characters,
can_create_file
)
# Logic class
from ...FFmpeg import (
print_duration_table,
get_video_duration_s,
join_video,
join_audios,
join_subtitle
)
from ...M3U8 import (
M3U8_Parser,
M3U8_Codec,
M3U8_UrlFix
)
from .segments import M3U8_Segments
# Config
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_DOWNLOAD', 'specific_list_audio')
DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_DOWNLOAD', 'specific_list_subtitles')
DOWNLOAD_VIDEO = config_manager.get_bool('M3U8_DOWNLOAD', 'download_video')
DOWNLOAD_AUDIO = config_manager.get_bool('M3U8_DOWNLOAD', 'download_audio')
MERGE_AUDIO = config_manager.get_bool('M3U8_DOWNLOAD', 'merge_audio')
DOWNLOAD_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'download_sub')
MERGE_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'merge_subs')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_DOWNLOAD', 'cleanup_tmp_folder')
FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution')
# Variable
headers_index = config_manager.get_dict('REQUESTS', 'user-agent')
m3u8_url_fixer = M3U8_UrlFix()
class PathManager:
def __init__(self, output_filename):
"""
Initializes the PathManager with the output filename.
Args:
output_filename (str): The name of the output file (should end with .mp4).
"""
self.output_filename = output_filename
# Create the base path by removing the '.mp4' extension from the output filename
self.base_path = str(output_filename).replace(".mp4", "")
# Define the path for a temporary directory where segments will be stored
self.base_temp = os.path.join(self.base_path, "tmp")
self.video_segments_path = os.path.join(self.base_temp, "video")
self.audio_segments_path = os.path.join(self.base_temp, "audio")
self.subtitle_segments_path = os.path.join(self.base_temp, "subtitle")
def create_directories(self):
"""
Creates the necessary directories for storing video, audio, and subtitle segments.
"""
os.makedirs(self.base_temp, exist_ok=True)
os.makedirs(self.video_segments_path, exist_ok=True)
os.makedirs(self.audio_segments_path, exist_ok=True)
os.makedirs(self.subtitle_segments_path, exist_ok=True)
class HttpClient:
def __init__(self, headers: str):
"""
Initializes the HttpClient with specified headers.
"""
self.headers = headers
def get(self, url: str, timeout: int=20):
"""
Sends a GET request to the specified URL and returns the response as text.
Returns:
str: The response body as text if the request is successful, None otherwise.
"""
try:
response = httpx.get(url, headers=self.headers, timeout=timeout)
response.raise_for_status()
return response.text # Return the response text
except Exception as e:
logging.error(f"Request to {url} failed: {e}")
return None
def get_content(self, url, timeout=20):
"""
Sends a GET request to the specified URL and returns the raw response content.
Returns:
bytes: The response content as bytes if the request is successful, None otherwise.
"""
try:
response = httpx.get(url, headers=self.headers, timeout=timeout)
response.raise_for_status()
return response.content # Return the raw response content
except Exception as e:
logging.error(f"Request to {url} failed: {e}")
return None
class ContentExtractor:
def __init__(self):
"""
This class is responsible for extracting audio, subtitle, and video information from an M3U8 playlist.
"""
pass
def start(self, url, m3u8_playlist_text: str):
"""
Starts the extraction process by parsing the M3U8 playlist and collecting audio, subtitle, and video data.
Args:
url (str): The URL of the M3U8 playlist.
m3u8_playlist_text (str): The raw text content of the M3U8 playlist.
"""
# Create an instance of the M3U8_Parser class
self.obj_parse = M3U8_Parser()
# Extract information about the M3U8 playlist
self.obj_parse.parse_data(
uri=url,
raw_content=m3u8_playlist_text
)
# Collect audio, subtitle, and video information
self._collect_audio()
self._collect_subtitle()
self._collect_video()
def _collect_audio(self):
"""
It checks for available audio languages and the specific audio tracks to download.
"""
# Collect available audio tracks and their corresponding URIs and names
self.list_available_audio = self.obj_parse._audio.get_all_uris_and_names()
# Check if there are any audio tracks available; if not, disable download
if self.list_available_audio is not None:
# Extract available languages from the audio tracks
available_languages = [obj_audio.get('language') for obj_audio in self.list_available_audio]
set_language = DOWNLOAD_SPECIFIC_AUDIO
result = list(set(available_languages) & set(set_language))
# Create a formatted table to display audio info
table = Table(show_header=False, box=None)
table.add_row(f"[cyan]Available languages:", f"[purple]{', '.join(available_languages)}")
table.add_row(f"[red]Set audios:", f"[purple]{', '.join(set_language)}")
table.add_row(f"[green]Downloadable:", f"[purple]{', '.join(result)}")
console.rule("[bold green] AUDIO ", style="bold red")
console.print(table)
else:
console.log("[red]Can't find a list of audios")
def _collect_subtitle(self):
"""
It checks for available subtitle languages and the specific subtitles to download.
"""
# Collect available subtitles and their corresponding URIs and names
self.list_available_subtitles = self.obj_parse._subtitle.get_all_uris_and_names()
# Check if there are any subtitles available; if not, disable download
if self.list_available_subtitles is not None:
# Extract available languages from the subtitles
available_languages = [obj_subtitle.get('language') for obj_subtitle in self.list_available_subtitles]
set_language = DOWNLOAD_SPECIFIC_SUBTITLE
result = list(set(available_languages) & set(set_language))
# Create a formatted table to display subtitle info
table = Table(show_header=False, box=None)
table.add_row(f"[cyan]Available languages:", f"[purple]{', '.join(available_languages)}")
table.add_row(f"[red]Set subtitles:", f"[purple]{', '.join(set_language)}")
table.add_row(f"[green]Downloadable:", f"[purple]{', '.join(result)}")
console.rule("[bold green] SUBTITLE ", style="bold red")
console.print(table)
else:
console.log("[red]Can't find a list of subtitles")
def _collect_video(self):
"""
It identifies the best video quality and displays relevant information to the user.
"""
# Collect custom quality video if a specific resolution is set
if FILTER_CUSTOM_REOLUTION != -1:
self.m3u8_index, video_res = self.obj_parse._video.get_custom_uri(y_resolution=FILTER_CUSTOM_REOLUTION)
# Otherwise, get the best available video quality
self.m3u8_index, video_res = self.obj_parse._video.get_best_uri()
self.codec: M3U8_Codec = self.obj_parse.codec
# List all available resolutions
tuple_available_resolution = self.obj_parse._video.get_list_resolution()
list_available_resolution = [str(resolution[0]) + "x" + str(resolution[1]) for resolution in tuple_available_resolution]
logging.info(f"M3U8 index selected: {self.m3u8_index}, with resolution: {video_res}")
# Create a formatted table to display video info
table = Table(show_header=False, box=None)
table.add_row(f"[cyan]Available resolutions:", f"[purple]{', '.join(list_available_resolution)}")
table.add_row(f"[green]Downloadable:", f"[purple]{video_res[0]}x{video_res[1]}")
if self.codec is not None:
table.add_row(f"[green]Codec:", f"([green]'v'[white]: [yellow]{self.codec.video_codec_name}[white] ([green]b[white]: [yellow]{self.codec.video_bitrate // 1000}k[white]), [green]'a'[white]: [yellow]{self.codec.audio_codec_name}[white] ([green]b[white]: [yellow]{self.codec.audio_bitrate // 1000}k[white]))")
console.rule("[bold green] VIDEO ", style="bold red")
console.print(table)
print("")
# Fix the URL if it does not include the full protocol
if "http" not in self.m3u8_index:
# Generate the full URL
self.m3u8_index = m3u8_url_fixer.generate_full_url(self.m3u8_index)
logging.info(f"Generated index URL: {self.m3u8_index}")
# Check if a valid HTTPS URL is obtained
if self.m3u8_index is not None and "https" in self.m3u8_index:
console.print(f"[cyan]Found m3u8 index [white]=> [red]{self.m3u8_index}")
else:
logging.error("[download_m3u8] Can't find a valid m3u8 index")
raise ValueError("Invalid m3u8 index URL")
class DownloadTracker:
def __init__(self, path_manager: PathManager):
"""
Initializes the DownloadTracker with paths for audio, subtitle, and video segments.
Args:
path_manager (PathManager): An instance of the PathManager class to manage file paths.
"""
# Initialize lists to track downloaded audio, subtitles, and video
self.downloaded_audio = []
self.downloaded_subtitle = []
self.downloaded_video = []
self.video_segment_path = path_manager.video_segments_path
self.audio_segments_path = path_manager.audio_segments_path
self.subtitle_segments_path = path_manager.subtitle_segments_path
def add_video(self, available_video):
"""
Adds a single video to the list of downloaded videos.
Args:
available_video (str): The URL of the video to be downloaded.
"""
self.downloaded_video.append({
'type': 'video',
'url': available_video,
'path': os.path.join(self.video_segment_path, "0.ts")
})
def add_audio(self, list_available_audio):
"""
Adds available audio tracks to the list of downloaded audio.
Args:
list_available_audio (list): A list of available audio track objects.
"""
for obj_audio in list_available_audio:
# Check if specific audio languages are set for download
if len(DOWNLOAD_SPECIFIC_AUDIO) > 0:
# Skip this audio track if its language is not in the specified list
if obj_audio.get('language') not in DOWNLOAD_SPECIFIC_AUDIO:
continue
# Construct the full path for the audio segment directory
full_path_audio = os.path.join(self.audio_segments_path, obj_audio.get('language'))
# Append the audio information to the downloaded audio list
self.downloaded_audio.append({
'type': 'audio',
'url': obj_audio.get('uri'),
'language': obj_audio.get('language'),
'path': os.path.join(full_path_audio, "0.ts")
})
def add_subtitle(self, list_available_subtitles):
"""
Adds available subtitles to the list of downloaded subtitles.
Args:
list_available_subtitles (list): A list of available subtitle objects.
"""
for obj_subtitle in list_available_subtitles:
# Check if specific subtitle languages are set for download
if len(DOWNLOAD_SPECIFIC_SUBTITLE) > 0:
# Skip this subtitle if its language is not in the specified list
if obj_subtitle.get('language') not in DOWNLOAD_SPECIFIC_SUBTITLE:
continue
sub_language = obj_subtitle.get('language')
# Construct the full path for the subtitle file
sub_full_path = os.path.join(self.subtitle_segments_path, sub_language + ".vtt")
self.downloaded_subtitle.append({
'type': 'sub',
'url': obj_subtitle.get('uri'),
'language': obj_subtitle.get('language'),
'path': sub_full_path
})
class ContentDownloader:
def __init__(self):
"""
Initializes the ContentDownloader class.
Attributes:
expected_real_time (float): Expected real-time duration of the video download.
"""
self.expected_real_time = None
def download_video(self, downloaded_video):
"""
Downloads the video if it doesn't already exist.
Args:
downloaded_video (list): A list containing information about the video to download.
"""
# Check if the video file already exists
if not os.path.exists(downloaded_video[0].get('path')):
folder_name = os.path.dirname(downloaded_video[0].get('path'))
# Create an instance of M3U8_Segments to handle video segments download
video_m3u8 = M3U8_Segments(downloaded_video[0].get('url'), folder_name)
# Get information about the video segments (e.g., duration, ts files to download)
video_m3u8.get_info()
# Store the expected real-time duration of the video
self.expected_real_time = video_m3u8.expected_real_time
# Download the video streams and print status
video_m3u8.download_streams(f"{Colors.MAGENTA}video")
# Print duration information of the downloaded video
print_duration_table(downloaded_video[0].get('path'))
print("")
else:
console.log("[cyan]Video [red]already exists.")
def download_audio(self, downloaded_audio):
"""
Downloads audio tracks if they don't already exist.
Args:
downloaded_audio (list): A list containing information about audio tracks to download.
"""
for obj_audio in downloaded_audio:
folder_name = os.path.dirname(obj_audio.get('path'))
# Check if the audio file already exists
if not os.path.exists(obj_audio.get('path')):
# Create an instance of M3U8_Segments to handle audio segments download
audio_m3u8 = M3U8_Segments(obj_audio.get('url'), folder_name)
# Get information about the audio segments (e.g., duration, ts files to download)
audio_m3u8.get_info()
# Download the audio segments and print status
audio_m3u8.download_streams(f"{Colors.MAGENTA}audio {Colors.RED}{obj_audio.get('language')}")
# Print duration information of the downloaded audio
print_duration_table(obj_audio.get('path'))
else:
console.log(f"[cyan]Audio [white]([green]{obj_audio.get('language')}[white]) [red]already exists.")
def download_subtitle(self, downloaded_subtitle):
"""
Downloads subtitle files if they don't already exist.
Args:
downloaded_subtitle (list): A list containing information about subtitles to download.
"""
for obj_subtitle in downloaded_subtitle:
sub_language = obj_subtitle.get('language')
# Check if the subtitle file already exists
if os.path.exists(obj_subtitle.get("path")):
console.log(f"[cyan]Subtitle [white]([green]{sub_language}[white]) [red]already exists.")
continue # Skip to the next subtitle if it exists
# Parse the M3U8 file to get the subtitle URI
m3u8_sub_parser = M3U8_Parser()
m3u8_sub_parser.parse_data(
uri=obj_subtitle.get('uri'),
raw_content=httpx.get(obj_subtitle.get('url')).text # Fetch subtitle content
)
# Print the status of the subtitle download
console.print(f"[cyan]Downloading subtitle: [red]{sub_language.lower()}")
# Write the content to the specified file
with open(obj_subtitle.get("path"), "wb") as f:
f.write(HttpClient().get_content(m3u8_sub_parser.subtitle[-1]))
class ContentJoiner:
def __init__(self, path_manager):
"""
Initializes the ContentJoiner class.
Args:
path_manager (PathManager): An instance of PathManager to manage output paths.
"""
self.path_manager: PathManager = path_manager
def setup(self, downloaded_video, downloaded_audio, downloaded_subtitle):
"""
Sets up the content joiner with downloaded media files.
Args:
downloaded_video (list): List of downloaded video information.
downloaded_audio (list): List of downloaded audio information.
downloaded_subtitle (list): List of downloaded subtitle information.
"""
self.downloaded_video = downloaded_video
self.downloaded_audio = downloaded_audio
self.downloaded_subtitle = downloaded_subtitle
# Initialize flags to check if media is available
self.converted_out_path = None
self.there_is_video = (len(downloaded_video) > 0)
self.there_is_audio = (len(downloaded_audio) > 0)
self.there_is_subtitle = (len(downloaded_subtitle) > 0)
# Display the status of available media
table = Table(show_header=False, box=None)
table.add_row(f"[green]Video - audio:", f"[yellow]{self.there_is_audio}")
table.add_row(f"[green]Video - Subtitle:", f"[yellow]{self.there_is_subtitle}")
console.rule("[bold green] JOIN ", style="bold red")
console.print(table)
print("")
# Start the joining process
self.conversione()
def conversione(self):
"""
Handles the joining of video, audio, and subtitles based on availability.
"""
# Join audio and video if audio is available
if self.there_is_audio:
if MERGE_AUDIO:
# Join video with audio tracks
self.converted_out_path = self._join_video_audio()
else:
# Process each available audio track
for obj_audio in self.downloaded_audio:
language = obj_audio.get('language')
path = obj_audio.get('path')
# Set the new path for regular audio
new_path = self.path_manager.output_filename.replace(".mp4", f"_{language}.mp4")
try:
# Rename the audio file to the new path
os.rename(path, new_path)
logging.info(f"Audio moved to {new_path}")
except Exception as e:
logging.error(f"Failed to move audio {path} to {new_path}: {e}")
# Convert video if available
if self.there_is_video:
self.converted_out_path = self._join_video()
# If no audio but video is available, join video
else:
if self.there_is_video:
self.converted_out_path = self._join_video()
# Join subtitles if available
if self.there_is_subtitle:
if MERGE_SUBTITLE:
if self.converted_out_path is not None:
self.converted_out_path = self._join_video_subtitles(self.converted_out_path)
else:
# Process each available subtitle track
for obj_sub in self.downloaded_subtitle:
language = obj_sub.get('language')
path = obj_sub.get('path')
forced = 'forced' in language
# Adjust the language name and set the new path based on forced status
if forced:
language = language.replace("forced-", "")
new_path = self.path_manager.output_filename.replace(".mp4", f".{language}.forced.vtt")
else:
new_path = self.path_manager.output_filename.replace(".mp4", f".{language}.vtt")
try:
# Rename the subtitle file to the new path
os.rename(path, new_path)
logging.info(f"Subtitle moved to {new_path}")
except Exception as e:
logging.error(f"Failed to move subtitle {path} to {new_path}: {e}")
def _join_video(self):
"""
Joins video segments into a single video file.
Returns:
str: The path to the joined video file.
"""
path_join_video = os.path.join(self.path_manager.base_path, "v_v.mp4")
logging.info(f"JOIN video path: {path_join_video}")
# Check if the joined video file already exists
if not os.path.exists(path_join_video):
# Set codec to None if not defined in class
if not hasattr(self, 'codec'):
self.codec = None
# Join the video segments into a single video file
join_video(
video_path=self.downloaded_video[0].get('path'),
out_path=path_join_video,
codec=self.codec
)
else:
console.log("[red]Output join video already exists.")
return path_join_video
def _join_video_audio(self):
"""
Joins video segments with audio tracks into a single video with audio file.
Returns:
str: The path to the joined video with audio file.
"""
path_join_video_audio = os.path.join(self.path_manager.base_path, "v_a.mp4")
logging.info(f"JOIN audio path: {path_join_video_audio}")
# Check if the joined video with audio file already exists
if not os.path.exists(path_join_video_audio):
# Set codec to None if not defined in class
if not hasattr(self, 'codec'):
self.codec = None
# Join the video with audio segments
join_audios(
video_path=self.downloaded_video[0].get('path'),
audio_tracks=self.downloaded_audio,
out_path=path_join_video_audio,
codec=self.codec
)
else:
console.log("[red]Output join video and audio already exists.")
return path_join_video_audio
def _join_video_subtitles(self, input_path):
"""
Joins subtitles with the video.
Args:
input_path (str): The path to the video file to which subtitles will be added.
Returns:
str: The path to the video with subtitles file.
"""
path_join_video_subtitle = os.path.join(self.path_manager.base_path, "v_s.mp4")
logging.info(f"JOIN subtitle path: {path_join_video_subtitle}")
# Check if the video with subtitles file already exists
if not os.path.exists(path_join_video_subtitle):
# Join the video with subtitles
join_subtitle(
input_path,
self.downloaded_subtitle,
path_join_video_subtitle
)
return path_join_video_subtitle
class HLS_Downloader:
def __init__(self, output_filename: str=None, m3u8_playlist: str=None, m3u8_index: str=None, is_playlist_url: bool=True, is_index_url: bool=True):
"""
Initializes the HLS_Downloader class.
Args:
output_filename (str): The desired output filename for the downloaded content.
m3u8_playlist (str): The URL or content of the m3u8 playlist.
m3u8_index (str): The index URL for m3u8 streams.
is_playlist_url (bool): Flag indicating if the m3u8_playlist is a URL.
is_index_url (bool): Flag indicating if the m3u8_index is a URL.
"""
self.output_filename = self._generate_output_filename(output_filename, m3u8_playlist, m3u8_index)
self.path_manager = PathManager(self.output_filename)
self.download_tracker = DownloadTracker(self.path_manager)
self.http_client = HttpClient(headers_index)
self.content_extractor = ContentExtractor()
self.content_downloader = ContentDownloader()
self.content_joiner = ContentJoiner(self.path_manager)
self.m3u8_playlist = m3u8_playlist
self.m3u8_index = m3u8_index
self.is_playlist_url = is_playlist_url
self.is_index_url = is_index_url
self.expected_real_time = None
def _generate_output_filename(self, output_filename, m3u8_playlist, m3u8_index):
"""
Generates a valid output filename based on provided parameters.
Args:
output_filename (str): The desired output filename.
m3u8_playlist (str): The m3u8 playlist URL or content.
m3u8_index (str): The m3u8 index URL.
Returns:
str: The generated output filename.
"""
new_filename = None
# Auto-generate output file name if not present
if output_filename is None:
if m3u8_playlist is not None:
new_filename = os.path.join("missing", compute_sha1_hash(m3u8_playlist))
else:
new_filename = os.path.join("missing", compute_sha1_hash(m3u8_index))
else:
# Check if output_filename contains a folder path
folder, base_name = os.path.split(output_filename)
# If no folder is specified, default to 'undefined'
if not folder:
folder = "undefined"
# Sanitize base name
base_name = reduce_base_name(remove_special_characters(base_name))
create_folder(folder)
if not can_create_file(base_name):
logging.error("Invalid mp4 name.")
sys.exit(0)
# Parse to only ASCII for compatibility across platforms
new_filename = os.path.join(folder, base_name)
new_filename = unidecode(new_filename)
return new_filename
def start(self):
"""
Initiates the downloading process. Checks if the output file already exists and proceeds with processing the playlist or index.
"""
if os.path.exists(self.output_filename):
console.log("[red]Output file already exists.")
return
self.path_manager.create_directories()
# Determine whether to process a playlist or index
if self.m3u8_playlist:
self._process_playlist()
elif self.m3u8_index:
self._process_index()
def _clean(self, out_path: str) -> None:
"""
Cleans up temporary files and folders after downloading and processing.
Args:
out_path (str): The path of the output file to be cleaned up.
"""
def dict_to_seconds(d):
"""Converts a dictionary of time components to total seconds."""
if d is not None:
return d['h'] * 3600 + d['m'] * 60 + d['s']
return 0
# Check if the final output file exists
logging.info(f"Check if end file converted exists: {out_path}")
if out_path is None or not os.path.isfile(out_path):
logging.error("Video file converted does not exist.")
sys.exit(0)
# Rename the output file to the desired output filename if it does not already exist
if not os.path.exists(self.output_filename):
# Rename the converted file to the specified output filename
os.rename(out_path, self.output_filename)
# Get duration information for the output file
end_output_time = print_duration_table(self.output_filename, description=False, return_string=False)
# Calculate file size and duration for reporting
formatted_size = format_file_size(os.path.getsize(self.output_filename))
formatted_duration = print_duration_table(self.output_filename, description=False, return_string=True)
expected_real_seconds = dict_to_seconds(self.content_downloader.expected_real_time)
end_output_seconds = dict_to_seconds(end_output_time)
# Check if the downloaded content is complete based on expected duration
if expected_real_seconds is not None:
missing_ts = not (expected_real_seconds - 3 <= end_output_seconds <= expected_real_seconds + 3)
else:
missing_ts = "Undefined"
# Second check for missing segments
if not missing_ts:
if get_video_duration_s(self.output_filename) < int(expected_real_seconds) - 5:
missing_ts = True
# Prepare the report panel content
panel_content = (
f"[bold green]Download completed![/bold green]\n"
f"[cyan]File size: [bold red]{formatted_size}[/bold red]\n"
f"[cyan]Duration: [bold]{formatted_duration}[/bold]\n"
f"[cyan]Missing TS: [bold red]{missing_ts}[/bold red]"
)
# Display the download completion message
console.print(Panel(
panel_content,
title=f"{os.path.basename(self.output_filename.replace('.mp4', ''))}",
border_style="green"
))
# Handle missing segments
if missing_ts:
os.rename(self.output_filename, self.output_filename.replace(".mp4", "_failed.mp4"))
# Delete all temporary files except for the output file
delete_files_except_one(self.path_manager.base_path, os.path.basename(self.output_filename.replace(".mp4", "_failed.mp4")))
# Remove the base folder if specified
if REMOVE_SEGMENTS_FOLDER:
remove_folder(self.path_manager.base_path)
else:
logging.info("Video file converted already exists.")
def _process_playlist(self):
"""
Processes the m3u8 playlist to download video, audio, and subtitles.
"""
# Retrieve the m3u8 playlist content
if self.is_playlist_url:
m3u8_playlist_text = HttpClient(headers=headers_index).get(self.m3u8_playlist)
m3u8_url_fixer.set_playlist(self.m3u8_playlist)
else:
m3u8_playlist_text = self.m3u8_playlist
# Check if the m3u8 content is valid
if m3u8_playlist_text is None:
console.log("[red]Playlist m3u8 to download is empty.")
sys.exit(0)
# Save the m3u8 playlist text to a temporary file
open(os.path.join(self.path_manager.base_temp, "playlist.m3u8"), "w+", encoding="utf-8").write(m3u8_playlist_text)
# Collect information about the playlist
if self.is_playlist_url:
self.content_extractor.start(self.m3u8_playlist, m3u8_playlist_text)
else:
self.content_extractor.start("https://fake.com", m3u8_playlist_text)
# Add downloaded elements to the tracker
self.download_tracker.add_video(self.content_extractor.m3u8_index)
self.download_tracker.add_audio(self.content_extractor.list_available_audio)
self.download_tracker.add_subtitle(self.content_extractor.list_available_subtitles)
# Download each type of content
if DOWNLOAD_VIDEO and len(self.download_tracker.downloaded_video) > 0:
self.content_downloader.download_video(self.download_tracker.downloaded_video)
if DOWNLOAD_AUDIO and len(self.download_tracker.downloaded_audio) > 0:
self.content_downloader.download_audio(self.download_tracker.downloaded_audio)
if DOWNLOAD_SUBTITLE and len(self.download_tracker.downloaded_subtitle) > 0:
self.content_downloader.download_subtitle(self.download_tracker.downloaded_subtitle)
# Join downloaded content
self.content_joiner.setup(self.download_tracker.downloaded_video, self.download_tracker.downloaded_audio, self.download_tracker.downloaded_subtitle)
# Clean up temporary files and directories
self._clean(self.content_joiner.converted_out_path)
def _process_index(self):
"""
Processes the m3u8 index to download only video.
"""
m3u8_url_fixer.set_playlist(self.m3u8_index)
# Download video
self.download_tracker.add_video(self.m3u8_index)
# Join video
self.content_joiner.setup(self.download_tracker.downloaded_video, [], [])
# Clean up temporary files and directories
self._clean(self.content_joiner.converted_out_path)