diff --git a/Src/Api/cb01new/Player/maxstream.py b/Src/Api/cb01new/Player/maxstream.py new file mode 100644 index 0000000..6c454e6 --- /dev/null +++ b/Src/Api/cb01new/Player/maxstream.py @@ -0,0 +1,146 @@ +# 05.07.24 + +import re +import logging + + +# External libraries +import httpx +import jsbeautifier +from bs4 import BeautifulSoup + + +# Internal utilities +from Src.Util.headers import get_headers + + +class VideoSource: + def __init__(self, url: str): + """ + Sets up the video source with the provided URL. + + Parameters: + - url (str): The URL of the video. + """ + self.url = url + self.redirect_url = None + self.maxstream_url = None + self.m3u8_url = None + self.headers = {'user-agent': get_headers()} + + def get_redirect_url(self): + """ + Sends a request to the initial URL and extracts the redirect URL. + """ + try: + + # Send a GET request to the initial URL + response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + + # Extract the redirect URL from the HTML + soup = BeautifulSoup(response.text, "html.parser") + self.redirect_url = soup.find("div", id="iframen1").get("data-src") + logging.info(f"Redirect URL: {self.redirect_url}") + + return self.redirect_url + + except httpx.RequestError as e: + logging.error(f"Error during the initial request: {e}") + raise + + except AttributeError as e: + logging.error(f"Error parsing HTML: {e}") + raise + + def get_maxstream_url(self): + """ + Sends a request to the redirect URL and extracts the Maxstream URL. + """ + try: + + # Send a GET request to the redirect URL + response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + + # Extract the Maxstream URL from the HTML + soup = BeautifulSoup(response.text, "html.parser") + maxstream_url = soup.find("a") + + if maxstream_url is None: + + # If no anchor tag is found, try the alternative method + logging.warning("Anchor tag not found. Trying the alternative method.") + headers = { + 'origin': 'https://stayonline.pro', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 OPR/111.0.0.0', + 'x-requested-with': 'XMLHttpRequest', + } + + # Make request to stayonline api + data = {'id': self.redirect_url.split("/")[-2], 'ref': ''} + response = httpx.post('https://stayonline.pro/ajax/linkEmbedView.php', headers=headers, data=data) + response.raise_for_status() + uprot_url = response.json()['data']['value'] + + # Retry getting maxtstream url + response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + maxstream_url = soup.find("a").get("href") + + else: + maxstream_url = maxstream_url.get("href") + + self.maxstream_url = maxstream_url + logging.info(f"Maxstream URL: {self.maxstream_url}") + + return self.maxstream_url + + except httpx.RequestError as e: + logging.error(f"Error during the request to the redirect URL: {e}") + raise + + except AttributeError as e: + logging.error(f"Error parsing HTML: {e}") + raise + + def get_m3u8_url(self): + """ + Sends a request to the Maxstream URL and extracts the .m3u8 file URL. + """ + try: + + # Send a GET request to the Maxstream URL + response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + + # Iterate over all script tags in the HTML + for script in soup.find_all("script"): + if "eval(function(p,a,c,k,e,d)" in script.text: + + # Execute the script using + data_js = jsbeautifier.beautify(script.text) + + # Extract the .m3u8 URL from the script's output + match = re.search(r'sources:\s*\[\{\s*src:\s*"([^"]+)"', data_js) + + if match: + self.m3u8_url = match.group(1) + logging.info(f"M3U8 URL: {self.m3u8_url}") + break + + return self.m3u8_url + + except Exception as e: + logging.error(f"Error executing the Node.js script: {e}") + raise + + def get_playlist(self): + """ + Executes the entire flow to obtain the final .m3u8 file URL. + """ + self.get_redirect_url() + self.get_maxstream_url() + return self.get_m3u8_url() diff --git a/Src/Api/cb01new/__init__.py b/Src/Api/cb01new/__init__.py new file mode 100644 index 0000000..be0c4c4 --- /dev/null +++ b/Src/Api/cb01new/__init__.py @@ -0,0 +1,42 @@ +# 09.06.24 + +# Internal utilities +from Src.Util.console import console, msg + + +# Logic class +from .site import title_search, run_get_select_title +from .film import download_film + + +# Variable +indice = 9 +_useFor = "film" +_deprecate = False +_priority = 2 +_engineDownload = "mp4" + + +def search(): + """ + Main function of the application for film and series. + """ + + # Make request to site to get content that corrsisponde to that string + string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip() + len_database = title_search(string_to_search) + + if len_database > 0: + + # Select title from list + select_title = run_get_select_title() + + # !!! ADD TYPE DONT WORK FOR SERIE + download_film(select_title) + + + else: + console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}") + + # Retry + search() diff --git a/Src/Api/cb01new/costant.py b/Src/Api/cb01new/costant.py new file mode 100644 index 0000000..ba6ce6e --- /dev/null +++ b/Src/Api/cb01new/costant.py @@ -0,0 +1,15 @@ +# 03.07.24 + +import os + + +# Internal utilities +from Src.Util._jsonConfig import config_manager + + +SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__))) +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain'] + +MOVIE_FOLDER = "Movie" +SERIES_FOLDER = "Serie" diff --git a/Src/Api/cb01new/film.py b/Src/Api/cb01new/film.py new file mode 100644 index 0000000..e6fad17 --- /dev/null +++ b/Src/Api/cb01new/film.py @@ -0,0 +1,63 @@ +# 03.07.24 + +import os +import sys +import time +import logging + + +# Internal utilities +from Src.Util.console import console, msg +from Src.Util.os import remove_special_characters +from Src.Util.message import start_message +from Src.Util.call_stack import get_call_stack +from Src.Lib.Downloader import HLS_Downloader +from ..Template import execute_search + + +# Logic class +from ..Template.Class.SearchType import MediaItem +from .Player.maxstream import VideoSource + + +# Config +from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER + + +def download_film(select_title: MediaItem): + """ + Downloads a film using the provided obj. + + Parameters: + - select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`. + """ + + # Start message and display film information + start_message() + console.print(f"[yellow]Download: [red]{select_title.name} \n") + + # Setup api manger + video_source = VideoSource(select_title.url) + + # Define output path + title_name = remove_special_characters(select_title.name) + mp4_name = title_name +".mp4" + mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name) + + # Get m3u8 master playlist + master_playlist = video_source.get_playlist() + + # Download the film using the m3u8 playlist, and output filename + r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() + + if r_proc == 404: + time.sleep(2) + + # Re call search function + if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n": + frames = get_call_stack() + execute_search(frames[-4]) + + if r_proc != None: + console.print("[green]Result: ") + console.print(r_proc) diff --git a/Src/Api/cb01new/site.py b/Src/Api/cb01new/site.py new file mode 100644 index 0000000..7a0197c --- /dev/null +++ b/Src/Api/cb01new/site.py @@ -0,0 +1,74 @@ +# 03.07.24 + +# External libraries +import httpx +from bs4 import BeautifulSoup +from unidecode import unidecode + + +# Internal utilities +from Src.Util._jsonConfig import config_manager +from Src.Util.headers import get_headers +from Src.Util.table import TVShowManager +from ..Template import search_domain, get_select_title + + +# Logic class +from ..Template.Class.SearchType import MediaManager + + +# Variable +from .costant import SITE_NAME +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + + +def title_search(word_to_search: str) -> int: + """ + Search for titles based on a search query. + + Parameters: + - title_search (str): The title to search for. + + Returns: + - int: The number of titles found. + """ + + # Find new domain if prev dont work + max_timeout = config_manager.get_int("REQUESTS", "timeout") + domain_to_use, _ = search_domain(SITE_NAME, f"https://{SITE_NAME}") + + response = httpx.get( + url=f"https://{SITE_NAME}.{domain_to_use}/?s={unidecode(word_to_search)}", + headers={'user-agent': get_headers()}, + timeout=max_timeout + ) + response.raise_for_status() + + # Create soup and find table + soup = BeautifulSoup(response.text, "html.parser") + + # For all element in table + for div in soup.find_all("div", class_ = "card-content"): + + url = div.find("h3").find("a").get("href") + title = div.find("h3").find("a").get_text(strip=True) + desc = div.find("p").find("strong").text + + title_info = { + 'name': title, + 'desc': desc, + 'url': url + } + + media_search_manager.add_media(title_info) + + # Return the number of titles found + return media_search_manager.get_length() + + +def run_get_select_title(): + """ + Display a selection of titles and prompt the user to choose one. + """ + return get_select_title(table_show_manager, media_search_manager) diff --git a/config.json b/config.json index 3e8a1e0..38dac84 100644 --- a/config.json +++ b/config.json @@ -85,6 +85,9 @@ "animeunity": { "domain": "to" }, + "cb01new": { + "domain": "org" + }, "bitsearch": { "domain": "to" },