Un errore...

This commit is contained in:
Ghost 2024-04-01 10:18:51 +02:00
parent cd3001f6b2
commit 5988f7d53b
10 changed files with 1025 additions and 70 deletions

70
.gitignore vendored
View File

@ -14,8 +14,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
@ -34,21 +32,6 @@ MANIFEST
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
@ -59,38 +42,9 @@ local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
@ -100,29 +54,5 @@ ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Other
Video

BIN
Src/Assets/Un_giorno....png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 57 KiB

View File

@ -0,0 +1,4 @@
# 20.02.24
from .util.installer import check_ffmpeg
from .my_m3u8 import Downloader

View File

@ -0,0 +1 @@
# TO DO

View File

@ -0,0 +1,113 @@
# 29.04.24
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import cmac
from cryptography.hazmat.primitives.asymmetric import rsa as RSA
class M3U8_Decryption:
def __init__(self, key: bytes, iv: bytes = None) -> None:
"""
Initialize the M3U8_Decryption class.
Args:
- key (bytes): Encryption key.
- method (str): Encryption method (e.g., "AES", "Blowfish").
- iv (bytes): Initialization Vector (bytes), default is None.
"""
self.key = key
self.iv = iv
def set_method(self, method: str):
"""
Set the encryption method.
Args:
- method (str): Encryption method (e.g., "AES", "Blowfish").
"""
self.method = method
def parse_key(self, raw_iv: str) -> None:
"""
Parse the raw IV string and set the IV.
Args:
- raw_iv (str): Raw IV string in hexadecimal format (e.g., "43A6D967D5C17290D98322F5C8F6660B").
"""
if "0x" in str(raw_iv):
self.iv = bytes.fromhex(raw_iv.replace("0x", ""))
else:
self.iv = raw_iv
def _check_iv_size(self, expected_size: int) -> None:
"""
Check the size of the initialization vector (IV).
Args:
- expected_size (int): The expected size of the IV.
"""
if self.iv is not None and len(self.iv) != expected_size:
raise ValueError(f"Invalid IV size ({len(self.iv)}) for {self.method}. Expected size: {expected_size}")
def generate_cmac(self, data: bytes) -> bytes:
"""
Generate CMAC (Cipher-based Message Authentication Code).
Args:
- data (bytes): The data to generate CMAC for.
Returns:
- bytes: The CMAC digest.
"""
if self.method == "AES-CMAC":
cipher = Cipher(algorithms.AES(self.key), modes.ECB(), backend=default_backend())
encryptor = cipher.encryptor()
cmac_obj = cmac.CMAC(encryptor)
cmac_obj.update(data)
return cmac_obj.finalize()
else:
raise ValueError("Invalid method")
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt the ciphertext using the specified encryption method.
Args:
- ciphertext (bytes): The ciphertext to decrypt.
Returns:
- bytes: The decrypted data.
"""
if self.method == "AES":
self._check_iv_size(16)
cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend())
elif self.method == "AES-128":
self._check_iv_size(16)
cipher = Cipher(algorithms.AES(self.key[:16]), modes.CBC(self.iv), backend=default_backend())
elif self.method == "AES-128-CTR":
self._check_iv_size(16)
cipher = Cipher(algorithms.AES(self.key[:16]), modes.CTR(self.iv), backend=default_backend())
elif self.method == "Blowfish":
self._check_iv_size(8)
cipher = Cipher(algorithms.Blowfish(self.key), modes.CBC(self.iv), backend=default_backend())
elif self.method == "RSA":
private_key = RSA.import_key(self.key)
cipher = Cipher(algorithms.RSA(private_key), backend=default_backend())
else:
raise ValueError("Invalid or unsupported method")
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
return decrypted_data

View File

@ -0,0 +1,389 @@
# 31.01.24
# Class
from Src.Util.console import console
# Import
import ffmpeg
import hashlib
import os
import logging
import shutil
def has_audio_stream(video_path: str) -> bool:
"""
Check if the input video has an audio stream.
Parameters:
- video_path (str): Path to the input video file.
Returns:
- has_audio (bool): True if the input video has an audio stream, False otherwise.
"""
try:
probe_result = ffmpeg.probe(video_path, select_streams='a')
return bool(probe_result['streams'])
except ffmpeg.Error:
return None
def get_video_duration(file_path: str) -> (float):
"""
Get the duration of a video file.
Args:
file_path (str): The path to the video file.
Returns:
(float): The duration of the video in seconds if successful,
None if there's an error.
"""
try:
# Use FFmpeg probe to get video information
probe = ffmpeg.probe(file_path)
# Extract duration from the video information
return float(probe['format']['duration'])
except ffmpeg.Error as e:
# Handle FFmpeg errors
print(f"Error: {e.stderr}")
return None
def format_duration(seconds: float) -> list[int, int, int]:
"""
Format duration in seconds into hours, minutes, and seconds.
Args:
seconds (float): Duration in seconds.
Returns:
list[int, int, int]: List containing hours, minutes, and seconds.
"""
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return int(hours), int(minutes), int(seconds)
def print_duration_table(file_path: str) -> None:
"""
Print duration of a video file in hours, minutes, and seconds.
Args:
file_path (str): The path to the video file.
"""
video_duration = get_video_duration(file_path)
if video_duration is not None:
# Format duration into hours, minutes, and seconds
hours, minutes, seconds = format_duration(video_duration)
# Print the formatted duration
console.log(f"[cyan]Info [green]'{file_path}': [purple]{int(hours)}[red]h [purple]{int(minutes)}[red]m [purple]{int(seconds)}[red]s")
def compute_sha1_hash(input_string: str) -> (str):
"""
Computes the SHA-1 hash of the input string.
Args:
input_string (str): The string to be hashed.
Returns:
str: The SHA-1 hash of the input string.
"""
# Compute the SHA-1 hash
hashed_string = hashlib.sha1(input_string.encode()).hexdigest()
# Return the hashed string
return hashed_string
# SINGLE SUBTITLE
def add_subtitle(input_video_path: str, input_subtitle_path: str, output_video_path: str, subtitle_language: str = 'ita', prefix: str = "single_sub") -> str:
"""
Convert a video with a single subtitle.
Args:
input_video_path (str): Path to the input video file.
input_subtitle_path (str): Path to the input subtitle file.
output_video_path (str): Path to save the output video file.
subtitle_language (str, optional): Language of the subtitle. Defaults to 'ita'.
prefix (str, optional): Prefix to add at the beginning of the output filename. Defaults to "subtitled".
Returns:
output_video_path (str): Path to the saved output video file.
"""
# Check if input_video_path and input_subtitle_path exist
if not os.path.exists(input_video_path):
raise FileNotFoundError(f"Input video file '{input_video_path}' not found.")
if not os.path.exists(input_subtitle_path):
raise FileNotFoundError(f"Input subtitle file '{input_subtitle_path}' not found.")
# Set up the output file name by modifying the video file name
output_filename = os.path.splitext(os.path.basename(input_video_path))[0]
output_file_name = f"{prefix}_{output_filename}.mp4"
output_video_path = os.path.join(os.path.dirname(output_video_path), output_file_name)
# Input settings
input_video = ffmpeg.input(input_video_path)
input_subtitle = ffmpeg.input(input_subtitle_path)
# Output settings
output_args = {
'c:s': 'mov_text',
'c:v': 'copy',
'c:a': 'copy',
'metadata:s:s:0': 'language=' + subtitle_language,
}
# Combine inputs, map subtitle stream, and output
ffmpeg.output(
input_video,
input_subtitle,
output_video_path,
**output_args
).global_args(
'-map', '0:v',
'-map', '0:a',
'-map', '1:s'
).run()
# Return
return output_video_path
# SEGMENTS
def concatenate_and_save(file_list_path: str, output_filename: str, video_decoding: str = None, audio_decoding: str = None, prefix: str = "segments", output_directory: str = None) -> str:
"""
Concatenate input files and save the output with specified decoding parameters.
Parameters:
- file_list_path (str): Path to the file list containing the segments.
- output_filename (str): Output filename for the concatenated video.
- video_decoding (str): Video decoding parameter (optional).
- audio_decoding (str): Audio decoding parameter (optional).
- prefix (str): Prefix to add at the end of output file name (default is "segments").
- output_directory (str): Directory to save the output file. If not provided, defaults to the current directory.
Returns:
- output_file_path (str): Path to the saved output file.
"""
try:
# Input and output arguments
input_args = {
'format': 'concat',
'safe': 0
}
output_args = {
'c': 'copy',
'loglevel': 'error',
'y': None
}
# Add encoding parameter for video and audio
global_args = []
if video_decoding:
global_args.extend(['-c:v', video_decoding])
if audio_decoding:
global_args.extend(['-c:a', audio_decoding])
# Set up the output file name by modifying the video file name
output_file_name = os.path.splitext(output_filename)[0] + f"_{prefix}.mp4"
# Determine output directory
if output_directory:
output_file_path = os.path.join(output_directory, output_file_name)
else:
output_file_path = output_file_name
# Concatenate input files and output
output = (
ffmpeg.input(file_list_path, **input_args)
.output(output_file_path, **output_args)
.global_args(*global_args)
)
# Execute the process
process = output.run()
except ffmpeg.Error as ffmpeg_error:
logging.error(f"Error saving MP4: {ffmpeg_error.stdout}")
return ""
# Remove the temporary file list and folder and completely remove tmp folder
logging.info("Cleanup...")
os.remove(file_list_path)
shutil.rmtree("tmp", ignore_errors=True)
# Return
return output_file_path
# AUDIOS
def join_audios(video_path: str, audio_tracks: list[dict[str, str]], prefix: str = "merged") -> str:
"""
Join video with multiple audio tracks and sync them if there are matching segments.
Parameters:
- video_path (str): Path to the video file.
- audio_tracks (List[Dict[str, str]]): A list of dictionaries, where each dictionary contains 'audio_path'.
- prefix (str, optional): Prefix to add at the beginning of the output filename. Defaults to "merged".
Returns:
- out_path (str): Path to the saved output video file.
"""
try:
# Check if video_path exists
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file '{video_path}' not found.")
# Create input streams for video and audio using ffmpeg's.
video_stream = ffmpeg.input(video_path)
# Create a list to store audio streams and map arguments
audio_streams = []
map_arguments = []
# Iterate through audio tracks
for i, audio_track in enumerate(audio_tracks):
audio_path = audio_track.get('path', '')
# Check if audio_path exists
if audio_path:
if not os.path.exists(audio_path):
logging.warning(f"Audio file '{audio_path}' not found.")
continue
audio_stream = ffmpeg.input(audio_path)
audio_streams.append(audio_stream)
map_arguments.extend(['-map', f'{i + 1}:a:0'])
# Set up a process to combine the video and audio streams and create an output file with .mp4 extension.
output_file_name = f"{prefix}_{os.path.splitext(os.path.basename(video_path))[0]}.mp4"
out_path = os.path.join(os.path.dirname(video_path), output_file_name)
# Output arguments
output_args = {
'vcodec': 'copy',
'acodec': 'copy',
'loglevel': 'error'
}
# Combine inputs, map audio streams, and set output
process = (
ffmpeg.output(
video_stream,
*audio_streams,
out_path,
**output_args
)
.global_args(
'-map', '0:v:0',
*map_arguments,
'-shortest',
'-strict', 'experimental',
)
.run(overwrite_output=True)
)
logging.info("[M3U8_Downloader] Merge completed successfully.")
# Return
return out_path
except ffmpeg.Error as ffmpeg_error:
logging.error("[M3U8_Downloader] Ffmpeg error: %s", ffmpeg_error)
return ""
# SUBTITLES
def transcode_with_subtitles(video: str, subtitles_list: list[dict[str, str]], output_file: str, prefix: str = "transcoded") -> str:
"""
Transcode a video with subtitles.
Args:
- video (str): Path to the input video file.
- subtitles_list (list[dict[str, str]]): List of dictionaries containing subtitles information.
- output_file (str): Path to the output transcoded video file.
- prefix (str): Prefix to add to the output file name. Default is "transcoded".
Returns:
- str: Path to the transcoded video file.
"""
try:
# Check if the input video file exists
if not os.path.exists(video):
raise FileNotFoundError(f"Video file '{video}' not found.")
# Get input video from video path
input_ffmpeg = ffmpeg.input(video)
input_video = input_ffmpeg['v']
input_audio = input_ffmpeg['a']
# List with subtitles path and metadata
input_subtitles = []
metadata = {}
# Iterate through subtitle tracks
for idx, sub_dict in enumerate(subtitles_list):
# Get path and name of subtitles
sub_file = sub_dict.get('path')
title = sub_dict.get('name')
# Check if the subtitle file exists
if not os.path.exists(sub_file):
raise FileNotFoundError(f"Subtitle file '{sub_file}' not found.")
# Append ffmpeg input to list
input_ffmpeg_sub = ffmpeg.input(sub_file)
input_subtitles.append(input_ffmpeg_sub['s'])
# Add metadata for title
metadata[f'metadata:s:s:{idx}'] = f"title={title}"
# Check if the input video has an audio stream
logging.info(f"There is audio: {has_audio_stream(video)}")
# Set up the output file name by adding the prefix
output_filename = f"{prefix}_{os.path.splitext(os.path.basename(video))[0]}.mkv"
output_file = os.path.join(os.path.dirname(output_file), output_filename)
# Configure ffmpeg output
output_ffmpeg = ffmpeg.output(
input_video,
*(input_audio,) if has_audio_stream(video) else (), # If there is no audio stream
*input_subtitles,
output_file,
vcodec='copy',
acodec='copy' if has_audio_stream(video) else (), # If there is no audio stream
**metadata,
loglevel='error'
)
# Overwrite output file if exists
output_ffmpeg = ffmpeg.overwrite_output(output_ffmpeg)
# Run ffmpeg command
ffmpeg.run(output_ffmpeg, overwrite_output=True)
# Rename video from mkv -> mp4
output_filename_mp4 = output_file.replace("mkv", "mp4")
os.rename(output_file, output_filename_mp4)
return output_filename_mp4
except ffmpeg.Error as ffmpeg_error:
print(f"Error: {ffmpeg_error}")
return ""

View File

@ -0,0 +1,143 @@
# 24.01.2023
# Class
from Src.Util.console import console
# Import
import subprocess
import os
import requests
import zipfile
import sys
import ctypes
# [ func ]
def isAdmin() -> (bool):
"""
Check if the current user has administrative privileges.
Returns:
bool: True if the user is an administrator, False otherwise.
"""
try:
is_admin = (os.getuid() == 0)
except AttributeError:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
return is_admin
def get_version():
"""
Get the version of FFmpeg installed on the system.
This function runs the 'ffmpeg -version' command to retrieve version information
about the installed FFmpeg binary.
"""
try:
# Run the FFmpeg command to get version information
output = subprocess.check_output(['ffmpeg', '-version'], stderr=subprocess.STDOUT, universal_newlines=True)
# Extract version information from the output
version_lines = [line for line in output.split('\n') if line.startswith('ffmpeg version')]
if version_lines:
# Extract version number from the version line
version = version_lines[0].split(' ')[2]
console.print(f"[blue]FFmpeg version: [red]{version}")
except subprocess.CalledProcessError as e:
# If there's an error executing the FFmpeg command
print("Error executing FFmpeg command:", e.output.strip())
raise e
def download_ffmpeg():
"""
Download FFmpeg binary for Windows and add it to the system PATH.
This function downloads the FFmpeg binary zip file from the specified URL,
extracts it to a directory named 'ffmpeg', and adds the 'bin' directory of
FFmpeg to the system PATH so that it can be accessed from the command line.
"""
# SInizializate start variable
ffmpeg_url = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-git-full.7z"
ffmpeg_dir = "ffmpeg"
print("[yellow]Downloading FFmpeg...[/yellow]")
try:
response = requests.get(ffmpeg_url)
# Create the directory to extract FFmpeg if it doesn't exist
os.makedirs(ffmpeg_dir, exist_ok=True)
# Save the zip file
zip_file_path = os.path.join(ffmpeg_dir, "ffmpeg.zip")
with open(zip_file_path, "wb") as zip_file:
zip_file.write(response.content)
# Extract the zip file
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(ffmpeg_dir)
# Add the FFmpeg directory to the system PATH
ffmpeg_bin_dir = os.path.join(os.getcwd(), ffmpeg_dir, "bin")
os.environ["PATH"] += os.pathsep + ffmpeg_bin_dir
# Remove the downloaded zip file
os.remove(zip_file_path)
except requests.RequestException as e:
# If there's an issue with downloading FFmpeg
print(f"Failed to download FFmpeg: {e}")
raise e
except zipfile.BadZipFile as e:
# If the downloaded file is not a valid zip file
print(f"Failed to extract FFmpeg zip file: {e}")
raise e
def check_ffmpeg():
"""
Check if FFmpeg is installed and available on the system PATH.
This function checks if FFmpeg is installed and available on the system PATH.
If FFmpeg is found, it prints its version. If not found, it attempts to download
FFmpeg and add it to the system PATH.
"""
console.print("[green]Checking FFmpeg...")
try:
# Try running the FFmpeg command to check if it exists
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
console.print("[blue]FFmpeg is installed. \n")
# Get and print FFmpeg version
#get_version()
except subprocess.CalledProcessError:
try:
# If FFmpeg is not found, attempt to download and add it to the PATH
console.print("[cyan]FFmpeg is not found in the PATH. Downloading and adding to the PATH...[/cyan]")
# Check if user has admin privileges
if not isAdmin():
console.log("[red]You need to be admin to proceed!")
sys.exit(0)
# Download FFmpeg and add it to the PATH
download_ffmpeg()
sys.exit(0)
except Exception as e:
# If unable to download or add FFmpeg to the PATH
console.print("[red]Unable to download or add FFmpeg to the PATH.[/red]")
console.print(f"Error: {e}")
sys.exit(0)

View File

@ -0,0 +1,39 @@
# 29.02.24
from Src.Util.os import format_size
class TSFileSizeCalculator:
def __init__(self):
"""
Initialize the TSFileSizeCalculator object.
Args:
num_segments (int): The number of segments.
"""
self.ts_file_sizes = []
def add_ts_file_size(self, size: int):
"""
Add a file size to the list of file sizes.
Args:
size (float): The size of the ts file to be added.
"""
self.ts_file_sizes.append(size)
def calculate_total_size(self):
"""
Calculate the total size of the files.
Returns:
float: The mean size of the files in a human-readable format.
"""
if len(self.ts_file_sizes) == 0:
return 0
total_size = sum(self.ts_file_sizes)
mean_size = total_size / len(self.ts_file_sizes)
# Return format mean
return format_size(mean_size)

View File

@ -0,0 +1,288 @@
# 29.04.25
# Class import
from Src.Util.headers import get_headers
# Import
from m3u8 import M3U8
import logging
import requests
class M3U8_Parser:
def __init__(self, DOWNLOAD_SPECIFIC_SUBTITLE = None):
"""
Initializes M3U8_Parser with empty lists for segments, playlists, keys, and subtitles.
"""
self.segments = []
self.video_playlist = []
self.keys = {}
self.subtitle_playlist = [] # No vvt ma url a vvt
self.subtitle = [] # Url a vvt
self.audio_ts = []
self.DOWNLOAD_SPECIFIC_SUBTITLE = DOWNLOAD_SPECIFIC_SUBTITLE
def parse_data(self, m3u8_content: str) -> (None):
"""
Extracts all information present in the provided M3U8 content.
Args:
- m3u8_content (str): The content of the M3U8 file.
"""
try:
# Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles
m3u8_obj = M3U8(m3u8_content)
# Collect video info with url, resolution and codecs
for playlist in m3u8_obj.playlists:
self.video_playlist.append({
"uri": playlist.uri,
"width": playlist.stream_info.resolution,
"codecs": playlist.stream_info.codecs
})
# Collect info of encryption if present, method, uri and iv
for key in m3u8_obj.keys:
if key is not None:
self.keys = ({
"method": key.method,
"uri": key.uri,
"iv": key.iv
})
# Collect info of subtitles, type, name, language and uri
# for audio and subtitles
for media in m3u8_obj.media:
if media.type == "SUBTITLES":
self.subtitle_playlist.append({
"type": media.type,
"name": media.name,
"default": media.default,
"language": media.language,
"uri": media.uri
})
if media.type == "AUDIO":
self.audio_ts.append({
"type": media.type,
"name": media.name,
"default": media.default,
"language": media.language,
"uri": media.uri
})
# Collect info about url of subtitles or segmenets
# m3u8 playlist
# m3u8 index
for segment in m3u8_obj.segments:
# Collect uri of request to vtt
if "vtt" not in segment.uri:
self.segments.append(segment.uri)
# Collect info of subtitle
else:
self.subtitle.append(segment.uri)
except Exception as e:
logging.error(f"[M3U8_Parser] Error parsing M3U8 content: {e}")
def get_resolution(self, uri: str) -> (int):
"""
Gets the resolution from the provided URI.
Args:
- uri (str): The URI to extract resolution from.
Returns:
- int: The resolution if found, otherwise 0.
"""
if '1080' in uri:
return 1080
elif '720' in uri:
return 720
elif '480' in uri:
return 480
else:
return 0
def get_best_quality(self) -> (dict):
"""
Returns the URI of the M3U8 playlist with the best quality.
Returns:
- str: The URI of the M3U8 playlist with the best quality and decoding if present, otherwise return None
"""
if self.video_playlist:
try:
# Sort the list of video playlist items based on the 'width' attribute in descending order.
# The 'width' attribute is extracted using the lambda function as the sorting key.
sorted_uris = sorted(self.video_playlist, key=lambda x: x['width'], reverse=True)
# And get the first with best resolution
return sorted_uris[0]
except:
logging.error("[M3U8_Parser] Error: Can't find M3U8 resolution by width...")
logging.info("[M3U8_Parser] Try searching in URI")
# Sort the list of video playlist items based on the 'width' attribute if present,
# otherwise, use the resolution obtained from the 'uri' attribute as a fallback.
# Sorting is done in descending order (reverse=True).
sorted_uris = sorted(self.video_playlist, key=lambda x: x.get('width') if x.get('width') is not None else self.get_resolution(x.get('uri')), reverse=True)
# And get the first with best resolution
return sorted_uris[0]
else:
logging.info("[M3U8_Parser] No video playlists found.")
return None
def get_subtitles(self):
"""
Download all subtitles if present.
Return:
- list: list of subtitle with [name_language, uri] or None if there is no subtitle
"""
# Create full path where store data of subtitle
logging.info("Download subtitle ...")
if self.subtitle_playlist:
output = []
# For all subtitle find
for sub_info in self.subtitle_playlist:
# Get language name
name_language = sub_info.get("language")
logging.info(f"[M3U8_Parser] Find subtitle: {name_language}")
# Check if there is custom subtitles to download
if len(self.DOWNLOAD_SPECIFIC_SUBTITLE) > 0:
# Check if language in list
if name_language not in self.DOWNLOAD_SPECIFIC_SUBTITLE:
continue
# Make request to m3u8 subtitle to extract vtt
logging.info(f"[M3U8_Parser] Download subtitle: {name_language}")
req_sub_content = requests.get(sub_info.get("uri"), headers={'user-agent': get_headers()})
try:
# Try extract vtt url
sub_parse = M3U8_Parser()
sub_parse.parse_data(req_sub_content.text)
url_subititle = sub_parse.subtitle[0]
# Add name and url to output list
output.append({
'name': sub_info.get('name'),
'language': name_language,
'uri': url_subititle
})
except Exception as e:
logging.error(f"[M3U8_Parser] Cant donwload: {name_language}, error: {e}")
# Return
return output
else:
logging.info("[M3U8_Parser] No subtitle find")
return None
def get_track_audios(self) -> list:
"""
Return a list of available audio files with dictionaries {'language': xx, 'uri: xx}
Returns:
list: A list of dictionaries containing language and URI information for audio tracks, or None if no audio tracks are found.
"""
logging.info(f"[M3U8_Parser] Finding {len(self.audio_ts)} playlist(s) with audio.")
if self.audio_ts:
logging.info("[M3U8_Parser] Getting list of available audio names")
list_output = []
# For all languages present in m3u8
for obj_audio in self.audio_ts:
# Add language and URI
list_output.append({
'language': obj_audio.get('language'),
'uri': obj_audio.get('uri')
})
# Return
return list_output
else:
logging.info("[M3U8_Parser] No audio tracks found")
return None
def get_default_subtitle(self):
"""
Retrieves the default subtitle information from the subtitle playlist.
Returns:
dict: A dictionary containing the name and URI of the default subtitle, or None if no default subtitle is found.
"""
dict_default_sub = None
# Check if there are subtitles in the playlist
if self.subtitle_playlist:
# Iterate through each subtitle in the playlist
for sub_info in self.subtitle_playlist:
# Check if the subtitle is marked as default
is_default = sub_info.get("default")
if is_default == "YES":
dict_default_sub = {
'name': sub_info.get('name'),
'uri': sub_info.get('uri'),
}
# Return the default subtitle dictionary
return dict_default_sub
def get_default_track_audio(self):
"""
Retrieves the default audio track information from the audio_ts list.
Returns:
dict: A dictionary containing the name and URI of the default audio track, or None if no default audio track is found.
"""
dict_default_audio = None
# Check if there are audio tracks in the list
if self.audio_ts:
# Iterate through each audio track object in the list
for obj_audio in self.audio_ts:
# Check if the audio track is marked as default
is_default = obj_audio.get("default")
if is_default == "YES":
dict_default_audio = {
'name': obj_audio.get('name'),
'uri': obj_audio.get('uri'),
}
# Return the default audio track dictionary
return dict_default_audio

View File

@ -0,0 +1,48 @@
# 29.03.24
# Import
import logging
import sys
from urllib.parse import urlparse, urljoin
class M3U8_UrlFix:
def __init__(self) -> None:
# Initialize the class with an empty playlist URL
self.url_playlist: str = ""
def set_playlist(self, url: str) -> None:
"""
Set the M3U8 playlist URL.
Parameters:
- url (str): The M3U8 playlist URL.
"""
self.url_playlist = url
def generate_full_url(self, url_resource: str) -> str:
"""
Generate a full URL for a given resource using the base URL from the playlist.
Parameters:
- url_resource (str): The relative URL of the resource within the playlist.
Returns:
- str: The full URL for the specified resource.
"""
# Check if m3u8 url playlist is present
if self.url_playlist == None:
logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present")
sys.exit(0)
# Parse the playlist URL to extract the base URL components
parsed_playlist_url = urlparse(self.url_playlist)
# Construct the base URL using the scheme, netloc, and path from the playlist URL
base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}"
# Join the base URL with the relative resource URL to get the full URL
full_url = urljoin(base_url, url_resource)
return full_url