Restore ...

This commit is contained in:
Lovi 2024-06-16 19:08:24 +02:00
parent f8dc75ab7d
commit 3b2a2ae3bd
51 changed files with 4437 additions and 7 deletions

View File

@ -0,0 +1,62 @@
# 26.05.24
from typing import List
class MediaItem:
def __init__(self, data: dict):
self.name: str = data.get('name')
self.type: str = "film"
self.score: str = data.get('score')
self.url: str = data.get('url')
def __str__(self):
return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})"
class MediaManager:
def __init__(self):
self.media_list: List[MediaItem] = []
def add_media(self, data: dict) -> None:
"""
Add media to the list.
Args:
data (dict): Media data to add.
"""
self.media_list.append(MediaItem(data))
def get(self, index: int) -> MediaItem:
"""
Get a media item from the list by index.
Args:
index (int): The index of the media item to retrieve.
Returns:
MediaItem: The media item at the specified index.
"""
return self.media_list[index]
def get_length(self) -> int:
"""
Get the number of media items found by the search.
Returns:
int: Number of media items.
"""
return len(self.media_list)
def clear(self) -> None:
"""
This method clears the media list.
Args:
self: The object instance.
"""
self.media_list.clear()
def __str__(self):
return f"MediaManager(num_media={len(self.media_list)})"

View File

@ -0,0 +1,178 @@
# 26.05.24
import re
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util.os import run_node_script
class VideoSource:
def __init__(self) -> None:
"""
Initializes the VideoSource object with default values.
Attributes:
headers (dict): An empty dictionary to store HTTP headers.
"""
self.headers = {'user-agent': get_headers()}
def setup(self, url: str) -> None:
"""
Sets up the video source with the provided URL.
Args:
url (str): The URL of the video source.
"""
self.url = url
def make_request(self, url: str) -> str:
"""
Make an HTTP GET request to the provided URL.
Args:
url (str): The URL to make the request to.
Returns:
str: The response content if successful, None otherwise.
"""
try:
response = httpx.get(url, headers=self.headers, follow_redirects=True)
response.raise_for_status()
return response.text
except Exception as e:
logging.error(f"Request failed [supervideo]: {e}")
return None
def parse_html(self, html_content: str) -> BeautifulSoup:
"""
Parse the provided HTML content using BeautifulSoup.
Args:
html_content (str): The HTML content to parse.
Returns:
BeautifulSoup: Parsed HTML content if successful, None otherwise.
"""
try:
soup = BeautifulSoup(html_content, "html.parser")
return soup
except Exception as e:
logging.error(f"Failed to parse HTML content: {e}")
return None
def get_iframe(self, soup):
"""
Extracts the source URL of the second iframe in the provided BeautifulSoup object.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML.
Returns:
str: The source URL of the second iframe, or None if not found.
"""
iframes = soup.find_all("iframe")
if iframes and len(iframes) > 1:
return iframes[1].get("src")
return None
def find_content(self, url):
"""
Makes a request to the specified URL and parses the HTML content.
Args:
url (str): The URL to fetch content from.
Returns:
BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails.
"""
content = self.make_request(url)
if content:
return self.parse_html(content)
return None
def get_result_node_js(self, soup):
"""
Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content.
Returns:
str: The output from the Node.js script, or None if the script cannot be found or executed.
"""
for script in soup.find_all("script"):
if "eval" in str(script):
new_script = str(script.text).replace("eval", "var a = ")
new_script = new_script.replace(")))", ")));console.log(a);")
return run_node_script(new_script)
return None
def get_playlist(self) -> str:
"""
Download a video from the provided URL.
Returns:
str: The URL of the downloaded video if successful, None otherwise.
"""
try:
html_content = self.make_request(self.url)
if not html_content:
logging.error("Failed to fetch HTML content.")
return None
soup = self.parse_html(html_content)
if not soup:
logging.error("Failed to parse HTML content.")
return None
iframe_src = self.get_iframe(soup)
if not iframe_src:
logging.error("No iframe found.")
return None
down_page_soup = self.find_content(iframe_src)
if not down_page_soup:
logging.error("Failed to fetch down page content.")
return None
pattern = r'data-link="(//supervideo[^"]+)"'
match = re.search(pattern, str(down_page_soup))
if not match:
logging.error("No match found for supervideo URL.")
return None
supervideo_url = "https:" + match.group(1)
supervideo_soup = self.find_content(supervideo_url)
if not supervideo_soup:
logging.error("Failed to fetch supervideo content.")
return None
result = self.get_result_node_js(supervideo_soup)
if not result:
logging.error("No video URL found in script.")
return None
master_playlist = str(result).split(":")[3].split('"}')[0]
return f"https:{master_playlist}"
except Exception as e:
logging.error(f"An error occurred: {e}")
return None
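# A hedged usage sketch of the class above: the target page (hypothetical URL)
# must contain the second iframe and the data-link="//supervideo..." attribute
# that get_playlist() looks for, and Node.js must be available for run_node_script.
if __name__ == "__main__":
    source = VideoSource()
    source.setup("https://example.org/watch/some-film")

    playlist_url = source.get_playlist()
    if playlist_url:
        print(f"Master playlist: {playlist_url}")
    else:
        print("Extraction failed, see the log for the failing step.")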

View File

@ -0,0 +1,37 @@
# 26.05.24
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import title_search, get_select_title
from .film import download_film
# Variable
indice = 2
def search():
"""
Main function of the application for film search and download.
"""
# Make a request to the site to get content matching the search string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(string_to_search)
if len_database > 0:
# Select title from list
select_title = get_select_title()
# Download only film
download_film(
title_name=select_title.name,
url=select_title.url
)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")

View File

@ -0,0 +1,14 @@
# 26.05.24
import os
# Internal utilities
from Src.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
MOVIE_FOLDER = "Movie"

View File

@ -0,0 +1,55 @@
# 26.05.24
import os
import sys
import logging
# Internal utilities
from Src.Util.console import console
from Src.Lib.Hls.downloader import Downloader
from Src.Util.message import start_message
# Logic class
from .Core.Player.supervideo import VideoSource
# Config
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER
# Variable
video_source = VideoSource()
def download_film(title_name: str, url: str):
"""
Downloads a film using the provided title name and video URL.
Args:
- title_name (str): The name of the film title.
- url (str): The URL of the video.
"""
# Start message and display film information
start_message()
console.print(f"[yellow]Download: [red]{title_name} \n")
# Set domain and media ID for the video source
video_source.setup(
url = url
)
# Define output path
mp4_name = str(title_name).replace("-", "_") + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name)
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
).start()

View File

@ -0,0 +1,119 @@
# 26.05.24
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Util.console import console
from Src.Util.headers import get_headers
# Logic class
from .Core.Class.SearchType import MediaManager, MediaItem
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(title_search: str) -> int:
"""
Search for titles based on a search query.
Args:
- title_search (str): The title to search for.
Returns:
int: The number of titles found.
"""
# Send request to search for titles
response = httpx.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()})
response.raise_for_status()
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('div', id="dle-content")
# Scrape div film in table on single page
for film_div in table_content.find_all('div', class_='col-lg-3'):
title = film_div.find('h2', class_='titleFilm').get_text(strip=True)
link = film_div.find('h2', class_='titleFilm').find('a')['href']
imdb_rating = film_div.find('div', class_='imdb-rate').get_text(strip=True).split(":")[-1]
film_info = {
'name': title,
'url': link,
'score': imdb_rating
}
media_search_manager.add_media(film_info)
# Return the number of titles found
return media_search_manager.get_length()
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
MediaItem: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Type": {'color': 'yellow'},
"Score": {'color': 'cyan'},
}
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
# Filter for only a list of category
if type_filter is not None:
if str(media.type) not in type_filter:
continue
table_show_manager.add_tv_show({
'Index': str(i),
'Name': media.name,
'Type': media.type,
'Score': media.score,
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) < len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)

View File

@ -0,0 +1,91 @@
# 03.03.24
from typing import Dict, Any, List
# Variable
from ...costant import SITE_NAME, DOMAIN_NOW
class Image:
def __init__(self, image_data: Dict[str, Any]):
self.id: int = image_data.get('id', '')
self.filename: str = image_data.get('filename', '')
self.type: str = image_data.get('type', '')
self.imageable_type: str = image_data.get('imageable_type', '')
self.imageable_id: int = image_data.get('imageable_id', '')
self.created_at: str = image_data.get('created_at', '')
self.updated_at: str = image_data.get('updated_at', '')
self.original_url_field: str = image_data.get('original_url_field', '')
self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}"
def __str__(self):
return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')"
class Episode:
def __init__(self, data: Dict[str, Any]):
self.id: int = data.get('id', '')
self.number: int = data.get('number', '')
self.name: str = data.get('name', '')
self.plot: str = data.get('plot', '')
self.duration: int = data.get('duration', '')
self.scws_id: int = data.get('scws_id', '')
self.season_id: int = data.get('season_id', '')
self.created_by: str = data.get('created_by', '')
self.created_at: str = data.get('created_at', '')
self.updated_at: str = data.get('updated_at', '')
self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])]
def __str__(self):
return f"Episode(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', duration={self.duration} sec)"
class EpisodeManager:
def __init__(self):
self.episodes: List[Episode] = []
def add_episode(self, episode_data: Dict[str, Any]):
"""
Add a new episode to the manager.
Args:
- episode_data (Dict[str, Any]): A dictionary containing data for the new episode.
"""
episode = Episode(episode_data)
self.episodes.append(episode)
def get_episode_by_index(self, index: int) -> Episode:
"""
Get an episode by its index.
Args:
- index (int): Index of the episode to retrieve.
Returns:
Episode: The episode object.
"""
return self.episodes[index]
def get_length(self) -> int:
"""
Get the number of episodes in the manager.
Returns:
int: Number of episodes.
"""
return len(self.episodes)
def clear(self) -> None:
"""
This method clears the episodes list.
Args:
- self: The object instance.
"""
self.episodes.clear()
def __str__(self):
return f"EpisodeManager(num_episodes={len(self.episodes)})"

View File

@ -0,0 +1,63 @@
# 12.04.24
class Preview:
def __init__(self, data):
self.id = data.get("id")
self.title_id = data.get("title_id")
self.created_at = data.get("created_at")
self.updated_at = data.get("updated_at")
self.video_id = data.get("video_id")
self.is_viewable = data.get("is_viewable")
self.zoom_factor = data.get("zoom_factor")
self.filename = data.get("filename")
self.embed_url = data.get("embed_url")
def __str__(self):
return f"Preview: ID={self.id}, Title ID={self.title_id}, Created At={self.created_at}, Updated At={self.updated_at}, Video ID={self.video_id}, Viewable={self.is_viewable}, Zoom Factor={self.zoom_factor}, Filename={self.filename}, Embed URL={self.embed_url}"
class Genre:
def __init__(self, data):
self.id = data.get("id")
self.name = data.get("name")
self.type = data.get("type")
self.hidden = data.get("hidden")
self.created_at = data.get("created_at")
self.updated_at = data.get("updated_at")
self.pivot = data.get("pivot")
def __str__(self):
return f"Genre: ID={self.id}, Name={self.name}, Type={self.type}, Hidden={self.hidden}, Created At={self.created_at}, Updated At={self.updated_at}, Pivot={self.pivot}"
class Image:
def __init__(self, data):
self.id = data.get("id")
self.filename = data.get("filename")
self.type = data.get("type")
self.imageable_type = data.get("imageable_type")
self.imageable_id = data.get("imageable_id")
self.created_at = data.get("created_at")
self.updated_at = data.get("updated_at")
self.original_url_field = data.get("original_url_field")
def __str__(self):
return f"Image: ID={self.id}, Filename={self.filename}, Type={self.type}, Imageable Type={self.imageable_type}, Imageable ID={self.imageable_id}, Created At={self.created_at}, Updated At={self.updated_at}, Original URL Field={self.original_url_field}"
class PreviewManager:
def __init__(self, json_data):
self.id = json_data.get("id")
self.type = json_data.get("type")
self.runtime = json_data.get("runtime")
self.release_date = json_data.get("release_date")
self.quality = json_data.get("quality")
self.plot = json_data.get("plot")
self.seasons_count = json_data.get("seasons_count")
self.genres = [Genre(genre_data) for genre_data in json_data.get("genres", [])]
self.preview = Preview(json_data.get("preview"))
self.images = [Image(image_data) for image_data in json_data.get("images", [])]
def __str__(self):
genres_str = "\n".join(str(genre) for genre in self.genres)
images_str = "\n".join(str(image) for image in self.images)
return f"Title: ID={self.id}, Type={self.type}, Runtime={self.runtime}, Release Date={self.release_date}, Quality={self.quality}, Plot={self.plot}, Seasons Count={self.seasons_count}\nGenres:\n{genres_str}\nPreview:\n{self.preview}\nImages:\n{images_str}"

View File

@ -0,0 +1,85 @@
# 03.03.24
from typing import List
# Variable
from ...costant import SITE_NAME, DOMAIN_NOW
class Image:
def __init__(self, data: dict):
self.imageable_id: int = data.get('imageable_id')
self.imageable_type: str = data.get('imageable_type')
self.filename: str = data.get('filename')
self.type: str = data.get('type')
self.original_url_field: str = data.get('original_url_field')
self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}"
def __str__(self):
return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')"
class MediaItem:
def __init__(self, data: dict):
self.id: int = data.get('id')
self.slug: str = data.get('slug')
self.name: str = data.get('name')
self.type: str = data.get('type')
self.score: str = data.get('score')
self.sub_ita: int = data.get('sub_ita')
self.last_air_date: str = data.get('last_air_date')
self.seasons_count: int = data.get('seasons_count')
self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])]
def __str__(self):
return f"MediaItem(id={self.id}, slug='{self.slug}', name='{self.name}', type='{self.type}', score='{self.score}', sub_ita={self.sub_ita}, last_air_date='{self.last_air_date}', seasons_count={self.seasons_count}, images={self.images})"
class MediaManager:
def __init__(self):
self.media_list: List[MediaItem] = []
def add_media(self, data: dict) -> None:
"""
Add media to the list.
Args:
data (dict): Media data to add.
"""
self.media_list.append(MediaItem(data))
def get(self, index: int) -> MediaItem:
"""
Get a media item from the list by index.
Args:
index (int): The index of the media item to retrieve.
Returns:
MediaItem: The media item at the specified index.
"""
return self.media_list[index]
def get_length(self) -> int:
"""
Get the number of media items found by the search.
Returns:
int: Number of media items.
"""
return len(self.media_list)
def clear(self) -> None:
"""
This method clears the media list.
Args:
self: The object instance.
"""
self.media_list.clear()
def __str__(self):
return f"MediaManager(num_media={len(self.media_list)})"

View File

@ -0,0 +1,67 @@
# 03.03.24
from typing import List, Dict, Union
class Title:
def __init__(self, title_data: Dict[str, Union[int, str, None]]):
self.id: int = title_data.get('id')
self.number: int = title_data.get('number')
self.name: str = title_data.get('name')
self.plot: str = title_data.get('plot')
self.release_date: str = title_data.get('release_date')
self.title_id: int = title_data.get('title_id')
self.created_at: str = title_data.get('created_at')
self.updated_at: str = title_data.get('updated_at')
self.episodes_count: int = title_data.get('episodes_count')
def __str__(self):
return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})"
class TitleManager:
def __init__(self):
self.titles: List[Title] = []
def add_title(self, title_data: Dict[str, Union[int, str, None]]):
"""
Add a new title to the manager.
Args:
title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title.
"""
title = Title(title_data)
self.titles.append(title)
def get_title_by_index(self, index: int) -> Title:
"""
Get a title by its index.
Args:
index (int): Index of the title to retrieve.
Returns:
Title: The title object.
"""
return self.titles[index]
def get_length(self) -> int:
"""
Get the number of titles in the manager.
Returns:
int: Number of titles.
"""
return len(self.titles)
def clear(self) -> None:
"""
This method clears the titles list.
Args:
self: The object instance.
"""
self.titles.clear()
def __str__(self):
return f"TitleManager(num_titles={len(self.titles)})"

View File

@ -0,0 +1,160 @@
# 03.03.24
import re
import logging
from typing import Dict, Any
class WindowVideo:
def __init__(self, data: Dict[str, Any]):
self.data = data
self.id: int = data.get('id', '')
self.name: str = data.get('name', '')
self.filename: str = data.get('filename', '')
self.size: str = data.get('size', '')
self.quality: str = data.get('quality', '')
self.duration: str = data.get('duration', '')
self.views: int = data.get('views', '')
self.is_viewable: bool = data.get('is_viewable', '')
self.status: str = data.get('status', '')
self.fps: float = data.get('fps', '')
self.legacy: bool = data.get('legacy', '')
self.folder_id: int = data.get('folder_id', '')
self.created_at_diff: str = data.get('created_at_diff', '')
def __str__(self):
return f"WindowVideo(id={self.id}, name='{self.name}', filename='{self.filename}', size='{self.size}', quality='{self.quality}', duration='{self.duration}', views={self.views}, is_viewable={self.is_viewable}, status='{self.status}', fps={self.fps}, legacy={self.legacy}, folder_id={self.folder_id}, created_at_diff='{self.created_at_diff}')"
class WindowParameter:
def __init__(self, data: Dict[str, Any]):
self.data = data
self.token: str = data.get('token', '')
self.token360p: str = data.get('token360p', '')
self.token480p: str = data.get('token480p', '')
self.token720p: str = data.get('token720p', '')
self.token1080p: str = data.get('token1080p', '')
self.expires: str = data.get('expires', '')
def __str__(self):
return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')"
class DynamicJSONConverter:
"""
Class for converting an input string into dynamic JSON.
"""
def __init__(self, input_string: str):
"""
Initialize the converter with the input string.
Args:
input_string (str): The input string to convert.
"""
self.input_string = input_string
self.json_data = {}
def _parse_key_value(self, key: str, value: str):
"""
Parse a key-value pair.
Args:
key (str): The key.
value (str): The value.
Returns:
object: The parsed value.
"""
try:
value = value.strip()
if value.startswith('{'):
return self._parse_json_object(value)
else:
return self._parse_non_json_value(value)
except Exception as e:
logging.error(f"Error parsing key-value pair '{key}': {e}")
raise
def _parse_json_object(self, obj_str: str):
"""
Parse a JSON object.
Args:
obj_str (str): The string representation of the JSON object.
Returns:
dict: The parsed JSON object.
"""
try:
# Use regular expression to find key-value pairs in the JSON object string
obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str))
# Strip double quotes from values and return the parsed dictionary
return {k: v.strip('"') for k, v in obj_dict.items()}
except Exception as e:
logging.error(f"Error parsing JSON object: {e}")
raise
def _parse_non_json_value(self, value: str):
"""
Parse a non-JSON value.
Args:
value (str): The value to parse.
Returns:
object: The parsed value.
"""
try:
# Remove extra quotes and convert to lowercase
value = value.replace('"', "").strip().lower()
if value.endswith('\n}'):
value = value.replace('\n}', '')
# Check if the value matches 'true' or 'false' using regular expressions
if re.match(r'\btrue\b', value, re.IGNORECASE):
return True
elif re.match(r'\bfalse\b', value, re.IGNORECASE):
return False
return value
except Exception as e:
logging.error(f"Error parsing non-JSON value: {e}")
raise
def convert_to_dynamic_json(self):
"""
Convert the input string into dynamic JSON.
Returns:
str: The JSON representation of the result.
"""
try:
# Replace invalid characters with valid JSON syntax
self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}"
# Find all key-value matches in the input string using regular expression
matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string)
for match in matches:
key = match[0].strip()
value = match[1].strip()
# Parse each key-value pair and add it to the json_data dictionary
self.json_data[key] = self._parse_key_value(key, value)
# Convert the json_data dictionary to a formatted JSON string
return self.json_data
except Exception as e:
logging.error(f"Error converting to dynamic JSON: {e}")
raise
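# An illustrative input modelled on the "window.video = {...}" style script the
# converter targets; the real embed script carries more fields, so this is only
# a sketch of the parsing behaviour.
if __name__ == "__main__":
    script_text = """
    video = {'id': 271977, 'name': 'Example', 'filename': 'example.mp4'};
    masterPlaylist = {'token': 'abc123', 'expires': '1717171717'}
    """

    converter = DynamicJSONConverter(script_text)
    result = converter.convert_to_dynamic_json()

    print(result['video']['id'])     # '271977' (values are kept as strings)
    print(result['masterPlaylist'])  # {'token': 'abc123', 'expires': '1717171717'}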

View File

@ -0,0 +1,194 @@
# 01.03.24
import sys
import logging
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager
# Logic class
from ..Class.SeriesType import TitleManager
from ..Class.EpisodeType import EpisodeManager, Episode
from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter
# Variable
from ...costant import SITE_NAME
class VideoSource:
def __init__(self):
"""
Initialize a VideoSource object.
"""
self.headers = {
'user-agent': get_headers()
}
self.is_series = False
self.base_name = SITE_NAME
self.domain = config_manager.get('SITE', self.base_name)
def setup(self, media_id: int = None, series_name: str = None):
"""
Set up the class
Args:
- media_id (int): The media ID to set.
- series_name (str): The series name to set.
"""
self.media_id = media_id
if series_name is not None:
self.is_series = True
self.series_name = series_name
self.obj_title_manager: TitleManager = TitleManager()
self.obj_episode_manager: EpisodeManager = EpisodeManager()
def get_count_episodes(self):
"""
Fetches the total count of episodes available for the anime.
Returns:
int or None: Total count of episodes if successful, otherwise None.
"""
try:
response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/")
response.raise_for_status()
# Parse JSON response and return episode count
return response.json()["episodes_count"]
except Exception as e:
logging.error(f"(EpisodeDownloader) Error fetching episode count: {e}")
return None
def get_info_episode(self, index_ep: int) -> Episode:
"""
Fetches information about a specific episode.
Args:
- index_ep (int): Index of the episode.
Returns:
Episode or None: Information about the episode if successful, otherwise None.
"""
try:
params = {
"start_range": index_ep,
"end_range": index_ep + 1
}
response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params)
response.raise_for_status()
# Return information about the episode
json_data = response.json()["episodes"][-1]
return Episode(json_data)
except Exception as e:
logging.error(f"(EpisodeDownloader) Error fetching episode information: {e}")
return None
def get_embed(self, episode_id: int):
"""
Fetches the script text for a given episode ID.
Args:
- episode_id (int): ID of the episode.
Returns:
str or None: The script text if successful, otherwise None.
"""
try:
response = httpx.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}")
response.raise_for_status()
# Extract and clean embed URL
embed_url = response.text.strip()
self.iframe_src = embed_url
# Fetch video content using embed URL
video_response = httpx.get(embed_url)
video_response.raise_for_status()
# Parse the response with BeautifulSoup to get the content of the script
soup = BeautifulSoup(video_response.text, "html.parser")
script = soup.find("body").find("script").text
return script
except Exception as e:
logging.error(f"(EpisodeDownloader) Error fetching embed URL: {e}")
return None
def parse_script(self, script_text: str) -> None:
"""
Parse script text.
Args:
- script_text (str): The script text to parse.
"""
try:
converter = DynamicJSONConverter(script_text)
result = converter.convert_to_dynamic_json()
# Create window video and parameter objects
self.window_video = WindowVideo(result['video'])
self.window_parameter = WindowParameter(result['masterPlaylist'])
except Exception as e:
logging.error(f"Error parsing script: {e}")
raise
def get_playlist(self) -> str:
"""
Get playlist.
Returns:
- str: The playlist URL, or None if there's an error.
"""
iframe_url = self.iframe_src
# Create base uri for playlist
base_url = f'https://vixcloud.co/playlist/{self.window_video.id}'
query = urlencode(list(self.window_parameter.data.items()))
master_playlist_url = urljoin(base_url, '?' + query)
# Parse the current query string and the master playlist URL query string
current_params = parse_qs(iframe_url[1:])
m = urlparse(master_playlist_url)
master_params = parse_qs(m.query)
# Create the final parameters dictionary with token and expires from the master playlist
final_params = {
"token": master_params.get("token", [""])[0],
"expires": master_params.get("expires", [""])[0]
}
# Add conditional parameters
if "b" in current_params:
final_params["b"] = "1"
if "canPlayFHD" in current_params:
final_params["h"] = "1"
# Construct the new query string and final URL
new_query = urlencode(final_params) # Encode final_params into a query string
new_url = m._replace(query=new_query) # Replace the old query string with the new one
final_url = urlunparse(new_url) # Construct the final URL from the modified parts
return final_url
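# A hedged sketch of the call order the downloader relies on (media_id and the
# slug are hypothetical): parse_script() must run before get_playlist() so the
# window_video and window_parameter objects exist.
if __name__ == "__main__":
    source = VideoSource()
    source.setup(media_id=12345, series_name="example-slug")

    episode = source.get_info_episode(0)
    script = source.get_embed(episode.id)
    source.parse_script(script)

    print(source.get_playlist())  # vixcloud master playlist URL with token/expires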

View File

@ -0,0 +1,8 @@
# 21.05.24
from .get_domain import grab_au_top_level_domain as extract_domain
from .manage_ep import (
manage_selection,
map_episode_title
)

View File

@ -0,0 +1,108 @@
# 02.04.24
import os
import threading
import logging
# External libraries
import httpx
# Internal utilities
from Src.Lib.Google import search as google_search
def check_url_for_content(url: str, content: str) -> bool:
"""
Check if a URL contains specific content.
Args:
- url (str): The URL to check.
- content (str): The content to search for in the response.
Returns:
bool: True if the content is found, False otherwise.
"""
try:
logging.info(f"Test site to extract domain: {url}")
response = httpx.get(url, timeout = 1)
response.raise_for_status()
if content in response.text:
return True
except Exception as e:
pass
return False
def grab_top_level_domain(base_url: str, target_content: str) -> str:
"""
Get the top-level domain (TLD) from a list of URLs.
Args:
- base_url (str): The base URL to construct complete URLs.
- target_content (str): The content to search for in the response.
Returns:
str: The found TLD, if any.
"""
results = []
threads = []
path_file = os.path.join("Test", "data", "TLD", "tld_list.txt")
logging.info(f"Load file: {path_file}")
def url_checker(url: str):
if check_url_for_content(url, target_content):
results.append(url.split(".")[-1])
if not os.path.exists(path_file):
raise FileNotFoundError("The file 'tld_list.txt' does not exist.")
with open(path_file, "r") as file:
urls = [f"{base_url}.{x.strip().lower()}" for x in file]
for url in urls:
thread = threading.Thread(target=url_checker, args=(url,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if results:
return results[-1]
def grab_top_level_domain_light(query: str) -> str:
"""
Get the top-level domain (TLD) using a light method via Google search.
Args:
- query (str): The search query for Google search.
Returns:
str: The found TLD, if any.
"""
for result in google_search(query, num=1, stop=1, pause=2):
return result.split(".", 2)[-1].replace("/", "")
def grab_au_top_level_domain(method: str) -> str:
"""
Get the top-level domain (TLD) for Anime Unity.
Args:
- method (str): The method to use to obtain the TLD ("light" or "strong").
Returns:
str: The found TLD, if any.
"""
if method == "light":
return grab_top_level_domain_light("animeunity")
elif method == "strong":
return grab_top_level_domain("https://www.animeunity", '<meta name="author" content="AnimeUnity Staff">')
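# A hedged usage sketch: the "light" method resolves the TLD through a Google
# search, while "strong" needs the local Test/data/TLD/tld_list.txt file and
# probes every candidate domain in a separate thread.
if __name__ == "__main__":
    tld = grab_au_top_level_domain(method="light")
    if tld:
        print(f"animeunity TLD resolved to: {tld}")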

View File

@ -0,0 +1,74 @@
# 02.05.24
import logging
from typing import List
# Internal utilities
from Src.Util._jsonConfig import config_manager
# Logic class
from ..Class.EpisodeType import Episode
# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')
def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
"""
Manage user selection for seasons to download.
Args:
- cmd_insert (str): User input for season selection.
- max_count (int): Maximum count of seasons available.
Returns:
list_season_select (List[int]): List of selected seasons.
"""
list_season_select = []
logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")
# For a single number (e.g., '5')
if cmd_insert.isnumeric():
list_season_select.append(int(cmd_insert))
# For a range (e.g., '[5-12]')
elif "[" in cmd_insert:
start, end = map(int, cmd_insert[1:-1].split('-'))
list_season_select = list(range(start, end + 1))
# For all seasons
elif cmd_insert == "*":
list_season_select = list(range(1, max_count+1))
# Return the list of selected seasons
logging.info(f"List return: {list_season_select}")
return list_season_select
def map_episode_title(tv_name: str, episode: Episode, number_season: int):
"""
Maps the episode title to a specific format.
Args:
- tv_name (str): The name of the TV show.
- episode (Episode): The episode object.
- number_season (int): The season number.
Returns:
str: The mapped episode title.
"""
map_episode_temp = MAP_EPISODE
map_episode_temp = map_episode_temp.replace("%(tv_name)", tv_name)
map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2))
map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2))
map_episode_temp = map_episode_temp.replace("%(episode_name)", episode.name)
# Additional fix
map_episode_temp = map_episode_temp.replace(".", "_")
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp
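# A worked sketch of the two helpers above. The episode-name template comes from
# the 'map_episode_name' config key, so the template shown here is only an assumption.
if __name__ == "__main__":
    print(manage_selection("5", 12))      # [5]
    print(manage_selection("[5-8]", 12))  # [5, 6, 7, 8]
    print(manage_selection("*", 3))       # [1, 2, 3]

    # With MAP_EPISODE = "%(tv_name)_S%(season)E%(episode)" and an Episode whose
    # number is 3, map_episode_title("Show", episode, 1) returns "Show_S01E03".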

View File

@ -0,0 +1,40 @@
# 21.05.24
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import title_search, get_select_title
from .anime import donwload_film, donwload_series
# Variable
indice = 1
def search():
# Make a request to the site to get content matching the search string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(string_to_search)
if len_database > 0:
# Select title from list
select_title = get_select_title()
if select_title.type == 'TV':
donwload_series(
tv_id=select_title.id,
tv_name=select_title.slug
)
else:
donwload_film(
id_film=select_title.id,
title_name=select_title.slug
)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")

111
Src/Api/animeunity/anime.py Normal file
View File

@ -0,0 +1,111 @@
# 11.03.24
import os
import logging
# Internal utilities
from Src.Util.console import console, msg
from Src.Lib.Hls.downloader import Downloader
from Src.Util.message import start_message
# Logic class
from .Core.Player.vixcloud import VideoSource
from .Core.Util import manage_selection
# Variable
from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER, MOVIE_FOLDER
video_source = VideoSource()
def download_episode(index_select: int):
"""
Downloads the selected episode.
Args:
- index_select (int): Index of the episode to download.
"""
# Get information about the selected episode
obj_episode = video_source.get_info_episode(index_select)
start_message()
console.print(f"[yellow]Download: [red]EP_{obj_episode.number} \n")
# Get the embed URL for the episode
embed_url = video_source.get_embed(obj_episode.id)
# Parse parameters from the embed script text
video_source.parse_script(embed_url)
# Create output path
mp4_path = None
mp4_name = f"{index_select + 1}.mp4"
if video_source.is_series:
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, video_source.series_name)
else:
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, video_source.series_name)
# Start downloading
Downloader(
m3u8_playlist = video_source.get_playlist(),
output_filename = os.path.join(mp4_path, mp4_name)
).start()
def donwload_series(tv_id: int, tv_name: str):
"""
Function to download episodes of a TV series.
Args:
- tv_id (int): The ID of the TV series.
- tv_name (str): The name of the TV series.
"""
# Set up video source
video_source.setup(
media_id = tv_id,
series_name = tv_name
)
# Get the count of episodes for the TV series
episoded_count = video_source.get_count_episodes()
console.log(f"[cyan]Episodes found: [red]{episoded_count}")
# Prompt user to select an episode index
last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")
# Manage user selection
list_episode_select = manage_selection(last_command, episoded_count)
# Download selected episodes
if len(list_episode_select) == 1 and last_command != "*":
download_episode(list_episode_select[0]-1)
# Download all selected episodes
else:
for i_episode in list_episode_select:
download_episode(i_episode-1)
def donwload_film(id_film: int, title_name: str):
"""
Function to download a film.
Args:
- id_film (int): The ID of the film.
- title_name (str): The title of the film.
"""
# Set up video source
video_source.setup(
media_id = id_film,
series_name = title_name
)
video_source.is_series = False
# Start download
download_episode(0)

View File

@ -0,0 +1,15 @@
# 26.05.24
import os
# Internal utilities
from Src.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
SERIES_FOLDER= "Serie"
MOVIE_FOLDER = "Movie"

237
Src/Api/animeunity/site.py Normal file
View File

@ -0,0 +1,237 @@
# 10.12.23
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
# Logic class
from .Core.Util import extract_domain
from .Core.Class.SearchType import MediaManager, MediaItem
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def get_token(site_name: str, domain: str) -> dict:
"""
Function to retrieve session tokens from a specified website.
Args:
- site_name (str): The name of the site.
- domain (str): The domain of the site.
Returns:
- dict: A dictionary containing session tokens. The keys are 'animeunity_session' and 'csrf_token'.
"""
# Send a GET request to the specified URL composed of the site name and domain
response = httpx.get(f"https://www.{site_name}.{domain}")
response.raise_for_status()
# Initialize variables to store CSRF token
find_csrf_token = None
# Parse the HTML response using BeautifulSoup
soup = BeautifulSoup(response.text, "html.parser")
# Loop through all meta tags in the HTML response
for html_meta in soup.find_all("meta"):
# Check if the meta tag has a 'name' attribute equal to "csrf-token"
if html_meta.get('name') == "csrf-token":
# If found, retrieve the content of the meta tag, which is the CSRF token
find_csrf_token = html_meta.get('content')
logging.info(f"Extract: ('animeunity_session': {response.cookies['animeunity_session']}, 'csrf_token': {find_csrf_token})")
return {
'animeunity_session': response.cookies['animeunity_session'],
'csrf_token': find_csrf_token
}
def update_domain():
"""
Update the domain for the anime streaming site.
This function tests the accessibility of the current anime streaming site.
If the current domain is inaccessible, it attempts to obtain and set a new domain.
It uses the 'light' method to extract a new domain from Anime Unity.
"""
# Test current site's accessibility
try:
console.log(f"[cyan]Test site: [red]https://{SITE_NAME}.{DOMAIN_NOW}")
response = httpx.get(f"https://www.{SITE_NAME}.{DOMAIN_NOW}")
response.raise_for_status()
# If the current site is inaccessible, try to obtain a new domain
except Exception as e:
# Get new domain
console.print("[red]\nExtract new DOMAIN from TLD list.")
new_domain = extract_domain(method="light")
console.log(f"[cyan]Extract new domain: [red]{new_domain}")
if new_domain:
# Update configuration with the new domain
config_manager.set_key('SITE', SITE_NAME, new_domain)
config_manager.write_config()
else:
logging.error("Failed to find a new animeunity domain")
sys.exit(0)
def get_real_title(record):
"""
Get the real title from a record.
This function takes a record, which is assumed to be a dictionary representing a row of JSON data.
It looks for a title in the record, prioritizing English over Italian titles if available.
Args:
- record (dict): A dictionary representing a row of JSON data.
Returns:
- str: The title found in the record. If no title is found, returns None.
"""
if record['title'] is not None:
return record['title']
elif record['title_eng'] is not None:
return record['title_eng']
else:
return record['title_it']
def title_search(title: str) -> int:
"""
Function to perform an anime search using a provided title.
Args:
- title (str): The title to search for.
Returns:
- int: The number of titles found.
"""
# Update domain
update_domain()
# Get token and session value from configuration
url_domain = config_manager.get('SITE', SITE_NAME)
data = get_token(SITE_NAME, url_domain)
# Prepare cookies to be used in the request
cookies = {
'animeunity_session': data.get('animeunity_session')
}
# Prepare headers for the request
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7',
'content-type': 'application/json;charset=UTF-8',
'x-csrf-token': data.get('csrf_token')
}
# Prepare JSON data to be sent in the request
json_data = {
'title': unidecode(title) # Use the provided title for the search
}
# Send a POST request to the API endpoint for live search
response = httpx.post(f'https://www.{SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json=json_data)
response.raise_for_status()
# Process each record returned in the response
for record in response.json()['records']:
# Rename keys for consistency
record['name'] = get_real_title(record)
record['last_air_date'] = record.pop('date')
# Add the record to the media search manager
media_search_manager.add_media(record)
# Return the length of media search manager
return media_search_manager.get_length()
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
MediaItem: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Type": {'color': 'yellow'},
"Score": {'color': 'cyan'},
"Date": {'color': 'green'}
}
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
# Filter for only a list of category
if type_filter is not None:
if str(media.type) not in type_filter:
continue
table_show_manager.add_tv_show({
'Index': str(i),
'Name': media.name,
'Type': media.type,
'Score': media.score,
'Date': media.last_air_date
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) < len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)

View File

@ -0,0 +1,85 @@
# 13.06.24
import sys
import logging
from typing import List, Dict
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager
# Logic class
from .SearchType import MediaItem
class GetSerieInfo:
def __init__(self, dict_serie: MediaItem) -> None:
"""
Initializes the GetSerieInfo object with default values.
Args:
dict_serie (MediaItem): Media item containing the series URL.
"""
self.headers = {'user-agent': get_headers()}
self.cookies = config_manager.get_dict('REQUESTS', 'index')
self.url = dict_serie.url
self.tv_name = None
self.list_episodes = None
def get_episode_number(self) -> List[Dict[str, str]]:
"""
Retrieves the list of episodes for the series.
Returns:
List[Dict[str, str]]: List of dictionaries containing episode name and URL.
"""
# Make an HTTP request to the series URL
try:
response = httpx.get(self.url + "?area=online", cookies=self.cookies, headers=self.headers)
response.raise_for_status()
except Exception as e:
logging.error(f"Insert: ['ips4_device_key': 'your_code', 'ips4_member_id': 'your_code', 'ips4_login_key': 'your_code'] in config.json file REQUESTS -> index, instead of user-agent. Use browser debug and cookie request with a valid account, filter by DOC.")
sys.exit(0)
# Parse HTML content of the page
soup = BeautifulSoup(response.text, "html.parser")
# Get tv name
self.tv_name = soup.find("span", class_= "ipsType_break").get_text(strip=True)
# Find the container of episode links
table_content = soup.find('div', class_='ipsMargin_bottom:half')
list_dict_episode = []
for episode_div in table_content.find_all('a', href=True):
# Get text of episode
part_name = episode_div.get_text(strip=True)
if part_name:
link = episode_div['href']
obj_episode = {
'name': part_name,
'url': link
}
list_dict_episode.append(obj_episode)
self.list_episodes = list_dict_episode
return list_dict_episode

View File

@ -0,0 +1,60 @@
# 13.06.24
from typing import List
class MediaItem:
def __init__(self, data: dict):
self.name: str = data.get('name')
self.type: str = data.get('type')
self.url: str = data.get('url')
def __str__(self):
return f"MediaItem(name='{self.name}', type='{self.type}', url={self.url})"
class MediaManager:
def __init__(self):
self.media_list: List[MediaItem] = []
def add_media(self, data: dict) -> None:
"""
Add media to the list.
Args:
data (dict): Media data to add.
"""
self.media_list.append(MediaItem(data))
def get(self, index: int) -> MediaItem:
"""
Get a media item from the list by index.
Args:
index (int): The index of the media item to retrieve.
Returns:
MediaItem: The media item at the specified index.
"""
return self.media_list[index]
def get_length(self) -> int:
"""
Get the number of media items found by the search.
Returns:
int: Number of media items.
"""
return len(self.media_list)
def clear(self) -> None:
"""
This method clears the media list.
Args:
self: The object instance.
"""
self.media_list.clear()
def __str__(self):
return f"MediaManager(num_media={len(self.media_list)})"

View File

@ -0,0 +1,83 @@
# 14.06.24
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager
class VideoSource:
def __init__(self) -> None:
"""
Initializes the VideoSource object with default values.
Attributes:
headers (dict): A dictionary to store HTTP headers.
cookie (dict): A dictionary to store cookies.
"""
self.headers = {'user-agent': get_headers()}
self.cookie = config_manager.get_dict('REQUESTS', 'index')
def setup(self, url: str) -> None:
"""
Sets up the video source with the provided URL.
Args:
url (str): The URL of the video source.
"""
self.url = url
def make_request(self, url: str) -> str:
"""
Make an HTTP GET request to the provided URL.
Args:
url (str): The URL to make the request to.
Returns:
str: The response content if successful, None otherwise.
"""
try:
response = httpx.get(url, headers=self.headers, cookies=self.cookie)
response.raise_for_status()
return response.text
except httpx.HTTPStatusError as http_err:
logging.error(f"HTTP error occurred: {http_err}")
except Exception as err:
logging.error(f"An error occurred: {err}")
return None
def get_playlist(self):
"""
Retrieves the playlist URL from the video source.
Returns:
str: The mp4 link if found, None otherwise.
"""
try:
text = self.make_request(self.url)
if text:
soup = BeautifulSoup(text, "html.parser")
source = soup.find("source")
if source:
mp4_link = source.get("src")
return mp4_link
else:
logging.error("No <source> tag found in the HTML.")
else:
logging.error("Failed to retrieve content from the URL.")
except Exception as e:
logging.error(f"An error occurred while parsing the playlist: {e}")

View File

@ -0,0 +1,71 @@
# 02.05.24
import logging
from typing import List
# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import remove_special_characters
# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')
def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
"""
Manage user selection for seasons to download.
Args:
- cmd_insert (str): User input for season selection.
- max_count (int): Maximum count of seasons available.
Returns:
list_season_select (List[int]): List of selected seasons.
"""
list_season_select = []
logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")
# For a single number (e.g., '5')
if cmd_insert.isnumeric():
list_season_select.append(int(cmd_insert))
# For a range (e.g., '[5-12]')
elif "[" in cmd_insert:
start, end = map(int, cmd_insert[1:-1].split('-'))
list_season_select = list(range(start, end + 1))
# For all seasons
elif cmd_insert == "*":
list_season_select = list(range(1, max_count+1))
# Return the list of selected seasons
logging.info(f"List return: {list_season_select}")
return list_season_select
def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str:
"""
Maps the episode title to a specific format.
Args:
tv_name (str): The name of the TV show.
number_season (int): The season number.
episode_number (int): The episode number.
episode_name (str): The original name of the episode.
Returns:
str: The mapped episode title.
"""
map_episode_temp = MAP_EPISODE
map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name))
map_episode_temp = map_episode_temp.replace("%(season)", str(number_season))
map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number))
map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name))
# Additional fix
map_episode_temp = map_episode_temp.replace(".", "_")
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp

View File

@ -0,0 +1,43 @@
# 09.06.24
import sys
import logging
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import title_search, get_select_title
from .series import download_thread
# Variable
indice = 3
def search():
"""
Main function of the application for film and series.
"""
# Make a request to the site to get content matching the search string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(string_to_search)
if len_database > 0:
# Select title from list
select_title = get_select_title()
# Download only film
if "Serie TV" in str(select_title.type):
download_thread(select_title)
else:
logging.error(f"Not supported: {select_title.type}")
sys.exit(0)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")

View File

@ -0,0 +1,15 @@
# 09.06.24
import os
# Internal utilities
from Src.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
MOVIE_FOLDER = "Movie"
SERIES_FOLDER = "Serie"

View File

@ -0,0 +1,135 @@
# 13.06.24
import os
import sys
import logging
from urllib.parse import urlparse
# Internal utilities
from Src.Util.color import Colors
from Src.Util.console import console, msg
from Src.Util.os import create_folder, can_create_file
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Lib.Hls.download_mp4 import MP4_downloader
# Logic class
from .Core.Class.SearchType import MediaItem
from .Core.Class.ScrapeSerie import GetSerieInfo
from .Core.Util.manage_ep import manage_selection, map_episode_title
from .Core.Player.ddl import VideoSource
# Variable
from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER
table_show_manager = TVShowManager()
video_source = VideoSource()
def donwload_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) -> None:
"""
Download a single episode video.
Args:
- scape_info_serie (GetSerieInfo): Scraper object holding the series info and episode list.
- index_episode_selected (int): Index of the selected episode (1-based).
"""
start_message()
# Get info about episode
obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
console.print(f"[yellow]Download: [red]{obj_episode.get('name')}")
print()
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(scape_info_serie.tv_name, None, index_episode_selected, obj_episode.get('name'))}.mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name)
# Check if can create file output
create_folder(mp4_path)
if not can_create_file(mp4_name):
logging.error("Invalid mp4 name.")
sys.exit(0)
# Setup video source
video_source.setup(obj_episode.get('url'))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Parse start page url
start_message()
parsed_url = urlparse(obj_episode.get('url'))
path_parts = parsed_url.path.split('/')
MP4_downloader(
url = master_playlist,
path = os.path.join(mp4_path, mp4_name),
referer = f"{parsed_url.scheme}://{parsed_url.netloc}/",
add_desc=f"{Colors.MAGENTA}video"
)
def download_thread(dict_serie: MediaItem):
"""Download all episode of a thread"""
# Start message and set up video source
start_message()
# Init class
scape_info_serie = GetSerieInfo(dict_serie)
# Collect information about thread
list_dict_episode = scape_info_serie.get_episode_number()
episodes_count = len(list_dict_episode)
# Display episodes list and manage user selection
last_command = display_episodes_list(list_dict_episode)
list_episode_select = manage_selection(last_command, episodes_count)
# Download selected episodes
if len(list_episode_select) == 1 and last_command != "*":
donwload_video(scape_info_serie, list_episode_select[0])
# Download all selected episodes
else:
for i_episode in list_episode_select:
donwload_video(scape_info_serie, i_episode)
def display_episodes_list(obj_episode_manager) -> str:
"""
Display episodes list and handle user input.
Returns:
last_command (str): Last command entered by the user.
"""
# Set up table for displaying episodes
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
}
table_show_manager.add_column(column_info)
# Populate the table with episodes information
for i, media in enumerate(obj_episode_manager):
table_show_manager.add_tv_show({
'Index': str(i+1),
'Name': media.get('name'),
})
# Run the table and handle user input
last_command = table_show_manager.run()
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
return last_command

View File

@ -0,0 +1,126 @@
# 09.06.24
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Util.console import console, msg
from Src.Util._jsonConfig import config_manager
from Src.Util.headers import get_headers
# Logic class
from .Core.Class.SearchType import MediaManager, MediaItem
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
cookie_index = config_manager.get_dict('REQUESTS', 'index')
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(word_to_search) -> int:
"""
Search for titles based on a search query.
"""
try:
# Send request to search for titles
response = httpx.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/search/?&q={word_to_search}&quick=1&type=videobox_video&nodes=11", headers={'user-agent': get_headers()})
response.raise_for_status()
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('ol', class_="ipsStream")
if table_content:
for title_div in table_content.find_all('li', class_='ipsStreamItem'):
try:
title_type = title_div.find("p", class_="ipsType_reset").find_all("a")[-1].get_text(strip=True)
name = title_div.find("span", class_="ipsContained").find("a").get_text(strip=True)
link = title_div.find("span", class_="ipsContained").find("a").get("href")
title_info = {
'name': name,
'url': link,
'type': title_type
}
media_search_manager.add_media(title_info)
except Exception as e:
logging.error(f"Error processing title div: {e}")
# Return the number of titles found
return media_search_manager.get_length()
else:
logging.error("No table content found.")
return -999
except Exception as err:
logging.error(f"An error occurred: {err}")
return -9999
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
MediaItem: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Type": {'color': 'yellow'},
}
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
# Filter for only a list of category
if type_filter is not None:
if str(media.type) not in type_filter:
continue
table_show_manager.add_tv_show({
'Index': str(i),
'Name': media.name,
'Type': media.type,
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) < len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)


@ -0,0 +1,113 @@
# 13.06.24
import sys
import logging
from typing import List, Dict
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
# Logic class
from .SearchType import MediaItem
class GetSerieInfo:
def __init__(self, dict_serie: MediaItem) -> None:
"""
Initializes the GetSerieInfo object with default values.
Args:
dict_serie (MediaItem): Dictionary containing series information (optional).
"""
self.headers = {'user-agent': get_headers()}
self.url = dict_serie.url
self.tv_name = None
self.list_episodes = None
def get_seasons_number(self) -> int:
"""
Retrieves the number of seasons of a TV series.
Returns:
int: Number of seasons of the TV series.
"""
try:
# Make an HTTP request to the series URL
response = httpx.get(self.url, headers=self.headers, timeout=10)
response.raise_for_status()
# Parse HTML content of the page
soup = BeautifulSoup(response.text, "html.parser")
# Find the container of seasons
table_content = soup.find('div', class_="tt_season")
# Count the number of seasons
seasons_number = len(table_content.find_all("li"))
# Extract the name of the series
self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True)
return seasons_number
except Exception as e:
logging.error(f"Error parsing HTML page: {e}")
return -999
def get_episode_number(self, n_season: int) -> List[Dict[str, str]]:
"""
Retrieves the number of episodes for a specific season.
Args:
n_season (int): The season number.
Returns:
List[Dict[str, str]]: List of dictionaries containing episode information.
"""
try:
# Make an HTTP request to the series URL
response = httpx.get(self.url, headers=self.headers)
response.raise_for_status()
# Parse HTML content of the page
soup = BeautifulSoup(response.text, "html.parser")
# Find the container of episodes for the specified season
table_content = soup.find('div', class_="tab-pane", id=f"season-{n_season}")
# Extract episode information
episode_content = table_content.find_all("li")
list_dict_episode = []
for episode_div in episode_content:
index = episode_div.find("a").get("data-num")
link = episode_div.find("a").get("data-link")
name = episode_div.find("a").get("data-title")
obj_episode = {
'number': index,
'name': name,
'url': link
}
list_dict_episode.append(obj_episode)
self.list_episodes = list_dict_episode
return list_dict_episode
except Exception as e:
logging.error(f"Error parsing HTML page: {e}")
return []
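# Typical usage (see series.py): GetSerieInfo(media_item).get_seasons_number(),
# then get_episode_number(n_season) to list the episodes of a given season.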


@ -0,0 +1,61 @@
# 26.05.24
from typing import List
class MediaItem:
def __init__(self, data: dict):
self.name: str = data.get('name')
self.type: str = "serie"
self.score: str = data.get('score')
self.url: int = data.get('url')
def __str__(self):
return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})"
class MediaManager:
def __init__(self):
self.media_list: List[MediaItem] = []
def add_media(self, data: dict) -> None:
"""
Add media to the list.
Args:
data (dict): Media data to add.
"""
self.media_list.append(MediaItem(data))
def get(self, index: int) -> MediaItem:
"""
Get a media item from the list by index.
Args:
index (int): The index of the media item to retrieve.
Returns:
MediaItem: The media item at the specified index.
"""
return self.media_list[index]
def get_length(self) -> int:
"""
Get the number of media items found by the search.
Returns:
int: Number of media items.
"""
return len(self.media_list)
def clear(self) -> None:
"""
This method clears the media list.
Args:
self: The object instance.
"""
self.media_list.clear()
def __str__(self):
return f"MediaManager(num_media={len(self.media_list)})"


@ -0,0 +1,123 @@
# 26.05.24
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util.os import run_node_script
class VideoSource:
def __init__(self) -> None:
"""
Initializes the VideoSource object with default values.
Attributes:
headers (dict): An empty dictionary to store HTTP headers.
"""
self.headers = {'user-agent': get_headers()}
def setup(self, url: str) -> None:
"""
Sets up the video source with the provided URL.
Args:
url (str): The URL of the video source.
"""
self.url = url
def make_request(self, url: str) -> str:
"""
Make an HTTP GET request to the provided URL.
Args:
url (str): The URL to make the request to.
Returns:
str: The response content if successful, None otherwise.
"""
try:
response = httpx.get(url, headers=self.headers, follow_redirects=True)
response.raise_for_status()
return response.text
except Exception as e:
logging.error(f"Request failed: {e}")
return None
def parse_html(self, html_content: str) -> BeautifulSoup:
"""
Parse the provided HTML content using BeautifulSoup.
Args:
html_content (str): The HTML content to parse.
Returns:
BeautifulSoup: Parsed HTML content if successful, None otherwise.
"""
try:
soup = BeautifulSoup(html_content, "html.parser")
return soup
except Exception as e:
logging.error(f"Failed to parse HTML content: {e}")
return None
def get_result_node_js(self, soup):
"""
Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content.
Returns:
str: The output from the Node.js script, or None if the script cannot be found or executed.
"""
for script in soup.find_all("script"):
if "eval" in str(script):
new_script = str(script.text).replace("eval", "var a = ")
new_script = new_script.replace(")))", ")));console.log(a);")
return run_node_script(new_script)
return None
def get_playlist(self) -> str:
"""
Retrieve the master playlist URL for the configured video page.
Returns:
str: The master playlist URL if successful, None otherwise.
"""
try:
html_content = self.make_request(self.url)
if not html_content:
logging.error("Failed to fetch HTML content.")
return None
soup = self.parse_html(html_content)
if not soup:
logging.error("Failed to parse HTML content.")
return None
result = self.get_result_node_js(soup)
if not result:
logging.error("No video URL found in script.")
return None
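# Extract the protocol-relative master playlist URL from the Node.js output
# (the exact output format depends on the embedded player script, so this split is heuristic)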
master_playlist = str(result).split(":")[3].split('"}')[0]
return f"https:{master_playlist}"
except Exception as e:
logging.error(f"An error occurred: {e}")
return None
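# Typical usage (see series.py): video_source.setup(episode_url) followed by
# video_source.get_playlist() to obtain the m3u8 master playlist URL.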


@ -0,0 +1,71 @@
# 02.05.24
import logging
from typing import List
# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import remove_special_characters
# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')
def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
"""
Manage user selection for seasons to download.
Args:
- cmd_insert (str): User input for season selection.
- max_count (int): Maximum count of seasons available.
Returns:
list_season_select (List[int]): List of selected seasons.
"""
list_season_select = []
logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")
# For a single number (e.g., '5')
if cmd_insert.isnumeric():
list_season_select.append(int(cmd_insert))
# For a range (e.g., '[5-12]')
elif "[" in cmd_insert:
start, end = map(int, cmd_insert[1:-1].split('-'))
list_season_select = list(range(start, end + 1))
# For all seasons
elif cmd_insert == "*":
list_season_select = list(range(1, max_count+1))
# Return the list of selected seasons
logging.info(f"List return: {list_season_select}")
return list_season_select
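# Illustrative examples of the accepted formats:
#   manage_selection("3", 10)     -> [3]
#   manage_selection("[2-4]", 10) -> [2, 3, 4]
#   manage_selection("*", 3)      -> [1, 2, 3]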
def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str:
"""
Maps the episode title to a specific format.
Args:
tv_name (str): The name of the TV show.
number_season (int): The season number.
episode_number (int): The episode number.
episode_name (str): The original name of the episode.
Returns:
str: The mapped episode title.
"""
map_episode_temp = MAP_EPISODE
map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name))
map_episode_temp = map_episode_temp.replace("%(season)", str(number_season))
map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number))
map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name))
# Additional fix
map_episode_temp = map_episode_temp.replace(".", "_")
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp
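# Illustrative example, assuming the configured map_episode_name template is
# "%(tv_name)_S%(season)_EP%(episode)_%(episode_name)":
#   map_episode_title("Show", 1, 2, "Pilot") -> "Show_S1_EP2_Pilot"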


@ -0,0 +1,34 @@
# 09.06.24
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import title_search, get_select_title
from .series import download_series
# Variable
indice = 4
def search():
"""
Main function of the application for film and series.
"""
# Query the site for content matching the search string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
len_database = title_search(string_to_search)
if len_database > 0:
# Select title from list
select_title = get_select_title()
# Download the selected series
download_series(select_title)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")


@ -0,0 +1,14 @@
# 09.06.24
import os
# Internal utilities
from Src.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
SERIES_FOLDER = "Serie"


@ -0,0 +1,164 @@
# 13.06.24
import os
import sys
import logging
# Internal utilities
from Src.Util.console import console, msg
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Lib.Hls.downloader import Downloader
# Logic class
from .Core.Class.SearchType import MediaItem
from .Core.Class.ScrapeSerie import GetSerieInfo
from .Core.Util.manage_ep import manage_selection, map_episode_title
from .Core.Player.supervideo import VideoSource
# Variable
from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER
table_show_manager = TVShowManager()
video_source = VideoSource()
def donwload_video(scape_info_serie: GetSerieInfo, index_season_selected: int, index_episode_selected: int) -> None:
"""
Download a single episode video.
Args:
- scape_info_serie (GetSerieInfo): Scraper object holding the series information.
- index_season_selected (int): Index of the selected season.
- index_episode_selected (int): Index of the selected episode.
"""
start_message()
# Get info about episode
obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}")
print()
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}")
# Setup video source
video_source.setup(obj_episode.get('url'))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
).start()
def donwload_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, donwload_all: bool = False) -> None:
"""
Download all episodes of a season.
Args:
- scape_info_serie (GetSerieInfo): Scraper object holding the series information.
- index_season_selected (int): Index of the selected season.
- donwload_all (bool): Download every episode of the season without prompting.
"""
# Start message and collect information about episodes
start_message()
list_dict_episode = scape_info_serie.get_episode_number(index_season_selected)
episodes_count = len(list_dict_episode)
# Download all episodes without asking
if donwload_all:
for i_episode in range(1, episodes_count+1):
donwload_video(scape_info_serie, index_season_selected, i_episode)
console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.")
# Otherwise, let the user select which episodes of the season to download
if not donwload_all:
# Display episodes list and manage user selection
last_command = display_episodes_list(scape_info_serie.list_episodes)
list_episode_select = manage_selection(last_command, episodes_count)
# Download selected episodes
if len(list_episode_select) == 1 and last_command != "*":
donwload_video(scape_info_serie, index_season_selected, list_episode_select[0])
# Download all the selected episodes
else:
for i_episode in list_episode_select:
donwload_video(scape_info_serie, index_season_selected, i_episode)
def download_series(dict_serie: MediaItem) -> None:
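"""
Download all episodes of a TV series.
Args:
- dict_serie (MediaItem): The series selected by the user.
"""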
# Start message and set up video source
start_message()
# Init class
scape_info_serie = GetSerieInfo(dict_serie)
# Collect information about seasons
seasons_count = scape_info_serie.get_seasons_number()
# Prompt user for season selection and download episodes
console.print(f"\n[green]Season find: [red]{seasons_count}")
index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media"))
list_season_select = manage_selection(index_season_selected, seasons_count)
# Download the selected season
if len(list_season_select) == 1 and index_season_selected != "*":
if 1 <= int(index_season_selected) <= seasons_count:
donwload_episode(scape_info_serie, list_season_select[0])
# Download all seasons and their episodes
elif index_season_selected == "*":
for i_season in list_season_select:
donwload_episode(scape_info_serie, i_season, True)
# Download all the selected seasons
else:
for i_season in list_season_select:
donwload_episode(scape_info_serie, i_season)
def display_episodes_list(obj_episode_manager) -> str:
"""
Display episodes list and handle user input.
Returns:
last_command (str): Last command entered by the user.
"""
# Set up table for displaying episodes
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
}
table_show_manager.add_column(column_info)
# Populate the table with episodes information
for media in obj_episode_manager:
table_show_manager.add_tv_show({
'Index': str(media.get('number')),
'Name': media.get('name'),
})
# Run the table and handle user input
last_command = table_show_manager.run()
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
return last_command

Src/Api/guardaserie/site.py

@ -0,0 +1,115 @@
# 09.06.24
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Util.console import console, msg
from Src.Util.headers import get_headers
# Logic class
from .Core.Class.SearchType import MediaManager, MediaItem
# Variable
from .costant import DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(word_to_search) -> int:
"""
Search for titles based on a search query.
"""
# Send request to search for titles
response = httpx.get(f"https://guardaserie.{DOMAIN_NOW}/?story={word_to_search}&do=search&subaction=search", headers={'user-agent': get_headers()})
response.raise_for_status()
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('div', class_="mlnew-list")
for serie_div in table_content.find_all('div', class_='mlnew'):
try:
title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True)
link = serie_div.find('div', class_='mlnh-2').find('a')['href']
imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True)
serie_info = {
'name': title,
'url': link,
'score': imdb_rating
}
media_search_manager.add_media(serie_info)
except:
pass
# Return the number of titles found
return media_search_manager.get_length()
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
MediaItem: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Type": {'color': 'yellow'},
"Score": {'color': 'cyan'},
}
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
# Filter for only a list of category
if type_filter is not None:
if str(media.type) not in type_filter:
continue
table_show_manager.add_tv_show({
'Index': str(i),
'Name': media.name,
'Type': media.type,
'Score': media.score,
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) < len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)


@ -0,0 +1,90 @@
# 03.03.24
from typing import Dict, Any, List
# Variable
from ...costant import SITE_NAME, DOMAIN_NOW
class Image:
def __init__(self, image_data: Dict[str, Any]):
self.id: int = image_data.get('id', '')
self.filename: str = image_data.get('filename', '')
self.type: str = image_data.get('type', '')
self.imageable_type: str = image_data.get('imageable_type', '')
self.imageable_id: int = image_data.get('imageable_id', '')
self.created_at: str = image_data.get('created_at', '')
self.updated_at: str = image_data.get('updated_at', '')
self.original_url_field: str = image_data.get('original_url_field', '')
self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}"
def __str__(self):
return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')"
class Episode:
def __init__(self, data: Dict[str, Any]):
self.id: int = data.get('id', '')
self.number: int = data.get('number', '')
self.name: str = data.get('name', '')
self.plot: str = data.get('plot', '')
self.duration: int = data.get('duration', '')
self.scws_id: int = data.get('scws_id', '')
self.season_id: int = data.get('season_id', '')
self.created_by: str = data.get('created_by', '')
self.created_at: str = data.get('created_at', '')
self.updated_at: str = data.get('updated_at', '')
self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])]
def __str__(self):
return f"Episode(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', duration={self.duration} sec)"
class EpisodeManager:
def __init__(self):
self.episodes: List[Episode] = []
def add_episode(self, episode_data: Dict[str, Any]):
"""
Add a new episode to the manager.
Args:
- episode_data (Dict[str, Any]): A dictionary containing data for the new episode.
"""
episode = Episode(episode_data)
self.episodes.append(episode)
def get_episode_by_index(self, index: int) -> Episode:
"""
Get an episode by its index.
Args:
- index (int): Index of the episode to retrieve.
Returns:
Episode: The episode object.
"""
return self.episodes[index]
def get_length(self) -> int:
"""
Get the number of episodes in the manager.
Returns:
int: Number of episodes.
"""
return len(self.episodes)
def clear(self) -> None:
"""
This method clears the episodes list.
Args:
- self: The object instance.
"""
self.episodes.clear()
def __str__(self):
return f"EpisodeManager(num_episodes={len(self.episodes)})"


@ -0,0 +1,63 @@
# 12.04.24
class Preview:
def __init__(self, data):
self.id = data.get("id")
self.title_id = data.get("title_id")
self.created_at = data.get("created_at")
self.updated_at = data.get("updated_at")
self.video_id = data.get("video_id")
self.is_viewable = data.get("is_viewable")
self.zoom_factor = data.get("zoom_factor")
self.filename = data.get("filename")
self.embed_url = data.get("embed_url")
def __str__(self):
return f"Preview: ID={self.id}, Title ID={self.title_id}, Created At={self.created_at}, Updated At={self.updated_at}, Video ID={self.video_id}, Viewable={self.is_viewable}, Zoom Factor={self.zoom_factor}, Filename={self.filename}, Embed URL={self.embed_url}"
class Genre:
def __init__(self, data):
self.id = data.get("id")
self.name = data.get("name")
self.type = data.get("type")
self.hidden = data.get("hidden")
self.created_at = data.get("created_at")
self.updated_at = data.get("updated_at")
self.pivot = data.get("pivot")
def __str__(self):
return f"Genre: ID={self.id}, Name={self.name}, Type={self.type}, Hidden={self.hidden}, Created At={self.created_at}, Updated At={self.updated_at}, Pivot={self.pivot}"
class Image:
def __init__(self, data):
self.id = data.get("id")
self.filename = data.get("filename")
self.type = data.get("type")
self.imageable_type = data.get("imageable_type")
self.imageable_id = data.get("imageable_id")
self.created_at = data.get("created_at")
self.updated_at = data.get("updated_at")
self.original_url_field = data.get("original_url_field")
def __str__(self):
return f"Image: ID={self.id}, Filename={self.filename}, Type={self.type}, Imageable Type={self.imageable_type}, Imageable ID={self.imageable_id}, Created At={self.created_at}, Updated At={self.updated_at}, Original URL Field={self.original_url_field}"
class PreviewManager:
def __init__(self, json_data):
self.id = json_data.get("id")
self.type = json_data.get("type")
self.runtime = json_data.get("runtime")
self.release_date = json_data.get("release_date")
self.quality = json_data.get("quality")
self.plot = json_data.get("plot")
self.seasons_count = json_data.get("seasons_count")
self.genres = [Genre(genre_data) for genre_data in json_data.get("genres", [])]
self.preview = Preview(json_data.get("preview"))
self.images = [Image(image_data) for image_data in json_data.get("images", [])]
def __str__(self):
genres_str = "\n".join(str(genre) for genre in self.genres)
images_str = "\n".join(str(image) for image in self.images)
return f"Title: ID={self.id}, Type={self.type}, Runtime={self.runtime}, Release Date={self.release_date}, Quality={self.quality}, Plot={self.plot}, Seasons Count={self.seasons_count}\nGenres:\n{genres_str}\nPreview:\n{self.preview}\nImages:\n{images_str}"


@ -0,0 +1,85 @@
# 03.03.24
from typing import List
# Variable
from ...costant import SITE_NAME, DOMAIN_NOW
class Image:
def __init__(self, data: dict):
self.imageable_id: int = data.get('imageable_id')
self.imageable_type: str = data.get('imageable_type')
self.filename: str = data.get('filename')
self.type: str = data.get('type')
self.original_url_field: str = data.get('original_url_field')
self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}"
def __str__(self):
return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')"
class MediaItem:
def __init__(self, data: dict):
self.id: int = data.get('id')
self.slug: str = data.get('slug')
self.name: str = data.get('name')
self.type: str = data.get('type')
self.score: str = data.get('score')
self.sub_ita: int = data.get('sub_ita')
self.last_air_date: str = data.get('last_air_date')
self.seasons_count: int = data.get('seasons_count')
self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])]
def __str__(self):
return f"MediaItem(id={self.id}, slug='{self.slug}', name='{self.name}', type='{self.type}', score='{self.score}', sub_ita={self.sub_ita}, last_air_date='{self.last_air_date}', seasons_count={self.seasons_count}, images={self.images})"
class MediaManager:
def __init__(self):
self.media_list: List[MediaItem] = []
def add_media(self, data: dict) -> None:
"""
Add media to the list.
Args:
data (dict): Media data to add.
"""
self.media_list.append(MediaItem(data))
def get(self, index: int) -> MediaItem:
"""
Get a media item from the list by index.
Args:
index (int): The index of the media item to retrieve.
Returns:
MediaItem: The media item at the specified index.
"""
return self.media_list[index]
def get_length(self) -> int:
"""
Get the number of media items found by the search.
Returns:
int: Number of media items.
"""
return len(self.media_list)
def clear(self) -> None:
"""
This method clears the media list.
Args:
self: The object instance.
"""
self.media_list.clear()
def __str__(self):
return f"MediaManager(num_media={len(self.media_list)})"


@ -0,0 +1,67 @@
# 03.03.24
from typing import List, Dict, Union
class Title:
def __init__(self, title_data: Dict[str, Union[int, str, None]]):
self.id: int = title_data.get('id')
self.number: int = title_data.get('number')
self.name: str = title_data.get('name')
self.plot: str = title_data.get('plot')
self.release_date: str = title_data.get('release_date')
self.title_id: int = title_data.get('title_id')
self.created_at: str = title_data.get('created_at')
self.updated_at: str = title_data.get('updated_at')
self.episodes_count: int = title_data.get('episodes_count')
def __str__(self):
return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})"
class TitleManager:
def __init__(self):
self.titles: List[Title] = []
def add_title(self, title_data: Dict[str, Union[int, str, None]]):
"""
Add a new title to the manager.
Args:
title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title.
"""
title = Title(title_data)
self.titles.append(title)
def get_title_by_index(self, index: int) -> Title:
"""
Get a title by its index.
Args:
index (int): Index of the title to retrieve.
Returns:
Title: The title object.
"""
return self.titles[index]
def get_length(self) -> int:
"""
Get the number of titles in the manager.
Returns:
int: Number of titles.
"""
return len(self.titles)
def clear(self) -> None:
"""
This method clears the titles list.
Args:
self: The object instance.
"""
self.titles.clear()
def __str__(self):
return f"TitleManager(num_titles={len(self.titles)})"


@ -0,0 +1,160 @@
# 03.03.24
import re
import logging
from typing import Dict, Any
class WindowVideo:
def __init__(self, data: Dict[str, Any]):
self.data = data
self.id: int = data.get('id', '')
self.name: str = data.get('name', '')
self.filename: str = data.get('filename', '')
self.size: str = data.get('size', '')
self.quality: str = data.get('quality', '')
self.duration: str = data.get('duration', '')
self.views: int = data.get('views', '')
self.is_viewable: bool = data.get('is_viewable', '')
self.status: str = data.get('status', '')
self.fps: float = data.get('fps', '')
self.legacy: bool = data.get('legacy', '')
self.folder_id: int = data.get('folder_id', '')
self.created_at_diff: str = data.get('created_at_diff', '')
def __str__(self):
return f"WindowVideo(id={self.id}, name='{self.name}', filename='{self.filename}', size='{self.size}', quality='{self.quality}', duration='{self.duration}', views={self.views}, is_viewable={self.is_viewable}, status='{self.status}', fps={self.fps}, legacy={self.legacy}, folder_id={self.folder_id}, created_at_diff='{self.created_at_diff}')"
class WindowParameter:
def __init__(self, data: Dict[str, Any]):
self.data = data
self.token: str = data.get('token', '')
self.token360p: str = data.get('token360p', '')
self.token480p: str = data.get('token480p', '')
self.token720p: str = data.get('token720p', '')
self.token1080p: str = data.get('token1080p', '')
self.expires: str = data.get('expires', '')
def __str__(self):
return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')"
class DynamicJSONConverter:
"""
Class for converting an input string into dynamic JSON.
"""
def __init__(self, input_string: str):
"""
Initialize the converter with the input string.
Args:
input_string (str): The input string to convert.
"""
self.input_string = input_string
self.json_data = {}
def _parse_key_value(self, key: str, value: str):
"""
Parse a key-value pair.
Args:
key (str): The key.
value (str): The value.
Returns:
object: The parsed value.
"""
try:
value = value.strip()
if value.startswith('{'):
return self._parse_json_object(value)
else:
return self._parse_non_json_value(value)
except Exception as e:
logging.error(f"Error parsing key-value pair '{key}': {e}")
raise
def _parse_json_object(self, obj_str: str):
"""
Parse a JSON object.
Args:
obj_str (str): The string representation of the JSON object.
Returns:
dict: The parsed JSON object.
"""
try:
# Use regular expression to find key-value pairs in the JSON object string
obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str))
# Strip double quotes from values and return the parsed dictionary
return {k: v.strip('"') for k, v in obj_dict.items()}
except Exception as e:
logging.error(f"Error parsing JSON object: {e}")
raise
def _parse_non_json_value(self, value: str):
"""
Parse a non-JSON value.
Args:
value (str): The value to parse.
Returns:
object: The parsed value.
"""
try:
# Remove extra quotes and convert to lowercase
value = value.replace('"', "").strip().lower()
if value.endswith('\n}'):
value = value.replace('\n}', '')
# Check if the value matches 'true' or 'false' using regular expressions
if re.match(r'\btrue\b', value, re.IGNORECASE):
return True
elif re.match(r'\bfalse\b', value, re.IGNORECASE):
return False
return value
except Exception as e:
logging.error(f"Error parsing non-JSON value: {e}")
raise
def convert_to_dynamic_json(self):
"""
Convert the input string into dynamic JSON.
Returns:
dict: The parsed key-value data as a dictionary.
"""
try:
# Replace invalid characters with valid JSON syntax
self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}"
# Find all key-value matches in the input string using regular expression
matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string)
for match in matches:
key = match[0].strip()
value = match[1].strip()
# Parse each key-value pair and add it to the json_data dictionary
self.json_data[key] = self._parse_key_value(key, value)
# Return the populated json_data dictionary
return self.json_data
except Exception as e:
logging.error(f"Error converting to dynamic JSON: {e}")
raise


@ -0,0 +1,228 @@
# 01.03.24
import sys
import logging
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util.console import console, Panel
# Logic class
from ..Class.SeriesType import TitleManager
from ..Class.EpisodeType import EpisodeManager
from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter
# Variable
from ...costant import SITE_NAME
class VideoSource:
def __init__(self):
"""
Initialize a VideoSource object.
"""
self.headers = {
'user-agent': get_headers()
}
self.is_series = False
self.base_name = SITE_NAME
def setup(self, version: str = None, domain: str = None, media_id: int = None, series_name: str = None):
"""
Set up the class
Args:
- version (str): The site version to set.
- domain (str): The domain to set.
- media_id (int): The media ID to set.
- series_name (str): The series name to set.
"""
self.version = version
self.domain = domain
self.media_id = media_id
if series_name is not None:
self.is_series = True
self.series_name = series_name
self.obj_title_manager: TitleManager = TitleManager()
self.obj_episode_manager: EpisodeManager = EpisodeManager()
def collect_info_seasons(self) -> None:
"""
Collect information about seasons.
"""
self.headers = {
'user-agent': get_headers(),
'x-inertia': 'true',
'x-inertia-version': self.version,
}
try:
response = httpx.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers)
response.raise_for_status()
# Extract JSON response if available
json_response = response.json().get('props', {}).get('title', {}).get('seasons', [])
# Iterate over JSON data and add titles to the manager
for dict_season in json_response:
self.obj_title_manager.add_title(dict_season)
except Exception as e:
logging.error(f"Error collecting season info: {e}")
raise
def collect_title_season(self, number_season: int) -> None:
"""
Collect information about a specific season.
Args:
- number_season (int): The season number.
"""
try:
# Make a request to collect information about a specific season
response = httpx.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers)
response.raise_for_status()
# Extract JSON response if available
json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', [])
# Iterate over JSON data and add episodes to the manager
for dict_episode in json_response:
self.obj_episode_manager.add_episode(dict_episode)
except Exception as e:
logging.error(f"Error collecting title season info: {e}")
raise
def get_iframe(self, episode_id: int = None) -> None:
"""
Get iframe source.
Args:
- episode_id (int): The episode ID, present only for series
"""
params = {}
if self.is_series:
params = {
'episode_id': episode_id,
'next_episode': '1'
}
try:
# Make a request to get iframe source
response = httpx.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params)
response.raise_for_status()
# Parse response with BeautifulSoup to get iframe source
soup = BeautifulSoup(response.text, "html.parser")
self.iframe_src = soup.find("iframe").get("src")
except Exception as e:
logging.error(f"Error getting iframe source: {e}")
raise
def parse_script(self, script_text: str) -> None:
"""
Parse script text.
Args:
- script_text (str): The script text to parse.
"""
try:
converter = DynamicJSONConverter(script_text)
result = converter.convert_to_dynamic_json()
# Create window video and parameter objects
self.window_video = WindowVideo(result['video'])
self.window_parameter = WindowParameter(result['masterPlaylist'])
except Exception as e:
logging.error(f"Error parsing script: {e}")
raise
def get_content(self) -> None:
"""
Get content.
"""
try:
# Check if iframe source is available
if self.iframe_src is not None:
# Make a request to get content
try:
response = httpx.get(self.iframe_src, headers=self.headers)
response.raise_for_status()
except Exception as e:
print("\n")
console.print(Panel("[red bold]Coming soon", title="Notification", title_align="left", border_style="yellow"))
sys.exit(0)
if response.status_code == 200:
# Parse response with BeautifulSoup to get content
soup = BeautifulSoup(response.text, "html.parser")
script = soup.find("body").find("script").text
# Parse script to get video information
self.parse_script(script_text=script)
except Exception as e:
logging.error(f"Error getting content: {e}")
raise
def get_playlist(self) -> str:
"""
Get playlist.
Returns:
str: The playlist URL, or None if there's an error.
"""
iframe_url = self.iframe_src
# Create base uri for playlist
base_url = f'https://vixcloud.co/playlist/{self.window_video.id}'
query = urlencode(list(self.window_parameter.data.items()))
master_playlist_url = urljoin(base_url, '?' + query)
# Parse the current query string and the master playlist URL query string
current_params = parse_qs(iframe_url[1:])
m = urlparse(master_playlist_url)
master_params = parse_qs(m.query)
# Create the final parameters dictionary with token and expires from the master playlist
final_params = {
"token": master_params.get("token", [""])[0],
"expires": master_params.get("expires", [""])[0]
}
# Add conditional parameters
if "b" in current_params:
final_params["b"] = "1"
if "canPlayFHD" in current_params:
final_params["h"] = "1"
# Construct the new query string and final URL
new_query = urlencode(final_params) # Encode final_params into a query string
new_url = m._replace(query=new_query) # Replace the old query string with the new one
final_url = urlunparse(new_url) # Construct the final URL from the modified parts
return final_url
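# Typical usage (see film.py and series.py): setup(version, domain, media_id[, series_name]),
# then get_iframe([episode_id]), get_content(), and finally get_playlist() to obtain
# the master playlist URL to pass to the downloader.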


@ -0,0 +1,8 @@
# 21.05.24
from .get_domain import grab_sc_top_level_domain as extract_domain
from .manage_ep import (
manage_selection,
map_episode_title
)


@ -0,0 +1,106 @@
# 02.04.24
import os
import threading
import logging
# External library
import httpx
# Internal utilities
from Src.Lib.Google import search as google_search
def check_url_for_content(url: str, content: str) -> bool:
"""
Check if a URL contains specific content.
Args:
- url (str): The URL to check.
- content (str): The content to search for in the response.
Returns:
bool: True if the content is found, False otherwise.
"""
try:
logging.info(f"Test site to extract domain: {url}")
response = httpx.get(url, timeout = 1)
response.raise_for_status()
if content in response.text:
return True
except Exception as e:
pass
return False
def grab_top_level_domain(base_url: str, target_content: str) -> str:
"""
Get the top-level domain (TLD) from a list of URLs.
Args:
- base_url (str): The base URL to construct complete URLs.
- target_content (str): The content to search for in the response.
Returns:
str: The found TLD, if any.
"""
results = []
threads = []
path_file = os.path.join("Test", "data", "TLD", "tld_list.txt")
logging.info(f"Load file: {path_file}")
def url_checker(url: str):
if check_url_for_content(url, target_content):
results.append(url.split(".")[-1])
if not os.path.exists(path_file):
raise FileNotFoundError("The file 'tld_list.txt' does not exist.")
with open(path_file, "r") as file:
urls = [f"{base_url}.{x.strip().lower()}" for x in file]
for url in urls:
thread = threading.Thread(target=url_checker, args=(url,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if results:
return results[-1]
def grab_top_level_domain_light(query: str) -> str:
"""
Get the top-level domain (TLD) using a light method via Google search.
Args:
- query (str): The search query for Google search.
Returns:
str: The found TLD, if any.
"""
for result in google_search(query, num=1, stop=1, pause=2):
return result.split(".", 2)[-1].replace("/", "")
def grab_sc_top_level_domain(method: str) -> str:
"""
Get the top-level domain (TLD) for the streaming community.
Args:
method (str): The method to use to obtain the TLD ("light" or "strong").
Returns:
str: The found TLD, if any.
"""
if method == "light":
return grab_top_level_domain_light("streaming community")
elif method == "strong":
return grab_top_level_domain("https://streamingcommunity", '<meta name="author" content="StreamingCommunity">')
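# Illustrative usage: grab_sc_top_level_domain("light") performs a Google search for
# "streaming community" and returns the current TLD, while "strong" probes every TLD in
# Test/data/TLD/tld_list.txt against https://streamingcommunity.<tld>.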


@ -0,0 +1,75 @@
# 02.05.24
import logging
from typing import List
# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import remove_special_characters
# Logic class
from ..Class.EpisodeType import Episode
# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')
def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
"""
Manage user selection for seasons to download.
Args:
- cmd_insert (str): User input for season selection.
- max_count (int): Maximum count of seasons available.
Returns:
list_season_select (List[int]): List of selected seasons.
"""
list_season_select = []
logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")
# For a single number (e.g., '5')
if cmd_insert.isnumeric():
list_season_select.append(int(cmd_insert))
# For a range (e.g., '[5-12]')
elif "[" in cmd_insert:
start, end = map(int, cmd_insert[1:-1].split('-'))
list_season_select = list(range(start, end + 1))
# For all seasons
elif cmd_insert == "*":
list_season_select = list(range(1, max_count+1))
# Return the list of selected seasons
logging.info(f"List return: {list_season_select}")
return list_season_select
def map_episode_title(tv_name: str, episode: Episode, number_season: int):
"""
Maps the episode title to a specific format.
Args:
- tv_name (str): The name of the TV show.
- episode (Episode): The episode object.
- number_season (int): The season number.
Returns:
str: The mapped episode title.
"""
map_episode_temp = MAP_EPISODE
map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name))
map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2))
map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2))
map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode.name))
# Additional fix
map_episode_temp = map_episode_temp.replace(".", "_")
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp
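# Illustrative example, assuming the configured map_episode_name template is
# "%(tv_name)_S%(season)_EP%(episode)_%(episode_name)":
#   an Episode with number=3 and name="Pilot", season 1 of "Show", maps to "Show_S01_EP03_Pilot"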


@ -0,0 +1,57 @@
# 21.05.24
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import (
get_version_and_domain,
title_search,
get_select_title
)
from .film import download_film
from .series import download_series
# Variable
indice = 0
def search():
"""
Main function of the application for film and series.
"""
# Query the site for content matching the search string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Get site domain and version and get result of the search
site_version, domain = get_version_and_domain()
len_database = title_search(string_to_search, domain)
if len_database > 0:
# Select title from list
select_title = get_select_title()
# For series
if select_title.type == 'tv':
download_series(
tv_id=select_title.id,
tv_name=select_title.slug,
version=site_version,
domain=domain
)
# For film
else:
download_film(
id_film=select_title.id,
title_name=select_title.slug,
domain=domain
)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")


@ -0,0 +1,15 @@
# 26.05.24
import os
# Internal utilities
from Src.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
MOVIE_FOLDER = "Movie"
SERIES_FOLDER = "Serie"


@ -0,0 +1,56 @@
# 3.12.23
import os
import logging
# Internal utilities
from Src.Util.console import console
from Src.Lib.Hls.downloader import Downloader
from Src.Util.message import start_message
# Logic class
from .Core.Player.vixcloud import VideoSource
# Variable
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER
video_source = VideoSource()
def download_film(id_film: str, title_name: str, domain: str):
"""
Downloads a film using the provided film ID, title name, and domain.
Args:
- id_film (str): The ID of the film.
- title_name (str): The name of the film title.
- domain (str): The domain of the site
"""
# Start message and display film information
start_message()
console.print(f"[yellow]Download: [red]{title_name} \n")
# Set domain and media ID for the video source
video_source.setup(
domain = domain,
media_id = id_film
)
# Retrieve the video source and, if available, the master playlist
video_source.get_iframe()
video_source.get_content()
master_playlist = video_source.get_playlist()
# Define the filename and path for the downloaded film
mp4_name = title_name.replace("-", "_")
mp4_format = (mp4_name) + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name)
# Download the film using the m3u8 playlist, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_format)
).start()


@ -0,0 +1,183 @@
# 3.12.23
import os
import sys
import logging
# Internal utilities
from Src.Util.console import console, msg
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Lib.Hls.downloader import Downloader
# Logic class
from .Core.Player.vixcloud import VideoSource
from .Core.Util import manage_selection, map_episode_title
# Variable
from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER
video_source = VideoSource()
table_show_manager = TVShowManager()
def donwload_video(tv_name: str, index_season_selected: int, index_episode_selected: int) -> None:
"""
Download a single episode video.
Args:
- tv_name (str): Name of the TV series.
- index_season_selected (int): Index of the selected season.
- index_episode_selected (int): Index of the selected episode.
"""
start_message()
# Get info about episode
obj_episode = video_source.obj_episode_manager.episodes[index_episode_selected - 1]
console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.name}")
print()
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, tv_name, f"S{index_season_selected}")
# Retrieve the video source and, if available, the master playlist
video_source.get_iframe(obj_episode.id)
video_source.get_content()
master_playlist = video_source.get_playlist()
# Download the episode
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
).start()
def donwload_episode(tv_name: str, index_season_selected: int, donwload_all: bool = False) -> None:
"""
Download all episodes of a season.
Args:
- tv_name (str): Name of the TV series.
- index_season_selected (int): Index of the selected season.
- donwload_all (bool): Download every episode of the season without prompting.
"""
# Clear the episode cache and get the real season number (some series do not follow the sequence [1,2,3,4,5] but e.g. [1,2,3,145,5,6,7]).
video_source.obj_episode_manager.clear()
season_number = (video_source.obj_title_manager.titles[index_season_selected-1].number)
# Start message and collect information about episodes
start_message()
video_source.collect_title_season(season_number)
episodes_count = video_source.obj_episode_manager.get_length()
# Download all episodes without asking
if donwload_all:
for i_episode in range(1, episodes_count+1):
donwload_video(tv_name, index_season_selected, i_episode)
console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.")
# Otherwise, let the user select which episodes of the season to download
if not donwload_all:
# Display episodes list and manage user selection
last_command = display_episodes_list()
list_episode_select = manage_selection(last_command, episodes_count)
# Download selected episodes
if len(list_episode_select) == 1 and last_command != "*":
donwload_video(tv_name, index_season_selected, list_episode_select[0])
# Download all the selected episodes
else:
for i_episode in list_episode_select:
donwload_video(tv_name, index_season_selected, i_episode)
def download_series(tv_id: str, tv_name: str, version: str, domain: str) -> None:
"""
Download all episodes of a TV series.
Args:
- tv_id (str): ID of the TV series.
- tv_name (str): Name of the TV series.
- version (str): Version of the TV series.
- domain (str): Domain from which to download.
"""
# Start message and set up video source
start_message()
# Setup video source
video_source.setup(
version = version,
domain = domain,
media_id = tv_id,
series_name = tv_name
)
# Collect information about seasons
video_source.collect_info_seasons()
seasons_count = video_source.obj_title_manager.get_length()
# Prompt user for season selection and download episodes
console.print(f"\n[green]Season find: [red]{seasons_count}")
index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media"))
list_season_select = manage_selection(index_season_selected, seasons_count)
# Download the selected season
if len(list_season_select) == 1 and index_season_selected != "*":
if 1 <= int(index_season_selected) <= seasons_count:
donwload_episode(tv_name, list_season_select[0])
# Download all seasons and their episodes
elif index_season_selected == "*":
for i_season in list_season_select:
donwload_episode(tv_name, i_season, True)
# Download all the selected seasons
else:
for i_season in list_season_select:
donwload_episode(tv_name, i_season)
def display_episodes_list() -> str:
"""
Display episodes list and handle user input.
Returns:
last_command (str): Last command entered by the user.
"""
# Set up table for displaying episodes
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Duration": {'color': 'green'}
}
table_show_manager.add_column(column_info)
# Populate the table with episodes information
for i, media in enumerate(video_source.obj_episode_manager.episodes):
table_show_manager.add_tv_show({
'Index': str(media.number),
'Name': media.name,
'Duration': str(media.duration)
})
# Run the table and handle user input
last_command = table_show_manager.run()
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
return last_command


@ -0,0 +1,204 @@
# 10.12.23
import sys
import json
import logging
from typing import Tuple
# External libraries
import httpx
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console
from Src.Util.table import TVShowManager
# Logic class
from .Core.Util import extract_domain
from .Core.Class.SearchType import MediaManager, MediaItem
# Config
from .costant import SITE_NAME
# Variable
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def get_version(text: str) -> tuple[str, list]:
"""
Extracts the version from the HTML text of a webpage.
Args:
- text (str): The HTML text of the webpage.
Returns:
str: The version extracted from the webpage.
list: Top 10 titles headlines for today.
"""
console.print("[cyan]Make request to get version [white]...")
try:
# Parse request to site
soup = BeautifulSoup(text, "html.parser")
# Extract version
version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version']
sliders = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['props']['sliders']
title_top_10 = sliders[2]
# Collect info about only top 10 title
list_title_top_10 = []
for title in title_top_10['titles']:
list_title_top_10.append({
'name': title['name'],
'type': title['type']
})
console.print(f"[cyan]Get version [white]=> [red]{version} \n")
return version, list_title_top_10
except Exception as e:
logging.error(f"Error extracting version: {e}")
raise
def get_version_and_domain(new_domain = None) -> Tuple[str, str]:
"""
Retrieves the version and domain of the streaming website.
It first checks the accessibility of the current site.
If the site is accessible, it extracts the version from the response together with the current top 10 titles.
If the site is inaccessible, it tries to find a new domain using the 'light' TLD-extraction method and retries.
Returns:
Tuple[str, str]: A tuple containing the version and domain.
"""
# Get the current domain from the configuration
if new_domain is None:
config_domain = config_manager.get('SITE', SITE_NAME)
else:
config_domain = new_domain
# Test the accessibility of the current site
try:
# Make requests to site to get text
console.print(f"[cyan]Test site[white]: [red]https://{SITE_NAME}.{config_domain}")
response = httpx.get(f"https://{SITE_NAME}.{config_domain}")
console.print(f"[cyan]Test respost site[white]: [red]{response.status_code} \n")
# Extract version from the response
version, list_title_top_10 = get_version(response.text)
return version, config_domain
except:
console.print("[red]\nExtract new DOMAIN from TLD list.")
new_domain = extract_domain(method="light")
console.log(f"[cyan]Extract new domain: [red]{new_domain}")
# Update the domain in the configuration file
config_manager.set_key('SITE', SITE_NAME, str(new_domain))
config_manager.write_config()
# Retry to get the version and domain
return get_version_and_domain(new_domain)
def title_search(title_search: str, domain: str) -> int:
"""
Search for titles based on a search query.
Args:
- title_search (str): The title to search for.
- domain (str): The domain to search on.
Returns:
int: The number of titles found.
"""
# Send request to search for titles (transliterate accented characters and replace spaces with '+')
response = httpx.get(f"https://{SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()})
response.raise_for_status()
# Add found titles to media search manager
for dict_title in response.json()['data']:
media_search_manager.add_media(dict_title)
# Return the number of titles found
return media_search_manager.get_length()
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
MediaItem: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Type": {'color': 'yellow'},
"Score": {'color': 'cyan'},
"Date": {'color': 'green'}
}
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
# Filter for only a list of category
if type_filter is not None:
if str(media.type) not in type_filter:
continue
table_show_manager.add_tv_show({
'Index': str(i),
'Name': media.name,
'Type': media.type,
'Score': media.score,
'Date': media.last_air_date
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) < len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)


@ -12,12 +12,12 @@
    "REQUESTS": {
        "timeout": 10,
        "max_retry": 3,
-       "verify_ssl": true,
+       "verify_ssl": false,
        "index": {
            "user-agent": ""
        },
        "proxy_start_min": 0.1,
-       "proxy_start_max": 0.4,
+       "proxy_start_max": 0.5,
        "proxy": []
    },
    "M3U8_DOWNLOAD": {
@ -54,9 +54,8 @@
    "SITE": {
        "streamingcommunity": "foo",
        "animeunity": "to",
-       "Altadefinizione": "vodka",
-       "Guardaserie": "ceo",
-       "Ddlstreamitaly": "co",
-       "4kTitle": "foo"
+       "altadefinizione": "vodka",
+       "guardaserie": "ceo",
+       "ddlstreamitaly": "co"
    }
}

run.py

@ -110,7 +110,10 @@ def initialize():
    sys.exit(0)

    # Attempting GitHub update
-   git_update()
+   try:
+       git_update()
+   except:
+       console.log("[red]Error with loading github.")