v1 Finish Guardaserie

This commit is contained in:
Lovi 2024-06-13 10:00:52 +02:00
parent 893894af87
commit 38c1355f6b
17 changed files with 863 additions and 188 deletions

View File

@@ -14,7 +14,7 @@ from .site import (
 from .film import download_film

-def main_film():
+def search():
     """
     Main function of the application for film and series.
     """

View File

@@ -8,7 +8,7 @@ from .site import title_search, get_select_title
 from .anime import donwload_film, donwload_series

-def main_anime():
+def search():
     # Make request to the site to get content corresponding to the search string
     string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()

View File

@@ -1,3 +1,3 @@
 # 09.06.24
-from .site import title_search
+from .site import search

View File

@@ -30,7 +30,7 @@ from .costant import MAIN_FOLDER, MOVIE_FOLDER
 cookie_index = config_manager.get_dict('REQUESTS', 'index')

-def title_search() -> int:
+def search() -> int:
     """
     Search for titles based on a search query.
     """

View File

@@ -0,0 +1,75 @@
# 13.06.24

import sys
import logging

# External libraries
import httpx
from bs4 import BeautifulSoup

# Internal utilities
from Src.Util.headers import get_headers

# Logic class
from .SearchType import MediaItem


class GetSerieInfo:

    def __init__(self, dict_serie: MediaItem = None) -> None:
        """
        Initializes the GetSerieInfo object for scraping a series page.

        Attributes:
            headers (dict): HTTP headers with a randomized user agent.
        """
        self.headers = {'user-agent': get_headers()}
        self.url = dict_serie.url
        self.tv_name = None
        self.list_episodes = None

    def get_seasons_number(self):
        response = httpx.get(self.url, headers=self.headers)

        # Create soup and find the seasons table
        soup = BeautifulSoup(response.text, "html.parser")
        table_content = soup.find('div', class_="tt_season")

        seasons_number = len(table_content.find_all("li"))
        self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True)

        return seasons_number

    def get_episode_number(self, n_season: int):
        response = httpx.get(self.url, headers=self.headers)

        # Create soup and find the tab pane for the requested season
        soup = BeautifulSoup(response.text, "html.parser")
        table_content = soup.find('div', class_="tab-pane", id=f"season-{n_season}")

        # Scrape episode info from each list item
        episode_content = table_content.find_all("li")
        list_dict_episode = []

        for episode_div in episode_content:
            index = episode_div.find("a").get("data-num")
            link = episode_div.find("a").get("data-link")
            name = episode_div.find("a").get("data-title")

            obj_episode = {
                'number': index,
                'name': name,
                'url': link
            }
            list_dict_episode.append(obj_episode)

        self.list_episodes = list_dict_episode
        return list_dict_episode
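
A minimal usage sketch of the scraper above; the URL and title are placeholders, not real endpoints:

# Hypothetical usage; 'https://guardaserie.example/serie-x' stands in for a real series page.
item = MediaItem({'name': 'Serie X', 'score': '8.1', 'url': 'https://guardaserie.example/serie-x'})
info = GetSerieInfo(item)
seasons_count = info.get_seasons_number()    # also sets info.tv_name from the page title
episodes = info.get_episode_number(1)        # list of {'number', 'name', 'url'} dicts
print(info.tv_name, seasons_count, len(episodes))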

View File

@@ -0,0 +1,62 @@
# 26.05.24

from typing import List


class MediaItem:
    def __init__(self, data: dict):
        self.name: str = data.get('name')
        self.type: str = "serie"
        self.score: str = data.get('score')
        self.url: str = data.get('url')

    def __str__(self):
        return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})"


class MediaManager:
    def __init__(self):
        self.media_list: List[MediaItem] = []

    def add_media(self, data: dict) -> None:
        """
        Add media to the list.

        Args:
            data (dict): Media data to add.
        """
        self.media_list.append(MediaItem(data))

    def get(self, index: int) -> MediaItem:
        """
        Get a media item from the list by index.

        Args:
            index (int): The index of the media item to retrieve.

        Returns:
            MediaItem: The media item at the specified index.
        """
        return self.media_list[index]

    def get_length(self) -> int:
        """
        Get the number of media items found by the search.

        Returns:
            int: Number of media items.
        """
        return len(self.media_list)

    def clear(self) -> None:
        """
        Clear the media list.
        """
        self.media_list.clear()

    def __str__(self):
        return f"MediaManager(num_media={len(self.media_list)})"

View File

@@ -48,10 +48,6 @@ class VideoSource:
         try:
             response = httpx.get(url, headers=self.headers, follow_redirects=True)
             response.raise_for_status()
-            with open('index.html', 'w', encoding='utf-8') as file:
-                file.write(response.text)
             return response.text
         except Exception as e:
@@ -77,38 +73,6 @@ class VideoSource:
             logging.error(f"Failed to parse HTML content: {e}")
             return None
-    def get_iframe(self, soup):
-        """
-        Extracts the source URL of the second iframe in the provided BeautifulSoup object.
-        Args:
-            soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML.
-        Returns:
-            str: The source URL of the second iframe, or None if not found.
-        """
-        tag_a = soup.find_all('a', href='#')
-        if tag_a and len(tag_a) > 1:
-            return tag_a[1].get("data-link")
-        return None
-    def find_content(self, url):
-        """
-        Makes a request to the specified URL and parses the HTML content.
-        Args:
-            url (str): The URL to fetch content from.
-        Returns:
-            BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails.
-        """
-        content = self.make_request(url)
-        if content:
-            return self.parse_html(content)
-        return None
     def get_result_node_js(self, soup):
         """
         Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL.
@@ -145,17 +109,7 @@ class VideoSource:
             logging.error("Failed to parse HTML content.")
             return None
-        iframe_src = self.get_iframe(soup)
-        if not iframe_src:
-            logging.error("No iframe found.")
-            return None
-        down_page_soup = self.find_content(iframe_src)
-        if not down_page_soup:
-            logging.error("Failed to fetch down page content.")
-            return None
-        result = self.get_result_node_js(down_page_soup)
+        result = self.get_result_node_js(soup)
         if not result:
             logging.error("No video URL found in script.")
             return None

View File

@@ -0,0 +1,47 @@
# 02.05.24

import logging
from typing import List

# Internal utilities
from Src.Util._jsonConfig import config_manager

# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')


def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
    """
    Manage user selection for seasons to download.

    Args:
        - cmd_insert (str): User input for season selection.
        - max_count (int): Maximum count of seasons available.

    Returns:
        list_season_select (List[int]): List of selected seasons.
    """
    list_season_select = []
    logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")

    # For a single number (e.g., '5')
    if cmd_insert.isnumeric():
        list_season_select.append(int(cmd_insert))

    # For a range (e.g., '[5-12]')
    elif "[" in cmd_insert:
        start, end = map(int, cmd_insert[1:-1].split('-'))
        list_season_select = list(range(start, end + 1))

    # For all seasons
    elif cmd_insert == "*":
        list_season_select = list(range(1, max_count + 1))

    # Return the list of selected seasons
    logging.info(f"List return: {list_season_select}")
    return list_season_select
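
A quick sketch of the three accepted input forms; the values are illustrative:

# Hypothetical calls; note that max_count only bounds the '*' case.
manage_selection("5", 12)      # -> [5]
manage_selection("[5-8]", 12)  # -> [5, 6, 7, 8]
manage_selection("*", 4)       # -> [1, 2, 3, 4]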

View File

@@ -1,3 +1,27 @@
 # 09.06.24
-from .site import title_search
+
+# Internal utilities
+from Src.Util.console import console, msg
+
+# Logic class
+from .site import title_search, get_select_title
+from .series import download_series
+
+
+def search():
+    """
+    Main function of the application for series.
+    """
+    # Make request to the site to get content corresponding to the search string
+    film_search = msg.ask("\n[purple]Insert word to search in all site").strip()
+    len_database = title_search(film_search)
+
+    if len_database != 0:
+        # Select title from list
+        select_title = get_select_title()
+
+        # Download the selected series
+        download_series(select_title)

View File

@@ -1,4 +1,4 @@
 # 09.06.24

 MAIN_FOLDER = "guardaserie"
-MOVIE_FOLDER = "Serie"
+SERIES_FOLDER = "Serie"

View File

@@ -0,0 +1,169 @@
# 13.06.24

import os
import sys
import logging

# Internal utilities
from Src.Util.console import console, msg
from Src.Util._jsonConfig import config_manager
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Lib.Hls.downloader import Downloader

# Logic class
from .Core.Class.SearchType import MediaItem
from .Core.Class.ScrapeSerie import GetSerieInfo
from .Core.Util.manage_ep import manage_selection
from .Core.Player.supervideo import VideoSource

# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import MAIN_FOLDER, SERIES_FOLDER

# Variable
table_show_manager = TVShowManager()
video_source = VideoSource()


def donwload_video(scape_info_serie: GetSerieInfo, index_season_selected: int, index_episode_selected: int) -> None:
    """
    Download a single episode video.

    Args:
        - scape_info_serie (GetSerieInfo): Scraper holding the series information.
        - index_season_selected (int): Index of the selected season.
        - index_episode_selected (int): Index of the selected episode.
    """
    start_message()

    # Get info about the episode
    obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
    console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}")
    print()

    # Define filename and path for the downloaded video
    mp4_name = f"{obj_episode.get('name')}.mp4"
    mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}")

    # Setup video source
    video_source.setup(obj_episode.get('url'))

    # Get m3u8 master playlist
    master_playlist = video_source.get_playlist()

    Downloader(
        m3u8_playlist=master_playlist,
        output_filename=os.path.join(mp4_path, mp4_name)
    ).start()


def donwload_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, donwload_all: bool = False) -> None:
    """
    Download episodes of a season.

    Args:
        - scape_info_serie (GetSerieInfo): Scraper holding the series information.
        - index_season_selected (int): Index of the selected season.
        - donwload_all (bool): Download every episode of the season without prompting.
    """
    # Start message and collect information about episodes
    start_message()
    list_dict_episode = scape_info_serie.get_episode_number(index_season_selected)
    episodes_count = len(list_dict_episode)

    # Download all episodes without asking
    if donwload_all:
        for i_episode in range(1, episodes_count + 1):
            donwload_video(scape_info_serie, index_season_selected, i_episode)

        console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.")

    # Otherwise let the user pick episodes from the season
    if not donwload_all:

        # Display episodes list and manage user selection
        last_command = display_episodes_list(scape_info_serie.list_episodes)
        list_episode_select = manage_selection(last_command, episodes_count)

        # Download a single selected episode
        if len(list_episode_select) == 1 and last_command != "*":
            donwload_video(scape_info_serie, index_season_selected, list_episode_select[0])

        # Download all the other selected episodes
        else:
            for i_episode in list_episode_select:
                donwload_video(scape_info_serie, index_season_selected, i_episode)


def download_series(dict_serie: MediaItem) -> None:

    # Start message and set up video source
    start_message()

    # Init class
    scape_info_serie = GetSerieInfo(dict_serie)

    # Collect information about seasons
    seasons_count = scape_info_serie.get_seasons_number()

    # Prompt user for season selection and download episodes
    console.print(f"\n[green]Seasons found: [red]{seasons_count}")
    index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media"))
    list_season_select = manage_selection(index_season_selected, seasons_count)

    # Download a single selected season
    if len(list_season_select) == 1 and index_season_selected != "*":
        if 1 <= int(index_season_selected) <= seasons_count:
            donwload_episode(scape_info_serie, list_season_select[0])

    # Download all seasons and episodes
    elif index_season_selected == "*":
        for i_season in list_season_select:
            donwload_episode(scape_info_serie, i_season, True)

    # Download all the other selected seasons
    else:
        for i_season in list_season_select:
            donwload_episode(scape_info_serie, i_season)


def display_episodes_list(obj_episode_manager) -> str:
    """
    Display episodes list and handle user input.

    Returns:
        last_command (str): Last command entered by the user.
    """
    # Set up table for displaying episodes
    table_show_manager.set_slice_end(10)

    # Add columns to the table
    column_info = {
        "Index": {'color': 'red'},
        "Name": {'color': 'magenta'},
    }
    table_show_manager.add_column(column_info)

    # Populate the table with episode information
    for media in obj_episode_manager:
        table_show_manager.add_tv_show({
            'Index': str(media.get('number')),
            'Name': media.get('name'),
        })

    # Run the table and handle user input
    last_command = table_show_manager.run()

    if last_command == "q":
        console.print("\n[red]Quit [white]...")
        sys.exit(0)

    return last_command

View File

@@ -8,61 +8,130 @@ from urllib.parse import urlparse

 # External libraries
 import httpx
+from bs4 import BeautifulSoup

 # Internal utilities
+from Src.Util.table import TVShowManager
 from Src.Util.console import console, msg
-from Src.Util.os import create_folder, can_create_file
 from Src.Util._jsonConfig import config_manager
 from Src.Util.headers import get_headers
-from Src.Lib.Hls.downloader import Downloader

 # Logic class
-from .Core.Player.supervideo import VideoSource
+from .Core.Class.SearchType import MediaManager, MediaItem
+
+# Variable
+media_search_manager = MediaManager()
+table_show_manager = TVShowManager()

 # Config
 ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
-from .costant import MAIN_FOLDER, MOVIE_FOLDER

-def title_search() -> int:
+def title_search(word_to_search) -> int:
     """
     Search for titles based on a search query.
     """
-    print()
-    url_search = msg.ask(f"[cyan]Insert url title")

     # Send request to search for titles
-    response = httpx.get(url_search, headers={'user-agent': get_headers()})
+    response = httpx.get(f"https://guardaserie.ceo/?story={word_to_search}&do=search&subaction=search", headers={'user-agent': get_headers()})
     response.raise_for_status()

-    # Get playlist
-    video_source = VideoSource()
-    video_source.setup(url_search)
+    # Create soup and find table
+    soup = BeautifulSoup(response.text, "html.parser")
+    table_content = soup.find('div', class_="mlnew-list")

-    parsed_url = urlparse(url_search)
-    path_parts = parsed_url.path.split('/')
-    mp4_name = path_parts[-2] if path_parts[-1] == '' else path_parts[-1] + ".mp4"
+    for serie_div in table_content.find_all('div', class_='mlnew'):
+        try:
+            title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True)
+            link = serie_div.find('div', class_='mlnh-2').find('a')['href']
+            imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True)

-    # Create destination folder
-    mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER)
+            serie_info = {
+                'name': title,
+                'url': link,
+                'score': imdb_rating
+            }
+            media_search_manager.add_media(serie_info)
+        except:
+            pass

-    # Check if can create file output
-    create_folder(mp4_path)
-    if not can_create_file(mp4_name):
-        logging.error("Invalid mp4 name.")
-        sys.exit(0)
+    # Return the number of titles found
+    return media_search_manager.get_length()

-    # Get m3u8 master playlist
-    master_playlist = video_source.get_playlist()
+def get_select_title(type_filter: list = None) -> MediaItem:
+    """
+    Display a selection of titles and prompt the user to choose one.

-    # Download the film using the m3u8 playlist, and output filename
-    Downloader(
-        m3u8_playlist = master_playlist,
-        output_filename = os.path.join(mp4_path, mp4_name)
-    ).start()
+    Args:
+        - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
+
+    Returns:
+        MediaItem: The selected media item.
+    """
+    # Set up table for displaying titles
+    table_show_manager.set_slice_end(10)
+
+    # Add columns to the table
+    column_info = {
+        "Index": {'color': 'red'},
+        "Name": {'color': 'magenta'},
+        "Type": {'color': 'yellow'},
+        "Score": {'color': 'cyan'},
+    }
+    table_show_manager.add_column(column_info)
+
+    # Populate the table with title information
+    for i, media in enumerate(media_search_manager.media_list):
+
+        # Filter for only the requested categories
+        if type_filter is not None:
+            if str(media.type) not in type_filter:
+                continue
+
+        table_show_manager.add_tv_show({
+            'Index': str(i),
+            'Name': media.name,
+            'Type': media.type,
+            'Score': media.score,
+        })
+
+    # Run the table and handle user input
+    last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
+    table_show_manager.clear()
+
+    # Handle user's quit command
+    if last_command == "q":
+        console.print("\n[red]Quit [white]...")
+        sys.exit(0)
+
+    # Check if the selected index is within range
+    if 0 <= int(last_command) < len(media_search_manager.media_list):
+        return media_search_manager.get(int(last_command))
+    else:
+        console.print("\n[red]Wrong index")
+        sys.exit(0)
+
+def manager_clear():
+    """
+    Clears the data lists managed by media_search_manager and table_show_manager,
+    resetting both to empty.
+    """
+    global media_search_manager, table_show_manager
+
+    # Clear list of data
+    media_search_manager.clear()
+    table_show_manager.clear()

View File

@@ -16,7 +16,7 @@ from .film import download_film
 from .series import download_series

-def main_film_series():
+def search():
     """
     Main function of the application for film and series.
     """

View File

@@ -0,0 +1,340 @@
# 18.04.24

import os
import sys
import time
import queue
import threading
import logging
import binascii
from queue import PriorityQueue
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor

# External libraries
import httpx
from tqdm import tqdm

# Internal utilities
from Src.Util.console import console
from Src.Util.headers import get_headers, random_headers
from Src.Util.color import Colors
from Src.Util._jsonConfig import config_manager
from Src.Util.os import check_file_existence

# Logic class
from ..M3U8 import (
    M3U8_Decryption,
    M3U8_Ts_Estimator,
    M3U8_Parser,
    M3U8_UrlFix
)
from .proxyes import main_test_proxy

# Warning
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Config
TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay')
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min')
PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')

# Variable
headers_index = config_manager.get_dict('REQUESTS', 'index')


class RetryTransport(httpx.BaseTransport):
    def __init__(self, transport, retries=3, backoff_factor=0.3):
        self.transport = transport
        self.retries = retries
        self.backoff_factor = backoff_factor

    def handle_request(self, request: httpx.Request) -> httpx.Response:
        url = request.url

        for attempt in range(1, self.retries + 1):
            try:
                response = self.transport.handle_request(request)
                response.raise_for_status()
                return response

            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                if attempt == self.retries:
                    raise
                else:
                    wait = self.backoff_factor * (2 ** (attempt - 1))
                    print(f"Attempt {attempt} for URL {url} failed: {e}. Retrying in {wait} seconds...")
                    time.sleep(wait)


transport = RetryTransport(httpx.HTTPTransport())
client = httpx.Client(transport=transport, verify=False, timeout=REQUEST_TIMEOUT)
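
For reference, the wait before each retry follows wait = backoff_factor * 2 ** (attempt - 1), so with the defaults (retries=3, backoff_factor=0.3) the schedule looks like this:

# Illustrative only: backoff schedule for the default settings.
for attempt in range(1, 3):            # the final attempt re-raises instead of waiting
    print(0.3 * 2 ** (attempt - 1))    # 0.3, then 0.6 seconds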
class M3U8_Segments:
    def __init__(self, url: str, tmp_folder: str):
        """
        Initializes the M3U8_Segments object.

        Args:
            - url (str): The URL of the M3U8 playlist.
            - tmp_folder (str): The temporary folder to store downloaded segments.
        """
        self.url = url
        self.tmp_folder = tmp_folder
        self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts")
        os.makedirs(self.tmp_folder, exist_ok=True)

        # Util class
        self.decryption: M3U8_Decryption = None
        self.class_ts_estimator = M3U8_Ts_Estimator(0)
        self.class_url_fixer = M3U8_UrlFix(url)

        # Sync
        self.queue = PriorityQueue()
        self.stop_event = threading.Event()

    def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes:
        """
        Retrieves the encryption key from the M3U8 playlist.

        Args:
            - m3u8_parser (M3U8_Parser): The parser object containing M3U8 playlist information.

        Returns:
            bytes: The encryption key in bytes.
        """
        headers_index['user-agent'] = get_headers()

        # Construct the full URL of the key
        key_uri = urljoin(self.url, m3u8_parser.keys.get('uri'))
        parsed_url = urlparse(key_uri)
        self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
        logging.info(f"Uri key: {key_uri}")

        try:
            response = httpx.get(key_uri, headers=headers_index)
            response.raise_for_status()

        except Exception as e:
            raise Exception(f"Failed to fetch key from {key_uri}: {e}")

        # Convert the response content to hex for logging, then back to bytes
        hex_content = binascii.hexlify(response.content).decode('utf-8')
        byte_content = bytes.fromhex(hex_content)

        logging.info(f"Key: ('hex': {hex_content}, 'byte': {byte_content})")
        return byte_content

    def parse_data(self, m3u8_content: str) -> None:
        """
        Parses the M3U8 content to extract segment information.

        Args:
            - m3u8_content (str): The content of the M3U8 file.
        """
        m3u8_parser = M3U8_Parser()
        m3u8_parser.parse_data(uri=self.url, raw_content=m3u8_content)

        console.log(f"[red]Expected duration after download: {m3u8_parser.get_duration()}")
        console.log(f"[red]There is key: [yellow]{m3u8_parser.keys is not None}")

        # Check if there is an encryption key in the playlist
        if m3u8_parser.keys is not None:
            try:
                # Extract byte from the key
                key = self.__get_key__(m3u8_parser)

            except Exception as e:
                raise Exception(f"Failed to retrieve encryption key: {e}")

            iv = m3u8_parser.keys.get('iv')
            method = m3u8_parser.keys.get('method')

            # Create a decryption object with the key and set the method
            self.decryption = M3U8_Decryption(key, iv, method)

        # Store the segment information parsed from the playlist
        self.segments = m3u8_parser.segments

        # Fix URL if it is incomplete (missing 'http')
        for i in range(len(self.segments)):
            segment_url = self.segments[i]

            if "http" not in segment_url:
                self.segments[i] = self.class_url_fixer.generate_full_url(segment_url)
                logging.info(f"Generated new URL: {self.segments[i]}, from: {segment_url}")

        # Update segments for estimator
        self.class_ts_estimator.total_segments = len(self.segments)
        logging.info(f"Segments to download: [{len(self.segments)}]")

        # Proxy
        if THERE_IS_PROXY_LIST:
            console.log("[red]Validate proxy.")
            self.valid_proxy = main_test_proxy(self.segments[0])
            console.log(f"[cyan]N. Valid ip: [red]{len(self.valid_proxy)}")

            if len(self.valid_proxy) == 0:
                sys.exit(0)

    def get_info(self) -> None:
        """
        Makes a request to the index M3U8 file to get information about segments.
        """
        headers_index['user-agent'] = get_headers()

        # Send a GET request to retrieve the index M3U8 file
        response = httpx.get(self.url, headers=headers_index)
        response.raise_for_status()

        # Save the M3U8 file to the temporary folder
        if response.status_code == 200:
            path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8")
            open(path_m3u8_file, "w+").write(response.text)

        # Parse the text from the M3U8 index file
        self.parse_data(response.text)

    def make_requests_stream(self, ts_url: str, index: int, progress_bar: tqdm) -> None:
        """
        Downloads a TS segment and adds it to the segment queue.

        Args:
            - ts_url (str): The URL of the TS segment.
            - index (int): The index of the segment.
            - progress_bar (tqdm): Progress counter for tracking download progress.
        """
        # Generate headers
        start_time = time.time()

        # Make request to get content
        if THERE_IS_PROXY_LIST:
            proxy = self.valid_proxy[index % len(self.valid_proxy)]
            logging.info(f"Use proxy: {proxy}")

            if 'key_base_url' in self.__dict__:
                response = client.get(ts_url, headers=random_headers(self.key_base_url), proxy=proxy)
            else:
                response = client.get(ts_url, headers={'user-agent': get_headers()}, proxy=proxy)
        else:
            if 'key_base_url' in self.__dict__:
                response = client.get(ts_url, headers=random_headers(self.key_base_url))
            else:
                response = client.get(ts_url, headers={'user-agent': get_headers()})

        # Get response content
        response.raise_for_status()
        segment_content = response.content

        # Update bar
        duration = time.time() - start_time
        response_size = int(response.headers.get('Content-Length', 0))
        self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar)

        # Decrypt the segment content if decryption is needed
        if self.decryption is not None:
            segment_content = self.decryption.decrypt(segment_content)

        # Add the segment to the queue
        self.queue.put((index, segment_content))
        progress_bar.update(1)

    def write_segments_to_file(self):
        """
        Writes downloaded segments to a file in the correct order.
        """
        with open(self.tmp_file_path, 'wb') as f:
            expected_index = 0
            buffer = {}

            while not self.stop_event.is_set() or not self.queue.empty():
                try:
                    index, segment_content = self.queue.get(timeout=1)

                    if index == expected_index:
                        f.write(segment_content)
                        f.flush()
                        expected_index += 1

                        # Write any buffered segments in order
                        while expected_index in buffer:
                            f.write(buffer.pop(expected_index))
                            f.flush()
                            expected_index += 1
                    else:
                        buffer[index] = segment_content

                except queue.Empty:
                    continue

    def download_streams(self, add_desc):
        """
        Downloads all TS segments in parallel and writes them to a file.

        Args:
            - add_desc (str): Additional description for the progress bar.
        """
        if TQDM_USE_LARGE_BAR:
            bar_format = f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
        else:
            bar_format = f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"

        # Create progress bar
        progress_bar = tqdm(
            total=len(self.segments),
            unit='s',
            ascii='░▒█',
            bar_format=bar_format
        )

        # Start a separate thread to write segments to the file
        writer_thread = threading.Thread(target=self.write_segments_to_file)
        writer_thread.start()

        # If a proxy list is available, size the pool to the number of proxies;
        # otherwise fall back to TQDM_MAX_WORKER
        max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER

        # With proxies, stagger submissions with a computed delay;
        # otherwise use the configured TQDM_DELAY_WORKER
        if THERE_IS_PROXY_LIST:
            num_proxies = len(self.valid_proxy)
            self.working_proxy_list = self.valid_proxy

            if num_proxies > 0:
                # Calculate the delay from the number of proxies,
                # clamped between PROXY_START_MIN and PROXY_START_MAX
                delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (num_proxies + 1)))
            else:
                delay = TQDM_DELAY_WORKER
        else:
            delay = TQDM_DELAY_WORKER

        # Start all workers
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for index, segment_url in enumerate(self.segments):
                time.sleep(delay)
                executor.submit(self.make_requests_stream, segment_url, index, progress_bar)

        # Wait for all tasks to complete
        executor.shutdown(wait=True)

        self.stop_event.set()
        writer_thread.join()
        progress_bar.close()
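
A minimal usage sketch of the class above; the playlist URL and folder name are placeholders:

# Hypothetical usage; the playlist URL is a placeholder.
segments = M3U8_Segments("https://cdn.example.com/stream/index.m3u8", "tmp_segments")
segments.get_info()                  # fetch and parse the index playlist
segments.download_streams("Video")   # download segments in parallel into tmp_segments/0.ts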

View File

@@ -55,7 +55,6 @@ DOWNLOAD_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'download_sub')
 MERGE_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'merge_subs')
 REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_DOWNLOAD', 'cleanup_tmp_folder')
 FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution')
-CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report')

 # Variable
@@ -621,16 +620,3 @@ class Downloader():

         # Clean all tmp file
         self.__clean__(converted_out_path)
-
-        # Create download report
-        if CREATE_REPORT:
-
-            # Get variable to add
-            current_date = datetime.today().date()
-            base_filename = os.path.split(self.output_filename)[-1].replace('.mp4', '')
-            filename_out_size = format_size(os.path.getsize(self.output_filename))
-
-            # Add new row to table and save
-            report_table.add_row_to_database(str(current_date), str(base_filename), str(filename_out_size))
-            report_table.save_database()

View File

@@ -8,12 +8,13 @@ import threading
 import logging
 import binascii
 from queue import PriorityQueue
-from urllib.parse import urljoin, urlparse
+from urllib.parse import urljoin
 from concurrent.futures import ThreadPoolExecutor

 # External libraries
-import httpx
+import requests
+from requests.exceptions import HTTPError, ConnectionError, Timeout, RequestException
 from tqdm import tqdm
@@ -22,7 +23,6 @@ from Src.Util.console import console
 from Src.Util.headers import get_headers, random_headers
 from Src.Util.color import Colors
 from Src.Util._jsonConfig import config_manager
-from Src.Util.os import check_file_existence

 # Logic class
@@ -45,44 +45,13 @@ TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
 TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay')
 TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
 REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
-THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
-PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min')
-PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')
+THERE_IS_PROXY_LIST = False

 # Variable
 headers_index = config_manager.get_dict('REQUESTS', 'index')

-class RetryTransport(httpx.BaseTransport):
-    def __init__(self, transport, retries=3, backoff_factor=0.3):
-        self.transport = transport
-        self.retries = retries
-        self.backoff_factor = backoff_factor
-
-    def handle_request(self, request: httpx.Request) -> httpx.Response:
-        url = request.url
-
-        for attempt in range(1, self.retries + 1):
-            try:
-                response = self.transport.handle_request(request)
-                response.raise_for_status()
-                return response
-
-            except (httpx.RequestError, httpx.HTTPStatusError) as e:
-                if attempt == self.retries:
-                    raise
-                else:
-                    wait = self.backoff_factor * (2 ** (attempt - 1))
-                    print(f"Attempt {attempt} for URL {url} failed: {e}. Retrying in {wait} seconds...")
-                    time.sleep(wait)
-
-transport = RetryTransport(httpx.HTTPTransport())
-client = httpx.Client(transport=transport, verify=False, timeout=REQUEST_TIMEOUT)

 class M3U8_Segments:
     def __init__(self, url: str, tmp_folder: str):
         """
@@ -118,14 +87,13 @@ class M3U8_Segments:
         """
         headers_index['user-agent'] = get_headers()

         # Construct the full URL of the key
         key_uri = urljoin(self.url, m3u8_parser.keys.get('uri'))
-        parsed_url = urlparse(key_uri)
-        self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
         logging.info(f"Uri key: {key_uri}")

         try:
-            response = httpx.get(key_uri, headers=headers_index)
+            response = requests.get(key_uri, headers=headers_index)
             response.raise_for_status()

         except Exception as e:
@@ -198,11 +166,11 @@ class M3U8_Segments:
         headers_index['user-agent'] = get_headers()

         # Send a GET request to retrieve the index M3U8 file
-        response = httpx.get(self.url, headers=headers_index)
+        response = requests.get(self.url, headers=headers_index)
         response.raise_for_status()

         # Save the M3U8 file to the temporary folder
-        if response.status_code == 200:
+        if response.ok:
             path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8")
             open(path_m3u8_file, "w+").write(response.text)
@@ -218,43 +186,43 @@ class M3U8_Segments:
             - index (int): The index of the segment.
             - progress_bar (tqdm): Progress counter for tracking download progress.
         """
+        try:
             # Generate headers
             start_time = time.time()

             # Make request to get content
             if THERE_IS_PROXY_LIST:
                 proxy = self.valid_proxy[index % len(self.valid_proxy)]
                 logging.info(f"Use proxy: {proxy}")

-                if 'key_base_url' in self.__dict__:
-                    response = client.get(ts_url, headers=random_headers(self.key_base_url), proxy=proxy)
-                else:
-                    response = client.get(ts_url, headers={'user-agent': get_headers()}, proxy=proxy)
+                response = requests.get(ts_url, headers=random_headers(), timeout=REQUEST_TIMEOUT, proxies=proxy)
             else:
-                if 'key_base_url' in self.__dict__:
-                    response = client.get(ts_url, headers=random_headers(self.key_base_url))
-                else:
-                    response = client.get(ts_url, headers={'user-agent': get_headers()})
+                response = requests.get(ts_url, headers=random_headers(), timeout=REQUEST_TIMEOUT)

             # Get response content
             response.raise_for_status()
             segment_content = response.content

             # Update bar
             duration = time.time() - start_time
             response_size = int(response.headers.get('Content-Length', 0))
             self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar)

             # Decrypt the segment content if decryption is needed
             if self.decryption is not None:
                 segment_content = self.decryption.decrypt(segment_content)

             # Add the segment to the queue
             self.queue.put((index, segment_content))
             progress_bar.update(1)

+        except (HTTPError, ConnectionError, Timeout, RequestException) as e:
+            progress_bar.update(1)
+            logging.error(f"Request-related exception while downloading segment: {e}")
+
+        except Exception as e:
+            progress_bar.update(1)
+            logging.error(f"An unexpected exception occurred while downloading segment: {e}")

     def write_segments_to_file(self):
         """
@@ -308,29 +276,10 @@ class M3U8_Segments:
         writer_thread = threading.Thread(target=self.write_segments_to_file)
         writer_thread.start()

-        # Ff proxy avaiable set max_workers to number of proxy
-        # else set max_workers to TQDM_MAX_WORKER
-        max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER
-
-        # if proxy avaiable set timeout to variable time
-        # else set timeout to TDQM_DELAY_WORKER
-        if THERE_IS_PROXY_LIST:
-            num_proxies = len(self.valid_proxy)
-            self.working_proxy_list = self.valid_proxy
-
-            if num_proxies > 0:
-                # calculate delay based on number of proxies
-                # dalay should be between 0.5 and 1
-                delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (num_proxies + 1)))
-            else:
-                delay = TQDM_DELAY_WORKER
-        else:
-            delay = TQDM_DELAY_WORKER

         # Start all workers
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor:
             for index, segment_url in enumerate(self.segments):
-                time.sleep(delay)
+                time.sleep(TQDM_DELAY_WORKER)
                 executor.submit(self.make_requests_stream, segment_url, index, progress_bar)

         # Wait for all tasks to complete

run.py
View File

@@ -18,11 +18,11 @@ from Src.Util.logger import Logger

 # Internal api
-from Src.Api.Streamingcommunity import main_film_series as streamingcommunity_film_serie
-from Src.Api.Animeunity import main_anime as streamingcommunity_anime
-from Src.Api.Altadefinizione import main_film as altadefinizione_film
-from Src.Api.Ddlstreamitaly import title_search as ddlstreamitaly_film_serie
-from Src.Api.Guardaserie import title_search as guardaserie_serie
+from Src.Api.Streamingcommunity import search as streamingcommunity_film_serie
+from Src.Api.Animeunity import search as streamingcommunity_anime
+from Src.Api.Altadefinizione import search as altadefinizione_film
+from Src.Api.Ddlstreamitaly import search as ddlstreamitaly_film_serie
+from Src.Api.Guardaserie import search as guardaserie_serie

 # Config