mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-06-07 12:05:35 +00:00
227 lines
9.0 KiB
Python
227 lines
9.0 KiB
Python
# 3.12.23 -> 10.12.23
|
|
|
|
# General import
|
|
import binascii
|
|
import json
|
|
import os
|
|
import re
|
|
import requests
|
|
import sys
|
|
import urllib
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Class import
|
|
from Src.Lib.FFmpeg.my_m3u8 import download_m3u8
|
|
from Src.Lib.FFmpeg.util import audio_extractor_m3u8
|
|
from Src.Util.config import config
|
|
from Src.Util.console import console, msg
|
|
from Src.Util.headers import get_headers
|
|
|
|
|
|
def get_info_tv(id_film, title_name, site_version, domain):
|
|
req = requests.get(f"https://streamingcommunity.{domain}/titles/{id_film}-{title_name}", headers={
|
|
'user-agent': get_headers(),
|
|
'X-Inertia': 'true',
|
|
'X-Inertia-Version': site_version,
|
|
})
|
|
|
|
if req.ok:
|
|
return req.json()['props']['title']['seasons_count']
|
|
else:
|
|
console.log(f"[red]Error: {req.status_code}")
|
|
sys.exit(0)
|
|
|
|
|
|
def get_info_season(tv_id, tv_name, domain, version, n_season):
|
|
req = requests.get(
|
|
f'https://streamingcommunity.{domain}/titles/{tv_id}-{tv_name}/stagione-{n_season}',
|
|
headers={
|
|
'user-agent': get_headers(),
|
|
'x-inertia': 'true',
|
|
'x-inertia-version': version,
|
|
})
|
|
|
|
if req.ok:
|
|
return [{'id': ep['id'], 'n': ep['number'], 'name': ep['name'] if ep['name'] is not None else ""} for ep in
|
|
req.json()['props']['loadedSeason']['episodes']]
|
|
else:
|
|
console.log(f"[red]Error: {req.status_code}")
|
|
sys.exit(0)
|
|
|
|
|
|
def get_iframe(tv_id, ep_id, domain):
|
|
req = requests.get(f'https://streamingcommunity.{domain}/iframe/{tv_id}',
|
|
params={'episode_id': ep_id, 'next_episode': '1'},
|
|
headers={'user-agent': get_headers()}
|
|
)
|
|
|
|
if req.ok:
|
|
try:
|
|
url_embed = BeautifulSoup(req.text, "lxml").find("iframe").get("src")
|
|
req_embed = requests.get(url_embed, headers={"User-agent": get_headers()}).text
|
|
return BeautifulSoup(req_embed, "lxml").find("body").find("script").text
|
|
except:
|
|
console.log("[red]Error with episode. Skipping...")
|
|
return None
|
|
else:
|
|
console.log(f"[red]Error: {req.status_code}")
|
|
sys.exit(0)
|
|
|
|
|
|
def select_quality(json_win_param):
|
|
if json_win_param['token1080p']:
|
|
return "1080p"
|
|
elif json_win_param['token720p']:
|
|
return "720p"
|
|
elif json_win_param['token480p']:
|
|
return "480p"
|
|
else:
|
|
return "360p"
|
|
|
|
|
|
def parse_content(embed_content):
|
|
# Parse parameter from req embed content
|
|
win_video = re.search(r"window.video = {.*}", str(embed_content)).group()
|
|
win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group()
|
|
|
|
# Parse parameter to make read for json
|
|
json_win_video = "{" + win_video.split("{")[1].split("}")[0] + "}"
|
|
json_win_param = "{" + win_param.split("{")[1].split("}")[0].replace("\n", "").replace(" ", "") + "}"
|
|
json_win_param = json_win_param.replace(",}", "}").replace("'", '"')
|
|
return json.loads(json_win_video), json.loads(json_win_param), select_quality(json.loads(json_win_param))
|
|
|
|
|
|
def get_playlist(json_win_video, json_win_param, render_quality):
|
|
token_render = f"token{render_quality}"
|
|
return f"https://vixcloud.co/playlist/{json_win_video['id']}?token={json_win_param['token']}&{token_render}={json_win_param[token_render]}&expires={json_win_param['expires']}"
|
|
|
|
|
|
def get_m3u8_url(json_win_video, json_win_param, render_quality):
|
|
token_render = f"token{render_quality}"
|
|
return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition={render_quality}&token={json_win_param[token_render]}&expires={json_win_param['expires']}"
|
|
|
|
|
|
def get_m3u8_key_ep():
|
|
response = requests.get('https://vixcloud.co/storage/enc.key')
|
|
|
|
if response.ok:
|
|
return binascii.hexlify(response.content).decode('utf-8')
|
|
else:
|
|
console.log(f"[red]Error: {response.status_code}")
|
|
sys.exit(0)
|
|
|
|
|
|
def get_m3u8_audio(json_win_video, json_win_param, tv_name, n_season, n_ep, ep_title, token_render):
|
|
req = requests.get(f'https://vixcloud.co/playlist/{json_win_video["id"]}',
|
|
params={'token': json_win_param['token'],
|
|
'expires': json_win_param["expires"]},
|
|
headers={
|
|
'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param[token_render]}&title={tv_name}&referer=1&expires={json_win_param["expires"]}&description=S{n_season}%3AE{n_ep}+{ep_title}&nextEpisode=1'
|
|
})
|
|
|
|
if req.ok:
|
|
result = audio_extractor_m3u8(req)
|
|
return result
|
|
else:
|
|
console.log(f"[red]Error: {req.status_code}")
|
|
sys.exit(0)
|
|
|
|
|
|
# [func \ main]
|
|
def dw_single_ep(tv_id, eps, index_ep_select, domain, tv_name, season_select):
|
|
encoded_name = urllib.parse.quote(eps[index_ep_select]['name'])
|
|
|
|
console.print(
|
|
f"[green]Downloading episode: [blue]{eps[index_ep_select]['n']} [green]=> [purple]{eps[index_ep_select]['name']}")
|
|
embed_content = get_iframe(tv_id, eps[index_ep_select]['id'], domain)
|
|
if embed_content is None:
|
|
return
|
|
json_win_video, json_win_param, render_quality = parse_content(embed_content)
|
|
|
|
token_render = f"token{render_quality}"
|
|
console.print(f"[blue]Selected quality => [red]{render_quality}")
|
|
|
|
m3u8_playlist = get_playlist(json_win_video, json_win_param, render_quality)
|
|
m3u8_url = get_m3u8_url(json_win_video, json_win_param, render_quality)
|
|
m3u8_key = get_m3u8_key_ep()
|
|
|
|
mp4_name = f"S{str(season_select).zfill(2)}E{str(index_ep_select + 1).zfill(2)}"
|
|
mp4_format = f"{mp4_name}.mp4"
|
|
season = mp4_name.rsplit("E", 1)[0]
|
|
mp4_path = os.path.join(config['root_path'], config['series_folder_name'], mp4_format)
|
|
|
|
m3u8_url_audio = get_m3u8_audio(json_win_video, json_win_param, tv_name, season_select, index_ep_select + 1,
|
|
encoded_name, token_render)
|
|
|
|
if m3u8_url_audio is not None:
|
|
console.print("[blue]Using m3u8 audio => [red]True")
|
|
|
|
subtitle_path = os.path.join(config['root_path'], config['series_folder_name'], tv_name, season)
|
|
download_m3u8(
|
|
m3u8_index = m3u8_url,
|
|
m3u8_audio = m3u8_url_audio,
|
|
m3u8_subtitle = m3u8_playlist,
|
|
key = m3u8_key,
|
|
output_filename = mp4_path,
|
|
subtitle_folder = subtitle_path,
|
|
content_name = mp4_name
|
|
)
|
|
|
|
|
|
def main_dw_tv(tv_id, tv_name, version, domain):
|
|
|
|
num_season_find = get_info_tv(tv_id, tv_name, version, domain)
|
|
console.print(
|
|
"\n[green]Insert season [red]number[green], or [red](*) [green]to download all seasons, or [red][1-2] [green]for a range of seasons")
|
|
console.print(f"\n[blue]Season(s) found: [red]{num_season_find}")
|
|
season_select = str(msg.ask("\n[green]Insert which season(s) number you'd like to download"))
|
|
if "[" in season_select:
|
|
start, end = map(int, season_select[1:-1].split('-'))
|
|
result = list(range(start, end + 1))
|
|
for n_season in result:
|
|
eps = get_info_season(tv_id, tv_name, domain, version, n_season)
|
|
for ep in eps:
|
|
dw_single_ep(tv_id, eps, int(ep['n']) - 1, domain, tv_name, n_season)
|
|
print("\n")
|
|
elif season_select != "*":
|
|
season_select = int(season_select)
|
|
if 1 <= season_select <= num_season_find:
|
|
eps = get_info_season(tv_id, tv_name, domain, version, season_select)
|
|
|
|
for ep in eps:
|
|
console.print(f"[green]Episode: [blue]{ep['n']} [green]=> [purple]{ep['name']}")
|
|
index_ep_select = str(msg.ask(
|
|
"\n[green]Insert episode [blue]number[green], or [red](*) [green]to download all episodes, or [red][1-2] [green]for a range of episodes"))
|
|
|
|
# Download range []
|
|
if "[" in index_ep_select:
|
|
start, end = map(int, index_ep_select[1:-1].split('-'))
|
|
result = list(range(start, end + 1))
|
|
|
|
for n_range_ep in result:
|
|
# index_ep_select = int(n_range_ep) # Unused
|
|
dw_single_ep(tv_id, eps, n_range_ep - 1, domain, tv_name, season_select)
|
|
|
|
# Download single ep
|
|
elif index_ep_select != "*":
|
|
if 1 <= int(index_ep_select) <= len(eps):
|
|
index_ep_select = int(index_ep_select) - 1
|
|
dw_single_ep(tv_id, eps, index_ep_select, domain, tv_name, season_select)
|
|
else:
|
|
console.print("[red]Wrong [yellow]INDEX [red]for the selected Episode")
|
|
|
|
# Download all
|
|
else:
|
|
for ep in eps:
|
|
dw_single_ep(tv_id, eps, int(ep['n']) - 1, domain, tv_name, season_select)
|
|
print("\n")
|
|
|
|
else:
|
|
console.print("[red]Wrong [yellow]INDEX for the selected Season")
|
|
else:
|
|
for n_season in range(1, num_season_find + 1):
|
|
eps = get_info_season(tv_id, tv_name, domain, version, n_season)
|
|
for ep in eps:
|
|
dw_single_ep(tv_id, eps, int(ep['n']) - 1, domain, tv_name, n_season)
|
|
print("\n")
|