Add piratebays

This commit is contained in:
Lovi 2024-11-07 11:22:11 +01:00
parent 1b0d14fcdb
commit 1aeac8fc59
27 changed files with 561 additions and 125 deletions

View File

@ -5,25 +5,33 @@ from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title
from .site import title_search, run_get_select_title, media_search_manager
from .title import download_title
# Variable
indice = 8
_use_for = "film_serie"
_useFor = "film_serie"
_deprecate = False
_priority = 2
_engineDownload = "tor"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -84,11 +84,11 @@ def search_domain(site_name: str, base_url: str):
# Test the current domain
response_follow = httpx.get(f"{base_url}.{domain}", headers={'user-agent': get_headers()}, timeout=max_timeout, follow_redirects=True)
console.print(f"[cyan]Response site[white]: [red]{response_follow.status_code}")
#console.print(f"[cyan]Response site[white]: [red]{response_follow.status_code}")
response_follow.raise_for_status()
except Exception as e:
console.print(f"[cyan]Test url[white]: [red]{base_url}.{domain}, [cyan]error[white]: [red]{e}")
#console.print(f"[cyan]Test url[white]: [red]{base_url}.{domain}, [cyan]error[white]: [red]{e}")
query = base_url.split("/")[-1]
first_url = google_search(query)
@ -115,16 +115,15 @@ def search_domain(site_name: str, base_url: str):
config_manager.write_config()
# Return config domain
console.print(f"[cyan]Return domain: [red]{new_domain_extract} \n")
#console.print(f"[cyan]Return domain: [red]{new_domain_extract} \n")
return new_domain_extract, f"{base_url}.{new_domain_extract}"
else:
console.print("[bold red]\nManually change the domain in the JSON file.[/bold red]")
sys.exit(0)
raise
else:
console.print("[bold red]No valid URL to follow redirects.[/bold red]")
sys.exit(0)
# Ensure the URL is in string format before parsing
parsed_url = urlparse(str(response_follow.url))
@ -138,5 +137,5 @@ def search_domain(site_name: str, base_url: str):
config_manager.write_config()
# Return config domain
console.print(f"[cyan]Return domain: [red]{tld} \n")
#console.print(f"[cyan]Return domain: [red]{tld} \n")
return tld, f"{base_url}.{tld}"

View File

@ -5,25 +5,33 @@ from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title
from .site import title_search, run_get_select_title, media_search_manager
from .film import download_film
# Variable
indice = 2
_use_for = "film"
_useFor = "film"
_deprecate = False
_priority = 2
_engineDownload = "hls"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -47,6 +47,7 @@ def title_search(title_search: str) -> int:
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
raise
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")

View File

@ -5,22 +5,30 @@ from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title
from .site import title_search, run_get_select_title, media_search_manager
from .anime import download_film, download_series
# Variable
indice = 1
_use_for = "anime"
_useFor = "anime"
_deprecate = False
_priority = 2
_engineDownload = "mp4"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -5,25 +5,33 @@ from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title
from .site import title_search, run_get_select_title, media_search_manager
from .title import download_title
# Variable
indice = 7
_use_for = "film_serie"
_useFor = "film_serie"
_deprecate = False
_priority = 2
_engineDownload = "tor"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -8,25 +8,35 @@ from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title
from .site import title_search, run_get_select_title, media_search_manager
from .series import download_thread
# Variable
indice = 3
_use_for = "serie"
_useFor = "serie"
_deprecate = False
_priority = 2
_engineDownload = "mp4"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -5,25 +5,35 @@ from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title
from .site import title_search, run_get_select_title, media_search_manager
from .series import download_series
# Variable
indice = 4
_use_for = "serie"
_useFor = "serie"
_deprecate = False
_priority = 2
_engineDownload = "hls"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -11,20 +11,28 @@ from .film import download_film
# Variable
indice = 9
_use_for = "film"
_useFor = "film"
_deprecate = False
_priority = 2
_engineDownload = "hls"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Not available for the moment
if get_onylDatabase:
return 0
# Search on database
movie_id = tmdb.search_movie(string_to_search)
if movie_id:
if movie_id is not None:
movie_details: Json_film = tmdb.get_movie_details(tmdb_id=movie_id)
# Download only film

