diff --git a/Src/Lib/FFmpeg/my_m3u8.py b/Src/Lib/FFmpeg/my_m3u8.py index 4d6fa8c..2333b4e 100644 --- a/Src/Lib/FFmpeg/my_m3u8.py +++ b/Src/Lib/FFmpeg/my_m3u8.py @@ -395,52 +395,6 @@ class M3U8_Segments: # Refresh progress bar progress_counter.refresh() - def join(self, output_filename: str): - """ - Join all segments file to a mp4 file name - !! NOT USED IN THIS VERSION - - Parameters: - - video_decoding(str): video decoding to use with ffmpeg for only video - - audio_decoding(str): audio decoding to use with ffmpeg for only audio - - output_filename (str): The name of the output mp4 file. - """ - - # Print output of failed segments if present - if len(failed_segments) > 0: - logging.error(f"[M3U8_Segments] Failed segments = {failed_segments}") - logging.warning("[M3U8_Segments] Audio and video can be out of sync !!!") - console.log("[red]Audio and video can be out of sync !!!") - - # Get current dir and create file_list with path of all ts file - current_dir = os.path.dirname(os.path.realpath(__file__)) - file_list_path = os.path.join(current_dir, 'file_list.txt') - - # Sort files (1.ts, 2.ts, ...) - ts_files = [f for f in os.listdir(self.temp_folder) if f.endswith(".ts")] - def extract_number(file_name): - return int(''.join(filter(str.isdigit, file_name))) - ts_files.sort(key=extract_number) - - # Check if there is file to json - if len(ts_files) < MIN_TS_FILES_IN_FOLDER: - logging.error(f"No .ts file to join in folder: {self.temp_folder}") - raise - - # Save files sorted in a txt file with absolute path to fix problem with ( C:\\path (win)) - with open(file_list_path, 'w') as file_list: - for ts_file in ts_files: - absolute_path = os.path.abspath(os.path.join(self.temp_folder, ts_file)) - file_list.write(f"file '{absolute_path}'\n") - - console.log("[cyan]Start joining all files") - - # ADD IF - concatenate_and_save( - file_list_path = file_list_path, - output_filename = output_filename - ) - class Downloader(): def __init__(self, output_filename: str = None, m3u8_playlist:str = None, m3u8_index:str = None, key: str = None): @@ -951,19 +905,4 @@ class Downloader(): # Clean up folder of all tmp folder and tmp with .ts segments folder if REMOVE_SEGMENTS_FOLDER: self.cleanup_tmp() - - else: - logging.info(f"Download m3u8 from index.") - - # Add full URL of the M3U8 playlist to fix next .ts without https if necessary - class_urlFix.set_playlist(self.m3u8_index) - - logging.info("Download videos ...") - self.manage_video() - - # Convert video segments to mp4 - self.download_videos() - - # Clean up folder of all tmp folder and tmp with .ts segments folder - if REMOVE_SEGMENTS_FOLDER: - self.cleanup_tmp(is_index = True) + \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/decryption.py b/Src/Lib/FFmpeg/util/decryption.py index d30ae09..485b2d0 100644 --- a/Src/Lib/FFmpeg/util/decryption.py +++ b/Src/Lib/FFmpeg/util/decryption.py @@ -10,131 +10,6 @@ from Crypto.Cipher import AES from Crypto.Util.Padding import unpad -class AES_ECB: - def __init__(self, key: bytes) -> None: - """ - Initialize AES ECB mode encryption/decryption object. - - Args: - key (bytes): The encryption key. - - Returns: - None - """ - self.key = key - - def encrypt(self, plaintext: bytes) -> bytes: - """ - Encrypt plaintext using AES ECB mode. - - Args: - plaintext (bytes): The plaintext to encrypt. - - Returns: - bytes: The encrypted ciphertext. - """ - cipher = AES.new(self.key, AES.MODE_ECB) - return cipher.encrypt(plaintext) - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt ciphertext using AES ECB mode. - - Args: - ciphertext (bytes): The ciphertext to decrypt. - - Returns: - bytes: The decrypted plaintext. - """ - cipher = AES.new(self.key, AES.MODE_ECB) - decrypted_data = cipher.decrypt(ciphertext) - return unpad(decrypted_data, AES.block_size) - - -class AES_CBC: - def __init__(self, key: bytes, iv: bytes) -> None: - """ - Initialize AES CBC mode encryption/decryption object. - - Args: - key (bytes): The encryption key. - iv (bytes): The initialization vector. - - Returns: - None - """ - self.key = key - self.iv = iv - - def encrypt(self, plaintext: bytes) -> bytes: - """ - Encrypt plaintext using AES CBC mode. - - Args: - plaintext (bytes): The plaintext to encrypt. - - Returns: - bytes: The encrypted ciphertext. - """ - cipher = AES.new(self.key, AES.MODE_CBC, iv=self.iv) - return cipher.encrypt(plaintext) - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt ciphertext using AES CBC mode. - - Args: - ciphertext (bytes): The ciphertext to decrypt. - - Returns: - bytes: The decrypted plaintext. - """ - cipher = AES.new(self.key, AES.MODE_CBC, iv=self.iv) - decrypted_data = cipher.decrypt(ciphertext) - return unpad(decrypted_data, AES.block_size) - - -class AES_CTR: - def __init__(self, key: bytes, nonce: bytes) -> None: - """ - Initialize AES CTR mode encryption/decryption object. - - Args: - key (bytes): The encryption key. - nonce (bytes): The nonce value. - - Returns: - None - """ - self.key = key - self.nonce = nonce - - def encrypt(self, plaintext: bytes) -> bytes: - """ - Encrypt plaintext using AES CTR mode. - - Args: - plaintext (bytes): The plaintext to encrypt. - - Returns: - bytes: The encrypted ciphertext. - """ - cipher = AES.new(self.key, AES.MODE_CTR, nonce=self.nonce) - return cipher.encrypt(plaintext) - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt ciphertext using AES CTR mode. - - Args: - ciphertext (bytes): The ciphertext to decrypt. - - Returns: - bytes: The decrypted plaintext. - """ - cipher = AES.new(self.key, AES.MODE_CTR, nonce=self.nonce) - return cipher.decrypt(ciphertext) - class M3U8_Decryption: def __init__(self, key: bytes, iv: bytes = None) -> None: @@ -175,67 +50,27 @@ class M3U8_Decryption: def decrypt(self, ciphertext: bytes) -> bytes: """ - Decrypt ciphertext using the specified method. + Decrypt the ciphertext using the specified encryption method. Args: - ciphertext (bytes): The ciphertext to decrypt. + ciphertext (bytes): The encrypted content to decrypt. Returns: - bytes: The decrypted plaintext. + bytes: The decrypted content. """ - if self.method == "AES": - aes_ecb = AES_ECB(self.key) - decrypted_data = aes_ecb.decrypt(ciphertext) + cipher = AES.new(self.key, AES.MODE_ECB) + decrypted_data = cipher.decrypt(ciphertext) + return unpad(decrypted_data, AES.block_size) elif self.method == "AES-128": - aes_cbc = AES_CBC(self.key[:16], self.iv) - decrypted_data = aes_cbc.decrypt(ciphertext) + cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv) + decrypted_data = cipher.decrypt(ciphertext) + return unpad(decrypted_data, AES.block_size) elif self.method == "AES-128-CTR": - aes_ctr = AES_CTR(self.key[:16], self.nonce) - decrypted_data = aes_ctr.decrypt(ciphertext) + cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv) + return cipher.decrypt(ciphertext) else: - raise ValueError("Invalid or unsupported method") - - return decrypted_data - - def decrypt_openssl(self, encrypted_content: bytes, output_path: str) -> None: - """ - Decrypts encrypted content using OpenSSL and writes the decrypted content to a file. - - Args: - encrypted_content (bytes): The content to be decrypted. - output_path (str): The path to write the decrypted content to. - """ - - # Create a temporary file to store the encrypted content - temp_encrypted_file = str(output_path).replace(".ts", "_.ts") - - # Write the encrypted content to the temporary file - with open(temp_encrypted_file, 'wb') as f: - f.write(encrypted_content) - - # Convert key and IV to hexadecimal strings - key_hex = self.key.hex() - iv_hex = self.iv.hex() - - # OpenSSL command to decrypt the content - openssl_cmd = [ - 'openssl', 'aes-128-cbc', - '-d', - '-in', temp_encrypted_file, - '-out', output_path, - '-K', key_hex, - '-iv', iv_hex - ] - - # Execute the OpenSSL command - try: - subprocess.run(openssl_cmd, check=True) - except subprocess.CalledProcessError as e: - logging.error("Decryption failed:", e) - - # Remove the temporary encrypted file - os.remove(temp_encrypted_file) \ No newline at end of file + raise ValueError("Invalid or unsupported method") \ No newline at end of file diff --git a/Src/Lib/FFmpeg/util/helper.py b/Src/Lib/FFmpeg/util/helper.py index 4ce5971..80f2d8b 100644 --- a/Src/Lib/FFmpeg/util/helper.py +++ b/Src/Lib/FFmpeg/util/helper.py @@ -24,20 +24,6 @@ USE_CODECS = config_manager.get_bool("M3U8", "use_codecs") -""" -DOC: - -The `'c': 'copy'` option in the `output_args` dictionary indicates that ffmpeg should perform stream copying for both the audio and video streams. -Stream copying means that the audio and video streams are copied directly from the input file(s) to the output file without any re-encoding. -This process preserves the original quality of the streams and is much faster than re-encoding. - -When using `'c': 'copy'`, ffmpeg simply copies the bitstream from the input file(s) to the output file without decoding or altering it. -This is useful when you want to quickly concatenate or merge multimedia files without any loss in quality or additional processing time. -It's particularly efficient when the input and output formats are compatible, and you don't need to make any modifications to the streams. -""" - - - def has_audio_stream(video_path: str) -> bool: """ Check if the input video has an audio stream. diff --git a/requirements.txt b/requirements.txt index 9b91b07..de6588c 100644 Binary files a/requirements.txt and b/requirements.txt differ