# Mirror of https://github.com/Arrowar/StreamingCommunity.git
# Synced 2025-06-07 12:05:35 +00:00
# 01.03.24
|
|
|
|
import logging
|
|
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
|
|
|
|
|
|
# External libraries
|
|
import httpx
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
# Internal utilities
|
|
from Src.Util.headers import get_headers
|
|
from Src.Util._jsonConfig import config_manager
|
|
|
|
|
|
# Logic class
|
|
from ..Class.EpisodeType import EpisodeManager, Episode
|
|
from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter
|
|
|
|
|
|
# Variable
|
|
from ...costant import SITE_NAME
|
|
max_timeout = config_manager.get_int("REQUESTS", "timeout")
|
|
|
|
|
|
class VideoSource:

    def __init__(self):
        """
        Initialize a VideoSource object.

        Reads the configurable site domain and prepares request headers
        with a generated user agent; sources start out in non-series mode
        until `setup` is told otherwise.
        """
        site = SITE_NAME

        # Fresh user agent per instance to look less like a bot.
        self.headers = {'user-agent': get_headers()}

        self.is_series = False
        self.base_name = site

        # The site rotates TLDs, so the domain lives in the shared config.
        site_config = config_manager.get_dict('SITE', site)
        self.domain = site_config['domain']
|
|
|
|
def setup(self, media_id: int = None, series_name: str = None):
|
|
"""
|
|
Set up the class
|
|
|
|
Parameters:
|
|
- media_id (int): The media ID to set.
|
|
- series_name (str): The series name to set.
|
|
"""
|
|
self.media_id = media_id
|
|
|
|
if series_name is not None:
|
|
self.is_series = True
|
|
self.series_name = series_name
|
|
self.obj_episode_manager: EpisodeManager = EpisodeManager()
|
|
|
|
def get_count_episodes(self):
|
|
"""
|
|
Fetches the total count of episodes available for the anime.
|
|
|
|
Returns:
|
|
- int or None: Total count of episodes if successful, otherwise None.
|
|
"""
|
|
try:
|
|
|
|
response = httpx.get(
|
|
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/",
|
|
headers=self.headers,
|
|
timeout=max_timeout
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse JSON response and return episode count
|
|
return response.json()["episodes_count"]
|
|
|
|
except Exception as e:
|
|
logging.error(f"(EpisodeDownloader) Error fetching episode count: {e}")
|
|
return None
|
|
|
|
def get_info_episode(self, index_ep: int) -> Episode:
|
|
"""
|
|
Fetches information about a specific episode.
|
|
|
|
Parameters:
|
|
- index_ep (int): Index of the episode.
|
|
|
|
Returns:
|
|
- obj Episode or None: Information about the episode if successful, otherwise None.
|
|
"""
|
|
try:
|
|
|
|
params = {
|
|
"start_range": index_ep,
|
|
"end_range": index_ep + 1
|
|
}
|
|
|
|
response = httpx.get(
|
|
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}",
|
|
headers=self.headers,
|
|
params=params,
|
|
timeout=max_timeout
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Return information about the episode
|
|
json_data = response.json()["episodes"][-1]
|
|
return Episode(json_data)
|
|
|
|
except Exception as e:
|
|
logging.error(f"(EpisodeDownloader) Error fetching episode information: {e}")
|
|
return None
|
|
|
|
def get_embed(self, episode_id: int):
|
|
"""
|
|
Fetches the script text for a given episode ID.
|
|
|
|
Parameters:
|
|
- episode_id (int): ID of the episode.
|
|
|
|
Returns:
|
|
- str or None: Script successful, otherwise None.
|
|
"""
|
|
try:
|
|
|
|
response = httpx.get(
|
|
url=f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}",
|
|
headers=self.headers,
|
|
timeout=max_timeout
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Extract and clean embed URL
|
|
embed_url = response.text.strip()
|
|
self.iframe_src = embed_url
|
|
|
|
# Fetch video content using embed URL
|
|
video_response = httpx.get(embed_url)
|
|
video_response.raise_for_status()
|
|
|
|
# Parse response with BeautifulSoup to get content of the scriot
|
|
soup = BeautifulSoup(video_response.text, "html.parser")
|
|
script = soup.find("body").find("script").text
|
|
self.src_mp4 = soup.find("body").find_all("script")[1].text.split(" = ")[1].replace("'", "")
|
|
|
|
return script
|
|
|
|
except Exception as e:
|
|
logging.error(f"(EpisodeDownloader) Error fetching embed URL: {e}")
|
|
return None
|
|
|
|
def parse_script(self, script_text: str) -> None:
|
|
"""
|
|
Parse script text.
|
|
|
|
Parameters:
|
|
- script_text (str): The script text to parse.
|
|
"""
|
|
try:
|
|
|
|
converter = DynamicJSONConverter(script_text)
|
|
result = converter.convert_to_dynamic_json()
|
|
|
|
# Create window video and parameter objects
|
|
self.window_video = WindowVideo(result['video'])
|
|
self.window_parameter = WindowParameter(result['masterPlaylist'])
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error parsing script: {e}")
|
|
raise
|
|
|
|
def get_playlist(self) -> str:
|
|
"""
|
|
Get playlist.
|
|
|
|
Returns:
|
|
- str: The playlist URL, or None if there's an error.
|
|
"""
|
|
|
|
iframe_url = self.iframe_src
|
|
|
|
# Create base uri for playlist
|
|
base_url = f'https://vixcloud.co/playlist/{self.window_video.id}'
|
|
query = urlencode(list(self.window_parameter.data.items()))
|
|
master_playlist_url = urljoin(base_url, '?' + query)
|
|
|
|
# Parse the current query string and the master playlist URL query string
|
|
current_params = parse_qs(iframe_url[1:])
|
|
m = urlparse(master_playlist_url)
|
|
master_params = parse_qs(m.query)
|
|
|
|
# Create the final parameters dictionary with token and expires from the master playlist
|
|
final_params = {
|
|
"token": master_params.get("token", [""])[0],
|
|
"expires": master_params.get("expires", [""])[0]
|
|
}
|
|
|
|
# Add conditional parameters
|
|
if "b" in current_params:
|
|
final_params["b"] = "1"
|
|
if "canPlayFHD" in current_params:
|
|
final_params["h"] = "1"
|
|
|
|
# Construct the new query string and final URL
|
|
new_query = urlencode(final_params)
|
|
new_url = m._replace(query=new_query)
|
|
final_url = urlunparse(new_url)
|
|
|
|
return final_url
|