diff --git a/.gitignore b/.gitignore index b3cc9d0..9c6faab 100644 --- a/.gitignore +++ b/.gitignore @@ -59,4 +59,5 @@ venv.bak/ Video note.txt list_proxy.txt -config.json \ No newline at end of file +config.json +downloaded_files \ No newline at end of file diff --git a/README.md b/README.md index ba7c102..b44e56d 100644 --- a/README.md +++ b/README.md @@ -90,9 +90,6 @@ You can change some behaviors by tweaking the configuration file.
REQUESTS - * **disable_error**: Whether to disable error messages. - - **Default Value**: `false` - * **timeout**: The timeout value for requests. - **Default Value**: `10` @@ -158,9 +155,6 @@ You can change some behaviors by tweaking the configuration file.
M3U8_PARSER - * **skip_empty_row_playlist**: Whether to skip empty rows in the playlist m3u8. - - **Default Value**: `false` - * **force_resolution**: Forces the use of a specific resolution. `-1` means no forced resolution. - **Default Value**: `-1` - **Example Value**: `1080` @@ -222,6 +216,8 @@ The `run-container` command mounts also the `config.json` file, so any change to ## Tutorial For a detailed walkthrough, refer to the [video tutorial](https://www.youtube.com/watch?v=Ok7hQCgxqLg&ab_channel=Nothing) +Add [api_1](https://www.youtube.com/watch?v=3ylBSMyQlhM) +Add [api_2](https://www.youtube.com/watch?v=ReEYUIbdbG4) ## To do diff --git a/Src/Api/1337xx/Core/Class/SearchType.py b/Src/Api/1337xx/Core/Class/SearchType.py new file mode 100644 index 0000000..abb0bd8 --- /dev/null +++ b/Src/Api/1337xx/Core/Class/SearchType.py @@ -0,0 +1,63 @@ +# 02.07.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('title') + self.url: int = data.get('link') + self.size:str = data.get('size') + self.seader: int = data.get('seader') + self.leacher: int = data.get('leacher') + self.date: str = data.get('date') + + def __str__(self): + return f"MediaItem(name='{self.name}', type='{self.type}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/1337xx/__init__.py b/Src/Api/1337xx/__init__.py new file mode 100644 index 0000000..9474232 --- /dev/null +++ b/Src/Api/1337xx/__init__.py @@ -0,0 +1,37 @@ +# 02.07.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, run_get_select_title +from .title import download_title + + +# Variable +indice = 8 +_deprecate = False + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = run_get_select_title() + + # Download only film + download_title( + select_title + ) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/1337xx/costant.py b/Src/Api/1337xx/costant.py new file mode 100644 index 0000000..7243c3d --- /dev/null +++ b/Src/Api/1337xx/costant.py @@ -0,0 +1,15 @@ +# 09.06.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] + +SERIES_FOLDER = "Serie" +MOVIE_FOLDER = "Film" \ No newline at end of file diff --git a/Src/Api/1337xx/site.py b/Src/Api/1337xx/site.py new file mode 100644 index 0000000..7e5e25a --- /dev/null +++ b/Src/Api/1337xx/site.py @@ -0,0 +1,78 @@ +# 02.07.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.table import TVShowManager +from ..Template import search_domain, get_select_title + + +# Logic class +from .Core.Class.SearchType import MediaManager + + +# Variable +from .costant import SITE_NAME +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(word_to_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. + """ + + # Find new domain if prev dont work + domain_to_use, _ = search_domain(SITE_NAME, ' bool: """ try: - response = httpx.get(url, timeout=timeout, headers={'user-agent': get_headers()}) + response = httpx.get(url, timeout=timeout, headers={'user-agent': get_headers()}, follow_redirects=True) logging.info(f"Testing site to extract domain: {url}, response: {response.status_code}") # Raise an error if the status is not successful @@ -169,7 +169,7 @@ def search_domain(site_name: str, target_content: str, base_url: str): try: # Test the current domain - response = httpx.get(f"{base_url}.{domain}", headers={'user-agent': get_headers()}, timeout=2) + response = httpx.get(f"{base_url}.{domain}", headers={'user-agent': get_headers()}, timeout=5, follow_redirects=True) console.print(f"[cyan]Test response site[white]: [red]{response.status_code}") response.raise_for_status() diff --git a/Src/Api/altadefinizione/Core/Player/supervideo.py b/Src/Api/altadefinizione/Core/Player/supervideo.py index f851924..c402f43 100644 --- a/Src/Api/altadefinizione/Core/Player/supervideo.py +++ b/Src/Api/altadefinizione/Core/Player/supervideo.py @@ -16,14 +16,14 @@ from Src.Util.os import run_node_script, run_node_script_api class VideoSource: - - def __init__(self) -> None: + def __init__(self, url: str): """ - Initializes the VideoSource object with default values. + Sets up the video source with the provided URL. - Attributes: - headers (dict): An empty dictionary to store HTTP headers. + Args: + url (str): The URL of the video. """ + self.url = url self.headers = {'user-agent': get_headers()} def setup(self, url: str) -> None: diff --git a/Src/Api/altadefinizione/costant.py b/Src/Api/altadefinizione/costant.py index aa9d6eb..9173fc4 100644 --- a/Src/Api/altadefinizione/costant.py +++ b/Src/Api/altadefinizione/costant.py @@ -9,6 +9,7 @@ from Src.Util._jsonConfig import config_manager SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] -MOVIE_FOLDER = "Movie" \ No newline at end of file +MOVIE_FOLDER = "Movie" +SERIES_FOLDER= "Serie" \ No newline at end of file diff --git a/Src/Api/altadefinizione/film.py b/Src/Api/altadefinizione/film.py index 31c1f1b..2b95ce1 100644 --- a/Src/Api/altadefinizione/film.py +++ b/Src/Api/altadefinizione/film.py @@ -6,9 +6,10 @@ import logging # Internal utilities -from Src.Util.console import console -from Src.Lib.Downloader import HLS_Downloader from Src.Util.message import start_message +from Src.Util.console import console +from Src.Util.os import create_folder, can_create_file +from Src.Lib.Downloader import HLS_Downloader # Logic class @@ -18,10 +19,7 @@ from .Core.Player.supervideo import VideoSource # Config from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER - -# Variable -video_source = VideoSource() - + def download_film(title_name: str, url: str): """ @@ -37,14 +35,20 @@ def download_film(title_name: str, url: str): console.print(f"[yellow]Download: [red]{title_name} \n") # Set domain and media ID for the video source - video_source.setup( - url = url - ) + video_source = VideoSource(url) # Define output path mp4_name = str(title_name).replace("-", "_") + ".mp4" mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + # Ensure the folder path exists + create_folder(mp4_path) + + # Check if the MP4 file can be created + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + # Get m3u8 master playlist master_playlist = video_source.get_playlist() diff --git a/Src/Api/animeunity/Core/Class/SeriesType.py b/Src/Api/animeunity/Core/Class/SeriesType.py deleted file mode 100644 index b8c14ef..0000000 --- a/Src/Api/animeunity/Core/Class/SeriesType.py +++ /dev/null @@ -1,67 +0,0 @@ -# 03.03.24 - -from typing import List, Dict, Union - - -class Title: - def __init__(self, title_data: Dict[str, Union[int, str, None]]): - self.id: int = title_data.get('id') - self.number: int = title_data.get('number') - self.name: str = title_data.get('name') - self.plot: str = title_data.get('plot') - self.release_date: str = title_data.get('release_date') - self.title_id: int = title_data.get('title_id') - self.created_at: str = title_data.get('created_at') - self.updated_at: str = title_data.get('updated_at') - self.episodes_count: int = title_data.get('episodes_count') - - def __str__(self): - return f"Title(id={self.id}, number={self.number}, name='{self.name}', plot='{self.plot}', release_date='{self.release_date}', title_id={self.title_id}, created_at='{self.created_at}', updated_at='{self.updated_at}', episodes_count={self.episodes_count})" - - -class TitleManager: - def __init__(self): - self.titles: List[Title] = [] - - def add_title(self, title_data: Dict[str, Union[int, str, None]]): - """ - Add a new title to the manager. - - Args: - title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title. - """ - title = Title(title_data) - self.titles.append(title) - - def get_title_by_index(self, index: int) -> Title: - """ - Get a title by its index. - - Args: - index (int): Index of the title to retrieve. - - Returns: - Title: The title object. - """ - return self.titles[index] - - def get_length(self) -> int: - """ - Get the number of titles in the manager. - - Returns: - int: Number of titles. - """ - return len(self.titles) - - def clear(self) -> None: - """ - This method clears the titles list. - - Args: - self: The object instance. - """ - self.titles.clear() - - def __str__(self): - return f"TitleManager(num_titles={len(self.titles)})" diff --git a/Src/Api/animeunity/Core/Player/vixcloud.py b/Src/Api/animeunity/Core/Player/vixcloud.py index d3dfe0e..7f34039 100644 --- a/Src/Api/animeunity/Core/Player/vixcloud.py +++ b/Src/Api/animeunity/Core/Player/vixcloud.py @@ -16,7 +16,6 @@ from Src.Util._jsonConfig import config_manager # Logic class -from ..Class.SeriesType import TitleManager from ..Class.EpisodeType import EpisodeManager, Episode from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter @@ -30,9 +29,7 @@ class VideoSource: """ Initialize a VideoSource object. """ - self.headers = { - 'user-agent': get_headers() - } + self.headers = {'user-agent': get_headers()} self.is_series = False self.base_name = SITE_NAME self.domain = config_manager.get_dict('SITE', self.base_name)['domain'] @@ -50,7 +47,6 @@ class VideoSource: if series_name is not None: self.is_series = True self.series_name = series_name - self.obj_title_manager: TitleManager = TitleManager() self.obj_episode_manager: EpisodeManager = EpisodeManager() def get_count_episodes(self): diff --git a/Src/Api/animeunity/costant.py b/Src/Api/animeunity/costant.py index 6275469..308ffa4 100644 --- a/Src/Api/animeunity/costant.py +++ b/Src/Api/animeunity/costant.py @@ -9,7 +9,7 @@ from Src.Util._jsonConfig import config_manager SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] SERIES_FOLDER= "Serie" MOVIE_FOLDER = "Movie" diff --git a/Src/Api/bitsearch/Core/Class/SearchType.py b/Src/Api/bitsearch/Core/Class/SearchType.py new file mode 100644 index 0000000..fc048ad --- /dev/null +++ b/Src/Api/bitsearch/Core/Class/SearchType.py @@ -0,0 +1,63 @@ +# 13.06.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('title') + self.url: int = data.get('link') + self.size:str = data.get('size') + self.seader: int = data.get('seader') + self.leacher: int = data.get('leacher') + self.date: str = data.get('date') + + def __str__(self): + return f"MediaItem(name='{self.name}', type='{self.type}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/bitsearch/__init__.py b/Src/Api/bitsearch/__init__.py new file mode 100644 index 0000000..720df67 --- /dev/null +++ b/Src/Api/bitsearch/__init__.py @@ -0,0 +1,39 @@ +# 01.07.24 + +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, run_get_select_title +from .title import download_title + + + +# Variable +indice = 7 +_deprecate = False + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = run_get_select_title() + download_title(select_title) + + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/bitsearch/costant.py b/Src/Api/bitsearch/costant.py new file mode 100644 index 0000000..dac30e8 --- /dev/null +++ b/Src/Api/bitsearch/costant.py @@ -0,0 +1,15 @@ +# 01.07.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] + +SERIES_FOLDER = "Serie" +MOVIE_FOLDER = "Film" diff --git a/Src/Api/bitsearch/site.py b/Src/Api/bitsearch/site.py new file mode 100644 index 0000000..6685e64 --- /dev/null +++ b/Src/Api/bitsearch/site.py @@ -0,0 +1,72 @@ +# 01.07.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.table import TVShowManager +from ..Template import search_domain, get_select_title + + +# Logic class +from .Core.Class.SearchType import MediaManager + + +# Variable +from .costant import SITE_NAME +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(word_to_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. + """ + + # Find new domain if prev dont work + domain_to_use, _ = search_domain(SITE_NAME, 'g', f"https://{SITE_NAME}") + + # Construct the full site URL and load the search page + response = httpx.get(f"https://{SITE_NAME}.{domain_to_use}/search?q={unidecode(word_to_search)}&category=1&subcat=2&page=1", headers={'user-agent': get_headers()}) + response.raise_for_status() + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + + for title_div in soup.find_all("li", class_ = "card"): + + title_info = { + 'title': title_div.find("a").get_text(strip=True), + 'link': title_div.find_all("a")[-1].get("href"), + 'size': title_div.find_all("div")[-5].get_text(strip=True), + 'seader': title_div.find_all("div")[-4].get_text(strip=True), + 'leacher': title_div.find_all("div")[-3].get_text(strip=True), + 'date': title_div.find_all("div")[-2].get_text(strip=True) + } + + media_search_manager.add_media(title_info) + + # Return the number of titles found + return media_search_manager.get_length() + + +def run_get_select_title(): + """ + Display a selection of titles and prompt the user to choose one. + """ + return get_select_title(table_show_manager, media_search_manager) \ No newline at end of file diff --git a/Src/Api/bitsearch/title.py b/Src/Api/bitsearch/title.py new file mode 100644 index 0000000..cb4db09 --- /dev/null +++ b/Src/Api/bitsearch/title.py @@ -0,0 +1,51 @@ +# 01.07.24 + +import os +import sys +import logging + + +# Internal utilities +from Src.Util.console import console +from Src.Util.message import start_message +from Src.Util.os import create_folder, can_create_file, remove_special_characters +from Src.Lib.Downloader import TOR_downloader + + +# Logic class +from .Core.Class.SearchType import MediaItem + + +# Config +from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER + + +def download_title(select_title: MediaItem): + """ + Downloads a media item and saves it as an MP4 file. + + Parameters: + - select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`. + """ + + start_message() + + console.print(f"[yellow]Download: [red]{select_title.name} \n") + print() + + # Define output path + title_name = remove_special_characters(select_title.name) + mp4_name = title_name.replace("-", "_") + ".mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(title_name.replace(".mp4", ""))) + + # Check if can create file output + create_folder(mp4_path) + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + + # Tor manager + manager = TOR_downloader() + manager.add_magnet_link(select_title.url) + manager.start_download() + manager.move_downloaded_files(mp4_path) \ No newline at end of file diff --git a/Src/Api/cb01/Core/Class/SearchType.py b/Src/Api/cb01/Core/Class/SearchType.py new file mode 100644 index 0000000..3e78ae6 --- /dev/null +++ b/Src/Api/cb01/Core/Class/SearchType.py @@ -0,0 +1,60 @@ +# 03.07.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('name') + self.url: int = data.get('url') + self.desc: int = data.get('desc') + + def __str__(self): + return f"MediaItem(name='{self.name}', desc='{self.desc}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/cb01/Core/Player/maxstream.py b/Src/Api/cb01/Core/Player/maxstream.py new file mode 100644 index 0000000..3b8973e --- /dev/null +++ b/Src/Api/cb01/Core/Player/maxstream.py @@ -0,0 +1,151 @@ +# 05.07.24 + +import re +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.os import run_node_script, run_node_script_api + + +class VideoSource: + def __init__(self, url: str): + """ + Sets up the video source with the provided URL. + + Args: + url (str): The URL of the video. + """ + self.url = url + self.redirect_url = None + self.maxstream_url = None + self.m3u8_url = None + self.headers = {'user-agent': get_headers()} + + def get_redirect_url(self): + """ + Sends a request to the initial URL and extracts the redirect URL. + """ + try: + + # Send a GET request to the initial URL + response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + + # Extract the redirect URL from the HTML + soup = BeautifulSoup(response.text, "html.parser") + self.redirect_url = soup.find("div", id="iframen1").get("data-src") + logging.info(f"Redirect URL: {self.redirect_url}") + + return self.redirect_url + + except httpx.RequestError as e: + logging.error(f"Error during the initial request: {e}") + raise + + except AttributeError as e: + logging.error(f"Error parsing HTML: {e}") + raise + + def get_maxstream_url(self): + """ + Sends a request to the redirect URL and extracts the Maxstream URL. + """ + if not self.redirect_url: + raise ValueError("Redirect URL not found. Please call get_redirect_url() first.") + + try: + # Send a GET request to the redirect URL + response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + + # Extract the Maxstream URL from the HTML + soup = BeautifulSoup(response.text, "html.parser") + maxstream_url = soup.find("a") + + if maxstream_url is None: + + # If no anchor tag is found, try the alternative method + logging.warning("Anchor tag not found. Trying the alternative method.") + headers = { + 'origin': 'https://stayonline.pro', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 OPR/111.0.0.0', + 'x-requested-with': 'XMLHttpRequest', + } + + # Make request to stayonline api + data = {'id': self.redirect_url.split("/")[-2], 'ref': ''} + response = httpx.post('https://stayonline.pro/ajax/linkEmbedView.php', headers=headers, data=data) + response.raise_for_status() + uprot_url = response.json()['data']['value'] + + # Retry getting maxtstream url + response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + maxstream_url = soup.find("a").get("href") + + else: + maxstream_url = maxstream_url.get("href") + + self.maxstream_url = maxstream_url + logging.info(f"Maxstream URL: {self.maxstream_url}") + + return self.maxstream_url + + except httpx.RequestError as e: + logging.error(f"Error during the request to the redirect URL: {e}") + raise + + except AttributeError as e: + logging.error(f"Error parsing HTML: {e}") + raise + + def get_m3u8_url(self): + """ + Sends a request to the Maxstream URL and extracts the .m3u8 file URL. + """ + if not self.maxstream_url: + raise ValueError("Maxstream URL not found. Please call get_maxstream_url() first.") + + try: + # Send a GET request to the Maxstream URL + response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + + # Iterate over all script tags in the HTML + for script in soup.find_all("script"): + if "eval(function(p,a,c,k,e,d)" in script.text: + + # Execute the script using the run_node_script_api function + text_run_node_js = run_node_script_api(script.text) + + # Extract the .m3u8 URL from the script's output + m3u8_match = re.search(r'src:"(https://.*?\.m3u8)"', text_run_node_js) + + if m3u8_match: + self.m3u8_url = m3u8_match.group(1) + logging.info(f"M3U8 URL: {self.m3u8_url}") + break + + return self.m3u8_url + + except Exception as e: + logging.error(f"Error executing the Node.js script: {e}") + raise + + def get_playlist(self): + """ + Executes the entire flow to obtain the final .m3u8 file URL. + """ + self.get_redirect_url() + self.get_maxstream_url() + return self.get_m3u8_url() diff --git a/Src/Api/cb01/__init__.py b/Src/Api/cb01/__init__.py new file mode 100644 index 0000000..fdef766 --- /dev/null +++ b/Src/Api/cb01/__init__.py @@ -0,0 +1,41 @@ +# 09.06.24 + +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, run_get_select_title +from .film import download_film + + + +# Variable +indice = 9 +_deprecate = False + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = run_get_select_title() + + # !!! ADD TYPE DONT WORK FOR SERIE + download_film(select_title) + + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/cb01/costant.py b/Src/Api/cb01/costant.py new file mode 100644 index 0000000..ba6ce6e --- /dev/null +++ b/Src/Api/cb01/costant.py @@ -0,0 +1,15 @@ +# 03.07.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] + +MOVIE_FOLDER = "Movie" +SERIES_FOLDER = "Serie" diff --git a/Src/Api/cb01/film.py b/Src/Api/cb01/film.py new file mode 100644 index 0000000..cd13fb9 --- /dev/null +++ b/Src/Api/cb01/film.py @@ -0,0 +1,56 @@ +# 03.07.24 + +import os +import sys +import logging + +# Internal utilities +from Src.Util.console import console +from Src.Util.message import start_message +from Src.Util.os import create_folder, can_create_file, remove_special_characters +from Src.Lib.Downloader import HLS_Downloader + + +# Logic class +from .Core.Class.SearchType import MediaItem +from .Core.Player.maxstream import VideoSource + + +# Config +from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER + + + +def download_film(select_title: MediaItem): + """ + Downloads a film using the provided obj. + """ + + # Start message and display film information + start_message() + console.print(f"[yellow]Download: [red]{select_title.name} \n") + + # Setup api manger + video_source = VideoSource(select_title.url) + + # Define output path + title_name = remove_special_characters(select_title.name) + mp4_name = remove_special_characters(title_name.replace("-", "_") + ".mp4") + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + + # Ensure the folder path exists + create_folder(mp4_path) + + # Check if the MP4 file can be created + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + + # Get m3u8 master playlist + master_playlist = video_source.get_playlist() + + # Download the film using the m3u8 playlist, and output filename + HLS_Downloader( + m3u8_playlist = master_playlist, + output_filename = os.path.join(mp4_path, mp4_name) + ).start() diff --git a/Src/Api/cb01/site.py b/Src/Api/cb01/site.py new file mode 100644 index 0000000..728ceac --- /dev/null +++ b/Src/Api/cb01/site.py @@ -0,0 +1,77 @@ +# 03.07.24 + +import sys +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.headers import get_headers +from Src.Util.table import TVShowManager +from ..Template import search_domain, get_select_title + + +# Logic class +from .Core.Class.SearchType import MediaManager + + +# Variable +from .costant import SITE_NAME +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(word_to_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. + """ + try: + + # Find new domain if prev dont work + domain_to_use, _ = search_domain(SITE_NAME, '', f"https://{SITE_NAME}") + + # Send request to search for titles + response = httpx.get(f"https://{SITE_NAME}.{domain_to_use}/?s={unidecode(word_to_search)}", headers={'user-agent': get_headers()}, follow_redirects=True) + response.raise_for_status() + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + + for div_title in soup.find_all("div", class_ = "card"): + + url = div_title.find("h3").find("a").get("href") + title = div_title.find("h3").find("a").get_text(strip=True) + desc = div_title.find("span").find("strong").get_text(strip=True) + + title_info = { + 'url': url, + 'name': title, + 'desc': desc + } + + media_search_manager.add_media(title_info) + + except Exception as err: + logging.error(f"An error occurred: {err}") + + # Return the number of titles found + return media_search_manager.get_length() + + +def run_get_select_title(): + """ + Display a selection of titles and prompt the user to choose one. + """ + return get_select_title(table_show_manager, media_search_manager) \ No newline at end of file diff --git a/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py b/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py index 2060987..2e083f9 100644 --- a/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py +++ b/Src/Api/ddlstreamitaly/Core/Class/ScrapeSerie.py @@ -56,7 +56,7 @@ class GetSerieInfo: response.raise_for_status() except Exception as e: - logging.error(f"Insert: ['ips4_device_key': 'your_code', 'ips4_member_id': 'your_code', 'ips4_login_key': 'your_code'] in config.json file REQUESTS -> index, instead of user-agent. Use browser debug and cookie request with a valid account, filter by DOC.") + logging.error(f"Insert value for [ips4_device_key, ips4_member_id, ips4_login_key] in config.json file SITE \ ddlstreamitaly \ cookie. Use browser debug and cookie request with a valid account, filter by DOC.") sys.exit(0) # Parse HTML content of the page diff --git a/Src/Api/ddlstreamitaly/Core/Player/ddl.py b/Src/Api/ddlstreamitaly/Core/Player/ddl.py index 2969f15..dcb0e96 100644 --- a/Src/Api/ddlstreamitaly/Core/Player/ddl.py +++ b/Src/Api/ddlstreamitaly/Core/Player/ddl.py @@ -11,7 +11,6 @@ from bs4 import BeautifulSoup # Internal utilities from Src.Util.headers import get_headers -from Src.Util._jsonConfig import config_manager # Variable @@ -19,7 +18,6 @@ from ...costant import COOKIE class VideoSource: - def __init__(self) -> None: """ Initializes the VideoSource object with default values. @@ -54,10 +52,10 @@ class VideoSource: response = httpx.get(url, headers=self.headers, cookies=self.cookie) response.raise_for_status() return response.text - except httpx.HTTPStatusError as http_err: - logging.error(f"HTTP error occurred: {http_err}") + except Exception as err: logging.error(f"An error occurred: {err}") + return None def get_playlist(self): diff --git a/Src/Api/ddlstreamitaly/costant.py b/Src/Api/ddlstreamitaly/costant.py index 2ff35c8..923fb34 100644 --- a/Src/Api/ddlstreamitaly/costant.py +++ b/Src/Api/ddlstreamitaly/costant.py @@ -9,7 +9,7 @@ from Src.Util._jsonConfig import config_manager SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] COOKIE = config_manager.get_dict('SITE', SITE_NAME)['cookie'] MOVIE_FOLDER = "Movie" diff --git a/Src/Api/ddlstreamitaly/series.py b/Src/Api/ddlstreamitaly/series.py index ddd86e2..2fcae22 100644 --- a/Src/Api/ddlstreamitaly/series.py +++ b/Src/Api/ddlstreamitaly/series.py @@ -63,7 +63,6 @@ def donwload_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) # Parse start page url start_message() parsed_url = urlparse(obj_episode.get('url')) - path_parts = parsed_url.path.split('/') MP4_downloader( url = master_playlist, diff --git a/Src/Api/ddlstreamitaly/site.py b/Src/Api/ddlstreamitaly/site.py index 6331c15..b06d401 100644 --- a/Src/Api/ddlstreamitaly/site.py +++ b/Src/Api/ddlstreamitaly/site.py @@ -7,11 +7,11 @@ import logging # External libraries import httpx from bs4 import BeautifulSoup +from unidecode import unidecode # Internal utilities from Src.Util.headers import get_headers -from Src.Util._jsonConfig import config_manager from Src.Util.table import TVShowManager from ..Template import search_domain, get_select_title @@ -27,9 +27,15 @@ table_show_manager = TVShowManager() -def title_search(word_to_search) -> int: +def title_search(word_to_search: str) -> int: """ Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. """ try: @@ -37,7 +43,7 @@ def title_search(word_to_search) -> int: domain_to_use, _ = search_domain(SITE_NAME, ' None: """ Initializes the VideoSource object with default values. diff --git a/Src/Api/guardaserie/costant.py b/Src/Api/guardaserie/costant.py index 4072af4..a00f5e0 100644 --- a/Src/Api/guardaserie/costant.py +++ b/Src/Api/guardaserie/costant.py @@ -9,6 +9,7 @@ from Src.Util._jsonConfig import config_manager SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) ROOT_PATH = config_manager.get('DEFAULT', 'root_path') -DOMAIN_NOW = config_manager.get('SITE', SITE_NAME) +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] SERIES_FOLDER = "Serie" +MOVIE_FOLDER = "Film" diff --git a/Src/Api/guardaserie/series.py b/Src/Api/guardaserie/series.py index 0224070..848ea3b 100644 --- a/Src/Api/guardaserie/series.py +++ b/Src/Api/guardaserie/series.py @@ -7,8 +7,9 @@ import logging # Internal utilities from Src.Util.console import console, msg -from Src.Util.table import TVShowManager +from Src.Util.os import create_folder, can_create_file from Src.Util.message import start_message +from Src.Util.table import TVShowManager from Src.Lib.Downloader import HLS_Downloader from ..Template import manage_selection, map_episode_title @@ -47,6 +48,14 @@ def donwload_video(scape_info_serie: GetSerieInfo, index_season_selected: int, i mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4" mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}") + # Ensure the folder path exists + create_folder(mp4_path) + + # Check if the MP4 file can be created + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + # Setup video source video_source.setup(obj_episode.get('url')) diff --git a/Src/Api/guardaserie/site.py b/Src/Api/guardaserie/site.py index 82a160d..4905087 100644 --- a/Src/Api/guardaserie/site.py +++ b/Src/Api/guardaserie/site.py @@ -6,6 +6,7 @@ import logging # External libraries import httpx from bs4 import BeautifulSoup +from unidecode import unidecode # Internal utilities @@ -24,16 +25,22 @@ media_search_manager = MediaManager() table_show_manager = TVShowManager() -def title_search(word_to_search) -> int: +def title_search(word_to_search: str) -> int: """ Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. """ # Find new domain if prev dont work domain_to_use, _ = search_domain(SITE_NAME, ' Title: + def get_season_by_index(self, index: int) -> Season: """ - Get a title by its index. + Get a season by its index. Args: - index (int): Index of the title to retrieve. + index (int): Index of the season to retrieve. Returns: - Title: The title object. + Season: The season object. """ - return self.titles[index] + return self.seasons[index] def get_length(self) -> int: """ - Get the number of titles in the manager. + Get the number of seasons in the manager. Returns: - int: Number of titles. + int: Number of seasons. """ - return len(self.titles) + return len(self.seasons) def clear(self) -> None: """ - This method clears the titles list. + This method clears the seasons list. Args: self: The object instance. """ - self.titles.clear() + self.seasons.clear() def __str__(self): - return f"TitleManager(num_titles={len(self.titles)})" + return f"SeasonManager(num_seasons={len(self.seasons)})" diff --git a/Src/Api/streamingcommunity/Core/Player/vixcloud.py b/Src/Api/streamingcommunity/Core/Player/vixcloud.py index 929e0df..0fc8b6c 100644 --- a/Src/Api/streamingcommunity/Core/Player/vixcloud.py +++ b/Src/Api/streamingcommunity/Core/Player/vixcloud.py @@ -16,7 +16,7 @@ from Src.Util.console import console, Panel # Logic class -from ..Class.SeriesType import TitleManager +from ..Class.SeriesType import SeasonManager from ..Class.EpisodeType import EpisodeManager from ..Class.WindowType import WindowVideo, WindowParameter, DynamicJSONConverter @@ -30,9 +30,7 @@ class VideoSource: """ Initialize a VideoSource object. """ - self.headers = { - 'user-agent': get_headers() - } + self.headers = {'user-agent': get_headers()} self.is_series = False self.base_name = SITE_NAME @@ -53,7 +51,7 @@ class VideoSource: if series_name is not None: self.is_series = True self.series_name = series_name - self.obj_title_manager: TitleManager = TitleManager() + self.obj_season_manager: SeasonManager = SeasonManager() self.obj_episode_manager: EpisodeManager = EpisodeManager() def collect_info_seasons(self) -> None: @@ -69,7 +67,7 @@ class VideoSource: try: - response = httpx.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers) + response = httpx.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers, timeout=15) response.raise_for_status() # Extract JSON response if available @@ -77,7 +75,7 @@ class VideoSource: # Iterate over JSON data and add titles to the manager for dict_season in json_response: - self.obj_title_manager.add_title(dict_season) + self.obj_season_manager.add_season(dict_season) except Exception as e: logging.error(f"Error collecting season info: {e}") @@ -93,7 +91,7 @@ class VideoSource: try: # Make a request to collect information about a specific season - response = httpx.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers) + response = httpx.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers, timeout=15) response.raise_for_status() # Extract JSON response if available @@ -125,7 +123,7 @@ class VideoSource: try: # Make a request to get iframe source - response = httpx.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params) + response = httpx.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params, timeout=15) response.raise_for_status() # Parse response with BeautifulSoup to get iframe source @@ -167,7 +165,7 @@ class VideoSource: # Make a request to get content try: - response = httpx.get(self.iframe_src, headers=self.headers) + response = httpx.get(self.iframe_src, headers=self.headers, timeout=15) response.raise_for_status() except Exception as e: diff --git a/Src/Api/streamingcommunity/film.py b/Src/Api/streamingcommunity/film.py index 6b5431c..c984a28 100644 --- a/Src/Api/streamingcommunity/film.py +++ b/Src/Api/streamingcommunity/film.py @@ -1,13 +1,15 @@ # 3.12.23 import os +import sys import logging # Internal utilities from Src.Util.console import console -from Src.Lib.Downloader import HLS_Downloader from Src.Util.message import start_message +from Src.Util.os import create_folder, can_create_file, remove_special_characters +from Src.Lib.Downloader import HLS_Downloader # Logic class @@ -49,6 +51,14 @@ def download_film(id_film: str, title_name: str, domain: str): mp4_format = (mp4_name) + ".mp4" mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + # Ensure the folder path exists + create_folder(mp4_path) + + # Check if the MP4 file can be created + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + # Download the film using the m3u8 playlist, and output filename HLS_Downloader( m3u8_playlist = master_playlist, diff --git a/Src/Api/streamingcommunity/series.py b/Src/Api/streamingcommunity/series.py index 7f63cba..e723747 100644 --- a/Src/Api/streamingcommunity/series.py +++ b/Src/Api/streamingcommunity/series.py @@ -69,7 +69,7 @@ def donwload_episode(tv_name: str, index_season_selected: int, donwload_all: boo # Clean memory of all episodes and get the number of the season (some dont follow rule of [1,2,3,4,5] but [1,2,3,145,5,6,7]). video_source.obj_episode_manager.clear() - season_number = (video_source.obj_title_manager.titles[index_season_selected-1].number) + season_number = (video_source.obj_season_manager.seasons[index_season_selected-1].number) # Start message and collect information about episodes start_message() @@ -124,7 +124,7 @@ def download_series(tv_id: str, tv_name: str, version: str, domain: str) -> None # Collect information about seasons video_source.collect_info_seasons() - seasons_count = video_source.obj_title_manager.get_length() + seasons_count = video_source.obj_season_manager.get_length() # Prompt user for season selection and download episodes console.print(f"\n[green]Season find: [red]{seasons_count}") diff --git a/Src/Api/streamingcommunity/site.py b/Src/Api/streamingcommunity/site.py index 4becd3c..8510810 100644 --- a/Src/Api/streamingcommunity/site.py +++ b/Src/Api/streamingcommunity/site.py @@ -78,8 +78,8 @@ def get_version_and_domain(): Retrieve the current version and domain of the site. This function performs the following steps: - 1. Determines the correct domain to use for the site by searching for a specific meta tag. - 2. Fetches the content of the site to extract the version information. + - Determines the correct domain to use for the site by searching for a specific meta tag. + - Fetches the content of the site to extract the version information. """ # Find new domain if prev dont work diff --git a/Src/Api/uhdmovies/Core/Class/EpisodeType.py b/Src/Api/uhdmovies/Core/Class/EpisodeType.py new file mode 100644 index 0000000..4474d9e --- /dev/null +++ b/Src/Api/uhdmovies/Core/Class/EpisodeType.py @@ -0,0 +1,60 @@ +# 03.03.24 + +from typing import Dict, Any, List + + +class Episode: + def __init__(self, data: Dict[str, Any]): + self.title: int = data.get('title', '') + self.url: str = data.get('link', '') + + def __str__(self): + return f"Episode(title='{self.title}')" + + +class EpisodeManager: + def __init__(self): + self.episodes: List[Episode] = [] + + def add_episode(self, episode_data: Dict[str, Any]): + """ + Add a new episode to the manager. + + Args: + - episode_data (Dict[str, Any]): A dictionary containing data for the new episode. + """ + episode = Episode(episode_data) + self.episodes.append(episode) + + def get_episode_by_index(self, index: int) -> Episode: + """ + Get an episode by its index. + + Args: + - index (int): Index of the episode to retrieve. + + Returns: + Episode: The episode object. + """ + return self.episodes[index] + + def get_length(self) -> int: + """ + Get the number of episodes in the manager. + + Returns: + int: Number of episodes. + """ + return len(self.episodes) + + def clear(self) -> None: + """ + This method clears the episodes list. + + Args: + - self: The object instance. + """ + self.episodes.clear() + + def __str__(self): + return f"EpisodeManager(num_episodes={len(self.episodes)})" diff --git a/Src/Api/uhdmovies/Core/Class/SearchType.py b/Src/Api/uhdmovies/Core/Class/SearchType.py new file mode 100644 index 0000000..6db7592 --- /dev/null +++ b/Src/Api/uhdmovies/Core/Class/SearchType.py @@ -0,0 +1,59 @@ +# 26.05.24 + +from typing import List + + +class MediaItem: + def __init__(self, data: dict): + self.name: str = data.get('name') + self.url: int = data.get('url') + + def __str__(self): + return f"MediaItem(name='{self.name}', url={self.url})" + + +class MediaManager: + def __init__(self): + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/uhdmovies/Core/Class/SeriesType.py b/Src/Api/uhdmovies/Core/Class/SeriesType.py new file mode 100644 index 0000000..5e4077f --- /dev/null +++ b/Src/Api/uhdmovies/Core/Class/SeriesType.py @@ -0,0 +1,59 @@ +# 03.03.24 + +from typing import List, Dict, Union + + +class Season: + def __init__(self, season_data: Dict[str, Union[int, str, None]]): + self.name: str = season_data.get('name') + + def __str__(self): + return f"Season(name='{self.name}')" + + +class SeasonManager: + def __init__(self): + self.seasons: List[Season] = [] + + def add_season(self, season_data: Dict[str, Union[int, str, None]]): + """ + Add a new season to the manager. + + Args: + season_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new season. + """ + season = Season(season_data) + self.seasons.append(season) + + def get_season_by_index(self, index: int) -> Season: + """ + Get a season by its index. + + Args: + index (int): Index of the season to retrieve. + + Returns: + Season: The season object. + """ + return self.seasons[index] + + def get_length(self) -> int: + """ + Get the number of seasons in the manager. + + Returns: + int: Number of seasons. + """ + return len(self.seasons) + + def clear(self) -> None: + """ + This method clears the seasons list. + + Args: + self: The object instance. + """ + self.seasons.clear() + + def __str__(self): + return f"SeasonManager(num_seasons={len(self.seasons)})" diff --git a/Src/Api/uhdmovies/Core/Player/driveleech.py b/Src/Api/uhdmovies/Core/Player/driveleech.py new file mode 100644 index 0000000..95dd818 --- /dev/null +++ b/Src/Api/uhdmovies/Core/Player/driveleech.py @@ -0,0 +1,263 @@ +# 30.06.24 + +import time +import logging + + +# External library +from bs4 import BeautifulSoup +from seleniumbase import Driver + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +# Config +USE_HEADLESS = config_manager.get_bool("BROWSER", "headless") + + +class DownloadAutomation: + def __init__(self, download_link): + self.download_link = download_link + self.driver = Driver(uc=True, uc_cdp_events=True, headless=USE_HEADLESS) + self.mp4_link = None + + def run(self): + """ + Executes the entire automation process. + """ + try: + self.driver.get(self.download_link) + self._inject_css() + self._observe_title_change() + self._bypass_page_1() + self._bypass_page_2_verify_button() + self._bypass_page_2_two_steps_btn() + self._wait_for_document_complete() + self._wait_for_bypass_completion() + self._extract_download_link() + + except Exception as e: + logging.error(f"Error occurred during automation: {str(e)}") + + finally: + self.quit() + + def _inject_css(self): + """ + Injects CSS to make all elements on the page invisible. + """ + try: + css_script = """ + const voidCSS = `* {opacity: 0;z-index: -999999;}`; + function addStyle(css) { + let head = document.querySelector('head'), + style = document.createElement('style'); + style.innerHTML = css; + head.appendChild(style); + } + """ + + self.driver.execute_script(css_script) + logging.info("CSS injected.") + time.sleep(0.4) + + except Exception as e: + logging.error(f"Error injecting CSS: {str(e)}") + + def _observe_title_change(self): + """ + Observes changes in the document title and applies CSS injection. + """ + try: + observer_script = """ + let headObserver = new MutationObserver(function() { + if (document.title) { + addStyle(voidCSS.replace(';', ' !important;')); + headObserver.disconnect(); + } + }); + headObserver.observe(document.documentElement, {childList: true, subtree: true}); + """ + + self.driver.execute_script(observer_script) + logging.info("Title observer set.") + time.sleep(0.4) + + except Exception as e: + logging.error(f"Error setting title observer: {str(e)}") + + def _bypass_page_1(self): + """ + Executes action to bypass Page 1. + """ + try: + action1_script = """ + function action1() { + try { + document.querySelector('#landing').submit(); + document.title = "Bypass Action (1/3)"; + } catch {} + } + action1(); + """ + + self.driver.execute_script(action1_script) + logging.info("Page 1 bypassed.") + time.sleep(0.4) + + except Exception as e: + logging.error(f"Error bypassing Page 1: {str(e)}") + + def _bypass_page_2_verify_button(self): + """ + Executes action to bypass Page 2 by clicking on verify_button. + """ + try: + action2_script = """ + function action2() { + try { + document.querySelector('#verify_button').click(); + document.title = "Bypass Action (2/3)"; + } catch {} + } + action2(); + """ + + self.driver.execute_script(action2_script) + logging.info("Page 2 bypassed.") + time.sleep(0.4) + + except Exception as e: + logging.error(f"Error bypassing Page 2: {str(e)}") + + def _bypass_page_2_two_steps_btn(self): + """ + Executes action to bypass Page 2 by waiting for and clicking two_steps_btn. + """ + try: + action3_script = """ + function action3() { + try { + let observer = new MutationObserver(function() { + if (document.querySelector('#two_steps_btn').href !== "") { + observer.disconnect(); + document.title = "Bypass Action (3/3)"; + window.location = document.querySelector('#two_steps_btn').href; + } + }); + observer.observe(document.querySelector('#two_steps_btn'), {attributes: true}); + } catch {} + } + action3(); + """ + + self.driver.execute_script(action3_script) + logging.info("Page 2 bypassed with observation and redirect.") + time.sleep(0.4) + + except Exception as e: + logging.error(f"Error bypassing Page 2 with observation: {str(e)}") + + def _wait_for_document_complete(self): + """ + Waits for the document to be completely loaded to execute actions. + """ + try: + onreadystatechange_script = """ + document.onreadystatechange = function () { + if (document.readyState === 'complete') { + action1(); + action2(); + action3(); + } + } + """ + + self.driver.execute_script(onreadystatechange_script) + logging.info("onreadystatechange set.") + time.sleep(0.4) + + except Exception as e: + logging.error(f"Error setting onreadystatechange: {str(e)}") + + def _wait_for_bypass_completion(self): + """ + Waits for the bypass process to complete. + """ + try: + while True: + if ".mkv" in self.driver.title or ".mp4" in self.driver.title: + logging.info("Bypass completed.") + break + + time.sleep(0.5) + + except Exception as e: + logging.error(f"Error waiting for bypass completion: {str(e)}") + + def _extract_download_link(self): + """ + Extracts the final download link after bypassing and loads the download page. + """ + try: + final_html = self.driver.page_source + soup = BeautifulSoup(final_html, 'html.parser') + video_link = soup.find("a", class_="btn").get('href') + + logging.info("Loading download page.") + self.driver.get(video_link) + logging.info(f"Download page link: {video_link}") + + except Exception as e: + logging.error(f"Error extracting download link: {str(e)}") + + def capture_url(self, req): + """ + Function to capture url in background + """ + try: + url = req['params']['documentURL'] + + # Filter for mp4 video download + if "googleusercontent" in str(url): + self.mp4_link = url + + except: + pass + + def quit(self): + """ + Quits the WebDriver instance. + """ + try: + logging.info("Removing ad headers.") + css_script = """ + const voidCSS = ``; + function addStyle(css) { + let head = document.querySelector('head'), + style = document.createElement('style'); + style.innerHTML = css; + head.appendChild(style); + } + """ + + self.driver.execute_script(css_script) + self.driver.add_cdp_listener("*", lambda data: self.capture_url(data)) + time.sleep(0.3) + + logging.info("Clicking button.") + self.driver.execute_script("document.getElementById('ins').click();") + + while True: + time.sleep(0.3) + if self.mp4_link is not None: + break + + logging.info(f"MP4 Link: {self.mp4_link}") + logging.info("Quitting...") + self.driver.quit() + + except Exception as e: + logging.error(f"Error during quitting: {str(e)}") diff --git a/Src/Api/uhdmovies/Core/Player/episode_scraper.py b/Src/Api/uhdmovies/Core/Player/episode_scraper.py new file mode 100644 index 0000000..fffaad5 --- /dev/null +++ b/Src/Api/uhdmovies/Core/Player/episode_scraper.py @@ -0,0 +1,218 @@ +# 29.06.24 + +import re +import sys +import json +import httpx +import logging +import urllib.parse + + +from bs4 import BeautifulSoup + + +# Logic class +from ..Class.EpisodeType import EpisodeManager +from ..Class.SeriesType import SeasonManager + + +class EpisodeScraper: + def __init__(self, url): + """ + The constructor for the EpisodeScraper class. + + Parameters: + - url (str): The URL of the webpage to scrape. + """ + self.url = url + self.soup = self._get_soup() + self.info_site = self._extract_info() + self.stagioni = self._organize_by_season() + + def _get_soup(self): + """ + Retrieves and parses the webpage content using BeautifulSoup. + + Returns: + BeautifulSoup: The parsed HTML content of the webpage. + """ + try: + response = httpx.get(self.url) + response.raise_for_status() + + return BeautifulSoup(response.text, 'html.parser') + + except Exception as e: + print(f"Error fetching the URL: {e}") + raise + + def _extract_info(self): + """ + Extracts the episode information from the parsed HTML. + + Returns: + list: A list of dictionaries containing episode information. + """ + rows = self.soup.find_all("p", style="text-align: center;") + info_site = [] + + # Loop through each

