Better interrupt_handler

This commit is contained in:
Lovi 2025-02-09 17:38:14 +01:00
parent a907726710
commit 959adbab22
9 changed files with 90 additions and 131 deletions

View File

@ -45,7 +45,6 @@ def download_film(select_title: MediaItem) -> str:
# Start message and display film information # Start message and display film information
start_message() start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n") console.print(f"[yellow]Download: [red]{select_title.name} \n")
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
# Set domain and media ID for the video source # Set domain and media ID for the video source
video_source = VideoSource(select_title.url) video_source = VideoSource(select_title.url)

View File

@ -34,7 +34,6 @@ def download_film(select_title: MediaItem) -> str:
""" """
start_message() start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n") console.print(f"[yellow]Download: [red]{select_title.name} \n")
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
# Setup api manger # Setup api manger
video_source = VideoSource(select_title.url) video_source = VideoSource(select_title.url)

View File

@ -46,7 +46,6 @@ def download_video(index_episode_selected: int, scape_info_serie: GetSerieInfo,
# Get info about episode # Get info about episode
obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1] obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
console.print(f"[yellow]Download: [red]{obj_episode.get('name')}\n") console.print(f"[yellow]Download: [red]{obj_episode.get('name')}\n")
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
# Define filename and path for the downloaded video # Define filename and path for the downloaded video
title_name = os_manager.get_sanitize_file( title_name = os_manager.get_sanitize_file(

View File

@ -46,7 +46,6 @@ def download_video(index_season_selected: int, index_episode_selected: int, scap
# Get info about episode # Get info about episode
obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1] obj_episode = scape_info_serie.list_episodes[index_episode_selected - 1]
console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}\n") console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.get('name')}\n")
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
# Define filename and path for the downloaded video # Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4" mp4_name = f"{map_episode_title(scape_info_serie.tv_name, index_season_selected, index_episode_selected, obj_episode.get('name'))}.mp4"

View File

@ -44,7 +44,6 @@ def download_film(movie_details: Json_film) -> str:
# Start message and display film information # Start message and display film information
start_message() start_message()
console.print(f"[yellow]Download: [red]{movie_details.title} \n") console.print(f"[yellow]Download: [red]{movie_details.title} \n")
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
# Make request to main site # Make request to main site
try: try:

View File

@ -260,12 +260,13 @@ class DownloadManager:
if result.get('stopped', False): if result.get('stopped', False):
self.stopped = True self.stopped = True
return self.stopped return self.stopped
def download_audio(self, audio: Dict): def download_audio(self, audio: Dict):
"""Downloads audio segments for a specific language track.""" """Downloads audio segments for a specific language track."""
if self.stopped: #if self.stopped:
return True # return True
audio_full_url = self.url_fixer.generate_full_url(audio['uri']) audio_full_url = self.url_fixer.generate_full_url(audio['uri'])
audio_tmp_dir = os.path.join(self.temp_dir, 'audio', audio['language']) audio_tmp_dir = os.path.join(self.temp_dir, 'audio', audio['language'])
@ -280,8 +281,8 @@ class DownloadManager:
def download_subtitle(self, sub: Dict): def download_subtitle(self, sub: Dict):
"""Downloads and saves subtitle file for a specific language.""" """Downloads and saves subtitle file for a specific language."""
if self.stopped: #if self.stopped:
return True # return True
raw_content = self.client.request(sub['uri']) raw_content = self.client.request(sub['uri'])
if raw_content: if raw_content:
@ -301,30 +302,35 @@ class DownloadManager:
""" """
Downloads all selected streams (video, audio, subtitles). Downloads all selected streams (video, audio, subtitles).
""" """
return_stopped = False
video_file = os.path.join(self.temp_dir, 'video', '0.ts') video_file = os.path.join(self.temp_dir, 'video', '0.ts')
if not os.path.exists(video_file): if not os.path.exists(video_file):
if self.download_video(video_url): if self.download_video(video_url):
return True if not return_stopped:
return_stopped = True
for audio in audio_streams: for audio in audio_streams:
if self.stopped: #if self.stopped:
break # break
audio_file = os.path.join(self.temp_dir, 'audio', audio['language'], '0.ts') audio_file = os.path.join(self.temp_dir, 'audio', audio['language'], '0.ts')
if not os.path.exists(audio_file): if not os.path.exists(audio_file):
if self.download_audio(audio): if self.download_audio(audio):
return True if not return_stopped:
return_stopped = True
for sub in sub_streams: for sub in sub_streams:
if self.stopped: #if self.stopped:
break # break
sub_file = os.path.join(self.temp_dir, 'subs', f"{sub['language']}.vtt") sub_file = os.path.join(self.temp_dir, 'subs', f"{sub['language']}.vtt")
if not os.path.exists(sub_file): if not os.path.exists(sub_file):
if self.download_subtitle(sub): if self.download_subtitle(sub):
return True if not return_stopped:
return_stopped = True
return self.stopped return return_stopped
class MergeManager: class MergeManager:
@ -414,6 +420,8 @@ class HLS_Downloader:
- is_master: Whether the M3U8 was a master playlist - is_master: Whether the M3U8 was a master playlist
Or raises an exception if there's an error Or raises an exception if there's an error
""" """
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
if TELEGRAM_BOT: if TELEGRAM_BOT:
bot = get_bot_instance() bot = get_bot_instance()

