From f8dc75ab7d1a101781ed0c4d261847501269e06d Mon Sep 17 00:00:00 2001 From: Lovi <62809003+Lovi-0@users.noreply.github.com> Date: Sun, 16 Jun 2024 18:38:26 +0200 Subject: [PATCH] Delete Src/Api --- .../Altadefinizione/Core/Class/SearchType.py | 62 ----- .../Altadefinizione/Core/Player/supervideo.py | 178 ------------- Src/Api/Altadefinizione/__init__.py | 37 --- Src/Api/Altadefinizione/costant.py | 14 -- Src/Api/Altadefinizione/film.py | 55 ---- Src/Api/Altadefinizione/site.py | 119 --------- Src/Api/Animeunity/Core/Class/EpisodeType.py | 91 ------- Src/Api/Animeunity/Core/Class/PreviewType.py | 63 ----- Src/Api/Animeunity/Core/Class/SearchType.py | 85 ------- Src/Api/Animeunity/Core/Class/SeriesType.py | 67 ----- Src/Api/Animeunity/Core/Class/WindowType.py | 160 ------------ Src/Api/Animeunity/Core/Player/vixcloud.py | 194 -------------- Src/Api/Animeunity/Core/Util/__init__.py | 8 - Src/Api/Animeunity/Core/Util/get_domain.py | 108 -------- Src/Api/Animeunity/Core/Util/manage_ep.py | 74 ------ Src/Api/Animeunity/__init__.py | 40 --- Src/Api/Animeunity/anime.py | 111 -------- Src/Api/Animeunity/costant.py | 15 -- Src/Api/Animeunity/site.py | 237 ------------------ .../Ddlstreamitaly/Core/Class/ScrapeSerie.py | 85 ------- .../Ddlstreamitaly/Core/Class/SearchType.py | 60 ----- Src/Api/Ddlstreamitaly/Core/Player/ddl.py | 83 ------ Src/Api/Ddlstreamitaly/Core/Util/manage_ep.py | 71 ------ Src/Api/Ddlstreamitaly/__init__.py | 43 ---- Src/Api/Ddlstreamitaly/costant.py | 15 -- Src/Api/Ddlstreamitaly/series.py | 135 ---------- Src/Api/Ddlstreamitaly/site.py | 126 ---------- Src/Api/Guardaserie/Core/Class/ScrapeSerie.py | 113 --------- Src/Api/Guardaserie/Core/Class/SearchType.py | 61 ----- Src/Api/Guardaserie/Core/Player/supervideo.py | 123 --------- Src/Api/Guardaserie/Core/Util/manage_ep.py | 71 ------ Src/Api/Guardaserie/__init__.py | 34 --- Src/Api/Guardaserie/costant.py | 14 -- Src/Api/Guardaserie/series.py | 164 ------------ Src/Api/Guardaserie/site.py | 115 --------- .../Core/Class/EpisodeType.py | 90 ------- .../Core/Class/PreviewType.py | 63 ----- .../Core/Class/SearchType.py | 85 ------- .../Core/Class/SeriesType.py | 67 ----- .../Core/Class/WindowType.py | 160 ------------ .../Core/Player/vixcloud.py | 228 ----------------- .../Streamingcommunity/Core/Util/__init__.py | 8 - .../Core/Util/get_domain.py | 106 -------- .../Streamingcommunity/Core/Util/manage_ep.py | 75 ------ Src/Api/Streamingcommunity/__init__.py | 57 ----- Src/Api/Streamingcommunity/costant.py | 15 -- Src/Api/Streamingcommunity/film.py | 56 ----- Src/Api/Streamingcommunity/series.py | 183 -------------- Src/Api/Streamingcommunity/site.py | 204 --------------- 49 files changed, 4428 deletions(-) delete mode 100644 Src/Api/Altadefinizione/Core/Class/SearchType.py delete mode 100644 Src/Api/Altadefinizione/Core/Player/supervideo.py delete mode 100644 Src/Api/Altadefinizione/__init__.py delete mode 100644 Src/Api/Altadefinizione/costant.py delete mode 100644 Src/Api/Altadefinizione/film.py delete mode 100644 Src/Api/Altadefinizione/site.py delete mode 100644 Src/Api/Animeunity/Core/Class/EpisodeType.py delete mode 100644 Src/Api/Animeunity/Core/Class/PreviewType.py delete mode 100644 Src/Api/Animeunity/Core/Class/SearchType.py delete mode 100644 Src/Api/Animeunity/Core/Class/SeriesType.py delete mode 100644 Src/Api/Animeunity/Core/Class/WindowType.py delete mode 100644 Src/Api/Animeunity/Core/Player/vixcloud.py delete mode 100644 Src/Api/Animeunity/Core/Util/__init__.py delete mode 100644 Src/Api/Animeunity/Core/Util/get_domain.py delete mode 100644 Src/Api/Animeunity/Core/Util/manage_ep.py delete mode 100644 Src/Api/Animeunity/__init__.py delete mode 100644 Src/Api/Animeunity/anime.py delete mode 100644 Src/Api/Animeunity/costant.py delete mode 100644 Src/Api/Animeunity/site.py delete mode 100644 Src/Api/Ddlstreamitaly/Core/Class/ScrapeSerie.py delete mode 100644 Src/Api/Ddlstreamitaly/Core/Class/SearchType.py delete mode 100644 Src/Api/Ddlstreamitaly/Core/Player/ddl.py delete mode 100644 Src/Api/Ddlstreamitaly/Core/Util/manage_ep.py delete mode 100644 Src/Api/Ddlstreamitaly/__init__.py delete mode 100644 Src/Api/Ddlstreamitaly/costant.py delete mode 100644 Src/Api/Ddlstreamitaly/series.py delete mode 100644 Src/Api/Ddlstreamitaly/site.py delete mode 100644 Src/Api/Guardaserie/Core/Class/ScrapeSerie.py delete mode 100644 Src/Api/Guardaserie/Core/Class/SearchType.py delete mode 100644 Src/Api/Guardaserie/Core/Player/supervideo.py delete mode 100644 Src/Api/Guardaserie/Core/Util/manage_ep.py delete mode 100644 Src/Api/Guardaserie/__init__.py delete mode 100644 Src/Api/Guardaserie/costant.py delete mode 100644 Src/Api/Guardaserie/series.py delete mode 100644 Src/Api/Guardaserie/site.py delete mode 100644 Src/Api/Streamingcommunity/Core/Class/EpisodeType.py delete mode 100644 Src/Api/Streamingcommunity/Core/Class/PreviewType.py delete mode 100644 Src/Api/Streamingcommunity/Core/Class/SearchType.py delete mode 100644 Src/Api/Streamingcommunity/Core/Class/SeriesType.py delete mode 100644 Src/Api/Streamingcommunity/Core/Class/WindowType.py delete mode 100644 Src/Api/Streamingcommunity/Core/Player/vixcloud.py delete mode 100644 Src/Api/Streamingcommunity/Core/Util/__init__.py delete mode 100644 Src/Api/Streamingcommunity/Core/Util/get_domain.py delete mode 100644 Src/Api/Streamingcommunity/Core/Util/manage_ep.py delete mode 100644 Src/Api/Streamingcommunity/__init__.py delete mode 100644 Src/Api/Streamingcommunity/costant.py delete mode 100644 Src/Api/Streamingcommunity/film.py delete mode 100644 Src/Api/Streamingcommunity/series.py delete mode 100644 Src/Api/Streamingcommunity/site.py diff --git a/Src/Api/Altadefinizione/Core/Class/SearchType.py b/Src/Api/Altadefinizione/Core/Class/SearchType.py deleted file mode 100644 index 8e58566..0000000 --- a/Src/Api/Altadefinizione/Core/Class/SearchType.py +++ /dev/null @@ -1,62 +0,0 @@ -# 26.05.24 - -from typing import List - - -class MediaItem: - def __init__(self, data: dict): - self.name: str = data.get('name') - self.type: str = "film" - self.score: str = data.get('score') - self.url: int = data.get('url') - - def __str__(self): - return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})" - - -class MediaManager: - def __init__(self): - self.media_list: List[MediaItem] = [] - - def add_media(self, data: dict) -> None: - """ - Add media to the list. - - Args: - data (dict): Media data to add. - """ - self.media_list.append(MediaItem(data)) - - def get(self, index: int) -> MediaItem: - """ - Get a media item from the list by index. - - Args: - index (int): The index of the media item to retrieve. - - Returns: - MediaItem: The media item at the specified index. - """ - return self.media_list[index] - - def get_length(self) -> int: - """ - Get the number of media find with research - - Returns: - int: Number of episodes. - """ - return len(self.media_list) - - def clear(self) -> None: - """ - This method clears the medias list. - - Args: - self: The object instance. - """ - self.media_list.clear() - - def __str__(self): - return f"MediaManager(num_media={len(self.media_list)})" - diff --git a/Src/Api/Altadefinizione/Core/Player/supervideo.py b/Src/Api/Altadefinizione/Core/Player/supervideo.py deleted file mode 100644 index dfe5138..0000000 --- a/Src/Api/Altadefinizione/Core/Player/supervideo.py +++ /dev/null @@ -1,178 +0,0 @@ -# 26.05.24 - -import re -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util.os import run_node_script - - -class VideoSource: - - def __init__(self) -> None: - """ - Initializes the VideoSource object with default values. - - Attributes: - headers (dict): An empty dictionary to store HTTP headers. - """ - self.headers = {'user-agent': get_headers()} - - def setup(self, url: str) -> None: - """ - Sets up the video source with the provided URL. - - Args: - url (str): The URL of the video source. - """ - self.url = url - - def make_request(self, url: str) -> str: - """ - Make an HTTP GET request to the provided URL. - - Args: - url (str): The URL to make the request to. - - Returns: - str: The response content if successful, None otherwise. - """ - - try: - response = httpx.get(url, headers=self.headers, follow_redirects=True) - response.raise_for_status() - return response.text - - except Exception as e: - logging.error(f"Request failed [supervideo]: {e}") - return None - - def parse_html(self, html_content: str) -> BeautifulSoup: - """ - Parse the provided HTML content using BeautifulSoup. - - Args: - html_content (str): The HTML content to parse. - - Returns: - BeautifulSoup: Parsed HTML content if successful, None otherwise. - """ - - try: - soup = BeautifulSoup(html_content, "html.parser") - return soup - - except Exception as e: - logging.error(f"Failed to parse HTML content: {e}") - return None - - def get_iframe(self, soup): - """ - Extracts the source URL of the second iframe in the provided BeautifulSoup object. - - Args: - soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML. - - Returns: - str: The source URL of the second iframe, or None if not found. - """ - iframes = soup.find_all("iframe") - if iframes and len(iframes) > 1: - return iframes[1].get("src") - - return None - - def find_content(self, url): - """ - Makes a request to the specified URL and parses the HTML content. - - Args: - url (str): The URL to fetch content from. - - Returns: - BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails. - """ - content = self.make_request(url) - if content: - return self.parse_html(content) - - return None - - def get_result_node_js(self, soup): - """ - Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL. - - Args: - soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content. - - Returns: - str: The output from the Node.js script, or None if the script cannot be found or executed. - """ - for script in soup.find_all("script"): - if "eval" in str(script): - new_script = str(script.text).replace("eval", "var a = ") - new_script = new_script.replace(")))", ")));console.log(a);") - return run_node_script(new_script) - - return None - - def get_playlist(self) -> str: - """ - Download a video from the provided URL. - - Returns: - str: The URL of the downloaded video if successful, None otherwise. - """ - try: - html_content = self.make_request(self.url) - if not html_content: - logging.error("Failed to fetch HTML content.") - return None - - soup = self.parse_html(html_content) - if not soup: - logging.error("Failed to parse HTML content.") - return None - - iframe_src = self.get_iframe(soup) - if not iframe_src: - logging.error("No iframe found.") - return None - - down_page_soup = self.find_content(iframe_src) - if not down_page_soup: - logging.error("Failed to fetch down page content.") - return None - - pattern = r'data-link="(//supervideo[^"]+)"' - match = re.search(pattern, str(down_page_soup)) - if not match: - logging.error("No match found for supervideo URL.") - return None - - supervideo_url = "https:" + match.group(1) - supervideo_soup = self.find_content(supervideo_url) - if not supervideo_soup: - logging.error("Failed to fetch supervideo content.") - return None - - result = self.get_result_node_js(supervideo_soup) - if not result: - logging.error("No video URL found in script.") - return None - - master_playlist = str(result).split(":")[3].split('"}')[0] - return f"https:{master_playlist}" - - except Exception as e: - logging.error(f"An error occurred: {e}") - return None - diff --git a/Src/Api/Altadefinizione/__init__.py b/Src/Api/Altadefinizione/__init__.py deleted file mode 100644 index 27a9885..0000000 --- a/Src/Api/Altadefinizione/__init__.py +++ /dev/null @@ -1,37 +0,0 @@ -# 26.05.24 - -# Internal utilities -from Src.Util.console import console, msg - - -# Logic class -from .site import title_search, get_select_title -from .film import download_film - - -# Variable -indice = 2 - - -def search(): - """ - Main function of the application for film and series. - """ - - # Make request to site to get content that corrsisponde to that string - string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() - len_database = title_search(string_to_search) - - if len_database > 0: - - # Select title from list - select_title = get_select_title() - - # Download only film - download_film( - title_name=select_title.name, - url=select_title.url - ) - - else: - console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/Altadefinizione/costant.py b/Src/Api/Altadefinizione/costant.py deleted file mode 100644 index aa9d6eb..0000000 --- a/Src/Api/Altadefinizione/costant.py +++ /dev/null @@ -1,14 +0,0 @@ -# 26.05.24 - -import os - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) - -MOVIE_FOLDER = "Movie" \ No newline at end of file diff --git a/Src/Api/Altadefinizione/film.py b/Src/Api/Altadefinizione/film.py deleted file mode 100644 index d0cc4b7..0000000 --- a/Src/Api/Altadefinizione/film.py +++ /dev/null @@ -1,55 +0,0 @@ -# 26.05.24 - -import os -import sys -import logging - - -# Internal utilities -from Src.Util.console import console -from Src.Lib.Hls.downloader import Downloader -from Src.Util.message import start_message - - -# Logic class -from .Core.Player.supervideo import VideoSource - - -# Config -from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER - - -# Variable -video_source = VideoSource() - - -def download_film(title_name: str, url: str): - """ - Downloads a film using the provided film ID, title name, and domain. - - Args: - - title_name (str): The name of the film title. - - url (str): The url of the video - """ - - # Start message and display film information - start_message() - console.print(f"[yellow]Download: [red]{title_name} \n") - - # Set domain and media ID for the video source - video_source.setup( - url = url - ) - - # Define output path - mp4_name = str(title_name).replace("-", "_") + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) - - # Get m3u8 master playlist - master_playlist = video_source.get_playlist() - - # Download the film using the m3u8 playlist, and output filename - Downloader( - m3u8_playlist = master_playlist, - output_filename = os.path.join(mp4_path, mp4_name) - ).start() \ No newline at end of file diff --git a/Src/Api/Altadefinizione/site.py b/Src/Api/Altadefinizione/site.py deleted file mode 100644 index 1ce41b6..0000000 --- a/Src/Api/Altadefinizione/site.py +++ /dev/null @@ -1,119 +0,0 @@ -# 26.05.24 - -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup -from unidecode import unidecode - - -# Internal utilities -from Src.Util.table import TVShowManager -from Src.Util.console import console -from Src.Util.headers import get_headers - - -# Logic class -from .Core.Class.SearchType import MediaManager, MediaItem - - -# Variable -from .costant import SITE_NAME, DOMAIN_NOW -media_search_manager = MediaManager() -table_show_manager = TVShowManager() - - - -def title_search(title_search: str) -> int: - """ - Search for titles based on a search query. - - Args: - - title_search (str): The title to search for. - - Returns: - int: The number of titles found. - """ - - # Send request to search for titles - response = httpx.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()}) - response.raise_for_status() - - # Create soup and find table - soup = BeautifulSoup(response.text, "html.parser") - table_content = soup.find('div', id="dle-content") - - # Scrape div film in table on single page - for film_div in table_content.find_all('div', class_='col-lg-3'): - title = film_div.find('h2', class_='titleFilm').get_text(strip=True) - link = film_div.find('h2', class_='titleFilm').find('a')['href'] - imdb_rating = film_div.find('div', class_='imdb-rate').get_text(strip=True).split(":")[-1] - - film_info = { - 'name': title, - 'url': link, - 'score': imdb_rating - } - - media_search_manager.add_media(film_info) - - # Return the number of titles found - return media_search_manager.get_length() - - -def get_select_title(type_filter: list = None) -> MediaItem: - """ - Display a selection of titles and prompt the user to choose one. - - Args: - - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] - - Returns: - MediaItem: The selected media item. - """ - - # Set up table for displaying titles - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - "Type": {'color': 'yellow'}, - "Score": {'color': 'cyan'}, - } - table_show_manager.add_column(column_info) - - # Populate the table with title information - for i, media in enumerate(media_search_manager.media_list): - - # Filter for only a list of category - if type_filter is not None: - if str(media.type) not in type_filter: - continue - - table_show_manager.add_tv_show({ - 'Index': str(i), - 'Name': media.name, - 'Type': media.type, - 'Score': media.score, - }) - - # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) - table_show_manager.clear() - - # Handle user's quit command - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - # Check if the selected index is within range - if 0 <= int(last_command) <= len(media_search_manager.media_list): - return media_search_manager.get(int(last_command)) - else: - console.print("\n[red]Wrong index") - sys.exit(0) diff --git a/Src/Api/Animeunity/Core/Class/EpisodeType.py b/Src/Api/Animeunity/Core/Class/EpisodeType.py deleted file mode 100644 index 4483274..0000000 --- a/Src/Api/Animeunity/Core/Class/EpisodeType.py +++ /dev/null @@ -1,91 +0,0 @@ -# 03.03.24 - -from typing import Dict, Any, List - - -# Variable -from ...costant import SITE_NAME, DOMAIN_NOW - - - - -class Image: - def __init__(self, image_data: Dict[str, Any]): - self.id: int = image_data.get('id', '') - self.filename: str = image_data.get('filename', '') - self.type: str = image_data.get('type', '') - self.imageable_type: str = image_data.get('imageable_type', '') - self.imageable_id: int = image_data.get('imageable_id', '') - self.created_at: str = image_data.get('created_at', '') - self.updated_at: str = image_data.get('updated_at', '') - self.original_url_field: str = image_data.get('original_url_field', '') - self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" - - def __str__(self): - return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')" - - -class Episode: - def __init__(self, data: Dict[str, Any]): - self.id: int = data.get('id', '') - self.number: int = data.get('number', '') - self.name: str = data.get('name', '') - self.plot: str = data.get('plot', '') - self.duration: int = data.get('duration', '') - self.scws_id: int = data.get('scws_id', '') - self.season_id: int = data.get('season_id', '') - self.created_by: str = data.get('created_by', '') - self.created_at: str = data.get('created_at', '') - self.updated_at: str = data.get('updated_at', '') - self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] - - def __str__(self): - return f"Episode(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', duration={self.duration} sec)" - - -class EpisodeManager: - def __init__(self): - self.episodes: List[Episode] = [] - - def add_episode(self, episode_data: Dict[str, Any]): - """ - Add a new episode to the manager. - - Args: - - episode_data (Dict[str, Any]): A dictionary containing data for the new episode. - """ - episode = Episode(episode_data) - self.episodes.append(episode) - - def get_episode_by_index(self, index: int) -> Episode: - """ - Get an episode by its index. - - Args: - - index (int): Index of the episode to retrieve. - - Returns: - Episode: The episode object. - """ - return self.episodes[index] - - def get_length(self) -> int: - """ - Get the number of episodes in the manager. - - Returns: - int: Number of episodes. - """ - return len(self.episodes) - - def clear(self) -> None: - """ - This method clears the episodes list. - - Args: - - self: The object instance. - """ - self.episodes.clear() - - def __str__(self): - return f"EpisodeManager(num_episodes={len(self.episodes)})" diff --git a/Src/Api/Animeunity/Core/Class/PreviewType.py b/Src/Api/Animeunity/Core/Class/PreviewType.py deleted file mode 100644 index 28d741e..0000000 --- a/Src/Api/Animeunity/Core/Class/PreviewType.py +++ /dev/null @@ -1,63 +0,0 @@ -# 12.04.24 - -class Preview: - def __init__(self, data): - self.id = data.get("id") - self.title_id = data.get("title_id") - self.created_at = data.get("created_at") - self.updated_at = data.get("updated_at") - self.video_id = data.get("video_id") - self.is_viewable = data.get("is_viewable") - self.zoom_factor = data.get("zoom_factor") - self.filename = data.get("filename") - self.embed_url = data.get("embed_url") - - def __str__(self): - return f"Preview: ID={self.id}, Title ID={self.title_id}, Created At={self.created_at}, Updated At={self.updated_at}, Video ID={self.video_id}, Viewable={self.is_viewable}, Zoom Factor={self.zoom_factor}, Filename={self.filename}, Embed URL={self.embed_url}" - -class Genre: - def __init__(self, data): - self.id = data.get("id") - self.name = data.get("name") - self.type = data.get("type") - self.hidden = data.get("hidden") - self.created_at = data.get("created_at") - self.updated_at = data.get("updated_at") - self.pivot = data.get("pivot") - - def __str__(self): - return f"Genre: ID={self.id}, Name={self.name}, Type={self.type}, Hidden={self.hidden}, Created At={self.created_at}, Updated At={self.updated_at}, Pivot={self.pivot}" - -class Image: - def __init__(self, data): - self.id = data.get("id") - self.filename = data.get("filename") - self.type = data.get("type") - self.imageable_type = data.get("imageable_type") - self.imageable_id = data.get("imageable_id") - self.created_at = data.get("created_at") - self.updated_at = data.get("updated_at") - self.original_url_field = data.get("original_url_field") - - def __str__(self): - return f"Image: ID={self.id}, Filename={self.filename}, Type={self.type}, Imageable Type={self.imageable_type}, Imageable ID={self.imageable_id}, Created At={self.created_at}, Updated At={self.updated_at}, Original URL Field={self.original_url_field}" - -class PreviewManager: - def __init__(self, json_data): - self.id = json_data.get("id") - self.type = json_data.get("type") - self.runtime = json_data.get("runtime") - self.release_date = json_data.get("release_date") - self.quality = json_data.get("quality") - self.plot = json_data.get("plot") - self.seasons_count = json_data.get("seasons_count") - self.genres = [Genre(genre_data) for genre_data in json_data.get("genres", [])] - self.preview = Preview(json_data.get("preview")) - self.images = [Image(image_data) for image_data in json_data.get("images", [])] - - def __str__(self): - genres_str = "\n".join(str(genre) for genre in self.genres) - images_str = "\n".join(str(image) for image in self.images) - return f"Title: ID={self.id}, Type={self.type}, Runtime={self.runtime}, Release Date={self.release_date}, Quality={self.quality}, Plot={self.plot}, Seasons Count={self.seasons_count}\nGenres:\n{genres_str}\nPreview:\n{self.preview}\nImages:\n{images_str}" - - diff --git a/Src/Api/Animeunity/Core/Class/SearchType.py b/Src/Api/Animeunity/Core/Class/SearchType.py deleted file mode 100644 index 1b7355f..0000000 --- a/Src/Api/Animeunity/Core/Class/SearchType.py +++ /dev/null @@ -1,85 +0,0 @@ -# 03.03.24 - -from typing import List - - -# Variable -from ...costant import SITE_NAME, DOMAIN_NOW - - - -class Image: - def __init__(self, data: dict): - self.imageable_id: int = data.get('imageable_id') - self.imageable_type: str = data.get('imageable_type') - self.filename: str = data.get('filename') - self.type: str = data.get('type') - self.original_url_field: str = data.get('original_url_field') - self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" - - def __str__(self): - return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')" - - -class MediaItem: - def __init__(self, data: dict): - self.id: int = data.get('id') - self.slug: str = data.get('slug') - self.name: str = data.get('name') - self.type: str = data.get('type') - self.score: str = data.get('score') - self.sub_ita: int = data.get('sub_ita') - self.last_air_date: str = data.get('last_air_date') - self.seasons_count: int = data.get('seasons_count') - self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] - - def __str__(self): - return f"MediaItem(id={self.id}, slug='{self.slug}', name='{self.name}', type='{self.type}', score='{self.score}', sub_ita={self.sub_ita}, last_air_date='{self.last_air_date}', seasons_count={self.seasons_count}, images={self.images})" - - -class MediaManager: - def __init__(self): - self.media_list: List[MediaItem] = [] - - def add_media(self, data: dict) -> None: - """ - Add media to the list. - - Args: - data (dict): Media data to add. - """ - self.media_list.append(MediaItem(data)) - - def get(self, index: int) -> MediaItem: - """ - Get a media item from the list by index. - - Args: - index (int): The index of the media item to retrieve. - - Returns: - MediaItem: The media item at the specified index. - """ - return self.media_list[index] - - def get_length(self) -> int: - """ - Get the number of media find with research - - Returns: - int: Number of episodes. - """ - return len(self.media_list) - - def clear(self) -> None: - """ - This method clears the medias list. - - Args: - self: The object instance. - """ - self.media_list.clear() - - def __str__(self): - return f"MediaManager(num_media={len(self.media_list)})" - diff --git a/Src/Api/Animeunity/Core/Class/SeriesType.py b/Src/Api/Animeunity/Core/Class/SeriesType.py deleted file mode 100644 index b8c14ef..0000000 --- a/Src/Api/Animeunity/Core/Class/SeriesType.py +++ /dev/null @@ -1,67 +0,0 @@ -# 03.03.24 - -from typing import List, Dict, Union - - -class Title: - def __init__(self, title_data: Dict[str, Union[int, str, None]]): - self.id: int = title_data.get('id') - self.number: int = title_data.get('number') - self.name: str = title_data.get('name') - self.plot: str = title_data.get('plot') - self.release_date: str = title_data.get('release_date') - self.title_id: int = title_data.get('title_id') - self.created_at: str = title_data.get('created_at') - self.updated_at: str = title_data.get('updated_at') - self.episodes_count: int = title_data.get('episodes_count') - - def __str__(self): - return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})" - - -class TitleManager: - def __init__(self): - self.titles: List[Title] = [] - - def add_title(self, title_data: Dict[str, Union[int, str, None]]): - """ - Add a new title to the manager. - - Args: - title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title. - """ - title = Title(title_data) - self.titles.append(title) - - def get_title_by_index(self, index: int) -> Title: - """ - Get a title by its index. - - Args: - index (int): Index of the title to retrieve. - - Returns: - Title: The title object. - """ - return self.titles[index] - - def get_length(self) -> int: - """ - Get the number of titles in the manager. - - Returns: - int: Number of titles. - """ - return len(self.titles) - - def clear(self) -> None: - """ - This method clears the titles list. - - Args: - self: The object instance. - """ - self.titles.clear() - - def __str__(self): - return f"TitleManager(num_titles={len(self.titles)})" diff --git a/Src/Api/Animeunity/Core/Class/WindowType.py b/Src/Api/Animeunity/Core/Class/WindowType.py deleted file mode 100644 index 07acca8..0000000 --- a/Src/Api/Animeunity/Core/Class/WindowType.py +++ /dev/null @@ -1,160 +0,0 @@ -# 03.03.24 - -import re -import logging - -from typing import Dict, Any - - -class WindowVideo: - def __init__(self, data: Dict[str, Any]): - self.data = data - self.id: int = data.get('id', '') - self.name: str = data.get('name', '') - self.filename: str = data.get('filename', '') - self.size: str = data.get('size', '') - self.quality: str = data.get('quality', '') - self.duration: str = data.get('duration', '') - self.views: int = data.get('views', '') - self.is_viewable: bool = data.get('is_viewable', '') - self.status: str = data.get('status', '') - self.fps: float = data.get('fps', '') - self.legacy: bool = data.get('legacy', '') - self.folder_id: int = data.get('folder_id', '') - self.created_at_diff: str = data.get('created_at_diff', '') - - def __str__(self): - return f"WindowVideo(id={self.id}, name='{self.name}', filename='{self.filename}', size='{self.size}', quality='{self.quality}', duration='{self.duration}', views={self.views}, is_viewable={self.is_viewable}, status='{self.status}', fps={self.fps}, legacy={self.legacy}, folder_id={self.folder_id}, created_at_diff='{self.created_at_diff}')" - -class WindowParameter: - def __init__(self, data: Dict[str, Any]): - self.data = data - self.token: str = data.get('token', '') - self.token360p: str = data.get('token360p', '') - self.token480p: str = data.get('token480p', '') - self.token720p: str = data.get('token720p', '') - self.token1080p: str = data.get('token1080p', '') - self.expires: str = data.get('expires', '') - - def __str__(self): - return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')" - - -class DynamicJSONConverter: - """ - Class for converting an input string into dynamic JSON. - """ - - def __init__(self, input_string: str): - """ - Initialize the converter with the input string. - - Args: - input_string (str): The input string to convert. - """ - self.input_string = input_string - self.json_data = {} - - def _parse_key_value(self, key: str, value: str): - """ - Parse a key-value pair. - - Args: - key (str): The key. - value (str): The value. - - Returns: - object: The parsed value. - """ - try: - value = value.strip() - - if value.startswith('{'): - return self._parse_json_object(value) - else: - return self._parse_non_json_value(value) - - except Exception as e: - logging.error(f"Error parsing key-value pair '{key}': {e}") - raise - - def _parse_json_object(self, obj_str: str): - """ - Parse a JSON object. - - Args: - obj_str (str): The string representation of the JSON object. - - Returns: - dict: The parsed JSON object. - """ - try: - # Use regular expression to find key-value pairs in the JSON object string - obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str)) - - # Strip double quotes from values and return the parsed dictionary - return {k: v.strip('"') for k, v in obj_dict.items()} - - except Exception as e: - logging.error(f"Error parsing JSON object: {e}") - raise - - def _parse_non_json_value(self, value: str): - """ - Parse a non-JSON value. - - Args: - value (str): The value to parse. - - Returns: - object: The parsed value. - """ - try: - - # Remove extra quotes and convert to lowercase - value = value.replace('"', "").strip().lower() - - if value.endswith('\n}'): - value = value.replace('\n}', '') - - # Check if the value matches 'true' or 'false' using regular expressions - if re.match(r'\btrue\b', value, re.IGNORECASE): - return True - - elif re.match(r'\bfalse\b', value, re.IGNORECASE): - return False - - return value - - except Exception as e: - logging.error(f"Error parsing non-JSON value: {e}") - raise - - def convert_to_dynamic_json(self): - """ - Convert the input string into dynamic JSON. - - Returns: - str: The JSON representation of the result. - """ - try: - - # Replace invalid characters with valid JSON syntax - self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}" - - # Find all key-value matches in the input string using regular expression - matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string) - - for match in matches: - key = match[0].strip() - value = match[1].strip() - - # Parse each key-value pair and add it to the json_data dictionary - self.json_data[key] = self._parse_key_value(key, value) - - # Convert the json_data dictionary to a formatted JSON string - return self.json_data - - except Exception as e: - logging.error(f"Error converting to dynamic JSON: {e}") - raise diff --git a/Src/Api/Animeunity/Core/Player/vixcloud.py b/Src/Api/Animeunity/Core/Player/vixcloud.py deleted file mode 100644 index fad7472..0000000 --- a/Src/Api/Animeunity/Core/Player/vixcloud.py +++ /dev/null @@ -1,194 +0,0 @@ -# 01.03.24 - -import sys -import logging -from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util._jsonConfig import config_manager - - -# Logic class -from ..Class.SeriesType import TitleManager -from ..Class.EpisodeType import EpisodeManager, Episode -from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter - - -# Variable -from ...costant import SITE_NAME - - -class VideoSource: - def __init__(self): - """ - Initialize a VideoSource object. - """ - self.headers = { - 'user-agent': get_headers() - } - self.is_series = False - self.base_name = SITE_NAME - self.domain = config_manager.get('SITE', self.base_name) - - def setup(self, media_id: int = None, series_name: str = None): - """ - Set up the class - - Args: - - media_id (int): The media ID to set. - - series_name (str): The series name to set. - """ - self.media_id = media_id - - if series_name is not None: - self.is_series = True - self.series_name = series_name - self.obj_title_manager: TitleManager = TitleManager() - self.obj_episode_manager: EpisodeManager = EpisodeManager() - - def get_count_episodes(self): - """ - Fetches the total count of episodes available for the anime. - - Returns: - int or None: Total count of episodes if successful, otherwise None. - """ - try: - - response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/") - response.raise_for_status() - - # Parse JSON response and return episode count - return response.json()["episodes_count"] - - except Exception as e: - logging.error(f"(EpisodeDownloader) Error fetching episode count: {e}") - return None - - def get_info_episode(self, index_ep: int) -> Episode: - """ - Fetches information about a specific episode. - - Args: - - index_ep (int): Index of the episode. - - Returns: - obj Episode or None: Information about the episode if successful, otherwise None. - """ - try: - - params = { - "start_range": index_ep, - "end_range": index_ep + 1 - } - - response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params) - response.raise_for_status() - - # Return information about the episode - json_data = response.json()["episodes"][-1] - return Episode(json_data) - - except Exception as e: - logging.error(f"(EpisodeDownloader) Error fetching episode information: {e}") - return None - - def get_embed(self, episode_id: int): - """ - Fetches the script text for a given episode ID. - - Args: - - episode_id (int): ID of the episode. - - Returns: - str or None: Script successful, otherwise None. - """ - try: - - response = httpx.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}") - response.raise_for_status() - - # Extract and clean embed URL - embed_url = response.text.strip() - self.iframe_src = embed_url - - # Fetch video content using embed URL - video_response = httpx.get(embed_url) - video_response.raise_for_status() - - - # Parse response with BeautifulSoup to get content of the scriot - soup = BeautifulSoup(video_response.text, "html.parser") - script = soup.find("body").find("script").text - - return script - - except Exception as e: - logging.error(f"(EpisodeDownloader) Error fetching embed URL: {e}") - return None - - def parse_script(self, script_text: str) -> None: - """ - Parse script text. - - Args: - - script_text (str): The script text to parse. - """ - try: - - converter = DynamicJSONConverter(script_text) - result = converter.convert_to_dynamic_json() - - # Create window video and parameter objects - self.window_video = WindowVideo(result['video']) - self.window_parameter = WindowParameter(result['masterPlaylist']) - - except Exception as e: - logging.error(f"Error parsing script: {e}") - raise - - def get_playlist(self) -> str: - """ - Get playlist. - - Returns: - - str: The playlist URL, or None if there's an error. - """ - - iframe_url = self.iframe_src - - # Create base uri for playlist - base_url = f'https://vixcloud.co/playlist/{self.window_video.id}' - query = urlencode(list(self.window_parameter.data.items())) - master_playlist_url = urljoin(base_url, '?' + query) - - # Parse the current query string and the master playlist URL query string - current_params = parse_qs(iframe_url[1:]) - m = urlparse(master_playlist_url) - master_params = parse_qs(m.query) - - # Create the final parameters dictionary with token and expires from the master playlist - final_params = { - "token": master_params.get("token", [""])[0], - "expires": master_params.get("expires", [""])[0] - } - - # Add conditional parameters - if "b" in current_params: - final_params["b"] = "1" - if "canPlayFHD" in current_params: - final_params["h"] = "1" - - # Construct the new query string and final URL - new_query = urlencode(final_params) # Encode final_params into a query string - new_url = m._replace(query=new_query) # Replace the old query string with the new one - final_url = urlunparse(new_url) # Construct the final URL from the modified parts - - return final_url diff --git a/Src/Api/Animeunity/Core/Util/__init__.py b/Src/Api/Animeunity/Core/Util/__init__.py deleted file mode 100644 index 6f629ba..0000000 --- a/Src/Api/Animeunity/Core/Util/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# 21.05.24 - -from .get_domain import grab_au_top_level_domain as extract_domain - -from .manage_ep import ( - manage_selection, - map_episode_title -) \ No newline at end of file diff --git a/Src/Api/Animeunity/Core/Util/get_domain.py b/Src/Api/Animeunity/Core/Util/get_domain.py deleted file mode 100644 index 4194e69..0000000 --- a/Src/Api/Animeunity/Core/Util/get_domain.py +++ /dev/null @@ -1,108 +0,0 @@ -# 02.04.24 - -import os -import threading -import logging - - -# External libraries -import httpx - - -# Internal utilities -from Src.Lib.Google import search as google_search - - - -def check_url_for_content(url: str, content: str) -> bool: - """ - Check if a URL contains specific content. - - Args: - - url (str): The URL to check. - - content (str): The content to search for in the response. - - Returns: - bool: True if the content is found, False otherwise. - """ - try: - - logging.info(f"Test site to extract domain: {url}") - response = httpx.get(url, timeout = 1) - response.raise_for_status() - - if content in response.text: - return True - - except Exception as e: - pass - - return False - - -def grab_top_level_domain(base_url: str, target_content: str) -> str: - """ - Get the top-level domain (TLD) from a list of URLs. - - Args: - - base_url (str): The base URL to construct complete URLs. - - target_content (str): The content to search for in the response. - - Returns: - str: The found TLD, if any. - """ - results = [] - threads = [] - path_file = os.path.join("Test", "data", "TLD", "tld_list.txt") - logging.info(f"Load file: {path_file}") - - def url_checker(url: str): - if check_url_for_content(url, target_content): - results.append(url.split(".")[-1]) - - if not os.path.exists(path_file): - raise FileNotFoundError("The file 'tld_list.txt' does not exist.") - - with open(path_file, "r") as file: - urls = [f"{base_url}.{x.strip().lower()}" for x in file] - - for url in urls: - thread = threading.Thread(target=url_checker, args=(url,)) - thread.start() - threads.append(thread) - - for thread in threads: - thread.join() - - if results: - return results[-1] - - -def grab_top_level_domain_light(query: str) -> str: - """ - Get the top-level domain (TLD) using a light method via Google search. - - Args: - - query (str): The search query for Google search. - - Returns: - str: The found TLD, if any. - """ - for result in google_search(query, num=1, stop=1, pause=2): - return result.split(".", 2)[-1].replace("/", "") - - -def grab_au_top_level_domain(method: str) -> str: - """ - Get the top-level domain (TLD) for Anime Unity. - - Args: - - method (str): The method to use to obtain the TLD ("light" or "strong"). - - Returns: - str: The found TLD, if any. - """ - if method == "light": - return grab_top_level_domain_light("animeunity") - elif method == "strong": - return grab_top_level_domain("https://www.animeunity", '') diff --git a/Src/Api/Animeunity/Core/Util/manage_ep.py b/Src/Api/Animeunity/Core/Util/manage_ep.py deleted file mode 100644 index 331771c..0000000 --- a/Src/Api/Animeunity/Core/Util/manage_ep.py +++ /dev/null @@ -1,74 +0,0 @@ -# 02.05.24 - -import logging - -from typing import List - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -# Logic class -from ..Class.EpisodeType import Episode - - -# Config -MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') - - -def manage_selection(cmd_insert: str, max_count: int) -> List[int]: - """ - Manage user selection for seasons to download. - - Args: - - cmd_insert (str): User input for season selection. - - max_count (int): Maximum count of seasons available. - - Returns: - list_season_select (List[int]): List of selected seasons. - """ - list_season_select = [] - logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") - - # For a single number (e.g., '5') - if cmd_insert.isnumeric(): - list_season_select.append(int(cmd_insert)) - - # For a range (e.g., '[5-12]') - elif "[" in cmd_insert: - start, end = map(int, cmd_insert[1:-1].split('-')) - list_season_select = list(range(start, end + 1)) - - # For all seasons - elif cmd_insert == "*": - list_season_select = list(range(1, max_count+1)) - - # Return list of selected seasons) - logging.info(f"List return: {list_season_select}") - return list_season_select - - -def map_episode_title(tv_name: str, episode: Episode, number_season: int): - """ - Maps the episode title to a specific format. - - Args: - - tv_name (str): The name of the TV show. - - episode (Episode): The episode object. - - number_season (int): The season number. - - Returns: - str: The mapped episode title. - """ - map_episode_temp = MAP_EPISODE - map_episode_temp = map_episode_temp.replace("%(tv_name)", tv_name) - map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2)) - map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2)) - map_episode_temp = map_episode_temp.replace("%(episode_name)", episode.name) - - # Additional fix - map_episode_temp = map_episode_temp.replace(".", "_") - - logging.info(f"Map episode string return: {map_episode_temp}") - return map_episode_temp diff --git a/Src/Api/Animeunity/__init__.py b/Src/Api/Animeunity/__init__.py deleted file mode 100644 index b7fe3ed..0000000 --- a/Src/Api/Animeunity/__init__.py +++ /dev/null @@ -1,40 +0,0 @@ -# 21.05.24 - -# Internal utilities -from Src.Util.console import console, msg - - -# Logic class -from .site import title_search, get_select_title -from .anime import donwload_film, donwload_series - - -# Variable -indice = 1 - - -def search(): - - # Make request to site to get content that corrsisponde to that string - string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() - len_database = title_search(string_to_search) - - if len_database > 0: - - # Select title from list - select_title = get_select_title() - - if select_title.type == 'TV': - donwload_series( - tv_id=select_title.id, - tv_name=select_title.slug - ) - - else: - donwload_film( - id_film=select_title.id, - title_name=select_title.slug - ) - - else: - console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/Animeunity/anime.py b/Src/Api/Animeunity/anime.py deleted file mode 100644 index 3d597ad..0000000 --- a/Src/Api/Animeunity/anime.py +++ /dev/null @@ -1,111 +0,0 @@ -# 11.03.24 - -import os -import logging - - -# Internal utilities -from Src.Util.console import console, msg -from Src.Lib.Hls.downloader import Downloader -from Src.Util.message import start_message - - -# Logic class -from .Core.Player.vixcloud import VideoSource -from .Core.Util import manage_selection - - -# Variable -from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER, MOVIE_FOLDER -video_source = VideoSource() - - - -def download_episode(index_select: int): - """ - Downloads the selected episode. - - Args: - - index_select (int): Index of the episode to download. - """ - - # Get information about the selected episode - obj_episode = video_source.get_info_episode(index_select) - - start_message() - console.print(f"[yellow]Download: [red]EP_{obj_episode.number} \n") - - # Get the embed URL for the episode - embed_url = video_source.get_embed(obj_episode.id) - - # Parse parameter in embed text - video_source.parse_script(embed_url) - - # Create output path - mp4_path = None - mp4_name = f"{index_select + 1}.mp4" - if video_source.is_series: - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, video_source.series_name) - else: - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, video_source.series_name) - - # Start downloading - Downloader( - m3u8_playlist = video_source.get_playlist(), - output_filename = os.path.join(mp4_path, mp4_name) - ).start() - - -def donwload_series(tv_id: int, tv_name: str): - """ - Function to download episodes of a TV series. - - Args: - - tv_id (int): The ID of the TV series. - - tv_name (str): The name of the TV series. - """ - - # Set up video source - video_source.setup( - media_id = tv_id, - series_name = tv_name - ) - - # Get the count of episodes for the TV series - episoded_count = video_source.get_count_episodes() - console.log(f"[cyan]Episodes find: [red]{episoded_count}") - - # Prompt user to select an episode index - last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media") - - # Manage user selection - list_episode_select = manage_selection(last_command, episoded_count) - - # Download selected episodes - if len(list_episode_select) == 1 and last_command != "*": - download_episode(list_episode_select[0]-1) - - # Download all other episodes selecter - else: - for i_episode in list_episode_select: - download_episode(i_episode-1) - - -def donwload_film(id_film: int, title_name: str): - """ - Function to download a film. - - Args: - - id_film (int): The ID of the film. - - title_name (str): The title of the film. - """ - - # Set up video source - video_source.setup( - media_id = id_film, - series_name = title_name - ) - video_source.is_series = False - - # Start download - download_episode(0) diff --git a/Src/Api/Animeunity/costant.py b/Src/Api/Animeunity/costant.py deleted file mode 100644 index 6275469..0000000 --- a/Src/Api/Animeunity/costant.py +++ /dev/null @@ -1,15 +0,0 @@ -# 26.05.24 - -import os - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) - -SERIES_FOLDER= "Serie" -MOVIE_FOLDER = "Movie" diff --git a/Src/Api/Animeunity/site.py b/Src/Api/Animeunity/site.py deleted file mode 100644 index 6744379..0000000 --- a/Src/Api/Animeunity/site.py +++ /dev/null @@ -1,237 +0,0 @@ -# 10.12.23 - -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup -from unidecode import unidecode - - -# Internal utilities -from Src.Util.table import TVShowManager -from Src.Util.console import console -from Src.Util._jsonConfig import config_manager - - -# Logic class -from .Core.Util import extract_domain -from .Core.Class.SearchType import MediaManager, MediaItem - - -# Variable -from .costant import SITE_NAME, DOMAIN_NOW -media_search_manager = MediaManager() -table_show_manager = TVShowManager() - - - -def get_token(site_name: str, domain: str) -> dict: - """ - Function to retrieve session tokens from a specified website. - - Args: - - site_name (str): The name of the site. - - domain (str): The domain of the site. - - Returns: - - dict: A dictionary containing session tokens. The keys are 'XSRF_TOKEN', 'animeunity_session', and 'csrf_token'. - """ - - # Send a GET request to the specified URL composed of the site name and domain - response = httpx.get(f"https://www.{site_name}.{domain}") - response.raise_for_status() - - # Initialize variables to store CSRF token - find_csrf_token = None - - # Parse the HTML response using BeautifulSoup - soup = BeautifulSoup(response.text, "html.parser") - - # Loop through all meta tags in the HTML response - for html_meta in soup.find_all("meta"): - - # Check if the meta tag has a 'name' attribute equal to "csrf-token" - if html_meta.get('name') == "csrf-token": - - # If found, retrieve the content of the meta tag, which is the CSRF token - find_csrf_token = html_meta.get('content') - - logging.info(f"Extract: ('animeunity_session': {response.cookies['animeunity_session']}, 'csrf_token': {find_csrf_token})") - return { - 'animeunity_session': response.cookies['animeunity_session'], - 'csrf_token': find_csrf_token - } - - -def update_domain(): - """ - Update the domain for the anime streaming site. - - This function tests the accessibility of the current anime streaming site. - If the current domain is inaccessible, it attempts to obtain and set a new domain. - It uses the 'light' method to extract a new domain from Anime Unity. - """ - - # Test current site's accessibility - try: - - console.log(f"[cyan]Test site: [red]https://{SITE_NAME}.{DOMAIN_NOW}") - response = httpx.get(f"https://www.{SITE_NAME}.{DOMAIN_NOW}") - response.status_code - - # If the current site is inaccessible, try to obtain a new domain - except Exception as e: - - # Get new domain - console.print("[red]\nExtract new DOMAIN from TLD list.") - new_domain = extract_domain(method="light") - console.log(f"[cyan]Extract new domain: [red]{new_domain}") - - if new_domain: - - # Update configuration with the new domain - config_manager.set_key('SITE', SITE_NAME, new_domain) - config_manager.write_config() - - else: - logging.error("Failed to find a new animeunity domain") - sys.exit(0) - - -def get_real_title(record): - """ - Get the real title from a record. - - This function takes a record, which is assumed to be a dictionary representing a row of JSON data. - It looks for a title in the record, prioritizing English over Italian titles if available. - - Args: - - record (dict): A dictionary representing a row of JSON data. - - Returns: - - str: The title found in the record. If no title is found, returns None. - """ - - if record['title'] is not None: - return record['title'] - - elif record['title_eng'] is not None: - return record['title_eng'] - - else: - return record['title_it'] - - -def title_search(title: str) -> int: - """ - Function to perform an anime search using a provided title. - - Args: - - title_search (str): The title to search for. - - Returns: - - int: A number containing the length of media search manager. - """ - - # Update domain - update_domain() - - # Get token and session value from configuration - url_domain = config_manager.get('SITE', SITE_NAME) - data = get_token(SITE_NAME, url_domain) - - # Prepare cookies to be used in the request - cookies = { - 'animeunity_session': data.get('animeunity_session') - } - - # Prepare headers for the request - headers = { - 'accept': 'application/json, text/plain, */*', - 'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7', - 'content-type': 'application/json;charset=UTF-8', - 'x-csrf-token': data.get('csrf_token') - } - - # Prepare JSON data to be sent in the request - json_data = { - 'title': unidecode(title) # Use the provided title for the search - } - - # Send a POST request to the API endpoint for live search - response = httpx.post(f'https://www.{SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json=json_data) - response.raise_for_status() - - # Process each record returned in the response - for record in response.json()['records']: - - # Rename keys for consistency - record['name'] = get_real_title(record) - record['last_air_date'] = record.pop('date') - - # Add the record to media search manager if the name is not None - media_search_manager.add_media(record) - - # Return the length of media search manager - return media_search_manager.get_length() - - - -def get_select_title(type_filter: list = None) -> MediaItem: - """ - Display a selection of titles and prompt the user to choose one. - - Args: - - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] - - Returns: - MediaItem: The selected media item. - """ - - # Set up table for displaying titles - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - "Type": {'color': 'yellow'}, - "Score": {'color': 'cyan'}, - "Date": {'color': 'green'} - } - table_show_manager.add_column(column_info) - - # Populate the table with title information - for i, media in enumerate(media_search_manager.media_list): - - # Filter for only a list of category - if type_filter is not None: - if str(media.type) not in type_filter: - continue - - table_show_manager.add_tv_show({ - 'Index': str(i), - 'Name': media.name, - 'Type': media.type, - 'Score': media.score, - 'Date': media.last_air_date - }) - - # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) - table_show_manager.clear() - - # Handle user's quit command - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - # Check if the selected index is within range - if 0 <= int(last_command) <= len(media_search_manager.media_list): - return media_search_manager.get(int(last_command)) - else: - console.print("\n[red]Wrong index") - sys.exit(0) diff --git a/Src/Api/Ddlstreamitaly/Core/Class/ScrapeSerie.py b/Src/Api/Ddlstreamitaly/Core/Class/ScrapeSerie.py deleted file mode 100644 index c5dd366..0000000 --- a/Src/Api/Ddlstreamitaly/Core/Class/ScrapeSerie.py +++ /dev/null @@ -1,85 +0,0 @@ -# 13.06.24 - -import sys -import logging - -from typing import List, Dict - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util._jsonConfig import config_manager - - -# Logic class -from .SearchType import MediaItem - - - -class GetSerieInfo: - - def __init__(self, dict_serie: MediaItem) -> None: - """ - Initializes the GetSerieInfo object with default values. - - Args: - dict_serie (MediaItem): Dictionary containing series information (optional). - """ - self.headers = {'user-agent': get_headers()} - self.cookies = config_manager.get_dict('REQUESTS', 'index') - self.url = dict_serie.url - self.tv_name = None - self.list_episodes = None - - def get_episode_number(self) -> List[Dict[str, str]]: - """ - Retrieves the number of episodes for a specific season. - - Args: - n_season (int): The season number. - - Returns: - List[Dict[str, str]]: List of dictionaries containing episode information. - """ - - # Make an HTTP request to the series URL - try: - response = httpx.get(self.url + "?area=online", cookies=self.cookies, headers=self.headers) - response.raise_for_status() - - except Exception as e: - logging.error(f"Insert: ['ips4_device_key': 'your_code', 'ips4_member_id': 'your_code', 'ips4_login_key': 'your_code'] in config.json file REQUESTS -> index, instead of user-agent. Use browser debug and cookie request with a valid account, filter by DOC.") - sys.exit(0) - - # Parse HTML content of the page - soup = BeautifulSoup(response.text, "html.parser") - - # Get tv name - self.tv_name = soup.find("span", class_= "ipsType_break").get_text(strip=True) - - # Find the container of episodes for the specified season - table_content = soup.find('div', class_='ipsMargin_bottom:half') - list_dict_episode = [] - - for episode_div in table_content.find_all('a', href=True): - - # Get text of episode - part_name = episode_div.get_text(strip=True) - - if part_name: - link = episode_div['href'] - - obj_episode = { - 'name': part_name, - 'url': link - } - list_dict_episode.append(obj_episode) - - self.list_episodes = list_dict_episode - return list_dict_episode - \ No newline at end of file diff --git a/Src/Api/Ddlstreamitaly/Core/Class/SearchType.py b/Src/Api/Ddlstreamitaly/Core/Class/SearchType.py deleted file mode 100644 index f291f34..0000000 --- a/Src/Api/Ddlstreamitaly/Core/Class/SearchType.py +++ /dev/null @@ -1,60 +0,0 @@ -# 13.06.24 - -from typing import List - - -class MediaItem: - def __init__(self, data: dict): - self.name: str = data.get('name') - self.type: str = data.get('type') - self.url: int = data.get('url') - - def __str__(self): - return f"MediaItem(name='{self.name}', type='{self.type}', url={self.url})" - - -class MediaManager: - def __init__(self): - self.media_list: List[MediaItem] = [] - - def add_media(self, data: dict) -> None: - """ - Add media to the list. - - Args: - data (dict): Media data to add. - """ - self.media_list.append(MediaItem(data)) - - def get(self, index: int) -> MediaItem: - """ - Get a media item from the list by index. - - Args: - index (int): The index of the media item to retrieve. - - Returns: - MediaItem: The media item at the specified index. - """ - return self.media_list[index] - - def get_length(self) -> int: - """ - Get the number of media find with research - - Returns: - int: Number of episodes. - """ - return len(self.media_list) - - def clear(self) -> None: - """ - This method clears the medias list. - - Args: - self: The object instance. - """ - self.media_list.clear() - - def __str__(self): - return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/Ddlstreamitaly/Core/Player/ddl.py b/Src/Api/Ddlstreamitaly/Core/Player/ddl.py deleted file mode 100644 index 64857b7..0000000 --- a/Src/Api/Ddlstreamitaly/Core/Player/ddl.py +++ /dev/null @@ -1,83 +0,0 @@ -# 14.06.24 - -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util._jsonConfig import config_manager - - -class VideoSource: - - def __init__(self) -> None: - """ - Initializes the VideoSource object with default values. - - Attributes: - headers (dict): A dictionary to store HTTP headers. - cookie (dict): A dictionary to store cookies. - """ - self.headers = {'user-agent': get_headers()} - self.cookie = config_manager.get_dict('REQUESTS', 'index') - - def setup(self, url: str) -> None: - """ - Sets up the video source with the provided URL. - - Args: - url (str): The URL of the video source. - """ - self.url = url - - def make_request(self, url: str) -> str: - """ - Make an HTTP GET request to the provided URL. - - Args: - url (str): The URL to make the request to. - - Returns: - str: The response content if successful, None otherwise. - """ - try: - response = httpx.get(url, headers=self.headers, cookies=self.cookie) - response.raise_for_status() - return response.text - except httpx.HTTPStatusError as http_err: - logging.error(f"HTTP error occurred: {http_err}") - except Exception as err: - logging.error(f"An error occurred: {err}") - return None - - def get_playlist(self): - """ - Retrieves the playlist URL from the video source. - - Returns: - tuple: The mp4 link if found, None otherwise. - """ - try: - text = self.make_request(self.url) - - if text: - soup = BeautifulSoup(text, "html.parser") - source = soup.find("source") - - if source: - mp4_link = source.get("src") - return mp4_link - - else: - logging.error("No tag found in the HTML.") - else: - logging.error("Failed to retrieve content from the URL.") - - except Exception as e: - logging.error(f"An error occurred while parsing the playlist: {e}") diff --git a/Src/Api/Ddlstreamitaly/Core/Util/manage_ep.py b/Src/Api/Ddlstreamitaly/Core/Util/manage_ep.py deleted file mode 100644 index 06b24fd..0000000 --- a/Src/Api/Ddlstreamitaly/Core/Util/manage_ep.py +++ /dev/null @@ -1,71 +0,0 @@ -# 02.05.24 - -import logging - -from typing import List - - -# Internal utilities -from Src.Util._jsonConfig import config_manager -from Src.Util.os import remove_special_characters - - -# Config -MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') - - -def manage_selection(cmd_insert: str, max_count: int) -> List[int]: - """ - Manage user selection for seasons to download. - - Args: - - cmd_insert (str): User input for season selection. - - max_count (int): Maximum count of seasons available. - - Returns: - list_season_select (List[int]): List of selected seasons. - """ - list_season_select = [] - logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") - - # For a single number (e.g., '5') - if cmd_insert.isnumeric(): - list_season_select.append(int(cmd_insert)) - - # For a range (e.g., '[5-12]') - elif "[" in cmd_insert: - start, end = map(int, cmd_insert[1:-1].split('-')) - list_season_select = list(range(start, end + 1)) - - # For all seasons - elif cmd_insert == "*": - list_season_select = list(range(1, max_count+1)) - - # Return list of selected seasons) - logging.info(f"List return: {list_season_select}") - return list_season_select - -def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str: - """ - Maps the episode title to a specific format. - - Args: - tv_name (str): The name of the TV show. - number_season (int): The season number. - episode_number (int): The episode number. - episode_name (str): The original name of the episode. - - Returns: - str: The mapped episode title. - """ - map_episode_temp = MAP_EPISODE - map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) - map_episode_temp = map_episode_temp.replace("%(season)", str(number_season)) - map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number)) - map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name)) - - # Additional fix - map_episode_temp = map_episode_temp.replace(".", "_") - - logging.info(f"Map episode string return: {map_episode_temp}") - return map_episode_temp diff --git a/Src/Api/Ddlstreamitaly/__init__.py b/Src/Api/Ddlstreamitaly/__init__.py deleted file mode 100644 index 411fa85..0000000 --- a/Src/Api/Ddlstreamitaly/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -# 09.06.24 - -import sys -import logging - - -# Internal utilities -from Src.Util.console import console, msg - - -# Logic class -from .site import title_search, get_select_title -from .series import download_thread - - -# Variable -indice = 3 - - -def search(): - """ - Main function of the application for film and series. - """ - - # Make request to site to get content that corrsisponde to that string - string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() - len_database = title_search(string_to_search) - - if len_database > 0: - - # Select title from list - select_title = get_select_title() - - # Download only film - if "Serie TV" in str(select_title.type): - download_thread(select_title) - - else: - logging.error(f"Not supported: {select_title.type}") - sys.exit(0) - - else: - console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/Ddlstreamitaly/costant.py b/Src/Api/Ddlstreamitaly/costant.py deleted file mode 100644 index 9c7e339..0000000 --- a/Src/Api/Ddlstreamitaly/costant.py +++ /dev/null @@ -1,15 +0,0 @@ -# 09.06.24 - -import os - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) - -MOVIE_FOLDER = "Movie" -SERIES_FOLDER = "Serie" diff --git a/Src/Api/Ddlstreamitaly/series.py b/Src/Api/Ddlstreamitaly/series.py deleted file mode 100644 index 8903fdc..0000000 --- a/Src/Api/Ddlstreamitaly/series.py +++ /dev/null @@ -1,135 +0,0 @@ -# 13.06.24 - -import os -import sys -import logging -from urllib.parse import urlparse - - -# Internal utilities -from Src.Util.color import Colors -from Src.Util.console import console, msg -from Src.Util.os import create_folder, can_create_file -from Src.Util.table import TVShowManager -from Src.Util.message import start_message -from Src.Lib.Hls.download_mp4 import MP4_downloader - - -# Logic class -from .Core.Class.SearchType import MediaItem -from .Core.Class.ScrapeSerie import GetSerieInfo -from .Core.Util.manage_ep import manage_selection, map_episode_title -from .Core.Player.ddl import VideoSource - - -# Variable -from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER -table_show_manager = TVShowManager() -video_source = VideoSource() - - -def donwload_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) -> None: - """ - Download a single episode video. - - Args: - - tv_name (str): Name of the TV series. - - index_episode_selected (int): Index of the selected episode. - """ - - start_message() - - # Get info about episode - obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1] - console.print(f"[yellow]Download: [red]{obj_episode.get('name')}") - print() - - # Define filename and path for the downloaded video - mp4_name = f"{map_episode_title(scape_info_serie.tv_name, None, index_episode_selected, obj_episode.get('name'))}.mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name) - - # Check if can create file output - create_folder(mp4_path) - if not can_create_file(mp4_name): - logging.error("Invalid mp4 name.") - sys.exit(0) - - # Setup video source - video_source.setup(obj_episode.get('url')) - - # Get m3u8 master playlist - master_playlist = video_source.get_playlist() - - # Parse start page url - start_message() - parsed_url = urlparse(obj_episode.get('url')) - path_parts = parsed_url.path.split('/') - - MP4_downloader( - url = master_playlist, - path = os.path.join(mp4_path, mp4_name), - referer = f"{parsed_url.scheme}://{parsed_url.netloc}/", - add_desc=f"{Colors.MAGENTA}video" - ) - - -def download_thread(dict_serie: MediaItem): - """Download all episode of a thread""" - - # Start message and set up video source - start_message() - - # Init class - scape_info_serie = GetSerieInfo(dict_serie) - - # Collect information about thread - list_dict_episode = scape_info_serie.get_episode_number() - episodes_count = len(list_dict_episode) - - # Display episodes list and manage user selection - last_command = display_episodes_list(list_dict_episode) - list_episode_select = manage_selection(last_command, episodes_count) - - # Download selected episodes - if len(list_episode_select) == 1 and last_command != "*": - donwload_video(scape_info_serie, list_episode_select[0]) - - # Download all other episodes selecter - else: - for i_episode in list_episode_select: - donwload_video(scape_info_serie, i_episode) - - -def display_episodes_list(obj_episode_manager) -> str: - """ - Display episodes list and handle user input. - - Returns: - last_command (str): Last command entered by the user. - """ - - # Set up table for displaying episodes - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - } - table_show_manager.add_column(column_info) - - # Populate the table with episodes information - for i, media in enumerate(obj_episode_manager): - table_show_manager.add_tv_show({ - 'Index': str(i+1), - 'Name': media.get('name'), - }) - - # Run the table and handle user input - last_command = table_show_manager.run() - - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - return last_command diff --git a/Src/Api/Ddlstreamitaly/site.py b/Src/Api/Ddlstreamitaly/site.py deleted file mode 100644 index aabee59..0000000 --- a/Src/Api/Ddlstreamitaly/site.py +++ /dev/null @@ -1,126 +0,0 @@ -# 09.06.24 - -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.table import TVShowManager -from Src.Util.console import console, msg -from Src.Util._jsonConfig import config_manager -from Src.Util.headers import get_headers - - -# Logic class -from .Core.Class.SearchType import MediaManager, MediaItem - - -# Variable -from .costant import DOMAIN_NOW -cookie_index = config_manager.get_dict('REQUESTS', 'index') -media_search_manager = MediaManager() -table_show_manager = TVShowManager() - - - -def title_search(word_to_search) -> int: - """ - Search for titles based on a search query. - """ - try: - - # Send request to search for titles - response = httpx.get(f"https://ddlstreamitaly.{DOMAIN_NOW}/search/?&q={word_to_search}&quick=1&type=videobox_video&nodes=11", headers={'user-agent': get_headers()}) - response.raise_for_status() - - # Create soup and find table - soup = BeautifulSoup(response.text, "html.parser") - table_content = soup.find('ol', class_="ipsStream") - - if table_content: - for title_div in table_content.find_all('li', class_='ipsStreamItem'): - try: - title_type = title_div.find("p", class_="ipsType_reset").find_all("a")[-1].get_text(strip=True) - name = title_div.find("span", class_="ipsContained").find("a").get_text(strip=True) - link = title_div.find("span", class_="ipsContained").find("a").get("href") - - title_info = { - 'name': name, - 'url': link, - 'type': title_type - } - - media_search_manager.add_media(title_info) - - except Exception as e: - logging.error(f"Error processing title div: {e}") - - # Return the number of titles found - return media_search_manager.get_length() - - else: - logging.error("No table content found.") - return -999 - - except Exception as err: - logging.error(f"An error occurred: {err}") - - return -9999 - - -def get_select_title(type_filter: list = None) -> MediaItem: - """ - Display a selection of titles and prompt the user to choose one. - - Args: - - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] - - Returns: - MediaItem: The selected media item. - """ - - # Set up table for displaying titles - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - "Type": {'color': 'yellow'}, - } - table_show_manager.add_column(column_info) - - # Populate the table with title information - for i, media in enumerate(media_search_manager.media_list): - - # Filter for only a list of category - if type_filter is not None: - if str(media.type) not in type_filter: - continue - - table_show_manager.add_tv_show({ - 'Index': str(i), - 'Name': media.name, - 'Type': media.type, - }) - - # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) - table_show_manager.clear() - - # Handle user's quit command - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - # Check if the selected index is within range - if 0 <= int(last_command) <= len(media_search_manager.media_list): - return media_search_manager.get(int(last_command)) - else: - console.print("\n[red]Wrong index") - sys.exit(0) diff --git a/Src/Api/Guardaserie/Core/Class/ScrapeSerie.py b/Src/Api/Guardaserie/Core/Class/ScrapeSerie.py deleted file mode 100644 index 2275a43..0000000 --- a/Src/Api/Guardaserie/Core/Class/ScrapeSerie.py +++ /dev/null @@ -1,113 +0,0 @@ -# 13.06.24 - -import sys -import logging - -from typing import List, Dict - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers - - -# Logic class -from .SearchType import MediaItem - - - -class GetSerieInfo: - - def __init__(self, dict_serie: MediaItem) -> None: - """ - Initializes the GetSerieInfo object with default values. - - Args: - dict_serie (MediaItem): Dictionary containing series information (optional). - """ - self.headers = {'user-agent': get_headers()} - self.url = dict_serie.url - self.tv_name = None - self.list_episodes = None - - def get_seasons_number(self) -> int: - """ - Retrieves the number of seasons of a TV series. - - Returns: - int: Number of seasons of the TV series. - """ - try: - - # Make an HTTP request to the series URL - response = httpx.get(self.url, headers=self.headers, timeout=10) - response.raise_for_status() - - # Parse HTML content of the page - soup = BeautifulSoup(response.text, "html.parser") - - # Find the container of seasons - table_content = soup.find('div', class_="tt_season") - - # Count the number of seasons - seasons_number = len(table_content.find_all("li")) - - # Extract the name of the series - self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True) - - return seasons_number - - except Exception as e: - logging.error(f"Error parsing HTML page: {e}") - - return -999 - - def get_episode_number(self, n_season: int) -> List[Dict[str, str]]: - """ - Retrieves the number of episodes for a specific season. - - Args: - n_season (int): The season number. - - Returns: - List[Dict[str, str]]: List of dictionaries containing episode information. - """ - try: - - # Make an HTTP request to the series URL - response = httpx.get(self.url, headers=self.headers) - response.raise_for_status() - - # Parse HTML content of the page - soup = BeautifulSoup(response.text, "html.parser") - - # Find the container of episodes for the specified season - table_content = soup.find('div', class_="tab-pane", id=f"season-{n_season}") - - # Extract episode information - episode_content = table_content.find_all("li") - list_dict_episode = [] - - for episode_div in episode_content: - index = episode_div.find("a").get("data-num") - link = episode_div.find("a").get("data-link") - name = episode_div.find("a").get("data-title") - - obj_episode = { - 'number': index, - 'name': name, - 'url': link - } - list_dict_episode.append(obj_episode) - - self.list_episodes = list_dict_episode - return list_dict_episode - - except Exception as e: - logging.error(f"Error parsing HTML page: {e}") - - return [] diff --git a/Src/Api/Guardaserie/Core/Class/SearchType.py b/Src/Api/Guardaserie/Core/Class/SearchType.py deleted file mode 100644 index 6a45d67..0000000 --- a/Src/Api/Guardaserie/Core/Class/SearchType.py +++ /dev/null @@ -1,61 +0,0 @@ -# 26.05.24 - -from typing import List - - -class MediaItem: - def __init__(self, data: dict): - self.name: str = data.get('name') - self.type: str = "serie" - self.score: str = data.get('score') - self.url: int = data.get('url') - - def __str__(self): - return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})" - - -class MediaManager: - def __init__(self): - self.media_list: List[MediaItem] = [] - - def add_media(self, data: dict) -> None: - """ - Add media to the list. - - Args: - data (dict): Media data to add. - """ - self.media_list.append(MediaItem(data)) - - def get(self, index: int) -> MediaItem: - """ - Get a media item from the list by index. - - Args: - index (int): The index of the media item to retrieve. - - Returns: - MediaItem: The media item at the specified index. - """ - return self.media_list[index] - - def get_length(self) -> int: - """ - Get the number of media find with research - - Returns: - int: Number of episodes. - """ - return len(self.media_list) - - def clear(self) -> None: - """ - This method clears the medias list. - - Args: - self: The object instance. - """ - self.media_list.clear() - - def __str__(self): - return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/Guardaserie/Core/Player/supervideo.py b/Src/Api/Guardaserie/Core/Player/supervideo.py deleted file mode 100644 index e97e856..0000000 --- a/Src/Api/Guardaserie/Core/Player/supervideo.py +++ /dev/null @@ -1,123 +0,0 @@ -# 26.05.24 - -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util.os import run_node_script - - -class VideoSource: - - def __init__(self) -> None: - """ - Initializes the VideoSource object with default values. - - Attributes: - headers (dict): An empty dictionary to store HTTP headers. - """ - self.headers = {'user-agent': get_headers()} - - def setup(self, url: str) -> None: - """ - Sets up the video source with the provided URL. - - Args: - url (str): The URL of the video source. - """ - self.url = url - - def make_request(self, url: str) -> str: - """ - Make an HTTP GET request to the provided URL. - - Args: - url (str): The URL to make the request to. - - Returns: - str: The response content if successful, None otherwise. - """ - - try: - response = httpx.get(url, headers=self.headers, follow_redirects=True) - response.raise_for_status() - return response.text - - except Exception as e: - logging.error(f"Request failed: {e}") - return None - - def parse_html(self, html_content: str) -> BeautifulSoup: - """ - Parse the provided HTML content using BeautifulSoup. - - Args: - html_content (str): The HTML content to parse. - - Returns: - BeautifulSoup: Parsed HTML content if successful, None otherwise. - """ - - try: - soup = BeautifulSoup(html_content, "html.parser") - return soup - - except Exception as e: - logging.error(f"Failed to parse HTML content: {e}") - return None - - def get_result_node_js(self, soup): - """ - Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL. - - Args: - soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content. - - Returns: - str: The output from the Node.js script, or None if the script cannot be found or executed. - """ - for script in soup.find_all("script"): - if "eval" in str(script): - new_script = str(script.text).replace("eval", "var a = ") - new_script = new_script.replace(")))", ")));console.log(a);") - return run_node_script(new_script) - - return None - - def get_playlist(self) -> str: - """ - Download a video from the provided URL. - - Returns: - str: The URL of the downloaded video if successful, None otherwise. - """ - try: - html_content = self.make_request(self.url) - if not html_content: - logging.error("Failed to fetch HTML content.") - return None - - soup = self.parse_html(html_content) - if not soup: - logging.error("Failed to parse HTML content.") - return None - - result = self.get_result_node_js(soup) - if not result: - logging.error("No video URL found in script.") - return None - - master_playlist = str(result).split(":")[3].split('"}')[0] - return f"https:{master_playlist}" - - except Exception as e: - logging.error(f"An error occurred: {e}") - return None - \ No newline at end of file diff --git a/Src/Api/Guardaserie/Core/Util/manage_ep.py b/Src/Api/Guardaserie/Core/Util/manage_ep.py deleted file mode 100644 index 06b24fd..0000000 --- a/Src/Api/Guardaserie/Core/Util/manage_ep.py +++ /dev/null @@ -1,71 +0,0 @@ -# 02.05.24 - -import logging - -from typing import List - - -# Internal utilities -from Src.Util._jsonConfig import config_manager -from Src.Util.os import remove_special_characters - - -# Config -MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') - - -def manage_selection(cmd_insert: str, max_count: int) -> List[int]: - """ - Manage user selection for seasons to download. - - Args: - - cmd_insert (str): User input for season selection. - - max_count (int): Maximum count of seasons available. - - Returns: - list_season_select (List[int]): List of selected seasons. - """ - list_season_select = [] - logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") - - # For a single number (e.g., '5') - if cmd_insert.isnumeric(): - list_season_select.append(int(cmd_insert)) - - # For a range (e.g., '[5-12]') - elif "[" in cmd_insert: - start, end = map(int, cmd_insert[1:-1].split('-')) - list_season_select = list(range(start, end + 1)) - - # For all seasons - elif cmd_insert == "*": - list_season_select = list(range(1, max_count+1)) - - # Return list of selected seasons) - logging.info(f"List return: {list_season_select}") - return list_season_select - -def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str: - """ - Maps the episode title to a specific format. - - Args: - tv_name (str): The name of the TV show. - number_season (int): The season number. - episode_number (int): The episode number. - episode_name (str): The original name of the episode. - - Returns: - str: The mapped episode title. - """ - map_episode_temp = MAP_EPISODE - map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) - map_episode_temp = map_episode_temp.replace("%(season)", str(number_season)) - map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number)) - map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name)) - - # Additional fix - map_episode_temp = map_episode_temp.replace(".", "_") - - logging.info(f"Map episode string return: {map_episode_temp}") - return map_episode_temp diff --git a/Src/Api/Guardaserie/__init__.py b/Src/Api/Guardaserie/__init__.py deleted file mode 100644 index e7e05c7..0000000 --- a/Src/Api/Guardaserie/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -# 09.06.24 - -# Internal utilities -from Src.Util.console import console, msg - - -# Logic class -from .site import title_search, get_select_title -from .series import download_series - - -# Variable -indice = 4 - - -def search(): - """ - Main function of the application for film and series. - """ - - # Make request to site to get content that corrsisponde to that string - string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() - len_database = title_search(string_to_search) - - if len_database > 0: - - # Select title from list - select_title = get_select_title() - - # Download only film - download_series(select_title) - - else: - console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/Guardaserie/costant.py b/Src/Api/Guardaserie/costant.py deleted file mode 100644 index 4072af4..0000000 --- a/Src/Api/Guardaserie/costant.py +++ /dev/null @@ -1,14 +0,0 @@ -# 09.06.24 - -import os - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) - -SERIES_FOLDER = "Serie" diff --git a/Src/Api/Guardaserie/series.py b/Src/Api/Guardaserie/series.py deleted file mode 100644 index 4f677ac..0000000 --- a/Src/Api/Guardaserie/series.py +++ /dev/null @@ -1,164 +0,0 @@ -# 13.06.24 - -import os -import sys -import logging - - -# Internal utilities -from Src.Util.console import console, msg -from Src.Util.table import TVShowManager -from Src.Util.message import start_message -from Src.Lib.Hls.downloader import Downloader - - -# Logic class -from .Core.Class.SearchType import MediaItem -from .Core.Class.ScrapeSerie import GetSerieInfo -from .Core.Util.manage_ep import manage_selection, map_episode_title -from .Core.Player.supervideo import VideoSource - - -# Variable -from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER -table_show_manager = TVShowManager() -video_source = VideoSource() - - -def donwload_video(scape_info_serie: GetSerieInfo, index_season_selected: int, index_episode_selected: int) -> None: - """ - Download a single episode video. - - Args: - - tv_name (str): Name of the TV series. - - index_season_selected (int): Index of the selected season. - - index_episode_selected (int): Index of the selected episode. - """ - - start_message() - - # Get info about episode - obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1] - console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}") - print() - - # Define filename and path for the downloaded video - mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}") - - # Setup video source - video_source.setup(obj_episode.get('url')) - - # Get m3u8 master playlist - master_playlist = video_source.get_playlist() - - Downloader( - m3u8_playlist = master_playlist, - output_filename = os.path.join(mp4_path, mp4_name) - ).start() - - -def donwload_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, donwload_all: bool = False) -> None: - """ - Download all episodes of a season. - - Args: - - tv_name (str): Name of the TV series. - - index_season_selected (int): Index of the selected season. - - donwload_all (bool): Donwload all seasons episodes - """ - - # Start message and collect information about episodes - start_message() - list_dict_episode = scape_info_serie.get_episode_number(index_season_selected) - episodes_count = len(list_dict_episode) - - # Download all episodes wihtout ask - if donwload_all: - for i_episode in range(1, episodes_count+1): - donwload_video(scape_info_serie, index_season_selected, i_episode) - - console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.") - - # If not download all episode but a single season - if not donwload_all: - - # Display episodes list and manage user selection - last_command = display_episodes_list(scape_info_serie.list_episodes) - list_episode_select = manage_selection(last_command, episodes_count) - - # Download selected episodes - if len(list_episode_select) == 1 and last_command != "*": - donwload_video(scape_info_serie, index_season_selected, list_episode_select[0]) - - # Download all other episodes selecter - else: - for i_episode in list_episode_select: - donwload_video(scape_info_serie, index_season_selected, i_episode) - - -def download_series(dict_serie: MediaItem) -> None: - - # Start message and set up video source - start_message() - - # Init class - scape_info_serie = GetSerieInfo(dict_serie) - - # Collect information about seasons - seasons_count = scape_info_serie.get_seasons_number() - - # Prompt user for season selection and download episodes - console.print(f"\n[green]Season find: [red]{seasons_count}") - index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")) - list_season_select = manage_selection(index_season_selected, seasons_count) - - # Download selected episodes - if len(list_season_select) == 1 and index_season_selected != "*": - if 1 <= int(index_season_selected) <= seasons_count: - donwload_episode(scape_info_serie, list_season_select[0]) - - # Dowload all seasons and episodes - elif index_season_selected == "*": - for i_season in list_season_select: - donwload_episode(scape_info_serie, i_season, True) - - # Download all other season selecter - else: - for i_season in list_season_select: - donwload_episode(scape_info_serie, i_season) - - -def display_episodes_list(obj_episode_manager) -> str: - """ - Display episodes list and handle user input. - - Returns: - last_command (str): Last command entered by the user. - """ - - # Set up table for displaying episodes - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - } - table_show_manager.add_column(column_info) - - # Populate the table with episodes information - for media in obj_episode_manager: - table_show_manager.add_tv_show({ - 'Index': str(media.get('number')), - 'Name': media.get('name'), - }) - - # Run the table and handle user input - last_command = table_show_manager.run() - - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - return last_command diff --git a/Src/Api/Guardaserie/site.py b/Src/Api/Guardaserie/site.py deleted file mode 100644 index 74c6e3b..0000000 --- a/Src/Api/Guardaserie/site.py +++ /dev/null @@ -1,115 +0,0 @@ -# 09.06.24 - -import sys -import logging - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.table import TVShowManager -from Src.Util.console import console, msg -from Src.Util.headers import get_headers - - -# Logic class -from .Core.Class.SearchType import MediaManager, MediaItem - - -# Variable -from .costant import DOMAIN_NOW -media_search_manager = MediaManager() -table_show_manager = TVShowManager() - - -def title_search(word_to_search) -> int: - """ - Search for titles based on a search query. - """ - - # Send request to search for titles - response = httpx.get(f"https://guardaserie.{DOMAIN_NOW}/?story={word_to_search}&do=search&subaction=search", headers={'user-agent': get_headers()}) - response.raise_for_status() - - # Create soup and find table - soup = BeautifulSoup(response.text, "html.parser") - table_content = soup.find('div', class_="mlnew-list") - - for serie_div in table_content.find_all('div', class_='mlnew'): - - try: - title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True) - link = serie_div.find('div', class_='mlnh-2').find('a')['href'] - imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True) - - serie_info = { - 'name': title, - 'url': link, - 'score': imdb_rating - } - - media_search_manager.add_media(serie_info) - - except: - pass - - # Return the number of titles found - return media_search_manager.get_length() - - -def get_select_title(type_filter: list = None) -> MediaItem: - """ - Display a selection of titles and prompt the user to choose one. - - Args: - - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] - - Returns: - MediaItem: The selected media item. - """ - - # Set up table for displaying titles - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - "Type": {'color': 'yellow'}, - "Score": {'color': 'cyan'}, - } - table_show_manager.add_column(column_info) - - # Populate the table with title information - for i, media in enumerate(media_search_manager.media_list): - - # Filter for only a list of category - if type_filter is not None: - if str(media.type) not in type_filter: - continue - - table_show_manager.add_tv_show({ - 'Index': str(i), - 'Name': media.name, - 'Type': media.type, - 'Score': media.score, - }) - - # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) - table_show_manager.clear() - - # Handle user's quit command - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - # Check if the selected index is within range - if 0 <= int(last_command) <= len(media_search_manager.media_list): - return media_search_manager.get(int(last_command)) - else: - console.print("\n[red]Wrong index") - sys.exit(0) diff --git a/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py b/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py deleted file mode 100644 index e98fd62..0000000 --- a/Src/Api/Streamingcommunity/Core/Class/EpisodeType.py +++ /dev/null @@ -1,90 +0,0 @@ -# 03.03.24 - -from typing import Dict, Any, List - - -# Variable -from ...costant import SITE_NAME, DOMAIN_NOW - - - -class Image: - def __init__(self, image_data: Dict[str, Any]): - self.id: int = image_data.get('id', '') - self.filename: str = image_data.get('filename', '') - self.type: str = image_data.get('type', '') - self.imageable_type: str = image_data.get('imageable_type', '') - self.imageable_id: int = image_data.get('imageable_id', '') - self.created_at: str = image_data.get('created_at', '') - self.updated_at: str = image_data.get('updated_at', '') - self.original_url_field: str = image_data.get('original_url_field', '') - self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" - - def __str__(self): - return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')" - - -class Episode: - def __init__(self, data: Dict[str, Any]): - self.id: int = data.get('id', '') - self.number: int = data.get('number', '') - self.name: str = data.get('name', '') - self.plot: str = data.get('plot', '') - self.duration: int = data.get('duration', '') - self.scws_id: int = data.get('scws_id', '') - self.season_id: int = data.get('season_id', '') - self.created_by: str = data.get('created_by', '') - self.created_at: str = data.get('created_at', '') - self.updated_at: str = data.get('updated_at', '') - self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] - - def __str__(self): - return f"Episode(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', duration={self.duration} sec)" - - -class EpisodeManager: - def __init__(self): - self.episodes: List[Episode] = [] - - def add_episode(self, episode_data: Dict[str, Any]): - """ - Add a new episode to the manager. - - Args: - - episode_data (Dict[str, Any]): A dictionary containing data for the new episode. - """ - episode = Episode(episode_data) - self.episodes.append(episode) - - def get_episode_by_index(self, index: int) -> Episode: - """ - Get an episode by its index. - - Args: - - index (int): Index of the episode to retrieve. - - Returns: - Episode: The episode object. - """ - return self.episodes[index] - - def get_length(self) -> int: - """ - Get the number of episodes in the manager. - - Returns: - int: Number of episodes. - """ - return len(self.episodes) - - def clear(self) -> None: - """ - This method clears the episodes list. - - Args: - - self: The object instance. - """ - self.episodes.clear() - - def __str__(self): - return f"EpisodeManager(num_episodes={len(self.episodes)})" diff --git a/Src/Api/Streamingcommunity/Core/Class/PreviewType.py b/Src/Api/Streamingcommunity/Core/Class/PreviewType.py deleted file mode 100644 index 28d741e..0000000 --- a/Src/Api/Streamingcommunity/Core/Class/PreviewType.py +++ /dev/null @@ -1,63 +0,0 @@ -# 12.04.24 - -class Preview: - def __init__(self, data): - self.id = data.get("id") - self.title_id = data.get("title_id") - self.created_at = data.get("created_at") - self.updated_at = data.get("updated_at") - self.video_id = data.get("video_id") - self.is_viewable = data.get("is_viewable") - self.zoom_factor = data.get("zoom_factor") - self.filename = data.get("filename") - self.embed_url = data.get("embed_url") - - def __str__(self): - return f"Preview: ID={self.id}, Title ID={self.title_id}, Created At={self.created_at}, Updated At={self.updated_at}, Video ID={self.video_id}, Viewable={self.is_viewable}, Zoom Factor={self.zoom_factor}, Filename={self.filename}, Embed URL={self.embed_url}" - -class Genre: - def __init__(self, data): - self.id = data.get("id") - self.name = data.get("name") - self.type = data.get("type") - self.hidden = data.get("hidden") - self.created_at = data.get("created_at") - self.updated_at = data.get("updated_at") - self.pivot = data.get("pivot") - - def __str__(self): - return f"Genre: ID={self.id}, Name={self.name}, Type={self.type}, Hidden={self.hidden}, Created At={self.created_at}, Updated At={self.updated_at}, Pivot={self.pivot}" - -class Image: - def __init__(self, data): - self.id = data.get("id") - self.filename = data.get("filename") - self.type = data.get("type") - self.imageable_type = data.get("imageable_type") - self.imageable_id = data.get("imageable_id") - self.created_at = data.get("created_at") - self.updated_at = data.get("updated_at") - self.original_url_field = data.get("original_url_field") - - def __str__(self): - return f"Image: ID={self.id}, Filename={self.filename}, Type={self.type}, Imageable Type={self.imageable_type}, Imageable ID={self.imageable_id}, Created At={self.created_at}, Updated At={self.updated_at}, Original URL Field={self.original_url_field}" - -class PreviewManager: - def __init__(self, json_data): - self.id = json_data.get("id") - self.type = json_data.get("type") - self.runtime = json_data.get("runtime") - self.release_date = json_data.get("release_date") - self.quality = json_data.get("quality") - self.plot = json_data.get("plot") - self.seasons_count = json_data.get("seasons_count") - self.genres = [Genre(genre_data) for genre_data in json_data.get("genres", [])] - self.preview = Preview(json_data.get("preview")) - self.images = [Image(image_data) for image_data in json_data.get("images", [])] - - def __str__(self): - genres_str = "\n".join(str(genre) for genre in self.genres) - images_str = "\n".join(str(image) for image in self.images) - return f"Title: ID={self.id}, Type={self.type}, Runtime={self.runtime}, Release Date={self.release_date}, Quality={self.quality}, Plot={self.plot}, Seasons Count={self.seasons_count}\nGenres:\n{genres_str}\nPreview:\n{self.preview}\nImages:\n{images_str}" - - diff --git a/Src/Api/Streamingcommunity/Core/Class/SearchType.py b/Src/Api/Streamingcommunity/Core/Class/SearchType.py deleted file mode 100644 index 1b7355f..0000000 --- a/Src/Api/Streamingcommunity/Core/Class/SearchType.py +++ /dev/null @@ -1,85 +0,0 @@ -# 03.03.24 - -from typing import List - - -# Variable -from ...costant import SITE_NAME, DOMAIN_NOW - - - -class Image: - def __init__(self, data: dict): - self.imageable_id: int = data.get('imageable_id') - self.imageable_type: str = data.get('imageable_type') - self.filename: str = data.get('filename') - self.type: str = data.get('type') - self.original_url_field: str = data.get('original_url_field') - self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" - - def __str__(self): - return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')" - - -class MediaItem: - def __init__(self, data: dict): - self.id: int = data.get('id') - self.slug: str = data.get('slug') - self.name: str = data.get('name') - self.type: str = data.get('type') - self.score: str = data.get('score') - self.sub_ita: int = data.get('sub_ita') - self.last_air_date: str = data.get('last_air_date') - self.seasons_count: int = data.get('seasons_count') - self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] - - def __str__(self): - return f"MediaItem(id={self.id}, slug='{self.slug}', name='{self.name}', type='{self.type}', score='{self.score}', sub_ita={self.sub_ita}, last_air_date='{self.last_air_date}', seasons_count={self.seasons_count}, images={self.images})" - - -class MediaManager: - def __init__(self): - self.media_list: List[MediaItem] = [] - - def add_media(self, data: dict) -> None: - """ - Add media to the list. - - Args: - data (dict): Media data to add. - """ - self.media_list.append(MediaItem(data)) - - def get(self, index: int) -> MediaItem: - """ - Get a media item from the list by index. - - Args: - index (int): The index of the media item to retrieve. - - Returns: - MediaItem: The media item at the specified index. - """ - return self.media_list[index] - - def get_length(self) -> int: - """ - Get the number of media find with research - - Returns: - int: Number of episodes. - """ - return len(self.media_list) - - def clear(self) -> None: - """ - This method clears the medias list. - - Args: - self: The object instance. - """ - self.media_list.clear() - - def __str__(self): - return f"MediaManager(num_media={len(self.media_list)})" - diff --git a/Src/Api/Streamingcommunity/Core/Class/SeriesType.py b/Src/Api/Streamingcommunity/Core/Class/SeriesType.py deleted file mode 100644 index b8c14ef..0000000 --- a/Src/Api/Streamingcommunity/Core/Class/SeriesType.py +++ /dev/null @@ -1,67 +0,0 @@ -# 03.03.24 - -from typing import List, Dict, Union - - -class Title: - def __init__(self, title_data: Dict[str, Union[int, str, None]]): - self.id: int = title_data.get('id') - self.number: int = title_data.get('number') - self.name: str = title_data.get('name') - self.plot: str = title_data.get('plot') - self.release_date: str = title_data.get('release_date') - self.title_id: int = title_data.get('title_id') - self.created_at: str = title_data.get('created_at') - self.updated_at: str = title_data.get('updated_at') - self.episodes_count: int = title_data.get('episodes_count') - - def __str__(self): - return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})" - - -class TitleManager: - def __init__(self): - self.titles: List[Title] = [] - - def add_title(self, title_data: Dict[str, Union[int, str, None]]): - """ - Add a new title to the manager. - - Args: - title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title. - """ - title = Title(title_data) - self.titles.append(title) - - def get_title_by_index(self, index: int) -> Title: - """ - Get a title by its index. - - Args: - index (int): Index of the title to retrieve. - - Returns: - Title: The title object. - """ - return self.titles[index] - - def get_length(self) -> int: - """ - Get the number of titles in the manager. - - Returns: - int: Number of titles. - """ - return len(self.titles) - - def clear(self) -> None: - """ - This method clears the titles list. - - Args: - self: The object instance. - """ - self.titles.clear() - - def __str__(self): - return f"TitleManager(num_titles={len(self.titles)})" diff --git a/Src/Api/Streamingcommunity/Core/Class/WindowType.py b/Src/Api/Streamingcommunity/Core/Class/WindowType.py deleted file mode 100644 index 07acca8..0000000 --- a/Src/Api/Streamingcommunity/Core/Class/WindowType.py +++ /dev/null @@ -1,160 +0,0 @@ -# 03.03.24 - -import re -import logging - -from typing import Dict, Any - - -class WindowVideo: - def __init__(self, data: Dict[str, Any]): - self.data = data - self.id: int = data.get('id', '') - self.name: str = data.get('name', '') - self.filename: str = data.get('filename', '') - self.size: str = data.get('size', '') - self.quality: str = data.get('quality', '') - self.duration: str = data.get('duration', '') - self.views: int = data.get('views', '') - self.is_viewable: bool = data.get('is_viewable', '') - self.status: str = data.get('status', '') - self.fps: float = data.get('fps', '') - self.legacy: bool = data.get('legacy', '') - self.folder_id: int = data.get('folder_id', '') - self.created_at_diff: str = data.get('created_at_diff', '') - - def __str__(self): - return f"WindowVideo(id={self.id}, name='{self.name}', filename='{self.filename}', size='{self.size}', quality='{self.quality}', duration='{self.duration}', views={self.views}, is_viewable={self.is_viewable}, status='{self.status}', fps={self.fps}, legacy={self.legacy}, folder_id={self.folder_id}, created_at_diff='{self.created_at_diff}')" - -class WindowParameter: - def __init__(self, data: Dict[str, Any]): - self.data = data - self.token: str = data.get('token', '') - self.token360p: str = data.get('token360p', '') - self.token480p: str = data.get('token480p', '') - self.token720p: str = data.get('token720p', '') - self.token1080p: str = data.get('token1080p', '') - self.expires: str = data.get('expires', '') - - def __str__(self): - return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')" - - -class DynamicJSONConverter: - """ - Class for converting an input string into dynamic JSON. - """ - - def __init__(self, input_string: str): - """ - Initialize the converter with the input string. - - Args: - input_string (str): The input string to convert. - """ - self.input_string = input_string - self.json_data = {} - - def _parse_key_value(self, key: str, value: str): - """ - Parse a key-value pair. - - Args: - key (str): The key. - value (str): The value. - - Returns: - object: The parsed value. - """ - try: - value = value.strip() - - if value.startswith('{'): - return self._parse_json_object(value) - else: - return self._parse_non_json_value(value) - - except Exception as e: - logging.error(f"Error parsing key-value pair '{key}': {e}") - raise - - def _parse_json_object(self, obj_str: str): - """ - Parse a JSON object. - - Args: - obj_str (str): The string representation of the JSON object. - - Returns: - dict: The parsed JSON object. - """ - try: - # Use regular expression to find key-value pairs in the JSON object string - obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str)) - - # Strip double quotes from values and return the parsed dictionary - return {k: v.strip('"') for k, v in obj_dict.items()} - - except Exception as e: - logging.error(f"Error parsing JSON object: {e}") - raise - - def _parse_non_json_value(self, value: str): - """ - Parse a non-JSON value. - - Args: - value (str): The value to parse. - - Returns: - object: The parsed value. - """ - try: - - # Remove extra quotes and convert to lowercase - value = value.replace('"', "").strip().lower() - - if value.endswith('\n}'): - value = value.replace('\n}', '') - - # Check if the value matches 'true' or 'false' using regular expressions - if re.match(r'\btrue\b', value, re.IGNORECASE): - return True - - elif re.match(r'\bfalse\b', value, re.IGNORECASE): - return False - - return value - - except Exception as e: - logging.error(f"Error parsing non-JSON value: {e}") - raise - - def convert_to_dynamic_json(self): - """ - Convert the input string into dynamic JSON. - - Returns: - str: The JSON representation of the result. - """ - try: - - # Replace invalid characters with valid JSON syntax - self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}" - - # Find all key-value matches in the input string using regular expression - matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string) - - for match in matches: - key = match[0].strip() - value = match[1].strip() - - # Parse each key-value pair and add it to the json_data dictionary - self.json_data[key] = self._parse_key_value(key, value) - - # Convert the json_data dictionary to a formatted JSON string - return self.json_data - - except Exception as e: - logging.error(f"Error converting to dynamic JSON: {e}") - raise diff --git a/Src/Api/Streamingcommunity/Core/Player/vixcloud.py b/Src/Api/Streamingcommunity/Core/Player/vixcloud.py deleted file mode 100644 index dfc3cc5..0000000 --- a/Src/Api/Streamingcommunity/Core/Player/vixcloud.py +++ /dev/null @@ -1,228 +0,0 @@ -# 01.03.24 - -import sys -import logging -from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse - - -# External libraries -import httpx -from bs4 import BeautifulSoup - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util.console import console, Panel - - -# Logic class -from ..Class.SeriesType import TitleManager -from ..Class.EpisodeType import EpisodeManager -from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter - - -# Variable -from ...costant import SITE_NAME - - -class VideoSource: - def __init__(self): - """ - Initialize a VideoSource object. - """ - self.headers = { - 'user-agent': get_headers() - } - self.is_series = False - self.base_name = SITE_NAME - - def setup(self, version: str = None, domain: str = None, media_id: int = None, series_name: str = None): - """ - Set up the class - - Args: - - version (str): The version to set. - - media_id (str): The media ID to set. - - media_id (int): The media ID to set. - - series_name (str): The series name to set. - """ - self.version = version - self.domain = domain - self.media_id = media_id - - if series_name is not None: - self.is_series = True - self.series_name = series_name - self.obj_title_manager: TitleManager = TitleManager() - self.obj_episode_manager: EpisodeManager = EpisodeManager() - - def collect_info_seasons(self) -> None: - """ - Collect information about seasons. - """ - - self.headers = { - 'user-agent': get_headers(), - 'x-inertia': 'true', - 'x-inertia-version': self.version, - } - - try: - - response = httpx.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers) - response.raise_for_status() - - # Extract JSON response if available - json_response = response.json().get('props', {}).get('title', {}).get('seasons', []) - - # Iterate over JSON data and add titles to the manager - for dict_season in json_response: - self.obj_title_manager.add_title(dict_season) - - except Exception as e: - logging.error(f"Error collecting season info: {e}") - raise - - def collect_title_season(self, number_season: int) -> None: - """ - Collect information about a specific season. - - Args: - - number_season (int): The season number. - """ - try: - - # Make a request to collect information about a specific season - response = httpx.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers) - response.raise_for_status() - - # Extract JSON response if available - json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', []) - - # Iterate over JSON data and add episodes to the manager - for dict_episode in json_response: - self.obj_episode_manager.add_episode(dict_episode) - - except Exception as e: - logging.error(f"Error collecting title season info: {e}") - raise - - def get_iframe(self, episode_id: int = None) -> None: - """ - Get iframe source. - - Args: - - episode_id (int): The episode ID, present only for series - """ - params = {} - - if self.is_series: - params = { - 'episode_id': episode_id, - 'next_episode': '1' - } - - try: - - # Make a request to get iframe source - response = httpx.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params) - response.raise_for_status() - - # Parse response with BeautifulSoup to get iframe source - soup = BeautifulSoup(response.text, "html.parser") - self.iframe_src = soup.find("iframe").get("src") - - except Exception as e: - logging.error(f"Error getting iframe source: {e}") - raise - - def parse_script(self, script_text: str) -> None: - """ - Parse script text. - - Args: - - script_text (str): The script text to parse. - """ - try: - - converter = DynamicJSONConverter(script_text) - result = converter.convert_to_dynamic_json() - - # Create window video and parameter objects - self.window_video = WindowVideo(result['video']) - self.window_parameter = WindowParameter(result['masterPlaylist']) - - except Exception as e: - logging.error(f"Error parsing script: {e}") - raise - - def get_content(self) -> None: - """ - Get content. - """ - try: - - # Check if iframe source is available - if self.iframe_src is not None: - - # Make a request to get content - try: - response = httpx.get(self.iframe_src, headers=self.headers) - response.raise_for_status() - - except Exception as e: - print("\n") - console.print(Panel("[red bold]Coming soon", title="Notification", title_align="left", border_style="yellow")) - sys.exit(0) - - if response.status_code == 200: - - # Parse response with BeautifulSoup to get content - soup = BeautifulSoup(response.text, "html.parser") - script = soup.find("body").find("script").text - - # Parse script to get video information - self.parse_script(script_text=script) - - except Exception as e: - logging.error(f"Error getting content: {e}") - raise - - def get_playlist(self) -> str: - """ - Get playlist. - - Returns: - str: The playlist URL, or None if there's an error. - """ - - iframe_url = self.iframe_src - - # Create base uri for playlist - base_url = f'https://vixcloud.co/playlist/{self.window_video.id}' - query = urlencode(list(self.window_parameter.data.items())) - master_playlist_url = urljoin(base_url, '?' + query) - - # Parse the current query string and the master playlist URL query string - current_params = parse_qs(iframe_url[1:]) - m = urlparse(master_playlist_url) - master_params = parse_qs(m.query) - - # Create the final parameters dictionary with token and expires from the master playlist - final_params = { - "token": master_params.get("token", [""])[0], - "expires": master_params.get("expires", [""])[0] - } - - # Add conditional parameters - if "b" in current_params: - final_params["b"] = "1" - if "canPlayFHD" in current_params: - final_params["h"] = "1" - - # Construct the new query string and final URL - new_query = urlencode(final_params) # Encode final_params into a query string - new_url = m._replace(query=new_query) # Replace the old query string with the new one - final_url = urlunparse(new_url) # Construct the final URL from the modified parts - - return final_url diff --git a/Src/Api/Streamingcommunity/Core/Util/__init__.py b/Src/Api/Streamingcommunity/Core/Util/__init__.py deleted file mode 100644 index d9eb4f1..0000000 --- a/Src/Api/Streamingcommunity/Core/Util/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# 21.05.24 - -from .get_domain import grab_sc_top_level_domain as extract_domain - -from .manage_ep import ( - manage_selection, - map_episode_title -) \ No newline at end of file diff --git a/Src/Api/Streamingcommunity/Core/Util/get_domain.py b/Src/Api/Streamingcommunity/Core/Util/get_domain.py deleted file mode 100644 index 63f224a..0000000 --- a/Src/Api/Streamingcommunity/Core/Util/get_domain.py +++ /dev/null @@ -1,106 +0,0 @@ -# 02.04.24 - -import os -import threading -import logging - - -# External library -import httpx - - -# Internal utilities -from Src.Lib.Google import search as google_search - - - -def check_url_for_content(url: str, content: str) -> bool: - """ - Check if a URL contains specific content. - - Args: - - url (str): The URL to check. - - content (str): The content to search for in the response. - - Returns: - bool: True if the content is found, False otherwise. - """ - try: - - logging.info(f"Test site to extract domain: {url}") - response = httpx.get(url, timeout = 1) - response.raise_for_status() - - if content in response.text: - return True - - except Exception as e: - pass - - return False - - -def grab_top_level_domain(base_url: str, target_content: str) -> str: - """ - Get the top-level domain (TLD) from a list of URLs. - - Args: - - base_url (str): The base URL to construct complete URLs. - - target_content (str): The content to search for in the response. - - Returns: - str: The found TLD, if any. - """ - results = [] - threads = [] - path_file = os.path.join("Test", "data", "TLD", "tld_list.txt") - logging.info(f"Load file: {path_file}") - - def url_checker(url: str): - if check_url_for_content(url, target_content): - results.append(url.split(".")[-1]) - - if not os.path.exists(path_file): - raise FileNotFoundError("The file 'tld_list.txt' does not exist.") - - with open(path_file, "r") as file: - urls = [f"{base_url}.{x.strip().lower()}" for x in file] - - for url in urls: - thread = threading.Thread(target=url_checker, args=(url,)) - thread.start() - threads.append(thread) - - for thread in threads: - thread.join() - - if results: - return results[-1] - - -def grab_top_level_domain_light(query: str) -> str: - """ - Get the top-level domain (TLD) using a light method via Google search. - - Args: - - query (str): The search query for Google search. - - Returns: - str: The found TLD, if any. - """ - for result in google_search(query, num=1, stop=1, pause=2): - return result.split(".", 2)[-1].replace("/", "") - - -def grab_sc_top_level_domain(method: str) -> str: - """ - Get the top-level domain (TLD) for the streaming community. - Args: - method (str): The method to use to obtain the TLD ("light" or "strong"). - Returns: - str: The found TLD, if any. - """ - if method == "light": - return grab_top_level_domain_light("streaming community") - elif method == "strong": - return grab_top_level_domain("https://streamingcommunity", '') diff --git a/Src/Api/Streamingcommunity/Core/Util/manage_ep.py b/Src/Api/Streamingcommunity/Core/Util/manage_ep.py deleted file mode 100644 index ab4ae71..0000000 --- a/Src/Api/Streamingcommunity/Core/Util/manage_ep.py +++ /dev/null @@ -1,75 +0,0 @@ -# 02.05.24 - -import logging - -from typing import List - - -# Internal utilities -from Src.Util._jsonConfig import config_manager -from Src.Util.os import remove_special_characters - - -# Logic class -from ..Class.EpisodeType import Episode - - -# Config -MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') - - -def manage_selection(cmd_insert: str, max_count: int) -> List[int]: - """ - Manage user selection for seasons to download. - - Args: - - cmd_insert (str): User input for season selection. - - max_count (int): Maximum count of seasons available. - - Returns: - list_season_select (List[int]): List of selected seasons. - """ - list_season_select = [] - logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") - - # For a single number (e.g., '5') - if cmd_insert.isnumeric(): - list_season_select.append(int(cmd_insert)) - - # For a range (e.g., '[5-12]') - elif "[" in cmd_insert: - start, end = map(int, cmd_insert[1:-1].split('-')) - list_season_select = list(range(start, end + 1)) - - # For all seasons - elif cmd_insert == "*": - list_season_select = list(range(1, max_count+1)) - - # Return list of selected seasons) - logging.info(f"List return: {list_season_select}") - return list_season_select - - -def map_episode_title(tv_name: str, episode: Episode, number_season: int): - """ - Maps the episode title to a specific format. - - Args: - - tv_name (str): The name of the TV show. - - episode (Episode): The episode object. - - number_season (int): The season number. - - Returns: - str: The mapped episode title. - """ - map_episode_temp = MAP_EPISODE - map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) - map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2)) - map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2)) - map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode.name)) - - # Additional fix - map_episode_temp = map_episode_temp.replace(".", "_") - - logging.info(f"Map episode string return: {map_episode_temp}") - return map_episode_temp diff --git a/Src/Api/Streamingcommunity/__init__.py b/Src/Api/Streamingcommunity/__init__.py deleted file mode 100644 index 078449a..0000000 --- a/Src/Api/Streamingcommunity/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -# 21.05.24 - -# Internal utilities -from Src.Util.console import console, msg - - -# Logic class -from .site import ( - get_version_and_domain, - title_search, - get_select_title -) - -from .film import download_film -from .series import download_series - - -# Variable -indice = 0 - - -def search(): - """ - Main function of the application for film and series. - """ - - # Make request to site to get content that corrsisponde to that string - string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() - - # Get site domain and version and get result of the search - site_version, domain = get_version_and_domain() - len_database = title_search(string_to_search, domain) - - if len_database > 0: - - # Select title from list - select_title = get_select_title() - - # For series - if select_title.type == 'tv': - download_series( - tv_id=select_title.id, - tv_name=select_title.slug, - version=site_version, - domain=domain - ) - - # For film - else: - download_film( - id_film=select_title.id, - title_name=select_title.slug, - domain=domain - ) - - else: - console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/Streamingcommunity/costant.py b/Src/Api/Streamingcommunity/costant.py deleted file mode 100644 index 68f52fc..0000000 --- a/Src/Api/Streamingcommunity/costant.py +++ /dev/null @@ -1,15 +0,0 @@ -# 26.05.24 - -import os - - -# Internal utilities -from Src.Util._jsonConfig import config_manager - - -SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) -ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) - -MOVIE_FOLDER = "Movie" -SERIES_FOLDER = "Serie" \ No newline at end of file diff --git a/Src/Api/Streamingcommunity/film.py b/Src/Api/Streamingcommunity/film.py deleted file mode 100644 index 95fdd19..0000000 --- a/Src/Api/Streamingcommunity/film.py +++ /dev/null @@ -1,56 +0,0 @@ -# 3.12.23 - -import os -import logging - - -# Internal utilities -from Src.Util.console import console -from Src.Lib.Hls.downloader import Downloader -from Src.Util.message import start_message - - -# Logic class -from .Core.Player.vixcloud import VideoSource - - -# Variable -from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER -video_source = VideoSource() - - -def download_film(id_film: str, title_name: str, domain: str): - """ - Downloads a film using the provided film ID, title name, and domain. - - Args: - - id_film (str): The ID of the film. - - title_name (str): The name of the film title. - - domain (str): The domain of the site - """ - - # Start message and display film information - start_message() - console.print(f"[yellow]Download: [red]{title_name} \n") - - # Set domain and media ID for the video source - video_source.setup( - domain = domain, - media_id = id_film - ) - - # Retrieve scws and if available master playlist - video_source.get_iframe() - video_source.get_content() - master_playlist = video_source.get_playlist() - - # Define the filename and path for the downloaded film - mp4_name = title_name.replace("-", "_") - mp4_format = (mp4_name) + ".mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) - - # Download the film using the m3u8 playlist, and output filename - Downloader( - m3u8_playlist = master_playlist, - output_filename = os.path.join(mp4_path, mp4_format) - ).start() \ No newline at end of file diff --git a/Src/Api/Streamingcommunity/series.py b/Src/Api/Streamingcommunity/series.py deleted file mode 100644 index 6b69387..0000000 --- a/Src/Api/Streamingcommunity/series.py +++ /dev/null @@ -1,183 +0,0 @@ -# 3.12.23 - -import os -import sys -import logging - - -# Internal utilities -from Src.Util.console import console, msg -from Src.Util.table import TVShowManager -from Src.Util.message import start_message -from Src.Lib.Hls.downloader import Downloader - - -# Logic class -from .Core.Player.vixcloud import VideoSource -from .Core.Util import manage_selection, map_episode_title - - -# Variable -from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER -video_source = VideoSource() -table_show_manager = TVShowManager() - - -def donwload_video(tv_name: str, index_season_selected: int, index_episode_selected: int) -> None: - """ - Download a single episode video. - - Args: - - tv_name (str): Name of the TV series. - - index_season_selected (int): Index of the selected season. - - index_episode_selected (int): Index of the selected episode. - """ - - start_message() - - # Get info about episode - obj_episode = video_source.obj_episode_manager.episodes[index_episode_selected - 1] - console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.name}") - print() - - # Define filename and path for the downloaded video - mp4_name = f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4" - mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, tv_name, f"S{index_season_selected}") - - # Retrieve scws and if available master playlist - video_source.get_iframe(obj_episode.id) - video_source.get_content() - master_playlist = video_source.get_playlist() - - # Download the episode - Downloader( - m3u8_playlist = master_playlist, - output_filename = os.path.join(mp4_path, mp4_name) - ).start() - - -def donwload_episode(tv_name: str, index_season_selected: int, donwload_all: bool = False) -> None: - """ - Download all episodes of a season. - - Args: - - tv_name (str): Name of the TV series. - - index_season_selected (int): Index of the selected season. - - donwload_all (bool): Donwload all seasons episodes - """ - - # Clean memory of all episodes and get the number of the season (some dont follow rule of [1,2,3,4,5] but [1,2,3,145,5,6,7]). - video_source.obj_episode_manager.clear() - season_number = (video_source.obj_title_manager.titles[index_season_selected-1].number) - - # Start message and collect information about episodes - start_message() - video_source.collect_title_season(season_number) - episodes_count = video_source.obj_episode_manager.get_length() - - # Download all episodes wihtout ask - if donwload_all: - for i_episode in range(1, episodes_count+1): - donwload_video(tv_name, index_season_selected, i_episode) - - console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.") - - # If not download all episode but a single season - if not donwload_all: - - # Display episodes list and manage user selection - last_command = display_episodes_list() - list_episode_select = manage_selection(last_command, episodes_count) - - # Download selected episodes - if len(list_episode_select) == 1 and last_command != "*": - donwload_video(tv_name, index_season_selected, list_episode_select[0]) - - # Download all other episodes selecter - else: - for i_episode in list_episode_select: - donwload_video(tv_name, index_season_selected, i_episode) - - -def download_series(tv_id: str, tv_name: str, version: str, domain: str) -> None: - """ - Download all episodes of a TV series. - - Args: - - tv_id (str): ID of the TV series. - - tv_name (str): Name of the TV series. - - version (str): Version of the TV series. - - domain (str): Domain from which to download. - """ - - # Start message and set up video source - start_message() - - # Setup video source - video_source.setup( - version = version, - domain = domain, - media_id = tv_id, - series_name = tv_name - ) - - # Collect information about seasons - video_source.collect_info_seasons() - seasons_count = video_source.obj_title_manager.get_length() - - # Prompt user for season selection and download episodes - console.print(f"\n[green]Season find: [red]{seasons_count}") - index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")) - list_season_select = manage_selection(index_season_selected, seasons_count) - - # Download selected episodes - if len(list_season_select) == 1 and index_season_selected != "*": - if 1 <= int(index_season_selected) <= seasons_count: - donwload_episode(tv_name, list_season_select[0]) - - # Dowload all seasons and episodes - elif index_season_selected == "*": - for i_season in list_season_select: - donwload_episode(tv_name, i_season, True) - - # Download all other season selecter - else: - for i_season in list_season_select: - donwload_episode(tv_name, i_season) - - -def display_episodes_list() -> str: - """ - Display episodes list and handle user input. - - Returns: - last_command (str): Last command entered by the user. - """ - - # Set up table for displaying episodes - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - "Duration": {'color': 'green'} - } - table_show_manager.add_column(column_info) - - # Populate the table with episodes information - for i, media in enumerate(video_source.obj_episode_manager.episodes): - table_show_manager.add_tv_show({ - 'Index': str(media.number), - 'Name': media.name, - 'Duration': str(media.duration) - }) - - # Run the table and handle user input - last_command = table_show_manager.run() - - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - return last_command diff --git a/Src/Api/Streamingcommunity/site.py b/Src/Api/Streamingcommunity/site.py deleted file mode 100644 index 6802459..0000000 --- a/Src/Api/Streamingcommunity/site.py +++ /dev/null @@ -1,204 +0,0 @@ -# 10.12.23 - -import sys -import json -import logging - -from typing import Tuple - - -# External libraries -import httpx -from bs4 import BeautifulSoup -from unidecode import unidecode - - -# Internal utilities -from Src.Util.headers import get_headers -from Src.Util._jsonConfig import config_manager -from Src.Util.console import console -from Src.Util.table import TVShowManager - - -# Logic class -from .Core.Util import extract_domain -from .Core.Class.SearchType import MediaManager, MediaItem - - -# Config -from .costant import SITE_NAME - - -# Variable -media_search_manager = MediaManager() -table_show_manager = TVShowManager() - - - -def get_version(text: str) -> tuple[str, list]: - """ - Extracts the version from the HTML text of a webpage. - - Args: - - text (str): The HTML text of the webpage. - - Returns: - str: The version extracted from the webpage. - list: Top 10 titles headlines for today. - """ - console.print("[cyan]Make request to get version [white]...") - - try: - - # Parse request to site - soup = BeautifulSoup(text, "html.parser") - - # Extract version - version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version'] - sliders = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['props']['sliders'] - - title_top_10 = sliders[2] - - # Collect info about only top 10 title - list_title_top_10 = [] - for title in title_top_10['titles']: - list_title_top_10.append({ - 'name': title['name'], - 'type': title['type'] - }) - - console.print(f"[cyan]Get version [white]=> [red]{version} \n") - - return version, list_title_top_10 - - except Exception as e: - logging.error(f"Error extracting version: {e}") - raise - - -def get_version_and_domain(new_domain = None) -> Tuple[str, str]: - """ - Retrieves the version and domain of the streaming website. - - This function retrieves the version and domain of the streaming website. - It first checks the accessibility of the current site. - If the site is accessible, it extracts the version from the response. - If configured to do so, it also scrapes and prints the titles of the moments. - If the site is inaccessible, it attempts to obtain a new domain using the 'insta' method. - - Returns: - Tuple[str, str]: A tuple containing the version and domain. - """ - - # Get the current domain from the configuration - if new_domain is None: - config_domain = config_manager.get('SITE', SITE_NAME) - else: - config_domain = new_domain - - # Test the accessibility of the current site - try: - - # Make requests to site to get text - console.print(f"[cyan]Test site[white]: [red]https://{SITE_NAME}.{config_domain}") - response = httpx.get(f"https://{SITE_NAME}.{config_domain}") - console.print(f"[cyan]Test respost site[white]: [red]{response.status_code} \n") - - # Extract version from the response - version, list_title_top_10 = get_version(response.text) - - return version, config_domain - - except: - - console.print("[red]\nExtract new DOMAIN from TLD list.") - new_domain = extract_domain(method="light") - console.log(f"[cyan]Extract new domain: [red]{new_domain}") - - # Update the domain in the configuration file - config_manager.set_key('SITE', SITE_NAME, str(new_domain)) - config_manager.write_config() - - # Retry to get the version and domain - return get_version_and_domain(new_domain) - - -def title_search(title_search: str, domain: str) -> int: - """ - Search for titles based on a search query. - - Args: - - title_search (str): The title to search for. - - domain (str): The domain to search on. - - Returns: - int: The number of titles found. - """ - - # Send request to search for titles ( replace à to a and space to "+" ) - response = httpx.get(f"https://{SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()}) - response.raise_for_status() - - # Add found titles to media search manager - for dict_title in response.json()['data']: - media_search_manager.add_media(dict_title) - - # Return the number of titles found - return media_search_manager.get_length() - - -def get_select_title(type_filter: list = None) -> MediaItem: - """ - Display a selection of titles and prompt the user to choose one. - - Args: - - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] - - Returns: - MediaItem: The selected media item. - """ - - # Set up table for displaying titles - table_show_manager.set_slice_end(10) - - # Add columns to the table - column_info = { - "Index": {'color': 'red'}, - "Name": {'color': 'magenta'}, - "Type": {'color': 'yellow'}, - "Score": {'color': 'cyan'}, - "Date": {'color': 'green'} - } - table_show_manager.add_column(column_info) - - # Populate the table with title information - for i, media in enumerate(media_search_manager.media_list): - - # Filter for only a list of category - if type_filter is not None: - if str(media.type) not in type_filter: - continue - - table_show_manager.add_tv_show({ - 'Index': str(i), - 'Name': media.name, - 'Type': media.type, - 'Score': media.score, - 'Date': media.last_air_date - }) - - # Run the table and handle user input - last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) - table_show_manager.clear() - - # Handle user's quit command - if last_command == "q": - console.print("\n[red]Quit [white]...") - sys.exit(0) - - # Check if the selected index is within range - if 0 <= int(last_command) <= len(media_search_manager.media_list): - return media_search_manager.get(int(last_command)) - else: - console.print("\n[red]Wrong index") - sys.exit(0)