fix bug m3u8 url

This commit is contained in:
Ghost 2024-01-07 13:24:28 +01:00
parent beb015685f
commit 11a4120124
13 changed files with 468 additions and 459 deletions

View File

@ -1,55 +1,55 @@
# 3.12.23 -> 10.12.23 # 3.12.23 -> 10.12.23
# Class import # Class import
from Stream.util.headers import get_headers from Src.Util.Helper.headers import get_headers
from Stream.util.m3u8 import dw_m3u8 from Src.Util.Helper.util import convert_utf8_name
from Stream.util.util import convert_utf8_name from Src.Util.m3u8 import dw_m3u8
# General import # General import
import requests, os, re, json import requests, os, re, json
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
# [func] # [func]
def get_iframe(id_title, domain): def get_iframe(id_title, domain):
req_iframe = requests.get(url = f"https://streamingcommunity.{domain}/iframe/{id_title}", headers = { req_iframe = requests.get(url = f"https://streamingcommunity.{domain}/iframe/{id_title}", headers = {
"User-agent": get_headers() "User-agent": get_headers()
}) })
url_embed = BeautifulSoup(req_iframe.text, "lxml").find("iframe").get("src") url_embed = BeautifulSoup(req_iframe.text, "lxml").find("iframe").get("src")
req_embed = requests.get(url_embed, headers = {"User-agent": get_headers()}).text req_embed = requests.get(url_embed, headers = {"User-agent": get_headers()}).text
return BeautifulSoup(req_embed, "lxml").find("body").find("script").text return BeautifulSoup(req_embed, "lxml").find("body").find("script").text
def parse_content(embed_content): def parse_content(embed_content):
# Parse parameter from req embed content # Parse parameter from req embed content
win_video = re.search(r"window.video = {.*}", str(embed_content)).group() win_video = re.search(r"window.video = {.*}", str(embed_content)).group()
win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group() win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group()
# Parse parameter to make read for json # Parse parameter to make read for json
json_win_video = "{"+win_video.split("{")[1].split("}")[0]+"}" json_win_video = "{"+win_video.split("{")[1].split("}")[0]+"}"
json_win_param = "{"+win_param.split("{")[1].split("}")[0].replace("\n", "").replace(" ", "") + "}" json_win_param = "{"+win_param.split("{")[1].split("}")[0].replace("\n", "").replace(" ", "") + "}"
json_win_param = json_win_param.replace(",}", "}").replace("'", '"') json_win_param = json_win_param.replace(",}", "}").replace("'", '"')
return json.loads(json_win_video), json.loads(json_win_param) return json.loads(json_win_video), json.loads(json_win_param)
def get_m3u8_url(json_win_video, json_win_param): def get_m3u8_url(json_win_video, json_win_param):
return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition=720p&token={json_win_param['token720p']}&expires={json_win_param['expires']}" return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition=720p&token={json_win_param['token720p']}&expires={json_win_param['expires']}"
def get_m3u8_key(json_win_video, json_win_param, title_name): def get_m3u8_key(json_win_video, json_win_param, title_name):
req_key = requests.get('https://vixcloud.co/storage/enc.key', headers={ req_key = requests.get('https://vixcloud.co/storage/enc.key', headers={
'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param["token720p"]}&title={title_name.replace(" ", "+")}&referer=1&expires={json_win_param["expires"]}', 'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param["token720p"]}&title={title_name.replace(" ", "+")}&referer=1&expires={json_win_param["expires"]}',
}).content }).content
return "".join(["{:02x}".format(c) for c in req_key]) return "".join(["{:02x}".format(c) for c in req_key])
def main_dw_film(id_film, title_name, domain): def main_dw_film(id_film, title_name, domain):
lower_title_name = str(title_name).lower() lower_title_name = str(title_name).lower()
title_name = convert_utf8_name(lower_title_name) # ERROR LATIN 1 IN REQ WITH ò à ù ... title_name = convert_utf8_name(lower_title_name) # ERROR LATIN 1 IN REQ WITH ò à ù ...
embed_content = get_iframe(id_film, domain) embed_content = get_iframe(id_film, domain)
json_win_video, json_win_param = parse_content(embed_content) json_win_video, json_win_param = parse_content(embed_content)
m3u8_url = get_m3u8_url(json_win_video, json_win_param) m3u8_url = get_m3u8_url(json_win_video, json_win_param)
m3u8_key = get_m3u8_key(json_win_video, json_win_param, title_name) m3u8_key = get_m3u8_key(json_win_video, json_win_param, title_name)
path_film = os.path.join("videos", lower_title_name.replace("+", " ").replace(",", "") + ".mp4") path_film = os.path.join("videos", lower_title_name.replace("+", " ").replace(",", "") + ".mp4")
dw_m3u8(m3u8_url, None, m3u8_key, path_film) dw_m3u8(m3u8_url, None, m3u8_key, path_film)