View File

@ -158,10 +158,10 @@ class M3U8_Segments:
""" """
def interrupt_handler(signum, frame): def interrupt_handler(signum, frame):
if not self.interrupt_flag.is_set(): if not self.interrupt_flag.is_set():
console.log("\n[red] Stopping download gracefully...") console.print("\n[red]- Stopping download gracefully...")
self.interrupt_flag.set() #self.interrupt_flag.set() IN MODO DA NON TERMINARE SUBITO
self.download_interrupted = True self.download_interrupted = True
self.stop_event.set() #self.stop_event.set() IN MODO DA NON TERMINARE SUBITO
if threading.current_thread() is threading.main_thread(): if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGINT, interrupt_handler) signal.signal(signal.SIGINT, interrupt_handler)
@ -315,7 +315,8 @@ class M3U8_Segments:
unit='s', unit='s',
ascii='░▒█', ascii='░▒█',
bar_format=self._get_bar_format(description), bar_format=self._get_bar_format(description),
mininterval=0.05 mininterval=0.05,
file=sys.stdout, # Using file=sys.stdout to force in-place updates because sys.stderr may not support carriage returns in this environment.
) )
try: try:

View File

@ -124,7 +124,8 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = No
unit='iB', unit='iB',
unit_scale=True, unit_scale=True,
desc='Downloading', desc='Downloading',
mininterval=0.05 mininterval=0.05,
file=sys.stdout, # Using file=sys.stdout to force in-place updates because sys.stderr may not support carriage returns in this environment.
) )
downloaded = 0 downloaded = 0

View File