View File

@ -44,19 +44,20 @@ def download_film(movie_details: Json_film):
console.print(f"[yellow]Download: [red]{movie_details.title} \n")
# Make request to main site
url = f"https://{SITE_NAME}.{DOMAIN_NOW}/set-movie-a/{movie_details.imdb_id}"
response = httpx.get(url, headers={'User-Agent': get_headers()})
response.raise_for_status()
# Extract supervideo url
try:
soup = BeautifulSoup(response.text, "html.parser")
player_links = soup.find("ul", class_ = "_player-mirrors").find_all("li")
supervideo_url = "https:" + player_links[0].get("data-link")
url = f"https://{SITE_NAME}.{DOMAIN_NOW}/set-movie-a/{movie_details.imdb_id}"
response = httpx.get(url, headers={'User-Agent': get_headers()})
response.raise_for_status()
except:
logging.error("Not found in the server.")
sys.exit(0)
logging.error(f"Not found in the server. Dict: {movie_details}")
raise
# Extract supervideo url
soup = BeautifulSoup(response.text, "html.parser")
player_links = soup.find("ul", class_ = "_player-mirrors").find_all("li")
supervideo_url = "https:" + player_links[0].get("data-link")
# Set domain and media ID for the video source
video_source = VideoSource()

View File

@ -0,0 +1,47 @@
# 02.07.24
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title, media_search_manager
from .title import download_title
# Variable
indice = 8
_useFor = "film_serie"
_deprecate = False
_priority = 2
_engineDownload = "tor"
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Search on database
len_database = title_search(string_to_search)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list
select_title = run_get_select_title()
# Download title
download_title(select_title)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()

View File

@ -0,0 +1,15 @@
# 09.06.24
import os
# Internal utilities
from Src.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain']
SERIES_FOLDER = "Serie"
MOVIE_FOLDER = "Film"

View File