tag and extract episode information + for i, row in enumerate(rows, start=1): + episodes = [] + + # Find all tags with the specified class and extract title and link + for episode in row.find_all("a", class_="maxbutton-2"): + + episodes.append({ + 'title': episode.text, + 'link': episode.get('href') + }) + + # If there are episodes, add them to the info_site list + if len(episodes) > 0: + if i == 2: + title_name = rows[i-1].get_text().split("\n")[3] + + if "Epis" not in str(title_name): + info_site.append({ + 'name': title_name, + 'episode': episodes, + }) + + else: + title_name = rows[i-2].get_text() + + if "Epis" not in str(title_name): + info_site.append({ + 'name': title_name, + 'episode': episodes, + }) + + # For only episode + if len(info_site) == 0: + for i, row in enumerate(rows, start=1): + + for episode in row.find_all("a", class_="maxbutton-1"): + + info_site.append({ + 'name': rows[i-1].get_text().split("\n")[1], + 'url': episode.get("href"), + }) + + # Get obnly fist quality + break + break + + return info_site + + def _organize_by_season(self): + """ + Organizes the extracted information into seasons. + + Returns: + dict: A dictionary organizing episodes by season. + """ + stagioni = {} + + # Loop through each episode dictionary and organize by season + for dizionario in self.info_site: + nome = dizionario["name"] + + # Use regex to search for season numbers (S01, S02, etc.) + match = re.search(r'S\d+', nome) + if match: + stagione = match.group(0) + if stagione not in stagioni: + stagioni[stagione] = [] + + stagioni[stagione].append(dizionario) + + self.is_serie = len(list(stagioni.keys())) > 0 + return stagioni + + def get_available_seasons(self): + """ + Returns a list of available seasons. + + Returns: + list: A list of available seasons. + """ + return list(self.stagioni.keys()) + + def get_episodes_by_season(self, season): + """ + Returns a list of episodes for a given season. + + Parameters: + - season (str): The season identifier (e.g., 'S01'). + + Returns: + - list: A list of episodes for the specified season. + """ + episodes = self.stagioni[season][0]['episode'] + + def find_group_size(episodes): + seen_titles = {} + + for index, episode in enumerate(episodes): + title = episode["title"] + + if title in seen_titles: + return index - seen_titles[title] + + seen_titles[title] = index + + return len(episodes) + + # Find group size + group_size = find_group_size(episodes) + + grouped_episodes = [] + start_index = 0 + + while start_index < len(episodes): + group = episodes[start_index:start_index + group_size] + grouped_episodes.append(group) + start_index += group_size + + return grouped_episodes[0] + + def get_film(self): + """ + Retrieves the first element from the info_site list. + """ + return self.info_site[0] + + +class ApiManager: + def __init__(self, url): + """ + The constructor for the EpisodeScraper class. + + Parameters: + - url (str): The URL of the webpage to scrape. + """ + self.url = url + self.episode_scraper = EpisodeScraper(url) + self.is_serie = self.episode_scraper.is_serie + + self.obj_season_manager: SeasonManager = SeasonManager() + self.obj_episode_manager: EpisodeManager = EpisodeManager() + + def collect_season(self): + + available_seasons = self.episode_scraper.get_available_seasons() + + for dict_season in available_seasons: + self.obj_season_manager.add_season({'name': dict_season}) + + def collect_episode(self, season_name): + + dict_episodes = self.episode_scraper.get_episodes_by_season(season_name) + + for dict_episode in dict_episodes: + self.obj_episode_manager.add_episode(dict_episode) + + def get_film_playlist(self): + + return self.episode_scraper.get_film() diff --git a/Src/Api/uhdmovies/__init__.py b/Src/Api/uhdmovies/__init__.py new file mode 100644 index 0000000..e7417a9 --- /dev/null +++ b/Src/Api/uhdmovies/__init__.py @@ -0,0 +1,40 @@ +# 09.06.24 + +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, run_get_select_title +from .serie import donwload_serie + + + +# Variable +indice = 6 +_deprecate = False + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = run_get_select_title() + + donwload_serie(select_title) + + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/uhdmovies/costant.py b/Src/Api/uhdmovies/costant.py new file mode 100644 index 0000000..7243c3d --- /dev/null +++ b/Src/Api/uhdmovies/costant.py @@ -0,0 +1,15 @@ +# 09.06.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] + +SERIES_FOLDER = "Serie" +MOVIE_FOLDER = "Film" \ No newline at end of file diff --git a/Src/Api/uhdmovies/serie.py b/Src/Api/uhdmovies/serie.py new file mode 100644 index 0000000..719e08b --- /dev/null +++ b/Src/Api/uhdmovies/serie.py @@ -0,0 +1,189 @@ +# 29.06.24 + +import os +import sys +import logging +from urllib.parse import urlparse + + +# Internal utilities +from Src.Util.console import console, msg +from Src.Util.message import start_message +from Src.Util.os import create_folder, can_create_file +from Src.Util.table import TVShowManager +from Src.Lib.Downloader import MP4_downloader +from ..Template import manage_selection, map_episode_title + + +# Logic class +from .Core.Player.episode_scraper import ApiManager +from .Core.Player.driveleech import DownloadAutomation +from .Core.Class.SearchType import MediaItem + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER +table_show_manager = TVShowManager() + + +def donwload_video(api_manager: ApiManager, index_season_selected: int, index_episode_selected: int) -> None: + """ + Download a single episode video. + + Args: + - tv_name (str): Name of the TV series. + - index_season_selected (int): Index of the selected season. + - index_episode_selected (int): Index of the selected episode. + """ + + start_message() + + # Get info about episode + obj_episode = api_manager.obj_episode_manager.episodes[index_episode_selected - 1] + tv_name = api_manager.obj_season_manager.seasons[index_season_selected - 1].name + console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.title}") + print() + + # Define filename and path for the downloaded video + mp4_name = f"{map_episode_title(tv_name, index_season_selected, index_episode_selected, obj_episode.title)}.mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, tv_name, f"S{index_season_selected}") + + # Check if can create file output + create_folder(mp4_path) + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + + # Parse start page url + start_message() + downloder_vario = DownloadAutomation(obj_episode.url) + downloder_vario.run() + downloder_vario.quit() + + # Parse mp4 link + mp4_final_url = downloder_vario.mp4_link + parsed_url = urlparse(mp4_final_url) + + MP4_downloader( + url = mp4_final_url, + path = os.path.join(mp4_path, mp4_name), + referer = f"{parsed_url.scheme}://{parsed_url.netloc}/", + ) + + +def donwload_episode(api_manager: ApiManager, index_season_selected: int, donwload_all: bool = False) -> None: + """ + Download all episodes of a season. + + Args: + - tv_name (str): Name of the TV series. + - index_season_selected (int): Index of the selected season. + - donwload_all (bool): Donwload all seasons episodes + """ + + # Clean memory of all episodes and get the number of the season (some dont follow rule of [1,2,3,4,5] but [1,2,3,145,5,6,7]). + api_manager.obj_episode_manager.clear() + season_name = api_manager.obj_season_manager.seasons[index_season_selected-1].name + + # Collect all best episode + api_manager.collect_episode(season_name) + episodes_count = api_manager.obj_episode_manager.get_length() + + # Start message + start_message() + + # Download all episodes wihtout ask + if donwload_all: + for i_episode in range(1, episodes_count+1): + donwload_video(api_manager, index_season_selected, i_episode) + + console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.") + + # If not download all episode but a single season + if not donwload_all: + + # Display episodes list and manage user selection + last_command = display_episodes_list(api_manager) + list_episode_select = manage_selection(last_command, episodes_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + donwload_video(api_manager, index_season_selected, list_episode_select[0]) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + donwload_video(api_manager, index_season_selected, i_episode) + + +def donwload_serie(media: MediaItem): + """ + Downloads a media title using its API manager and WebAutomation driver. + + Args: + media (MediaItem): The media item to be downloaded. + """ + + start_message() + + # Initialize the API manager with the media and driver + api_manager = ApiManager(media.url) + + # Collect information about seasons + api_manager.collect_season() + seasons_count = api_manager.obj_season_manager.get_length() + + # Prompt user for season selection and download episodes + console.print(f"\n[green]Season find: [red]{seasons_count}") + index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")) + list_season_select = manage_selection(index_season_selected, seasons_count) + + # Download selected episodes + if len(list_season_select) == 1 and index_season_selected != "*": + if 1 <= int(index_season_selected) <= seasons_count: + donwload_episode(api_manager, list_season_select[0]) + + # Dowload all seasons and episodes + elif index_season_selected == "*": + for i_season in list_season_select: + donwload_episode(api_manager, i_season, True) + + # Download all other season selecter + else: + for i_season in list_season_select: + donwload_episode(api_manager, i_season) + + +def display_episodes_list(api_manager: ApiManager) -> str: + """ + Display episodes list and handle user input. + + Returns: + last_command (str): Last command entered by the user. + """ + + # Set up table for displaying episodes + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + } + table_show_manager.add_column(column_info) + + # Populate the table with episodes information + for i, media in enumerate(api_manager.obj_episode_manager.episodes): + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.title + }) + + # Run the table and handle user input + last_command = table_show_manager.run() + + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + return last_command diff --git a/Src/Api/uhdmovies/site.py b/Src/Api/uhdmovies/site.py new file mode 100644 index 0000000..70ce011 --- /dev/null +++ b/Src/Api/uhdmovies/site.py @@ -0,0 +1,70 @@ +# 09.06.24 + +import logging + + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util.table import TVShowManager +from ..Template import search_domain, get_select_title + + +# Logic class +from .Core.Class.SearchType import MediaManager + + +# Variable +from .costant import SITE_NAME +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(word_to_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. + """ + + # Create a web automation driver instance + domain_to_use, _ = search_domain(SITE_NAME, ' None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + def clear(self) -> None: + """ + This method clears the medias list. + + Args: + self: The object instance. + """ + self.media_list.clear() + + def __str__(self): + return f"MediaManager(num_media={len(self.media_list)})" diff --git a/Src/Api/watch_lonelil/Core/Player/lonelil.py b/Src/Api/watch_lonelil/Core/Player/lonelil.py new file mode 100644 index 0000000..f2a2e29 --- /dev/null +++ b/Src/Api/watch_lonelil/Core/Player/lonelil.py @@ -0,0 +1,78 @@ + +# 29.06.24 +import sys +import json +import urllib.parse + + +# Logic class +from ..Class.SearchType import MediaItem +from Src.Lib.Driver import WebAutomation + +# Variable +from ...costant import SITE_NAME, DOMAIN_NOW +full_site_name=f"{SITE_NAME}.{DOMAIN_NOW}" + + +class ApiManager: + """ + A class to manage API interactions for media items. + """ + def __init__(self, media: MediaItem, main_driver: WebAutomation) -> None: + """ + Initializes the ApiManager with a media item and a web automation driver. + + Args: + - media (MediaItem): The media item to be processed. + - main_driver (WebAutomation): The driver to perform web automation tasks. + """ + self.media = media + self.id = self.media.url.split("/")[-1] + self.main_driver = main_driver + + def get_playlist(self) -> str: + """ + Retrieves the URL of the best quality stream available for the media item. + + Returns: + - str: The URL of the best quality stream. + """ + + # Prepare the JSON payload + json_payload = { + "0": { + "json": { + "type": self.media.type, + "id": self.id, + "provider": "showbox-internal" + } + } + } + + # Convert the payload to a JSON string and properly escape it + json_string = json.dumps (json_payload) + encoded_json_string = urllib.parse.quote(json_string) + + # Format the URL with the encoded JSON string + api_url = f"https://{full_site_name}/api/trpc/provider.run?batch=1&input={encoded_json_string}" + + # Load the API URL in the web driver + self.main_driver.get_page(str(api_url)) + + # Retrieve and parse the page content + soup = self.main_driver.retrieve_soup() + content = soup.find("pre").text + data = json.loads(content)[0]['result']['data']['json']['stream'][0]['qualities'] + + # Return the URL based on the available quality + if len(data.keys()) == 1: + return data.get('360')['url'] + + if len(data.keys()) == 2: + return data.get("720")['url'] + + if len(data.keys()) == 3: + return data.get('1080')['url'] + + if len(data.keys()) == 4: + return data.get('4k')['url'] diff --git a/Src/Api/watch_lonelil/__init__.py b/Src/Api/watch_lonelil/__init__.py new file mode 100644 index 0000000..f209cd7 --- /dev/null +++ b/Src/Api/watch_lonelil/__init__.py @@ -0,0 +1,44 @@ +# 09.06.24 + +import sys +import logging + + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, run_get_select_title +from .film import download_film + + + +# Variable +indice = 5 +_deprecate = False + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database, main_driver = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = run_get_select_title() + + if select_title.type == "movie": + download_film(select_title, main_driver) + + else: + logging.error(f"Not supported: {select_title.type}") + sys.exit(0) + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") diff --git a/Src/Api/watch_lonelil/costant.py b/Src/Api/watch_lonelil/costant.py new file mode 100644 index 0000000..da1f897 --- /dev/null +++ b/Src/Api/watch_lonelil/costant.py @@ -0,0 +1,20 @@ +# 09.06.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] + +SERIES_FOLDER = "Serie" +MOVIE_FOLDER = "Film" + + +# Fix site name for . +# URL => https://watch.lonelil.ru +SITE_NAME = SITE_NAME.replace("_", ".") diff --git a/Src/Api/watch_lonelil/film.py b/Src/Api/watch_lonelil/film.py new file mode 100644 index 0000000..2ab6fda --- /dev/null +++ b/Src/Api/watch_lonelil/film.py @@ -0,0 +1,65 @@ +# 29.06.24 + +import os +import sys +import logging +from urllib.parse import urlparse + + +# Internal utilities +from Src.Util.message import start_message +from Src.Util.os import create_folder, can_create_file +from Src.Lib.Downloader import MP4_downloader + + +# Logic class +from .Core.Player.lonelil import ApiManager +from .Core.Class.SearchType import MediaItem +from Src.Lib.Driver import WebAutomation + + +# Variable +from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER + + +def download_film(media: MediaItem, main_driver: WebAutomation): + """ + Downloads a media title using its API manager and WebAutomation driver. + + Args: + media (MediaItem): The media item to be downloaded. + main_driver (WebAutomation): The web automation driver instance. + """ + + start_message() + + # Initialize the API manager with the media and driver + api_manager = ApiManager(media, main_driver) + + # Get the URL of the media playlist + url_playlist = api_manager.get_playlist() + + # Construct the MP4 file name and path + mp4_name = str(media.name).replace("-", "_") + ".mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, media.name) + + # Ensure the folder path exists + create_folder(mp4_path) + + # Check if the MP4 file can be created + if not can_create_file(mp4_name): + logging.error("Invalid mp4 name.") + sys.exit(0) + + # Parse the URL of the playlist + parsed_url = urlparse(url_playlist) + + # Quit the main driver instance + main_driver.quit() + + # Initiate the MP4 downloader with necessary parameters + MP4_downloader( + url=url_playlist, + path=os.path.join(mp4_path, mp4_name), + referer=f"{parsed_url.scheme}://{parsed_url.netloc}/", + ) diff --git a/Src/Api/watch_lonelil/site.py b/Src/Api/watch_lonelil/site.py new file mode 100644 index 0000000..98f258c --- /dev/null +++ b/Src/Api/watch_lonelil/site.py @@ -0,0 +1,71 @@ +# 09.06.24 + +import logging + + +# External libraries +from unidecode import unidecode + + +# Internal utilities +from Src.Util.table import TVShowManager +from Src.Lib.Driver import WebAutomation +from ..Template import search_domain, get_select_title + + +# Logic class +from .Core.Class.SearchType import MediaManager + + +# Variable +from .costant import SITE_NAME, DOMAIN_NOW +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + + +def title_search(word_to_search: str) -> int: + """ + Search for titles based on a search query. + + Args: + - title_search (str): The title to search for. + + Returns: + int: The number of titles found. + """ + + # Create a web automation driver instance + main_driver = WebAutomation() + + # Construct the full site URL and load the search page + full_site_name = f"{SITE_NAME}.{DOMAIN_NOW}" + main_driver.get_page(f"https://{full_site_name}/search?q={unidecode(word_to_search)}") + + # Retrieve and parse the HTML content of the page + soup = main_driver.retrieve_soup() + content_table = soup.find_all("a") + + # Iterate through the search results to find relevant titles + for title in content_table: + if any(keyword in str(title).lower() for keyword in ["show/", "movie/", "anime/"]): + + # Construct a media object with the title's details + obj = { + 'url': f"https://{full_site_name}" + title.get("href"), + 'name': title.find("img").get("alt"), + 'type': title.find_all("p")[-1].get_text().split("ยท")[0].strip().lower() + } + + # Add the media object to the media search manager + media_search_manager.add_media(obj) + + # Return the number of titles found + return media_search_manager.get_length(), main_driver + + +def run_get_select_title(): + """ + Display a selection of titles and prompt the user to choose one. + """ + return get_select_title(table_show_manager, media_search_manager) \ No newline at end of file diff --git a/Src/Lib/Downloader/HLS/downloader.py b/Src/Lib/Downloader/HLS/downloader.py index 97b35d4..2d42edf 100644 --- a/Src/Lib/Downloader/HLS/downloader.py +++ b/Src/Lib/Downloader/HLS/downloader.py @@ -62,20 +62,23 @@ headers_index = config_manager.get_dict('REQUESTS', 'user-agent') class HLS_Downloader(): - def __init__(self, output_filename: str = None, m3u8_playlist:str = None, m3u8_index:str = None): - + def __init__(self, output_filename: str = None, m3u8_playlist: str = None, m3u8_index: str = None, is_playlist_url: bool = True, is_index_url: bool = True): """ - Initialize the Downloader object. + Initialize the HLS Downloader object. Args: - output_filename (str): Output filename for the downloaded content. - m3u8_playlist (str, optional): URL to the main M3U8 playlist. - - m3u8_playlist (str, optional): URL to the main M3U8 index. ( NOT TEXT ) + - m3u8_index (str, optional): URL to the main M3U8 index file. (NOT USED) + - is_playlist_url (bool): Flag indicating if `m3u8_playlist` is a URL (default True). + - is_index_url (bool): Flag indicating if `m3u8_index` is a URL (default True). """ + self.output_filename = output_filename self.m3u8_playlist = m3u8_playlist self.m3u8_index = m3u8_index - self.output_filename = output_filename + self.is_playlist_url = is_playlist_url + self.is_index_url = is_index_url self.expected_real_time = None # Auto generate out file name if not present @@ -136,6 +139,10 @@ class HLS_Downloader(): str: The text content of the response. """ + if "http" not in url or "https" not in url: + logging.error(f"Invalid url: {url}") + sys.exit(0) + # Send a GET request to the provided URL logging.info(f"Test url: {url}") response = httpx.get(url, headers=headers_index) @@ -248,7 +255,13 @@ class HLS_Downloader(): if not os.path.exists(self.downloaded_video[-1].get('path')): # Create an instance of M3U8_Segments to handle video segments - video_m3u8 = M3U8_Segments(self.m3u8_index, full_path_video) + if self.is_index_url: + logging.info("Parse index by url.") + video_m3u8 = M3U8_Segments(self.m3u8_index, full_path_video, True) + + else: + logging.info("Parse index by text input.") + video_m3u8 = M3U8_Segments(self.m3u8_index, full_path_video, False) # Get information about the video segments video_m3u8.get_info() @@ -528,19 +541,28 @@ class HLS_Downloader(): self.m3u8_index = False console.log("[red]Output file already exist.") + if self.m3u8_playlist: logging.info("Download from PLAYLIST") - m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist) - # Add full URL of the M3U8 playlist to fix next .ts without https if necessary - self.m3u8_url_fixer.set_playlist(self.m3u8_playlist) + # If playlist is a url and not a text of the playlist + if self.is_playlist_url: + logging.info("Parse playlist by url.") + m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist) - if m3u8_playlist_text is None: - console.log("[red]Playlist m3u8 to download is empty.") - sys.exit(0) + # Add full URL of the M3U8 playlist to fix next .ts without https if necessary + self.m3u8_url_fixer.set_playlist(self.m3u8_playlist) + + if m3u8_playlist_text is None: + console.log("[red]Playlist m3u8 to download is empty.") + sys.exit(0) + + else: + logging.info("Parse playlist by text input.") + m3u8_playlist_text = self.m3u8_playlist # Save text playlist - open(os.path.join(self.base_path, "tmp", "playlist.m3u8"), "w+").write(m3u8_playlist_text) + open(os.path.join(self.base_path, "tmp", "playlist.m3u8"), "w+", encoding="utf-8").write(m3u8_playlist_text) # Collect information about the playlist diff --git a/Src/Lib/Downloader/HLS/segments.py b/Src/Lib/Downloader/HLS/segments.py index 5a6bfeb..a157422 100644 --- a/Src/Lib/Downloader/HLS/segments.py +++ b/Src/Lib/Downloader/HLS/segments.py @@ -14,6 +14,7 @@ from concurrent.futures import ThreadPoolExecutor # External libraries import httpx +from httpx import HTTPTransport from tqdm import tqdm @@ -58,17 +59,20 @@ headers_index = config_manager.get_dict('REQUESTS', 'user-agent') class M3U8_Segments: - def __init__(self, url: str, tmp_folder: str): + def __init__(self, url: str, tmp_folder: str, is_index_url: bool = True): """ Initializes the M3U8_Segments object. Args: - url (str): The URL of the M3U8 playlist. - tmp_folder (str): The temporary folder to store downloaded segments. + - is_index_url (bool): Flag indicating if `m3u8_index` is a URL (default True). """ self.url = url self.tmp_folder = tmp_folder + self.is_index_url = is_index_url self.expected_real_time = None + self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts") os.makedirs(self.tmp_folder, exist_ok=True) @@ -124,8 +128,6 @@ class M3U8_Segments: m3u8_parser = M3U8_Parser() m3u8_parser.parse_data(uri=self.url, raw_content=m3u8_content) - #console.log(f"[red]Expected duration after download: {m3u8_parser.get_duration()}") - #console.log(f"[red]There is key: [yellow]{m3u8_parser.keys is not None}") self.expected_real_time = m3u8_parser.get_duration(return_string=False) self.expected_real_time_s = m3u8_parser.duration @@ -175,17 +177,24 @@ class M3U8_Segments: """ headers_index = {'user-agent': get_headers()} - # Send a GET request to retrieve the index M3U8 file - response = httpx.get(self.url, headers=headers_index) - response.raise_for_status() + if self.is_index_url: - # Save the M3U8 file to the temporary folder - if response.status_code == 200: - path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8") - open(path_m3u8_file, "w+").write(response.text) + # Send a GET request to retrieve the index M3U8 file + response = httpx.get(self.url, headers=headers_index) + response.raise_for_status() - # Parse the text from the M3U8 index file - self.parse_data(response.text) + # Save the M3U8 file to the temporary folder + if response.status_code == 200: + path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8") + open(path_m3u8_file, "w+").write(response.text) + + # Parse the text from the M3U8 index file + self.parse_data(response.text) + + else: + + # Parser data of content of index pass in input to class + self.parse_data(self.url) def make_requests_stream(self, ts_url: str, index: int, progress_bar: tqdm) -> None: """ @@ -196,10 +205,7 @@ class M3U8_Segments: - index (int): The index of the segment. - progress_bar (tqdm): Progress counter for tracking download progress. """ - try: - - # Generate headers start_time = time.time() # Make request to get content @@ -209,14 +215,14 @@ class M3U8_Segments: proxy = self.valid_proxy[index % len(self.valid_proxy)] logging.info(f"Use proxy: {proxy}") - with httpx.Client(proxies=proxy, verify=REQUEST_VERIFY) as client: + with httpx.Client(proxies=proxy, verify=True) as client: if 'key_base_url' in self.__dict__: response = client.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT) else: response = client.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT) - else: - with httpx.Client(verify=REQUEST_VERIFY) as client_2: + else: + with httpx.Client(verify=True) as client_2: if 'key_base_url' in self.__dict__: response = client_2.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT) else: @@ -224,13 +230,23 @@ class M3U8_Segments: # Get response content response.raise_for_status() + duration = time.time() - start_time segment_content = response.content # Update bar - duration = time.time() - start_time response_size = int(response.headers.get('Content-Length', 0)) + + if response_size == 0: + response_size = int(len(response.content)) + + # Check length segments + """ + expected_length = int(response.headers.get('Content-Length', 0)) + if not (expected_length != 0 and len(response.content) == expected_length): + console.print(f"Incomplete download for '{ts_url}' (received {len(response.content)} bytes, expected {expected_length}).") + """ self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar) - + # Decrypt the segment content if decryption is needed if self.decryption is not None: segment_content = self.decryption.decrypt(segment_content) @@ -288,10 +304,17 @@ class M3U8_Segments: TQDM_MAX_WORKER = 0 # Select audio workers from folder of frames stack prev call. - VIDEO_WORKERS = int(config_manager.get_dict('SITE', config_site)['video_workers']) - if VIDEO_WORKERS == -1: VIDEO_WORKERS = os.cpu_count() - AUDIO_WORKERS = int(config_manager.get_dict('SITE', config_site)['audio_workers']) - if AUDIO_WORKERS == -1: AUDIO_WORKERS = os.cpu_count() + try: + VIDEO_WORKERS = int(config_manager.get_dict('SITE', config_site)['video_workers']) + if VIDEO_WORKERS == -1: VIDEO_WORKERS = os.cpu_count() + except: + VIDEO_WORKERS = os.cpu_count() + + try: + AUDIO_WORKERS = int(config_manager.get_dict('SITE', config_site)['audio_workers']) + if AUDIO_WORKERS == -1: AUDIO_WORKERS = os.cpu_count() + except: + AUDIO_WORKERS = os.cpu_count() # Differnt workers for audio and video if "video" in str(add_desc): @@ -349,6 +372,7 @@ class M3U8_Segments: delay = TQDM_DELAY_WORKER # Start all workers + logging.info(f"Worker to use: {max_workers}") with ThreadPoolExecutor(max_workers=max_workers) as executor: for index, segment_url in enumerate(self.segments): time.sleep(delay) diff --git a/Src/Lib/Downloader/MP4/downloader.py b/Src/Lib/Downloader/MP4/downloader.py index 22bb146..d91ec6f 100644 --- a/Src/Lib/Downloader/MP4/downloader.py +++ b/Src/Lib/Downloader/MP4/downloader.py @@ -29,7 +29,7 @@ REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout') -def MP4_downloader(url: str, path: str, referer: str): +def MP4_downloader(url: str, path: str, referer: str = None): """ Downloads an MP4 video from a given URL using the specified referer header. @@ -39,6 +39,10 @@ def MP4_downloader(url: str, path: str, referer: str): - path (str): The local path where the downloaded MP4 file will be saved. - referer (str): The referer header value to include in the HTTP request headers. """ + + if "http" not in url or "https" not in url: + logging.error(f"Invalid url: {url}") + sys.exit(0) # Make request to get content of video logging.info(f"Make request to fetch mp4 from: {url}") diff --git a/Src/Lib/Downloader/TOR/downloader.py b/Src/Lib/Downloader/TOR/downloader.py index d878b85..ddc446f 100644 --- a/Src/Lib/Downloader/TOR/downloader.py +++ b/Src/Lib/Downloader/TOR/downloader.py @@ -1,6 +1,7 @@ # 23.06.24 import os +import sys import time import shutil import logging @@ -97,7 +98,7 @@ class TOR_downloader: return # Sleep to load magnet to qbit app - time.sleep(5) + time.sleep(10) latest_torrent = torrents[-1] torrent_hash = latest_torrent['hash'] @@ -178,6 +179,7 @@ class TOR_downloader: except Exception as e: logging.error(f"Download error: {str(e)}") + sys.exit(0) def move_downloaded_files(self, destination=None): """ @@ -190,20 +192,18 @@ class TOR_downloader: Returns: - bool: True if files are moved successfully, False otherwise. """ + time.sleep(2) # List directories in the save path dirs = [d for d in os.listdir(self.save_path) if os.path.isdir(os.path.join(self.save_path, d))] for dir_name in dirs: - if dir_name in self.torrent_name : + if self.torrent_name.split(" ")[0] in dir_name: dir_path = os.path.join(self.save_path, dir_name) - if destination: - destination_path = os.path.join(destination, dir_name) - else: - destination_path = os.path.join(os.getcwd(), dir_name) - - shutil.move(dir_path, destination_path) - logging.info(f"Moved directory {dir_name} to {destination_path}") + + shutil.move(dir_path, destination) + logging.info(f"Moved directory {dir_name} to {destination}") break + self.qb.delete_permanently(self.qb.torrents()[-1]['hash']) return True \ No newline at end of file diff --git a/Src/Lib/Driver/__init__.py b/Src/Lib/Driver/__init__.py new file mode 100644 index 0000000..be614f5 --- /dev/null +++ b/Src/Lib/Driver/__init__.py @@ -0,0 +1,3 @@ +# 29.06.24 + +from .driver_1 import WebAutomation \ No newline at end of file diff --git a/Src/Lib/Driver/driver_1.py b/Src/Lib/Driver/driver_1.py new file mode 100644 index 0000000..b7c736b --- /dev/null +++ b/Src/Lib/Driver/driver_1.py @@ -0,0 +1,74 @@ +# 29.06.24 + +import tempfile +import logging + + +# External library +from bs4 import BeautifulSoup +from seleniumbase import Driver + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +# Config +USE_HEADLESS = config_manager.get_bool("BROWSER", "headless") + + +class WebAutomation: + """ + A class for automating web interactions using SeleniumBase and BeautifulSoup. + """ + + def __init__(self): + """ + Initializes the WebAutomation instance with SeleniumBase Driver. + + Args: + headless (bool, optional): Whether to run the browser in headless mode. Default is True. + """ + logging.getLogger('seleniumbase').setLevel(logging.ERROR) + self.driver = Driver( + uc=True, + uc_cdp_events=True, + headless=USE_HEADLESS, + user_data_dir = tempfile.mkdtemp() + ) + + def quit(self): + """ + Quits the WebDriver instance. + """ + self.driver.quit() + + def get_page(self, url): + """ + Navigates the browser to the specified URL. + + Args: + url (str): The URL to navigate to. + """ + self.driver.get(url) + + def retrieve_soup(self): + """ + Retrieves the BeautifulSoup object for the current page's HTML content. + + Returns: + BeautifulSoup object: Parsed HTML content of the current page. + """ + html_content = self.driver.page_source + soup = BeautifulSoup(html_content, 'html.parser') + return soup + + def get_content(self): + """ + Returns the HTML content of the current page. + + Returns: + str: The HTML content of the current page. + """ + return self.driver.page_source + diff --git a/Src/Upload/version.py b/Src/Upload/version.py index bc501f4..a8e15ba 100644 --- a/Src/Upload/version.py +++ b/Src/Upload/version.py @@ -1,5 +1,5 @@ __title__ = 'StreamingCommunity' -__version__ = 'v1.2.0' +__version__ = 'v1.3.0' __author__ = 'Lovi-0' __description__ = 'A command-line program to download film' __copyright__ = 'Copyright 2024' diff --git a/Src/Util/call_stack.py b/Src/Util/call_stack.py index d3a6969..4ee90b1 100644 --- a/Src/Util/call_stack.py +++ b/Src/Util/call_stack.py @@ -18,21 +18,6 @@ def get_call_stack(): - folder_base (str): The base name of the directory path. - script (str): The name of the script file containing the function. - line (int): The line number in the script where the function is defined. - - Example: - >>> def func_a(): - ... return func_b() - ... - >>> def func_b(): - ... return func_c() - ... - >>> def func_c(): - ... return get_call_stack() - ... - >>> stack_trace = func_a() - >>> for frame in stack_trace: - ... print(f"Function: {frame['function']}, Folder: {frame['folder']}, " - ... f"Folder Base: {frame['folder_base']}, Script: {frame['script']}, Line: {frame['line']}") """ stack = inspect.stack() call_stack = [] diff --git a/Src/Util/headers.py b/Src/Util/headers.py index 55947fb..e3f5637 100644 --- a/Src/Util/headers.py +++ b/Src/Util/headers.py @@ -124,10 +124,6 @@ def random_headers(referer: str = None): 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', - 'Cache-Control': 'max-age=0', - 'TE': 'Trailers', - 'Pragma': 'no-cache', - 'DNT': '1', 'sec-ch-ua-mobile': '?1' if is_mobile else '?0', 'sec-ch-ua-platform': platform, 'sec-ch-ua': sec_ch_ua, diff --git a/Src/Util/logger.py b/Src/Util/logger.py index 0aac96b..b645163 100644 --- a/Src/Util/logger.py +++ b/Src/Util/logger.py @@ -6,7 +6,6 @@ from logging.handlers import RotatingFileHandler # Internal utilities from Src.Util._jsonConfig import config_manager -from .os import remove_file class Logger: @@ -16,11 +15,6 @@ class Logger: self.DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug") self.log_to_file = config_manager.get_bool("DEFAULT", "log_to_file") self.log_file = config_manager.get("DEFAULT", "log_file") if self.log_to_file else None - - # Remove log file - if self.log_file: - remove_file(self.log_file) - # Setting logging level based on DEBUG_MODE if self.DEBUG_MODE: diff --git a/Src/Util/os.py b/Src/Util/os.py index 3ab458c..6cf0600 100644 --- a/Src/Util/os.py +++ b/Src/Util/os.py @@ -129,11 +129,21 @@ def create_folder(folder_name: str) -> None: Args: folder_name (str): The path of the directory to be created. - """ - try: + if platform.system() == 'Windows': + max_path_length = 260 + else: + max_path_length = 4096 + + try: logging.info(f"Try create folder: {folder_name}") + + # Check if path length exceeds the maximum allowed + if len(folder_name) > max_path_length: + logging.error(f"Path length exceeds the maximum allowed limit: {len(folder_name)} characters (Max: {max_path_length})") + raise OSError(f"Path length exceeds the maximum allowed limit: {len(folder_name)} characters (Max: {max_path_length})") + os.makedirs(folder_name, exist_ok=True) if os.path.exists(folder_name) and os.path.isdir(folder_name): @@ -141,10 +151,6 @@ def create_folder(folder_name: str) -> None: else: logging.error(f"Failed to create directory: {folder_name}") - except OSError as e: - logging.error(f"OS error occurred while creating the directory {folder_name}: {e}") - raise - except Exception as e: logging.error(f"An unexpected error occurred while creating the directory {folder_name}: {e}") raise @@ -187,43 +193,6 @@ def remove_folder(folder_path: str) -> None: except OSError as e: print(f"Error removing folder '{folder_path}': {e}") -def remove_file(file_path: str) -> None: - """ - Remove a file if it exists - - Args: - - file_path (str): The path to the file to be removed. - """ - - if os.path.exists(file_path): - try: - os.remove(file_path) - except OSError as e: - print(f"Error removing file '{file_path}': {e}") - -def move_file_one_folder_up(file_path) -> None: - """ - Move a file one folder up from its current location. - - Args: - - file_path (str): Path to the file to be moved. - """ - - # Get the directory of the file - file_directory = os.path.dirname(file_path) - - # Get the parent directory - parent_directory = os.path.dirname(file_directory) - - # Get the filename - filename = os.path.basename(file_path) - - # New path for the file one folder up - new_path = os.path.join(parent_directory, filename) - - # Move the file - os.rename(file_path, new_path) - def delete_files_except_one(folder_path: str, keep_file: str) -> None: """ Delete all files in a folder except for one specified file. @@ -249,73 +218,6 @@ def delete_files_except_one(folder_path: str, keep_file: str) -> None: except Exception as e: logging.error(f"An error occurred: {e}") -def decompress_file(downloaded_file_path: str, destination: str) -> None: - """ - Decompress one file. - - Args: - - downloaded_file_path (str): The path to the downloaded file. - - destination (str): The directory where the file will be decompressed. - """ - try: - with zipfile.ZipFile(downloaded_file_path) as zip_file: - zip_file.extractall(destination) - except Exception as e: - logging.error(f"Error decompressing file: {e}") - raise - - - -# --> OS MANAGE JSON -def read_json(path: str): - """Reads JSON file and returns its content. - - Args: - - path (str): The file path of the JSON file to read. - - Returns: - variable: The content of the JSON file as a dictionary. - """ - - with open(path, "r") as file: - config = json.load(file) - - return config - -def save_json(json_obj, path: str) -> None: - """Saves JSON object to the specified file path. - - Args: - - json_obj (Dict[str, Any]): The JSON object to be saved. - - path (str): The file path where the JSON object will be saved. - """ - - with open(path, 'w') as file: - json.dump(json_obj, file, indent=4) # Adjust the indentation as needed - -def clean_json(path: str) -> None: - """Reads JSON data from the file, cleans it, and saves it back. - - Args: - - path (str): The file path of the JSON file to clean. - """ - - data = read_json(path) - - # Recursively replace all values with an empty string - def recursive_empty_string(obj): - if isinstance(obj, dict): - return {key: recursive_empty_string(value) for key, value in obj.items()} - elif isinstance(obj, list): - return [recursive_empty_string(item) for item in obj] - else: - return "" - - modified_data = recursive_empty_string(data) - - # Save the modified JSON data back to the file - save_json(modified_data, path) - # --> OS MANAGE SIZE FILE AND INTERNET SPEED @@ -377,50 +279,6 @@ def compute_sha1_hash(input_string: str) -> str: # Return the hashed string return hashed_string -def decode_bytes(bytes_data: bytes, encodings_to_try: List[str] = None) -> str: - """ - Decode a byte sequence using a list of encodings and return the decoded string. - - Args: - - bytes_data (bytes): The byte sequence to decode. - - encodings_to_try (List[str], optional): A list of encoding names to try for decoding. - If None, defaults to ['utf-8', 'latin-1', 'ascii']. - - Returns: - str or None: The decoded string if successful, None if decoding fails. - """ - if encodings_to_try is None: - encodings_to_try = ['utf-8', 'latin-1', 'ascii'] - - for encoding in encodings_to_try: - try: - # Attempt decoding with the current encoding - string_data = bytes_data.decode(encoding) - logging.info("Decoded successfully with encoding: %s", encoding) - logging.info("Decoded string: %s", string_data) - return string_data - except UnicodeDecodeError: - continue # Try the next encoding if decoding fails - - # If none of the encodings work, treat it as raw bytes - logging.warning("Unable to decode the data as text. Treating it as raw bytes.") - logging.info("Raw byte data: %s", bytes_data) - return None - -def convert_to_hex(bytes_data: bytes) -> str: - """ - Convert a byte sequence to its hexadecimal representation. - - Args: - - bytes_data (bytes): The byte sequence to convert. - - Returns: - str: The hexadecimal representation of the byte sequence. - """ - hex_data = ''.join(['{:02x}'.format(char) for char in bytes_data]) - logging.info("Hexadecimal representation of the data: %s", hex_data) - return hex_data - # --> OS GET SUMMARY @@ -600,7 +458,7 @@ def run_node_script_api(script_content: str) -> str: } # Return error - response = httpx.post('https://onecompiler.com/api/code/exec', headers=headers, json=json_data) + response = httpx.post('https://onecompiler.com/api/code/exec', headers=headers, json=json_data, timeout=15) response.raise_for_status() if response.status_code == 200: diff --git a/Test/bandwidth_gui.py b/Test/bandwidth_gui.py deleted file mode 100644 index ddd1363..0000000 --- a/Test/bandwidth_gui.py +++ /dev/null @@ -1,91 +0,0 @@ -# 23.06.24 - -import time -from collections import deque -from threading import Thread, Lock - - -# External library -import psutil -import tkinter as tk - - -class NetworkMonitor: - def __init__(self, maxlen=10): - self.speeds = deque(maxlen=maxlen) - self.lock = Lock() - - def capture_speed(self, interval: float = 0.5): - def get_network_io(): - io_counters = psutil.net_io_counters() - return io_counters - - def format_bytes(bytes): - if bytes < 1024: - return f"{bytes:.2f} Bytes/s" - elif bytes < 1024 * 1024: - return f"{bytes / 1024:.2f} KB/s" - else: - return f"{bytes / (1024 * 1024):.2f} MB/s" - - old_value = get_network_io() - while True: - time.sleep(interval) - new_value = get_network_io() - - with self.lock: - upload_speed = (new_value.bytes_sent - old_value.bytes_sent) / interval - download_speed = (new_value.bytes_recv - old_value.bytes_recv) / interval - - self.speeds.append({ - "upload": format_bytes(upload_speed), - "download": format_bytes(download_speed) - }) - - old_value = new_value - - -class NetworkMonitorApp: - def __init__(self, root): - self.monitor = NetworkMonitor() - self.root = root - self.root.title("Network Bandwidth Monitor") - self.root.geometry("400x200") - self.root.resizable(False, False) - - self.label_upload_header = tk.Label(text="Upload Speed:", font="Quicksand 12 bold") - self.label_upload_header.pack() - - self.label_upload = tk.Label(text="Calculating...", font="Quicksand 12") - self.label_upload.pack() - - self.label_download_header = tk.Label(text="Download Speed:", font="Quicksand 12 bold") - self.label_download_header.pack() - - self.label_download = tk.Label(text="Calculating...", font="Quicksand 12") - self.label_download.pack() - - self.attribution = tk.Label(text="\n~ WaterrMalann ~", font="Quicksand 11 italic") - self.attribution.pack() - - self.update_gui() - self.start_monitoring() - - def update_gui(self): - with self.monitor.lock: - if self.monitor.speeds: - latest_speeds = self.monitor.speeds[-1] - self.label_upload.config(text=latest_speeds["upload"]) - self.label_download.config(text=latest_speeds["download"]) - - self.root.after(250, self.update_gui) # Update every 0.25 seconds - - def start_monitoring(self): - self.monitor_thread = Thread(target=self.monitor.capture_speed, args=(0.5,), daemon=True) - self.monitor_thread.start() - - - -root = tk.Tk() -app = NetworkMonitorApp(root) -root.mainloop() diff --git a/Test/t_down_mp4.py b/Test/t_down_mp4.py index 5dedaa0..dead81d 100644 --- a/Test/t_down_mp4.py +++ b/Test/t_down_mp4.py @@ -13,8 +13,9 @@ sys.path.append(src_path) from Src.Lib.Downloader import MP4_downloader + # Test MP4_downloader( "", - "EP_1.mp4" + "EP_2.mp4", ) diff --git a/config.json b/config.json index 8436a2d..0a5ef22 100644 --- a/config.json +++ b/config.json @@ -9,21 +9,23 @@ "map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)", "auto_update_domain": true, "config_qbit_tor": { - "host": "192.168.1.1", + "host": "192.168.1.125", "port": "8080", "user": "admin", - "pass": "admin" + "pass": "adminadmin" }, "not_close": false }, "REQUESTS": { "timeout": 10, "max_retry": 3, - "verify_ssl": false, + "verify_ssl": true, "user-agent": "", "proxy_start_min": 0.1, - "proxy_start_max": 0.5, - "proxy": [] + "proxy_start_max": 0.5 + }, + "BROWSER" : { + "headless": true }, "M3U8_DOWNLOAD": { "tqdm_delay": 0.01, @@ -61,7 +63,7 @@ "domain": "boston" }, "animeunity": { - "video_workers": 4, + "video_workers": 2, "audio_workers": 2, "domain": "to" }, @@ -76,14 +78,27 @@ "domain": "ceo" }, "ddlstreamitaly": { - "video_workers": -1, - "audio_workers": -1, "domain": "co", "cookie": { "ips4_device_key": "", "ips4_member_id": "", "ips4_login_key": "" } + }, + "watch_lonelil": { + "domain": "ru" + }, + "uhdmovies": { + "domain": "tel" + }, + "bitsearch": { + "domain": "to" + }, + "1337xx": { + "domain": "to" + }, + "cb01": { + "domain": "church" } } } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 8b98cc7..ddf62d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ tqdm m3u8 psutil unidecode +seleniumbase fake-useragent qbittorrent-api python-qbittorrent \ No newline at end of file