View File

@ -1,36 +1,36 @@
# 10.12.23 # 10.12.23
# Class import # Class import
from Stream.util.headers import get_headers from Src.Util.Helper.headers import get_headers
from Stream.util.console import console from Src.Util.Helper.console import console
# General import # General import
import requests, json, sys import requests, json, sys
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
def get_version(domain): def get_version(domain):
try: try:
r = requests.get(f'https://streamingcommunity.{domain}/', headers={ r = requests.get(f'https://streamingcommunity.{domain}/', headers={
'Authority': f'streamingcommunity.{domain}', 'Authority': f'streamingcommunity.{domain}',
'User-Agent': get_headers(), 'User-Agent': get_headers(),
}) })
soup = BeautifulSoup(r.text, "lxml") soup = BeautifulSoup(r.text, "lxml")
info_data_page = soup.find("div", {'id': 'app'}).attrs["data-page"] info_data_page = soup.find("div", {'id': 'app'}).attrs["data-page"]
return json.loads(info_data_page)['version'] return json.loads(info_data_page)['version']
except: except:
console.log("[red]UPDATE DOMANIN") console.log("[red]UPDATE DOMANIN")
sys.exit(0) sys.exit(0)
def search(title_search, domain): def search(title_search, domain):
title_search = str(title_search).replace(" ", "+") title_search = str(title_search).replace(" ", "+")
r = requests.get( r = requests.get(
url = f"https://streamingcommunity.{domain}/api/search?q={title_search}", url = f"https://streamingcommunity.{domain}/api/search?q={title_search}",
headers = {"User-agent": get_headers()} headers = {"User-agent": get_headers()}
) )
return [{'name': title['name'], 'type': title['type'], 'id': title['id']} for title in r.json()['data']] return [{'name': title['name'], 'type': title['type'], 'id': title['id']} for title in r.json()['data']]

View File

