# 5.01.24 -> 7.01.24

# Class import
from Src.Util.Helper.console import console, config_logger
from Src.Util.Helper.headers import get_headers
from Src.Util.FFmpeg.util import print_duration_table

# Import
import requests
import re
import os
import ffmpeg
import time
import sys
import warnings
import logging
import shutil
import subprocess
from urllib.parse import urljoin
from tqdm.rich import tqdm
from concurrent.futures import ThreadPoolExecutor
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

# Disable warning
from tqdm import TqdmExperimentalWarning
warnings.filterwarnings("ignore", category=TqdmExperimentalWarning)
warnings.filterwarnings("ignore", category=UserWarning, module="cryptography")

# Variable
os.makedirs("videos", exist_ok=True)
DOWNLOAD_WORKERS = 30
# Seconds passed as ``timeout=`` to every requests.get call in this module.
REQUEST_TIMEOUT = 30
# One NAME=value pair of an #EXT-X-KEY attribute list; the value is either a
# quoted string (which may itself contain ',' or '=') or runs to the next ','.
_ATTRIBUTE_PATTERN = re.compile(r'([^,=\s]+)=("[^"]*"|[^,]*)')
_EXT_X_KEY_PREFIX = "#EXT-X-KEY:"


# [ main class ]
class Decryption():
    """Hold the AES key/IV pair and decrypt segment data with AES-CBC."""

    def __init__(self, key):
        """Store *key* (bytes or None); the IV stays None until parse_key runs."""
        self.iv = None
        self.key = key

    def decode_ext_x_key(self, key_str):
        """Parse an ``#EXT-X-KEY:`` playlist line into a dict.

        Args:
            key_str: The full tag line, e.g.
                ``#EXT-X-KEY:METHOD=AES-128,URI="...",IV=0x...``.

        Returns:
            dict mapping attribute name to its value with surrounding double
            quotes removed.
        """
        logging.debug(f"String to decode: {key_str}")

        # Remove the tag as a prefix. str.lstrip would treat it as a set of
        # characters and could also eat the start of the first attribute name.
        if key_str.startswith(_EXT_X_KEY_PREFIX):
            key_str = key_str[len(_EXT_X_KEY_PREFIX):]

        key_map = {
            name: value.strip().strip('"')
            for name, value in _ATTRIBUTE_PATTERN.findall(key_str)
        }

        logging.debug(f"Output string: {key_map}")
        return key_map

    def parse_key(self, raw_iv):
        """Convert a hex IV string (optionally prefixed 0x/0X) to bytes in self.iv."""
        hex_iv = raw_iv.strip()
        if hex_iv[:2].lower() == "0x":
            hex_iv = hex_iv[2:]
        self.iv = bytes.fromhex(hex_iv)

    def decrypt_ts(self, encrypted_data):
        """Return *encrypted_data* decrypted with AES-CBC using self.key and self.iv."""
        # NOTE(review): no padding is removed from the decrypted output here;
        # confirm whether the streams handled need PKCS7 unpadding.
        cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend())
        decryptor = cipher.decryptor()
        return decryptor.update(encrypted_data) + decryptor.finalize()


