Fix some bug

This commit is contained in:
Lovi 2025-02-15 15:26:54 +01:00
parent f047f0f7a3
commit 93f50b7d89
15 changed files with 95 additions and 386 deletions

View File

@ -564,14 +564,13 @@ python3 telegram_bot.py
| Website | Status | Command |
|:-------------------|:------:|:--------:|
| [1337xx](https://1337xx.to/) | ✅ | -133 |
| [AltadefinizioneGratis](https://altadefinizionegratis.pro/) | ✅ | -ALT |
| [AnimeUnity](https://animeunity.so/) | ✅ | -ANI |
| [Ilcorsaronero](https://ilcorsaronero.link/) | ✅ | `-ILC` |
| [CB01New](https://cb01new.gold/) | ✅ | -CB0 |
| [DDLStreamItaly](https://ddlstreamitaly.co/) | ✅ | -DDL |
| [GuardaSerie](https://guardaserie.now/) | ✅ | -GUA |
| [MostraGuarda](https://mostraguarda.stream/) | ✅ | -MOS |
| [StreamingCommunity](https://streamingcommunity.paris/) | ✅ | -STR |
| [StreamingCommunity](https://streamingcommunity.lu/) | ✅ | -STR |
# Tutorials

View File

@ -1,6 +1,6 @@
# 23.11.24
from typing import Dict, Any, List, Union
from typing import Dict, Any, List, Union, List, Optional
class Episode:
@ -65,6 +65,25 @@ class EpisodeManager:
return f"EpisodeManager(num_episodes={len(self.episodes)})"
class SeasonData:
def __init__(self, data: Dict[str, Any]):
self.id: int = data.get('id', 0)
self.number: int = data.get('number', 0)
def __str__(self):
return f"SeasonData(id={self.id}, number={self.number}, name='{self.name}'"
class SeasonManager:
def __init__(self):
self.seasons: List[SeasonData] = []
def add_season(self, season_data):
season = SeasonData(season_data)
self.seasons.append(season)
def get_season_by_number(self, number: int) -> Optional[Dict]:
return self.seasons[number]
class Season:
def __init__(self, season_data: Dict[str, Union[int, str, None]]):
self.season_data = season_data
@ -78,7 +97,12 @@ class Season:
self.plot: str = season_data.get('plot', '')
self.type: str = season_data.get('type', '')
self.seasons_count: int = season_data.get('seasons_count', 0)
self.episodes: EpisodeManager = EpisodeManager()
self.seasonsData: SeasonManager = SeasonManager()
for element in season_data['seasons']:
self.seasonsData.add_season(element)
class Stream:
@ -133,4 +157,4 @@ class WindowParameter:
self.url = data.get('url')
def __str__(self):
return (f"WindowParameter(token='{self.token}', expires='{self.expires}', url='{self.url}', data={self.data})")
return (f"WindowParameter(token='{self.token}', expires='{self.expires}', url='{self.url}', data={self.data})")

View File

@ -1,76 +0,0 @@
# 26.05.24
import sys
import subprocess
from urllib.parse import quote_plus
# Internal utilities
from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from .site import title_search, media_search_manager, table_show_manager
from .film import download_film
# Variable
indice = 2
_useFor = "film"
_deprecate = False
_priority = 2
_engineDownload = "hls"
def search(string_to_search: str = None, get_onylDatabase: bool = False):
"""
Main function of the application for film and series.
"""
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
if string_to_search is None:
# Chiedi la scelta all'utente con il bot Telegram
string_to_search = bot.ask(
"key_search",
f"Inserisci la parola da cercare\noppure 🔙 back per tornare alla scelta: ",
None
)
if string_to_search == 'back':
# Riavvia lo script
# Chiude il processo attuale e avvia una nuova istanza dello script
subprocess.Popen([sys.executable] + sys.argv)
sys.exit()
else:
if string_to_search is None:
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{site_constant.SITE_NAME}").strip()
# Search on database
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list
select_title = get_select_title(table_show_manager, media_search_manager)
# Download only film
download_film(select_title)
else:
if site_constant.TELEGRAM_BOT:
bot.send_message(f"Nessun risultato trovato riprova", None)
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()

View File

@ -1,76 +0,0 @@
# 26.05.24
import os
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import HLS_Downloader
from StreamingCommunity.TelegramHelp.telegram_bot import TelegramSession, get_bot_instance
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Player
from StreamingCommunity.Api.Player.supervideo import VideoSource
def download_film(select_title: MediaItem) -> str:
"""
Downloads a film using the provided film ID, title name, and domain.
Parameters:
- title_name (str): The name of the film title.
- url (str): The url of the video
Return:
- str: output path
"""
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
bot.send_message(f"Download in corso:\n{select_title.name}", None)
# Get script_id
script_id = TelegramSession.get_session()
if script_id != "unknown":
TelegramSession.updateScriptId(script_id, select_title.name)
# Start message and display film information
start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n")
# Set domain and media ID for the video source
video_source = VideoSource(select_title.url)
# Define output path
title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4"
mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
r_proc = HLS_Downloader(
m3u8_url=master_playlist,
output_path=os.path.join(mp4_path, title_name)
).start()
if site_constant.TELEGRAM_BOT:
# Delete script_id
script_id = TelegramSession.get_session()
if script_id != "unknown":
TelegramSession.deleteScriptId(script_id)
if "error" in r_proc.keys():
try:
os.remove(r_proc['path'])
except:
pass
return r_proc['path']

View File

@ -1,109 +0,0 @@
# 26.05.24
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
disable_searchDomain = config_manager.get_bool("DEFAULT", "disable_searchDomain")
# Telegram bot instance
from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
def title_search(title_search: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- title_search (str): The title to search for.
Returns:
int: The number of titles found.
"""
if TELEGRAM_BOT:
bot = get_bot_instance()
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
# Send request to search for title
client = httpx.Client()
try:
response = client.get(
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/?story={title_search.replace(' ', '+')}&do=search&subaction=search&titleonly=3",
headers={'User-Agent': get_headers()},
timeout=max_timeout
)
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
raise
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
# Inizializza la lista delle scelte
if TELEGRAM_BOT:
choices = []
for row in soup.find_all('div', class_='col-lg-3 col-md-3 col-xs-4'):
try:
title_element = row.find('h2', class_='titleFilm').find('a')
title = title_element.get_text(strip=True)
link = title_element['href']
imdb_element = row.find('div', class_='imdb-rate')
imdb_rating = imdb_element.get_text(strip=True).split(":")[-1]
film_info = {
'name': title,
'url': link,
'score': imdb_rating
}
media_search_manager.add_media(film_info)
if TELEGRAM_BOT:
# Crea una stringa formattata per ogni scelta con numero
choice_text = f"{len(choices)} - {film_info.get('name')} ({film_info.get('url')}) {film_info.get('score')}"
choices.append(choice_text)
except AttributeError as e:
print(f"Error parsing a film entry: {e}")
if TELEGRAM_BOT:
if choices:
bot.send_message(f"Lista dei risultati:", choices)
# Return the number of titles found
return media_search_manager.get_length()

View File

@ -1,7 +1,6 @@
# 17.09.24
import os
import sys
import logging
@ -15,6 +14,7 @@ from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.table import TVShowManager, get_call_stack
from StreamingCommunity.Lib.Downloader import HLS_Downloader
@ -53,8 +53,13 @@ def download_film(movie_details: Json_film) -> str:
raise
if "not found" in str(response.text):
logging.error(f"Cant find in the server, Element: {movie_details}")
raise
logging.error(f"Cant find in the server: {movie_details.title}.")
research_func = next((
f for f in get_call_stack()
if f['function'] == 'search' and f['script'] == '__init__.py'
), None)
TVShowManager.run_back_command(research_func)
# Extract supervideo url
soup = BeautifulSoup(response.text, "html.parser")

View File

@ -13,7 +13,7 @@ from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from .site import get_version_and_domain, title_search, table_show_manager, media_search_manager
from .site import title_search, table_show_manager, media_search_manager
from .film import download_film
from .series import download_series
@ -52,9 +52,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
if string_to_search is None:
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{site_constant.SITE_NAME}").strip()
# Get site domain and version and get result of the search
site_version, domain = get_version_and_domain()
len_database = title_search(quote_plus(string_to_search), domain)
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -66,7 +64,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
select_title = get_select_title(table_show_manager, media_search_manager)
if select_title.type == 'tv':
download_series(select_title, site_version)
download_series(select_title)
else:
download_film(select_title)

View File

@ -132,14 +132,13 @@ def download_episode(index_season_selected: int, scrape_serie: ScrapeSerie, vide
if stopped:
break
def download_series(select_season: MediaItem, version: str) -> None:
def download_series(select_season: MediaItem) -> None:
"""
Download episodes of a TV series based on user selection.
Parameters:
- select_season (MediaItem): Selected media item (TV series).
- domain (str): Domain from which to download.
- version (str): Version of the site.
"""
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
@ -152,7 +151,7 @@ def download_series(select_season: MediaItem, version: str) -> None:
video_source = VideoSource(site_constant.SITE_NAME, True)
# Setup video source
scrape_serie.setup(version, select_season.id, select_season.slug)
scrape_serie.setup(select_season.id, select_season.slug)
video_source.setup(select_season.id)
# Collect information about seasons
@ -194,11 +193,11 @@ def download_series(select_season: MediaItem, version: str) -> None:
if len(list_season_select) > 1 or index_season_selected == "*":
# Download all episodes if multiple seasons are selected or if '*' is used
download_episode(i_season, scrape_serie, video_source, download_all=True)
download_episode(scrape_serie.season_manager.seasonsData.get_season_by_number(i_season-1).number, scrape_serie, video_source, download_all=True)
else:
# Otherwise, let the user select specific episodes for the single season
download_episode(i_season, scrape_serie, video_source, download_all=False)
download_episode(scrape_serie.season_manager.seasonsData.get_season_by_number(i_season-1).number, scrape_serie, video_source, download_all=False)
if site_constant.TELEGRAM_BOT:
bot.send_message(f"Finito di scaricare tutte le serie e episodi", None)

View File

@ -1,13 +1,8 @@
# 10.12.23
import json
import logging
import secrets
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
@ -32,72 +27,21 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
disable_searchDomain = config_manager.get_bool("DEFAULT", "disable_searchDomain")
def get_version(domain: str):
def title_search(title_search: str) -> int:
"""
Extracts the version from the HTML text of a webpage.
Search for titles based on a search query.
Parameters:
- domain (str): The domain of the site.
- title_search (str): The title to search for.
Returns:
str: The version extracted from the webpage.
"""
try:
response = httpx.get(
url=f"https://{site_constant.SITE_NAME}.{domain}/",
headers={'User-Agent': get_headers()},
timeout=max_timeout
)
response.raise_for_status()
# Parse request to site
soup = BeautifulSoup(response.text, "html.parser")
# Extract version
version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version']
#console.print(f"[cyan]Get version [white]=> [red]{version} \n")
return version
except Exception as e:
logging.error(f"Error extracting version: {e}")
raise
def get_version_and_domain():
"""
Retrieve the current version and domain of the site.
This function performs the following steps:
- Determines the correct domain to use for the site by searching for a specific meta tag.
- Fetches the content of the site to extract the version information.
int: The number of titles found.
"""
domain_to_use = site_constant
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
try:
version = get_version(domain_to_use)
except:
#console.print("[green]Auto generate version ...")
#version = secrets.token_hex(32 // 2)
version = None
return version, domain_to_use
def title_search(title_search: str, domain: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- title_search (str): The title to search for.
- domain (str): The domain to search on.
Returns:
int: The number of titles found.
"""
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
@ -106,7 +50,7 @@ def title_search(title_search: str, domain: str) -> int:
try:
response = httpx.get(
url=f"https://{site_constant.SITE_NAME}.{domain}/api/search?q={title_search.replace(' ', '+')}",
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/api/search?q={title_search.replace(' ', '+')}",
headers={'user-agent': get_headers()},
timeout=max_timeout
)

View File

@ -32,16 +32,14 @@ class ScrapeSerie:
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
def setup(self, version: str = None, media_id: int = None, series_name: str = None):
def setup(self, media_id: int = None, series_name: str = None):
"""
Set up the scraper with specific media details.
Args:
version (str, optional): Site version for request headers
media_id (int, optional): Unique identifier for the media
series_name (str, optional): Name of the TV series
"""
self.version = version
self.media_id = media_id
# If series name is provided, initialize series-specific managers
@ -70,18 +68,6 @@ class ScrapeSerie:
soup = BeautifulSoup(response.text, "html.parser")
json_response = json.loads(soup.find("div", {"id": "app"}).get("data-page"))
self.version = json_response['version']
"""
response = httpx.post(
url=f'https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}',
headers={'User-Agent': get_headers()}
)
response.raise_for_status()
# Extract seasons from JSON response
json_response = response.json()
"""
# Collect info about season
self.season_manager = Season(json_response.get("props").get("title"))

View File

@ -6,7 +6,7 @@ from typing import List
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.table import TVShowManager
@ -47,28 +47,33 @@ def manage_selection(cmd_insert: str, max_count: int) -> List[int]:
Returns:
list_selection (List[int]): List of selected items.
"""
list_selection = []
logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")
while True:
list_selection = []
logging.info(f"Command insert: {cmd_insert}, end index: {max_count + 1}")
# For a single number (e.g., '5')
if cmd_insert.isnumeric():
list_selection.append(int(cmd_insert))
# For a single number (e.g., '5')
if cmd_insert.isnumeric():
list_selection.append(int(cmd_insert))
break
# For a range (e.g., '5-12')
elif "-" in cmd_insert:
start, end = map(str.strip, cmd_insert.split('-'))
start = int(start)
end = int(end) if end.isnumeric() else max_count
# For a range (e.g., '5-12')
elif "-" in cmd_insert:
try:
start, end = map(str.strip, cmd_insert.split('-'))
start = int(start)
end = int(end) if end.isnumeric() else max_count
list_selection = list(range(start, end + 1))
break
except ValueError:
pass
list_selection = list(range(start, end + 1))
# For all items ('*')
elif cmd_insert == "*":
list_selection = list(range(1, max_count + 1))
else:
raise ValueError("Invalid input format")
# For all items ('*')
elif cmd_insert == "*":
list_selection = list(range(1, max_count + 1))
break
cmd_insert = msg.ask("[red]Invalid input. Please enter a valid command: ")
logging.info(f"List return: {list_selection}")
return list_selection

View File

@ -8,6 +8,14 @@ from pathlib import Path
from typing import Any, List
# External library
from rich.console import Console
# Variable
console = Console()
class ConfigManager:
def __init__(self, file_name: str = 'config.json') -> None:
"""Initialize the ConfigManager.
@ -15,9 +23,14 @@ class ConfigManager:
Parameters:
- file_path (str, optional): The path to the configuration file. Default is 'config.json'.
"""
self.file_path = Path(__file__).parent.parent.parent / file_name
if getattr(sys, 'frozen', False):
base_path = Path(sys._MEIPASS)
else:
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
self.file_path = os.path.join(base_path, file_name)
self.config = {}
self.cache = {}
console.print(f"[green]Configuration file path: {self.file_path}[/green]")
def read_config(self) -> None:
"""Read the configuration file."""

View File

@ -85,8 +85,9 @@ class TVShowManager:
table.add_row(*row_data)
self.console.print(table)
def run_back_command(self, research_func: dict) -> None:
@staticmethod
def run_back_command(research_func: dict) -> None:
"""
Executes a back-end search command by dynamically importing a module and invoking its search function.
@ -116,8 +117,7 @@ class TVShowManager:
search_func(None)
except Exception as e:
self.console.print(f"[red]Error during search: {e}")
logging.exception("Error during search execution")
logging.error("Error during search execution")
finally:
if project_root in sys.path:
@ -181,7 +181,7 @@ class TVShowManager:
if self.slice_end > total_items:
self.slice_end = total_items
elif (key.lower() in ["b", "back"]) and research_func:
self.run_back_command(research_func)
TVShowManager.run_back_command(research_func)
else:
break
@ -215,7 +215,7 @@ class TVShowManager:
self.slice_start = 0
self.slice_end = self.step
elif (key.lower() in ["b", "back"]) and research_func:
self.run_back_command(research_func)
TVShowManager.run_back_command(research_func)
else:
break

View File

@ -59,10 +59,7 @@
},
"SITE": {
"streamingcommunity": {
"domain": "paris"
},
"altadefinizionegratis": {
"domain": "pro"
"domain": "lu"
},
"guardaserie": {
"domain": "now"

View File

@ -10,7 +10,7 @@ with open("requirements.txt", "r", encoding="utf-8-sig") as f:
setup(
name="StreamingCommunity",
version="2.6.0",
version="2.6.1",
long_description=read_readme(),
long_description_content_type="text/markdown",
author="Lovi-0",