@ -4,6 +4,8 @@ import os
import sys import sys
import logging import logging
import importlib import importlib
from pathlib import Path
from typing import Dict, List, Any
# External library # External library
@ -11,7 +13,6 @@ from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.prompt import Prompt from rich.prompt import Prompt
from rich.style import Style from rich.style import Style
from typing import Dict, List, Any
# Internal utilities # Internal utilities
@ -24,13 +25,12 @@ from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
class TVShowManager: class TVShowManager:
def __init__(self): def __init__(self):
""" """Initialize TVShowManager with default values."""
Initialize TVShowManager with provided column information.
"""
self.console = Console() self.console = Console()
self.tv_shows: List[Dict[str, Any]] = [] # List to store TV show data as dictionaries self.tv_shows: List[Dict[str, Any]] = []
self.slice_start: int = 0 self.slice_start: int = 0
self.slice_end: int = 5 self.slice_end: int = 5
self.step: int = self.slice_end self.step: int = self.slice_end
@ -55,7 +55,7 @@ class TVShowManager:
""" """
self.column_info = column_info self.column_info = column_info
def add_tv_show(self, tv_show: Dict[str, Any]): def add_tv_show(self, tv_show: Dict[str, Any]) -> None:
""" """
Add a TV show to the list of TV shows. Add a TV show to the list of TV shows.
@ -64,7 +64,7 @@ class TVShowManager:
""" """
self.tv_shows.append(tv_show) self.tv_shows.append(tv_show)
def display_data(self, data_slice: List[Dict[str, Any]]): def display_data(self, data_slice: List[Dict[str, Any]]) -> None:
""" """
Display TV show data in a tabular format. Display TV show data in a tabular format.
@ -76,21 +76,17 @@ class TVShowManager:
# Add columns dynamically based on provided column information # Add columns dynamically based on provided column information
for col_name, col_style in self.column_info.items(): for col_name, col_style in self.column_info.items():
color = col_style.get("color", None) color = col_style.get("color", None)
if color: style = Style(color=color) if color else None
style = Style(color=color)
else:
style = None
table.add_column(col_name, style=style, justify='center') table.add_column(col_name, style=style, justify='center')
# Add rows dynamically based on available TV show data # Add rows dynamically based on available TV show data
for entry in data_slice: for entry in data_slice:
# Create row data while handling missing keys row_data = [str(entry.get(col_name, '')) for col_name in self.column_info.keys()]
row_data = [entry.get(col_name, '') for col_name in self.column_info.keys()]
table.add_row(*row_data) table.add_row(*row_data)
self.console.print(table) self.console.print(table)
def run_back_command(self, research_func: dict): def run_back_command(self, research_func: dict) -> None:
""" """
Executes a back-end search command by dynamically importing a module and invoking its search function. Executes a back-end search command by dynamically importing a module and invoking its search function.
@ -99,47 +95,34 @@ class TVShowManager:
- 'folder' (str): The absolute path to the directory containing the module to be executed. - 'folder' (str): The absolute path to the directory containing the module to be executed.
""" """
try: try:
# Get site name from folder # Get site name from folder
site_name = (os.path.basename(research_func['folder'])) site_name = Path(research_func['folder']).name
# Find the project root directory # Find the project root directory
current_path = research_func['folder'] current_path = research_func['folder']
while not os.path.exists(os.path.join(current_path, 'StreamingCommunity')): while not os.path.exists(os.path.join(current_path, 'StreamingCommunity')):
current_path = os.path.dirname(current_path) current_path = os.path.dirname(current_path)
# Add project root to Python path
project_root = current_path project_root = current_path
#print(f"[DEBUG] Project Root: {project_root}")
if project_root not in sys.path: if project_root not in sys.path:
sys.path.insert(0, project_root) sys.path.insert(0, project_root)
# Import using full absolute import # Import using full absolute import
module_path = f'StreamingCommunity.Api.Site.{site_name}' module_path = f'StreamingCommunity.Api.Site.{site_name}'
#print(f"[DEBUG] Importing module: {module_path}")
# Import the module
module = importlib.import_module(module_path) module = importlib.import_module(module_path)
# Get the search function # Get and call the search function
search_func = getattr(module, 'search') search_func = getattr(module, 'search')
# Call the search function with the search string
search_func(None) search_func(None)
except Exception as e: except Exception as e:
self.console.print(f"[red]Error during search: {e}") self.console.print(f"[red]Error during search: {e}")
logging.exception("Error during search execution")
# Print detailed traceback finally:
import traceback
traceback.print_exc()
# Optionally remove the path if you want to clean up
if project_root in sys.path: if project_root in sys.path:
sys.path.remove(project_root) sys.path.remove(project_root)
def run(self, force_int_input: bool = False, max_int_input: int = 0) -> str: def run(self, force_int_input: bool = False, max_int_input: int = 0) -> str:
""" """
Run the TV show manager application. Run the TV show manager application.
@ -152,121 +135,92 @@ class TVShowManager:
str: Last command executed before breaking out of the loop. str: Last command executed before breaking out of the loop.
""" """
total_items = len(self.tv_shows) total_items = len(self.tv_shows)
last_command = "" # Variable to store the last command executed last_command = ""
is_telegram = config_manager.get_bool('DEFAULT', 'telegram_bot')
if TELEGRAM_BOT: bot = get_bot_instance() if is_telegram else None
bot = get_bot_instance()
while True: while True:
start_message() start_message()
# Display table
self.display_data(self.tv_shows[self.slice_start:self.slice_end]) self.display_data(self.tv_shows[self.slice_start:self.slice_end])
# Find research function from call stack # Find research function from call stack
research_func = None research_func = next((
for reverse_fun in get_call_stack(): f for f in get_call_stack()
if reverse_fun['function'] == 'search' and reverse_fun['script'] == '__init__.py': if f['function'] == 'search' and f['script'] == '__init__.py'
research_func = reverse_fun ), None)
logging.info(f"Found research_func: {research_func}")
# Handling user input for loading more items or quitting # Handle pagination and user input
if self.slice_end < total_items: if self.slice_end < total_items:
self.console.print(f"\n[green]Press [red]Enter [green]for next page, [red]'q' [green]to quit, or [red]'back' [green]to search.") self.console.print(f"\n[green]Press [red]Enter [green]for next page, [red]'q' [green]to quit, or [red]'back' [green]to search.")
if not force_int_input: if not force_int_input:
if TELEGRAM_BOT: prompt_msg = ("\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
self.console.print(f"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end") "[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end")
key = bot.ask(
"select_title_episode",
f"Inserisci l'indice dei media (ad esempio, 1), * per scaricare tutti i media, (ad esempio, 1-2) per un intervallo di media, o (ad esempio, 3-*) per scaricare dal un indice specifico fino alla fine.",
None
)
else:
key = Prompt.ask(
"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end"
)
if is_telegram:
key = bot.ask("select_title_episode", prompt_msg, None)
else: else:
choices = [str(i) for i in range(0, max_int_input)] key = Prompt.ask(prompt_msg)
choices.extend(["q", "quit", "b", "back"])
if TELEGRAM_BOT:
self.console.print(f"[cyan]Insert media [red]index")
key = bot.ask(
"select_title",
f"Scegli il contenuto da scaricare:\n📺 Serie TV - 🎞️ Film - 🌀 Anime\noppure `back` per tornare indietro",
None
)
else: else:
key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False) choices = [str(i) for i in range(max_int_input + 1)] + ["q", "quit", "b", "back"]
prompt_msg = "[cyan]Insert media [red]index"
telegram_msg = "Scegli il contenuto da scaricare:\n📺 Serie TV - 🎞️ Film - 🌀 Anime\noppure `back` per tornare indietro"
if is_telegram:
key = bot.ask("select_title", telegram_msg, None)
else:
key = Prompt.ask(prompt_msg, choices=choices, show_choices=False)
last_command = key last_command = key
if key.lower() == "q" or key.lower() == "quit": if key.lower() in ["q", "quit"]:
break break
elif key == "": elif key == "":
self.slice_start += self.step self.slice_start += self.step
self.slice_end += self.step self.slice_end += self.step
if self.slice_end > total_items: if self.slice_end > total_items:
self.slice_end = total_items self.slice_end = total_items
elif (key.lower() in ["b", "back"]) and research_func:
elif (key.lower() == "b" or key.lower() == "back") and research_func:
self.run_back_command(research_func) self.run_back_command(research_func)
else: else:
break break
else: else:
# Last slice, ensure all remaining items are shown # Last page handling
self.console.print(f"\n [green]You've reached the end. [red]Enter [green]for first page, [red]'q' [green]to quit, or [red]'back' [green]to search.") self.console.print(f"\n[green]You've reached the end. [red]Enter [green]for first page, [red]'q' [green]to quit, or [red]'back' [green]to search.")
if not force_int_input: if not force_int_input:
if TELEGRAM_BOT: prompt_msg = ("\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
self.console.print(f"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end") "[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end")
key = bot.ask(
"select_title_episode",
f"Inserisci l'indice dei media (ad esempio, 1), * per scaricare tutti i media, (ad esempio, 1-2) per un intervallo di media, o (ad esempio, 3-*) per scaricare dal un indice specifico fino alla fine.",
None
)
else:
key = Prompt.ask(
"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end"
)
if is_telegram:
key = bot.ask("select_title_episode", prompt_msg, None)
else: else:
choices = [str(i) for i in range(0, max_int_input)] key = Prompt.ask(prompt_msg)
choices.extend(["q", "quit", "b", "back"]) else:
choices = [str(i) for i in range(max_int_input + 1)] + ["q", "quit", "b", "back"]
prompt_msg = "[cyan]Insert media [red]index"
telegram_msg = "Scegli il contenuto da scaricare:\n📺 Serie TV - 🎞️ Film - 🌀 Anime\noppure `back` per tornare indietro"
if TELEGRAM_BOT: if is_telegram:
self.console.print(f"[cyan]Insert media [red]index") key = bot.ask("select_title", telegram_msg, None)
key = bot.ask(
"select_title",
f"Scegli il contenuto da scaricare:\n📺 Serie TV - 🎞️ Film - 🌀 Anime\n oppure `back` per tornare indietro",
None
)
else: else:
key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False) key = Prompt.ask(prompt_msg, choices=choices, show_choices=False)
last_command = key last_command = key
if key.lower() == "q" or key.lower() == "quit": if key.lower() in ["q", "quit"]:
break break
elif key == "": elif key == "":
self.slice_start = 0 self.slice_start = 0
self.slice_end = self.step self.slice_end = self.step
elif (key.lower() in ["b", "back"]) and research_func:
elif (key.lower() == "b" or key.lower() == "back") and research_func:
self.run_back_command(research_func) self.run_back_command(research_func)
else: else:
break break
return last_command return last_command
def clear(self): def clear(self) -> None:
"""Clear all TV shows data."""
self.tv_shows = [] self.tv_shows = []