Remove constant /2

This commit is contained in:
Dark1291 2025-02-12 10:14:56 +01:00
parent 38cf30f977
commit 938ed00064
9 changed files with 47 additions and 54 deletions

View File

@ -14,17 +14,16 @@ from StreamingCommunity.Util.headers import get_headers
# Variable
from StreamingCommunity.Api.Site.ddlstreamitaly.costant import COOKIE
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
def __init__(self) -> None:
def __init__(self, cookie) -> None:
"""
Initializes the VideoSource object with default values.
"""
self.headers = {'user-agent': get_headers()}
self.cookie = COOKIE
self.cookie = cookie
def setup(self, url: str) -> None:
"""

View File

@ -10,7 +10,9 @@ from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import HLS_Downloader
from StreamingCommunity.TelegramHelp.telegram_bot import TelegramSession, get_bot_instance
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
@ -18,10 +20,6 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
from StreamingCommunity.Api.Player.supervideo import VideoSource
# Config
from .costant import MOVIE_FOLDER, TELEGRAM_BOT
def download_film(select_title: MediaItem) -> str:
"""
Downloads a film using the provided film ID, title name, and domain.
@ -33,7 +31,7 @@ def download_film(select_title: MediaItem) -> str:
Return:
- str: output path
"""
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
bot.send_message(f"Download in corso:\n{select_title.name}", None)
@ -51,7 +49,7 @@ def download_film(select_title: MediaItem) -> str:
# Define output path
title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4"
mp4_path = os.path.join(MOVIE_FOLDER, title_name.replace(".mp4", ""))
mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
@ -62,7 +60,7 @@ def download_film(select_title: MediaItem) -> str:
output_path=os.path.join(mp4_path, title_name)
).start()
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
# Delete script_id
script_id = TelegramSession.get_session()

View File

@ -13,12 +13,12 @@ from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
@ -47,24 +47,24 @@ def title_search(title_search: str) -> int:
table_show_manager.clear()
# Find new domain if prev dont work
domain_to_use = DOMAIN_NOW
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
# Send request to search for title
client = httpx.Client()
try:
response = client.get(
url=f"https://{SITE_NAME}.{domain_to_use}/?story={title_search.replace(' ', '+')}&do=search&subaction=search&titleonly=3",
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/?story={title_search.replace(' ', '+')}&do=search&subaction=search&titleonly=3",
headers={'User-Agent': get_headers()},
timeout=max_timeout
)
response.raise_for_status()
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
raise
# Create soup and find table

View File

@ -15,6 +15,7 @@ from StreamingCommunity.TelegramHelp.telegram_bot import TelegramSession, get_bo
# Logic class
from .util.ScrapeSerie import ScrapeSerieAnime
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Util import manage_selection, dynamic_format_number
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
@ -24,7 +25,6 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSourceAnime
# Variable
from .costant import SITE_NAME, ANIME_FOLDER, MOVIE_FOLDER, TELEGRAM_BOT
KILL_HANDLER = bool(False)
@ -40,7 +40,7 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
- str: output path
- bool: kill handler status
"""
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
# Get information about the selected episode
@ -52,7 +52,7 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
console.print(f"[yellow]Download: [red]EP_{obj_episode.number} \n")
console.print("[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
bot.send_message(f"Download in corso:\nTitolo:{scrape_serie.series_name}\nEpisodio: {obj_episode.number}", None)
# Get script_id
@ -67,10 +67,10 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
title_name = f"{scrape_serie.series_name}_EP_{dynamic_format_number(int(obj_episode.number))}.mp4"
if scrape_serie.is_series:
mp4_path = os_manager.get_sanitize_path(os.path.join(ANIME_FOLDER, scrape_serie.series_name))
mp4_path = os_manager.get_sanitize_path(os.path.join(site_constant.ANIME_FOLDER, scrape_serie.series_name))
else:
mp4_path = os_manager.get_sanitize_path(os.path.join(MOVIE_FOLDER, scrape_serie.series_name))
mp4_path = os_manager.get_sanitize_path(os.path.join(site_constant.MOVIE_FOLDER, scrape_serie.series_name))
# Create output folder
os_manager.create_path(mp4_path)
@ -97,11 +97,11 @@ def download_series(select_title: MediaItem):
"""
start_message()
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
scrape_serie = ScrapeSerieAnime(SITE_NAME)
video_source = VideoSourceAnime(SITE_NAME)
scrape_serie = ScrapeSerieAnime(site_constant.SITE_NAME)
video_source = VideoSourceAnime(site_constant.SITE_NAME)
# Set up video source
scrape_serie.setup(None, select_title.id, select_title.slug)
@ -110,7 +110,7 @@ def download_series(select_title: MediaItem):
episoded_count = scrape_serie.get_count_episodes()
console.print(f"[cyan]Episodes find: [red]{episoded_count}")
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
console.print(f"\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]or [red][3-*] [cyan]for a range of media")
bot.send_message(f"Episodi trovati: {episoded_count}", None)
@ -141,7 +141,7 @@ def download_series(select_title: MediaItem):
break
_, kill_handler = download_episode(i_episode-1, scrape_serie, video_source)
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
bot.send_message(f"Finito di scaricare tutte le serie e episodi", None)
# Get script_id
@ -160,8 +160,8 @@ def download_film(select_title: MediaItem):
"""
# Init class
scrape_serie = ScrapeSerieAnime(SITE_NAME)
video_source = VideoSourceAnime(SITE_NAME)
scrape_serie = ScrapeSerieAnime(site_constant.SITE_NAME)
video_source = VideoSourceAnime(site_constant.SITE_NAME)
# Set up video source
scrape_serie.setup(None, select_title.id, select_title.slug)

View File

@ -16,12 +16,12 @@ from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
from .costant import SITE_NAME, DOMAIN_NOW, TELEGRAM_BOT
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
@ -103,19 +103,19 @@ def title_search(title: str) -> int:
Returns:
- int: A number containing the length of media search manager.
"""
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
media_search_manager.clear()
table_show_manager.clear()
# Get token and session value from configuration
domain_to_use = DOMAIN_NOW
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://www.{SITE_NAME}.{DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://www.{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
data = get_token(SITE_NAME, domain_to_use)
data = get_token(site_constant.SITE_NAME, domain_to_use)
# Prepare cookies to be used in the request
cookies = {
@ -138,7 +138,7 @@ def title_search(title: str) -> int:
# Send a POST request to the API endpoint for live search
try:
response = httpx.post(
url=f'https://www.{SITE_NAME}.{domain_to_use}/livesearch',
url=f'https://www.{site_constant.SITE_NAME}.{domain_to_use}/livesearch',
cookies=cookies,
headers=headers,
json=json_data,
@ -147,10 +147,10 @@ def title_search(title: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
# Inizializza la lista delle scelte
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
choices = []
for dict_title in response.json()['records']:
@ -168,15 +168,15 @@ def title_search(title: str) -> int:
'episodes_count': dict_title.get('episodes_count')
})
if TELEGRAM_BOT:
# Crea una stringa formattata per ogni scelta con numero
choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}"
choices.append(choice_text)
if site_constant.TELEGRAM_BOT:
# Crea una stringa formattata per ogni scelta con numero
choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}"
choices.append(choice_text)
except Exception as e:
print(f"Error parsing a film entry: {e}")
if TELEGRAM_BOT:
if site_constant.TELEGRAM_BOT:
if choices:
bot.send_message(f"Lista dei risultati:", choices)

View File

@ -86,8 +86,8 @@ def download_thread(dict_serie: MediaItem):
start_message()
# Init class
scape_info_serie = GetSerieInfo(dict_serie)
video_source = VideoSource()
scape_info_serie = GetSerieInfo(dict_serie, site_constant.COOKIE)
video_source = VideoSource(site_constant.COOKIE)
# Collect information about thread
list_dict_episode = scape_info_serie.get_episode_number()

View File

@ -16,7 +16,6 @@ from StreamingCommunity.Util.headers import get_headers
# Logic class
from ..site import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
@ -25,7 +24,7 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
class GetSerieInfo:
def __init__(self, dict_serie: MediaItem) -> None:
def __init__(self, dict_serie: MediaItem, cookies) -> None:
"""
Initializes the GetSerieInfo object with default values.
@ -33,7 +32,7 @@ class GetSerieInfo:
- dict_serie (MediaItem): Dictionary containing series information (optional).
"""
self.headers = {'user-agent': get_headers()}
self.cookies = site_constant.COOKIE
self.cookies = cookies
self.url = dict_serie.url
self.tv_name = None
self.list_episodes = None

View File

@ -19,6 +19,7 @@ from StreamingCommunity.Api.Template.Util import (
validate_episode_selection,
display_episodes_list
)
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
@ -27,10 +28,6 @@ from .util.ScrapeSerie import GetSerieInfo
from StreamingCommunity.Api.Player.supervideo import VideoSource
# Variable
from .costant import SERIES_FOLDER
def download_video(index_season_selected: int, index_episode_selected: int, scape_info_serie: GetSerieInfo) -> Tuple[str,bool]:
"""
@ -54,7 +51,7 @@ def download_video(index_season_selected: int, index_episode_selected: int, scap
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4"
mp4_path = os.path.join(SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}")
mp4_path = os.path.join(site_constant.SERIES_FOLDER, scape_info_serie.tv_name, f"S{index_season_selected}")
# Setup video source
video_source = VideoSource(obj_episode.get('url'))

View File

@ -13,12 +13,12 @@ from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
@ -39,10 +39,10 @@ def title_search(word_to_search: str) -> int:
table_show_manager.clear()
# Find new domain if prev dont work
domain_to_use = DOMAIN_NOW
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
# Send request to search for titles
try:
@ -54,7 +54,7 @@ def title_search(word_to_search: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")