diff --git a/Src/Api/altadefinizione/Core/Class/SearchType.py b/Src/Api/altadefinizione/Core/Class/SearchType.py new file mode 100644 index 0000000..8e58566 --- /dev/null +++ b/Src/Api/altadefinizione/Core/Class/SearchType.py @@ -0,0 +1,62 @@ +# 26.05.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('name') + self.type: str = "film" + self.score: str = data.get('score') + self.url: int = data.get('url') + + def __str__(self): + return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" + diff --git a/Src/Api/altadefinizione/Core/Player/supervideo.py b/Src/Api/altadefinizione/Core/Player/supervideo.py new file mode 100644 index 0000000..dfe5138 --- /dev/null +++ b/Src/Api/altadefinizione/Core/Player/supervideo.py @@ -0,0 +1,178 @@ +# 26.05.24 + +import re +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.os import run_node_script + + +class VideoSource: + + def __init__(self) -> None: + """ + Initializes the VideoSource object with default values. + + Attributes: + headers (dict): An empty dictionary to store HTTP headers. + """ + self.headers = {'user-agent': get_headers()} + + def setup(self, url: str) -> None: + """ + Sets up the video source with the provided URL. + + Args: + url (str): The URL of the video source. + """ + self.url = url + + def make_request(self, url: str) -> str: + """ + Make an HTTP GET request to the provided URL. + + Args: + url (str): The URL to make the request to. + + Returns: + str: The response content if successful, None otherwise. + """ + + try: + response = httpx.get(url, headers=self.headers, follow_redirects=True) + response.raise_for_status() + return response.text + + except Exception as e: + logging.error(f"Request failed [supervideo]: {e}") + return None + + def parse_html(self, html_content: str) -> BeautifulSoup: + """ + Parse the provided HTML content using BeautifulSoup. + + Args: + html_content (str): The HTML content to parse. + + Returns: + BeautifulSoup: Parsed HTML content if successful, None otherwise. + """ + + try: + soup = BeautifulSoup(html_content, "html.parser") + return soup + + except Exception as e: + logging.error(f"Failed to parse HTML content: {e}") + return None + + def get_iframe(self, soup): + """ + Extracts the source URL of the second iframe in the provided BeautifulSoup object. + + Args: + soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML. + + Returns: + str: The source URL of the second iframe, or None if not found. + """ + iframes = soup.find_all("iframe") + if iframes and len(iframes) > 1: + return iframes[1].get("src") + + return None + + def find_content(self, url): + """ + Makes a request to the specified URL and parses the HTML content. + + Args: + url (str): The URL to fetch content from. + + Returns: + BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails. + """ + content = self.make_request(url) + if content: + return self.parse_html(content) + + return None + + def get_result_node_js(self, soup): + """ + Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL. + + Args: + soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content. + + Returns: + str: The output from the Node.js script, or None if the script cannot be found or executed. + """ + for script in soup.find_all("script"): + if "eval" in str(script): + new_script = str(script.text).replace("eval", "var a = ") + new_script = new_script.replace(")))", ")));console.log(a);") + return run_node_script(new_script) + + return None + + def get_playlist(self) -> str: + """ + Download a video from the provided URL. + + Returns: + str: The URL of the downloaded video if successful, None otherwise. + """ + try: + html_content = self.make_request(self.url) + if not html_content: + logging.error("Failed to fetch HTML content.") + return None + + soup = self.parse_html(html_content) + if not soup: + logging.error("Failed to parse HTML content.") + return None + + iframe_src = self.get_iframe(soup) + if not iframe_src: + logging.error("No iframe found.") + return None + + down_page_soup = self.find_content(iframe_src) + if not down_page_soup: + logging.error("Failed to fetch down page content.") + return None + + pattern = r'data-link="(//supervideo[^"]+)"' + match = re.search(pattern, str(down_page_soup)) + if not match: + logging.error("No match found for supervideo URL.") + return None + + supervideo_url = "https:" + match.group(1) + supervideo_soup = self.find_content(supervideo_url) + if not supervideo_soup: + logging.error("Failed to fetch supervideo content.") + return None + + result = self.get_result_node_js(supervideo_soup) + if not result: + logging.error("No video URL found in script.") + return None + + master_playlist = str(result).split(":")[3].split('"}')[0] + return f"https:{master_playlist}" + + except Exception as e: + logging.error(f"An error occurred: {e}") + return None + diff --git a/Src/Api/altadefinizione/__init__.py b/Src/Api/altadefinizione/__init__.py new file mode 100644 index 0000000..27a9885 --- /dev/null +++ b/Src/Api/altadefinizione/__init__.py @@ -0,0 +1,37 @@ +# 26.05.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, get_select_title +from .film import download_film + + +# Variable +indice = 2 + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = get_select_title() + + # Download only film + download_film( + title_name=select_title.name, + url=select_title.url + ) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/altadefinizione/costant.py b/Src/Api/altadefinizione/costant.py new file mode 100644 index 0000000..aa9d6eb --- /dev/null +++ b/Src/Api/altadefinizione/costant.py @@ -0,0 +1,14 @@ +# 26.05.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) + +MOVIE_FOLDER = "Movie" \ No newline at end of file diff --git a/Src/Api/altadefinizione/film.py b/Src/Api/altadefinizione/film.py new file mode 100644 index 0000000..d0cc4b7 --- /dev/null +++ b/Src/Api/altadefinizione/film.py @@ -0,0 +1,55 @@ +# 26.05.24 + +import os +import sys +import logging + + +# Internal utilities +from Src.Util.console import console +from Src.Lib.Hls.downloader import Downloader +from Src.Util.message import start_message + + +# Logic class +from .Core.Player.supervideo import VideoSource + + +# Config +from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER + + +# Variable +video_source = VideoSource() + + +def download_film(title_name: str, url: str): + """ + Downloads a film using the provided film ID, title name, and domain. + + Args: + - title_name (str): The name of the film title. + - url (str): The url of the video + """ + + # Start message and display film information + start_message() + console.print(f"[yellow]Download: [red]{title_name} \n") + + # Set domain and media ID for the video source + video_source.setup( + url = url + ) + + # Define output path + mp4_name = str(title_name).replace("-", "_") + ".mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + + # Get m3u8 master playlist + master_playlist = video_source.get_playlist() + + # Download the film using the m3u8 playlist, and output filename + Downloader( + m3u8_playlist = master_playlist, + output_filename = os.path.join(mp4_path, mp4_name) + ).start() \ No newline at end of file diff --git a/Src/Api/altadefinizione/site.py b/Src/Api/altadefinizione/site.py new file mode 100644 index 0000000..1ce41b6 --- /dev/null +++ b/Src/Api/altadefinizione/site.py @@ -0,0 +1,119 @@ +# 26.05.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.table import TVShowManager +from Src.Util.console import console +from Src.Util.headers import get_headers + + +# Logic class +from .Core.Class.SearchType import MediaManager, MediaItem + + +# Variable +from .costant import SITE_NAME, DOMAIN_NOW +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(title_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. + """ + + # Send request to search for titles + response = httpx.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()}) + response.raise_for_status() + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + table_content = soup.find('div', id="dle-content") + + # Scrape div film in table on single page + for film_div in table_content.find_all('div', class_='col-lg-3'): + title = film_div.find('h2', class_='titleFilm').get_text(strip=True) + link = film_div.find('h2', class_='titleFilm').find('a')['href'] + imdb_rating = film_div.find('div', class_='imdb-rate').get_text(strip=True).split(":")[-1] + + film_info = { + 'name': title, + 'url': link, + 'score': imdb_rating + } + + media_search_manager.add_media(film_info) + + # Return the number of titles found + return media_search_manager.get_length() + + +def get_select_title(type_filter: list = None) -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Args: + - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + "Score": {'color': 'cyan'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + + # Filter for only a list of category + if type_filter is not None: + if str(media.type) not in type_filter: + continue + + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + 'Score': media.score, + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + table_show_manager.clear() + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) diff --git a/Src/Api/animeunity/Core/Class/EpisodeType.py b/Src/Api/animeunity/Core/Class/EpisodeType.py new file mode 100644 index 0000000..4483274 --- /dev/null +++ b/Src/Api/animeunity/Core/Class/EpisodeType.py @@ -0,0 +1,91 @@ +# 03.03.24 + +from typing import Dict, Any, List + + +# Variable +from ...costant import SITE_NAME, DOMAIN_NOW + + + + +class Image: + def __init__(self, image_data: Dict[str, Any]): + self.id: int = image_data.get('id', '') + self.filename: str = image_data.get('filename', '') + self.type: str = image_data.get('type', '') + self.imageable_type: str = image_data.get('imageable_type', '') + self.imageable_id: int = image_data.get('imageable_id', '') + self.created_at: str = image_data.get('created_at', '') + self.updated_at: str = image_data.get('updated_at', '') + self.original_url_field: str = image_data.get('original_url_field', '') + self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" + + def __str__(self): + return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')" + + +class Episode: + def __init__(self, data: Dict[str, Any]): + self.id: int = data.get('id', '') + self.number: int = data.get('number', '') + self.name: str = data.get('name', '') + self.plot: str = data.get('plot', '') + self.duration: int = data.get('duration', '') + self.scws_id: int = data.get('scws_id', '') + self.season_id: int = data.get('season_id', '') + self.created_by: str = data.get('created_by', '') + self.created_at: str = data.get('created_at', '') + self.updated_at: str = data.get('updated_at', '') + self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] + + def __str__(self): + return f"Episode(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', duration={self.duration} sec)" + + +class EpisodeManager: + def __init__(self): + self.episodes: List[Episode] = [] + + def add_episode(self, episode_data: Dict[str, Any]): + """ + Add a new episode to the manager. + + Args: + - episode_data (Dict[str, Any]): A dictionary containing data for the new episode. + """ + episode = Episode(episode_data) + self.episodes.append(episode) + + def get_episode_by_index(self, index: int) -> Episode: + """ + Get an episode by its index. + + Args: + - index (int): Index of the episode to retrieve. + + Returns: + Episode: The episode object. + """ + return self.episodes[index] + + def get_length(self) -> int: + """ + Get the number of episodes in the manager. + + Returns: + int: Number of episodes. + """ + return len(self.episodes) + + def clear(self) -> None: + """ + This method clears the episodes list. + + Args: + - self: The object instance. + """ + self.episodes.clear() + + def __str__(self): + return f"EpisodeManager(num_episodes={len(self.episodes)})" diff --git a/Src/Api/animeunity/Core/Class/PreviewType.py b/Src/Api/animeunity/Core/Class/PreviewType.py new file mode 100644 index 0000000..28d741e --- /dev/null +++ b/Src/Api/animeunity/Core/Class/PreviewType.py @@ -0,0 +1,63 @@ +# 12.04.24 + +class Preview: + def __init__(self, data): + self.id = data.get("id") + self.title_id = data.get("title_id") + self.created_at = data.get("created_at") + self.updated_at = data.get("updated_at") + self.video_id = data.get("video_id") + self.is_viewable = data.get("is_viewable") + self.zoom_factor = data.get("zoom_factor") + self.filename = data.get("filename") + self.embed_url = data.get("embed_url") + + def __str__(self): + return f"Preview: ID={self.id}, Title ID={self.title_id}, Created At={self.created_at}, Updated At={self.updated_at}, Video ID={self.video_id}, Viewable={self.is_viewable}, Zoom Factor={self.zoom_factor}, Filename={self.filename}, Embed URL={self.embed_url}" + +class Genre: + def __init__(self, data): + self.id = data.get("id") + self.name = data.get("name") + self.type = data.get("type") + self.hidden = data.get("hidden") + self.created_at = data.get("created_at") + self.updated_at = data.get("updated_at") + self.pivot = data.get("pivot") + + def __str__(self): + return f"Genre: ID={self.id}, Name={self.name}, Type={self.type}, Hidden={self.hidden}, Created At={self.created_at}, Updated At={self.updated_at}, Pivot={self.pivot}" + +class Image: + def __init__(self, data): + self.id = data.get("id") + self.filename = data.get("filename") + self.type = data.get("type") + self.imageable_type = data.get("imageable_type") + self.imageable_id = data.get("imageable_id") + self.created_at = data.get("created_at") + self.updated_at = data.get("updated_at") + self.original_url_field = data.get("original_url_field") + + def __str__(self): + return f"Image: ID={self.id}, Filename={self.filename}, Type={self.type}, Imageable Type={self.imageable_type}, Imageable ID={self.imageable_id}, Created At={self.created_at}, Updated At={self.updated_at}, Original URL Field={self.original_url_field}" + +class PreviewManager: + def __init__(self, json_data): + self.id = json_data.get("id") + self.type = json_data.get("type") + self.runtime = json_data.get("runtime") + self.release_date = json_data.get("release_date") + self.quality = json_data.get("quality") + self.plot = json_data.get("plot") + self.seasons_count = json_data.get("seasons_count") + self.genres = [Genre(genre_data) for genre_data in json_data.get("genres", [])] + self.preview = Preview(json_data.get("preview")) + self.images = [Image(image_data) for image_data in json_data.get("images", [])] + + def __str__(self): + genres_str = "\n".join(str(genre) for genre in self.genres) + images_str = "\n".join(str(image) for image in self.images) + return f"Title: ID={self.id}, Type={self.type}, Runtime={self.runtime}, Release Date={self.release_date}, Quality={self.quality}, Plot={self.plot}, Seasons Count={self.seasons_count}\nGenres:\n{genres_str}\nPreview:\n{self.preview}\nImages:\n{images_str}" + + diff --git a/Src/Api/animeunity/Core/Class/SearchType.py b/Src/Api/animeunity/Core/Class/SearchType.py new file mode 100644 index 0000000..1b7355f --- /dev/null +++ b/Src/Api/animeunity/Core/Class/SearchType.py @@ -0,0 +1,85 @@ +# 03.03.24 + +from typing import List + + +# Variable +from ...costant import SITE_NAME, DOMAIN_NOW + + + +class Image: + def __init__(self, data: dict): + self.imageable_id: int = data.get('imageable_id') + self.imageable_type: str = data.get('imageable_type') + self.filename: str = data.get('filename') + self.type: str = data.get('type') + self.original_url_field: str = data.get('original_url_field') + self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" + + def __str__(self): + return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')" + + +class MediaItem: + def __init__(self, data: dict): + self.id: int = data.get('id') + self.slug: str = data.get('slug') + self.name: str = data.get('name') + self.type: str = data.get('type') + self.score: str = data.get('score') + self.sub_ita: int = data.get('sub_ita') + self.last_air_date: str = data.get('last_air_date') + self.seasons_count: int = data.get('seasons_count') + self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] + + def __str__(self): + return f"MediaItem(id={self.id}, slug='{self.slug}', name='{self.name}', type='{self.type}', score='{self.score}', sub_ita={self.sub_ita}, last_air_date='{self.last_air_date}', seasons_count={self.seasons_count}, images={self.images})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" + diff --git a/Src/Api/animeunity/Core/Class/SeriesType.py b/Src/Api/animeunity/Core/Class/SeriesType.py new file mode 100644 index 0000000..b8c14ef --- /dev/null +++ b/Src/Api/animeunity/Core/Class/SeriesType.py @@ -0,0 +1,67 @@ +# 03.03.24 + +from typing import List, Dict, Union + + +class Title: + def __init__(self, title_data: Dict[str, Union[int, str, None]]): + self.id: int = title_data.get('id') + self.number: int = title_data.get('number') + self.name: str = title_data.get('name') + self.plot: str = title_data.get('plot') + self.release_date: str = title_data.get('release_date') + self.title_id: int = title_data.get('title_id') + self.created_at: str = title_data.get('created_at') + self.updated_at: str = title_data.get('updated_at') + self.episodes_count: int = title_data.get('episodes_count') + + def __str__(self): + return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})" + + +class TitleManager: + def __init__(self): + self.titles: List[Title] = [] + + def add_title(self, title_data: Dict[str, Union[int, str, None]]): + """ + Add a new title to the manager. + + Args: + title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title. + """ + title = Title(title_data) + self.titles.append(title) + + def get_title_by_index(self, index: int) -> Title: + """ + Get a title by its index. + + Args: + index (int): Index of the title to retrieve. + + Returns: + Title: The title object. + """ + return self.titles[index] + + def get_length(self) -> int: + """ + Get the number of titles in the manager. + + Returns: + int: Number of titles. + """ + return len(self.titles) + + def clear(self) -> None: + """ + This method clears the titles list. + + Args: + self: The object instance. + """ + self.titles.clear() + + def __str__(self): + return f"TitleManager(num_titles={len(self.titles)})" diff --git a/Src/Api/animeunity/Core/Class/WindowType.py b/Src/Api/animeunity/Core/Class/WindowType.py new file mode 100644 index 0000000..07acca8 --- /dev/null +++ b/Src/Api/animeunity/Core/Class/WindowType.py @@ -0,0 +1,160 @@ +# 03.03.24 + +import re +import logging + +from typing import Dict, Any + + +class WindowVideo: + def __init__(self, data: Dict[str, Any]): + self.data = data + self.id: int = data.get('id', '') + self.name: str = data.get('name', '') + self.filename: str = data.get('filename', '') + self.size: str = data.get('size', '') + self.quality: str = data.get('quality', '') + self.duration: str = data.get('duration', '') + self.views: int = data.get('views', '') + self.is_viewable: bool = data.get('is_viewable', '') + self.status: str = data.get('status', '') + self.fps: float = data.get('fps', '') + self.legacy: bool = data.get('legacy', '') + self.folder_id: int = data.get('folder_id', '') + self.created_at_diff: str = data.get('created_at_diff', '') + + def __str__(self): + return f"WindowVideo(id={self.id}, name='{self.name}', filename='{self.filename}', size='{self.size}', quality='{self.quality}', duration='{self.duration}', views={self.views}, is_viewable={self.is_viewable}, status='{self.status}', fps={self.fps}, legacy={self.legacy}, folder_id={self.folder_id}, created_at_diff='{self.created_at_diff}')" + +class WindowParameter: + def __init__(self, data: Dict[str, Any]): + self.data = data + self.token: str = data.get('token', '') + self.token360p: str = data.get('token360p', '') + self.token480p: str = data.get('token480p', '') + self.token720p: str = data.get('token720p', '') + self.token1080p: str = data.get('token1080p', '') + self.expires: str = data.get('expires', '') + + def __str__(self): + return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')" + + +class DynamicJSONConverter: + """ + Class for converting an input string into dynamic JSON. + """ + + def __init__(self, input_string: str): + """ + Initialize the converter with the input string. + + Args: + input_string (str): The input string to convert. + """ + self.input_string = input_string + self.json_data = {} + + def _parse_key_value(self, key: str, value: str): + """ + Parse a key-value pair. + + Args: + key (str): The key. + value (str): The value. + + Returns: + object: The parsed value. + """ + try: + value = value.strip() + + if value.startswith('{'): + return self._parse_json_object(value) + else: + return self._parse_non_json_value(value) + + except Exception as e: + logging.error(f"Error parsing key-value pair '{key}': {e}") + raise + + def _parse_json_object(self, obj_str: str): + """ + Parse a JSON object. + + Args: + obj_str (str): The string representation of the JSON object. + + Returns: + dict: The parsed JSON object. + """ + try: + # Use regular expression to find key-value pairs in the JSON object string + obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str)) + + # Strip double quotes from values and return the parsed dictionary + return {k: v.strip('"') for k, v in obj_dict.items()} + + except Exception as e: + logging.error(f"Error parsing JSON object: {e}") + raise + + def _parse_non_json_value(self, value: str): + """ + Parse a non-JSON value. + + Args: + value (str): The value to parse. + + Returns: + object: The parsed value. + """ + try: + + # Remove extra quotes and convert to lowercase + value = value.replace('"', "").strip().lower() + + if value.endswith('\n}'): + value = value.replace('\n}', '') + + # Check if the value matches 'true' or 'false' using regular expressions + if re.match(r'\btrue\b', value, re.IGNORECASE): + return True + + elif re.match(r'\bfalse\b', value, re.IGNORECASE): + return False + + return value + + except Exception as e: + logging.error(f"Error parsing non-JSON value: {e}") + raise + + def convert_to_dynamic_json(self): + """ + Convert the input string into dynamic JSON. + + Returns: + str: The JSON representation of the result. + """ + try: + + # Replace invalid characters with valid JSON syntax + self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}" + + # Find all key-value matches in the input string using regular expression + matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string) + + for match in matches: + key = match[0].strip() + value = match[1].strip() + + # Parse each key-value pair and add it to the json_data dictionary + self.json_data[key] = self._parse_key_value(key, value) + + # Convert the json_data dictionary to a formatted JSON string + return self.json_data + + except Exception as e: + logging.error(f"Error converting to dynamic JSON: {e}") + raise diff --git a/Src/Api/animeunity/Core/Player/vixcloud.py b/Src/Api/animeunity/Core/Player/vixcloud.py new file mode 100644 index 0000000..fad7472 --- /dev/null +++ b/Src/Api/animeunity/Core/Player/vixcloud.py @@ -0,0 +1,194 @@ +# 01.03.24 + +import sys +import logging +from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util._jsonConfig import config_manager + + +# Logic class +from ..Class.SeriesType import TitleManager +from ..Class.EpisodeType import EpisodeManager, Episode +from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter + + +# Variable +from ...costant import SITE_NAME + + +class VideoSource: + def __init__(self): + """ + Initialize a VideoSource object. + """ + self.headers = { + 'user-agent': get_headers() + } + self.is_series = False + self.base_name = SITE_NAME + self.domain = config_manager.get('SITE', self.base_name) + + def setup(self, media_id: int = None, series_name: str = None): + """ + Set up the class + + Args: + - media_id (int): The media ID to set. + - series_name (str): The series name to set. + """ + self.media_id = media_id + + if series_name is not None: + self.is_series = True + self.series_name = series_name + self.obj_title_manager: TitleManager = TitleManager() + self.obj_episode_manager: EpisodeManager = EpisodeManager() + + def get_count_episodes(self): + """ + Fetches the total count of episodes available for the anime. + + Returns: + int or None: Total count of episodes if successful, otherwise None. + """ + try: + + response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/") + response.raise_for_status() + + # Parse JSON response and return episode count + return response.json()["episodes_count"] + + except Exception as e: + logging.error(f"(EpisodeDownloader) Error fetching episode count: {e}") + return None + + def get_info_episode(self, index_ep: int) -> Episode: + """ + Fetches information about a specific episode. + + Args: + - index_ep (int): Index of the episode. + + Returns: + obj Episode or None: Information about the episode if successful, otherwise None. + """ + try: + + params = { + "start_range": index_ep, + "end_range": index_ep + 1 + } + + response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params) + response.raise_for_status() + + # Return information about the episode + json_data = response.json()["episodes"][-1] + return Episode(json_data) + + except Exception as e: + logging.error(f"(EpisodeDownloader) Error fetching episode information: {e}") + return None + + def get_embed(self, episode_id: int): + """ + Fetches the script text for a given episode ID. + + Args: + - episode_id (int): ID of the episode. + + Returns: + str or None: Script successful, otherwise None. + """ + try: + + response = httpx.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}") + response.raise_for_status() + + # Extract and clean embed URL + embed_url = response.text.strip() + self.iframe_src = embed_url + + # Fetch video content using embed URL + video_response = httpx.get(embed_url) + video_response.raise_for_status() + + + # Parse response with BeautifulSoup to get content of the scriot + soup = BeautifulSoup(video_response.text, "html.parser") + script = soup.find("body").find("script").text + + return script + + except Exception as e: + logging.error(f"(EpisodeDownloader) Error fetching embed URL: {e}") + return None + + def parse_script(self, script_text: str) -> None: + """ + Parse script text. + + Args: + - script_text (str): The script text to parse. + """ + try: + + converter = DynamicJSONConverter(script_text) + result = converter.convert_to_dynamic_json() + + # Create window video and parameter objects + self.window_video = WindowVideo(result['video']) + self.window_parameter = WindowParameter(result['masterPlaylist']) + + except Exception as e: + logging.error(f"Error parsing script: {e}") + raise + + def get_playlist(self) -> str: + """ + Get playlist. + + Returns: + - str: The playlist URL, or None if there's an error. + """ + + iframe_url = self.iframe_src + + # Create base uri for playlist + base_url = f'https://vixcloud.co/playlist/{self.window_video.id}' + query = urlencode(list(self.window_parameter.data.items())) + master_playlist_url = urljoin(base_url, '?' + query) + + # Parse the current query string and the master playlist URL query string + current_params = parse_qs(iframe_url[1:]) + m = urlparse(master_playlist_url) + master_params = parse_qs(m.query) + + # Create the final parameters dictionary with token and expires from the master playlist + final_params = { + "token": master_params.get("token", [""])[0], + "expires": master_params.get("expires", [""])[0] + } + + # Add conditional parameters + if "b" in current_params: + final_params["b"] = "1" + if "canPlayFHD" in current_params: + final_params["h"] = "1" + + # Construct the new query string and final URL + new_query = urlencode(final_params) # Encode final_params into a query string + new_url = m._replace(query=new_query) # Replace the old query string with the new one + final_url = urlunparse(new_url) # Construct the final URL from the modified parts + + return final_url diff --git a/Src/Api/animeunity/Core/Util/__init__.py b/Src/Api/animeunity/Core/Util/__init__.py new file mode 100644 index 0000000..6f629ba --- /dev/null +++ b/Src/Api/animeunity/Core/Util/__init__.py @@ -0,0 +1,8 @@ +# 21.05.24 + +from .get_domain import grab_au_top_level_domain as extract_domain + +from .manage_ep import ( + manage_selection, + map_episode_title +) \ No newline at end of file diff --git a/Src/Api/animeunity/Core/Util/get_domain.py b/Src/Api/animeunity/Core/Util/get_domain.py new file mode 100644 index 0000000..4194e69 --- /dev/null +++ b/Src/Api/animeunity/Core/Util/get_domain.py @@ -0,0 +1,108 @@ +# 02.04.24 + +import os +import threading +import logging + + +# External libraries +import httpx + + +# Internal utilities +from Src.Lib.Google import search as google_search + + + +def check_url_for_content(url: str, content: str) -> bool: + """ + Check if a URL contains specific content. + + Args: + - url (str): The URL to check. + - content (str): The content to search for in the response. + + Returns: + bool: True if the content is found, False otherwise. + """ + try: + + logging.info(f"Test site to extract domain: {url}") + response = httpx.get(url, timeout = 1) + response.raise_for_status() + + if content in response.text: + return True + + except Exception as e: + pass + + return False + + +def grab_top_level_domain(base_url: str, target_content: str) -> str: + """ + Get the top-level domain (TLD) from a list of URLs. + + Args: + - base_url (str): The base URL to construct complete URLs. + - target_content (str): The content to search for in the response. + + Returns: + str: The found TLD, if any. + """ + results = [] + threads = [] + path_file = os.path.join("Test", "data", "TLD", "tld_list.txt") + logging.info(f"Load file: {path_file}") + + def url_checker(url: str): + if check_url_for_content(url, target_content): + results.append(url.split(".")[-1]) + + if not os.path.exists(path_file): + raise FileNotFoundError("The file 'tld_list.txt' does not exist.") + + with open(path_file, "r") as file: + urls = [f"{base_url}.{x.strip().lower()}" for x in file] + + for url in urls: + thread = threading.Thread(target=url_checker, args=(url,)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + if results: + return results[-1] + + +def grab_top_level_domain_light(query: str) -> str: + """ + Get the top-level domain (TLD) using a light method via Google search. + + Args: + - query (str): The search query for Google search. + + Returns: + str: The found TLD, if any. + """ + for result in google_search(query, num=1, stop=1, pause=2): + return result.split(".", 2)[-1].replace("/", "") + + +def grab_au_top_level_domain(method: str) -> str: + """ + Get the top-level domain (TLD) for Anime Unity. + + Args: + - method (str): The method to use to obtain the TLD ("light" or "strong"). + + Returns: + str: The found TLD, if any. + """ + if method == "light": + return grab_top_level_domain_light("animeunity") + elif method == "strong": + return grab_top_level_domain("https://www.animeunity", '') diff --git a/Src/Api/animeunity/Core/Util/manage_ep.py b/Src/Api/animeunity/Core/Util/manage_ep.py new file mode 100644 index 0000000..331771c --- /dev/null +++ b/Src/Api/animeunity/Core/Util/manage_ep.py @@ -0,0 +1,74 @@ +# 02.05.24 + +import logging + +from typing import List + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +# Logic class +from ..Class.EpisodeType import Episode + + +# Config +MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') + + +def manage_selection(cmd_insert: str, max_count: int) -> List[int]: + """ + Manage user selection for seasons to download. + + Args: + - cmd_insert (str): User input for season selection. + - max_count (int): Maximum count of seasons available. + + Returns: + list_season_select (List[int]): List of selected seasons. + """ + list_season_select = [] + logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") + + # For a single number (e.g., '5') + if cmd_insert.isnumeric(): + list_season_select.append(int(cmd_insert)) + + # For a range (e.g., '[5-12]') + elif "[" in cmd_insert: + start, end = map(int, cmd_insert[1:-1].split('-')) + list_season_select = list(range(start, end + 1)) + + # For all seasons + elif cmd_insert == "*": + list_season_select = list(range(1, max_count+1)) + + # Return list of selected seasons) + logging.info(f"List return: {list_season_select}") + return list_season_select + + +def map_episode_title(tv_name: str, episode: Episode, number_season: int): + """ + Maps the episode title to a specific format. + + Args: + - tv_name (str): The name of the TV show. + - episode (Episode): The episode object. + - number_season (int): The season number. + + Returns: + str: The mapped episode title. + """ + map_episode_temp = MAP_EPISODE + map_episode_temp = map_episode_temp.replace("%(tv_name)", tv_name) + map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2)) + map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2)) + map_episode_temp = map_episode_temp.replace("%(episode_name)", episode.name) + + # Additional fix + map_episode_temp = map_episode_temp.replace(".", "_") + + logging.info(f"Map episode string return: {map_episode_temp}") + return map_episode_temp diff --git a/Src/Api/animeunity/__init__.py b/Src/Api/animeunity/__init__.py new file mode 100644 index 0000000..b7fe3ed --- /dev/null +++ b/Src/Api/animeunity/__init__.py @@ -0,0 +1,40 @@ +# 21.05.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, get_select_title +from .anime import donwload_film, donwload_series + + +# Variable +indice = 1 + + +def search(): + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = get_select_title() + + if select_title.type == 'TV': + donwload_series( + tv_id=select_title.id, + tv_name=select_title.slug + ) + + else: + donwload_film( + id_film=select_title.id, + title_name=select_title.slug + ) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/animeunity/anime.py b/Src/Api/animeunity/anime.py new file mode 100644 index 0000000..3d597ad --- /dev/null +++ b/Src/Api/animeunity/anime.py @@ -0,0 +1,111 @@ +# 11.03.24 + +import os +import logging + + +# Internal utilities +from Src.Util.console import console, msg +from Src.Lib.Hls.downloader import Downloader +from Src.Util.message import start_message + + +# Logic class +from .Core.Player.vixcloud import VideoSource +from .Core.Util import manage_selection + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER, MOVIE_FOLDER +video_source = VideoSource() + + + +def download_episode(index_select: int): + """ + Downloads the selected episode. + + Args: + - index_select (int): Index of the episode to download. + """ + + # Get information about the selected episode + obj_episode = video_source.get_info_episode(index_select) + + start_message() + console.print(f"[yellow]Download: [red]EP_{obj_episode.number} \n") + + # Get the embed URL for the episode + embed_url = video_source.get_embed(obj_episode.id) + + # Parse parameter in embed text + video_source.parse_script(embed_url) + + # Create output path + mp4_path = None + mp4_name = f"{index_select + 1}.mp4" + if video_source.is_series: + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, video_source.series_name) + else: + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, video_source.series_name) + + # Start downloading + Downloader( + m3u8_playlist = video_source.get_playlist(), + output_filename = os.path.join(mp4_path, mp4_name) + ).start() + + +def donwload_series(tv_id: int, tv_name: str): + """ + Function to download episodes of a TV series. + + Args: + - tv_id (int): The ID of the TV series. + - tv_name (str): The name of the TV series. + """ + + # Set up video source + video_source.setup( + media_id = tv_id, + series_name = tv_name + ) + + # Get the count of episodes for the TV series + episoded_count = video_source.get_count_episodes() + console.log(f"[cyan]Episodes find: [red]{episoded_count}") + + # Prompt user to select an episode index + last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media") + + # Manage user selection + list_episode_select = manage_selection(last_command, episoded_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + download_episode(list_episode_select[0]-1) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + download_episode(i_episode-1) + + +def donwload_film(id_film: int, title_name: str): + """ + Function to download a film. + + Args: + - id_film (int): The ID of the film. + - title_name (str): The title of the film. + """ + + # Set up video source + video_source.setup( + media_id = id_film, + series_name = title_name + ) + video_source.is_series = False + + # Start download + download_episode(0) diff --git a/Src/Api/animeunity/costant.py b/Src/Api/animeunity/costant.py new file mode 100644 index 0000000..6275469 --- /dev/null +++ b/Src/Api/animeunity/costant.py @@ -0,0 +1,15 @@ +# 26.05.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) + +SERIES_FOLDER= "Serie" +MOVIE_FOLDER = "Movie" diff --git a/Src/Api/animeunity/site.py b/Src/Api/animeunity/site.py new file mode 100644 index 0000000..6744379 --- /dev/null +++ b/Src/Api/animeunity/site.py @@ -0,0 +1,237 @@ +# 10.12.23 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.table import TVShowManager +from Src.Util.console import console +from Src.Util._jsonConfig import config_manager + + +# Logic class +from .Core.Util import extract_domain +from .Core.Class.SearchType import MediaManager, MediaItem + + +# Variable +from .costant import SITE_NAME, DOMAIN_NOW +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def get_token(site_name: str, domain: str) -> dict: + """ + Function to retrieve session tokens from a specified website. + + Args: + - site_name (str): The name of the site. + - domain (str): The domain of the site. + + Returns: + - dict: A dictionary containing session tokens. The keys are 'XSRF_TOKEN', 'animeunity_session', and 'csrf_token'. + """ + + # Send a GET request to the specified URL composed of the site name and domain + response = httpx.get(f"https://www.{site_name}.{domain}") + response.raise_for_status() + + # Initialize variables to store CSRF token + find_csrf_token = None + + # Parse the HTML response using BeautifulSoup + soup = BeautifulSoup(response.text, "html.parser") + + # Loop through all meta tags in the HTML response + for html_meta in soup.find_all("meta"): + + # Check if the meta tag has a 'name' attribute equal to "csrf-token" + if html_meta.get('name') == "csrf-token": + + # If found, retrieve the content of the meta tag, which is the CSRF token + find_csrf_token = html_meta.get('content') + + logging.info(f"Extract: ('animeunity_session': {response.cookies['animeunity_session']}, 'csrf_token': {find_csrf_token})") + return { + 'animeunity_session': response.cookies['animeunity_session'], + 'csrf_token': find_csrf_token + } + + +def update_domain(): + """ + Update the domain for the anime streaming site. + + This function tests the accessibility of the current anime streaming site. + If the current domain is inaccessible, it attempts to obtain and set a new domain. + It uses the 'light' method to extract a new domain from Anime Unity. + """ + + # Test current site's accessibility + try: + + console.log(f"[cyan]Test site: [red]https://{SITE_NAME}.{DOMAIN_NOW}") + response = httpx.get(f"https://www.{SITE_NAME}.{DOMAIN_NOW}") + response.status_code + + # If the current site is inaccessible, try to obtain a new domain + except Exception as e: + + # Get new domain + console.print("[red]\nExtract new DOMAIN from TLD list.") + new_domain = extract_domain(method="light") + console.log(f"[cyan]Extract new domain: [red]{new_domain}") + + if new_domain: + + # Update configuration with the new domain + config_manager.set_key('SITE', SITE_NAME, new_domain) + config_manager.write_config() + + else: + logging.error("Failed to find a new animeunity domain") + sys.exit(0) + + +def get_real_title(record): + """ + Get the real title from a record. + + This function takes a record, which is assumed to be a dictionary representing a row of JSON data. + It looks for a title in the record, prioritizing English over Italian titles if available. + + Args: + - record (dict): A dictionary representing a row of JSON data. + + Returns: + - str: The title found in the record. If no title is found, returns None. + """ + + if record['title'] is not None: + return record['title'] + + elif record['title_eng'] is not None: + return record['title_eng'] + + else: + return record['title_it'] + + +def title_search(title: str) -> int: + """ + Function to perform an anime search using a provided title. + + Args: + - title_search (str): The title to search for. + + Returns: + - int: A number containing the length of media search manager. + """ + + # Update domain + update_domain() + + # Get token and session value from configuration + url_domain = config_manager.get('SITE', SITE_NAME) + data = get_token(SITE_NAME, url_domain) + + # Prepare cookies to be used in the request + cookies = { + 'animeunity_session': data.get('animeunity_session') + } + + # Prepare headers for the request + headers = { + 'accept': 'application/json, text/plain, */*', + 'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7', + 'content-type': 'application/json;charset=UTF-8', + 'x-csrf-token': data.get('csrf_token') + } + + # Prepare JSON data to be sent in the request + json_data = { + 'title': unidecode(title) # Use the provided title for the search + } + + # Send a POST request to the API endpoint for live search + response = httpx.post(f'https://www.{SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json=json_data) + response.raise_for_status() + + # Process each record returned in the response + for record in response.json()['records']: + + # Rename keys for consistency + record['name'] = get_real_title(record) + record['last_air_date'] = record.pop('date') + + # Add the record to media search manager if the name is not None + media_search_manager.add_media(record) + + # Return the length of media search manager + return media_search_manager.get_length() + + + +def get_select_title(type_filter: list = None) -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Args: + - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + "Score": {'color': 'cyan'}, + "Date": {'color': 'green'} + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + + # Filter for only a list of category + if type_filter is not None: + if str(media.type) not in type_filter: + continue + + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + 'Score': media.score, + 'Date': media.last_air_date + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + table_show_manager.clear() + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) diff --git a/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py b/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py new file mode 100644 index 0000000..c5dd366 --- /dev/null +++ b/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py @@ -0,0 +1,85 @@ +# 13.06.24 + +import sys +import logging + +from typing import List, Dict + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util._jsonConfig import config_manager + + +# Logic class +from .SearchType import MediaItem + + + +class GetSerieInfo: + + def __init__(self, dict_serie: MediaItem) -> None: + """ + Initializes the GetSerieInfo object with default values. + + Args: + dict_serie (MediaItem): Dictionary containing series information (optional). + """ + self.headers = {'user-agent': get_headers()} + self.cookies = config_manager.get_dict('REQUESTS', 'index') + self.url = dict_serie.url + self.tv_name = None + self.list_episodes = None + + def get_episode_number(self) -> List[Dict[str, str]]: + """ + Retrieves the number of episodes for a specific season. + + Args: + n_season (int): The season number. + + Returns: + List[Dict[str, str]]: List of dictionaries containing episode information. + """ + + # Make an HTTP request to the series URL + try: + response = httpx.get(self.url + "?area=online", cookies=self.cookies, headers=self.headers) + response.raise_for_status() + + except Exception as e: + logging.error(f"Insert: ['ips4_device_key': 'your_code', 'ips4_member_id': 'your_code', 'ips4_login_key': 'your_code'] in config.json file REQUESTS -> index, instead of user-agent. Use browser debug and cookie request with a valid account, filter by DOC.") + sys.exit(0) + + # Parse HTML content of the page + soup = BeautifulSoup(response.text, "html.parser") + + # Get tv name + self.tv_name = soup.find("span", class_= "ipsType_break").get_text(strip=True) + + # Find the container of episodes for the specified season + table_content = soup.find('div', class_='ipsMargin_bottom:half') + list_dict_episode = [] + + for episode_div in table_content.find_all('a', href=True): + + # Get text of episode + part_name = episode_div.get_text(strip=True) + + if part_name: + link = episode_div['href'] + + obj_episode = { + 'name': part_name, + 'url': link + } + list_dict_episode.append(obj_episode) + + self.list_episodes = list_dict_episode + return list_dict_episode + \ No newline at end of file diff --git a/Src/Api/ddlstreamitaly/Core/Class/SearchType.py b/Src/Api/ddlstreamitaly/Core/Class/SearchType.py new file mode 100644 index 0000000..f291f34 --- /dev/null +++ b/Src/Api/ddlstreamitaly/Core/Class/SearchType.py @@ -0,0 +1,60 @@ +# 13.06.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('name') + self.type: str = data.get('type') + self.url: int = data.get('url') + + def __str__(self): + return f"MediaItem(name='{self.name}', type='{self.type}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/ddlstreamitaly/Core/Player/ddl.py b/Src/Api/ddlstreamitaly/Core/Player/ddl.py new file mode 100644 index 0000000..64857b7 --- /dev/null +++ b/Src/Api/ddlstreamitaly/Core/Player/ddl.py @@ -0,0 +1,83 @@ +# 14.06.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util._jsonConfig import config_manager + + +class VideoSource: + + def __init__(self) -> None: + """ + Initializes the VideoSource object with default values. + + Attributes: + headers (dict): A dictionary to store HTTP headers. + cookie (dict): A dictionary to store cookies. + """ + self.headers = {'user-agent': get_headers()} + self.cookie = config_manager.get_dict('REQUESTS', 'index') + + def setup(self, url: str) -> None: + """ + Sets up the video source with the provided URL. + + Args: + url (str): The URL of the video source. + """ + self.url = url + + def make_request(self, url: str) -> str: + """ + Make an HTTP GET request to the provided URL. + + Args: + url (str): The URL to make the request to. + + Returns: + str: The response content if successful, None otherwise. + """ + try: + response = httpx.get(url, headers=self.headers, cookies=self.cookie) + response.raise_for_status() + return response.text + except httpx.HTTPStatusError as http_err: + logging.error(f"HTTP error occurred: {http_err}") + except Exception as err: + logging.error(f"An error occurred: {err}") + return None + + def get_playlist(self): + """ + Retrieves the playlist URL from the video source. + + Returns: + tuple: The mp4 link if found, None otherwise. + """ + try: + text = self.make_request(self.url) + + if text: + soup = BeautifulSoup(text, "html.parser") + source = soup.find("source") + + if source: + mp4_link = source.get("src") + return mp4_link + + else: + logging.error("No tag found in the HTML.") + else: + logging.error("Failed to retrieve content from the URL.") + + except Exception as e: + logging.error(f"An error occurred while parsing the playlist: {e}") diff --git a/Src/Api/ddlstreamitaly/Core/Util/manage_ep.py b/Src/Api/ddlstreamitaly/Core/Util/manage_ep.py new file mode 100644 index 0000000..06b24fd --- /dev/null +++ b/Src/Api/ddlstreamitaly/Core/Util/manage_ep.py @@ -0,0 +1,71 @@ +# 02.05.24 + +import logging + +from typing import List + + +# Internal utilities +from Src.Util._jsonConfig import config_manager +from Src.Util.os import remove_special_characters + + +# Config +MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') + + +def manage_selection(cmd_insert: str, max_count: int) -> List[int]: + """ + Manage user selection for seasons to download. + + Args: + - cmd_insert (str): User input for season selection. + - max_count (int): Maximum count of seasons available. + + Returns: + list_season_select (List[int]): List of selected seasons. + """ + list_season_select = [] + logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") + + # For a single number (e.g., '5') + if cmd_insert.isnumeric(): + list_season_select.append(int(cmd_insert)) + + # For a range (e.g., '[5-12]') + elif "[" in cmd_insert: + start, end = map(int, cmd_insert[1:-1].split('-')) + list_season_select = list(range(start, end + 1)) + + # For all seasons + elif cmd_insert == "*": + list_season_select = list(range(1, max_count+1)) + + # Return list of selected seasons) + logging.info(f"List return: {list_season_select}") + return list_season_select + +def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str: + """ + Maps the episode title to a specific format. + + Args: + tv_name (str): The name of the TV show. + number_season (int): The season number. + episode_number (int): The episode number. + episode_name (str): The original name of the episode. + + Returns: + str: The mapped episode title. + """ + map_episode_temp = MAP_EPISODE + map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) + map_episode_temp = map_episode_temp.replace("%(season)", str(number_season)) + map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number)) + map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name)) + + # Additional fix + map_episode_temp = map_episode_temp.replace(".", "_") + + logging.info(f"Map episode string return: {map_episode_temp}") + return map_episode_temp diff --git a/Src/Api/ddlstreamitaly/__init__.py b/Src/Api/ddlstreamitaly/__init__.py new file mode 100644 index 0000000..411fa85 --- /dev/null +++ b/Src/Api/ddlstreamitaly/__init__.py @@ -0,0 +1,43 @@ +# 09.06.24 + +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, get_select_title +from .series import download_thread + + +# Variable +indice = 3 + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = get_select_title() + + # Download only film + if "Serie TV" in str(select_title.type): + download_thread(select_title) + + else: + logging.error(f"Not supported: {select_title.type}") + sys.exit(0) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/ddlstreamitaly/costant.py b/Src/Api/ddlstreamitaly/costant.py new file mode 100644 index 0000000..9c7e339 --- /dev/null +++ b/Src/Api/ddlstreamitaly/costant.py @@ -0,0 +1,15 @@ +# 09.06.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) + +MOVIE_FOLDER = "Movie" +SERIES_FOLDER = "Serie" diff --git a/Src/Api/ddlstreamitaly/series.py b/Src/Api/ddlstreamitaly/series.py new file mode 100644 index 0000000..8903fdc --- /dev/null +++ b/Src/Api/ddlstreamitaly/series.py @@ -0,0 +1,135 @@ +# 13.06.24 + +import os +import sys +import logging +from urllib.parse import urlparse + + +# Internal utilities +from Src.Util.color import Colors +from Src.Util.console import console, msg +from Src.Util.os import create_folder, can_create_file +from Src.Util.table import TVShowManager +from Src.Util.message import start_message +from Src.Lib.Hls.download_mp4 import MP4_downloader + + +# Logic class +from .Core.Class.SearchType import MediaItem +from .Core.Class.ScrapeSerie import GetSerieInfo +from .Core.Util.manage_ep import manage_selection, map_episode_title +from .Core.Player.ddl import VideoSource + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER +table_show_manager = TVShowManager() +video_source = VideoSource() + + +def donwload_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) -> None: + """ + Download a single episode video. + + Args: + - tv_name (str): Name of the TV series. + - index_episode_selected (int): Index of the selected episode. + """ + + start_message() + + # Get info about episode + obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1] + console.print(f"[yellow]Download: [red]{obj_episode.get('name')}") + print() + + # Define filename and path for the downloaded video + mp4_name = f"{map_episode_title(scape_info_serie.tv_name, None, index_episode_selected, obj_episode.get('name'))}.mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name) + + # Check if can create file output + create_folder(mp4_path) + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + + # Setup video source + video_source.setup(obj_episode.get('url')) + + # Get m3u8 master playlist + master_playlist = video_source.get_playlist() + + # Parse start page url + start_message() + parsed_url = urlparse(obj_episode.get('url')) + path_parts = parsed_url.path.split('/') + + MP4_downloader( + url = master_playlist, + path = os.path.join(mp4_path, mp4_name), + referer = f"{parsed_url.scheme}://{parsed_url.netloc}/", + add_desc=f"{Colors.MAGENTA}video" + ) + + +def download_thread(dict_serie: MediaItem): + """Download all episode of a thread""" + + # Start message and set up video source + start_message() + + # Init class + scape_info_serie = GetSerieInfo(dict_serie) + + # Collect information about thread + list_dict_episode = scape_info_serie.get_episode_number() + episodes_count = len(list_dict_episode) + + # Display episodes list and manage user selection + last_command = display_episodes_list(list_dict_episode) + list_episode_select = manage_selection(last_command, episodes_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + donwload_video(scape_info_serie, list_episode_select[0]) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + donwload_video(scape_info_serie, i_episode) + + +def display_episodes_list(obj_episode_manager) -> str: + """ + Display episodes list and handle user input. + + Returns: + last_command (str): Last command entered by the user. + """ + + # Set up table for displaying episodes + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with episodes information + for i, media in enumerate(obj_episode_manager): + table_show_manager.add_tv_show({ + 'Index': str(i+1), + 'Name': media.get('name'), + }) + + # Run the table and handle user input + last_command = table_show_manager.run() + + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + return last_command diff --git a/Src/Api/ddlstreamitaly/site.py b/Src/Api/ddlstreamitaly/site.py new file mode 100644 index 0000000..ad0495b --- /dev/null +++ b/Src/Api/ddlstreamitaly/site.py @@ -0,0 +1,126 @@ +# 09.06.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.table import TVShowManager +from Src.Util.console import console, msg +from Src.Util._jsonConfig import config_manager +from Src.Util.headers import get_headers + + +# Logic class +from .Core.Class.SearchType import MediaManager, MediaItem + + +# Variable +from .costant import SITE_NAME, DOMAIN_NOW +cookie_index = config_manager.get_dict('REQUESTS', 'index') +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(word_to_search) -> int: + """ + Search for titles based on a search query. + """ + try: + + # Send request to search for titles + response = httpx.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/search/?&q={word_to_search}&quick=1&type=videobox_video&nodes=11", headers={'user-agent': get_headers()}) + response.raise_for_status() + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + table_content = soup.find('ol', class_="ipsStream") + + if table_content: + for title_div in table_content.find_all('li', class_='ipsStreamItem'): + try: + title_type = title_div.find("p", class_="ipsType_reset").find_all("a")[-1].get_text(strip=True) + name = title_div.find("span", class_="ipsContained").find("a").get_text(strip=True) + link = title_div.find("span", class_="ipsContained").find("a").get("href") + + title_info = { + 'name': name, + 'url': link, + 'type': title_type + } + + media_search_manager.add_media(title_info) + + except Exception as e: + logging.error(f"Error processing title div: {e}") + + # Return the number of titles found + return media_search_manager.get_length() + + else: + logging.error("No table content found.") + return -999 + + except Exception as err: + logging.error(f"An error occurred: {err}") + + return -9999 + + +def get_select_title(type_filter: list = None) -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Args: + - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + + # Filter for only a list of category + if type_filter is not None: + if str(media.type) not in type_filter: + continue + + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + table_show_manager.clear() + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) diff --git a/Src/Api/guardaserie/Core/Class/ScrapeSerie.py b/Src/Api/guardaserie/Core/Class/ScrapeSerie.py new file mode 100644 index 0000000..2275a43 --- /dev/null +++ b/Src/Api/guardaserie/Core/Class/ScrapeSerie.py @@ -0,0 +1,113 @@ +# 13.06.24 + +import sys +import logging + +from typing import List, Dict + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers + + +# Logic class +from .SearchType import MediaItem + + + +class GetSerieInfo: + + def __init__(self, dict_serie: MediaItem) -> None: + """ + Initializes the GetSerieInfo object with default values. + + Args: + dict_serie (MediaItem): Dictionary containing series information (optional). + """ + self.headers = {'user-agent': get_headers()} + self.url = dict_serie.url + self.tv_name = None + self.list_episodes = None + + def get_seasons_number(self) -> int: + """ + Retrieves the number of seasons of a TV series. + + Returns: + int: Number of seasons of the TV series. + """ + try: + + # Make an HTTP request to the series URL + response = httpx.get(self.url, headers=self.headers, timeout=10) + response.raise_for_status() + + # Parse HTML content of the page + soup = BeautifulSoup(response.text, "html.parser") + + # Find the container of seasons + table_content = soup.find('div', class_="tt_season") + + # Count the number of seasons + seasons_number = len(table_content.find_all("li")) + + # Extract the name of the series + self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True) + + return seasons_number + + except Exception as e: + logging.error(f"Error parsing HTML page: {e}") + + return -999 + + def get_episode_number(self, n_season: int) -> List[Dict[str, str]]: + """ + Retrieves the number of episodes for a specific season. + + Args: + n_season (int): The season number. + + Returns: + List[Dict[str, str]]: List of dictionaries containing episode information. + """ + try: + + # Make an HTTP request to the series URL + response = httpx.get(self.url, headers=self.headers) + response.raise_for_status() + + # Parse HTML content of the page + soup = BeautifulSoup(response.text, "html.parser") + + # Find the container of episodes for the specified season + table_content = soup.find('div', class_="tab-pane", id=f"season-{n_season}") + + # Extract episode information + episode_content = table_content.find_all("li") + list_dict_episode = [] + + for episode_div in episode_content: + index = episode_div.find("a").get("data-num") + link = episode_div.find("a").get("data-link") + name = episode_div.find("a").get("data-title") + + obj_episode = { + 'number': index, + 'name': name, + 'url': link + } + list_dict_episode.append(obj_episode) + + self.list_episodes = list_dict_episode + return list_dict_episode + + except Exception as e: + logging.error(f"Error parsing HTML page: {e}") + + return [] diff --git a/Src/Api/guardaserie/Core/Class/SearchType.py b/Src/Api/guardaserie/Core/Class/SearchType.py new file mode 100644 index 0000000..6a45d67 --- /dev/null +++ b/Src/Api/guardaserie/Core/Class/SearchType.py @@ -0,0 +1,61 @@ +# 26.05.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('name') + self.type: str = "serie" + self.score: str = data.get('score') + self.url: int = data.get('url') + + def __str__(self): + return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/guardaserie/Core/Player/supervideo.py b/Src/Api/guardaserie/Core/Player/supervideo.py new file mode 100644 index 0000000..e97e856 --- /dev/null +++ b/Src/Api/guardaserie/Core/Player/supervideo.py @@ -0,0 +1,123 @@ +# 26.05.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.os import run_node_script + + +class VideoSource: + + def __init__(self) -> None: + """ + Initializes the VideoSource object with default values. + + Attributes: + headers (dict): An empty dictionary to store HTTP headers. + """ + self.headers = {'user-agent': get_headers()} + + def setup(self, url: str) -> None: + """ + Sets up the video source with the provided URL. + + Args: + url (str): The URL of the video source. + """ + self.url = url + + def make_request(self, url: str) -> str: + """ + Make an HTTP GET request to the provided URL. + + Args: + url (str): The URL to make the request to. + + Returns: + str: The response content if successful, None otherwise. + """ + + try: + response = httpx.get(url, headers=self.headers, follow_redirects=True) + response.raise_for_status() + return response.text + + except Exception as e: + logging.error(f"Request failed: {e}") + return None + + def parse_html(self, html_content: str) -> BeautifulSoup: + """ + Parse the provided HTML content using BeautifulSoup. + + Args: + html_content (str): The HTML content to parse. + + Returns: + BeautifulSoup: Parsed HTML content if successful, None otherwise. + """ + + try: + soup = BeautifulSoup(html_content, "html.parser") + return soup + + except Exception as e: + logging.error(f"Failed to parse HTML content: {e}") + return None + + def get_result_node_js(self, soup): + """ + Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL. + + Args: + soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content. + + Returns: + str: The output from the Node.js script, or None if the script cannot be found or executed. + """ + for script in soup.find_all("script"): + if "eval" in str(script): + new_script = str(script.text).replace("eval", "var a = ") + new_script = new_script.replace(")))", ")));console.log(a);") + return run_node_script(new_script) + + return None + + def get_playlist(self) -> str: + """ + Download a video from the provided URL. + + Returns: + str: The URL of the downloaded video if successful, None otherwise. + """ + try: + html_content = self.make_request(self.url) + if not html_content: + logging.error("Failed to fetch HTML content.") + return None + + soup = self.parse_html(html_content) + if not soup: + logging.error("Failed to parse HTML content.") + return None + + result = self.get_result_node_js(soup) + if not result: + logging.error("No video URL found in script.") + return None + + master_playlist = str(result).split(":")[3].split('"}')[0] + return f"https:{master_playlist}" + + except Exception as e: + logging.error(f"An error occurred: {e}") + return None + \ No newline at end of file diff --git a/Src/Api/guardaserie/Core/Util/manage_ep.py b/Src/Api/guardaserie/Core/Util/manage_ep.py new file mode 100644 index 0000000..06b24fd --- /dev/null +++ b/Src/Api/guardaserie/Core/Util/manage_ep.py @@ -0,0 +1,71 @@ +# 02.05.24 + +import logging + +from typing import List + + +# Internal utilities +from Src.Util._jsonConfig import config_manager +from Src.Util.os import remove_special_characters + + +# Config +MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') + + +def manage_selection(cmd_insert: str, max_count: int) -> List[int]: + """ + Manage user selection for seasons to download. + + Args: + - cmd_insert (str): User input for season selection. + - max_count (int): Maximum count of seasons available. + + Returns: + list_season_select (List[int]): List of selected seasons. + """ + list_season_select = [] + logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") + + # For a single number (e.g., '5') + if cmd_insert.isnumeric(): + list_season_select.append(int(cmd_insert)) + + # For a range (e.g., '[5-12]') + elif "[" in cmd_insert: + start, end = map(int, cmd_insert[1:-1].split('-')) + list_season_select = list(range(start, end + 1)) + + # For all seasons + elif cmd_insert == "*": + list_season_select = list(range(1, max_count+1)) + + # Return list of selected seasons) + logging.info(f"List return: {list_season_select}") + return list_season_select + +def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str: + """ + Maps the episode title to a specific format. + + Args: + tv_name (str): The name of the TV show. + number_season (int): The season number. + episode_number (int): The episode number. + episode_name (str): The original name of the episode. + + Returns: + str: The mapped episode title. + """ + map_episode_temp = MAP_EPISODE + map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) + map_episode_temp = map_episode_temp.replace("%(season)", str(number_season)) + map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number)) + map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name)) + + # Additional fix + map_episode_temp = map_episode_temp.replace(".", "_") + + logging.info(f"Map episode string return: {map_episode_temp}") + return map_episode_temp diff --git a/Src/Api/guardaserie/__init__.py b/Src/Api/guardaserie/__init__.py new file mode 100644 index 0000000..e7e05c7 --- /dev/null +++ b/Src/Api/guardaserie/__init__.py @@ -0,0 +1,34 @@ +# 09.06.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, get_select_title +from .series import download_series + + +# Variable +indice = 4 + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = get_select_title() + + # Download only film + download_series(select_title) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/guardaserie/costant.py b/Src/Api/guardaserie/costant.py new file mode 100644 index 0000000..4072af4 --- /dev/null +++ b/Src/Api/guardaserie/costant.py @@ -0,0 +1,14 @@ +# 09.06.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) + +SERIES_FOLDER = "Serie" diff --git a/Src/Api/guardaserie/series.py b/Src/Api/guardaserie/series.py new file mode 100644 index 0000000..4f677ac --- /dev/null +++ b/Src/Api/guardaserie/series.py @@ -0,0 +1,164 @@ +# 13.06.24 + +import os +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg +from Src.Util.table import TVShowManager +from Src.Util.message import start_message +from Src.Lib.Hls.downloader import Downloader + + +# Logic class +from .Core.Class.SearchType import MediaItem +from .Core.Class.ScrapeSerie import GetSerieInfo +from .Core.Util.manage_ep import manage_selection, map_episode_title +from .Core.Player.supervideo import VideoSource + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER +table_show_manager = TVShowManager() +video_source = VideoSource() + + +def donwload_video(scape_info_serie: GetSerieInfo, index_season_selected: int, index_episode_selected: int) -> None: + """ + Download a single episode video. + + Args: + - tv_name (str): Name of the TV series. + - index_season_selected (int): Index of the selected season. + - index_episode_selected (int): Index of the selected episode. + """ + + start_message() + + # Get info about episode + obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1] + console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}") + print() + + # Define filename and path for the downloaded video + mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}") + + # Setup video source + video_source.setup(obj_episode.get('url')) + + # Get m3u8 master playlist + master_playlist = video_source.get_playlist() + + Downloader( + m3u8_playlist = master_playlist, + output_filename = os.path.join(mp4_path, mp4_name) + ).start() + + +def donwload_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, donwload_all: bool = False) -> None: + """ + Download all episodes of a season. + + Args: + - tv_name (str): Name of the TV series. + - index_season_selected (int): Index of the selected season. + - donwload_all (bool): Donwload all seasons episodes + """ + + # Start message and collect information about episodes + start_message() + list_dict_episode = scape_info_serie.get_episode_number(index_season_selected) + episodes_count = len(list_dict_episode) + + # Download all episodes wihtout ask + if donwload_all: + for i_episode in range(1, episodes_count+1): + donwload_video(scape_info_serie, index_season_selected, i_episode) + + console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.") + + # If not download all episode but a single season + if not donwload_all: + + # Display episodes list and manage user selection + last_command = display_episodes_list(scape_info_serie.list_episodes) + list_episode_select = manage_selection(last_command, episodes_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + donwload_video(scape_info_serie, index_season_selected, list_episode_select[0]) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + donwload_video(scape_info_serie, index_season_selected, i_episode) + + +def download_series(dict_serie: MediaItem) -> None: + + # Start message and set up video source + start_message() + + # Init class + scape_info_serie = GetSerieInfo(dict_serie) + + # Collect information about seasons + seasons_count = scape_info_serie.get_seasons_number() + + # Prompt user for season selection and download episodes + console.print(f"\n[green]Season find: [red]{seasons_count}") + index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")) + list_season_select = manage_selection(index_season_selected, seasons_count) + + # Download selected episodes + if len(list_season_select) == 1 and index_season_selected != "*": + if 1 <= int(index_season_selected) <= seasons_count: + donwload_episode(scape_info_serie, list_season_select[0]) + + # Dowload all seasons and episodes + elif index_season_selected == "*": + for i_season in list_season_select: + donwload_episode(scape_info_serie, i_season, True) + + # Download all other season selecter + else: + for i_season in list_season_select: + donwload_episode(scape_info_serie, i_season) + + +def display_episodes_list(obj_episode_manager) -> str: + """ + Display episodes list and handle user input. + + Returns: + last_command (str): Last command entered by the user. + """ + + # Set up table for displaying episodes + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with episodes information + for media in obj_episode_manager: + table_show_manager.add_tv_show({ + 'Index': str(media.get('number')), + 'Name': media.get('name'), + }) + + # Run the table and handle user input + last_command = table_show_manager.run() + + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + return last_command diff --git a/Src/Api/guardaserie/site.py b/Src/Api/guardaserie/site.py new file mode 100644 index 0000000..74c6e3b --- /dev/null +++ b/Src/Api/guardaserie/site.py @@ -0,0 +1,115 @@ +# 09.06.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.table import TVShowManager +from Src.Util.console import console, msg +from Src.Util.headers import get_headers + + +# Logic class +from .Core.Class.SearchType import MediaManager, MediaItem + + +# Variable +from .costant import DOMAIN_NOW +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + +def title_search(word_to_search) -> int: + """ + Search for titles based on a search query. + """ + + # Send request to search for titles + response = httpx.get(f"https://guardaserie.{DOMAIN_NOW}/?story={word_to_search}&do=search&subaction=search", headers={'user-agent': get_headers()}) + response.raise_for_status() + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + table_content = soup.find('div', class_="mlnew-list") + + for serie_div in table_content.find_all('div', class_='mlnew'): + + try: + title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True) + link = serie_div.find('div', class_='mlnh-2').find('a')['href'] + imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True) + + serie_info = { + 'name': title, + 'url': link, + 'score': imdb_rating + } + + media_search_manager.add_media(serie_info) + + except: + pass + + # Return the number of titles found + return media_search_manager.get_length() + + +def get_select_title(type_filter: list = None) -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Args: + - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + "Score": {'color': 'cyan'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + + # Filter for only a list of category + if type_filter is not None: + if str(media.type) not in type_filter: + continue + + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + 'Score': media.score, + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + table_show_manager.clear() + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) diff --git a/Src/Api/streamingcommunity/Core/Class/EpisodeType.py b/Src/Api/streamingcommunity/Core/Class/EpisodeType.py new file mode 100644 index 0000000..e98fd62 --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Class/EpisodeType.py @@ -0,0 +1,90 @@ +# 03.03.24 + +from typing import Dict, Any, List + + +# Variable +from ...costant import SITE_NAME, DOMAIN_NOW + + + +class Image: + def __init__(self, image_data: Dict[str, Any]): + self.id: int = image_data.get('id', '') + self.filename: str = image_data.get('filename', '') + self.type: str = image_data.get('type', '') + self.imageable_type: str = image_data.get('imageable_type', '') + self.imageable_id: int = image_data.get('imageable_id', '') + self.created_at: str = image_data.get('created_at', '') + self.updated_at: str = image_data.get('updated_at', '') + self.original_url_field: str = image_data.get('original_url_field', '') + self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" + + def __str__(self): + return f"Image(id={self.id}, filename='{self.filename}', type='{self.type}', imageable_type='{self.imageable_type}', url='{self.url}')" + + +class Episode: + def __init__(self, data: Dict[str, Any]): + self.id: int = data.get('id', '') + self.number: int = data.get('number', '') + self.name: str = data.get('name', '') + self.plot: str = data.get('plot', '') + self.duration: int = data.get('duration', '') + self.scws_id: int = data.get('scws_id', '') + self.season_id: int = data.get('season_id', '') + self.created_by: str = data.get('created_by', '') + self.created_at: str = data.get('created_at', '') + self.updated_at: str = data.get('updated_at', '') + self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] + + def __str__(self): + return f"Episode(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', duration={self.duration} sec)" + + +class EpisodeManager: + def __init__(self): + self.episodes: List[Episode] = [] + + def add_episode(self, episode_data: Dict[str, Any]): + """ + Add a new episode to the manager. + + Args: + - episode_data (Dict[str, Any]): A dictionary containing data for the new episode. + """ + episode = Episode(episode_data) + self.episodes.append(episode) + + def get_episode_by_index(self, index: int) -> Episode: + """ + Get an episode by its index. + + Args: + - index (int): Index of the episode to retrieve. + + Returns: + Episode: The episode object. + """ + return self.episodes[index] + + def get_length(self) -> int: + """ + Get the number of episodes in the manager. + + Returns: + int: Number of episodes. + """ + return len(self.episodes) + + def clear(self) -> None: + """ + This method clears the episodes list. + + Args: + - self: The object instance. + """ + self.episodes.clear() + + def __str__(self): + return f"EpisodeManager(num_episodes={len(self.episodes)})" diff --git a/Src/Api/streamingcommunity/Core/Class/PreviewType.py b/Src/Api/streamingcommunity/Core/Class/PreviewType.py new file mode 100644 index 0000000..28d741e --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Class/PreviewType.py @@ -0,0 +1,63 @@ +# 12.04.24 + +class Preview: + def __init__(self, data): + self.id = data.get("id") + self.title_id = data.get("title_id") + self.created_at = data.get("created_at") + self.updated_at = data.get("updated_at") + self.video_id = data.get("video_id") + self.is_viewable = data.get("is_viewable") + self.zoom_factor = data.get("zoom_factor") + self.filename = data.get("filename") + self.embed_url = data.get("embed_url") + + def __str__(self): + return f"Preview: ID={self.id}, Title ID={self.title_id}, Created At={self.created_at}, Updated At={self.updated_at}, Video ID={self.video_id}, Viewable={self.is_viewable}, Zoom Factor={self.zoom_factor}, Filename={self.filename}, Embed URL={self.embed_url}" + +class Genre: + def __init__(self, data): + self.id = data.get("id") + self.name = data.get("name") + self.type = data.get("type") + self.hidden = data.get("hidden") + self.created_at = data.get("created_at") + self.updated_at = data.get("updated_at") + self.pivot = data.get("pivot") + + def __str__(self): + return f"Genre: ID={self.id}, Name={self.name}, Type={self.type}, Hidden={self.hidden}, Created At={self.created_at}, Updated At={self.updated_at}, Pivot={self.pivot}" + +class Image: + def __init__(self, data): + self.id = data.get("id") + self.filename = data.get("filename") + self.type = data.get("type") + self.imageable_type = data.get("imageable_type") + self.imageable_id = data.get("imageable_id") + self.created_at = data.get("created_at") + self.updated_at = data.get("updated_at") + self.original_url_field = data.get("original_url_field") + + def __str__(self): + return f"Image: ID={self.id}, Filename={self.filename}, Type={self.type}, Imageable Type={self.imageable_type}, Imageable ID={self.imageable_id}, Created At={self.created_at}, Updated At={self.updated_at}, Original URL Field={self.original_url_field}" + +class PreviewManager: + def __init__(self, json_data): + self.id = json_data.get("id") + self.type = json_data.get("type") + self.runtime = json_data.get("runtime") + self.release_date = json_data.get("release_date") + self.quality = json_data.get("quality") + self.plot = json_data.get("plot") + self.seasons_count = json_data.get("seasons_count") + self.genres = [Genre(genre_data) for genre_data in json_data.get("genres", [])] + self.preview = Preview(json_data.get("preview")) + self.images = [Image(image_data) for image_data in json_data.get("images", [])] + + def __str__(self): + genres_str = "\n".join(str(genre) for genre in self.genres) + images_str = "\n".join(str(image) for image in self.images) + return f"Title: ID={self.id}, Type={self.type}, Runtime={self.runtime}, Release Date={self.release_date}, Quality={self.quality}, Plot={self.plot}, Seasons Count={self.seasons_count}\nGenres:\n{genres_str}\nPreview:\n{self.preview}\nImages:\n{images_str}" + + diff --git a/Src/Api/streamingcommunity/Core/Class/SearchType.py b/Src/Api/streamingcommunity/Core/Class/SearchType.py new file mode 100644 index 0000000..1b7355f --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Class/SearchType.py @@ -0,0 +1,85 @@ +# 03.03.24 + +from typing import List + + +# Variable +from ...costant import SITE_NAME, DOMAIN_NOW + + + +class Image: + def __init__(self, data: dict): + self.imageable_id: int = data.get('imageable_id') + self.imageable_type: str = data.get('imageable_type') + self.filename: str = data.get('filename') + self.type: str = data.get('type') + self.original_url_field: str = data.get('original_url_field') + self.url: str = f"https://cdn.{SITE_NAME}.{DOMAIN_NOW}/images/{self.filename}" + + def __str__(self): + return f"Image(imageable_id={self.imageable_id}, imageable_type='{self.imageable_type}', filename='{self.filename}', type='{self.type}', url='{self.url}')" + + +class MediaItem: + def __init__(self, data: dict): + self.id: int = data.get('id') + self.slug: str = data.get('slug') + self.name: str = data.get('name') + self.type: str = data.get('type') + self.score: str = data.get('score') + self.sub_ita: int = data.get('sub_ita') + self.last_air_date: str = data.get('last_air_date') + self.seasons_count: int = data.get('seasons_count') + self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] + + def __str__(self): + return f"MediaItem(id={self.id}, slug='{self.slug}', name='{self.name}', type='{self.type}', score='{self.score}', sub_ita={self.sub_ita}, last_air_date='{self.last_air_date}', seasons_count={self.seasons_count}, images={self.images})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" + diff --git a/Src/Api/streamingcommunity/Core/Class/SeriesType.py b/Src/Api/streamingcommunity/Core/Class/SeriesType.py new file mode 100644 index 0000000..b8c14ef --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Class/SeriesType.py @@ -0,0 +1,67 @@ +# 03.03.24 + +from typing import List, Dict, Union + + +class Title: + def __init__(self, title_data: Dict[str, Union[int, str, None]]): + self.id: int = title_data.get('id') + self.number: int = title_data.get('number') + self.name: str = title_data.get('name') + self.plot: str = title_data.get('plot') + self.release_date: str = title_data.get('release_date') + self.title_id: int = title_data.get('title_id') + self.created_at: str = title_data.get('created_at') + self.updated_at: str = title_data.get('updated_at') + self.episodes_count: int = title_data.get('episodes_count') + + def __str__(self): + return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})" + + +class TitleManager: + def __init__(self): + self.titles: List[Title] = [] + + def add_title(self, title_data: Dict[str, Union[int, str, None]]): + """ + Add a new title to the manager. + + Args: + title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title. + """ + title = Title(title_data) + self.titles.append(title) + + def get_title_by_index(self, index: int) -> Title: + """ + Get a title by its index. + + Args: + index (int): Index of the title to retrieve. + + Returns: + Title: The title object. + """ + return self.titles[index] + + def get_length(self) -> int: + """ + Get the number of titles in the manager. + + Returns: + int: Number of titles. + """ + return len(self.titles) + + def clear(self) -> None: + """ + This method clears the titles list. + + Args: + self: The object instance. + """ + self.titles.clear() + + def __str__(self): + return f"TitleManager(num_titles={len(self.titles)})" diff --git a/Src/Api/streamingcommunity/Core/Class/WindowType.py b/Src/Api/streamingcommunity/Core/Class/WindowType.py new file mode 100644 index 0000000..07acca8 --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Class/WindowType.py @@ -0,0 +1,160 @@ +# 03.03.24 + +import re +import logging + +from typing import Dict, Any + + +class WindowVideo: + def __init__(self, data: Dict[str, Any]): + self.data = data + self.id: int = data.get('id', '') + self.name: str = data.get('name', '') + self.filename: str = data.get('filename', '') + self.size: str = data.get('size', '') + self.quality: str = data.get('quality', '') + self.duration: str = data.get('duration', '') + self.views: int = data.get('views', '') + self.is_viewable: bool = data.get('is_viewable', '') + self.status: str = data.get('status', '') + self.fps: float = data.get('fps', '') + self.legacy: bool = data.get('legacy', '') + self.folder_id: int = data.get('folder_id', '') + self.created_at_diff: str = data.get('created_at_diff', '') + + def __str__(self): + return f"WindowVideo(id={self.id}, name='{self.name}', filename='{self.filename}', size='{self.size}', quality='{self.quality}', duration='{self.duration}', views={self.views}, is_viewable={self.is_viewable}, status='{self.status}', fps={self.fps}, legacy={self.legacy}, folder_id={self.folder_id}, created_at_diff='{self.created_at_diff}')" + +class WindowParameter: + def __init__(self, data: Dict[str, Any]): + self.data = data + self.token: str = data.get('token', '') + self.token360p: str = data.get('token360p', '') + self.token480p: str = data.get('token480p', '') + self.token720p: str = data.get('token720p', '') + self.token1080p: str = data.get('token1080p', '') + self.expires: str = data.get('expires', '') + + def __str__(self): + return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')" + + +class DynamicJSONConverter: + """ + Class for converting an input string into dynamic JSON. + """ + + def __init__(self, input_string: str): + """ + Initialize the converter with the input string. + + Args: + input_string (str): The input string to convert. + """ + self.input_string = input_string + self.json_data = {} + + def _parse_key_value(self, key: str, value: str): + """ + Parse a key-value pair. + + Args: + key (str): The key. + value (str): The value. + + Returns: + object: The parsed value. + """ + try: + value = value.strip() + + if value.startswith('{'): + return self._parse_json_object(value) + else: + return self._parse_non_json_value(value) + + except Exception as e: + logging.error(f"Error parsing key-value pair '{key}': {e}") + raise + + def _parse_json_object(self, obj_str: str): + """ + Parse a JSON object. + + Args: + obj_str (str): The string representation of the JSON object. + + Returns: + dict: The parsed JSON object. + """ + try: + # Use regular expression to find key-value pairs in the JSON object string + obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str)) + + # Strip double quotes from values and return the parsed dictionary + return {k: v.strip('"') for k, v in obj_dict.items()} + + except Exception as e: + logging.error(f"Error parsing JSON object: {e}") + raise + + def _parse_non_json_value(self, value: str): + """ + Parse a non-JSON value. + + Args: + value (str): The value to parse. + + Returns: + object: The parsed value. + """ + try: + + # Remove extra quotes and convert to lowercase + value = value.replace('"', "").strip().lower() + + if value.endswith('\n}'): + value = value.replace('\n}', '') + + # Check if the value matches 'true' or 'false' using regular expressions + if re.match(r'\btrue\b', value, re.IGNORECASE): + return True + + elif re.match(r'\bfalse\b', value, re.IGNORECASE): + return False + + return value + + except Exception as e: + logging.error(f"Error parsing non-JSON value: {e}") + raise + + def convert_to_dynamic_json(self): + """ + Convert the input string into dynamic JSON. + + Returns: + str: The JSON representation of the result. + """ + try: + + # Replace invalid characters with valid JSON syntax + self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}" + + # Find all key-value matches in the input string using regular expression + matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string) + + for match in matches: + key = match[0].strip() + value = match[1].strip() + + # Parse each key-value pair and add it to the json_data dictionary + self.json_data[key] = self._parse_key_value(key, value) + + # Convert the json_data dictionary to a formatted JSON string + return self.json_data + + except Exception as e: + logging.error(f"Error converting to dynamic JSON: {e}") + raise diff --git a/Src/Api/streamingcommunity/Core/Player/vixcloud.py b/Src/Api/streamingcommunity/Core/Player/vixcloud.py new file mode 100644 index 0000000..dfc3cc5 --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Player/vixcloud.py @@ -0,0 +1,228 @@ +# 01.03.24 + +import sys +import logging +from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.console import console, Panel + + +# Logic class +from ..Class.SeriesType import TitleManager +from ..Class.EpisodeType import EpisodeManager +from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter + + +# Variable +from ...costant import SITE_NAME + + +class VideoSource: + def __init__(self): + """ + Initialize a VideoSource object. + """ + self.headers = { + 'user-agent': get_headers() + } + self.is_series = False + self.base_name = SITE_NAME + + def setup(self, version: str = None, domain: str = None, media_id: int = None, series_name: str = None): + """ + Set up the class + + Args: + - version (str): The version to set. + - media_id (str): The media ID to set. + - media_id (int): The media ID to set. + - series_name (str): The series name to set. + """ + self.version = version + self.domain = domain + self.media_id = media_id + + if series_name is not None: + self.is_series = True + self.series_name = series_name + self.obj_title_manager: TitleManager = TitleManager() + self.obj_episode_manager: EpisodeManager = EpisodeManager() + + def collect_info_seasons(self) -> None: + """ + Collect information about seasons. + """ + + self.headers = { + 'user-agent': get_headers(), + 'x-inertia': 'true', + 'x-inertia-version': self.version, + } + + try: + + response = httpx.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers) + response.raise_for_status() + + # Extract JSON response if available + json_response = response.json().get('props', {}).get('title', {}).get('seasons', []) + + # Iterate over JSON data and add titles to the manager + for dict_season in json_response: + self.obj_title_manager.add_title(dict_season) + + except Exception as e: + logging.error(f"Error collecting season info: {e}") + raise + + def collect_title_season(self, number_season: int) -> None: + """ + Collect information about a specific season. + + Args: + - number_season (int): The season number. + """ + try: + + # Make a request to collect information about a specific season + response = httpx.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers) + response.raise_for_status() + + # Extract JSON response if available + json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', []) + + # Iterate over JSON data and add episodes to the manager + for dict_episode in json_response: + self.obj_episode_manager.add_episode(dict_episode) + + except Exception as e: + logging.error(f"Error collecting title season info: {e}") + raise + + def get_iframe(self, episode_id: int = None) -> None: + """ + Get iframe source. + + Args: + - episode_id (int): The episode ID, present only for series + """ + params = {} + + if self.is_series: + params = { + 'episode_id': episode_id, + 'next_episode': '1' + } + + try: + + # Make a request to get iframe source + response = httpx.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params) + response.raise_for_status() + + # Parse response with BeautifulSoup to get iframe source + soup = BeautifulSoup(response.text, "html.parser") + self.iframe_src = soup.find("iframe").get("src") + + except Exception as e: + logging.error(f"Error getting iframe source: {e}") + raise + + def parse_script(self, script_text: str) -> None: + """ + Parse script text. + + Args: + - script_text (str): The script text to parse. + """ + try: + + converter = DynamicJSONConverter(script_text) + result = converter.convert_to_dynamic_json() + + # Create window video and parameter objects + self.window_video = WindowVideo(result['video']) + self.window_parameter = WindowParameter(result['masterPlaylist']) + + except Exception as e: + logging.error(f"Error parsing script: {e}") + raise + + def get_content(self) -> None: + """ + Get content. + """ + try: + + # Check if iframe source is available + if self.iframe_src is not None: + + # Make a request to get content + try: + response = httpx.get(self.iframe_src, headers=self.headers) + response.raise_for_status() + + except Exception as e: + print("\n") + console.print(Panel("[red bold]Coming soon", title="Notification", title_align="left", border_style="yellow")) + sys.exit(0) + + if response.status_code == 200: + + # Parse response with BeautifulSoup to get content + soup = BeautifulSoup(response.text, "html.parser") + script = soup.find("body").find("script").text + + # Parse script to get video information + self.parse_script(script_text=script) + + except Exception as e: + logging.error(f"Error getting content: {e}") + raise + + def get_playlist(self) -> str: + """ + Get playlist. + + Returns: + str: The playlist URL, or None if there's an error. + """ + + iframe_url = self.iframe_src + + # Create base uri for playlist + base_url = f'https://vixcloud.co/playlist/{self.window_video.id}' + query = urlencode(list(self.window_parameter.data.items())) + master_playlist_url = urljoin(base_url, '?' + query) + + # Parse the current query string and the master playlist URL query string + current_params = parse_qs(iframe_url[1:]) + m = urlparse(master_playlist_url) + master_params = parse_qs(m.query) + + # Create the final parameters dictionary with token and expires from the master playlist + final_params = { + "token": master_params.get("token", [""])[0], + "expires": master_params.get("expires", [""])[0] + } + + # Add conditional parameters + if "b" in current_params: + final_params["b"] = "1" + if "canPlayFHD" in current_params: + final_params["h"] = "1" + + # Construct the new query string and final URL + new_query = urlencode(final_params) # Encode final_params into a query string + new_url = m._replace(query=new_query) # Replace the old query string with the new one + final_url = urlunparse(new_url) # Construct the final URL from the modified parts + + return final_url diff --git a/Src/Api/streamingcommunity/Core/Util/__init__.py b/Src/Api/streamingcommunity/Core/Util/__init__.py new file mode 100644 index 0000000..d9eb4f1 --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Util/__init__.py @@ -0,0 +1,8 @@ +# 21.05.24 + +from .get_domain import grab_sc_top_level_domain as extract_domain + +from .manage_ep import ( + manage_selection, + map_episode_title +) \ No newline at end of file diff --git a/Src/Api/streamingcommunity/Core/Util/get_domain.py b/Src/Api/streamingcommunity/Core/Util/get_domain.py new file mode 100644 index 0000000..63f224a --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Util/get_domain.py @@ -0,0 +1,106 @@ +# 02.04.24 + +import os +import threading +import logging + + +# External library +import httpx + + +# Internal utilities +from Src.Lib.Google import search as google_search + + + +def check_url_for_content(url: str, content: str) -> bool: + """ + Check if a URL contains specific content. + + Args: + - url (str): The URL to check. + - content (str): The content to search for in the response. + + Returns: + bool: True if the content is found, False otherwise. + """ + try: + + logging.info(f"Test site to extract domain: {url}") + response = httpx.get(url, timeout = 1) + response.raise_for_status() + + if content in response.text: + return True + + except Exception as e: + pass + + return False + + +def grab_top_level_domain(base_url: str, target_content: str) -> str: + """ + Get the top-level domain (TLD) from a list of URLs. + + Args: + - base_url (str): The base URL to construct complete URLs. + - target_content (str): The content to search for in the response. + + Returns: + str: The found TLD, if any. + """ + results = [] + threads = [] + path_file = os.path.join("Test", "data", "TLD", "tld_list.txt") + logging.info(f"Load file: {path_file}") + + def url_checker(url: str): + if check_url_for_content(url, target_content): + results.append(url.split(".")[-1]) + + if not os.path.exists(path_file): + raise FileNotFoundError("The file 'tld_list.txt' does not exist.") + + with open(path_file, "r") as file: + urls = [f"{base_url}.{x.strip().lower()}" for x in file] + + for url in urls: + thread = threading.Thread(target=url_checker, args=(url,)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + if results: + return results[-1] + + +def grab_top_level_domain_light(query: str) -> str: + """ + Get the top-level domain (TLD) using a light method via Google search. + + Args: + - query (str): The search query for Google search. + + Returns: + str: The found TLD, if any. + """ + for result in google_search(query, num=1, stop=1, pause=2): + return result.split(".", 2)[-1].replace("/", "") + + +def grab_sc_top_level_domain(method: str) -> str: + """ + Get the top-level domain (TLD) for the streaming community. + Args: + method (str): The method to use to obtain the TLD ("light" or "strong"). + Returns: + str: The found TLD, if any. + """ + if method == "light": + return grab_top_level_domain_light("streaming community") + elif method == "strong": + return grab_top_level_domain("https://streamingcommunity", '') diff --git a/Src/Api/streamingcommunity/Core/Util/manage_ep.py b/Src/Api/streamingcommunity/Core/Util/manage_ep.py new file mode 100644 index 0000000..ab4ae71 --- /dev/null +++ b/Src/Api/streamingcommunity/Core/Util/manage_ep.py @@ -0,0 +1,75 @@ +# 02.05.24 + +import logging + +from typing import List + + +# Internal utilities +from Src.Util._jsonConfig import config_manager +from Src.Util.os import remove_special_characters + + +# Logic class +from ..Class.EpisodeType import Episode + + +# Config +MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name') + + +def manage_selection(cmd_insert: str, max_count: int) -> List[int]: + """ + Manage user selection for seasons to download. + + Args: + - cmd_insert (str): User input for season selection. + - max_count (int): Maximum count of seasons available. + + Returns: + list_season_select (List[int]): List of selected seasons. + """ + list_season_select = [] + logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}") + + # For a single number (e.g., '5') + if cmd_insert.isnumeric(): + list_season_select.append(int(cmd_insert)) + + # For a range (e.g., '[5-12]') + elif "[" in cmd_insert: + start, end = map(int, cmd_insert[1:-1].split('-')) + list_season_select = list(range(start, end + 1)) + + # For all seasons + elif cmd_insert == "*": + list_season_select = list(range(1, max_count+1)) + + # Return list of selected seasons) + logging.info(f"List return: {list_season_select}") + return list_season_select + + +def map_episode_title(tv_name: str, episode: Episode, number_season: int): + """ + Maps the episode title to a specific format. + + Args: + - tv_name (str): The name of the TV show. + - episode (Episode): The episode object. + - number_season (int): The season number. + + Returns: + str: The mapped episode title. + """ + map_episode_temp = MAP_EPISODE + map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name)) + map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2)) + map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2)) + map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode.name)) + + # Additional fix + map_episode_temp = map_episode_temp.replace(".", "_") + + logging.info(f"Map episode string return: {map_episode_temp}") + return map_episode_temp diff --git a/Src/Api/streamingcommunity/__init__.py b/Src/Api/streamingcommunity/__init__.py new file mode 100644 index 0000000..078449a --- /dev/null +++ b/Src/Api/streamingcommunity/__init__.py @@ -0,0 +1,57 @@ +# 21.05.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import ( + get_version_and_domain, + title_search, + get_select_title +) + +from .film import download_film +from .series import download_series + + +# Variable +indice = 0 + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + + # Get site domain and version and get result of the search + site_version, domain = get_version_and_domain() + len_database = title_search(string_to_search, domain) + + if len_database > 0: + + # Select title from list + select_title = get_select_title() + + # For series + if select_title.type == 'tv': + download_series( + tv_id=select_title.id, + tv_name=select_title.slug, + version=site_version, + domain=domain + ) + + # For film + else: + download_film( + id_film=select_title.id, + title_name=select_title.slug, + domain=domain + ) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/streamingcommunity/costant.py b/Src/Api/streamingcommunity/costant.py new file mode 100644 index 0000000..68f52fc --- /dev/null +++ b/Src/Api/streamingcommunity/costant.py @@ -0,0 +1,15 @@ +# 26.05.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) + +MOVIE_FOLDER = "Movie" +SERIES_FOLDER = "Serie" \ No newline at end of file diff --git a/Src/Api/streamingcommunity/film.py b/Src/Api/streamingcommunity/film.py new file mode 100644 index 0000000..95fdd19 --- /dev/null +++ b/Src/Api/streamingcommunity/film.py @@ -0,0 +1,56 @@ +# 3.12.23 + +import os +import logging + + +# Internal utilities +from Src.Util.console import console +from Src.Lib.Hls.downloader import Downloader +from Src.Util.message import start_message + + +# Logic class +from .Core.Player.vixcloud import VideoSource + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER +video_source = VideoSource() + + +def download_film(id_film: str, title_name: str, domain: str): + """ + Downloads a film using the provided film ID, title name, and domain. + + Args: + - id_film (str): The ID of the film. + - title_name (str): The name of the film title. + - domain (str): The domain of the site + """ + + # Start message and display film information + start_message() + console.print(f"[yellow]Download: [red]{title_name} \n") + + # Set domain and media ID for the video source + video_source.setup( + domain = domain, + media_id = id_film + ) + + # Retrieve scws and if available master playlist + video_source.get_iframe() + video_source.get_content() + master_playlist = video_source.get_playlist() + + # Define the filename and path for the downloaded film + mp4_name = title_name.replace("-", "_") + mp4_format = (mp4_name) + ".mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + + # Download the film using the m3u8 playlist, and output filename + Downloader( + m3u8_playlist = master_playlist, + output_filename = os.path.join(mp4_path, mp4_format) + ).start() \ No newline at end of file diff --git a/Src/Api/streamingcommunity/series.py b/Src/Api/streamingcommunity/series.py new file mode 100644 index 0000000..6b69387 --- /dev/null +++ b/Src/Api/streamingcommunity/series.py @@ -0,0 +1,183 @@ +# 3.12.23 + +import os +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg +from Src.Util.table import TVShowManager +from Src.Util.message import start_message +from Src.Lib.Hls.downloader import Downloader + + +# Logic class +from .Core.Player.vixcloud import VideoSource +from .Core.Util import manage_selection, map_episode_title + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER +video_source = VideoSource() +table_show_manager = TVShowManager() + + +def donwload_video(tv_name: str, index_season_selected: int, index_episode_selected: int) -> None: + """ + Download a single episode video. + + Args: + - tv_name (str): Name of the TV series. + - index_season_selected (int): Index of the selected season. + - index_episode_selected (int): Index of the selected episode. + """ + + start_message() + + # Get info about episode + obj_episode = video_source.obj_episode_manager.episodes[index_episode_selected - 1] + console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.name}") + print() + + # Define filename and path for the downloaded video + mp4_name = f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, tv_name, f"S{index_season_selected}") + + # Retrieve scws and if available master playlist + video_source.get_iframe(obj_episode.id) + video_source.get_content() + master_playlist = video_source.get_playlist() + + # Download the episode + Downloader( + m3u8_playlist = master_playlist, + output_filename = os.path.join(mp4_path, mp4_name) + ).start() + + +def donwload_episode(tv_name: str, index_season_selected: int, donwload_all: bool = False) -> None: + """ + Download all episodes of a season. + + Args: + - tv_name (str): Name of the TV series. + - index_season_selected (int): Index of the selected season. + - donwload_all (bool): Donwload all seasons episodes + """ + + # Clean memory of all episodes and get the number of the season (some dont follow rule of [1,2,3,4,5] but [1,2,3,145,5,6,7]). + video_source.obj_episode_manager.clear() + season_number = (video_source.obj_title_manager.titles[index_season_selected-1].number) + + # Start message and collect information about episodes + start_message() + video_source.collect_title_season(season_number) + episodes_count = video_source.obj_episode_manager.get_length() + + # Download all episodes wihtout ask + if donwload_all: + for i_episode in range(1, episodes_count+1): + donwload_video(tv_name, index_season_selected, i_episode) + + console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.") + + # If not download all episode but a single season + if not donwload_all: + + # Display episodes list and manage user selection + last_command = display_episodes_list() + list_episode_select = manage_selection(last_command, episodes_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + donwload_video(tv_name, index_season_selected, list_episode_select[0]) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + donwload_video(tv_name, index_season_selected, i_episode) + + +def download_series(tv_id: str, tv_name: str, version: str, domain: str) -> None: + """ + Download all episodes of a TV series. + + Args: + - tv_id (str): ID of the TV series. + - tv_name (str): Name of the TV series. + - version (str): Version of the TV series. + - domain (str): Domain from which to download. + """ + + # Start message and set up video source + start_message() + + # Setup video source + video_source.setup( + version = version, + domain = domain, + media_id = tv_id, + series_name = tv_name + ) + + # Collect information about seasons + video_source.collect_info_seasons() + seasons_count = video_source.obj_title_manager.get_length() + + # Prompt user for season selection and download episodes + console.print(f"\n[green]Season find: [red]{seasons_count}") + index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")) + list_season_select = manage_selection(index_season_selected, seasons_count) + + # Download selected episodes + if len(list_season_select) == 1 and index_season_selected != "*": + if 1 <= int(index_season_selected) <= seasons_count: + donwload_episode(tv_name, list_season_select[0]) + + # Dowload all seasons and episodes + elif index_season_selected == "*": + for i_season in list_season_select: + donwload_episode(tv_name, i_season, True) + + # Download all other season selecter + else: + for i_season in list_season_select: + donwload_episode(tv_name, i_season) + + +def display_episodes_list() -> str: + """ + Display episodes list and handle user input. + + Returns: + last_command (str): Last command entered by the user. + """ + + # Set up table for displaying episodes + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Duration": {'color': 'green'} + } + table_show_manager.add_column(column_info) + + # Populate the table with episodes information + for i, media in enumerate(video_source.obj_episode_manager.episodes): + table_show_manager.add_tv_show({ + 'Index': str(media.number), + 'Name': media.name, + 'Duration': str(media.duration) + }) + + # Run the table and handle user input + last_command = table_show_manager.run() + + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + return last_command diff --git a/Src/Api/streamingcommunity/site.py b/Src/Api/streamingcommunity/site.py new file mode 100644 index 0000000..6802459 --- /dev/null +++ b/Src/Api/streamingcommunity/site.py @@ -0,0 +1,204 @@ +# 10.12.23 + +import sys +import json +import logging + +from typing import Tuple + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util._jsonConfig import config_manager +from Src.Util.console import console +from Src.Util.table import TVShowManager + + +# Logic class +from .Core.Util import extract_domain +from .Core.Class.SearchType import MediaManager, MediaItem + + +# Config +from .costant import SITE_NAME + + +# Variable +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def get_version(text: str) -> tuple[str, list]: + """ + Extracts the version from the HTML text of a webpage. + + Args: + - text (str): The HTML text of the webpage. + + Returns: + str: The version extracted from the webpage. + list: Top 10 titles headlines for today. + """ + console.print("[cyan]Make request to get version [white]...") + + try: + + # Parse request to site + soup = BeautifulSoup(text, "html.parser") + + # Extract version + version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version'] + sliders = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['props']['sliders'] + + title_top_10 = sliders[2] + + # Collect info about only top 10 title + list_title_top_10 = [] + for title in title_top_10['titles']: + list_title_top_10.append({ + 'name': title['name'], + 'type': title['type'] + }) + + console.print(f"[cyan]Get version [white]=> [red]{version} \n") + + return version, list_title_top_10 + + except Exception as e: + logging.error(f"Error extracting version: {e}") + raise + + +def get_version_and_domain(new_domain = None) -> Tuple[str, str]: + """ + Retrieves the version and domain of the streaming website. + + This function retrieves the version and domain of the streaming website. + It first checks the accessibility of the current site. + If the site is accessible, it extracts the version from the response. + If configured to do so, it also scrapes and prints the titles of the moments. + If the site is inaccessible, it attempts to obtain a new domain using the 'insta' method. + + Returns: + Tuple[str, str]: A tuple containing the version and domain. + """ + + # Get the current domain from the configuration + if new_domain is None: + config_domain = config_manager.get('SITE', SITE_NAME) + else: + config_domain = new_domain + + # Test the accessibility of the current site + try: + + # Make requests to site to get text + console.print(f"[cyan]Test site[white]: [red]https://{SITE_NAME}.{config_domain}") + response = httpx.get(f"https://{SITE_NAME}.{config_domain}") + console.print(f"[cyan]Test respost site[white]: [red]{response.status_code} \n") + + # Extract version from the response + version, list_title_top_10 = get_version(response.text) + + return version, config_domain + + except: + + console.print("[red]\nExtract new DOMAIN from TLD list.") + new_domain = extract_domain(method="light") + console.log(f"[cyan]Extract new domain: [red]{new_domain}") + + # Update the domain in the configuration file + config_manager.set_key('SITE', SITE_NAME, str(new_domain)) + config_manager.write_config() + + # Retry to get the version and domain + return get_version_and_domain(new_domain) + + +def title_search(title_search: str, domain: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + - domain (str): The domain to search on. + + Returns: + int: The number of titles found. + """ + + # Send request to search for titles ( replace à to a and space to "+" ) + response = httpx.get(f"https://{SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()}) + response.raise_for_status() + + # Add found titles to media search manager + for dict_title in response.json()['data']: + media_search_manager.add_media(dict_title) + + # Return the number of titles found + return media_search_manager.get_length() + + +def get_select_title(type_filter: list = None) -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Args: + - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film'] + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + "Score": {'color': 'cyan'}, + "Date": {'color': 'green'} + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + + # Filter for only a list of category + if type_filter is not None: + if str(media.type) not in type_filter: + continue + + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + 'Score': media.score, + 'Date': media.last_air_date + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + table_show_manager.clear() + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) diff --git a/config.json b/config.json index 791da94..94b054d 100644 --- a/config.json +++ b/config.json @@ -12,12 +12,12 @@ "REQUESTS": { "timeout": 10, "max_retry": 3, - "verify_ssl": true, + "verify_ssl": false, "index": { "user-agent": "" }, "proxy_start_min": 0.1, - "proxy_start_max": 0.4, + "proxy_start_max": 0.5, "proxy": [] }, "M3U8_DOWNLOAD": { @@ -54,9 +54,8 @@ "SITE": { "streamingcommunity": "foo", "animeunity": "to", - "Altadefinizione": "vodka", - "Guardaserie": "ceo", - "Ddlstreamitaly": "co", - "4kTitle": "foo" + "altadefinizione": "vodka", + "guardaserie": "ceo", + "ddlstreamitaly": "co" } } \ No newline at end of file diff --git a/run.py b/run.py index cf1c764..3f9c6e0 100644 --- a/run.py +++ b/run.py @@ -110,7 +110,10 @@ def initialize(): sys.exit(0) # Attempting GitHub update - git_update() + try: + git_update() + except: + console.log("[red]Error with loading github.")