Merge branch 'Lovi-0:main' into main

Francesco Grazioso 2024-06-10 10:48:55 +02:00 committed by GitHub
commit f613c6a58c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 456 additions and 50 deletions

View File

@@ -1,4 +1,4 @@
# 26.05.24
STREAMING_FOLDER = "altadefinizione"
MAIN_FOLDER = "altadefinizione"
MOVIE_FOLDER = "Movie"

View File

@@ -18,7 +18,7 @@ from .Core.Player.supervideo import VideoSource
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import STREAMING_FOLDER, MOVIE_FOLDER
from .costant import MAIN_FOLDER, MOVIE_FOLDER
# Variable
@@ -45,12 +45,12 @@ def download_film(title_name: str, url: str):
# Define output path
mp4_name = str(title_name).replace("-", "_") + ".mp4"
mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name)
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER, title_name)
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, key, and output filename
# Download the film using the m3u8 playlist, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
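To make the rename concrete, here is a minimal, self-contained sketch of how the constants compose into the output path used by the Downloader call above. ROOT_PATH and the title are placeholder values; the folder names are the ones defined in this commit.

import os

# ROOT_PATH and the title are made-up; the folder constants come from the hunk above.
ROOT_PATH = "downloads"
MAIN_FOLDER = "altadefinizione"
MOVIE_FOLDER = "Movie"
title_name = "some-movie-title"

mp4_name = str(title_name).replace("-", "_") + ".mp4"
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER, title_name)
print(os.path.join(mp4_path, mp4_name))
# downloads/altadefinizione/Movie/some-movie-title/some_movie_title.mp4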

View File

@@ -1,7 +1,6 @@
# 26.05.24
import sys
import json
import logging
@@ -23,8 +22,8 @@ from .Core.Class.SearchType import MediaManager, MediaItem
# Config
AD_SITE_NAME = "altadefinizione"
AD_DOMAIN_NOW = config_manager.get('SITE', AD_SITE_NAME)
SITE_NAME = "altadefinizione"
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
# Variable
@@ -39,14 +38,13 @@ def title_search(title_search: str) -> int:
Args:
- title_search (str): The title to search for.
- domain (str): The domain to search on.
Returns:
int: The number of titles found.
"""
# Send request to search for titles
response = requests.get(f"https://{AD_SITE_NAME}.{AD_DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()})
response = requests.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()})
response.raise_for_status()
# Create soup and find table
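The search URL is easier to read when built in pieces. The sketch below reproduces the query string from the request above with placeholder site values, showing what unidecode does to an accented title.

from unidecode import unidecode

SITE_NAME = "altadefinizione"   # value from the diff
DOMAIN_NOW = "example"          # hypothetical domain (read from the config in the diff)
title_search = "città di dio"   # example query with accented characters

# Accents are stripped and spaces become '+', matching the request above.
url = (
    f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/"
    f"?story={unidecode(title_search.replace(' ', '+'))}"
    "&do=search&subaction=search&titleonly=3"
)
print(url)
# https://altadefinizione.example/page/1/?story=citta+di+dio&do=search&subaction=search&titleonly=3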

View File

@@ -18,7 +18,7 @@ from .Core.Util import manage_selection
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import ANIME_FOLDER, SERIES_FOLDER, MOVIE_FOLDER
from .costant import MAIN_FOLDER, SERIES_FOLDER, MOVIE_FOLDER
# Variable
@@ -50,18 +50,15 @@ def download_episode(index_select: int):
mp4_path = None
mp4_name = f"{index_select + 1}.mp4"
if video_source.is_series:
mp4_path = os.path.join(ROOT_PATH, ANIME_FOLDER, SERIES_FOLDER, video_source.series_name)
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, SERIES_FOLDER, video_source.series_name)
else:
mp4_path = os.path.join(ROOT_PATH, ANIME_FOLDER, MOVIE_FOLDER, video_source.series_name)
# Crete downloader
obj_download = Downloader(
m3u8_playlist = video_source.get_playlist(),
output_filename = os.path.join(mp4_path, mp4_name)
)
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER, video_source.series_name)
# Start downloading
obj_download.start()
Downloader(
m3u8_playlist = video_source.get_playlist(),
output_filename = os.path.join(mp4_path, mp4_name)
).start()
def donwload_series(tv_id: int, tv_name: str):
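A condensed sketch of the refactored flow in this hunk: pick the series or movie folder, then create and start the downloader in one chained call. The FakeVideoSource stub and the path values are placeholders; only the folder constants and the shape of the call come from the diff.

import os

# Hypothetical stand-ins for the objects used in the hunk above.
class FakeVideoSource:
    is_series = True
    series_name = "example-anime"
    def get_playlist(self):
        return "https://example.invalid/master.m3u8"

ROOT_PATH, MAIN_FOLDER = "downloads", "animeunity"
SERIES_FOLDER, MOVIE_FOLDER = "Serie", "Movie"
video_source, index_select = FakeVideoSource(), 0

mp4_name = f"{index_select + 1}.mp4"
sub_folder = SERIES_FOLDER if video_source.is_series else MOVIE_FOLDER
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, sub_folder, video_source.series_name)

# In the diff the downloader is created and started in one chained call:
# Downloader(m3u8_playlist=video_source.get_playlist(),
#            output_filename=os.path.join(mp4_path, mp4_name)).start()
print(os.path.join(mp4_path, mp4_name))   # downloads/animeunity/Serie/example-anime/1.mp4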

View File

@@ -1,5 +1,5 @@
# 26.05.24
ANIME_FOLDER = "animeunity"
MAIN_FOLDER = "animeunity"
SERIES_FOLDER= "Serie"
MOVIE_FOLDER = "Movie"

View File

@@ -22,8 +22,8 @@ from .Core.Class.SearchType import MediaManager, MediaItem
# Config
AU_SITE_NAME = "animeunity"
AU_DOMAIN_NOW = config_manager.get('SITE', AU_SITE_NAME)
SITE_NAME = "animeunity"
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
# Variable
@@ -82,8 +82,8 @@ def update_domain():
# Test current site's accessibility
try:
console.log(f"[cyan]Test site: [red]https://{AU_SITE_NAME}.{AU_DOMAIN_NOW}")
response = requests.get(f"https://www.{AU_SITE_NAME}.{AU_DOMAIN_NOW}")
console.log(f"[cyan]Test site: [red]https://{SITE_NAME}.{DOMAIN_NOW}")
response = requests.get(f"https://www.{SITE_NAME}.{DOMAIN_NOW}")
response.status_code
# If the current site is inaccessible, try to obtain a new domain
@@ -97,7 +97,7 @@ def update_domain():
if new_domain:
# Update configuration with the new domain
config_manager.set_key('SITE', AU_SITE_NAME, new_domain)
config_manager.set_key('SITE', SITE_NAME, new_domain)
config_manager.write_config()
else:
@@ -144,8 +144,8 @@ def title_search(title: str) -> int:
update_domain()
# Get token and session value from configuration
url_domain = config_manager.get('SITE', AU_SITE_NAME)
data = get_token(AU_SITE_NAME, url_domain)
url_domain = config_manager.get('SITE', SITE_NAME)
data = get_token(SITE_NAME, url_domain)
# Prepare cookies to be used in the request
cookies = {
@@ -166,7 +166,7 @@
}
# Send a POST request to the API endpoint for live search
response = requests.post(f'https://www.{AU_SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json_data=json_data)
response = requests.post(f'https://www.{SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json_data=json_data)
response.raise_for_status()
# Process each record returned in the response
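The live-search request above passes the body as json_data, while requests itself exposes a JSON body through its json= keyword; the sketch below uses that, with placeholder cookies, headers, and domain.

import requests

# All concrete values are placeholders; in the diff they come from
# config_manager and get_token(SITE_NAME, url_domain).
SITE_NAME, url_domain = "animeunity", "example"
cookies = {"session": "placeholder"}
headers = {"content-type": "application/json"}

# requests passes a JSON body through its json= keyword argument.
response = requests.post(
    f"https://www.{SITE_NAME}.{url_domain}/livesearch",
    cookies=cookies,
    headers=headers,
    json={"title": "one piece"},
)
response.raise_for_status()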

View File

@@ -0,0 +1,3 @@
# 09.06.24
from .site import title_search

View File

@@ -0,0 +1,4 @@
# 09.06.24
MAIN_FOLDER = "ddlstreamitaly"
MOVIE_FOLDER = "Movie"

View File

@@ -0,0 +1,81 @@
# 09.06.24
import os
import sys
import logging
from urllib.parse import urlparse
# External libraries
import requests
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.message import start_message
from Src.Util.color import Colors
from Src.Util.console import console, msg
from Src.Util.os import create_folder, can_create_file
from Src.Util._jsonConfig import config_manager
from Src.Util.headers import get_headers
from Src.Lib.Hls.download_mp4 import MP4_downloader
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import MAIN_FOLDER, MOVIE_FOLDER
# Variable
cookie_index = config_manager.get_dict('REQUESTS', 'index')
def title_search() -> int:
"""
Search for titles based on a search query.
"""
print()
url_search = msg.ask(f"[cyan]Insert url title")
# Send request to search for titles
try:
response = requests.get(url_search, headers={'user-agent': get_headers()}, cookies=cookie_index)
response.raise_for_status()
except:
logging.error("Insert: {'ips4_IPSSessionFront': 'your_code', 'ips4_member_id': 'your_code'} in config file \ REQUESTS \ index, instead of user-agent. Use browser debug and cookie request with a valid account.")
sys.exit(0)
# Create soup and mp4 video
soup = BeautifulSoup(response.text, "html.parser")
souce = soup.find("source")
# Get url and filename
try:
mp4_link = souce.get("src")
except:
logging.error("Insert: {'ips4_IPSSessionFront': 'your_code', 'ips4_member_id': 'your_code'} in config file \ REQUESTS \ index, instead of user-agent. Use browser debug and cookie request with a valid account.")
sys.exit(0)
parsed_url = urlparse(url_search)
path_parts = parsed_url.path.split('/')
mp4_name = path_parts[-2] if path_parts[-1] == '' else path_parts[-1] + ".mp4"
# Create destination folder
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER)
# Check if can create file output
create_folder(mp4_path)
if not can_create_file(mp4_name):
logging.error("Invalid mp4 name.")
sys.exit(0)
# Start download
start_message()
MP4_downloader(
url = mp4_link,
path = os.path.join(mp4_path, mp4_name),
referer = f"{parsed_url.scheme}://{parsed_url.netloc}/",
add_desc=f"{Colors.MAGENTA}video"
)
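The filename derivation above appends ".mp4" only in the else branch of the conditional, so a URL ending in a slash yields a name with no extension. A hypothetical helper that normalizes both cases (the function name is mine, not the project's):

from urllib.parse import urlparse

def mp4_name_from_url(url_search: str) -> str:
    # Take the last non-empty path segment and always append ".mp4".
    path_parts = [part for part in urlparse(url_search).path.split("/") if part]
    name = path_parts[-1] if path_parts else "video"
    return name if name.endswith(".mp4") else name + ".mp4"

print(mp4_name_from_url("https://example.invalid/video/some-episode/"))  # some-episode.mp4
print(mp4_name_from_url("https://example.invalid/video/some-episode"))   # some-episode.mp4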

View File

@@ -0,0 +1,170 @@
# 26.05.24
import re
import sys
import logging
# External libraries
import requests
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util.os import run_node_script
class VideoSource:
def __init__(self) -> None:
"""
Initializes the VideoSource object with default values.
Attributes:
headers (dict): An empty dictionary to store HTTP headers.
"""
self.headers = {'user-agent': get_headers()}
def setup(self, url: str) -> None:
"""
Sets up the video source with the provided URL.
Args:
url (str): The URL of the video source.
"""
self.url = url
def make_request(self, url: str) -> str:
"""
Make an HTTP GET request to the provided URL.
Args:
url (str): The URL to make the request to.
Returns:
str: The response content if successful, None otherwise.
"""
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()
with open('index.html', 'w', encoding='utf-8') as file:
file.write(response.text)
return response.text
except Exception as e:
logging.error(f"Request failed: {e}")
return None
def parse_html(self, html_content: str) -> BeautifulSoup:
"""
Parse the provided HTML content using BeautifulSoup.
Args:
html_content (str): The HTML content to parse.
Returns:
BeautifulSoup: Parsed HTML content if successful, None otherwise.
"""
try:
soup = BeautifulSoup(html_content, "html.parser")
return soup
except Exception as e:
logging.error(f"Failed to parse HTML content: {e}")
return None
def get_iframe(self, soup):
"""
Extracts the source URL of the second iframe in the provided BeautifulSoup object.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML.
Returns:
str: The source URL of the second iframe, or None if not found.
"""
tag_a = soup.find_all('a', href='#')
if tag_a and len(tag_a) > 1:
return tag_a[1].get("data-link")
return None
def find_content(self, url):
"""
Makes a request to the specified URL and parses the HTML content.
Args:
url (str): The URL to fetch content from.
Returns:
BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails.
"""
content = self.make_request(url)
if content:
return self.parse_html(content)
return None
def get_result_node_js(self, soup):
"""
Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content.
Returns:
str: The output from the Node.js script, or None if the script cannot be found or executed.
"""
for script in soup.find_all("script"):
if "eval" in str(script):
new_script = str(script.text).replace("eval", "var a = ")
new_script = new_script.replace(")))", ")));console.log(a);")
return run_node_script(new_script)
return None
def get_playlist(self) -> str:
"""
Download a video from the provided URL.
Returns:
str: The URL of the downloaded video if successful, None otherwise.
"""
try:
html_content = self.make_request(self.url)
if not html_content:
logging.error("Failed to fetch HTML content.")
return None
soup = self.parse_html(html_content)
if not soup:
logging.error("Failed to parse HTML content.")
return None
iframe_src = self.get_iframe(soup)
if not iframe_src:
logging.error("No iframe found.")
return None
down_page_soup = self.find_content(iframe_src)
if not down_page_soup:
logging.error("Failed to fetch down page content.")
return None
result = self.get_result_node_js(down_page_soup)
if not result:
logging.error("No video URL found in script.")
return None
master_playlist = str(result).split(":")[3].split('"}')[0]
return f"https:{master_playlist}"
except Exception as e:
logging.error(f"An error occurred: {e}")
return None
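A minimal usage sketch for the new VideoSource class. The URL is a placeholder; get_playlist() performs HTTP requests and runs a small Node.js snippet via run_node_script, so it only works inside the project with Node.js available.

# Usage sketch; assumes VideoSource from the class above is importable.
video_source = VideoSource()
video_source.setup("https://supervideo.example/e/placeholder")

master_playlist = video_source.get_playlist()
if master_playlist is None:
    print("No playlist found (request, parsing, or the Node.js step failed)")
else:
    print("master playlist:", master_playlist)

Note that make_request also writes the fetched page to index.html in the working directory, which reads like leftover debugging output.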

View File

@@ -0,0 +1,3 @@
# 09.06.24
from .site import title_search

View File

@@ -0,0 +1,4 @@
# 09.06.24
MAIN_FOLDER = "guardaserie"
MOVIE_FOLDER = "Serie"

View File

@@ -0,0 +1,68 @@
# 09.06.24
import os
import sys
import logging
from urllib.parse import urlparse
# External libraries
import requests
# Internal utilities
from Src.Util.console import console, msg
from Src.Util.os import create_folder, can_create_file
from Src.Util._jsonConfig import config_manager
from Src.Util.headers import get_headers
from Src.Lib.Hls.downloader import Downloader
# Logic class
from .Core.Player.supervideo import VideoSource
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import MAIN_FOLDER, MOVIE_FOLDER
def title_search() -> int:
"""
Search for titles based on a search query.
"""
print()
url_search = msg.ask(f"[cyan]Insert url title")
# Send request to search for titles
response = requests.get(url_search, headers={'user-agent': get_headers()})
response.raise_for_status()
# Get playlist
video_source = VideoSource()
video_source.setup(url_search)
parsed_url = urlparse(url_search)
path_parts = parsed_url.path.split('/')
mp4_name = path_parts[-2] if path_parts[-1] == '' else path_parts[-1] + ".mp4"
# Create destination folder
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER)
# Check if can create file output
create_folder(mp4_path)
if not can_create_file(mp4_name):
logging.error("Invalid mp4 name.")
sys.exit(0)
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
).start()
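Both new site modules repeat the same guard: build the destination folder, create it, and bail out on an invalid file name. A hypothetical helper that bundles the pattern, using plain os/logging calls in place of the project's create_folder / can_create_file utilities:

import os
import sys
import logging

def prepare_output_dir(root: str, main_folder: str, sub_folder: str, file_name: str) -> str:
    # Hypothetical helper; the character check below stands in for can_create_file.
    out_dir = os.path.join(root, main_folder, sub_folder)
    os.makedirs(out_dir, exist_ok=True)
    if not file_name or any(ch in file_name for ch in '\\/:*?"<>|'):
        logging.error("Invalid mp4 name.")
        sys.exit(0)
    return os.path.join(out_dir, file_name)

print(prepare_output_dir("downloads", "guardaserie", "Serie", "episode_1.mp4"))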

View File

@@ -1,5 +1,5 @@
# 26.05.24
STREAMING_FOLDER = "streamingcommunity"
MAIN_FOLDER = "streamingcommunity"
MOVIE_FOLDER = "Movie"
SERIES_FOLDER = "Serie"

View File

@@ -18,7 +18,7 @@ from .Core.Player.vixcloud import VideoSource
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import STREAMING_FOLDER, MOVIE_FOLDER
from .costant import MAIN_FOLDER, MOVIE_FOLDER
# Variable
@@ -53,9 +53,9 @@ def download_film(id_film: str, title_name: str, domain: str):
# Define the filename and path for the downloaded film
mp4_name = title_name.replace("-", "_")
mp4_format = (mp4_name) + ".mp4"
mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name)
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER, title_name)
# Download the film using the m3u8 playlist, key, and output filename
# Download the film using the m3u8 playlist, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_format)

View File

@@ -20,7 +20,7 @@ from .Core.Util import manage_selection, map_episode_title
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import STREAMING_FOLDER, SERIES_FOLDER
from .costant import MAIN_FOLDER, SERIES_FOLDER
# Variable
@@ -85,7 +85,7 @@ def donwload_video(tv_name: str, index_season_selected: int, index_episode_selec
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4"
mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, SERIES_FOLDER, tv_name, f"S{index_season_selected}")
mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, SERIES_FOLDER, tv_name, f"S{index_season_selected}")
# Retrieve scws and if available master playlist
video_source.get_iframe(obj_episode.id)

View File

@@ -26,8 +26,8 @@ from .Core.Class.SearchType import MediaManager, MediaItem
# Config
SC_SITE_NAME = "streamingcommunity"
SC_DOMAIN_NOW = config_manager.get('SITE', SC_SITE_NAME)
SITE_NAME = "streamingcommunity"
DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)
# Variable
@@ -93,7 +93,7 @@ def get_version_and_domain(new_domain = None) -> Tuple[str, str]:
# Get the current domain from the configuration
if new_domain is None:
config_domain = config_manager.get('SITE', SC_SITE_NAME)
config_domain = config_manager.get('SITE', SITE_NAME)
else:
config_domain = new_domain
@@ -101,8 +101,8 @@ def get_version_and_domain(new_domain = None) -> Tuple[str, str]:
try:
# Make requests to site to get text
console.print(f"[cyan]Test site[white]: [red]https://{SC_SITE_NAME}.{config_domain}")
response = requests.get(f"https://{SC_SITE_NAME}.{config_domain}")
console.print(f"[cyan]Test site[white]: [red]https://{SITE_NAME}.{config_domain}")
response = requests.get(f"https://{SITE_NAME}.{config_domain}")
console.print(f"[cyan]Test respost site[white]: [red]{response.status_code} \n")
# Extract version from the response
@@ -117,7 +117,7 @@ def get_version_and_domain(new_domain = None) -> Tuple[str, str]:
console.log(f"[cyan]Extract new domain: [red]{new_domain}")
# Update the domain in the configuration file
config_manager.set_key('SITE', SC_SITE_NAME, str(new_domain))
config_manager.set_key('SITE', SITE_NAME, str(new_domain))
config_manager.write_config()
# Retry to get the version and domain
@@ -137,7 +137,7 @@ def title_search(title_search: str, domain: str) -> int:
"""
# Send request to search for titles ( replace à to a and space to "+" )
response = requests.get(f"https://{SC_SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()})
response = requests.get(f"https://{SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()})
response.raise_for_status()
# Add found titles to media search manager
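The domain handling in these hunks follows a test-then-failover shape: probe the configured domain, and if that fails, look up a new one, persist it with set_key()/write_config(), and retry. A compressed sketch of that shape, with an in-memory dict standing in for config_manager and a stubbed domain lookup:

import requests

# In-memory stand-ins; the project persists changes with set_key() + write_config().
config = {"SITE": {"streamingcommunity": "example"}}

def search_new_domain(site_name: str) -> str:
    return "example-new"   # hypothetical lookup; how the project finds it is not shown here

def get_working_domain(site_name: str = "streamingcommunity") -> str:
    domain = config["SITE"][site_name]
    try:
        requests.get(f"https://{site_name}.{domain}", timeout=10).raise_for_status()
        return domain
    except requests.RequestException:
        new_domain = search_new_domain(site_name)
        config["SITE"][site_name] = new_domain
        return new_domain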

View File

@@ -0,0 +1,74 @@
# 09.06.24
import os
import sys
import logging
# External libraries
import requests
from tqdm import tqdm
# Internal utilities
from Src.Util.headers import get_headers
from Src.Util.color import Colors
from Src.Util.console import console, Panel
from Src.Util._jsonConfig import config_manager
from Src.Util.os import format_size
# Logic class
from ..FFmpeg import print_duration_table
# Config
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
REQUEST_VERIFY = config_manager.get_float('REQUESTS', 'verify_ssl')
REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
def MP4_downloader(url: str, path: str, referer: str, add_desc: str):
if os.path.exists(path):
console.log("[cyan]Video [red]already exists.")
sys.exit(0)
# Make request to get content of video
logging.info(f"Make request to fetch mp4 from: {url}")
response = requests.get(url, stream=True, headers={'Referer': referer, 'user-agent': get_headers()}, verify=REQUEST_VERIFY, timeout=REQUEST_TIMEOUT)
total = int(response.headers.get('content-length', 0))
# Create bar format
if TQDM_USE_LARGE_BAR:
bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}} {Colors.WHITE}| {Colors.YELLOW}{{rate_fmt}}{{postfix}} {Colors.WHITE}]"
else:
bar_format=f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
# Create progress bar
progress_bar = tqdm(
total=total,
unit='iB',
ascii='░▒█',
bar_format=bar_format,
unit_scale=True,
unit_divisor=1024
)
# Download file
with open(path, 'wb') as file, progress_bar as bar:
for data in response.iter_content(chunk_size=1024):
size = file.write(data)
bar.update(size)
# Get summary
console.print(Panel(
f"[bold green]Download completed![/bold green]\n"
f"File size: [bold red]{format_size(os.path.getsize(path))}[/bold red]\n"
f"Duration: [bold]{print_duration_table(path, show=False)}[/bold]",
title=f"{os.path.basename(path.replace('.mp4', ''))}", border_style="green"))
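A usage sketch of the new MP4_downloader, mirroring the call site in the Ddlstreamitaly module. The URL and output path are placeholders, and the snippet only runs inside the project tree where the Src package is importable.

from urllib.parse import urlparse

from Src.Lib.Hls.download_mp4 import MP4_downloader
from Src.Util.color import Colors

# Placeholder URL and output path; the Ddlstreamitaly module derives both
# from the page it scrapes.
mp4_link = "https://cdn.example.invalid/files/video.mp4"
parsed_url = urlparse(mp4_link)

MP4_downloader(
    url=mp4_link,
    path="downloads/ddlstreamitaly/Movie/video.mp4",
    referer=f"{parsed_url.scheme}://{parsed_url.netloc}/",
    add_desc=f"{Colors.MAGENTA}video",
)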

View File

@@ -17,7 +17,7 @@
"proxy": []
},
"M3U8_DOWNLOAD": {
"tdqm_workers": 3,
"tdqm_workers": 2,
"tqdm_delay": 0.01,
"tqdm_use_large_bar": true,
"download_video": true,

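For reference, a sketch of how the lowered worker count might be read back, following the config_manager getters used in download_mp4.py above; note the key really is spelled "tdqm_workers" in the config, so any reader of the value has to use the same spelling.

from Src.Util._jsonConfig import config_manager

# Sketch only: mirrors the get_int/get_float pattern from download_mp4.py.
TQDM_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')   # 2 after this change
TQDM_DELAY = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay')     # 0.01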
run.py (20 changed lines)
View File

@@ -21,6 +21,8 @@ from Src.Util.logger import Logger
from Src.Api.Streamingcommunity import main_film_series as streamingcommunity_film_serie
from Src.Api.Animeunity import main_anime as streamingcommunity_anime
from Src.Api.Altadefinizione import main_film as altadefinizione_film
from Src.Api.Ddlstreamitaly import title_search as ddlstreamitaly_film_serie
from Src.Api.Guardaserie import title_search as guardaserie_serie
# Config
@@ -78,16 +80,14 @@ def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='Script to download film and series from the internet.')
parser.add_argument('-sa', '--streaming_anime', action='store_true', help='Check into anime category')
parser.add_argument('-sf', '--streaming_film', action='store_true', help='Check into film/tv series category')
parser.add_argument('-af', '--altadefinizione_film', action='store_true', help='Check into film/tv series category')
parser.add_argument('-sa', '--streaming_anime', action='store_true', help='')
parser.add_argument('-sf', '--streaming_film', action='store_true', help='')
args = parser.parse_args()
# Mapping command-line arguments to functions
arg_to_function = {
'streaming_anime': streamingcommunity_anime,
'streaming_film': streamingcommunity_film_serie,
'altadefinizione_film': altadefinizione_film,
}
# Check which argument is provided and run the corresponding function
@@ -101,14 +101,18 @@ def main():
'0': streamingcommunity_film_serie,
'1': streamingcommunity_anime,
'2': altadefinizione_film,
'3': ddlstreamitaly_film_serie,
'4': guardaserie_serie,
}
# Create dynamic prompt message and choices
choices = list(input_to_function.keys())
choice_labels = {
'0': "Film/Series",
'1': "Anime",
'2': "Altadefinizione"
'0': "Streamingcommunity",
'1': "Animeunity",
'2': "Altadefinizione",
'3': "Ddlstreamitaly",
'4': "Guardaserie",
}
prompt_message = "[cyan]Insert category [white](" + ", ".join(
f"[red]{key}[white]: [bold magenta]{label}[white]" for key, label in choice_labels.items()
@@ -121,7 +125,7 @@ def main():
if category in input_to_function:
run_function(input_to_function[category], CLOSE_CONSOLE)
else:
console.print("[red]Invalid category, you need to insert 0, 1, or 2.")
console.print("[red]Invalid category.")
sys.exit(0)
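The hunks above trim the argparse flags and extend the interactive menu. A self-contained sketch of the flag-to-function dispatch shape, with print stubs standing in for the real entry points; the getattr loop is an assumption about the elided body of main().

import argparse

def streamingcommunity_film_serie():
    print("film / serie")   # stand-in for the real entry point

def streamingcommunity_anime():
    print("anime")          # stand-in for the real entry point

parser = argparse.ArgumentParser(description='Script to download film and series from the internet.')
parser.add_argument('-sa', '--streaming_anime', action='store_true')
parser.add_argument('-sf', '--streaming_film', action='store_true')
args = parser.parse_args()

arg_to_function = {
    'streaming_anime': streamingcommunity_anime,
    'streaming_film': streamingcommunity_film_serie,
}

# Dispatch on whichever flag was passed; assumed shape of the elided loop in main().
for arg_name, func in arg_to_function.items():
    if getattr(args, arg_name, False):
        func()
        break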