Change Crypto to: pycryptodome

This commit is contained in:
Ghost 2024-04-03 11:00:30 +02:00
parent bdc252803e
commit 3a9378060b
6 changed files with 219 additions and 88 deletions

View File

@ -54,6 +54,7 @@ TQDM_PROGRESS_TIMEOUT = config_manager.get_int('M3U8', 'tqdm_progress_timeout')
COMPLETED_PERCENTAGE = config_manager.get_float('M3U8', 'download_percentage') COMPLETED_PERCENTAGE = config_manager.get_float('M3U8', 'download_percentage')
REQUESTS_TIMEOUT = config_manager.get_int('M3U8', 'requests_timeout') REQUESTS_TIMEOUT = config_manager.get_int('M3U8', 'requests_timeout')
ENABLE_TIME_TIMEOUT = config_manager.get_bool('M3U8', 'enable_time_quit') ENABLE_TIME_TIMEOUT = config_manager.get_bool('M3U8', 'enable_time_quit')
USE_OPENSSL = config_manager.get_bool('M3U8', 'use_openssl')
TQDM_SHOW_PROGRESS = config_manager.get_bool('M3U8', 'tqdm_show_progress') TQDM_SHOW_PROGRESS = config_manager.get_bool('M3U8', 'tqdm_show_progress')
MIN_TS_FILES_IN_FOLDER = config_manager.get_int('M3U8', 'minimum_ts_files_in_folder') MIN_TS_FILES_IN_FOLDER = config_manager.get_int('M3U8', 'minimum_ts_files_in_folder')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8', 'cleanup_tmp_folder') REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8', 'cleanup_tmp_folder')
@ -226,11 +227,13 @@ class M3U8_Segments:
- progress_counter (tqdm): The progress counter object. - progress_counter (tqdm): The progress counter object.
- stop_event (threading.Event): The event to signal when to quit. - stop_event (threading.Event): The event to signal when to quit.
""" """
# Break if stop event is true # Break if stop event is true
if stop_event.is_set(): if stop_event.is_set():
return return
try: try:
# Get ts url and create a filename based on index # Get ts url and create a filename based on index
ts_url = self.segments[index] ts_url = self.segments[index]
ts_filename = os.path.join(self.temp_folder, f"{index}.ts") ts_filename = os.path.join(self.temp_folder, f"{index}.ts")
@ -246,12 +249,21 @@ class M3U8_Segments:
# If data is retrieved # If data is retrieved
if ts_content is not None: if ts_content is not None:
# Create a file to save data # Create a file to save data
with open(ts_filename, "wb") as ts_file: with open(ts_filename, "wb") as ts_file:
# Decrypt if there is an IV in the main M3U8 index # Decrypt if there is an IV in the main M3U8 index
if self.key and self.decryption.iv: if self.key and self.decryption.iv:
# pycryptodomex, faster using win11
if USE_OPENSSL:
self.decryption.decrypt_openssl(ts_content, ts_filename)
else:
decrypted_data = self.decryption.decrypt(ts_content) decrypted_data = self.decryption.decrypt(ts_content)
ts_file.write(decrypted_data) ts_file.write(decrypted_data)
# For no iv and key
else: else:
ts_file.write(ts_content) ts_file.write(ts_content)

View File