@ -1,108 +1,108 @@
# 3.12.23 -> 10.12.23 # 3.12.23 -> 10.12.23
# Class import # Class import
from Stream.util.headers import get_headers from Src.Util.Helper.headers import get_headers
from Stream.util.console import console, msg from Src.Util.Helper.util import convert_utf8_name
from Stream.util.m3u8 import dw_m3u8, join_audio_to_video from Src.Util.Helper.console import console, msg
from Stream.util.util import convert_utf8_name from Src.Util.m3u8 import dw_m3u8
# General import # General import
import requests, os, re, json import requests, os, re, json
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
# [func] # [func]
def get_token(id_tv, domain): def get_token(id_tv, domain):
session = requests.Session() session = requests.Session()
session.get(f"https://streamingcommunity.{domain}/watch/{id_tv}") session.get(f"https://streamingcommunity.{domain}/watch/{id_tv}")
return session.cookies['XSRF-TOKEN'] return session.cookies['XSRF-TOKEN']
def get_info_tv(id_film, title_name, site_version, domain): def get_info_tv(id_film, title_name, site_version, domain):
r = requests.get(f"https://streamingcommunity.{domain}/titles/{id_film}-{title_name}", headers={ r = requests.get(f"https://streamingcommunity.{domain}/titles/{id_film}-{title_name}", headers={
'X-Inertia': 'true', 'X-Inertia': 'true',
'X-Inertia-Version': site_version, 'X-Inertia-Version': site_version,
'User-Agent': get_headers() 'User-Agent': get_headers()
}) })
return r.json()['props']['title']['seasons_count'] return r.json()['props']['title']['seasons_count']
def get_info_season(tv_id, tv_name, domain, version, token, n_stagione): def get_info_season(tv_id, tv_name, domain, version, token, n_stagione):
r = requests.get(f'https://streamingcommunity.{domain}/titles/{tv_id}-{tv_name}/stagione-{n_stagione}', headers={ r = requests.get(f'https://streamingcommunity.{domain}/titles/{tv_id}-{tv_name}/stagione-{n_stagione}', headers={
'authority': f'streamingcommunity.{domain}', 'referer': f'https://streamingcommunity.broker/titles/{tv_id}-{tv_name}', 'authority': f'streamingcommunity.{domain}', 'referer': f'https://streamingcommunity.broker/titles/{tv_id}-{tv_name}',
'user-agent': get_headers(), 'x-inertia': 'true', 'x-inertia-version': version, 'x-xsrf-token': token, 'user-agent': get_headers(), 'x-inertia': 'true', 'x-inertia-version': version, 'x-xsrf-token': token,
}) })
return [{'id': ep['id'], 'n': ep['number'], 'name': ep['name']} for ep in r.json()['props']['loadedSeason']['episodes']] return [{'id': ep['id'], 'n': ep['number'], 'name': ep['name']} for ep in r.json()['props']['loadedSeason']['episodes']]
def get_iframe(tv_id, ep_id, domain, token): def get_iframe(tv_id, ep_id, domain, token):
r = requests.get(f'https://streamingcommunity.{domain}/iframe/{tv_id}', params={'episode_id': ep_id, 'next_episode': '1'}, cookies={'XSRF-TOKEN': token}, headers={ r = requests.get(f'https://streamingcommunity.{domain}/iframe/{tv_id}', params={'episode_id': ep_id, 'next_episode': '1'}, cookies={'XSRF-TOKEN': token}, headers={
'referer': f'https://streamingcommunity.{domain}/watch/{tv_id}?e={ep_id}', 'referer': f'https://streamingcommunity.{domain}/watch/{tv_id}?e={ep_id}',
'user-agent': get_headers() 'user-agent': get_headers()
}) })
url_embed = BeautifulSoup(r.text, "lxml").find("iframe").get("src") url_embed = BeautifulSoup(r.text, "lxml").find("iframe").get("src")
req_embed = requests.get(url_embed, headers = {"User-agent": get_headers()}).text req_embed = requests.get(url_embed, headers = {"User-agent": get_headers()}).text
return BeautifulSoup(req_embed, "lxml").find("body").find("script").text return BeautifulSoup(req_embed, "lxml").find("body").find("script").text
def parse_content(embed_content): def parse_content(embed_content):
# Parse parameter from req embed content # Parse parameter from req embed content
win_video = re.search(r"window.video = {.*}", str(embed_content)).group() win_video = re.search(r"window.video = {.*}", str(embed_content)).group()
win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group() win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group()
# Parse parameter to make read for json # Parse parameter to make read for json
json_win_video = "{"+win_video.split("{")[1].split("}")[0]+"}" json_win_video = "{"+win_video.split("{")[1].split("}")[0]+"}"
json_win_param = "{"+win_param.split("{")[1].split("}")[0].replace("\n", "").replace(" ", "") + "}" json_win_param = "{"+win_param.split("{")[1].split("}")[0].replace("\n", "").replace(" ", "") + "}"
json_win_param = json_win_param.replace(",}", "}").replace("'", '"') json_win_param = json_win_param.replace(",}", "}").replace("'", '"')
return json.loads(json_win_video), json.loads(json_win_param) return json.loads(json_win_video), json.loads(json_win_param)
def get_m3u8_url(json_win_video, json_win_param): def get_m3u8_url(json_win_video, json_win_param):
return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition=720p&token={json_win_param['token720p']}&expires={json_win_param['expires']}" return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition=720p&token={json_win_param['token720p']}&expires={json_win_param['expires']}"
def get_m3u8_key_ep(json_win_video, json_win_param, tv_name, n_stagione, n_ep, ep_title): def get_m3u8_key_ep(json_win_video, json_win_param, tv_name, n_stagione, n_ep, ep_title):
req_key = requests.get('https://vixcloud.co/storage/enc.key', headers={ req_key = requests.get('https://vixcloud.co/storage/enc.key', headers={
'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param["token720p"]}&title={tv_name.replace("-", "+")}&referer=1&expires={json_win_param["expires"]}&description=S{n_stagione}%3AE{n_ep}+{ep_title.replace(" ", "+")}&nextEpisode=1', 'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param["token720p"]}&title={tv_name.replace("-", "+")}&referer=1&expires={json_win_param["expires"]}&description=S{n_stagione}%3AE{n_ep}+{ep_title.replace(" ", "+")}&nextEpisode=1',
}).content }).content
return "".join(["{:02x}".format(c) for c in req_key]) return "".join(["{:02x}".format(c) for c in req_key])
def get_m3u8_audio(json_win_video, json_win_param, tv_name, n_stagione, n_ep, ep_title): def get_m3u8_audio(json_win_video, json_win_param, tv_name, n_stagione, n_ep, ep_title):
response = requests.get(f'https://vixcloud.co/playlist/{json_win_video["id"]}', params={'token': json_win_param['token'], 'expires': json_win_param["expires"] }, headers={ response = requests.get(f'https://vixcloud.co/playlist/{json_win_video["id"]}', params={'token': json_win_param['token'], 'expires': json_win_param["expires"] }, headers={
'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param["token720p"]}&title={tv_name.replace("-", "+")}&referer=1&expires={json_win_param["expires"]}&description=S{n_stagione}%3AE{n_ep}+{ep_title.replace(" ", "+")}&nextEpisode=1' 'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param["token720p"]}&title={tv_name.replace("-", "+")}&referer=1&expires={json_win_param["expires"]}&description=S{n_stagione}%3AE{n_ep}+{ep_title.replace(" ", "+")}&nextEpisode=1'
}) })
m3u8_cont = response.text.split() m3u8_cont = response.text.split()
for row in m3u8_cont: for row in m3u8_cont:
if "audio" in str(row) and "ita" in str(row): if "audio" in str(row) and "ita" in str(row):
return row.split(",")[-1].split('"')[-2] return row.split(",")[-1].split('"')[-2]
def main_dw_tv(tv_id, tv_name, version, domain): def main_dw_tv(tv_id, tv_name, version, domain):
token = get_token(tv_id, domain) token = get_token(tv_id, domain)
lower_tv_name = str(tv_name).lower() lower_tv_name = str(tv_name).lower()
tv_name = convert_utf8_name(lower_tv_name) # ERROR LATIN 1 IN REQ WITH ò à ù ... tv_name = convert_utf8_name(lower_tv_name) # ERROR LATIN 1 IN REQ WITH ò à ù ...
console.print(f"[blue]Season find: [red]{get_info_tv(tv_id, tv_name, version, domain)}") console.print(f"[blue]Season find: [red]{get_info_tv(tv_id, tv_name, version, domain)}")
season_select = msg.ask("\n[green]Insert season number: ") season_select = msg.ask("\n[green]Insert season number: ")
eps = get_info_season(tv_id, tv_name, domain, version, token, season_select) eps = get_info_season(tv_id, tv_name, domain, version, token, season_select)
for ep in eps: for ep in eps:
console.print(f"[green]Ep: [blue]{ep['n']} [green]=> [purple]{ep['name']}") console.print(f"[green]Ep: [blue]{ep['n']} [green]=> [purple]{ep['name']}")
index_ep_select = int(msg.ask("\n[green]Insert ep number: ")) - 1 index_ep_select = int(msg.ask("\n[green]Insert ep number: ")) - 1
embed_content = get_iframe(tv_id, eps[index_ep_select]['id'], domain, token) embed_content = get_iframe(tv_id, eps[index_ep_select]['id'], domain, token)
json_win_video, json_win_param = parse_content(embed_content) json_win_video, json_win_param = parse_content(embed_content)
m3u8_url = get_m3u8_url(json_win_video, json_win_param) m3u8_url = get_m3u8_url(json_win_video, json_win_param)
m3u8_key = get_m3u8_key_ep(json_win_video, json_win_param, tv_name, season_select, index_ep_select+1, eps[index_ep_select]['name']) m3u8_key = get_m3u8_key_ep(json_win_video, json_win_param, tv_name, season_select, index_ep_select+1, eps[index_ep_select]['name'])
mp4_name = f"{lower_tv_name.replace('+', '_')}_{str(season_select)}_{str(index_ep_select+1)}" mp4_name = f"{lower_tv_name.replace('+', '_')}_{str(season_select)}_{str(index_ep_select+1)}"
mp4_format = mp4_name + ".mp4" mp4_format = mp4_name + ".mp4"
mp4_path = os.path.join("videos", mp4_format) mp4_path = os.path.join("videos", mp4_format)
m3u8_url_audio = get_m3u8_audio(json_win_video, json_win_param, tv_name, season_select, index_ep_select+1, eps[index_ep_select]['name']) m3u8_url_audio = get_m3u8_audio(json_win_video, json_win_param, tv_name, season_select, index_ep_select+1, eps[index_ep_select]['name'])
if m3u8_url_audio != None: if m3u8_url_audio != None:
console.print("[red]=> Use m3u8 audio") console.print("[red]=> Use m3u8 audio")
dw_m3u8(m3u8_url, m3u8_url_audio, m3u8_key, mp4_path) dw_m3u8(m3u8_url, m3u8_url_audio, m3u8_key, mp4_path)

