2024-04-28 18:17:18 +02:00

442 lines
14 KiB
Python

# 31.01.24
import subprocess
import os
import json
import logging
import shutil
from typing import Tuple, List, Dict
# External libraries
import ffmpeg
# Internal utilities
from Src.Util.console import console
from Src.Util.config import config_manager
# Variable
DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug")
DEBUG_FFMPEG = "debug" if DEBUG_MODE else "error"
USE_CODECS = config_manager.get_bool("M3U8", "use_codecs")
def has_audio_stream(video_path: str) -> bool:
"""
Check if the input video has an audio stream.
Parameters:
- video_path (str): Path to the input video file.
Returns:
- has_audio (bool): True if the input video has an audio stream, False otherwise.
"""
try:
ffprobe_cmd = ['ffprobe', '-v', 'error', '-print_format', 'json', '-select_streams', 'a', '-show_streams', video_path]
result = subprocess.run(ffprobe_cmd, capture_output=True, text=True, check=True)
# Parse JSON output
probe_result = json.loads(result.stdout)
# Check if there are audio streams
return bool(probe_result.get('streams', []))
except subprocess.CalledProcessError as e:
logging.error(f"Error: {e.stderr}")
return None
def get_video_duration(file_path: str) -> (float):
"""
Get the duration of a video file.
Args:
file_path (str): The path to the video file.
Returns:
(float): The duration of the video in seconds if successful,
None if there's an error.
"""
try:
ffprobe_cmd = ['ffprobe', '-v', 'error', '-show_format', '-print_format', 'json', file_path]
result = subprocess.run(ffprobe_cmd, capture_output=True, text=True, check=True)
# Parse JSON output
probe_result = json.loads(result.stdout)
# Extract duration from the video information
return float(probe_result['format']['duration'])
except subprocess.CalledProcessError as e:
logging.error(f"Error: {e.stderr}")
return None
def format_duration(seconds: float) -> Tuple[int, int, int]:
"""
Format duration in seconds into hours, minutes, and seconds.
Args:
seconds (float): Duration in seconds.
Returns:
list[int, int, int]: List containing hours, minutes, and seconds.
"""
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return int(hours), int(minutes), int(seconds)
def print_duration_table(file_path: str) -> None:
"""
Print duration of a video file in hours, minutes, and seconds.
Args:
file_path (str): The path to the video file.
"""
video_duration = get_video_duration(file_path)
if video_duration is not None:
# Format duration into hours, minutes, and seconds
hours, minutes, seconds = format_duration(video_duration)
# Print the formatted duration
console.log(f"[cyan]Info [green]'{file_path}': [purple]{int(hours)}[red]h [purple]{int(minutes)}[red]m [purple]{int(seconds)}[red]s")
def get_ts_resolution(ts_file_path):
"""
Get the resolution of a TS (MPEG Transport Stream) file using ffprobe.
Args:
- ts_file_path (str): The file path to the TS file.
Returns:
- tuple: A tuple containing the width and height of the video stream in the TS file.
If resolution information is not available, returns (None, None).
Example:
If `ts_file_path` points to a TS file with video resolution 1920x1080,
the function will return (1920, 1080).
"""
# Run ffprobe command to get video stream information
ffprobe_cmd = ['ffprobe', '-v', 'error', '-show_entries', 'stream=width,height', '-of', 'json', ts_file_path]
try:
# Execute ffprobe command and capture output
output = subprocess.check_output(ffprobe_cmd, stderr=subprocess.STDOUT)
# Decode JSON output
info = json.loads(output)
# Check if there are streams
if 'streams' in info:
for stream in info['streams']:
# Check if stream is video
if stream.get('codec_type') == 'video':
# Extract width and height
width = stream.get('width')
height = stream.get('height')
return width, height
except subprocess.CalledProcessError as e:
logging.error("Error running ffprobe:", e)
# If no resolution information found, return None
return None, None
def add_subtitle(input_video_path: str, input_subtitle_path: str, output_video_path: str, subtitle_language: str = 'ita', prefix: str = "single_sub") -> str:
"""
Convert a video with a single subtitle.
Args:
input_video_path (str): Path to the input video file.
input_subtitle_path (str): Path to the input subtitle file.
output_video_path (str): Path to save the output video file.
subtitle_language (str, optional): Language of the subtitle. Defaults to 'ita'.
prefix (str, optional): Prefix to add at the beginning of the output filename. Defaults to "subtitled".
Returns:
output_video_path (str): Path to the saved output video file.
"""
# Check if input_video_path and input_subtitle_path exist
if not os.path.exists(input_video_path):
raise FileNotFoundError(f"Input video file '{input_video_path}' not found.")
if not os.path.exists(input_subtitle_path):
raise FileNotFoundError(f"Input subtitle file '{input_subtitle_path}' not found.")
# Set up the output file name by modifying the video file name
output_filename = os.path.splitext(os.path.basename(input_video_path))[0]
output_file_name = f"{prefix}_{output_filename}.mp4"
output_video_path = os.path.join(os.path.dirname(output_video_path), output_file_name)
# Input settings
input_video = ffmpeg.input(input_video_path)
input_subtitle = ffmpeg.input(input_subtitle_path)
# Output settings
output_args = {
'c:s': 'mov_text',
'c:v': 'copy',
'c:a': 'copy',
'metadata:s:s:0': 'language=' + subtitle_language,
}
# Combine inputs, map subtitle stream, and output
ffmpeg.output(
input_video,
input_subtitle,
output_video_path,
**output_args
).global_args(
'-map', '0:v',
'-map', '0:a',
'-map', '1:s'
).run()
# Return
return output_video_path
def concatenate_and_save(file_list_path: str, output_filename: str, v_codec: str = None, a_codec: str = None, bandwidth: int = None, prefix: str = "segments", output_directory: str = None):
"""
Concatenate input files and save the output with specified decoding parameters.
Parameters:
- file_list_path (str): Path to the file list containing the segments.
- output_filename (str): Output filename for the concatenated video.
- v_codec (str): Video decoding parameter (optional).
- a_codec (str): Audio decoding parameter (optional).
- bandwidth (int): Bitrate for the output video (optional).
- prefix (str): Prefix to add at the end of output file name (default is "segments").
- output_directory (str): Directory to save the output file. If not provided, defaults to the current directory.
- codecs (str): Codecs for video and audio (optional).
Returns:
- output_file_path (str): Path to the saved output file.
"""
try:
# Output arguments
output_args = {
'c': 'copy',
'loglevel': DEBUG_FFMPEG,
}
# Set up the output file name by modifying the video file name
output_file_name = output_filename
output_file_path = os.path.join(output_directory, output_file_name) if output_directory else output_file_name
# Concatenate input file list and output
output = (
ffmpeg.input(file_list_path, safe=0, f='concat')
.output(output_file_path, **output_args)
)
# Overwrite output file if exists
output = ffmpeg.overwrite_output(output)
# Execute the process
process = output.run()
except ffmpeg.Error as ffmpeg_error:
logging.error(f"Error saving MP4: {ffmpeg_error.stderr.decode('utf-8')}")
return ""
# Remove the temporary file list and folder and completely remove tmp folder
os.remove(file_list_path)
shutil.rmtree("tmp", ignore_errors=True)
return output_file_path
def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], prefix: str = "merged") -> str:
"""
Join video with multiple audio tracks and sync them if there are matching segments.
Parameters:
- video_path (str): Path to the video file.
- audio_tracks (List[Dict[str, str]]): A list of dictionaries, where each dictionary contains 'audio_path'.
- prefix (str, optional): Prefix to add at the beginning of the output filename. Defaults to "merged".
Returns:
- out_path (str): Path to the saved output video file.
"""
try:
# Check if video_path exists
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file '{video_path}' not found.")
# Create input streams for video and audio using ffmpeg's.
video_stream = ffmpeg.input(video_path)
# Create a list to store audio streams and map arguments
audio_streams = []
map_arguments = []
# Iterate through audio tracks
for i, audio_track in enumerate(audio_tracks):
audio_path = audio_track.get('path', '')
# Check if audio_path exists
if audio_path:
if not os.path.exists(audio_path):
logging.warning(f"Audio file '{audio_path}' not found.")
continue
audio_stream = ffmpeg.input(audio_path)
audio_streams.append(audio_stream)
map_arguments.extend(['-map', f'{i + 1}:a:0'])
# Set up a process to combine the video and audio streams and create an output file with .mp4 extension.
output_file_name = f"{prefix}_{os.path.splitext(os.path.basename(video_path))[0]}.mp4"
out_path = os.path.join(os.path.dirname(video_path), output_file_name)
# Output arguments
output_args = {
'vcodec': 'copy',
'acodec': 'copy',
'loglevel': DEBUG_FFMPEG
}
# Combine inputs, map audio streams, and set output
output = (
ffmpeg.output(
video_stream,
*audio_streams,
out_path,
**output_args
)
.global_args(
'-map', '0:v:0',
*map_arguments,
'-shortest',
'-strict', 'experimental',
)
)
# Overwrite output file if exists
output = ffmpeg.overwrite_output(output)
# Retrieve the command that will be executed
command = output.compile()
logging.info(f"Execute command: {command}")
# Execute the process
process = output.run()
logging.info("[M3U8_Downloader] Merge completed successfully.")
# Return
return out_path
except ffmpeg.Error as ffmpeg_error:
logging.error("[M3U8_Downloader] Ffmpeg error: %s", ffmpeg_error)
return ""
def transcode_with_subtitles(video: str, subtitles_list: List[Dict[str, str]], output_file: str, prefix: str = "transcoded") -> str:
"""
Transcode a video with subtitles.
Args:
- video (str): Path to the input video file.
- subtitles_list (list[dict[str, str]]): List of dictionaries containing subtitles information.
- output_file (str): Path to the output transcoded video file.
- prefix (str): Prefix to add to the output file name. Default is "transcoded".
Returns:
- str: Path to the transcoded video file.
"""
try:
# Check if the input video file exists
if not os.path.exists(video):
raise FileNotFoundError(f"Video file '{video}' not found.")
# Get input video from video path
input_ffmpeg = ffmpeg.input(video)
input_video = input_ffmpeg['v']
input_audio = input_ffmpeg['a']
# List with subtitles path and metadata
input_subtitles = []
metadata = {}
# Iterate through subtitle tracks
for idx, sub_dict in enumerate(subtitles_list):
# Get path and name of subtitles
sub_file = sub_dict.get('path')
title = sub_dict.get('name')
# Check if the subtitle file exists
if not os.path.exists(sub_file):
raise FileNotFoundError(f"Subtitle file '{sub_file}' not found.")
# Append ffmpeg input to list
input_ffmpeg_sub = ffmpeg.input(sub_file)
input_subtitles.append(input_ffmpeg_sub['s'])
# Add metadata for title
metadata[f'metadata:s:s:{idx}'] = f"title={title}"
# Check if the input video has an audio stream
logging.info(f"There is audio: {has_audio_stream(video)}")
# Set up the output file name by adding the prefix
output_filename = f"{prefix}_{os.path.splitext(os.path.basename(video))[0]}.mkv"
output_file = os.path.join(os.path.dirname(output_file), output_filename)
# Configure ffmpeg output
output = ffmpeg.output(
input_video,
*(input_audio,) if has_audio_stream(video) else (), # If there is no audio stream
*input_subtitles,
output_file,
vcodec='copy',
acodec='copy' if has_audio_stream(video) else (), # If there is no audio stream
**metadata,
loglevel=DEBUG_FFMPEG
)
# Overwrite output file if exists
output = ffmpeg.overwrite_output(output)
# Retrieve the command that will be executed
command = output.compile()
logging.info(f"Execute command: {command}")
# Run ffmpeg command
ffmpeg.run(output, overwrite_output=True)
# Rename video from mkv -> mp4
output_filename_mp4 = output_file.replace("mkv", "mp4")
os.rename(output_file, output_filename_mp4)
return output_filename_mp4
except ffmpeg.Error as ffmpeg_error:
print(f"Error: {ffmpeg_error}")
return ""