mirror of https://github.com/Arrowar/StreamingCommunity.git
synced 2025-07-28 13:51:47 +00:00
Test httpx (#149)

* Migrate to httpx
* Revert "Migrate to httpx" (reverts commit fdd2823865824eca5e8cb8806dbddba4bcb37280)
* Migrate httpx
* minor fixes (#146)
* Update headers
* Update config
* v1: Add retry
* v1: Finish Guardaserie; need to fix client
* Remove retry
* v2: Add comment guardaserie
* Add domain ...
* Finish add ddl ...
* Fix use of proxy
* Fix cookie error
* Update cookie
* Dynamic import

Co-authored-by: Francesco Grazioso <40018163+FrancescoGrazioso@users.noreply.github.com>

This commit is contained in:
parent ff1ae79548
commit d447cf53b7
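The commit's core change swaps requests for httpx at every call site. A minimal sketch of the two behavioral differences the diff has to account for (the URLs below are placeholders, not from the codebase):

    import httpx

    # 1) requests follows redirects by default; httpx requires opting in,
    #    which is why the supervideo call sites gain follow_redirects=True.
    response = httpx.get("https://example.com", follow_redirects=True)
    response.raise_for_status()

    # 2) JSON bodies are passed with `json=`; the old call used `json_data=`,
    #    which is not a keyword requests or httpx accepts, so the livesearch
    #    POST below fixes a latent bug as well.
    response = httpx.post("https://example.com/livesearch", json={"title": "query"})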
.gitignore (vendored) | 4
@@ -55,4 +55,6 @@ env.bak/
 venv.bak/

 # Other
 Video
+list_proxy.txt
+note
@@ -101,8 +101,7 @@ You can change some behaviors by tweaking the configuration file.
 * **verify_ssl**: Whether to verify SSL certificates.
   - **Default Value**: `false`

-* **proxy**: The proxy to use for requests. (Note: This parameter works only with HTTP and HTTPS protocols.)
-  - **Example Value**: `["http://user:pass@38.154.227.167:5868"]`
+* **proxy**: To use a proxy, create a file named list_proxy.txt and add one "ip:port" entry per line (e.g. "122.114.232.137:8080"). Only HTTP proxies are supported.

 </details>
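The new proxy scheme reads plain ip:port pairs from list_proxy.txt. A rough sketch of how such a file could be consumed with httpx (the helper below is illustrative, not part of this commit; the keyword is `proxy=` on httpx >= 0.26 and `proxies=` on older releases):

    import random
    import httpx

    def load_proxies(path: str = "list_proxy.txt") -> list:
        # One "ip:port" per line; the README requires plain HTTP proxies.
        with open(path, encoding="utf-8") as f:
            return [f"http://{line.strip()}" for line in f if line.strip()]

    proxy_list = load_proxies()
    if proxy_list:
        response = httpx.get("https://example.com", proxy=random.choice(proxy_list))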
@@ -6,7 +6,7 @@ import logging


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup


@@ -47,12 +47,12 @@ class VideoSource:
         """

         try:
-            response = requests.get(url, headers=self.headers)
+            response = httpx.get(url, headers=self.headers, follow_redirects=True)
             response.raise_for_status()
             return response.text

         except Exception as e:
-            logging.error(f"Request failed: {e}")
+            logging.error(f"Request failed [supervideo]: {e}")
             return None

     def parse_html(self, html_content: str) -> BeautifulSoup:
@@ -5,25 +5,23 @@ from Src.Util.console import console, msg


 # Logic class
-from .site import (
-    title_search,
-    get_select_title,
-    manager_clear
-)
-
+from .site import title_search, get_select_title
 from .film import download_film


-def main_film():
+# Variable
+indice = 2
+
+
+def search():
     """
     Main function of the application for film and series.
     """

     # Make request to site to get content that corrsisponde to that string
-    film_search = msg.ask("\n[purple]Insert word to search in all site").strip()
-    len_database = title_search(film_search)
+    string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
+    len_database = title_search(string_to_search)

-    if len_database != 0:
+    if len_database > 0:

         # Select title from list
         select_title = get_select_title()
@@ -34,3 +32,5 @@ def main_film():
             url=select_title.url
         )
+
+    else:
+        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
@@ -5,7 +5,7 @@ import logging


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup
 from unidecode import unidecode


@@ -44,7 +44,7 @@ def title_search(title_search: str) -> int:
     """

     # Send request to search for titles
-    response = requests.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()})
+    response = httpx.get(f"https://{SITE_NAME}.{DOMAIN_NOW}/page/1/?story={unidecode(title_search.replace(' ', '+'))}&do=search&subaction=search&titleonly=3", headers={'user-agent': get_headers()})
     response.raise_for_status()

     # Create soup and find table
@@ -122,18 +122,3 @@ def get_select_title(type_filter: list = None) -> MediaItem:
     else:
         console.print("\n[red]Wrong index")
         sys.exit(0)
-
-
-def manager_clear():
-    """
-    Clears the data lists managed by media_search_manager and table_show_manager.
-
-    This function clears the data lists managed by global variables media_search_manager
-    and table_show_manager. It removes all the items from these lists, effectively
-    resetting them to empty lists.
-    """
-    global media_search_manager, table_show_manager
-
-    # Clear list of data
-    media_search_manager.clear()
-    table_show_manager.clear()
@@ -6,7 +6,7 @@ from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup


@@ -60,7 +60,7 @@ class VideoSource:
         """
         try:

-            response = requests.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/")
+            response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/")
             response.raise_for_status()

             # Parse JSON response and return episode count
@@ -87,7 +87,7 @@ class VideoSource:
                 "end_range": index_ep + 1
             }

-            response = requests.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params)
+            response = httpx.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params)
             response.raise_for_status()

             # Return information about the episode
@@ -110,7 +110,7 @@ class VideoSource:
         """
         try:

-            response = requests.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}")
+            response = httpx.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}")
             response.raise_for_status()

             # Extract and clean embed URL
@@ -118,7 +118,7 @@ class VideoSource:
         self.iframe_src = embed_url

         # Fetch video content using embed URL
-        video_response = requests.get(embed_url)
+        video_response = httpx.get(embed_url)
         video_response.raise_for_status()
@@ -6,7 +6,7 @@ import logging


 # External libraries
-import requests
+import httpx


 # Internal utilities
@@ -28,7 +28,7 @@ def check_url_for_content(url: str, content: str) -> bool:
     try:

         logging.info(f"Test site to extract domain: {url}")
-        response = requests.get(url, timeout = 1)
+        response = httpx.get(url, timeout = 1)
         response.raise_for_status()

         if content in response.text:
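One subtlety of this hunk: requests has no default timeout, while httpx defaults to 5 seconds and can split the budget per phase. The 1-second probe keeps working unchanged, but it could also be written more explicitly (illustrative sketch, not from the commit):

    import httpx

    # 1 s total budget, but give up after 0.5 s if the TCP connect stalls.
    timeout = httpx.Timeout(1.0, connect=0.5)
    response = httpx.get("https://example.com", timeout=timeout)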
@@ -3,18 +3,23 @@
 # Internal utilities
 from Src.Util.console import console, msg


 # Logic class
 from .site import title_search, get_select_title
 from .anime import donwload_film, donwload_series


-def main_anime():
+# Variable
+indice = 1
+
+
+def search():

     # Make request to site to get content that corrsisponde to that string
     string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
     len_database = title_search(string_to_search)

-    if len_database != 0:
+    if len_database > 0:

         # Select title from list
         select_title = get_select_title()
@@ -31,6 +36,5 @@ def main_anime():
             title_name=select_title.slug
         )

-    # If no media find
     else:
-        console.print("[red]Cant find a single element")
+        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
@@ -5,7 +5,7 @@ import logging


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup
 from unidecode import unidecode


@@ -45,7 +45,7 @@ def get_token(site_name: str, domain: str) -> dict:
     """

     # Send a GET request to the specified URL composed of the site name and domain
-    response = requests.get(f"https://www.{site_name}.{domain}")
+    response = httpx.get(f"https://www.{site_name}.{domain}")
     response.raise_for_status()

     # Initialize variables to store CSRF token
@@ -83,11 +83,11 @@ def update_domain():
     try:

         console.log(f"[cyan]Test site: [red]https://{SITE_NAME}.{DOMAIN_NOW}")
-        response = requests.get(f"https://www.{SITE_NAME}.{DOMAIN_NOW}")
+        response = httpx.get(f"https://www.{SITE_NAME}.{DOMAIN_NOW}")
         response.status_code

     # If the current site is inaccessible, try to obtain a new domain
-    except:
+    except Exception as e:

         # Get new domain
         console.print("[red]\nExtract new DOMAIN from TLD list.")
@@ -166,7 +166,7 @@ def title_search(title: str) -> int:
     }

     # Send a POST request to the API endpoint for live search
-    response = requests.post(f'https://www.{SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json_data=json_data)
+    response = httpx.post(f'https://www.{SITE_NAME}.{url_domain}/livesearch', cookies=cookies, headers=headers, json=json_data)
     response.raise_for_status()

     # Process each record returned in the response
@@ -239,19 +239,3 @@ def get_select_title(type_filter: list = None) -> MediaItem:
     else:
         console.print("\n[red]Wrong index")
         sys.exit(0)
-
-
-def manager_clear():
-    """
-    Clears the data lists managed by media_search_manager and table_show_manager.
-
-    This function clears the data lists managed by global variables media_search_manager
-    and table_show_manager. It removes all the items from these lists, effectively
-    resetting them to empty lists.
-    """
-    global media_search_manager, table_show_manager
-
-    # Clear list of data
-    media_search_manager.clear()
-    table_show_manager.clear()
Src/Api/Ddlstreamitaly/Core/Class/ScrapeSerie.py (new file) | 85
@@ -0,0 +1,85 @@
# 13.06.24

import sys
import logging

from typing import List, Dict


# External libraries
import httpx
from bs4 import BeautifulSoup


# Internal utilities
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager


# Logic class
from .SearchType import MediaItem


class GetSerieInfo:

    def __init__(self, dict_serie: MediaItem) -> None:
        """
        Initializes the GetSerieInfo object with default values.

        Args:
            dict_serie (MediaItem): Dictionary containing series information (optional).
        """
        self.headers = {'user-agent': get_headers()}
        self.cookies = config_manager.get_dict('REQUESTS', 'index')
        self.url = dict_serie.url
        self.tv_name = None
        self.list_episodes = None

    def get_episode_number(self) -> List[Dict[str, str]]:
        """
        Retrieves the episodes of the series thread.

        Returns:
            List[Dict[str, str]]: List of dictionaries containing episode information.
        """

        # Make an HTTP request to the series URL
        try:
            response = httpx.get(self.url + "?area=online", cookies=self.cookies, headers=self.headers)
            response.raise_for_status()

        except Exception as e:
            logging.error("Insert: ['ips4_device_key': 'your_code', 'ips4_member_id': 'your_code', 'ips4_login_key': 'your_code'] in config file \ REQUESTS \ index, instead of user-agent. Use browser debug and cookie request with a valid account, filter by DOC.")
            sys.exit(0)

        # Parse HTML content of the page
        soup = BeautifulSoup(response.text, "html.parser")

        # Get tv name
        self.tv_name = soup.find("span", class_= "ipsType_break").get_text(strip=True)

        # Find the container of episodes for the specified season
        table_content = soup.find('div', class_='ipsMargin_bottom:half')
        list_dict_episode = []

        for episode_div in table_content.find_all('a', href=True):

            # Get text of episode
            part_name = episode_div.get_text(strip=True)

            if part_name:
                link = episode_div['href']

                obj_episode = {
                    'name': part_name,
                    'url': link
                }
                list_dict_episode.append(obj_episode)

        self.list_episodes = list_dict_episode
        return list_dict_episode
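A hypothetical walk through the new scraper, assuming a MediaItem as produced by the Ddlstreamitaly title_search (the thread URL is made up for illustration):

    from Src.Api.Ddlstreamitaly.Core.Class.SearchType import MediaItem
    from Src.Api.Ddlstreamitaly.Core.Class.ScrapeSerie import GetSerieInfo

    item = MediaItem({'name': 'Example Show', 'type': 'Serie TV', 'url': 'https://example.com/threads/example-show/'})
    scraper = GetSerieInfo(item)
    episodes = scraper.get_episode_number()   # [{'name': ..., 'url': ...}, ...]
    print(scraper.tv_name, len(episodes))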
Src/Api/Ddlstreamitaly/Core/Class/SearchType.py (new file) | 60
@@ -0,0 +1,60 @@
# 13.06.24

from typing import List


class MediaItem:
    def __init__(self, data: dict):
        self.name: str = data.get('name')
        self.type: str = data.get('type')
        self.url: int = data.get('url')

    def __str__(self):
        return f"MediaItem(name='{self.name}', type='{self.type}', url={self.url})"


class MediaManager:
    def __init__(self):
        self.media_list: List[MediaItem] = []

    def add_media(self, data: dict) -> None:
        """
        Add media to the list.

        Args:
            data (dict): Media data to add.
        """
        self.media_list.append(MediaItem(data))

    def get(self, index: int) -> MediaItem:
        """
        Get a media item from the list by index.

        Args:
            index (int): The index of the media item to retrieve.

        Returns:
            MediaItem: The media item at the specified index.
        """
        return self.media_list[index]

    def get_length(self) -> int:
        """
        Get the number of media items found by the search.

        Returns:
            int: Number of media items.
        """
        return len(self.media_list)

    def clear(self) -> None:
        """
        This method clears the media list.
        """
        self.media_list.clear()

    def __str__(self):
        return f"MediaManager(num_media={len(self.media_list)})"
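MediaManager is a thin list wrapper shared by the search flow; a quick illustration with made-up data:

    manager = MediaManager()
    manager.add_media({'name': 'Example', 'type': 'Serie TV', 'url': 'https://example.com/t/1'})
    print(manager.get_length())   # 1
    print(manager.get(0))         # MediaItem(name='Example', type='Serie TV', url=...)
    manager.clear()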
Src/Api/Ddlstreamitaly/Core/Player/ddl.py (new file) | 83
@@ -0,0 +1,83 @@
# 14.06.24

import sys
import logging


# External libraries
import httpx
from bs4 import BeautifulSoup


# Internal utilities
from Src.Util.headers import get_headers
from Src.Util._jsonConfig import config_manager


class VideoSource:

    def __init__(self) -> None:
        """
        Initializes the VideoSource object with default values.

        Attributes:
            headers (dict): A dictionary to store HTTP headers.
            cookie (dict): A dictionary to store cookies.
        """
        self.headers = {'user-agent': get_headers()}
        self.cookie = config_manager.get_dict('REQUESTS', 'index')

    def setup(self, url: str) -> None:
        """
        Sets up the video source with the provided URL.

        Args:
            url (str): The URL of the video source.
        """
        self.url = url

    def make_request(self, url: str) -> str:
        """
        Make an HTTP GET request to the provided URL.

        Args:
            url (str): The URL to make the request to.

        Returns:
            str: The response content if successful, None otherwise.
        """
        try:
            response = httpx.get(url, headers=self.headers, cookies=self.cookie)
            response.raise_for_status()
            return response.text
        except httpx.HTTPStatusError as http_err:
            logging.error(f"HTTP error occurred: {http_err}")
        except Exception as err:
            logging.error(f"An error occurred: {err}")
        return None

    def get_playlist(self):
        """
        Retrieves the playlist URL from the video source.

        Returns:
            str: The mp4 link if found, None otherwise.
        """
        try:
            text = self.make_request(self.url)

            if text:
                soup = BeautifulSoup(text, "html.parser")
                source = soup.find("source")

                if source:
                    mp4_link = source.get("src")
                    return mp4_link

                else:
                    logging.error("No <source> tag found in the HTML.")
            else:
                logging.error("Failed to retrieve content from the URL.")

        except Exception as e:
            logging.error(f"An error occurred while parsing the playlist: {e}")
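Hypothetical usage of the new ddl player (the thread URL is illustrative): setup() stores the page URL and get_playlist() scrapes the direct .mp4 link out of the page's <source> tag:

    source = VideoSource()
    source.setup("https://example.com/threads/example-episode/")
    mp4_url = source.get_playlist()   # str link, or None on failure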
Src/Api/Ddlstreamitaly/Core/Util/manage_ep.py (new file) | 71
@@ -0,0 +1,71 @@
# 02.05.24

import logging

from typing import List


# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import remove_special_characters


# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')


def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
    """
    Manage user selection for seasons to download.

    Args:
        - cmd_insert (str): User input for season selection.
        - max_count (int): Maximum count of seasons available.

    Returns:
        list_season_select (List[int]): List of selected seasons.
    """
    list_season_select = []
    logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")

    # For a single number (e.g., '5')
    if cmd_insert.isnumeric():
        list_season_select.append(int(cmd_insert))

    # For a range (e.g., '[5-12]')
    elif "[" in cmd_insert:
        start, end = map(int, cmd_insert[1:-1].split('-'))
        list_season_select = list(range(start, end + 1))

    # For all seasons
    elif cmd_insert == "*":
        list_season_select = list(range(1, max_count + 1))

    # Return list of selected seasons
    logging.info(f"List return: {list_season_select}")
    return list_season_select


def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str:
    """
    Maps the episode title to a specific format.

    Args:
        tv_name (str): The name of the TV show.
        number_season (int): The season number.
        episode_number (int): The episode number.
        episode_name (str): The original name of the episode.

    Returns:
        str: The mapped episode title.
    """
    map_episode_temp = MAP_EPISODE
    map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name))
    map_episode_temp = map_episode_temp.replace("%(season)", str(number_season))
    map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number))
    map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name))

    # Additional fix
    map_episode_temp = map_episode_temp.replace(".", "_")

    logging.info(f"Map episode string return: {map_episode_temp}")
    return map_episode_temp
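The selection grammar manage_selection() accepts is easiest to see by example (values are illustrative, and the template passed to map_episode_title is an assumed config value, not necessarily the project default):

    manage_selection("5", 10)       # -> [5]
    manage_selection("[5-8]", 10)   # -> [5, 6, 7, 8]
    manage_selection("*", 3)        # -> [1, 2, 3]

    # With map_episode_name = "%(tv_name)_S%(season)E%(episode)_%(episode_name)":
    map_episode_title("Example Show", 1, 2, "Pilot")
    # -> "Example Show_S1E2_Pilot"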
@@ -1,3 +1,43 @@
 # 09.06.24

-from .site import title_search
+import sys
+import logging
+
+
+# Internal utilities
+from Src.Util.console import console, msg
+
+
+# Logic class
+from .site import title_search, get_select_title
+from .series import download_thread
+
+
+# Variable
+indice = 3
+
+
+def search():
+    """
+    Main function of the application for film and series.
+    """
+
+    # Make request to site to get content that corrsisponde to that string
+    string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
+    len_database = title_search(string_to_search)
+
+    if len_database > 0:
+
+        # Select title from list
+        select_title = get_select_title()
+
+        # Download only film
+        if "Serie TV" in str(select_title.type):
+            download_thread(select_title)
+
+        else:
+            logging.error(f"Not supported: {select_title.type}")
+            sys.exit(0)
+
+    else:
+        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
@@ -2,3 +2,4 @@

 MAIN_FOLDER = "ddlstreamitaly"
 MOVIE_FOLDER = "Movie"
+SERIES_FOLDER = "Serie"
Src/Api/Ddlstreamitaly/series.py (new file) | 140
@@ -0,0 +1,140 @@
# 13.06.24

import os
import sys
import logging
from urllib.parse import urlparse


# Internal utilities
from Src.Util.color import Colors
from Src.Util.console import console, msg
from Src.Util.os import create_folder, can_create_file
from Src.Util._jsonConfig import config_manager
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Lib.Hls.download_mp4 import MP4_downloader


# Logic class
from .Core.Class.SearchType import MediaItem
from .Core.Class.ScrapeSerie import GetSerieInfo
from .Core.Util.manage_ep import manage_selection, map_episode_title
from .Core.Player.ddl import VideoSource


# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')


# Variable
table_show_manager = TVShowManager()
from .costant import MAIN_FOLDER, SERIES_FOLDER
video_source = VideoSource()


def donwload_video(scape_info_serie: GetSerieInfo, index_episode_selected: int) -> None:
    """
    Download a single episode video.

    Args:
        - scape_info_serie (GetSerieInfo): Scraper holding the series information.
        - index_episode_selected (int): Index of the selected episode.
    """

    start_message()

    # Get info about episode
    obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
    console.print(f"[yellow]Download: [red]{obj_episode.get('name')}")
    print()

    # Define filename and path for the downloaded video
    mp4_name = f"{map_episode_title(scape_info_serie.tv_name, None, index_episode_selected, obj_episode.get('name'))}.mp4"
    mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, SERIES_FOLDER, scape_info_serie.tv_name)

    # Check if the output file can be created
    create_folder(mp4_path)
    if not can_create_file(mp4_name):
        logging.error("Invalid mp4 name.")
        sys.exit(0)

    # Setup video source
    video_source.setup(obj_episode.get('url'))

    # Get the direct mp4 link
    master_playlist = video_source.get_playlist()

    # Parse start page url
    start_message()
    parsed_url = urlparse(obj_episode.get('url'))
    path_parts = parsed_url.path.split('/')

    MP4_downloader(
        url = master_playlist,
        path = os.path.join(mp4_path, mp4_name),
        referer = f"{parsed_url.scheme}://{parsed_url.netloc}/",
        add_desc=f"{Colors.MAGENTA}video"
    )


def download_thread(dict_serie: MediaItem):
    """Download all episodes of a thread."""

    # Start message and set up video source
    start_message()

    # Init class
    scape_info_serie = GetSerieInfo(dict_serie)

    # Collect information about the thread
    list_dict_episode = scape_info_serie.get_episode_number()
    episodes_count = len(list_dict_episode)

    # Display episodes list and manage user selection
    last_command = display_episodes_list(list_dict_episode)
    list_episode_select = manage_selection(last_command, episodes_count)

    # Download a single selected episode
    if len(list_episode_select) == 1 and last_command != "*":
        donwload_video(scape_info_serie, list_episode_select[0])

    # Download all the other selected episodes
    else:
        for i_episode in list_episode_select:
            donwload_video(scape_info_serie, i_episode)


def display_episodes_list(obj_episode_manager) -> str:
    """
    Display episodes list and handle user input.

    Returns:
        last_command (str): Last command entered by the user.
    """

    # Set up table for displaying episodes
    table_show_manager.set_slice_end(10)

    # Add columns to the table
    column_info = {
        "Index": {'color': 'red'},
        "Name": {'color': 'magenta'},
    }
    table_show_manager.add_column(column_info)

    # Populate the table with episodes information
    for i, media in enumerate(obj_episode_manager):
        table_show_manager.add_tv_show({
            'Index': str(i + 1),
            'Name': media.get('name'),
        })

    # Run the table and handle user input
    last_command = table_show_manager.run()

    if last_command == "q":
        console.print("\n[red]Quit [white]...")
        sys.exit(0)

    return last_command
@@ -1,81 +1,130 @@
 # 09.06.24

 import os
 import sys
 import logging
 from urllib.parse import urlparse


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup
 from unidecode import unidecode


 # Internal utilities
 from Src.Util.message import start_message
 from Src.Util.color import Colors
+from Src.Util.table import TVShowManager
 from Src.Util.console import console, msg
 from Src.Util.os import create_folder, can_create_file
 from Src.Util._jsonConfig import config_manager
 from Src.Util.headers import get_headers
 from Src.Lib.Hls.download_mp4 import MP4_downloader


+# Logic class
+from .Core.Class.SearchType import MediaManager, MediaItem
+
+
 # Config
 ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
 from .costant import MAIN_FOLDER, MOVIE_FOLDER
+SITE_NAME = "ddlstreamitaly"
+DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)


 # Variable
 cookie_index = config_manager.get_dict('REQUESTS', 'index')
+media_search_manager = MediaManager()
+table_show_manager = TVShowManager()


-def title_search() -> int:
+def title_search(word_to_search) -> int:
     """
     Search for titles based on a search query.
     """

-    print()
-    url_search = msg.ask(f"[cyan]Insert url title")
-
-    # Send request to search for titles
     try:
-        response = requests.get(url_search, headers={'user-agent': get_headers()}, cookies=cookie_index)
+
+        # Send request to search for titles
+        response = httpx.get(f"https://ddlstreamitaly.{DOMAIN_NOW}/search/?&q={word_to_search}&quick=1&type=videobox_video&nodes=11", headers={'user-agent': get_headers()})
         response.raise_for_status()
-    except:
-        logging.error("Insert: {'ips4_IPSSessionFront': 'your_code', 'ips4_member_id': 'your_code'} in config file \ REQUESTS \ index, instead of user-agent. Use browser debug and cookie request with a valid account.")

-    # Create soup and mp4 video
-    soup = BeautifulSoup(response.text, "html.parser")
-    souce = soup.find("source")
-
-    # Get url and filename
-    try:
-        mp4_link = souce.get("src")
-    except:
-        logging.error("Insert: {'ips4_IPSSessionFront': 'your_code', 'ips4_member_id': 'your_code'} in config file \ REQUESTS \ index, instead of user-agent. Use browser debug and cookie request with a valid account.")
-
-    parsed_url = urlparse(url_search)
-    path_parts = parsed_url.path.split('/')
-    mp4_name = path_parts[-2] if path_parts[-1] == '' else path_parts[-1] + ".mp4"
-
-    # Create destination folder
-    mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER)
-
-    # Check if can create file output
-    create_folder(mp4_path)
-    if not can_create_file(mp4_name):
-        logging.error("Invalid mp4 name.")
-        sys.exit(0)
-
-    # Start download
-    start_message()
-    MP4_downloader(
-        url = mp4_link,
-        path = os.path.join(mp4_path, mp4_name),
-        referer = f"{parsed_url.scheme}://{parsed_url.netloc}/",
-        add_desc=f"{Colors.MAGENTA}video"
-    )
+        # Create soup and find table
+        soup = BeautifulSoup(response.text, "html.parser")
+        table_content = soup.find('ol', class_="ipsStream")
+
+        if table_content:
+            for title_div in table_content.find_all('li', class_='ipsStreamItem'):
+                try:
+                    title_type = title_div.find("p", class_="ipsType_reset").find_all("a")[-1].get_text(strip=True)
+                    name = title_div.find("span", class_="ipsContained").find("a").get_text(strip=True)
+                    link = title_div.find("span", class_="ipsContained").find("a").get("href")
+
+                    title_info = {
+                        'name': name,
+                        'url': link,
+                        'type': title_type
+                    }
+
+                    media_search_manager.add_media(title_info)
+
+                except Exception as e:
+                    logging.error(f"Error processing title div: {e}")
+
+            # Return the number of titles found
+            return media_search_manager.get_length()
+
+        else:
+            logging.error("No table content found.")
+            return -999
+
+    except Exception as err:
+        logging.error(f"An error occurred: {err}")
+
+    return -9999
+
+
+def get_select_title(type_filter: list = None) -> MediaItem:
+    """
+    Display a selection of titles and prompt the user to choose one.
+
+    Args:
+        - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
+
+    Returns:
+        MediaItem: The selected media item.
+    """
+
+    # Set up table for displaying titles
+    table_show_manager.set_slice_end(10)
+
+    # Add columns to the table
+    column_info = {
+        "Index": {'color': 'red'},
+        "Name": {'color': 'magenta'},
+        "Type": {'color': 'yellow'},
+    }
+    table_show_manager.add_column(column_info)
+
+    # Populate the table with title information
+    for i, media in enumerate(media_search_manager.media_list):
+
+        # Filter for only a list of categories
+        if type_filter is not None:
+            if str(media.type) not in type_filter:
+                continue
+
+        table_show_manager.add_tv_show({
+            'Index': str(i),
+            'Name': media.name,
+            'Type': media.type,
+        })
+
+    # Run the table and handle user input
+    last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
+    table_show_manager.clear()
+
+    # Handle user's quit command
+    if last_command == "q":
+        console.print("\n[red]Quit [white]...")
+        sys.exit(0)
+
+    # Check if the selected index is within range
+    if 0 <= int(last_command) <= len(media_search_manager.media_list):
+        return media_search_manager.get(int(last_command))
+    else:
+        console.print("\n[red]Wrong index")
+        sys.exit(0)
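How the pieces added for Ddlstreamitaly fit together, in rough outline (the search term is illustrative):

    len_found = title_search("example")   # scrape results into media_search_manager
    if len_found > 0:
        item = get_select_title()         # interactive table selection
        download_thread(item)             # enumerate episodes, then download each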
Src/Api/Guardaserie/Core/Class/ScrapeSerie.py (new file) | 114
@@ -0,0 +1,114 @@
# 13.06.24

import sys
import logging

from typing import List, Dict


# External libraries
import httpx
from bs4 import BeautifulSoup


# Internal utilities
from Src.Util.headers import get_headers


# Logic class
from .SearchType import MediaItem


class GetSerieInfo:

    def __init__(self, dict_serie: MediaItem) -> None:
        """
        Initializes the GetSerieInfo object with default values.

        Args:
            dict_serie (MediaItem): Dictionary containing series information (optional).
        """
        self.headers = {'user-agent': get_headers()}
        self.url = dict_serie.url
        self.tv_name = None
        self.list_episodes = None

    def get_seasons_number(self) -> int:
        """
        Retrieves the number of seasons of a TV series.

        Returns:
            int: Number of seasons of the TV series.
        """
        try:

            # Make an HTTP request to the series URL
            print(self.url)
            response = httpx.get(self.url, headers=self.headers, timeout=10)
            response.raise_for_status()

            # Parse HTML content of the page
            soup = BeautifulSoup(response.text, "html.parser")

            # Find the container of seasons
            table_content = soup.find('div', class_="tt_season")

            # Count the number of seasons
            seasons_number = len(table_content.find_all("li"))

            # Extract the name of the series
            self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True)

            return seasons_number

        except Exception as e:
            logging.error(f"Error parsing HTML page: {e}")

        return -999

    def get_episode_number(self, n_season: int) -> List[Dict[str, str]]:
        """
        Retrieves the episodes for a specific season.

        Args:
            n_season (int): The season number.

        Returns:
            List[Dict[str, str]]: List of dictionaries containing episode information.
        """
        try:

            # Make an HTTP request to the series URL
            response = httpx.get(self.url, headers=self.headers)
            response.raise_for_status()

            # Parse HTML content of the page
            soup = BeautifulSoup(response.text, "html.parser")

            # Find the container of episodes for the specified season
            table_content = soup.find('div', class_="tab-pane", id=f"season-{n_season}")

            # Extract episode information
            episode_content = table_content.find_all("li")
            list_dict_episode = []

            for episode_div in episode_content:
                index = episode_div.find("a").get("data-num")
                link = episode_div.find("a").get("data-link")
                name = episode_div.find("a").get("data-title")

                obj_episode = {
                    'number': index,
                    'name': name,
                    'url': link
                }
                list_dict_episode.append(obj_episode)

            self.list_episodes = list_dict_episode
            return list_dict_episode

        except Exception as e:
            logging.error(f"Error parsing HTML page: {e}")

        return []
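A hypothetical run of the Guardaserie scraper (the URL is invented): note that get_seasons_number() also caches the show name on the instance, which series.py relies on when building file paths:

    item = MediaItem({'name': 'Example', 'score': '8.1', 'url': 'https://example.com/serie/example/'})
    scraper = GetSerieInfo(item)
    seasons = scraper.get_seasons_number()     # e.g. 3, or -999 on error
    episodes = scraper.get_episode_number(1)   # [{'number': ..., 'name': ..., 'url': ...}]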
Src/Api/Guardaserie/Core/Class/SearchType.py (new file) | 61
@@ -0,0 +1,61 @@
# 26.05.24

from typing import List


class MediaItem:
    def __init__(self, data: dict):
        self.name: str = data.get('name')
        self.type: str = "serie"
        self.score: str = data.get('score')
        self.url: int = data.get('url')

    def __str__(self):
        return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})"


class MediaManager:
    def __init__(self):
        self.media_list: List[MediaItem] = []

    def add_media(self, data: dict) -> None:
        """
        Add media to the list.

        Args:
            data (dict): Media data to add.
        """
        self.media_list.append(MediaItem(data))

    def get(self, index: int) -> MediaItem:
        """
        Get a media item from the list by index.

        Args:
            index (int): The index of the media item to retrieve.

        Returns:
            MediaItem: The media item at the specified index.
        """
        return self.media_list[index]

    def get_length(self) -> int:
        """
        Get the number of media items found by the search.

        Returns:
            int: Number of media items.
        """
        return len(self.media_list)

    def clear(self) -> None:
        """
        This method clears the media list.
        """
        self.media_list.clear()

    def __str__(self):
        return f"MediaManager(num_media={len(self.media_list)})"
@@ -1,12 +1,11 @@
 # 26.05.24

 import re
 import sys
 import logging


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup


@@ -47,12 +46,8 @@ class VideoSource:
         """

         try:
-            response = requests.get(url, headers=self.headers)
+            response = httpx.get(url, headers=self.headers, follow_redirects=True)
             response.raise_for_status()
-
-            with open('index.html', 'w', encoding='utf-8') as file:
-                file.write(response.text)
-
             return response.text

         except Exception as e:
@@ -78,38 +73,6 @@ class VideoSource:
             logging.error(f"Failed to parse HTML content: {e}")
             return None

-    def get_iframe(self, soup):
-        """
-        Extracts the source URL of the second iframe in the provided BeautifulSoup object.
-
-        Args:
-            soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML.
-
-        Returns:
-            str: The source URL of the second iframe, or None if not found.
-        """
-        tag_a = soup.find_all('a', href='#')
-        if tag_a and len(tag_a) > 1:
-            return tag_a[1].get("data-link")
-
-        return None
-
-    def find_content(self, url):
-        """
-        Makes a request to the specified URL and parses the HTML content.
-
-        Args:
-            url (str): The URL to fetch content from.
-
-        Returns:
-            BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails.
-        """
-        content = self.make_request(url)
-        if content:
-            return self.parse_html(content)
-
-        return None
-
     def get_result_node_js(self, soup):
         """
         Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL.
@@ -146,17 +109,7 @@ class VideoSource:
             logging.error("Failed to parse HTML content.")
             return None

-        iframe_src = self.get_iframe(soup)
-        if not iframe_src:
-            logging.error("No iframe found.")
-            return None
-
-        down_page_soup = self.find_content(iframe_src)
-        if not down_page_soup:
-            logging.error("Failed to fetch down page content.")
-            return None
-
-        result = self.get_result_node_js(down_page_soup)
+        result = self.get_result_node_js(soup)
         if not result:
             logging.error("No video URL found in script.")
             return None
@@ -167,4 +120,4 @@ class VideoSource:
         except Exception as e:
             logging.error(f"An error occurred: {e}")
             return None
-
Src/Api/Guardaserie/Core/Util/manage_ep.py (new file) | 71
@@ -0,0 +1,71 @@
# 02.05.24

import logging

from typing import List


# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import remove_special_characters


# Config
MAP_EPISODE = config_manager.get('DEFAULT', 'map_episode_name')


def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
    """
    Manage user selection for seasons to download.

    Args:
        - cmd_insert (str): User input for season selection.
        - max_count (int): Maximum count of seasons available.

    Returns:
        list_season_select (List[int]): List of selected seasons.
    """
    list_season_select = []
    logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")

    # For a single number (e.g., '5')
    if cmd_insert.isnumeric():
        list_season_select.append(int(cmd_insert))

    # For a range (e.g., '[5-12]')
    elif "[" in cmd_insert:
        start, end = map(int, cmd_insert[1:-1].split('-'))
        list_season_select = list(range(start, end + 1))

    # For all seasons
    elif cmd_insert == "*":
        list_season_select = list(range(1, max_count + 1))

    # Return list of selected seasons
    logging.info(f"List return: {list_season_select}")
    return list_season_select


def map_episode_title(tv_name: str, number_season: int, episode_number: int, episode_name: str) -> str:
    """
    Maps the episode title to a specific format.

    Args:
        tv_name (str): The name of the TV show.
        number_season (int): The season number.
        episode_number (int): The episode number.
        episode_name (str): The original name of the episode.

    Returns:
        str: The mapped episode title.
    """
    map_episode_temp = MAP_EPISODE
    map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name))
    map_episode_temp = map_episode_temp.replace("%(season)", str(number_season))
    map_episode_temp = map_episode_temp.replace("%(episode)", str(episode_number))
    map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name))

    # Additional fix
    map_episode_temp = map_episode_temp.replace(".", "_")

    logging.info(f"Map episode string return: {map_episode_temp}")
    return map_episode_temp
@@ -1,3 +1,34 @@
 # 09.06.24

-from .site import title_search
+# Internal utilities
+from Src.Util.console import console, msg
+
+
+# Logic class
+from .site import title_search, get_select_title
+from .series import download_series
+
+
+# Variable
+indice = 4
+
+
+def search():
+    """
+    Main function of the application for film and series.
+    """
+
+    # Make request to site to get content that corrsisponde to that string
+    string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()
+    len_database = title_search(string_to_search)
+
+    if len_database > 0:
+
+        # Select title from list
+        select_title = get_select_title()
+
+        # Download only film
+        download_series(select_title)
+
+    else:
+        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
@@ -1,4 +1,4 @@
 # 09.06.24

 MAIN_FOLDER = "guardaserie"
-MOVIE_FOLDER = "Serie"
+SERIES_FOLDER = "Serie"
Src/Api/Guardaserie/series.py (new file) | 169
@@ -0,0 +1,169 @@
# 13.06.24

import os
import sys
import logging


# Internal utilities
from Src.Util.console import console, msg
from Src.Util._jsonConfig import config_manager
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Lib.Hls.downloader import Downloader


# Logic class
from .Core.Class.SearchType import MediaItem
from .Core.Class.ScrapeSerie import GetSerieInfo
from .Core.Util.manage_ep import manage_selection, map_episode_title
from .Core.Player.supervideo import VideoSource


# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
from .costant import MAIN_FOLDER, SERIES_FOLDER


# Variable
table_show_manager = TVShowManager()
video_source = VideoSource()


def donwload_video(scape_info_serie: GetSerieInfo, index_season_selected: int, index_episode_selected: int) -> None:
    """
    Download a single episode video.

    Args:
        - scape_info_serie (GetSerieInfo): Scraper holding the series information.
        - index_season_selected (int): Index of the selected season.
        - index_episode_selected (int): Index of the selected episode.
    """

    start_message()

    # Get info about episode
    obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
    console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}")
    print()

    # Define filename and path for the downloaded video
    mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4"
    mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}")

    # Setup video source
    video_source.setup(obj_episode.get('url'))

    # Get m3u8 master playlist
    master_playlist = video_source.get_playlist()

    Downloader(
        m3u8_playlist = master_playlist,
        output_filename = os.path.join(mp4_path, mp4_name)
    ).start()


def donwload_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, donwload_all: bool = False) -> None:
    """
    Download all episodes of a season.

    Args:
        - scape_info_serie (GetSerieInfo): Scraper holding the series information.
        - index_season_selected (int): Index of the selected season.
        - donwload_all (bool): Download all of the season's episodes.
    """

    # Start message and collect information about episodes
    start_message()
    list_dict_episode = scape_info_serie.get_episode_number(index_season_selected)
    episodes_count = len(list_dict_episode)

    # Download all episodes without asking
    if donwload_all:
        for i_episode in range(1, episodes_count + 1):
            donwload_video(scape_info_serie, index_season_selected, i_episode)

        console.print(f"\n[red]Download [yellow]season: [red]{index_season_selected}.")

    # If not downloading everything, handle the selection for a single season
    if not donwload_all:

        # Display episodes list and manage user selection
        last_command = display_episodes_list(scape_info_serie.list_episodes)
        list_episode_select = manage_selection(last_command, episodes_count)

        # Download a single selected episode
        if len(list_episode_select) == 1 and last_command != "*":
            donwload_video(scape_info_serie, index_season_selected, list_episode_select[0])

        # Download all the other selected episodes
        else:
            for i_episode in list_episode_select:
                donwload_video(scape_info_serie, index_season_selected, i_episode)


def download_series(dict_serie: MediaItem) -> None:

    # Start message and set up video source
    start_message()

    # Init class
    scape_info_serie = GetSerieInfo(dict_serie)

    # Collect information about seasons
    seasons_count = scape_info_serie.get_seasons_number()

    # Prompt user for season selection and download episodes
    console.print(f"\n[green]Seasons found: [red]{seasons_count}")
    index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media"))
    list_season_select = manage_selection(index_season_selected, seasons_count)

    # Download a single selected season
    if len(list_season_select) == 1 and index_season_selected != "*":
        if 1 <= int(index_season_selected) <= seasons_count:
            donwload_episode(scape_info_serie, list_season_select[0])

    # Download all seasons and episodes
    elif index_season_selected == "*":
        for i_season in list_season_select:
            donwload_episode(scape_info_serie, i_season, True)

    # Download all the other selected seasons
    else:
        for i_season in list_season_select:
            donwload_episode(scape_info_serie, i_season)


def display_episodes_list(obj_episode_manager) -> str:
    """
    Display episodes list and handle user input.

    Returns:
        last_command (str): Last command entered by the user.
    """

    # Set up table for displaying episodes
    table_show_manager.set_slice_end(10)

    # Add columns to the table
    column_info = {
        "Index": {'color': 'red'},
        "Name": {'color': 'magenta'},
    }
    table_show_manager.add_column(column_info)

    # Populate the table with episodes information
    for media in obj_episode_manager:
        table_show_manager.add_tv_show({
            'Index': str(media.get('number')),
            'Name': media.get('name'),
        })

    # Run the table and handle user input
    last_command = table_show_manager.run()

    if last_command == "q":
        console.print("\n[red]Quit [white]...")
        sys.exit(0)

    return last_command
@@ -7,62 +7,118 @@ from urllib.parse import urlparse


 # External libraries
-import requests
+import httpx
 from bs4 import BeautifulSoup


 # Internal utilities
 from Src.Util.table import TVShowManager
 from Src.Util.console import console, msg
 from Src.Util.os import create_folder, can_create_file
 from Src.Util._jsonConfig import config_manager
 from Src.Util.headers import get_headers
 from Src.Lib.Hls.downloader import Downloader


 # Logic class
 from .Core.Player.supervideo import VideoSource
+from .Core.Class.SearchType import MediaManager, MediaItem
+
+
+# Variable
+media_search_manager = MediaManager()
+table_show_manager = TVShowManager()


 # Config
+SITE_NAME = "guardaserie"
 ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
 from .costant import MAIN_FOLDER, MOVIE_FOLDER
+DOMAIN_NOW = config_manager.get('SITE', SITE_NAME)


-def title_search() -> int:
+def title_search(word_to_search) -> int:
     """
     Search for titles based on a search query.
     """

-    print()
-    url_search = msg.ask(f"[cyan]Insert url title")
-
     # Send request to search for titles
-    response = requests.get(url_search, headers={'user-agent': get_headers()})
+    response = httpx.get(f"https://guardaserie.{DOMAIN_NOW}/?story={word_to_search}&do=search&subaction=search", headers={'user-agent': get_headers()})
     response.raise_for_status()

-    # Get playlist
-    video_source = VideoSource()
-    video_source.setup(url_search)
-
-    parsed_url = urlparse(url_search)
-    path_parts = parsed_url.path.split('/')
-    mp4_name = path_parts[-2] if path_parts[-1] == '' else path_parts[-1] + ".mp4"
-
-    # Create destination folder
-    mp4_path = os.path.join(ROOT_PATH, MAIN_FOLDER, MOVIE_FOLDER)
-
-    # Check if can create file output
-    create_folder(mp4_path)
-    if not can_create_file(mp4_name):
-        logging.error("Invalid mp4 name.")
-        sys.exit(0)
-
-    # Get m3u8 master playlist
-    master_playlist = video_source.get_playlist()
-
-    # Download the film using the m3u8 playlist, and output filename
-    Downloader(
-        m3u8_playlist = master_playlist,
-        output_filename = os.path.join(mp4_path, mp4_name)
-    ).start()
+    # Create soup and find table
+    soup = BeautifulSoup(response.text, "html.parser")
+    table_content = soup.find('div', class_="mlnew-list")
+
+    for serie_div in table_content.find_all('div', class_='mlnew'):
+
+        try:
+            title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True)
+            link = serie_div.find('div', class_='mlnh-2').find('a')['href']
+            imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True)
+
+            serie_info = {
+                'name': title,
+                'url': link,
+                'score': imdb_rating
+            }
+
+            media_search_manager.add_media(serie_info)
+
+        except:
+            pass
+
+    # Return the number of titles found
+    return media_search_manager.get_length()
+
+
+def get_select_title(type_filter: list = None) -> MediaItem:
+    """
+    Display a selection of titles and prompt the user to choose one.
+
+    Args:
+        - type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
+
+    Returns:
+        MediaItem: The selected media item.
+    """
+
+    # Set up table for displaying titles
+    table_show_manager.set_slice_end(10)
+
+    # Add columns to the table
+    column_info = {
+        "Index": {'color': 'red'},
+        "Name": {'color': 'magenta'},
+        "Type": {'color': 'yellow'},
+        "Score": {'color': 'cyan'},
+    }
+    table_show_manager.add_column(column_info)
+
+    # Populate the table with title information
+    for i, media in enumerate(media_search_manager.media_list):
+
+        # Filter for only a list of categories
+        if type_filter is not None:
+            if str(media.type) not in type_filter:
+                continue
+
+        table_show_manager.add_tv_show({
+            'Index': str(i),
+            'Name': media.name,
+            'Type': media.type,
+            'Score': media.score,
+        })
+
+    # Run the table and handle user input
+    last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
+    table_show_manager.clear()
+
+    # Handle user's quit command
+    if last_command == "q":
+        console.print("\n[red]Quit [white]...")
+        sys.exit(0)
+
+    # Check if the selected index is within range
+    if 0 <= int(last_command) <= len(media_search_manager.media_list):
+        return media_search_manager.get(int(last_command))
+    else:
+        console.print("\n[red]Wrong index")
+        sys.exit(0)
@ -6,7 +6,7 @@ from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse


# External libraries
import requests
import httpx
from bs4 import BeautifulSoup


@ -66,7 +66,7 @@ class VideoSource:

        try:

            response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers)
            response = httpx.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers)
            response.raise_for_status()

            # Extract JSON response if available
@ -90,7 +90,7 @@ class VideoSource:
        try:

            # Make a request to collect information about a specific season
            response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers)
            response = httpx.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers)
            response.raise_for_status()

            # Extract JSON response if available
@ -122,7 +122,7 @@ class VideoSource:
        try:

            # Make a request to get iframe source
            response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params)
            response = httpx.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params)
            response.raise_for_status()

            # Parse response with BeautifulSoup to get iframe source
@ -164,15 +164,15 @@ class VideoSource:

        # Make a request to get content
        try:
            response = requests.get(self.iframe_src, headers=self.headers)
            response = httpx.get(self.iframe_src, headers=self.headers)
            response.raise_for_status()

        except:
        except Exception as e:
            print("\n")
            console.print(Panel("[red bold]Coming soon", title="Notification", title_align="left", border_style="yellow"))
            sys.exit(0)

        if response.ok:
        if response.status_code == 200:

            # Parse response with BeautifulSoup to get content
            soup = BeautifulSoup(response.text, "html.parser")
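The recurring edit in these hunks swaps requests for httpx, replaces the truthy response.ok check with an explicit response.status_code == 200, and narrows bare except: clauses to except Exception as e:. A minimal sketch of the resulting idiom (URL is a placeholder; note that, unlike requests, httpx does not follow redirects unless asked):

import httpx

def fetch_text(url: str):
    try:
        response = httpx.get(url, follow_redirects=True, timeout=10)
        response.raise_for_status()  # raises httpx.HTTPStatusError on 4xx/5xx
    except Exception as e:
        print(f"Request failed: {e}")
        return None
    return response.text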
@ -6,7 +6,7 @@ import logging


# External library
import requests
import httpx


# Internal utilities
@ -28,7 +28,7 @@ def check_url_for_content(url: str, content: str) -> bool:
    try:

        logging.info(f"Test site to extract domain: {url}")
        response = requests.get(url, timeout = 1)
        response = httpx.get(url, timeout = 1)
        response.raise_for_status()

        if content in response.text:
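A hedged usage sketch of check_url_for_content for domain probing (the candidate TLDs and the marker string below are placeholders, not the project's real values):

for tld in ("com", "to", "net"):
    if check_url_for_content(f"https://example.{tld}", "some-marker"):
        print(f"Working domain: example.{tld}")
        break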
@ -7,6 +7,7 @@ from typing import List

# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import remove_special_characters


# Logic class
@ -62,10 +63,10 @@ def map_episode_title(tv_name: str, episode: Episode, number_season: int):
        str: The mapped episode title.
    """
    map_episode_temp = MAP_EPISODE
    map_episode_temp = map_episode_temp.replace("%(tv_name)", tv_name)
    map_episode_temp = map_episode_temp.replace("%(tv_name)", remove_special_characters(tv_name))
    map_episode_temp = map_episode_temp.replace("%(season)", str(number_season).zfill(2))
    map_episode_temp = map_episode_temp.replace("%(episode)", str(episode.number).zfill(2))
    map_episode_temp = map_episode_temp.replace("%(episode_name)", episode.name)
    map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode.name))

    # Additional fix
    map_episode_temp = map_episode_temp.replace(".", "_")
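A worked example of this template substitution (the MAP_EPISODE pattern shown here is only an assumption for illustration; the real pattern comes from config.json):

template = "%(tv_name)_S%(season)E%(episode)_%(episode_name)"
out = (template
       .replace("%(tv_name)", "My Show")
       .replace("%(season)", str(1).zfill(2))
       .replace("%(episode)", str(5).zfill(2))
       .replace("%(episode_name)", "Pilot"))
print(out)                    # My Show_S01E05_Pilot
print(out.replace(".", "_"))  # the "additional fix" then replaces any dots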
@ -8,27 +8,29 @@ from Src.Util.console import console, msg
from .site import (
    get_version_and_domain,
    title_search,
    get_select_title,
    manager_clear
    get_select_title
)

from .film import download_film
from .series import download_series


def main_film_series():
    # Variable
    indice = 0

def search():
    """
    Main function of the application for film and series.
    """

    # Get site domain and version
    site_version, domain = get_version_and_domain()

    # Make request to site to get content that corresponds to that string
    film_search = msg.ask("\n[purple]Insert word to search in all site").strip()
    len_database = title_search(film_search, domain)
    string_to_search = msg.ask("\n[purple]Insert word to search in all site").strip()

    if len_database != 0:
        # Get site domain and version and get result of the search
        site_version, domain = get_version_and_domain()
        len_database = title_search(string_to_search, domain)

    if len_database > 0:

        # Select title from list
        select_title = get_select_title()
@ -50,6 +52,5 @@ def main_film_series():
            domain=domain
        )

    # If no media found
    else:
        console.print("[red]Cant find a single element")
        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
@ -28,44 +28,6 @@ video_source = VideoSource()
table_show_manager = TVShowManager()



def display_episodes_list() -> str:
    """
    Display episodes list and handle user input.

    Returns:
        last_command (str): Last command entered by the user.
    """

    # Set up table for displaying episodes
    table_show_manager.set_slice_end(10)

    # Add columns to the table
    column_info = {
        "Index": {'color': 'red'},
        "Name": {'color': 'magenta'},
        "Duration": {'color': 'green'}
    }
    table_show_manager.add_column(column_info)

    # Populate the table with episodes information
    for i, media in enumerate(video_source.obj_episode_manager.episodes):
        table_show_manager.add_tv_show({
            'Index': str(media.number),
            'Name': media.name,
            'Duration': str(media.duration)
        })

    # Run the table and handle user input
    last_command = table_show_manager.run()

    if last_command == "q":
        console.print("\n[red]Quit [white]...")
        sys.exit(0)

    return last_command


def donwload_video(tv_name: str, index_season_selected: int, index_episode_selected: int) -> None:
    """
    Download a single episode video.
@ -187,3 +149,40 @@ def download_series(tv_id: str, tv_name: str, version: str, domain: str) -> None
    else:
        for i_season in list_season_select:
            donwload_episode(tv_name, i_season)


def display_episodes_list() -> str:
    """
    Display episodes list and handle user input.

    Returns:
        last_command (str): Last command entered by the user.
    """

    # Set up table for displaying episodes
    table_show_manager.set_slice_end(10)

    # Add columns to the table
    column_info = {
        "Index": {'color': 'red'},
        "Name": {'color': 'magenta'},
        "Duration": {'color': 'green'}
    }
    table_show_manager.add_column(column_info)

    # Populate the table with episodes information
    for i, media in enumerate(video_source.obj_episode_manager.episodes):
        table_show_manager.add_tv_show({
            'Index': str(media.number),
            'Name': media.name,
            'Duration': str(media.duration)
        })

    # Run the table and handle user input
    last_command = table_show_manager.run()

    if last_command == "q":
        console.print("\n[red]Quit [white]...")
        sys.exit(0)

    return last_command
@ -8,7 +8,7 @@ from typing import Tuple


# External libraries
import requests
import httpx
from bs4 import BeautifulSoup
from unidecode import unidecode

@ -102,7 +102,7 @@ def get_version_and_domain(new_domain = None) -> Tuple[str, str]:

    # Make requests to site to get text
    console.print(f"[cyan]Test site[white]: [red]https://{SITE_NAME}.{config_domain}")
    response = requests.get(f"https://{SITE_NAME}.{config_domain}")
    response = httpx.get(f"https://{SITE_NAME}.{config_domain}")
    console.print(f"[cyan]Test response site[white]: [red]{response.status_code} \n")

    # Extract version from the response
@ -137,7 +137,7 @@ def title_search(title_search: str, domain: str) -> int:
    """

    # Send request to search for titles ( replace à to a and space to "+" )
    response = requests.get(f"https://{SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()})
    response = httpx.get(f"https://{SITE_NAME}.{domain}/api/search?q={unidecode(title_search.replace(' ', '+'))}", headers={'user-agent': get_headers()})
    response.raise_for_status()

    # Add found titles to media search manager
@ -203,18 +203,3 @@ def get_select_title(type_filter: list = None) -> MediaItem:
    else:
        console.print("\n[red]Wrong index")
        sys.exit(0)


def manager_clear():
    """
    Clears the data lists managed by media_search_manager and table_show_manager.

    This function clears the data lists managed by global variables media_search_manager
    and table_show_manager. It removes all the items from these lists, effectively
    resetting them to empty lists.
    """
    global media_search_manager, table_show_manager

    # Clear list of data
    media_search_manager.clear()
    table_show_manager.clear()
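For context, the query normalization used by title_search: unidecode folds accented characters to plain ASCII (the comment's "à" to "a") and spaces become "+" before the string reaches the search API. A standalone check:

from unidecode import unidecode

query = "città del cinema"
print(unidecode(query.replace(" ", "+")))   # citta+del+cinema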
@ -8,7 +8,7 @@ from typing import Generator, Optional


# External libraries
import requests
import httpx
from bs4 import BeautifulSoup


@ -80,7 +80,7 @@ def search(query: str, num: int = 10, stop: Optional[int] = None, pause: float =
    time.sleep(pause)

    # Fetch the HTML content of the search page
    html = requests.get(url).text
    html = httpx.get(url).text
    soup = BeautifulSoup(html, 'html.parser')

    try:
@ -6,7 +6,7 @@ import logging


# External libraries
import requests
import httpx
from tqdm import tqdm


@ -31,44 +31,47 @@ REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')

def MP4_downloader(url: str, path: str, referer: str, add_desc: str):

    if os.path.exists(path):
        console.log("[cyan]Video [red]already exists.")
        sys.exit(0)


    # Make request to get content of video
    logging.info(f"Make request to fetch mp4 from: {url}")
    response = requests.get(url, stream=True, headers={'Referer': referer, 'user-agent': get_headers()}, verify=REQUEST_VERIFY, timeout=REQUEST_TIMEOUT)
    total = int(response.headers.get('content-length', 0))
    headers = {'Referer': referer, 'user-agent': get_headers()}

    with httpx.Client(verify=REQUEST_VERIFY, timeout=REQUEST_TIMEOUT) as client:
        with client.stream("GET", url, headers=headers) as response:
            total = int(response.headers.get('content-length', 0))

            # Create bar format
            if TQDM_USE_LARGE_BAR:
                bar_format = (f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): "
                              f"{Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ "
                              f"{Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] "
                              f"{Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}} {Colors.WHITE}| "
                              f"{Colors.YELLOW}{{rate_fmt}}{{postfix}} {Colors.WHITE}]")
            else:
                bar_format = (f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% "
                              f"{Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]")

    # Create bar format
    if TQDM_USE_LARGE_BAR:
        bar_format=f"{Colors.YELLOW}Downloading {Colors.WHITE}({add_desc}{Colors.WHITE}): {Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] {Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}} {Colors.WHITE}| {Colors.YELLOW}{{rate_fmt}}{{postfix}} {Colors.WHITE}]"
    else:
        bar_format=f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"

    # Create progress bar
    progress_bar = tqdm(
        total=total,
        unit='iB',
        ascii='░▒█',
        bar_format=bar_format,
        unit_scale=True,
        unit_divisor=1024
    )


    # Download file
    with open(path, 'wb') as file, progress_bar as bar:
        for data in response.iter_content(chunk_size=1024):
            size = file.write(data)
            bar.update(size)
            # Create progress bar
            progress_bar = tqdm(
                total=total,
                unit='iB',
                ascii='░▒█',
                bar_format=bar_format,
                unit_scale=True,
                unit_divisor=1024
            )

            # Download file
            with open(path, 'wb') as file, progress_bar as bar:
                for chunk in response.iter_bytes(chunk_size=1024):
                    if chunk:
                        size = file.write(chunk)
                        bar.update(size)

    # Get summary
    console.print(Panel(
        f"[bold green]Download completed![/bold green]\n"
        f"File size: [bold red]{format_size(os.path.getsize(path))}[/bold red]\n"
        f"Duration: [bold]{print_duration_table(path, show=False)}[/bold]",
        title=f"{os.path.basename(path.replace('.mp4', ''))}", border_style="green"))
        f"[bold green]Download completed![/bold green]\n"
        f"File size: [bold red]{format_size(os.path.getsize(path))}[/bold red]\n"
        f"Duration: [bold]{print_duration_table(path, show=False)}[/bold]",
        title=f"{os.path.basename(path.replace('.mp4', ''))}",
        border_style="green"
    ))
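A minimal sketch of the httpx streaming pattern adopted in MP4_downloader (URL and path are placeholders; the real code also sends a Referer, a rotating user agent, and wraps the loop in a tqdm bar):

import httpx

def stream_to_file(url: str, path: str) -> None:
    with httpx.Client(verify=False, timeout=15) as client:
        with client.stream("GET", url) as response:
            response.raise_for_status()
            with open(path, "wb") as fh:
                # iter_bytes yields decoded chunks as they arrive on the wire
                for chunk in response.iter_bytes(chunk_size=1024):
                    if chunk:
                        fh.write(chunk)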
@ -8,7 +8,7 @@ from concurrent.futures import ThreadPoolExecutor


# External libraries
import requests
import httpx
from unidecode import unidecode


@ -55,7 +55,6 @@ DOWNLOAD_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'download_sub')
MERGE_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'merge_subs')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_DOWNLOAD', 'cleanup_tmp_folder')
FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution')
CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report')


# Variable
@ -71,7 +70,7 @@ class Downloader():

    Args:
        - output_filename (str): Output filename for the downloaded content.
        - m3u8_playlist (str, optional): URL to the main M3U8 playlist or text.
        - m3u8_playlist (str, optional): URL to the main M3U8 playlist.
        - m3u8_playlist (str, optional): URL to the main M3U8 index. ( NOT TEXT )
    """

@ -139,9 +138,10 @@ class Downloader():
        # Send a GET request to the provided URL
        logging.info(f"Test url: {url}")
        headers_index['user-agent'] = get_headers()
        response = requests.get(url, headers=headers_index)
        response = httpx.get(url, headers=headers_index)
        response.raise_for_status()

        if response.ok:
        if response.status_code == 200:
            return response.text

        else:
@ -321,9 +321,10 @@ class Downloader():
        """

        # Send a GET request to download the subtitle content
        response = requests.get(uri)
        response = httpx.get(uri)
        response.raise_for_status()

        if response.ok:
        if response.status_code == 200:

            # Write the content to the specified file
            with open(path, "wb") as f:
@ -368,7 +369,7 @@ class Downloader():
        m3u8_sub_parser = M3U8_Parser()
        m3u8_sub_parser.parse_data(
            uri = obj_subtitle.get('uri'),
            raw_content = requests.get(obj_subtitle.get('uri')).text
            raw_content = httpx.get(obj_subtitle.get('uri')).text
        )

        # Initiate the download of the subtitle content
@ -500,16 +501,15 @@ class Downloader():
        if self.m3u8_playlist:
            logging.info("Download from PLAYLIST")

            # Fetch the M3U8 playlist content
            if not len(str(self.m3u8_playlist).split("\n")) > 2: # Is a single link
                m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist)

                # Add full URL of the M3U8 playlist to fix next .ts without https if necessary
                self.m3u8_url_fixer.set_playlist(self.m3u8_playlist) # !!!!!!!!!!!!!!!!!! to fix for playlist with text
            m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist)

            else:
                logging.warning("M3U8 master url not set.") # TO DO
                m3u8_playlist_text = self.m3u8_playlist
            # Add full URL of the M3U8 playlist to fix next .ts without https if necessary
            self.m3u8_url_fixer.set_playlist(self.m3u8_playlist)

            if m3u8_playlist_text is None:
                console.log("[red]Playlist m3u8 to download is empty.")
                sys.exit(0)

            # Save text playlist
            open(os.path.join(self.base_path, "tmp", "playlist.m3u8"), "w+").write(m3u8_playlist_text)
@ -620,16 +620,3 @@ class Downloader():

        # Clean all tmp file
        self.__clean__(converted_out_path)


        # Create download report
        if CREATE_REPORT:

            # Get variable to add
            current_date = datetime.today().date()
            base_filename = os.path.split(self.output_filename)[-1].replace('.mp4', '')
            filename_out_size = format_size(os.path.getsize(self.output_filename))

            # Add new row to table and save
            report_table.add_row_to_database(str(current_date), str(base_filename), str(filename_out_size))
            report_table.save_database()
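The heuristic above treats m3u8_playlist as a URL when the string has at most two lines, and as raw playlist text otherwise; isolated for clarity:

def is_playlist_text(value) -> bool:
    return len(str(value).split("\n")) > 2

print(is_playlist_text("https://example.com/master.m3u8"))      # False -> fetch it first
print(is_playlist_text("#EXTM3U\n#EXTINF:10.0,\nsegment0.ts"))  # True  -> already playlist text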
@ -1,16 +1,20 @@
# 09.06.24

import os
import sys
import time
import logging
from concurrent.futures import ThreadPoolExecutor


# External libraries
import requests
import httpx


# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.headers import get_headers
from Src.Util.os import check_file_existence


class ProxyManager:
@ -24,7 +28,6 @@ class ProxyManager:
        """
        self.proxy_list = proxy_list or []
        self.verified_proxies = []
        self.failed_proxies = {}
        self.timeout = config_manager.get_float('REQUESTS', 'timeout')
        self.url = url

@ -39,17 +42,19 @@ class ProxyManager:
            - Proxy string if working, None otherwise
        """
        protocol = proxy.split(":")[0].lower()
        protocol = f'{protocol}://'
        proxy = {protocol: proxy, "https://": proxy}

        try:
            response = requests.get(self.url, proxies={protocol: proxy}, timeout=self.timeout)
            with httpx.Client(proxies=proxy, verify=False) as client:
                response = client.get(self.url, timeout=self.timeout, headers={'user-agent': get_headers()})

            if response.status_code == 200:
                logging.info(f"Proxy {proxy} is working.")
                return proxy
                if response.status_code == 200:
                    logging.info(f"Proxy {proxy} is working.")
                    return proxy

        except requests.RequestException as e:
            logging.error(f"Proxy {proxy} failed: {e}")
            self.failed_proxies[proxy] = time.time()
        except Exception as e:
            logging.error(f"Test proxy {proxy} failed: {e}")
        return None

    def verify_proxies(self):
@ -57,8 +62,9 @@ class ProxyManager:
        Verify all proxies in the list and store the working ones.
        """
        logging.info("Starting proxy verification...")
        with ThreadPoolExecutor(max_workers=10) as executor:
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            self.verified_proxies = list(executor.map(self._check_proxy, self.proxy_list))

        self.verified_proxies = [proxy for proxy in self.verified_proxies if proxy]
        logging.info(f"Verification complete. {len(self.verified_proxies)} proxies are working.")

@ -66,23 +72,40 @@ class ProxyManager:
        """
        Get validated proxies.
        """
        validate_proxy = []

        for proxy in self.verified_proxies:
            protocol = proxy.split(":")[0].lower()
            validate_proxy.append({protocol: proxy})

        return validate_proxy

        if len(self.verified_proxies) > 0:
            return self.verified_proxies

        else:
            logging.error("Can't find a valid proxy.")
            sys.exit(0)


def main_test_proxy(url_test):

    path_file_proxt_list = "list_proxy.txt"

    if check_file_existence(path_file_proxt_list):

        # Read file
        with open(path_file_proxt_list, 'r') as file:
            ip_addresses = file.readlines()

        # Format IPs
        ip_addresses = [ip.strip() for ip in ip_addresses]
        formatted_ips = [f"http://{ip}" for ip in ip_addresses]

    # Get list of proxy from config.json
    proxy_list = config_manager.get_list('REQUESTS', 'proxy')
    proxy_list = formatted_ips

    # Verify proxy
    manager = ProxyManager(proxy_list, url_test)
    manager.verify_proxies()

    # Write valid IPs back to the txt file
    with open(path_file_proxt_list, 'w') as file:
        for ip in ip_addresses:
            file.write(f"{ip}\n")

    # Return valid proxy
    return manager.get_verified_proxies()
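A sketch of the proxy mapping handed to httpx in _check_proxy: one entry per scheme, so both plain and TLS traffic are routed through the same HTTP proxy (the address is a placeholder). The proxies= keyword matches the httpx API current at the time of this commit; later httpx releases renamed it.

import httpx

proxy_url = "http://127.0.0.1:8080"
proxies = {"http://": proxy_url, "https://": proxy_url}
with httpx.Client(proxies=proxies, verify=False) as client:
    # echo service confirms traffic exits through the proxy
    print(client.get("https://api.ipify.org/?format=json", timeout=10).json())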
@ -8,21 +8,21 @@ import threading
import logging
import binascii
from queue import PriorityQueue
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor


# External libraries
import requests
from requests.exceptions import HTTPError, ConnectionError, Timeout, RequestException
import httpx
from tqdm import tqdm


# Internal utilities
from Src.Util.console import console
from Src.Util.headers import get_headers
from Src.Util.headers import get_headers, random_headers
from Src.Util.color import Colors
from Src.Util._jsonConfig import config_manager
from Src.Util.os import check_file_existence


# Logic class
@ -45,14 +45,17 @@ TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
TQDM_DELAY_WORKER = config_manager.get_float('M3U8_DOWNLOAD', 'tqdm_delay')
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
THERE_IS_PROXY_LIST = len(config_manager.get_list('REQUESTS', 'proxy')) > 0
REQUEST_MAX_RETRY = config_manager.get_int('REQUESTS', 'max_retry')
THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
PROXY_START_MIN = config_manager.get_float('REQUESTS', 'proxy_start_min')
PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')


# Variable
headers_index = config_manager.get_dict('REQUESTS', 'index')
headers_segments = config_manager.get_dict('REQUESTS', 'segments')
session = requests.Session()
session.verify = config_manager.get_bool('REQUESTS', 'verify_ssl')
transport = httpx.HTTPTransport(retries=REQUEST_MAX_RETRY)



class M3U8_Segments:
@ -90,13 +93,15 @@ class M3U8_Segments:
        """
        headers_index['user-agent'] = get_headers()


        # Construct the full URL of the key
        key_uri = urljoin(self.url, m3u8_parser.keys.get('uri'))
        key_uri = urljoin(self.url, m3u8_parser.keys.get('uri'))
        parsed_url = urlparse(key_uri)
        self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
        logging.info(f"Uri key: {key_uri}")

        # Make request to get the key
        try:
            response = requests.get(key_uri, headers=headers_index)
            response = httpx.get(key_uri, headers=headers_index)
            response.raise_for_status()

        except Exception as e:
@ -169,11 +174,11 @@ class M3U8_Segments:
        headers_index['user-agent'] = get_headers()

        # Send a GET request to retrieve the index M3U8 file
        response = requests.get(self.url, headers=headers_index)
        response = httpx.get(self.url, headers=headers_index)
        response.raise_for_status()

        # Save the M3U8 file to the temporary folder
        if response.ok:
        if response.status_code == 200:
            path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8")
            open(path_m3u8_file, "w+").write(response.text)

@ -189,19 +194,30 @@ class M3U8_Segments:
            - index (int): The index of the segment.
            - progress_bar (tqdm): Progress counter for tracking download progress.
        """

        try:

            # Generate headers
            start_time = time.time()
            headers_segments['user-agent'] = get_headers()

            # Make request to get content
            if THERE_IS_PROXY_LIST:
                proxy = self.valid_proxy[index % len(self.valid_proxy)]
                logging.info(f"Use proxy: {proxy}")
                response = session.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, proxies=proxy)
                #print(client.get("https://api.ipify.org/?format=json").json())

                with httpx.Client(proxies=proxy, verify=False, transport=transport) as client:
                    if 'key_base_url' in self.__dict__:
                        response = client.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT)
                    else:
                        response = client.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT)
            else:
                response = session.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT)

                with httpx.Client(verify=False, transport=transport) as client_2:
                    if 'key_base_url' in self.__dict__:
                        response = client_2.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT)
                    else:
                        response = client_2.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT)

            # Get response content
            response.raise_for_status()
@ -211,7 +227,7 @@ class M3U8_Segments:
            duration = time.time() - start_time
            response_size = int(response.headers.get('Content-Length', 0))
            self.class_ts_estimator.update_progress_bar(response_size, duration, progress_bar)


            # Decrypt the segment content if decryption is needed
            if self.decryption is not None:
                segment_content = self.decryption.decrypt(segment_content)
@ -220,13 +236,8 @@ class M3U8_Segments:
            self.queue.put((index, segment_content))
            progress_bar.update(1)

        except (HTTPError, ConnectionError, Timeout, RequestException) as e:
            progress_bar.update(1)
            logging.error(f"Request-related exception while downloading segment: {e}")

        except Exception as e:
            progress_bar.update(1)
            logging.error(f"An unexpected exception occurred while downloading segment: {e}")
            console.print(f"Failed to download '{ts_url}', status error: {e}.")

    def write_segments_to_file(self):
        """
@ -280,10 +291,29 @@ class M3U8_Segments:
        writer_thread = threading.Thread(target=self.write_segments_to_file)
        writer_thread.start()

        # If a proxy list is available, set max_workers to the number of proxies,
        # otherwise fall back to TQDM_MAX_WORKER
        max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER

        # If a proxy list is available, use a variable start delay,
        # otherwise fall back to TQDM_DELAY_WORKER
        if THERE_IS_PROXY_LIST:
            num_proxies = len(self.valid_proxy)
            self.working_proxy_list = self.valid_proxy

            if num_proxies > 0:
                # Calculate the delay based on the number of proxies;
                # the delay should stay between 0.5 and 1
                delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (num_proxies + 1)))
            else:
                delay = TQDM_DELAY_WORKER
        else:
            delay = TQDM_DELAY_WORKER

        # Start all workers
        with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for index, segment_url in enumerate(self.segments):
                time.sleep(TQDM_DELAY_WORKER)
                time.sleep(delay)
                executor.submit(self.make_requests_stream, segment_url, index, progress_bar)

        # Wait for all tasks to complete
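A worked example of the start-delay formula above, assuming PROXY_START_MIN = 0.5 and PROXY_START_MAX = 1.0 (both actually come from config.json). Since 1 / (n + 1) is at most 0.5 for n >= 1, the lower clamp dominates with these bounds:

PROXY_START_MIN, PROXY_START_MAX = 0.5, 1.0
for num_proxies in (1, 3, 10):
    delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (num_proxies + 1)))
    print(num_proxies, delay)   # 1 -> 0.5, 3 -> 0.5, 10 -> 0.5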
@ -1,9 +1,6 @@
# 09.06.24
# 02.04.24

from .helper import (
    M3U8_Decryption,
    M3U8_Ts_Estimator,
    M3U8_Parser,
    M3U8_Codec,
    M3U8_UrlFix
)
from .decryptor import M3U8_Decryption
from .estimator import M3U8_Ts_Estimator
from .parser import M3U8_Parser, M3U8_Codec
from .url_fixer import M3U8_UrlFix
@ -1,6 +0,0 @@
# 02.04.24

from .decryptor import M3U8_Decryption
from .estimator import M3U8_Ts_Estimator
from .parser import M3U8_Parser, M3U8_Codec
from .url_fixer import M3U8_UrlFix
@ -1,14 +1,15 @@
# 20.04.25

import sys
import logging


# Internal utilities
from ..parser import load
from m3u8 import loads


# External libraries
import requests
import httpx


# Constant
@ -372,7 +373,7 @@ class M3U8_Subtitle:

        # Send a request to retrieve the subtitle content
        logging.info(f"Download subtitle: {obj_subtitle.get('name')}")
        response_subitle = requests.get(obj_subtitle.get('uri'))
        response_subitle = httpx.get(obj_subtitle.get('uri'))

        try:
            # Try to extract the VTT URL from the subtitle content
@ -418,10 +419,9 @@ class M3U8_Parser:


        # Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles
        m3u8_obj = load(raw_content, uri)
        m3u8_obj = loads(raw_content, uri)

        self.__parse_video_info__(m3u8_obj)
        self.__parse_encryption_keys__(m3u8_obj)
        self.__parse_subtitles_and_audio__(m3u8_obj)
        self.__parse_segments__(m3u8_obj)

@ -516,6 +516,7 @@ class M3U8_Parser:

        except Exception as e:
            logging.error(f"Error parsing encryption keys: {e}")
            sys.exit(0)
            pass

    def __parse_subtitles_and_audio__(self, m3u8_obj) -> None:
@ -557,7 +558,11 @@ class M3U8_Parser:
        """

        try:

            for segment in m3u8_obj.segments:

                # Parse key
                self.__parse_encryption_keys__(segment)

                # Collect all index duration
                self.duration += segment.duration
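The in-repo loader is replaced here by loads from the m3u8 PyPI package; a quick sketch of the call (toy playlist; uri is only used to resolve relative segment paths):

from m3u8 import loads

playlist = loads("#EXTM3U\n#EXTINF:10.0,\nsegment0.ts\n#EXT-X-ENDLIST\n",
                 uri="https://example.com/playlist.m3u8")
print(playlist.segments[0].uri)   # segment0.ts
print(playlist.is_endlist)        # True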
@ -1,38 +0,0 @@
# 15.04.24

import os


# Internal utilities
from .model import M3U8


def load(raw_content, uri):
    """
    Parses the content of an M3U8 playlist and returns an M3U8 object.

    Args:
        raw_content (str): The content of the M3U8 playlist as a string.
        uri (str): The URI of the M3U8 playlist file or stream.

    Returns:
        M3U8: An object representing the parsed M3U8 playlist.

    Raises:
        IOError: If the raw_content is empty or if the URI cannot be accessed.
        ValueError: If the raw_content is not a valid M3U8 playlist format.

    Example:
        >>> m3u8_content = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:10\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:10.0,\nhttp://example.com/segment0.ts\n#EXTINF:10.0,\nhttp://example.com/segment1.ts\n"
        >>> uri = "http://example.com/playlist.m3u8"
        >>> playlist = load(m3u8_content, uri)
    """

    if not raw_content:
        raise IOError("Empty content provided.")

    if not uri:
        raise IOError("Empty URI provided.")

    base_uri = os.path.dirname(uri)
    return M3U8(raw_content, base_uri=base_uri)
@ -1,28 +0,0 @@
# 19.04.24

import itertools


def remove_quotes_parser(*attrs):
    """
    Returns a dictionary mapping attribute names to a function that removes quotes from their values.
    """
    return dict(zip(attrs, itertools.repeat(remove_quotes)))


def remove_quotes(string):
    """
    Removes quotes from a string.
    """
    quotes = ('"', "'")
    if string and string[0] in quotes and string[-1] in quotes:
        return string[1:-1]
    return string


def normalize_attribute(attribute):
    """
    Normalizes an attribute name by converting hyphens to underscores and converting to lowercase.
    """
    return attribute.replace('-', '_').lower().strip()
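Usage sketch of the removed helpers, for reference:

print(remove_quotes('"AES-128"'))             # AES-128
print(normalize_attribute('PROGRAM-ID'))      # program_id
print(remove_quotes_parser('codecs', 'uri'))  # both keys map to the remove_quotes function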
@ -1,358 +0,0 @@
# 15.04.24

import os
from collections import namedtuple


# Internal utilities
from ..parser import parser


# Variable
StreamInfo = namedtuple('StreamInfo', ['bandwidth', 'program_id', 'resolution', 'codecs'])
Media = namedtuple('Media', ['uri', 'type', 'group_id', 'language', 'name', 'default', 'autoselect', 'forced', 'characteristics'])


class M3U8:
    """
    Represents a single M3U8 playlist. Should be instantiated with the content as string.

    Args:
        - content: the m3u8 content as string
        - base_path: all urls (key and segments url) will be updated with this base_path,
          ex: base_path = "http://videoserver.com/hls"
        - base_uri: uri the playlist comes from. it is propagated to SegmentList and Key
          ex: http://example.com/path/to

    Attribute:
        - key: it's a `Key` object, the EXT-X-KEY from m3u8. Or None
        - segments: a `SegmentList` object, represents the list of `Segment`s from this playlist
        - is_variant: Returns true if this M3U8 is a variant playlist, with links to other M3U8s with different bitrates.
          If true, `playlists` is a list of the playlists available, and `iframe_playlists` is a list of the i-frame playlists available.
        - is_endlist: Returns true if EXT-X-ENDLIST tag present in M3U8.
          Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8
        - playlists: If this is a variant playlist (`is_variant` is True), returns a list of Playlist objects
        - iframe_playlists: If this is a variant playlist (`is_variant` is True), returns a list of IFramePlaylist objects
        - playlist_type: A lower-case string representing the type of the playlist, which can be one of VOD (video on demand) or EVENT.
        - media: If this is a variant playlist (`is_variant` is True), returns a list of Media objects
        - target_duration: Returns the EXT-X-TARGETDURATION as an integer
          Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2
        - media_sequence: Returns the EXT-X-MEDIA-SEQUENCE as an integer
          Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3
        - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a string
          Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
        - version: Return the EXT-X-VERSION as is
        - allow_cache: Return the EXT-X-ALLOW-CACHE as is
        - files: Returns an iterable with all files from playlist, in order. This includes segments and key uri, if present.
        - base_uri: It is a property (getter and setter) used by SegmentList and Key to have absolute URIs.
        - is_i_frames_only: Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8.
          Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12
    """

    # Mapping of simple attributes (obj attribute, parser attribute)
    SIMPLE_ATTRIBUTES = (
        ('is_variant', 'is_variant'),
        ('is_endlist', 'is_endlist'),
        ('is_i_frames_only', 'is_i_frames_only'),
        ('target_duration', 'targetduration'),
        ('media_sequence', 'media_sequence'),
        ('program_date_time', 'program_date_time'),
        ('version', 'version'),
        ('allow_cache', 'allow_cache'),
        ('playlist_type', 'playlist_type')
    )

    def __init__(self, content=None, base_path=None, base_uri=None):
        """
        Initialize the M3U8 object.

        Parameters:
            - content: M3U8 content (string).
            - base_path: Base path for relative URIs (string).
            - base_uri: Base URI for absolute URIs (string).
        """
        if content is not None:
            self.data = parser.parse(content)
        else:
            self.data = {}
        self._base_uri = base_uri
        self.base_path = base_path
        self._initialize_attributes()

    def _initialize_attributes(self):
        """
        Initialize attributes based on parsed data.
        """
        # Initialize key and segments
        self.key = Key(base_uri=self.base_uri, **self.data.get('key', {})) if 'key' in self.data else None
        self.segments = SegmentList([Segment(base_uri=self.base_uri, **params) for params in self.data.get('segments', [])])

        # Initialize simple attributes
        for attr, param in self.SIMPLE_ATTRIBUTES:
            setattr(self, attr, self.data.get(param))

        # Initialize files, media, playlists, and iframe_playlists
        self.files = []
        if self.key:
            self.files.append(self.key.uri)
        self.files.extend(self.segments.uri)

        self.media = [Media(
            uri = media.get('uri'),
            type = media.get('type'),
            group_id = media.get('group_id'),
            language = media.get('language'),
            name = media.get('name'),
            default = media.get('default'),
            autoselect = media.get('autoselect'),
            forced = media.get('forced'),
            characteristics = media.get('characteristics'))
            for media in self.data.get('media', [])
        ]
        self.playlists = PlaylistList([Playlist(
            base_uri = self.base_uri,
            media = self.media,
            **playlist
        ) for playlist in self.data.get('playlists', [])
        ])
        self.iframe_playlists = PlaylistList()
        for ifr_pl in self.data.get('iframe_playlists', []):
            self.iframe_playlists.append(
                IFramePlaylist(
                    base_uri = self.base_uri,
                    uri = ifr_pl['uri'],
                    iframe_stream_info=ifr_pl['iframe_stream_info'])
            )

    @property
    def base_uri(self):
        """
        Get the base URI.
        """
        return self._base_uri

    @base_uri.setter
    def base_uri(self, new_base_uri):
        """
        Set the base URI.
        """
        self._base_uri = new_base_uri
        self.segments.base_uri = new_base_uri


class BasePathMixin:
    """
    Mixin class for managing base paths.
    """
    @property
    def base_path(self):
        """
        Get the base path.
        """
        return os.path.dirname(self.uri)

    @base_path.setter
    def base_path(self, newbase_path):
        """
        Set the base path.
        """
        if not self.base_path:
            self.uri = "%s/%s" % (newbase_path, self.uri)
        self.uri = self.uri.replace(self.base_path, newbase_path)


class GroupedBasePathMixin:
    """
    Mixin class for managing base paths across a group of items.
    """

    def _set_base_uri(self, new_base_uri):
        """
        Set the base URI for each item in the group.
        """
        for item in self:
            item.base_uri = new_base_uri

    base_uri = property(None, _set_base_uri)

    def _set_base_path(self, new_base_path):
        """
        Set the base path for each item in the group.
        """
        for item in self:
            item.base_path = new_base_path

    base_path = property(None, _set_base_path)


class Segment(BasePathMixin):
    """
    Class representing a segment in an M3U8 playlist.
    Inherits from BasePathMixin for managing base paths.
    """

    def __init__(self, uri, base_uri, program_date_time=None, duration=None,
                 title=None, byterange=None, discontinuity=False, key=None):
        """
        Initialize a Segment object.

        Args:
            - uri: URI of the segment.
            - base_uri: Base URI for the segment.
            - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a datetime
              Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
            - duration: Duration of the segment (optional).
            - title: Title attribute from EXTINF parameter
            - byterange: Byterange information of the segment (optional).
            - discontinuity: Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists
              Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11
            - key: Key for encryption (optional).
        """
        self.uri = uri
        self.duration = duration
        self.title = title
        self.base_uri = base_uri
        self.byterange = byterange
        self.program_date_time = program_date_time
        self.discontinuity = discontinuity
        #self.key = key


class SegmentList(list, GroupedBasePathMixin):
    """
    Class representing a list of segments in an M3U8 playlist.
    Inherits from list and GroupedBasePathMixin for managing base paths across a group of items.
    """

    @property
    def uri(self):
        """
        Get the URI of each segment in the SegmentList.

        Returns:
            - List of URIs of segments in the SegmentList.
        """
        return [seg.uri for seg in self]


class Key(BasePathMixin):
    """
    Class representing a key used for encryption in an M3U8 playlist.
    Inherits from BasePathMixin for managing base paths.
    """

    def __init__(self, method, uri, base_uri, iv=None):
        """
        Initialize a Key object.

        Args:
            - method: Encryption method.
              ex: "AES-128"
            - uri: URI of the key.
              ex: "https://priv.example.com/key.php?r=52"
            - base_uri: Base URI for the key.
              ex: http://example.com/path/to
            - iv: Initialization vector (optional).
              ex: 0X12A
        """
        self.method = method
        self.uri = uri
        self.iv = iv
        self.base_uri = base_uri


class Playlist(BasePathMixin):
    """
    Playlist object representing a link to a variant M3U8 with a specific bitrate.

    More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10
    """

    def __init__(self, uri, stream_info, media, base_uri):
        """
        Initialize a Playlist object.

        Args:
            - uri: URI of the playlist.
            - stream_info: is a named tuple containing the attributes: `program_id`,
            - media: List of Media objects associated with the playlist.
            - base_uri: Base URI for the playlist.
        """
        self.uri = uri
        self.base_uri = base_uri

        # Extract resolution information from stream_info
        resolution = stream_info.get('resolution')
        if resolution is not None:
            values = resolution.split('x')
            resolution_pair = (int(values[0]), int(values[1]))
        else:
            resolution_pair = None

        # Create StreamInfo object
        self.stream_info = StreamInfo(
            bandwidth = stream_info['bandwidth'],
            program_id = stream_info.get('program_id'),
            resolution = resolution_pair,
            codecs = stream_info.get('codecs')
        )

        # Filter media based on group ID and media type
        self.media = []
        for media_type in ('audio', 'video', 'subtitles'):
            group_id = stream_info.get(media_type)
            if group_id:
                self.media += filter(lambda m: m.group_id == group_id, media)


class IFramePlaylist(BasePathMixin):
    """
    Class representing an I-Frame playlist in an M3U8 playlist.
    Inherits from BasePathMixin for managing base paths.
    """

    def __init__(self, base_uri, uri, iframe_stream_info):
        """
        Initialize an IFramePlaylist object.

        Args:
            - base_uri: Base URI for the I-Frame playlist.
            - uri: URI of the I-Frame playlist.
            - iframe_stream_info, is a named tuple containing the attributes:
              `program_id`, `bandwidth`, `codecs` and `resolution` which is a tuple (w, h) of integers
        """
        self.uri = uri
        self.base_uri = base_uri

        # Extract resolution information from iframe_stream_info
        resolution = iframe_stream_info.get('resolution')
        if resolution is not None:
            values = resolution.split('x')
            resolution_pair = (int(values[0]), int(values[1]))
        else:
            resolution_pair = None

        # Create StreamInfo object for I-Frame playlist
        self.iframe_stream_info = StreamInfo(
            bandwidth = iframe_stream_info.get('bandwidth'),
            program_id = iframe_stream_info.get('program_id'),
            resolution = resolution_pair,
            codecs = iframe_stream_info.get('codecs')
        )


class PlaylistList(list, GroupedBasePathMixin):
    """
    Class representing a list of playlists in an M3U8 playlist.
    Inherits from list and GroupedBasePathMixin for managing base paths across a group of items.
    """

    def __str__(self):
        """
        Return a string representation of the PlaylistList.

        Returns:
            - String representation of the PlaylistList.
        """
        output = [str(playlist) for playlist in self]
        return '\n'.join(output)
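For reference, a hedged usage sketch of the removed M3U8 model (toy playlist; assumes skip_empty_row_playlist is disabled in config so no rows are skipped):

content = (
    "#EXTM3U\n"
    "#EXT-X-TARGETDURATION:10\n"
    "#EXTINF:10.0,\n"
    "segment0.ts\n"
    "#EXT-X-ENDLIST\n"
)
obj = M3U8(content, base_uri="http://example.com/hls")
print(obj.target_duration)   # 10.0
print(obj.segments.uri)      # ['segment0.ts']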
@ -1,338 +0,0 @@
|
||||
# 15.04.24
|
||||
|
||||
import re
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from ..parser import protocol
|
||||
from ._util import (
|
||||
remove_quotes,
|
||||
remove_quotes_parser,
|
||||
normalize_attribute
|
||||
)
|
||||
|
||||
|
||||
# External utilities
|
||||
from Src.Util._jsonConfig import config_manager
|
||||
|
||||
|
||||
# Variable
|
||||
REMOVE_EMPTY_ROW = config_manager.get_bool('M3U8_PARSER', 'skip_empty_row_playlist')
|
||||
ATTRIBUTELISTPATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''')
|
||||
|
||||
|
||||
def parse(content):
|
||||
"""
|
||||
Given an M3U8 playlist content, parses the content and extracts metadata.
|
||||
|
||||
Args:
|
||||
content (str): The M3U8 playlist content.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the parsed metadata.
|
||||
"""
|
||||
|
||||
# Initialize data dictionary with default values
|
||||
data = {
|
||||
'is_variant': False,
|
||||
'is_endlist': False,
|
||||
'is_i_frames_only': False,
|
||||
'playlist_type': None,
|
||||
'playlists': [],
|
||||
'iframe_playlists': [],
|
||||
'segments': [],
|
||||
'media': [],
|
||||
}
|
||||
|
||||
# Initialize state dictionary for tracking parsing state
|
||||
state = {
|
||||
'expect_segment': False,
|
||||
'expect_playlist': False,
|
||||
}
|
||||
|
||||
# Iterate over lines in the content
|
||||
content = content.split("\n")
|
||||
content_length = len(content)
|
||||
i = 0
|
||||
|
||||
while i < content_length:
|
||||
line = content[i]
|
||||
line_stripped = line.strip()
|
||||
is_end = i + 1 == content_length - 2
|
||||
|
||||
if REMOVE_EMPTY_ROW:
|
||||
if i < content_length - 2:
|
||||
actual_row = extract_params(line_stripped)
|
||||
next_row = extract_params(content[i + 2].strip())
|
||||
|
||||
if actual_row is not None and next_row is None and not is_end:
|
||||
logging.info(f"Skip row: {line_stripped}")
|
||||
i += 1
|
||||
continue
|
||||
|
||||
i += 1
|
||||
|
||||
if line.startswith(protocol.ext_x_byterange):
|
||||
_parse_byterange(line, state)
|
||||
state['expect_segment'] = True
|
||||
|
||||
elif state['expect_segment']:
|
||||
_parse_ts_chunk(line, data, state)
|
||||
state['expect_segment'] = False
|
||||
|
||||
elif state['expect_playlist']:
|
||||
_parse_variant_playlist(line, data, state)
|
||||
state['expect_playlist'] = False
|
||||
|
||||
elif line.startswith(protocol.ext_x_targetduration):
|
||||
_parse_simple_parameter(line, data, float)
|
||||
elif line.startswith(protocol.ext_x_media_sequence):
|
||||
_parse_simple_parameter(line, data, int)
|
||||
elif line.startswith(protocol.ext_x_discontinuity):
|
||||
state['discontinuity'] = True
|
||||
elif line.startswith(protocol.ext_x_version):
|
||||
_parse_simple_parameter(line, data)
|
||||
elif line.startswith(protocol.ext_x_allow_cache):
|
||||
_parse_simple_parameter(line, data)
|
||||
|
||||
elif line.startswith(protocol.ext_x_key):
|
||||
state['current_key'] = _parse_key(line)
|
||||
data['key'] = data.get('key', state['current_key'])
|
||||
|
||||
elif line.startswith(protocol.extinf):
|
||||
_parse_extinf(line, data, state)
|
||||
state['expect_segment'] = True
|
||||
|
||||
elif line.startswith(protocol.ext_x_stream_inf):
|
||||
state['expect_playlist'] = True
|
||||
_parse_stream_inf(line, data, state)
|
||||
|
||||
elif line.startswith(protocol.ext_x_i_frame_stream_inf):
|
||||
_parse_i_frame_stream_inf(line, data)
|
||||
|
||||
elif line.startswith(protocol.ext_x_media):
|
||||
_parse_media(line, data, state)
|
||||
|
||||
elif line.startswith(protocol.ext_x_playlist_type):
|
||||
_parse_simple_parameter(line, data)
|
||||
|
||||
elif line.startswith(protocol.ext_i_frames_only):
|
||||
data['is_i_frames_only'] = True
|
||||
|
||||
elif line.startswith(protocol.ext_x_endlist):
|
||||
data['is_endlist'] = True
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def extract_params(line):
|
||||
"""
|
||||
Extracts parameters from a formatted input string.
|
||||
|
||||
Args:
|
||||
- line (str): The string containing the parameters to extract.
|
||||
|
||||
Returns:
|
||||
dict or None: A dictionary containing the extracted parameters with their respective values.
|
||||
"""
|
||||
params = {}
|
||||
matches = re.findall(r'([A-Z\-]+)=("[^"]*"|[^",\s]*)', line)
|
||||
if not matches:
|
||||
return None
|
||||
for match in matches:
|
||||
param, value = match
|
||||
params[param] = value.strip('"')
|
||||
return params
|
||||
|
||||
def _parse_key(line):
|
||||
"""
|
||||
Parses the #EXT-X-KEY line and extracts key attributes.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-KEY line from the playlist.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the key attributes.
|
||||
"""
|
||||
params = ATTRIBUTELISTPATTERN.split(line.replace(protocol.ext_x_key + ':', ''))[1::2]
|
||||
key = {}
|
||||
for param in params:
|
||||
name, value = param.split('=', 1)
|
||||
key[normalize_attribute(name)] = remove_quotes(value)
|
||||
return key
|
||||
|
||||
def _parse_extinf(line, data, state):
|
||||
"""
|
||||
Parses the #EXTINF line and extracts segment duration and title.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXTINF line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
duration, title = line.replace(protocol.extinf + ':', '').split(',')
|
||||
state['segment'] = {'duration': float(duration), 'title': remove_quotes(title)}
|
||||
|
||||
def _parse_ts_chunk(line, data, state):
|
||||
"""
|
||||
Parses a segment URI line and adds it to the segment list.
|
||||
|
||||
Args:
|
||||
line (str): The segment URI line from the playlist.
|
||||
data (dict): The dictionary to store the parsed data.
|
||||
state (dict): The parsing state.
|
||||
"""
|
||||
segment = state.pop('segment')
|
||||
if state.get('current_program_date_time'):
|
||||
segment['program_date_time'] = state['current_program_date_time']
|
||||
state['current_program_date_time'] += datetime.timedelta(seconds=segment['duration'])
|
||||
segment['uri'] = line
|
||||
segment['discontinuity'] = state.pop('discontinuity', False)
|
||||
if state.get('current_key'):
|
||||
segment['key'] = state['current_key']
|
||||
data['segments'].append(segment)
|
||||
|
||||
def _parse_attribute_list(prefix, line, atribute_parser):
|
||||
"""
|
||||
Parses a line containing a list of attributes and their values.
|
||||
|
||||
Args:
|
||||
- prefix (str): The prefix to identify the line.
|
||||
- line (str): The line containing the attributes.
|
||||
- atribute_parser (dict): A dictionary mapping attribute names to parsing functions.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the parsed attributes.
|
||||
"""
|
||||
params = ATTRIBUTELISTPATTERN.split(line.replace(prefix + ':', ''))[1::2]
|
||||
|
||||
attributes = {}
|
||||
for param in params:
|
||||
name, value = param.split('=', 1)
|
||||
name = normalize_attribute(name)
|
||||
|
||||
if name in atribute_parser:
|
||||
value = atribute_parser[name](value)
|
||||
|
||||
attributes[name] = value
|
||||
|
||||
return attributes
|
||||
|
||||
def _parse_stream_inf(line, data, state):
|
||||
"""
|
||||
Parses the #EXT-X-STREAM-INF line and extracts stream information.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-STREAM-INF line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
data['is_variant'] = True
|
||||
atribute_parser = remove_quotes_parser('codecs', 'audio', 'video', 'subtitles')
|
||||
atribute_parser["program_id"] = int
|
||||
atribute_parser["bandwidth"] = int
|
||||
state['stream_info'] = _parse_attribute_list(protocol.ext_x_stream_inf, line, atribute_parser)
|
||||
|
||||
def _parse_i_frame_stream_inf(line, data):
|
||||
"""
|
||||
Parses the #EXT-X-I-FRAME-STREAM-INF line and extracts I-frame stream information.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-I-FRAME-STREAM-INF line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
"""
|
||||
atribute_parser = remove_quotes_parser('codecs', 'uri')
|
||||
atribute_parser["program_id"] = int
|
||||
atribute_parser["bandwidth"] = int
|
||||
    iframe_stream_info = _parse_attribute_list(protocol.ext_x_i_frame_stream_inf, line, atribute_parser)
    iframe_playlist = {'uri': iframe_stream_info.pop('uri'),
                       'iframe_stream_info': iframe_stream_info}

    data['iframe_playlists'].append(iframe_playlist)


def _parse_media(line, data, state):
    """
    Parses the #EXT-X-MEDIA line and extracts media attributes.

    Args:
        - line (str): The #EXT-X-MEDIA line from the playlist.
        - data (dict): The dictionary to store the parsed data.
        - state (dict): The parsing state.
    """
    quoted = remove_quotes_parser('uri', 'group_id', 'language', 'name', 'characteristics')
    media = _parse_attribute_list(protocol.ext_x_media, line, quoted)
    data['media'].append(media)


def _parse_variant_playlist(line, data, state):
    """
    Parses a variant playlist line and extracts playlist information.

    Args:
        - line (str): The variant playlist line from the playlist.
        - data (dict): The dictionary to store the parsed data.
        - state (dict): The parsing state.
    """
    playlist = {'uri': line, 'stream_info': state.pop('stream_info')}

    data['playlists'].append(playlist)


def _parse_byterange(line, state):
    """
    Parses the #EXT-X-BYTERANGE line and extracts byte range information.

    Args:
        - line (str): The #EXT-X-BYTERANGE line from the playlist.
        - state (dict): The parsing state.
    """
    state['segment']['byterange'] = line.replace(protocol.ext_x_byterange + ':', '')


def _parse_simple_parameter_raw_value(line, cast_to=str, normalize=False):
    """
    Parses a line containing a simple parameter and its value.

    Args:
        - line (str): The line containing the parameter and its value.
        - cast_to (type): The type to which the value should be cast.
        - normalize (bool): Whether to normalize the parameter name.

    Returns:
        tuple: A tuple containing the parameter name and its value.
    """
    param, value = line.split(':', 1)
    param = normalize_attribute(param.replace('#EXT-X-', ''))

    if normalize:
        value = normalize_attribute(value)

    return param, cast_to(value)


def _parse_and_set_simple_parameter_raw_value(line, data, cast_to=str, normalize=False):
    """
    Parses a line containing a simple parameter and its value, and sets it in the data dictionary.

    Args:
        - line (str): The line containing the parameter and its value.
        - data (dict): The dictionary to store the parsed data.
        - cast_to (type): The type to which the value should be cast.
        - normalize (bool): Whether to normalize the parameter name.

    Returns:
        The parsed value.
    """
    param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize)
    data[param] = value
    return data[param]


def _parse_simple_parameter(line, data, cast_to=str):
    """
    Parses a line containing a simple parameter and its value, and sets it in the data dictionary.

    Args:
        - line (str): The line containing the parameter and its value.
        - data (dict): The dictionary to store the parsed data.
        - cast_to (type): The type to which the value should be cast.

    Returns:
        The parsed value.
    """
    return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True)
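
For reference, a minimal sketch of how the simple-parameter helpers above behave. The input line is a hypothetical example, and the lowercased key assumes normalize_attribute() rewrites 'VERSION' to 'version':

    # Hypothetical playlist line; the '#EXT-X-' prefix is stripped and the value cast.
    data = {}
    value = _parse_and_set_simple_parameter_raw_value('#EXT-X-VERSION:3', data, cast_to=int)
    # data  -> {'version': 3}   (assuming normalize_attribute lowercases the name)
    # value -> 3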
@ -1,17 +0,0 @@
# 15.04.24


ext_x_targetduration = '#EXT-X-TARGETDURATION'
ext_x_media_sequence = '#EXT-X-MEDIA-SEQUENCE'
ext_x_program_date_time = '#EXT-X-PROGRAM-DATE-TIME'
ext_x_media = '#EXT-X-MEDIA'
ext_x_playlist_type = '#EXT-X-PLAYLIST-TYPE'
ext_x_key = '#EXT-X-KEY'
ext_x_stream_inf = '#EXT-X-STREAM-INF'
ext_x_version = '#EXT-X-VERSION'
ext_x_allow_cache = '#EXT-X-ALLOW-CACHE'
ext_x_endlist = '#EXT-X-ENDLIST'
extinf = '#EXTINF'
ext_i_frames_only = '#EXT-X-I-FRAMES-ONLY'
ext_x_byterange = '#EXT-X-BYTERANGE'
ext_x_i_frame_stream_inf = '#EXT-X-I-FRAME-STREAM-INF'
ext_x_discontinuity = '#EXT-X-DISCONTINUITY'
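
These tag constants are matched against the start of each playlist line to decide which parser runs; a minimal dispatch sketch, with a hypothetical input line:

    line = '#EXT-X-BYTERANGE:76242@0'
    if line.startswith(ext_x_byterange):
        # Strip the tag prefix and keep the value, exactly as _parse_byterange does above.
        byterange = line.replace(ext_x_byterange + ':', '')  # '76242@0'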
@ -10,7 +10,7 @@ from Src.Util.console import console


# External library
import requests
import httpx


# Variable
@ -28,9 +28,10 @@ def update():

    # Make the GitHub API requests and handle potential errors
    try:
        response_reposity = requests.get(f"https://api.github.com/repos/{repo_user}/{repo_name}").json()
        response_releases = requests.get(f"https://api.github.com/repos/{repo_user}/{repo_name}/releases").json()
    except requests.RequestException as e:
        response_reposity = httpx.get(f"https://api.github.com/repos/{repo_user}/{repo_name}").json()
        response_releases = httpx.get(f"https://api.github.com/repos/{repo_user}/{repo_name}/releases").json()

    except Exception as e:
        console.print(f"[red]Error accessing GitHub API: {e}")
        return

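One nuance of this migration: httpx does not raise requests.RequestException; its errors derive from httpx.HTTPError. A narrower handler than the catch-all above could look like this sketch (same endpoint, with raise_for_status added as a judgment call, not part of this diff):

    try:
        response = httpx.get(f"https://api.github.com/repos/{repo_user}/{repo_name}")
        response.raise_for_status()
        response_reposity = response.json()
    except httpx.HTTPError as e:
        console.print(f"[red]Error accessing GitHub API: {e}")
        return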
@ -1,5 +1,7 @@
# 4.04.24

import re
import random
import logging


@ -11,6 +13,133 @@ import fake_useragent
useragent = fake_useragent.UserAgent()


def extract_versions(user_agent):
    """
    Extract browser versions from the user agent.

    Args:
        user_agent (str): User agent of the browser.

    Returns:
        list: List of browser versions.
    """

    # Patterns to extract versions from various user agents
    patterns = {
        'chrome': re.compile(r'Chrome/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
        'firefox': re.compile(r'Firefox/(\d+)\.?(\d+)?\.?(\d+)?'),
        'safari': re.compile(r'Version/(\d+)\.(\d+)\.(\d+) Safari/(\d+)\.(\d+)\.(\d+)'),
        'edge': re.compile(r'Edg/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
        'edgios': re.compile(r'EdgiOS/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
        'crios': re.compile(r'CriOS/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
    }

    for key, pattern in patterns.items():
        match = pattern.search(user_agent)
        if match:
            return [match.group(i+1) for i in range(match.lastindex)]

    # Fallback values if specific versions are not found
    return ['99', '0', '0', '0']
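
A quick sanity check of the version extraction, with a hypothetical Chrome user agent:

    ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.129 Safari/537.36'
    print(extract_versions(ua))  # ['120', '0', '6099', '129'] (the 'chrome' pattern matches first)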

def get_platform(user_agent):
    """
    Determine the device platform from the user agent.

    Args:
        user_agent (str): User agent of the browser.

    Returns:
        str: Device platform.
    """
    if 'Windows' in user_agent:
        return '"Windows"'
    elif 'Mac OS X' in user_agent:
        return '"macOS"'
    elif 'Android' in user_agent:
        return '"Android"'
    elif 'iPhone' in user_agent or 'iPad' in user_agent:
        return '"iOS"'
    elif 'Linux' in user_agent:
        return '"Linux"'
    return '"Unknown"'


def get_model(user_agent):
    """
    Determine the device model from the user agent.

    Args:
        user_agent (str): User agent of the browser.

    Returns:
        str: Device model.
    """
    if 'iPhone' in user_agent:
        return '"iPhone"'
    elif 'iPad' in user_agent:
        return '"iPad"'
    elif 'Android' in user_agent:
        return '"Android"'
    elif 'Windows' in user_agent:
        return '"PC"'
    elif 'Mac OS X' in user_agent:
        return '"Mac"'
    elif 'Linux' in user_agent:
        return '"Linux"'
    return '"Unknown"'


def random_headers(referer: str = None):
    """
    Generate random HTTP headers to simulate human-like behavior.

    Args:
        referer (str, optional): Value used for the Origin and Referer headers.

    Returns:
        dict: Generated HTTP headers.
    """
    user_agent = useragent.random
    versions = extract_versions(user_agent)
    platform = get_platform(user_agent)
    model = get_model(user_agent)
    is_mobile = 'Mobi' in user_agent or 'Android' in user_agent

    # Generate sec-ch-ua string based on the browser
    if 'Chrome' in user_agent or 'CriOS' in user_agent:
        sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Chromium";v="{versions[0]}", "Google Chrome";v="{versions[0]}"'
    elif 'Edg' in user_agent or 'EdgiOS' in user_agent:
        sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Chromium";v="{versions[0]}", "Microsoft Edge";v="{versions[0]}"'
    elif 'Firefox' in user_agent:
        sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Firefox";v="{versions[0]}"'
    elif 'Safari' in user_agent:
        sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Safari";v="{versions[0]}"'
    else:
        sec_ch_ua = f'" Not;A Brand";v="{versions[0]}"'

    headers = {
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': random.choice(['en-US', 'en-GB', 'fr-FR', 'es-ES', 'de-DE']),
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Cache-Control': 'max-age=0',
        'TE': 'Trailers',
        'Pragma': 'no-cache',
        'DNT': '1',
        'sec-ch-ua-mobile': '?1' if is_mobile else '?0',
        'sec-ch-ua-platform': platform,
        'sec-ch-ua': sec_ch_ua,
        'sec-ch-ua-model': model
    }

    if referer:
        headers['Origin'] = referer
        headers['Referer'] = referer

    return headers


def get_headers() -> str:
    """
@ -20,4 +149,4 @@ def get_headers() -> str:
    """

    # Get a random user agent string from the user agent rotator
    return useragent.firefox
    return useragent.random

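These headers plug directly into an httpx request; a minimal usage sketch (the URL is a placeholder):

    import httpx

    headers = random_headers(referer='https://example.com/')
    response = httpx.get('https://example.com/page', headers=headers, follow_redirects=True)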
@ -1,6 +1,6 @@
# 29.04.24

import requests
import httpx
import json
from bs4 import BeautifulSoup

@ -22,10 +22,10 @@ preference_registry = ['Verisign', 'KSregistry', 'KNET']
def scrape_new_gtld_applications(url):

    # Send a GET request to the URL
    response = requests.get(url)
    response = httpx.get(url)

    # Check if the response is successful
    if response.ok:
    if response.status_code == 200:

        # Parse the HTML content of the page
        soup = BeautifulSoup(response.content, 'html.parser')
32  config.json
@ -11,9 +11,13 @@
    },
    "REQUESTS": {
        "timeout": 5,
        "max_retry": 3,
        "verify_ssl": false,
        "index": {"user-agent": ""},
        "segments": { "user-agent": ""},
        "index": {
            "user-agent": ""
        },
        "proxy_start_min": 0.1,
        "proxy_start_max": 0.4,
        "proxy": []
    },
    "M3U8_DOWNLOAD": {
@ -23,29 +27,35 @@
        "download_video": true,
        "download_audio": true,
        "merge_audio": true,
        "specific_list_audio": ["ita"],
        "specific_list_audio": [
            "ita"
        ],
        "download_sub": true,
        "merge_subs": true,
        "specific_list_subtitles": ["eng", "spa"],
        "specific_list_subtitles": [
            "eng",
            "spa"
        ],
        "cleanup_tmp_folder": true,
        "create_report": false
    },
    "M3U8_CONVERSION": {
        "use_codec": false,
        "use_vcodec": true,
        "use_acodec": true,
        "use_bitrate": true,
        "use_gpu": true,
        "use_vcodec": true,
        "use_acodec": true,
        "use_bitrate": true,
        "use_gpu": false,
        "default_preset": "ultrafast",
        "check_output_after_ffmpeg": false
    },
    "M3U8_PARSER": {
        "skip_empty_row_playlist": false,
        "force_resolution": -1
    },
    "SITE": {
        "streamingcommunity": "foo",
        "animeunity": "to",
        "altadefinizione": "vodka"
        "altadefinizione": "vodka",
        "guardaserie": "ceo",
        "ddlstreamitaly": "co"
    }
}
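
A minimal sketch of how a downloader might read the REQUESTS block shown above (the proxy-selection logic is an illustrative assumption, not code from this diff):

    import json
    import random

    with open('config.json') as f:
        config = json.load(f)

    requests_cfg = config['REQUESTS']
    timeout = requests_cfg['timeout']   # 5
    proxies = requests_cfg['proxy']     # empty list until proxies are configured
    proxy = random.choice(proxies) if proxies else None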
@ -1,6 +1,7 @@
requests
httpx
bs4
rich
tqdm
m3u8
unidecode
fake-useragent
113  run.py
@ -1,9 +1,11 @@
# 10.12.23

import sys
import os
import sys
import glob
import platform
import argparse
import importlib

from typing import Callable

@ -17,18 +19,11 @@ from Src.Util.os import get_system_summary
from Src.Util.logger import Logger


# Internal api
from Src.Api.Streamingcommunity import main_film_series as streamingcommunity_film_serie
from Src.Api.Animeunity import main_anime as streamingcommunity_anime
from Src.Api.Altadefinizione import main_film as altadefinizione_film
from Src.Api.Ddlstreamitaly import title_search as ddlstreamitaly_film_serie
from Src.Api.Guardaserie import title_search as guardaserie_serie


# Config
CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close')


def run_function(func: Callable[..., None], close_console: bool = False) -> None:
    """
    Run a given function indefinitely or once, depending on the value of close_console.
@ -44,6 +39,56 @@ def run_function(func: Callable[..., None], close_console: bool = False) -> None
    func()
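
The body of run_function is truncated by this hunk; per its docstring, the control flow it describes would be roughly the following (an assumption reconstructed from the docstring, not the diff's code):

    def run_function(func: Callable[..., None], close_console: bool = False) -> None:
        if close_console:
            while True:
                func()
        else:
            func()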


def load_search_functions():

    loaded_functions = {}

    # Traverse the Api directory
    api_dir = os.path.join(os.path.dirname(__file__), 'Src', 'Api')
    init_files = glob.glob(os.path.join(api_dir, '*', '__init__.py'))

    modules = []

    # Retrieve modules and their indices
    for init_file in init_files:
        module_name = os.path.basename(os.path.dirname(init_file))  # Get folder name as module name

        try:
            # Dynamically import the module
            mod = importlib.import_module(f'Src.Api.{module_name}')

            # Get 'indice' from the module
            indice = getattr(mod, 'indice', 0)  # If 'indice' is not defined, default to 0

            # Add module and indice to the list
            modules.append((module_name, indice))

        except Exception as e:
            console.print(f"[red]Failed to import module {module_name}: {str(e)}")

    # Sort modules by 'indice'
    modules.sort(key=lambda x: x[1])

    # Load search functions in the sorted order
    for module_name, _ in modules:
        module_alias = f'{module_name}_search'  # Construct a unique alias for the module

        try:
            # Dynamically import the module
            mod = importlib.import_module(f'Src.Api.{module_name}')

            # Get the search function from the module (assuming the function is named 'search' and defined in __init__.py)
            search_function = getattr(mod, 'search')

            # Add the function to the loaded functions dictionary
            loaded_functions[module_alias] = search_function

        except Exception as e:
            console.print(f"[red]Failed to load search function from module {module_name}: {str(e)}")

    return loaded_functions
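
For a site package to be discovered by this loader, its __init__.py only needs to expose a search callable and, optionally, an indice used for menu ordering. A minimal sketch of such a module (the name Examplesite is hypothetical):

    # Src/Api/Examplesite/__init__.py
    indice = 5  # menu position; modules without it default to 0

    def search():
        # Entry point picked up by load_search_functions() in run.py.
        ...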


def initialize():
    """
    Initialize the application.
@ -75,59 +120,49 @@ def initialize():


def main():
    # Load search functions
    search_functions = load_search_functions()

    initialize()

    # Parse command line arguments
    # Create dynamic argument parser
    parser = argparse.ArgumentParser(description='Script to download film and series from the internet.')
    parser.add_argument('-sa', '--streaming_anime', action='store_true', help='')
    parser.add_argument('-sf', '--streaming_film', action='store_true', help='')

    # Add dynamic arguments based on loaded search modules
    for alias in search_functions.keys():
        short_option = alias[:3].upper()  # Take the first three letters of the alias in uppercase
        long_option = alias  # Use the full alias as the full option name
        parser.add_argument(f'-{short_option}', f'--{long_option}', action='store_true', help=f'Search for {alias.split("_")[0]} on streaming platforms.')

    # Parse command line arguments
    args = parser.parse_args()

    # Mapping command-line arguments to functions
    arg_to_function = {
        'streaming_anime': streamingcommunity_anime,
        'streaming_film': streamingcommunity_film_serie,
    }
    arg_to_function = {alias: search_functions[alias] for alias in search_functions.keys()}

    # Check which argument is provided and run the corresponding function
    for arg, func in arg_to_function.items():
        if getattr(args, arg):
            run_function(func, CLOSE_CONSOLE)
            run_function(func)
            return

    # Mapping user input to functions
    input_to_function = {
        '0': streamingcommunity_film_serie,
        '1': streamingcommunity_anime,
        '2': altadefinizione_film,
        '3': ddlstreamitaly_film_serie,
        '4': guardaserie_serie,
    }
    input_to_function = {str(i): search_functions[alias] for i, alias in enumerate(search_functions.keys())}

    # Create dynamic prompt message and choices
    choices = list(input_to_function.keys())
    choice_labels = {
        '0': "Streamingcommunity",
        '1': "Animeunity",
        '2': "Altadefinizione",
        '3': "Ddlstreamitaly",
        '4': "Guardaserie",
    }
    prompt_message = "[cyan]Insert category [white](" + ", ".join(
        f"[red]{key}[white]: [bold magenta]{label}[white]" for key, label in choice_labels.items()
    ) + ")[white]:[/cyan]"
    choice_labels = {str(i): alias.split("_")[0].capitalize() for i, alias in enumerate(search_functions.keys())}
    prompt_message = f"Insert category [white]({', '.join([f'[red]{key}: [magenta]{label}' for key, label in choice_labels.items()])}[white]): "

    # Ask the user for input
    category = msg.ask(prompt_message, choices=choices, default="0")
    category = msg.ask(prompt_message, choices=list(choice_labels.keys()), default="0")

    # Run the corresponding function based on user input
    if category in input_to_function:
        run_function(input_to_function[category], CLOSE_CONSOLE)
        run_function(input_to_function[category])
    else:
        console.print("[red]Invalid category.")
        sys.exit(0)


if __name__ == '__main__':
    initialize()
    main()
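
With the dynamic parser above, every discovered module contributes its own flag, derived from the alias rule in the loop (the first three letters of the alias, uppercased, form the short option). For a hypothetical Src/Api/Guardaserie package the invocation would be:

    python run.py --Guardaserie_search    # short form: -GUA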