View File

Before

Width:  |  Height:  |  Size: 6.1 KiB

After

Width:  |  Height:  |  Size: 6.1 KiB

View File

Before

Width:  |  Height:  |  Size: 7.1 MiB

After

Width:  |  Height:  |  Size: 7.1 MiB

View File

@ -1,5 +1,5 @@
__title__ = 'Streaming_community' __title__ = 'Streaming_community'
__version__ = 'v0.7.1' __version__ = 'v0.7.0'
__author__ = 'Ghost6446' __author__ = 'Ghost6446'
__description__ = 'A command-line program to download film' __description__ = 'A command-line program to download film'
__license__ = 'MIT License' __license__ = 'MIT License'

View File

@ -1,7 +1,7 @@
# 13.09.2023 # 13.09.2023
# General import # General import
from Stream.util.console import console from Src.Util.Helper.console import console
import os, requests, time import os, requests, time
# Variable # Variable

View File

@ -1,10 +1,10 @@
# 17.09.2023 -> 3.12.23 # 17.09.2023 -> 3.12.23
# Import # Import
from rich.console import Console from rich.console import Console
from rich.prompt import Prompt from rich.prompt import Prompt
# Variable # Variable
msg = Prompt() msg = Prompt()
console = Console() console = Console()

View File

@ -1,13 +1,12 @@
# 3.12.23 -> 10.12.23 # 3.12.23 -> 10.12.23
# Import # Import
from random_user_agent.user_agent import UserAgent from random_user_agent.user_agent import UserAgent
from random_user_agent.params import SoftwareName, OperatingSystem from random_user_agent.params import SoftwareName, OperatingSystem
# [func] def get_headers():
def get_headers(): software_names = [SoftwareName.CHROME.value]
software_names = [SoftwareName.CHROME.value] operating_systems = [OperatingSystem.WINDOWS.value, OperatingSystem.LINUX.value]
operating_systems = [OperatingSystem.WINDOWS.value, OperatingSystem.LINUX.value] user_agent_rotator = UserAgent(software_names=software_names, operating_systems=operating_systems, limit=10)
user_agent_rotator = UserAgent(software_names=software_names, operating_systems=operating_systems, limit=10)
return user_agent_rotator.get_random_user_agent() return user_agent_rotator.get_random_user_agent()

