mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-06-05 02:55:25 +00:00
Decrypt log
This commit is contained in:
parent
73569b2965
commit
4593aebc9a
@ -1,6 +1,7 @@
|
||||
# 03.04.24
|
||||
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import subprocess
|
||||
import importlib.util
|
||||
@ -17,8 +18,8 @@ crypto_installed = crypto_spec is not None
|
||||
|
||||
if crypto_installed:
|
||||
logging.info("Decrypy use: Crypto")
|
||||
from Crypto.Cipher import AES # type: ignore
|
||||
from Crypto.Util.Padding import unpad # type: ignore
|
||||
from Crypto.Cipher import AES # type: ignore
|
||||
from Crypto.Util.Padding import unpad # type: ignore
|
||||
|
||||
class M3U8_Decryption:
|
||||
"""
|
||||
@ -34,12 +35,20 @@ if crypto_installed:
|
||||
- method (str): The encryption method.
|
||||
"""
|
||||
self.key = key
|
||||
if "0x" in str(iv):
|
||||
self.iv = iv
|
||||
if "0x" in str(iv):
|
||||
self.iv = bytes.fromhex(iv.replace("0x", ""))
|
||||
else:
|
||||
self.iv = iv
|
||||
self.method = method
|
||||
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
|
||||
|
||||
# Precreate cipher based on encryption method
|
||||
if self.method == "AES":
|
||||
self.cipher = AES.new(self.key, AES.MODE_ECB)
|
||||
elif self.method == "AES-128":
|
||||
self.cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv)
|
||||
elif self.method == "AES-128-CTR":
|
||||
self.cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv)
|
||||
else:
|
||||
raise ValueError("Invalid or unsupported method")
|
||||
|
||||
def decrypt(self, ciphertext: bytes) -> bytes:
|
||||
"""
|
||||
@ -51,23 +60,34 @@ if crypto_installed:
|
||||
Returns:
|
||||
bytes: The decrypted content.
|
||||
"""
|
||||
if self.method == "AES":
|
||||
cipher = AES.new(self.key, AES.MODE_ECB)
|
||||
decrypted_data = cipher.decrypt(ciphertext)
|
||||
return unpad(decrypted_data, AES.block_size)
|
||||
|
||||
elif self.method == "AES-128":
|
||||
cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv)
|
||||
decrypted_data = cipher.decrypt(ciphertext)
|
||||
return unpad(decrypted_data, AES.block_size)
|
||||
start = time.perf_counter_ns()
|
||||
|
||||
# Decrypt based on encryption method
|
||||
if self.method in {"AES", "AES-128"}:
|
||||
decrypted_data = self.cipher.decrypt(ciphertext)
|
||||
decrypted_content = unpad(decrypted_data, AES.block_size)
|
||||
|
||||
elif self.method == "AES-128-CTR":
|
||||
cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv)
|
||||
return cipher.decrypt(ciphertext)
|
||||
|
||||
decrypted_content = self.cipher.decrypt(ciphertext)
|
||||
else:
|
||||
raise ValueError("Invalid or unsupported method")
|
||||
|
||||
end = time.perf_counter_ns()
|
||||
|
||||
# Calculate elapsed time with high precision
|
||||
elapsed_nanoseconds = end - start
|
||||
elapsed_milliseconds = elapsed_nanoseconds / 1_000_000
|
||||
elapsed_seconds = elapsed_nanoseconds / 1_000_000_000
|
||||
|
||||
# Print performance metrics
|
||||
logging.info(f"[Crypto Decryption Performance]")
|
||||
logging.info(f"Method: {self.method}")
|
||||
logging.info(f"Decryption Time: {elapsed_milliseconds:.4f} ms ({elapsed_seconds:.6f} s)")
|
||||
logging.info(f"Decrypted Content Length: {len(decrypted_content)} bytes")
|
||||
|
||||
return decrypted_content
|
||||
|
||||
|
||||
else:
|
||||
|
||||
# Check if openssl command is available
|
||||
@ -95,10 +115,9 @@ else:
|
||||
- method (str): The encryption method.
|
||||
"""
|
||||
self.key = key
|
||||
if "0x" in str(iv):
|
||||
self.iv = iv
|
||||
if "0x" in str(iv):
|
||||
self.iv = bytes.fromhex(iv.replace("0x", ""))
|
||||
else:
|
||||
self.iv = iv
|
||||
self.method = method
|
||||
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
|
||||
|
||||
@ -112,6 +131,9 @@ else:
|
||||
Returns:
|
||||
bytes: The decrypted content.
|
||||
"""
|
||||
start = time.perf_counter_ns()
|
||||
|
||||
# Construct OpenSSL command based on encryption method
|
||||
if self.method == "AES":
|
||||
openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt'
|
||||
elif self.method == "AES-128":
|
||||
@ -122,8 +144,21 @@ else:
|
||||
raise ValueError("Invalid or unsupported method")
|
||||
|
||||
try:
|
||||
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext, stderr=subprocess.STDOUT)
|
||||
decrypted_content = subprocess.check_output(openssl_cmd.split(), input=ciphertext, stderr=subprocess.STDOUT)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise ValueError(f"Decryption failed: {e.output.decode()}")
|
||||
|
||||
return decrypted_data
|
||||
end = time.perf_counter_ns()
|
||||
|
||||
# Calculate elapsed time with high precision
|
||||
elapsed_nanoseconds = end - start
|
||||
elapsed_milliseconds = elapsed_nanoseconds / 1_000_000
|
||||
elapsed_seconds = elapsed_nanoseconds / 1_000_000_000
|
||||
|
||||
# Print performance metrics
|
||||
logging.info(f"[OpenSSL Decryption Performance]")
|
||||
logging.info(f"Method: {self.method}")
|
||||
logging.info(f"Decryption Time: {elapsed_milliseconds:.4f} ms ({elapsed_seconds:.6f} s)")
|
||||
logging.info(f"Decrypted Content Length: {len(decrypted_content)} bytes")
|
||||
|
||||
return decrypted_content
|
@ -142,7 +142,7 @@ def main():
|
||||
|
||||
# Create logger
|
||||
log_not = Logger()
|
||||
initialize()
|
||||
#initialize()
|
||||
|
||||
# Load search functions
|
||||
search_functions = load_search_functions()
|
||||
|
@ -7,6 +7,7 @@ psutil
|
||||
unidecode
|
||||
jsbeautifier
|
||||
pathvalidate
|
||||
pycryptodome
|
||||
fake-useragent=1.1.3
|
||||
qbittorrent-api
|
||||
python-qbittorrent
|
||||
|
Loading…
x
Reference in New Issue
Block a user