Mirror of https://github.com/Arrowar/StreamingCommunity.git (synced 2025-06-05 02:55:25 +00:00)
Re add cb01
This commit is contained in:
parent 068ec7a863
commit bdbcfb2fd1
Src/Api/cb01new/Player/maxstream.py (Normal file, 146 lines)
@@ -0,0 +1,146 @@
# 05.07.24

import re
import logging


# External libraries
import httpx
import jsbeautifier
from bs4 import BeautifulSoup


# Internal utilities
from Src.Util.headers import get_headers


class VideoSource:
    def __init__(self, url: str):
        """
        Sets up the video source with the provided URL.

        Parameters:
            - url (str): The URL of the video.
        """
        self.url = url
        self.redirect_url = None
        self.maxstream_url = None
        self.m3u8_url = None
        self.headers = {'user-agent': get_headers()}

    def get_redirect_url(self):
        """
        Sends a request to the initial URL and extracts the redirect URL.
        """
        try:

            # Send a GET request to the initial URL
            response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=10)
            response.raise_for_status()

            # Extract the redirect URL from the HTML
            soup = BeautifulSoup(response.text, "html.parser")
            self.redirect_url = soup.find("div", id="iframen1").get("data-src")
            logging.info(f"Redirect URL: {self.redirect_url}")

            return self.redirect_url

        except httpx.RequestError as e:
            logging.error(f"Error during the initial request: {e}")
            raise

        except AttributeError as e:
            logging.error(f"Error parsing HTML: {e}")
            raise

    def get_maxstream_url(self):
        """
        Sends a request to the redirect URL and extracts the Maxstream URL.
        """
        try:

            # Send a GET request to the redirect URL
            response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=10)
            response.raise_for_status()

            # Extract the Maxstream URL from the HTML
            soup = BeautifulSoup(response.text, "html.parser")
            maxstream_url = soup.find("a")

            if maxstream_url is None:

                # If no anchor tag is found, fall back to the stayonline embed API
                logging.warning("Anchor tag not found. Trying the alternative method.")
                headers = {
                    'origin': 'https://stayonline.pro',
                    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 OPR/111.0.0.0',
                    'x-requested-with': 'XMLHttpRequest',
                }

                # Make a request to the stayonline API
                data = {'id': self.redirect_url.split("/")[-2], 'ref': ''}
                response = httpx.post('https://stayonline.pro/ajax/linkEmbedView.php', headers=headers, data=data)
                response.raise_for_status()
                uprot_url = response.json()['data']['value']

                # Retry getting the Maxstream URL
                response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, "html.parser")
                maxstream_url = soup.find("a").get("href")

            else:
                maxstream_url = maxstream_url.get("href")

            self.maxstream_url = maxstream_url
            logging.info(f"Maxstream URL: {self.maxstream_url}")

            return self.maxstream_url

        except httpx.RequestError as e:
            logging.error(f"Error during the request to the redirect URL: {e}")
            raise

        except AttributeError as e:
            logging.error(f"Error parsing HTML: {e}")
            raise

    def get_m3u8_url(self):
        """
        Sends a request to the Maxstream URL and extracts the .m3u8 file URL.
        """
        try:

            # Send a GET request to the Maxstream URL
            response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # Iterate over all script tags in the HTML
            for script in soup.find_all("script"):
                if "eval(function(p,a,c,k,e,d)" in script.text:

                    # Unpack the obfuscated player script with jsbeautifier
                    data_js = jsbeautifier.beautify(script.text)

                    # Extract the .m3u8 URL from the unpacked script
                    match = re.search(r'sources:\s*\[\{\s*src:\s*"([^"]+)"', data_js)

                    if match:
                        self.m3u8_url = match.group(1)
                        logging.info(f"M3U8 URL: {self.m3u8_url}")
                        break

            return self.m3u8_url

        except Exception as e:
            logging.error(f"Error extracting the .m3u8 URL: {e}")
            raise

    def get_playlist(self):
        """
        Executes the entire flow to obtain the final .m3u8 file URL.
        """
        self.get_redirect_url()
        self.get_maxstream_url()
        return self.get_m3u8_url()
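For reference, a minimal usage sketch of the VideoSource class above; the page URL is a made-up placeholder, since in the real flow it comes from a MediaItem selected through the site search (see film.py below).

# Hedged sketch: the URL is hypothetical, not a real CB01 page.
from Src.Api.cb01new.Player.maxstream import VideoSource

video_source = VideoSource("https://example.org/some-film/")  # placeholder URL
m3u8_url = video_source.get_playlist()  # redirect -> Maxstream -> .m3u8 extraction
print(m3u8_url)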
Src/Api/cb01new/__init__.py (Normal file, 42 lines)
@@ -0,0 +1,42 @@
# 09.06.24

# Internal utilities
from Src.Util.console import console, msg


# Logic class
from .site import title_search, run_get_select_title
from .film import download_film


# Variable
indice = 9
_useFor = "film"
_deprecate = False
_priority = 2
_engineDownload = "mp4"


def search():
    """
    Main function of the application for film and series.
    """

    # Query the site for content that corresponds to the search string
    string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
    len_database = title_search(string_to_search)

    if len_database > 0:

        # Select title from list
        select_title = run_get_select_title()

        # !!! TODO: add type handling, this does not work for series
        download_film(select_title)

    else:
        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")

        # Retry
        search()
Src/Api/cb01new/costant.py (Normal file, 15 lines)
@@ -0,0 +1,15 @@
# 03.07.24

import os


# Internal utilities
from Src.Util._jsonConfig import config_manager


SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain']

MOVIE_FOLDER = "Movie"
SERIES_FOLDER = "Serie"
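As a quick illustration of how SITE_NAME is derived at import time, here is a standalone sketch; the path below is a hypothetical stand-in for this module's __file__.

# Hedged sketch: the path is a made-up stand-in for costant.py's __file__.
import os

fake_module_path = "/repo/Src/Api/cb01new/costant.py"
site_name = os.path.basename(os.path.dirname(os.path.abspath(fake_module_path)))
print(site_name)  # -> "cb01new", later used as the key into the SITE config section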
Src/Api/cb01new/film.py (Normal file, 63 lines)
@@ -0,0 +1,63 @@
# 03.07.24

import os
import sys
import time
import logging


# Internal utilities
from Src.Util.console import console, msg
from Src.Util.os import remove_special_characters
from Src.Util.message import start_message
from Src.Util.call_stack import get_call_stack
from Src.Lib.Downloader import HLS_Downloader
from ..Template import execute_search


# Logic class
from ..Template.Class.SearchType import MediaItem
from .Player.maxstream import VideoSource


# Config
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER


def download_film(select_title: MediaItem):
    """
    Downloads a film using the provided media item.

    Parameters:
        - select_title (MediaItem): The media item to download, an instance of the MediaItem class with attributes such as `name` and `url`.
    """

    # Start message and display film information
    start_message()
    console.print(f"[yellow]Download: [red]{select_title.name} \n")

    # Set up the video source for the selected title
    video_source = VideoSource(select_title.url)

    # Define the output path
    title_name = remove_special_characters(select_title.name)
    mp4_name = title_name + ".mp4"
    mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name)

    # Get the m3u8 master playlist
    master_playlist = video_source.get_playlist()

    # Download the film using the m3u8 playlist and the output filename
    r_proc = HLS_Downloader(m3u8_playlist=master_playlist, output_filename=os.path.join(mp4_path, mp4_name)).start()

    if r_proc == 404:
        time.sleep(2)

        # Return to the search flow if the user does not want to continue
        if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
            frames = get_call_stack()
            execute_search(frames[-4])

    if r_proc is not None:
        console.print("[green]Result: ")
        console.print(r_proc)
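To make the path handling above concrete, a small sketch of where the output would land; the root_path value and the title are illustrative, not taken from a real config.

# Hedged sketch: root_path and title are made-up example values.
import os

root_path = "Video"                                 # hypothetical root_path from config
title_name = "Some Film"                            # hypothetical sanitized title
mp4_path = os.path.join(root_path, "cb01new", "Movie", title_name)
print(os.path.join(mp4_path, title_name + ".mp4"))  # -> Video/cb01new/Movie/Some Film/Some Film.mp4 (POSIX separators)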
Src/Api/cb01new/site.py (Normal file, 74 lines)
@@ -0,0 +1,74 @@
# 03.07.24

# External libraries
import httpx
from bs4 import BeautifulSoup
from unidecode import unidecode


# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.headers import get_headers
from Src.Util.table import TVShowManager
from ..Template import search_domain, get_select_title


# Logic class
from ..Template.Class.SearchType import MediaManager


# Variable
from .costant import SITE_NAME
media_search_manager = MediaManager()
table_show_manager = TVShowManager()


def title_search(word_to_search: str) -> int:
    """
    Search for titles based on a search query.

    Parameters:
        - word_to_search (str): The title to search for.

    Returns:
        - int: The number of titles found.
    """

    # Find a new domain if the previous one no longer works
    max_timeout = config_manager.get_int("REQUESTS", "timeout")
    domain_to_use, _ = search_domain(SITE_NAME, f"https://{SITE_NAME}")

    response = httpx.get(
        url=f"https://{SITE_NAME}.{domain_to_use}/?s={unidecode(word_to_search)}",
        headers={'user-agent': get_headers()},
        timeout=max_timeout
    )
    response.raise_for_status()

    # Parse the results page
    soup = BeautifulSoup(response.text, "html.parser")

    # For every result card on the page
    for div in soup.find_all("div", class_="card-content"):

        url = div.find("h3").find("a").get("href")
        title = div.find("h3").find("a").get_text(strip=True)
        desc = div.find("p").find("strong").text

        title_info = {
            'name': title,
            'desc': desc,
            'url': url
        }

        media_search_manager.add_media(title_info)

    # Return the number of titles found
    return media_search_manager.get_length()


def run_get_select_title():
    """
    Display a selection of titles and prompt the user to choose one.
    """
    return get_select_title(table_show_manager, media_search_manager)
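The scraping loop in title_search() assumes each result is a "card-content" div with a linked h3 title and a strong description; the self-contained sketch below uses invented markup (not copied from the live site) to show what gets extracted.

# Hedged sketch: the HTML is invented to mirror the selectors used above.
from bs4 import BeautifulSoup

sample = """
<div class="card-content">
  <h3><a href="https://example.org/some-film/">Some Film</a></h3>
  <p><strong>Short description of the film.</strong></p>
</div>
"""
card = BeautifulSoup(sample, "html.parser").find("div", class_="card-content")
print(card.find("h3").find("a").get("href"))           # -> https://example.org/some-film/
print(card.find("h3").find("a").get_text(strip=True))  # -> Some Film
print(card.find("p").find("strong").text)              # -> Short description of the film.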
@@ -85,6 +85,9 @@
    "animeunity": {
        "domain": "to"
    },
    "cb01new": {
        "domain": "org"
    },
    "bitsearch": {
        "domain": "to"
    },
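A minimal sketch of how this new SITE entry combines with SITE_NAME to form the search URL built in site.py; the query string is a made-up example.

# Hedged sketch: "dune" is a hypothetical search query.
site_name = "cb01new"
domain = "org"                                  # value added by this commit
print(f"https://{site_name}.{domain}/?s=dune")  # -> https://cb01new.org/?s=dune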