2024-06-09 14:16:48 +02:00

226 lines
7.3 KiB
Python

# 01.03.24
import sys
import logging
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
# External libraries
import requests
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util.console import console, Panel
# Logic class
from ..Class.SeriesType import TitleManager
from ..Class.EpisodeType import EpisodeManager
from ..Class.PreviewType import PreviewManager
from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter
class VideoSource:
    """
    Resolve a streamingcommunity title into a playable playlist URL.

    The methods build on state stored by one another:
    setup() stores the identifiers used in every URL, get_iframe() stores
    ``iframe_src``, get_content() (through parse_script()) stores
    ``window_video`` / ``window_parameter``, and get_playlist() reads all three.
    """

    def __init__(self):
        """
        Initialize a VideoSource object.
        """
        self.headers = {
            'user-agent': get_headers()
        }
        self.is_series = False
        self.base_name = "streamingcommunity"

    def setup(self, version: str = None, domain: str = None, media_id: int = None, series_name: str = None):
        """
        Set up the class

        Args:
            - version (str): The version to set (sent as 'x-inertia-version').
            - domain (str): The domain suffix appended to the base site name.
            - media_id (int): The media ID to set.
            - series_name (str): The series name to set; when given, the
              instance is flagged as a series.
        """
        self.version = version
        self.domain = domain
        self.media_id = media_id

        # Recompute the flag on every call: the managers below are re-created
        # per setup() too, so a stale True from a previous series must not leak
        # into a later film setup (it would add episode params in get_iframe).
        self.is_series = series_name is not None

        if self.is_series:
            self.series_name = series_name

        self.obj_title_manager: TitleManager = TitleManager()
        self.obj_episode_manager: EpisodeManager = EpisodeManager()

    def collect_info_seasons(self) -> None:
        """
        Collect information about seasons.

        Replaces ``self.headers`` with the inertia headers, requests the title
        page and adds every entry of ``props.title.seasons`` to the title manager.

        Raises:
            Exception: Any error raised while requesting or decoding is logged
            and re-raised unchanged.
        """
        self.headers = {
            'user-agent': get_headers(),
            'x-inertia': 'true',
            'x-inertia-version': self.version,
        }

        try:
            response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers)
            response.raise_for_status()

            # Missing keys fall back to empty containers, so an unexpected
            # payload simply yields no seasons.
            json_response = response.json().get('props', {}).get('title', {}).get('seasons', [])

            # Iterate over JSON data and add titles to the manager
            for dict_season in json_response:
                self.obj_title_manager.add_title(dict_season)

        except Exception as e:
            logging.error(f"Error collecting season info: {e}")
            raise

    def collect_title_season(self, number_season: int) -> None:
        """
        Collect information about a specific season.

        Requests the season page and adds every entry of
        ``props.loadedSeason.episodes`` to the episode manager.

        Args:
            - number_season (int): The season number.

        Raises:
            Exception: Any error raised while requesting or decoding is logged
            and re-raised unchanged.
        """
        try:
            # Make a request to collect information about a specific season
            response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers)
            response.raise_for_status()

            # Extract JSON response if available
            json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', [])

            # Iterate over JSON data and add episodes to the manager
            for dict_episode in json_response:
                self.obj_episode_manager.add_episode(dict_episode)

        except Exception as e:
            logging.error(f"Error collecting title season info: {e}")
            raise

    def get_iframe(self, episode_id: int = None) -> None:
        """
        Get iframe source.

        Requests the site's iframe page for the current media and stores the
        ``src`` attribute of the first <iframe> element in ``self.iframe_src``.

        Args:
            - episode_id (int): The episode ID, present only for series

        Raises:
            Exception: Any error (request failure, no <iframe> in the page) is
            logged and re-raised unchanged.
        """
        params = {}

        if self.is_series:
            params = {
                'episode_id': episode_id,
                'next_episode': '1'
            }

        try:
            # Make a request to get iframe source
            response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params)
            response.raise_for_status()

            # Parse response with BeautifulSoup to get iframe source
            soup = BeautifulSoup(response.text, "html.parser")
            self.iframe_src = soup.find("iframe").get("src")

        except Exception as e:
            logging.error(f"Error getting iframe source: {e}")
            raise

    def parse_script(self, script_text: str) -> None:
        """
        Parse script text.

        Converts the script with DynamicJSONConverter and stores the 'video'
        entry as ``self.window_video`` and the 'masterPlaylist' entry as
        ``self.window_parameter``.

        Args:
            - script_text (str): The script text to parse.

        Raises:
            Exception: Any conversion or lookup error is logged and re-raised
            unchanged.
        """
        try:
            converter = DynamicJSONConverter(script_text)
            result = converter.convert_to_dynamic_json()

            # Create window video and parameter objects
            self.window_video = WindowVideo(result['video'])
            self.window_parameter = WindowParameter(result['masterPlaylist'])

        except Exception as e:
            logging.error(f"Error parsing script: {e}")
            raise

    def get_content(self) -> None:
        """
        Get content.

        Downloads the page at ``self.iframe_src`` and hands the text of the
        first <script> inside <body> to parse_script(). Does nothing when
        ``self.iframe_src`` is None.

        If the download itself fails, a "Coming soon" panel is printed and the
        process exits with status 0.

        Raises:
            Exception: Any other error (e.g. unexpected page structure) is
            logged and re-raised unchanged.
        """
        try:
            # Check if iframe source is available
            if self.iframe_src is not None:

                # Only request failures mean "not available yet". A bare
                # except here would also swallow KeyboardInterrupt/SystemExit.
                try:
                    response = requests.get(self.iframe_src, headers=self.headers)
                    response.raise_for_status()
                except requests.exceptions.RequestException:
                    print("\n")
                    console.print(Panel("[red bold]Coming soon", title="Notification", title_align="left", border_style="yellow"))
                    sys.exit(0)

                # raise_for_status() succeeded, so the response is usable here.
                soup = BeautifulSoup(response.text, "html.parser")
                script = soup.find("body").find("script").text

                # Parse script to get video information
                self.parse_script(script_text=script)

        except Exception as e:
            logging.error(f"Error getting content: {e}")
            raise

    def get_playlist(self) -> str:
        """
        Get playlist.

        Builds ``https://vixcloud.co/playlist/<video id>`` with the ``token``
        and ``expires`` values taken from ``self.window_parameter.data``. Adds
        ``b=1`` when the iframe URL query contains ``b`` and ``h=1`` when it
        contains ``canPlayFHD``.

        Returns:
            str: The playlist URL.
        """
        iframe_url = self.iframe_src

        # Create base uri for playlist
        base_url = f'https://vixcloud.co/playlist/{self.window_video.id}'
        query = urlencode(list(self.window_parameter.data.items()))
        master_playlist_url = urljoin(base_url, '?' + query)

        # Parse only the query component of the iframe URL. Feeding the whole
        # URL to parse_qs fuses scheme/host/path with the first parameter name,
        # so a flag in first position would never be found.
        current_params = parse_qs(urlparse(iframe_url).query)
        m = urlparse(master_playlist_url)
        master_params = parse_qs(m.query)

        # Create the final parameters dictionary with token and expires from the master playlist
        final_params = {
            "token": master_params.get("token", [""])[0],
            "expires": master_params.get("expires", [""])[0]
        }

        # Add conditional parameters
        if "b" in current_params:
            final_params["b"] = "1"
        if "canPlayFHD" in current_params:
            final_params["h"] = "1"

        # Swap the master playlist query for the reduced one and rebuild the URL
        new_query = urlencode(final_params)
        new_url = m._replace(query=new_query)
        final_url = urlunparse(new_url)

        return final_url