@ -0,0 +1,89 @@
# 02.07.24
# External libraries
import httpx
from bs4 import BeautifulSoup
from unidecode import unidecode
# Internal utilities
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
from Src.Util.headers import get_headers
from Src.Util.table import TVShowManager
from ..Template import get_select_title
# Logic class
from ..Template.Class.SearchType import MediaManager
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(word_to_search: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- title_search (str): The title to search for.
Returns:
- int: The number of titles found.
"""
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")
# Construct the full site URL and load the search page
try:
response = httpx.get(
url=f"https://1.{SITE_NAME}.{DOMAIN_NOW}/s/?q={word_to_search}&video=on",
headers={
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7',
'referer': 'https://wwv.thepiratebay3.co/',
'user-agent': get_headers()
},
follow_redirects=True,
timeout=max_timeout
)
response.raise_for_status()
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table = soup.find("tbody")
# Scrape div film in table on single page
for tr in table.find_all('tr'):
try:
title_info = {
'name': tr.find_all("a")[1].get_text(strip=True),
'url': tr.find_all("td")[3].find("a").get("href"),
'upload': tr.find_all("td")[2].get_text(strip=True),
'size': tr.find_all("td")[4].get_text(strip=True),
'seader': tr.find_all("td")[5].get_text(strip=True),
'leacher': tr.find_all("td")[6].get_text(strip=True),
'by': tr.find_all("td")[7].get_text(strip=True),
}
media_search_manager.add_media(title_info)
except:
continue
# Return the number of titles found
return media_search_manager.get_length()
def run_get_select_title():
"""
Display a selection of titles and prompt the user to choose one.
"""
return get_select_title(table_show_manager, media_search_manager)

View File

@ -0,0 +1,56 @@
# 02.07.24
import os
import sys
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.console import console
from Src.Util.message import start_message
from Src.Util.headers import get_headers
from Src.Util.os import create_folder, can_create_file, remove_special_characters
from Src.Lib.Downloader import TOR_downloader
# Logic class
from ..Template.Class.SearchType import MediaItem
# Config
from .costant import ROOT_PATH, DOMAIN_NOW, SITE_NAME, MOVIE_FOLDER
def download_title(select_title: MediaItem):
"""
Downloads a media item and saves it as an MP4 file.
Parameters:
- select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.
"""
start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n")
print()
# Define output path
title_name = remove_special_characters(select_title.name)
mp4_name = title_name.replace("-", "_") + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(title_name.replace(".mp4", "")))
# Check if can create file output
create_folder(mp4_path)
if not can_create_file(mp4_name):
logging.error("Invalid mp4 name.")
sys.exit(0)
# Tor manager
manager = TOR_downloader()
manager.add_magnet_link(select_title.url)
manager.start_download()
manager.move_downloaded_files(mp4_path)

View File

@ -5,29 +5,35 @@ from Src.Util.console import console, msg
# Logic class
from .site import get_version_and_domain, title_search, run_get_select_title
from .site import get_version_and_domain, title_search, run_get_select_title, media_search_manager
from .film import download_film
from .series import download_series
# Variable
indice = 0
_use_for = "film_serie"
_useFor = "film_serie"
_deprecate = False
_priority = 1
_engineDownload = "hls"
def search():
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
if string_to_search is None:
string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
# Get site domain and version and get result of the search
site_version, domain = get_version_and_domain()
len_database = title_search(string_to_search, domain)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list

View File

@ -44,8 +44,6 @@ def get_version(text: str):
str: The version extracted from the webpage.
list: Top 10 titles headlines for today.
"""
console.print("[cyan]Make request to get version [white]...")
try:
# Parse request to site
@ -53,7 +51,7 @@ def get_version(text: str):
# Extract version
version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version']
console.print(f"[cyan]Get version [white]=> [red]{version} \n")
#console.print(f"[cyan]Get version [white]=> [red]{version} \n")
return version

View File

@ -149,23 +149,15 @@ class ContentExtractor:
"""
pass
def start(self, url, m3u8_playlist_text: str):
def start(self, obj_parse: M3U8_Parser):
"""
Starts the extraction process by parsing the M3U8 playlist and collecting audio, subtitle, and video data.
Args:
url (str): The URL of the M3U8 playlist.
m3u8_playlist_text (str): The raw text content of the M3U8 playlist.
obj_parse (str): The M3U8_Parser obj of the M3U8 playlist.
"""
# Create an instance of the M3U8_Parser class
self.obj_parse = M3U8_Parser()
# Extract information about the M3U8 playlist
self.obj_parse.parse_data(
uri=url,
raw_content=m3u8_playlist_text
)
self.obj_parse = obj_parse
# Collect audio, subtitle, and video information
self._collect_audio()
@ -689,6 +681,7 @@ class HLS_Downloader:
self.is_playlist_url = is_playlist_url
self.is_index_url = is_index_url
self.expected_real_time = None
self.instace_parserClass = M3U8_Parser()
def _generate_output_filename(self, output_filename, m3u8_playlist, m3u8_index):
"""
@ -733,7 +726,7 @@ class HLS_Downloader:
new_filename = unidecode(new_filename)
return new_filename
def start(self):
"""
Initiates the downloading process. Checks if the output file already exists and proceeds with processing the playlist or index.
@ -744,33 +737,54 @@ class HLS_Downloader:
return
self.path_manager.create_directories()
# Determine whether to process a playlist or index
if self.m3u8_playlist:
if not GET_ONLY_LINK:
r_proc = self._process_playlist()
if r_proc == 404:
return 404
# Parse data from url and get if is a master playlist
self.instace_parserClass.parse_data(uri=self.m3u8_playlist, raw_content=HttpClient().get(self.m3u8_playlist))
is_masterPlaylist = self.instace_parserClass.is_master_playlist
# Check if it's a real master playlist
if is_masterPlaylist:
if not GET_ONLY_LINK:
r_proc = self._process_playlist()
if r_proc == 404:
return 404
else:
return None
else:
return None
return {
'path': self.output_filename,
'url': self.m3u8_playlist
}
else:
return {
'path': self.output_filename,
'url': self.m3u8_playlist
}
console.log("[red]Error: URL passed to M3U8_Parser is an index playlist; expected a master playlist. Crucimorfo strikes again!")
elif self.m3u8_index:
if not GET_ONLY_LINK:
self._process_index()
return None
# Parse data from url and get if is a master playlist
self.instace_parserClass.parse_data(uri=self.m3u8_index, raw_content=HttpClient().get(self.m3u8_playlist))
is_masterPlaylist = self.instace_parserClass.is_master_playlist
# Check if it's a real index playlist
if not is_masterPlaylist:
if not GET_ONLY_LINK:
self._process_index()
return None
else:
return {
'path': self.output_filename,
'url': self.m3u8_index
}
else:
return {
'path': self.output_filename,
'url': self.m3u8_index
}
console.log("[red]Error: URL passed to M3U8_Parser is an master playlist; expected a index playlist. Crucimorfo strikes again!")
def _clean(self, out_path: str) -> None:
"""
@ -877,7 +891,7 @@ class HLS_Downloader:
# Collect information about the playlist
if self.is_playlist_url:
self.content_extractor.start(self.m3u8_playlist, m3u8_playlist_text)
self.content_extractor.start(self.instace_parserClass)
else:
self.content_extractor.start("https://fake.com", m3u8_playlist_text)

View File

@ -43,6 +43,8 @@ REQUEST_VERIFY = config_manager.get_bool('REQUESTS', 'verify_ssl')
THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min')
PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')
DEFAULT_VIDEO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_video_workser')
DEFAULT_AUDIO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_audio_workser')
# Variable
@ -216,20 +218,45 @@ class M3U8_Segments:
# Make request to get content
if THERE_IS_PROXY_LIST:
# Get proxy from list
proxy = self.valid_proxy[index % len(self.valid_proxy)]
logging.info(f"Use proxy: {proxy}")
with httpx.Client(proxies=proxy, verify=need_verify) as client:
if 'key_base_url' in self.__dict__:
response = client.get(ts_url, headers=random_headers(self.key_base_url), timeout=max_timeout, follow_redirects=True)
response = client.get(
url=ts_url,
headers=random_headers(self.key_base_url),
timeout=max_timeout,
follow_redirects=True
)
else:
response = client.get(ts_url, headers={'user-agent': get_headers()}, timeout=max_timeout, follow_redirects=True)
response = client.get(
url=ts_url,
headers={'user-agent': get_headers()},
timeout=max_timeout,
follow_redirects=True
)
else:
with httpx.Client(verify=need_verify) as client_2:
if 'key_base_url' in self.__dict__:
response = client_2.get(ts_url, headers=random_headers(self.key_base_url), timeout=max_timeout, follow_redirects=True)
response = client_2.get(
url=ts_url,
headers=random_headers(self.key_base_url),
timeout=max_timeout,
follow_redirects=True
)
else:
response = client_2.get(ts_url, headers={'user-agent': get_headers()}, timeout=max_timeout, follow_redirects=True)
response = client_2.get(
url=ts_url,
headers={'user-agent': get_headers()},
timeout=max_timeout,
follow_redirects=True
)
# Validate response and content
response.raise_for_status()
@ -248,15 +275,22 @@ class M3U8_Segments:
segment_content = self.decryption.decrypt(segment_content)
if len(segment_content) < min_segment_size:
raise Exception(f"Decrypted segment {index} too small ({len(segment_content)} bytes)")
except Exception as e:
logging.error(f"Decryption failed for segment {index}: {str(e)}")
raise
# Update progress and queue
self.class_ts_estimator.update_progress_bar(content_size, duration, progress_bar)
# Add the segment to the queue
self.queue.put((index, segment_content))
self.downloaded_segments.add(index) # Track successfully downloaded segments
# Track successfully downloaded segments
self.downloaded_segments.add(index)
progress_bar.update(1)
# Break out of the loop on success
return
except Exception as e:
@ -344,15 +378,15 @@ class M3U8_Segments:
# Select audio workers from folder of frames stack prev call.
try:
VIDEO_WORKERS = int(config_manager.get_dict('SITE', config_site)['video_workers'])
if VIDEO_WORKERS == -1: VIDEO_WORKERS = os.cpu_count()
except:
VIDEO_WORKERS = os.cpu_count()
#VIDEO_WORKERS = os.cpu_count()
VIDEO_WORKERS = DEFAULT_VIDEO_WORKERS
try:
AUDIO_WORKERS = int(config_manager.get_dict('SITE', config_site)['audio_workers'])
if AUDIO_WORKERS == -1: AUDIO_WORKERS = os.cpu_count()
except:
AUDIO_WORKERS = os.cpu_count()
#AUDIO_WORKERS = os.cpu_count()
AUDIO_WORKERS = DEFAULT_AUDIO_WORKERS
# Differnt workers for audio and video
if "video" in str(add_desc):

View File

@ -43,8 +43,11 @@ class TOR_downloader:
- username (str): Username for logging into qBittorrent.
- password (str): Password for logging into qBittorrent.
"""
try: self.qb = Client(f'http://{HOST}:{PORT}/')
except: logging.error("Start qbitorrent first.")
try:
self.qb = Client(f'http://{HOST}:{PORT}/')
except:
logging.error("Start qbitorrent first.")
self.username = USERNAME
self.password = PASSWORD
self.logged_in = False

View File

@ -421,6 +421,7 @@ class M3U8_Subtitle:
class M3U8_Parser:
def __init__(self):
self.is_master_playlist = None
self.segments = []
self.video_playlist = []
self.keys = None
@ -450,6 +451,7 @@ class M3U8_Parser:
self.__parse_video_info__(m3u8_obj)
self.__parse_subtitles_and_audio__(m3u8_obj)
self.__parse_segments__(m3u8_obj)
self.is_master_playlist = self.__is_master__(m3u8_obj)
@staticmethod
def extract_resolution(uri: str) -> int:
@ -475,6 +477,28 @@ class M3U8_Parser:
logging.warning("No resolution found with custom parsing.")
return (0, 0)
def __is_master__(self, m3u8_obj) -> bool:
"""
Determines if the given M3U8 object is a master playlist.
Parameters:
- m3u8_obj (m3u8.M3U8): The parsed M3U8 object.
Returns:
- bool: True if it's a master playlist, False if it's a media playlist, None if unknown.
"""
# Check if the playlist contains variants (master playlist)
if m3u8_obj.is_variant:
return True
# Check if the playlist contains segments directly (media playlist)
elif m3u8_obj.segments:
return False
# Return None if the playlist type is undetermined
return None
def __parse_video_info__(self, m3u8_obj) -> None:
"""
Extracts video information from the M3U8 object.

View File

@ -1,7 +1,6 @@
# 23.06.24
# Fix import
import time
import sys
import os
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

View File

@ -1,7 +1,6 @@
# 23.06.24
# Fix import
import time
import sys
import os
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
@ -13,7 +12,6 @@ sys.path.append(src_path)
from Src.Lib.Downloader import MP4_downloader
# Test
MP4_downloader(
"",

View File

@ -1,8 +1,6 @@
# 23.06.24
# Fix import
import time
import sys
import os
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
@ -14,7 +12,6 @@ sys.path.append(src_path)
from Src.Lib.Downloader import TOR_downloader
# Test
manager = TOR_downloader()

View File

@ -1,7 +0,0 @@
{
"site": "",
"string_search": "",
"serie": true,
"season_cmd": "",
"episode_cmd": ""
}

106
Test/testv1.py Normal file
View File

@ -0,0 +1,106 @@
import os
import glob
import importlib
import logging
from rich.console import Console
from Src.Api.Template.Class.SearchType import MediaManager
console = Console()
def load_search_functions():
modules = []
loaded_functions = {}
# Traverse the Api directory
api_dir = os.path.join(os.path.dirname(__file__), 'Src', 'Api')
init_files = glob.glob(os.path.join(api_dir, '*', '__init__.py'))
logging.info(f"Base folder path: {api_dir}")
logging.info(f"Api module path: {init_files}")
# Retrieve modules and their indices
for init_file in init_files:
# Get folder name as module name
module_name = os.path.basename(os.path.dirname(init_file))
logging.info(f"Load module name: {module_name}")
try:
# Dynamically import the module
mod = importlib.import_module(f'Src.Api.{module_name}')
# Get 'indice' from the module
indice = getattr(mod, 'indice', 0)
is_deprecate = bool(getattr(mod, '_deprecate', True))
use_for = getattr(mod, '_useFor', 'other')
if not is_deprecate:
modules.append((module_name, indice, use_for))
except Exception as e:
console.print(f"[red]Failed to import module {module_name}: {str(e)}")
# Sort modules by 'indice'
modules.sort(key=lambda x: x[1])
# Load search functions in the sorted order
for module_name, _, use_for in modules:
# Construct a unique alias for the module
module_alias = f'{module_name}_search'
logging.info(f"Module alias: {module_alias}")
try:
# Dynamically import the module
mod = importlib.import_module(f'Src.Api.{module_name}')
# Get the search function from the module (assuming the function is named 'search' and defined in __init__.py)
search_function = getattr(mod, 'search')
# Add the function to the loaded functions dictionary
loaded_functions[module_alias] = (search_function, use_for)
except Exception as e:
console.print(f"[red]Failed to load search function from module {module_name}: {str(e)}")
return loaded_functions
def search_all_sites(loaded_functions, search_string, max_sites=2):
total_len_database = 0
site_count = 0 # Per tenere traccia del numero di siti elaborati
# Loop attraverso tutte le funzioni di ricerca caricate e eseguirle con la stessa stringa di ricerca
for module_alias, (search_function, use_for) in loaded_functions.items():
# Limita il numero di siti da cui eseguire la ricerca
if max_sites is not None and site_count >= max_sites:
break
console.print(f"\n[blue]Searching in module: {module_alias} [white](Use for: {use_for})")
try:
# Esegui la funzione di ricerca con 'get_onylDatabase=True' per ottenere solo la lunghezza del database
database: MediaManager = search_function(search_string, get_onylDatabase=True) # Usa get_onylDatabase=True
len_database = len(database.media_list)
for element in database.media_list:
print(element.__dict__)
console.print(f"[green]Database length for {module_alias}: {len_database}")
total_len_database += len_database # Aggiungi il risultato al totale
site_count += 1 # Incrementa il contatore dei siti
except Exception as e:
console.print(f"[red]Error while executing search function for {module_alias}: {str(e)}")
# Restituisce la lunghezza totale di tutti i database combinati
return total_len_database
# Example: Load the search functions, perform the search with a given string, and return the total len_database
search_string = "cars" # The search string you'd like to use
loaded_functions = load_search_functions()
total_len = search_all_sites(loaded_functions, search_string)
console.print(f"\n[cyan]Total number of results from all sites: {total_len}")

View File

@ -9,7 +9,7 @@
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
"special_chars_to_remove": "!@#$%^&*()[]{}<>|`~'\";:,?=+\u00e2\u20ac\u00a6",
"config_qbit_tor": {
"host": "192.168.1.58",
"host": "192.168.1.59",
"port": "8080",
"user": "admin",
"pass": "adminadmin"
@ -34,6 +34,8 @@
"download_video": true,
"download_audio": true,
"merge_audio": true,
"default_video_workser": 12,
"default_audio_workser": 12,
"specific_list_audio": [
"ita"
],
@ -64,18 +66,12 @@
"domain": "computer"
},
"altadefinizione": {
"video_workers": 12,
"audio_workers": 12,
"domain": "now"
},
"guardaserie": {
"video_workers": 12,
"audio_workers": 12,
"domain": "academy"
},
"mostraguarda": {
"video_workers": 12,
"audio_workers": 12,
"domain": "stream"
},
"ddlstreamitaly": {
@ -95,8 +91,8 @@
"1337xx": {
"domain": "to"
},
"cb01": {
"domain": "vet"
"piratebays": {
"domain": "to"
}
}
}

4
run.py
View File

@ -16,7 +16,7 @@ from Src.Util.message import start_message
from Src.Util.console import console, msg
from Src.Util._jsonConfig import config_manager
from Src.Upload.update import update as git_update
from Src.Util.os import get_system_summary, create_folder
from Src.Util.os import get_system_summary
from Src.Lib.TMBD import tmdb
from Src.Util.logger import Logger
@ -64,7 +64,7 @@ def load_search_functions():
# Get 'indice' from the module
indice = getattr(mod, 'indice', 0)
is_deprecate = bool(getattr(mod, '_deprecate', True))
use_for = getattr(mod, '_use_for', 'other')
use_for = getattr(mod, '_useFor', 'other')
if not is_deprecate:
modules.append((module_name, indice, use_for))