class M3U8():
    """Download the segments listed in one m3u8 playlist and join them with ffmpeg."""

    def __init__(self, url, key=None):
        """Remember the playlist *url*, decode the hex *key* and create the temp folder.

        Args:
            url: URL of the m3u8 playlist.
            key: Decryption key as a hex string, or None.
        """
        self.url = url
        self.key = bytes.fromhex(key) if key is not None else key
        self.temp_folder = "tmp"
        os.makedirs(self.temp_folder, exist_ok=True)

    def parse_data(self, m3u8_content):
        """Fill self.segments (and the IV, when present) from playlist text.

        Args:
            m3u8_content: The playlist body as a string.
        """
        self.decription = Decryption(self.key)
        self.segments = []

        # True between an #EXTINF tag and the URI line that belongs to it.
        waiting_for_uri = False

        for raw_line in m3u8_content.split('\n'):
            line = str(raw_line).strip()  # also drops the '\r' of CRLF playlists
            if not line:
                continue

            if line.startswith(_EXT_X_KEY_PREFIX):
                x_key_dict = self.decription.decode_ext_x_key(line)
                raw_iv = x_key_dict.get('IV')
                if raw_iv:
                    self.decription.parse_key(raw_iv)
                elif x_key_dict.get('METHOD', '').upper() != 'NONE':
                    # Without an IV save_ts writes the segments undecrypted.
                    logging.warning(f"EXT-X-KEY without IV, segments will not be decrypted: {line}")

            elif line.startswith("#EXTINF"):
                waiting_for_uri = True

            elif waiting_for_uri and not line.startswith("#"):
                # urljoin leaves absolute URLs untouched and resolves relative
                # ones against the playlist URL (query string ignored).
                ts_url = urljoin(self.url, line)
                logging.debug(f"Add to segment: {ts_url}")
                self.segments.append(ts_url)
                waiting_for_uri = False

    def get_info(self):
        """Fetch the playlist and parse it; exit the process on a non-OK response."""
        self.max_retry = 3
        response = requests.get(self.url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT)

        if response.ok:
            self.parse_data(response.text)
            console.log(f"[red]Ts segments find [white]=> [yellow]{len(self.segments)}")
        else:
            console.log("[red]Wrong m3u8 url")
            sys.exit(0)

    def get_req_ts(self, ts_url):
        """Download one segment, retrying up to self.max_retry times.

        Args:
            ts_url: Absolute URL of the segment.

        Returns:
            The response body as bytes, or None when every attempt failed.
        """
        # max_retry is set by get_info; fall back to 3 if it was never called.
        attempts = max(1, getattr(self, "max_retry", 3))
        error = None

        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT)
            except Exception as e:
                # Best-effort on purpose: a failing segment must not abort the
                # whole thread pool.
                error = e
            else:
                if response.status_code == 200:
                    return response.content
                error = response.status_code
            logging.warning(f"Attempt {attempt}/{attempts} failed for {ts_url}: {error}")

        # self.segments is deliberately left untouched: worker threads address
        # it by index, so removing an entry would shift every later segment.
        print(f"Failed: {ts_url}, with error: {error}")
        logging.error(f"Failed download: {ts_url}")
        return None

    def save_ts(self, index):
        """Download segment *index* into ``<temp_folder>/<index>.ts`` unless it exists."""
        ts_url = self.segments[index]
        ts_filename = os.path.join(self.temp_folder, f"{index}.ts")

        if os.path.exists(ts_filename):
            return

        ts_content = self.get_req_ts(ts_url)
        if ts_content is None:
            return

        # Decrypt before opening the file so a decryption error cannot leave
        # an empty file behind that later runs would treat as downloaded.
        if self.key and self.decription.iv:
            ts_content = self.decription.decrypt_ts(ts_content)

        with open(ts_filename, "wb") as ts_file:
            ts_file.write(ts_content)

    def download_ts(self):
        """Download every segment through a thread pool while showing a progress bar."""
        with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
            list(tqdm(executor.map(self.save_ts, range(len(self.segments))), total=len(self.segments), unit="bytes", unit_scale=True, unit_divisor=1024, desc="[yellow]Download"))

    def join(self, output_filename):
        """Concatenate the downloaded segments into *output_filename* with ffmpeg.

        The concat list file is written next to this module. On success the
        temp folder is removed; on an ffmpeg error the process exits.
        """
        current_dir = os.path.dirname(os.path.realpath(__file__))
        file_list_path = os.path.join(current_dir, 'file_list.txt')

        # Only "<number>.ts" files are produced by save_ts; anything else in
        # the folder is ignored instead of breaking the numeric sort.
        ts_files = [
            f for f in os.listdir(self.temp_folder)
            if f.endswith(".ts") and os.path.splitext(f)[0].isdigit()
        ]
        ts_files.sort(key=lambda file_name: int(os.path.splitext(file_name)[0]))

        try:
            with open(file_list_path, 'w') as f:
                for ts_file in ts_files:
                    relative_path = os.path.relpath(os.path.join(self.temp_folder, ts_file), current_dir)
                    f.write(f"file '{relative_path}'\n")

            console.log("[cyan]Start join all file")
            try:
                ffmpeg.input(file_list_path, format='concat', safe=0).output(output_filename, c='copy', loglevel='quiet').run()
            except ffmpeg.Error as e:
                console.log(f"[red]Error saving MP4: {e.stdout}")
                sys.exit(0)
        finally:
            # The list file is only an ffmpeg input; drop it on failure too.
            if os.path.exists(file_list_path):
                os.remove(file_list_path)

        console.log(f"[cyan]Clean ...")
        shutil.rmtree(self.temp_folder, ignore_errors=True)


class M3U8Downloader:
    """Download a video playlist and, optionally, an audio playlist, then mux them."""

    def __init__(self, m3u8_url, m3u8_audio = None, key=None, output_filename="output.mp4"):
        """Store the playlist URLs, the hex key and the output paths.

        Args:
            m3u8_url: URL of the video playlist.
            m3u8_audio: URL of the audio playlist, or None.
            key: Hex key string handed to M3U8, or None.
            output_filename: Path of the joined video file.
        """
        self.m3u8_url = m3u8_url
        self.m3u8_audio = m3u8_audio
        self.key = key
        self.video_path = output_filename
        self.audio_path = os.path.join("videos", "audio.mp4")

    def start(self):
        """Download and join the video, then the audio (if any), and mux both."""
        video_m3u8 = M3U8(self.m3u8_url, self.key)
        console.log("[green]Download video ts")
        video_m3u8.get_info()
        video_m3u8.download_ts()
        video_m3u8.join(self.video_path)
        print_duration_table(self.video_path)
        print("\n")

        if self.m3u8_audio is not None:
            audio_m3u8 = M3U8(self.m3u8_audio, self.key)
            console.log("[green]Download audio ts")
            audio_m3u8.get_info()
            audio_m3u8.download_ts()
            audio_m3u8.join(self.audio_path)
            print_duration_table(self.audio_path)

            self.join_audio()

    def join_audio(self):
        """Mux the video and audio files into ``<video_path>.mp4``.

        The two source files are deleted only when ffmpeg succeeded.
        """
        output_path = self.video_path + ".mp4"

        try:
            video_stream = ffmpeg.input(self.video_path)
            audio_stream = ffmpeg.input(self.audio_path)
            ffmpeg.output(video_stream, audio_stream, output_path, vcodec="copy", acodec="copy").global_args('-map', '0:v:0', '-map', '1:a:0', '-shortest', '-strict', 'experimental').run(overwrite_output=True)
        except ffmpeg.Error as e:
            print("ffmpeg error:", e.stderr)
            # Keep the inputs: deleting them after a failed mux would leave
            # the caller with no usable file at all.
            return

        os.remove(self.video_path)
        os.remove(self.audio_path)


# [ main function ]
def dw_m3u8(url, audio_url=None, key=None, output_filename="output.mp4"):
    """Download the playlist at *url* (plus optional *audio_url*) to *output_filename*."""
    print("\n")
    M3U8Downloader(url, audio_url, key, output_filename).start()