@ -5,7 +5,6 @@ from .helper import (
get_video_duration, get_video_duration,
format_duration, format_duration,
print_duration_table, print_duration_table,
compute_sha1_hash,
add_subtitle, add_subtitle,
concatenate_and_save, concatenate_and_save,
join_audios, join_audios,

View File

@ -1,9 +1,136 @@
# 29.04.24 # 03.04.24
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes # Import
from cryptography.hazmat.backends import default_backend import subprocess
from cryptography.hazmat.primitives import cmac import logging
from cryptography.hazmat.primitives.asymmetric import rsa as RSA import os
# External library
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
class AES_ECB:
def __init__(self, key: bytes) -> None:
"""
Initialize AES ECB mode encryption/decryption object.
Args:
key (bytes): The encryption key.
Returns:
None
"""
self.key = key
def encrypt(self, plaintext: bytes) -> bytes:
"""
Encrypt plaintext using AES ECB mode.
Args:
plaintext (bytes): The plaintext to encrypt.
Returns:
bytes: The encrypted ciphertext.
"""
cipher = AES.new(self.key, AES.MODE_ECB)
return cipher.encrypt(plaintext)
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt ciphertext using AES ECB mode.
Args:
ciphertext (bytes): The ciphertext to decrypt.
Returns:
bytes: The decrypted plaintext.
"""
cipher = AES.new(self.key, AES.MODE_ECB)
decrypted_data = cipher.decrypt(ciphertext)
return unpad(decrypted_data, AES.block_size)
class AES_CBC:
def __init__(self, key: bytes, iv: bytes) -> None:
"""
Initialize AES CBC mode encryption/decryption object.
Args:
key (bytes): The encryption key.
iv (bytes): The initialization vector.
Returns:
None
"""
self.key = key
self.iv = iv
def encrypt(self, plaintext: bytes) -> bytes:
"""
Encrypt plaintext using AES CBC mode.
Args:
plaintext (bytes): The plaintext to encrypt.
Returns:
bytes: The encrypted ciphertext.
"""
cipher = AES.new(self.key, AES.MODE_CBC, iv=self.iv)
return cipher.encrypt(plaintext)
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt ciphertext using AES CBC mode.
Args:
ciphertext (bytes): The ciphertext to decrypt.
Returns:
bytes: The decrypted plaintext.
"""
cipher = AES.new(self.key, AES.MODE_CBC, iv=self.iv)
decrypted_data = cipher.decrypt(ciphertext)
return unpad(decrypted_data, AES.block_size)
class AES_CTR:
def __init__(self, key: bytes, nonce: bytes) -> None:
"""
Initialize AES CTR mode encryption/decryption object.
Args:
key (bytes): The encryption key.
nonce (bytes): The nonce value.
Returns:
None
"""
self.key = key
self.nonce = nonce
def encrypt(self, plaintext: bytes) -> bytes:
"""
Encrypt plaintext using AES CTR mode.
Args:
plaintext (bytes): The plaintext to encrypt.
Returns:
bytes: The encrypted ciphertext.
"""
cipher = AES.new(self.key, AES.MODE_CTR, nonce=self.nonce)
return cipher.encrypt(plaintext)
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt ciphertext using AES CTR mode.
Args:
ciphertext (bytes): The ciphertext to decrypt.
Returns:
bytes: The decrypted plaintext.
"""
cipher = AES.new(self.key, AES.MODE_CTR, nonce=self.nonce)
return cipher.decrypt(ciphertext)
class M3U8_Decryption: class M3U8_Decryption:
def __init__(self, key: bytes, iv: bytes = None) -> None: def __init__(self, key: bytes, iv: bytes = None) -> None:
@ -42,72 +169,69 @@ class M3U8_Decryption:
else: else:
self.iv = raw_iv self.iv = raw_iv
def _check_iv_size(self, expected_size: int) -> None:
"""
Check the size of the initialization vector (IV).
Args:
- expected_size (int): The expected size of the IV.
"""
if self.iv is not None and len(self.iv) != expected_size:
raise ValueError(f"Invalid IV size ({len(self.iv)}) for {self.method}. Expected size: {expected_size}")
def generate_cmac(self, data: bytes) -> bytes:
"""
Generate CMAC (Cipher-based Message Authentication Code).
Args:
- data (bytes): The data to generate CMAC for.
Returns:
- bytes: The CMAC digest.
"""
if self.method == "AES-CMAC":
cipher = Cipher(algorithms.AES(self.key), modes.ECB(), backend=default_backend())
encryptor = cipher.encryptor()
cmac_obj = cmac.CMAC(encryptor)
cmac_obj.update(data)
return cmac_obj.finalize()
else:
raise ValueError("Invalid method")
def decrypt(self, ciphertext: bytes) -> bytes: def decrypt(self, ciphertext: bytes) -> bytes:
""" """
Decrypt the ciphertext using the specified encryption method. Decrypt ciphertext using the specified method.
Args: Args:
- ciphertext (bytes): The ciphertext to decrypt. ciphertext (bytes): The ciphertext to decrypt.
Returns: Returns:
- bytes: The decrypted data. bytes: The decrypted plaintext.
""" """
if self.method == "AES": if self.method == "AES":
self._check_iv_size(16) aes_ecb = AES_ECB(self.key)
cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()) decrypted_data = aes_ecb.decrypt(ciphertext)
elif self.method == "AES-128": elif self.method == "AES-128":
self._check_iv_size(16) aes_cbc = AES_CBC(self.key[:16], self.iv)
cipher = Cipher(algorithms.AES(self.key[:16]), modes.CBC(self.iv), backend=default_backend()) decrypted_data = aes_cbc.decrypt(ciphertext)
elif self.method == "AES-128-CTR": elif self.method == "AES-128-CTR":
self._check_iv_size(16) aes_ctr = AES_CTR(self.key[:16], self.nonce)
cipher = Cipher(algorithms.AES(self.key[:16]), modes.CTR(self.iv), backend=default_backend()) decrypted_data = aes_ctr.decrypt(ciphertext)
elif self.method == "Blowfish":
self._check_iv_size(8)
cipher = Cipher(algorithms.Blowfish(self.key), modes.CBC(self.iv), backend=default_backend())
elif self.method == "RSA":
private_key = RSA.import_key(self.key)
cipher = Cipher(algorithms.RSA(private_key), backend=default_backend())
else: else:
raise ValueError("Invalid or unsupported method") raise ValueError("Invalid or unsupported method")
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
return decrypted_data return decrypted_data
def decrypt_openssl(self, encrypted_content: bytes, output_path: str) -> None:
"""
Decrypts encrypted content using OpenSSL and writes the decrypted content to a file.
Args:
encrypted_content (bytes): The content to be decrypted.
output_path (str): The path to write the decrypted content to.
"""
# Create a temporary file to store the encrypted content
temp_encrypted_file = str(output_path).replace(".ts", "_.ts")
# Write the encrypted content to the temporary file
with open(temp_encrypted_file, 'wb') as f:
f.write(encrypted_content)
# Convert key and IV to hexadecimal strings
key_hex = self.key.hex()
iv_hex = self.iv.hex()
# OpenSSL command to decrypt the content
openssl_cmd = [
'openssl', 'aes-128-cbc',
'-d',
'-in', temp_encrypted_file,
'-out', output_path,
'-K', key_hex,
'-iv', iv_hex
]
# Execute the OpenSSL command
try:
subprocess.run(openssl_cmd, check=True)
except subprocess.CalledProcessError as e:
logging.error("Decryption failed:", e)
# Remove the temporary encrypted file
os.remove(temp_encrypted_file)

View File

@ -5,12 +5,12 @@ from Src.Util.console import console
# Import # Import
import ffmpeg import ffmpeg
import hashlib import subprocess
import os import os
import json
import logging import logging
import shutil import shutil
def has_audio_stream(video_path: str) -> bool: def has_audio_stream(video_path: str) -> bool:
""" """
Check if the input video has an audio stream. Check if the input video has an audio stream.
@ -21,10 +21,20 @@ def has_audio_stream(video_path: str) -> bool:
Returns: Returns:
- has_audio (bool): True if the input video has an audio stream, False otherwise. - has_audio (bool): True if the input video has an audio stream, False otherwise.
""" """
try: try:
probe_result = ffmpeg.probe(video_path, select_streams='a')
return bool(probe_result['streams']) ffprobe_cmd = ['ffprobe', '-v', 'error', '-print_format', 'json', '-select_streams', 'a', '-show_streams', video_path]
except ffmpeg.Error: result = subprocess.run(ffprobe_cmd, capture_output=True, text=True, check=True)
# Parse JSON output
probe_result = json.loads(result.stdout)
# Check if there are audio streams
return bool(probe_result.get('streams', []))
except subprocess.CalledProcessError as e:
logging.error(f"Error: {e.stderr}")
return None return None
def get_video_duration(file_path: str) -> (float): def get_video_duration(file_path: str) -> (float):
@ -41,16 +51,17 @@ def get_video_duration(file_path: str) -> (float):
try: try:
# Use FFmpeg probe to get video information ffprobe_cmd = ['ffprobe', '-v', 'error', '-show_format', '-print_format', 'json', file_path]
probe = ffmpeg.probe(file_path) result = subprocess.run(ffprobe_cmd, capture_output=True, text=True, check=True)
# Parse JSON output
probe_result = json.loads(result.stdout)
# Extract duration from the video information # Extract duration from the video information
return float(probe['format']['duration']) return float(probe_result['format']['duration'])
except ffmpeg.Error as e: except subprocess.CalledProcessError as e:
logging.error(f"Error: {e.stderr}")
# Handle FFmpeg errors
print(f"Error: {e.stderr}")
return None return None
def format_duration(seconds: float) -> list[int, int, int]: def format_duration(seconds: float) -> list[int, int, int]:
@ -87,22 +98,6 @@ def print_duration_table(file_path: str) -> None:
# Print the formatted duration # Print the formatted duration
console.log(f"[cyan]Info [green]'{file_path}': [purple]{int(hours)}[red]h [purple]{int(minutes)}[red]m [purple]{int(seconds)}[red]s") console.log(f"[cyan]Info [green]'{file_path}': [purple]{int(hours)}[red]h [purple]{int(minutes)}[red]m [purple]{int(seconds)}[red]s")
def compute_sha1_hash(input_string: str) -> (str):
"""
Computes the SHA-1 hash of the input string.
Args:
input_string (str): The string to be hashed.
Returns:
str: The SHA-1 hash of the input string.
"""
# Compute the SHA-1 hash
hashed_string = hashlib.sha1(input_string.encode()).hexdigest()
# Return the hashed string
return hashed_string
# SINGLE SUBTITLE # SINGLE SUBTITLE
def add_subtitle(input_video_path: str, input_subtitle_path: str, output_video_path: str, subtitle_language: str = 'ita', prefix: str = "single_sub") -> str: def add_subtitle(input_video_path: str, input_subtitle_path: str, output_video_path: str, subtitle_language: str = 'ita', prefix: str = "single_sub") -> str:
""" """

View File

@ -26,6 +26,7 @@
"minimum_ts_files_in_folder": 15, "minimum_ts_files_in_folder": 15,
"download_percentage": 1, "download_percentage": 1,
"requests_timeout": 5, "requests_timeout": 5,
"use_openssl": false,
"enable_time_quit": false, "enable_time_quit": false,
"tqdm_show_progress": false, "tqdm_show_progress": false,
"cleanup_tmp_folder": true "cleanup_tmp_folder": true

Binary file not shown.