View File

@ -1,20 +1,19 @@
# 3.12.23 # 3.12.23
# Import # Import
from Stream.util.console import console from Src.Util.Helper.console import console
# [Function] def msg_start():
def msg_start():
msg = """
msg = """ _____ _ _ _ _
_____ _ _ _ _ / ____| | (_) (_) |
/ ____| | (_) (_) | | (___ | |_ _ __ ___ __ _ _ __ ___ _ _ __ __ _ ___ ___ _ __ ___ _ _ _ __ _| |_ _ _
| (___ | |_ _ __ ___ __ _ _ __ ___ _ _ __ __ _ ___ ___ _ __ ___ _ _ _ __ _| |_ _ _ \___ \| __| '__/ _ \/ _` | '_ ` _ \| | '_ \ / _` | / __/ _ \| '_ ` _ \| | | | '_ \| | __| | | |
\___ \| __| '__/ _ \/ _` | '_ ` _ \| | '_ \ / _` | / __/ _ \| '_ ` _ \| | | | '_ \| | __| | | | ____) | |_| | | __/ (_| | | | | | | | | | | (_| | | (_| (_) | | | | | | |_| | | | | | |_| |_| |
____) | |_| | | __/ (_| | | | | | | | | | | (_| | | (_| (_) | | | | | | |_| | | | | | |_| |_| | |_____/ \__|_| \___|\__,_|_| |_| |_|_|_| |_|\__, | \___\___/|_| |_| |_|\__,_|_| |_|_|\__|\__, |
|_____/ \__|_| \___|\__,_|_| |_| |_|_|_| |_|\__, | \___\___/|_| |_| |_|\__,_|_| |_|_|\__|\__, | __/ | __/ |
__/ | __/ | |___/ |___/
|___/ |___/ """
"""
console.print(f"[purple]{msg}")
console.log(f"[purple]{msg}")

View File

@ -1,7 +1,7 @@
# 4.01.2023 # 4.01.2023
# Import # Import
import ffmpeg, subprocess import ffmpeg
def convert_utf8_name(name): def convert_utf8_name(name):
return str(name).encode('utf-8').decode('latin-1') return str(name).encode('utf-8').decode('latin-1')
@ -12,20 +12,6 @@ def there_is_audio(ts_file_path):
def merge_ts_files(video_path, audio_path, output_path): def merge_ts_files(video_path, audio_path, output_path):
"""command = [
'ffmpeg',
'-i', video_path,
'-i', audio_path,
'-c', 'copy',
'-map', '0',
'-map', '1',
'-y', output_path
]
subprocess.run(command)"""
input_video = ffmpeg.input(video_path) input_video = ffmpeg.input(video_path)
input_audio = ffmpeg.input(audio_path) input_audio = ffmpeg.input(audio_path)
ffmpeg.output(input_video, input_audio, output_path, format='mpegts', acodec='copy', vcodec='copy', loglevel='quiet').run() ffmpeg.output(input_video, input_audio, output_path, format='mpegts', acodec='copy', vcodec='copy', loglevel='quiet').run()

View File

@ -1,195 +1,220 @@
# 5.01.24 # 5.01.24 -> 7.01.24
# Import # Import
import requests, re, os, ffmpeg, shutil, time import requests, re, os, ffmpeg, shutil, time, sys
from tqdm.rich import tqdm from tqdm.rich import tqdm
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
import moviepy.editor as mp import moviepy.editor as mp
# Class import # Class import
from Stream.util.console import console from Src.Util.Helper.console import console
from Stream.util.headers import get_headers from Src.Util.Helper.headers import get_headers
from Stream.util.util import there_is_audio, merge_ts_files from Src.Util.Helper.util import there_is_audio, merge_ts_files
# Variable
os.makedirs("videos", exist_ok=True) # Variable
os.makedirs("videos", exist_ok=True)
# [ main class ]
class M3U8Downloader: # [ main class ]
class M3U8Downloader:
def __init__(self, m3u8_url, m3u8_audio = None, key=None, output_filename="output.mp4"):
self.m3u8_url = m3u8_url def __init__(self, m3u8_url, m3u8_audio = None, key=None, output_filename="output.mp4"):
self.m3u8_audio = m3u8_audio self.m3u8_url = m3u8_url
self.key = key self.m3u8_audio = m3u8_audio
self.output_filename = output_filename self.key = key
self.output_filename = output_filename
self.segments = []
self.segments_audio = [] self.segments = []
self.iv = None self.segments_audio = []
if key != None: self.key = bytes.fromhex(key) self.iv = None
if key != None: self.key = bytes.fromhex(key)
self.temp_folder = "tmp"
os.makedirs(self.temp_folder, exist_ok=True) self.temp_folder = "tmp"
self.download_audio = False os.makedirs(self.temp_folder, exist_ok=True)
self.download_audio = False
def decode_ext_x_key(self, key_str):
key_str = key_str.replace('"', '').lstrip("#EXT-X-KEY:") def decode_ext_x_key(self, key_str):
v_list = re.findall(r"[^,=]+", key_str) key_str = key_str.replace('"', '').lstrip("#EXT-X-KEY:")
key_map = {v_list[i]: v_list[i+1] for i in range(0, len(v_list), 2)} v_list = re.findall(r"[^,=]+", key_str)
key_map = {v_list[i]: v_list[i+1] for i in range(0, len(v_list), 2)}
return key_map # URI | METHOD | IV
return key_map # URI | METHOD | IV
def parse_key(self, raw_iv):
self.iv = bytes.fromhex(raw_iv.replace("0x", "")) def parse_key(self, raw_iv):
self.iv = bytes.fromhex(raw_iv.replace("0x", ""))
def parse_m3u8(self, m3u8_content):
if self.m3u8_audio != None: def parse_m3u8(self, m3u8_content):
m3u8_audio_line = str(requests.get(self.m3u8_audio).content).split("\\n") if self.m3u8_audio != None:
m3u8_audio_line = str(requests.get(self.m3u8_audio).content).split("\\n")
m3u8_base_url = self.m3u8_url.rstrip(self.m3u8_url.split("/")[-1])
lines = m3u8_content.split('\n') m3u8_base_url = self.m3u8_url.rstrip(self.m3u8_url.split("/")[-1])
lines = m3u8_content.split('\n')
for i in range(len(lines)):
line = str(lines[i]) for i in range(len(lines)):
line = str(lines[i])
if line.startswith("#EXT-X-KEY:"):
x_key_dict = self.decode_ext_x_key(line) if line.startswith("#EXT-X-KEY:"):
self.parse_key(x_key_dict['IV']) x_key_dict = self.decode_ext_x_key(line)
self.parse_key(x_key_dict['IV'])
if line.startswith("#EXTINF"):
ts_url = lines[i+1] if line.startswith("#EXTINF"):
ts_url = lines[i+1]
if not ts_url.startswith("http"):
ts_url = m3u8_base_url + ts_url if not ts_url.startswith("http"):
ts_url = m3u8_base_url + ts_url
self.segments.append(ts_url)
if self.m3u8_audio != None: self.segments_audio.append(m3u8_audio_line[i+1]) self.segments.append(ts_url)
console.log(f"[cyan]Find: {len(self.segments)} ts file to download") if self.m3u8_audio != None:
self.segments_audio.append(m3u8_audio_line[i+1])
def download_m3u8(self):
response = requests.get(self.m3u8_url, headers={'user-agent': get_headers()}) console.log(f"[cyan]Find: {len(self.segments)} ts file to download")
if response.ok: # Check video ts segment
m3u8_content = response.text if len(self.segments) == 0:
self.parse_m3u8(m3u8_content) console.log("[red]No ts files to download")
sys.exit(0)
# Check there is audio in first ts file
path_test_ts_file = os.path.join(self.temp_folder, "ts_test.ts") # Check audio ts segment
if self.key and self.iv: if self.m3u8_audio != None:
open(path_test_ts_file, "wb").write(self.decrypt_ts(requests.get(self.segments[0]).content)) console.log(f"[cyan]Find: {len(self.segments_audio)} ts audio file to download")
else:
open(path_test_ts_file, "wb").write(requests.get(self.segments[0]).content) if len(self.segments_audio) == 0:
console.log("[red]No ts audio files to download")
if not there_is_audio(path_test_ts_file): sys.exit(0)
self.download_audio = True
console.log("[cyan]=> Make req to get video and audio file") def download_m3u8(self):
response = requests.get(self.m3u8_url, headers={'user-agent': get_headers()})
os.remove(path_test_ts_file)
if response.ok:
def decrypt_ts(self, encrypted_data): m3u8_content = response.text
cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()) self.parse_m3u8(m3u8_content)
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize() else:
console.log("[red]Wrong m3u8 url")
return decrypted_data sys.exit(0)
def decrypt_and_save(self, index):
if self.m3u8_audio != None:
video_ts_url = self.segments[index]
video_ts_filename = os.path.join(self.temp_folder, f"{index}_v.ts") # Check there is audio in first ts file
path_test_ts_file = os.path.join(self.temp_folder, "ts_test.ts")
# Download video or audio ts file if self.key and self.iv:
if not os.path.exists(video_ts_filename): # Only for media that not use audio open(path_test_ts_file, "wb").write(self.decrypt_ts(requests.get(self.segments[0]).content))
ts_response = requests.get(video_ts_url, headers={'user-agent': get_headers()}).content else:
open(path_test_ts_file, "wb").write(requests.get(self.segments[0]).content)
if self.key and self.iv:
decrypted_data = self.decrypt_ts(ts_response) if not there_is_audio(path_test_ts_file):
with open(video_ts_filename, "wb") as ts_file: self.download_audio = True
ts_file.write(decrypted_data) console.log("[cyan]=> Make req to get video and audio file")
else: console.log("[cyan]=> Download audio")
with open(video_ts_filename, "wb") as ts_file: os.remove(path_test_ts_file)
ts_file.write(ts_response)
def decrypt_ts(self, encrypted_data):
# Donwload only audio ts file cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend())
if self.download_audio: decryptor = cipher.decryptor()
audio_ts_url = self.segments_audio[index] decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
audio_ts_filename = os.path.join(self.temp_folder, f"{index}_a.ts")
video_audio_ts_filename = os.path.join(self.temp_folder, f"{index}_v_a.ts") return decrypted_data
if not os.path.exists(video_audio_ts_filename): # Only for media use audio def decrypt_and_save(self, index):
ts_response = requests.get(audio_ts_url, headers={'user-agent': get_headers()}).content
video_ts_url = self.segments[index]
if self.key and self.iv: video_ts_filename = os.path.join(self.temp_folder, f"{index}_v.ts")
decrypted_data = self.decrypt_ts(ts_response)
with open(audio_ts_filename, "wb") as ts_file: # Download video or audio ts file
ts_file.write(decrypted_data) if not os.path.exists(video_ts_filename): # Only for media that not use audio
ts_response = requests.get(video_ts_url, headers={'user-agent': get_headers()}).content
else:
with open(audio_ts_filename, "wb") as ts_file: if self.key and self.iv:
ts_file.write(ts_response) decrypted_data = self.decrypt_ts(ts_response)
with open(video_ts_filename, "wb") as ts_file:
# Join ts video and audio ts_file.write(decrypted_data)
merge_ts_files(video_ts_filename, audio_ts_filename, video_audio_ts_filename)
os.remove(video_ts_filename) else:
os.remove(audio_ts_filename) with open(video_ts_filename, "wb") as ts_file:
ts_file.write(ts_response)
def download_and_save_ts(self):
with ThreadPoolExecutor(max_workers=30) as executor: # Donwload only audio ts file
list(tqdm(executor.map(self.decrypt_and_save, range(len(self.segments)) ), total=len(self.segments), unit="bytes", unit_scale=True, unit_divisor=1024, desc="[yellow]Download")) if self.download_audio:
audio_ts_url = self.segments_audio[index]
def join_ts_files(self): audio_ts_filename = os.path.join(self.temp_folder, f"{index}_a.ts")
video_audio_ts_filename = os.path.join(self.temp_folder, f"{index}_v_a.ts")
current_dir = os.path.dirname(os.path.realpath(__file__))
file_list_path = os.path.join(current_dir, 'file_list.txt') if not os.path.exists(video_audio_ts_filename): # Only for media use audio
ts_response = requests.get(audio_ts_url, headers={'user-agent': get_headers()}).content
# Make sort by number
ts_files = [f for f in os.listdir(self.temp_folder) if f.endswith(".ts")] if self.key and self.iv:
def extract_number(file_name): decrypted_data = self.decrypt_ts(ts_response)
return int(''.join(filter(str.isdigit, file_name))) with open(audio_ts_filename, "wb") as ts_file:
ts_file.write(decrypted_data)
ts_files.sort(key=extract_number)
else:
with open(file_list_path, 'w') as f: with open(audio_ts_filename, "wb") as ts_file:
for ts_file in ts_files: ts_file.write(ts_response)
relative_path = os.path.relpath(os.path.join(self.temp_folder, ts_file), current_dir)
f.write(f"file '{relative_path}'\n") # Join ts video and audio
merge_ts_files(video_ts_filename, audio_ts_filename, video_audio_ts_filename)
console.log("[cyan]Start join all file") os.remove(video_ts_filename)
try: os.remove(audio_ts_filename)
(
ffmpeg.input(file_list_path, format='concat', safe=0).output(self.output_filename, c='copy', loglevel='quiet').run() def download_and_save_ts(self):
) with ThreadPoolExecutor(max_workers=30) as executor:
console.log(f"[cyan]Clean ...") list(tqdm(executor.map(self.decrypt_and_save, range(len(self.segments)) ), total=len(self.segments), unit="bytes", unit_scale=True, unit_divisor=1024, desc="[yellow]Download"))
except ffmpeg.Error as e:
print(f"Errore durante il salvataggio del file MP4: {e}") def join_ts_files(self):
finally:
time.sleep(2) current_dir = os.path.dirname(os.path.realpath(__file__))
os.remove(file_list_path) file_list_path = os.path.join(current_dir, 'file_list.txt')
shutil.rmtree("tmp", ignore_errors=True)
# Make sort by number
ts_files = [f for f in os.listdir(self.temp_folder) if f.endswith(".ts")]
# [ function ] def extract_number(file_name):
def dw_m3u8(url, audio_url=None, key=None, output_filename="output.mp4"): return int(''.join(filter(str.isdigit, file_name)))
downloader = M3U8Downloader(url, audio_url, key, output_filename) ts_files.sort(key=extract_number)
downloader.download_m3u8() with open(file_list_path, 'w') as f:
downloader.download_and_save_ts() for ts_file in ts_files:
downloader.join_ts_files() relative_path = os.path.relpath(os.path.join(self.temp_folder, ts_file), current_dir)
f.write(f"file '{relative_path}'\n")
def join_audio_to_video(audio_path, video_path, out_path):
console.log("[cyan]Start join all file")
# Get audio and video try:
audio = mp.AudioFileClip(audio_path) (
video1 = mp.VideoFileClip(video_path) ffmpeg.input(file_list_path, format='concat', safe=0).output(self.output_filename, c='copy', loglevel='quiet').run()
)
# Add audio console.log(f"[cyan]Clean ...")
final = video1.set_audio(audio) except ffmpeg.Error as e:
console.log(f"[red]Error saving MP4: {e.stdout}")
# Join all sys.exit(0)
final.write_videofile(out_path)
time.sleep(2)
os.remove(file_list_path)
shutil.rmtree("tmp", ignore_errors=True)
# [ main function ]
def dw_m3u8(url, audio_url=None, key=None, output_filename="output.mp4"):
downloader = M3U8Downloader(url, audio_url, key, output_filename)
downloader.download_m3u8()
downloader.download_and_save_ts()
downloader.join_ts_files()
def join_audio_to_video(audio_path, video_path, out_path):
# Get audio and video
audio = mp.AudioFileClip(audio_path)
video1 = mp.VideoFileClip(video_path)
# Add audio
final = video1.set_audio(audio)
# Join all
final.write_videofile(out_path)

12
run.py
View File

@ -1,12 +1,12 @@
# 10.12.23 # 10.12.23
# Class import # Class import
import Stream.api.page as Page import Src.Api.page as Page
from Stream.util.message import msg_start from Src.Api.film import main_dw_film as download_film
from Stream.upload.update import main_update from Src.Api.tv import main_dw_tv as download_tv
from Stream.util.console import console, msg from Src.Util.Helper.message import msg_start
from Stream.api.film import main_dw_film as download_film from Src.Util.Helper.console import console, msg
from Stream.api.tv import main_dw_tv as download_tv from Src.Upload.update import main_update
# General import # General import
import sys import sys