Fix telegram_bot (#312)

* Update config.json

* Fix telegram_bot

* fix bug

* Fix StreamingCommunity site

* Delete console.log

* fix doppio string_to_search

* Update __init__.py

* Update site.py

* Update config.json

* Update site.py

* Update config.json

* Update __init__.py

* Update __init__.py

* Fix proxy (#319)

* Add ENABLE_VIDEO

* Fix proxy

* Add error proxy

* Update config.json

* Refactor user input handling and improve messaging in __init__.py

---------

Co-authored-by: None <62809003+Arrowar@users.noreply.github.com>
Co-authored-by: l1n00 <>
This commit is contained in:
l1n00 2025-05-18 11:22:08 +02:00 committed by GitHub
parent c3abe52d1f
commit 8de72fabfc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 260 additions and 118 deletions

View File

@ -748,26 +748,24 @@ The `run-container` command mounts also the `config.json` file, so any change to
The bot was created to replace terminal commands and allow interaction via Telegram. Each download runs within a screen session, enabling multiple downloads to run simultaneously.
To run the bot in the background, simply start it inside a screen session and then press Ctrl + A, followed by D, to detach from the session without stopping the bot.
</details>
<details>
<summary>🤖 Bot Commands</summary>
Command Functions:
🔹 /start Starts a new search for a download. This command performs the same operations as manually running the script in the terminal with test_run.py.
🔹 /list Displays the status of active downloads, with options to:
- Stop an incorrect download using /stop <ID>
- View the real-time output of a download using /screen <ID>
Stop an incorrect download using /stop <ID>.
View the real-time output of a download using /screen <ID>.
⚠ Warning: If a download is interrupted, incomplete files may remain in the folder specified in config.json. These files must be deleted manually to avoid storage or management issues.
</details>
<details>
<summary>🔧 Environment Setup</summary>
🛠 Configuration: Currently, the bot's settings are stored in the config.json file, which is located in the same directory as the telegram_bot.py script.
Create an `.env` file with:
## .env Example:
You need to create an .env file and enter your Telegram token and user ID to authorize only one user to use it
```
TOKEN_TELEGRAM=IlTuo2131TOKEN$12D3Telegram

View File

@ -62,7 +62,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager, len_database)
download_title(select_title)
else:

View File

@ -110,7 +110,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
bot = get_bot_instance()
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager, len_database)
process_search_result(select_title, selections)
else:

View File

@ -109,7 +109,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
bot = get_bot_instance()
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager,len_database)
process_search_result(select_title, selections)
else:

View File

@ -75,7 +75,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager,len_database)
process_search_result(select_title, selections)
else:

View File

@ -62,7 +62,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager,len_database)
process_search_result(select_title)
else:

View File

@ -75,7 +75,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager,len_database)
process_search_result(select_title, selections)
else:

View File

@ -84,7 +84,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager,len_database)
process_search_result(select_title, selections)
else:

View File

@ -26,7 +26,7 @@ from .series import download_series
# Variable
indice = 0
_useFor = "Film_&_Serie"
_useFor = "Film_&_Serie" # "Movies_&_Series"
_priority = 0
_engineDownload = "hls"
_deprecate = False
@ -39,35 +39,58 @@ def get_user_input(string_to_search: str = None):
"""
Asks the user to input a search term.
Handles both Telegram bot input and direct input.
If string_to_search is provided, it's returned directly (after stripping).
"""
if string_to_search is None:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
string_to_search = bot.ask(
"key_search",
f"Enter the search term\nor type 'back' to return to the menu: ",
None
)
if string_to_search is not None:
return string_to_search.strip()
if string_to_search == 'back':
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
user_response = bot.ask(
"key_search", # Request type
"Enter the search term\nor type 'back' to return to the menu: ",
None
)
if user_response is None:
bot.send_message("Timeout: No search term entered.", None)
return None
if user_response.lower() == 'back':
bot.send_message("Returning to the main menu...", None)
try:
# Restart the script
subprocess.Popen([sys.executable] + sys.argv)
sys.exit()
else:
string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
return string_to_search
except Exception as e:
bot.send_message(f"Error during restart attempt: {e}", None)
return None # Return None if restart fails
return user_response.strip()
else:
return msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
def process_search_result(select_title, selections=None, proxy=None):
"""
Handles the search result and initiates the download for either a film or series.
Parameters:
select_title (MediaItem): The selected media item
select_title (MediaItem): The selected media item. Can be None if selection fails.
selections (dict, optional): Dictionary containing selection inputs that bypass manual input
{'season': season_selection, 'episode': episode_selection}
e.g., {'season': season_selection, 'episode': episode_selection}
proxy (str, optional): The proxy to use for downloads.
"""
if not select_title:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
bot.send_message("No title selected or selection cancelled.", None)
else:
console.print("[yellow]No title selected or selection cancelled.")
return
if select_title.type == 'tv':
season_selection = None
episode_selection = None
@ -77,57 +100,67 @@ def process_search_result(select_title, selections=None, proxy=None):
episode_selection = selections.get('episode')
download_series(select_title, season_selection, episode_selection, proxy)
else:
download_film(select_title)
else: # 'movie' or other types assumed to be film-like
download_film(select_title, proxy) # Assuming download_film might also need proxy
def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None, selections: dict = None):
"""
Main function of the application for search.
Parameters:
string_to_search (str, optional): String to search for
get_onlyDatabase (bool, optional): If True, return only the database object
direct_item (dict, optional): Direct item to process (bypass search)
string_to_search (str, optional): String to search for. Can be passed from run.py.
If 'back', special handling might occur in get_user_input.
get_onlyDatabase (bool, optional): If True, return only the database search manager object.
direct_item (dict, optional): Direct item to process (bypasses search).
selections (dict, optional): Dictionary containing selection inputs that bypass manual input
{'season': season_selection, 'episode': episode_selection}
for series (season/episode).
"""
bot = None
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
if direct_item:
select_title = MediaItem(**direct_item)
process_search_result(select_title, selections)
select_title_obj = MediaItem(**direct_item)
# Note: If direct_item processing requires a proxy, it should be fetched here.
# For now, assuming process_search_result handles proxy=None if not provided.
finder = ProxyFinder(site_constant.FULL_URL) # Get proxy for direct item too
proxy = finder.find_fast_proxy()
process_search_result(select_title_obj, selections, proxy)
return
if string_to_search is None:
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
string_to_search = bot.ask(
"key_search",
f"Enter the search term\nor type 'back' to return to the menu: ",
None
)
actual_search_query = get_user_input(string_to_search)
if string_to_search == 'back':
# Handle cases where user input is empty, or 'back' was handled (sys.exit or None return)
if not actual_search_query:
if bot:
if actual_search_query is None: # Specifically for timeout from bot.ask or failed restart
bot.send_message("Search term not provided or operation cancelled. Returning.", None)
# If not bot, or empty string, just return; will likely lead to no results or previous menu.
return
# Restart the script
subprocess.Popen([sys.executable] + sys.argv)
sys.exit()
else:
string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
# Search on database
# Perform search on the database using the obtained query
finder = ProxyFinder(site_constant.FULL_URL)
proxy = finder.find_fast_proxy()
len_database = title_search(string_to_search, proxy)
len_database = title_search(actual_search_query, proxy)
# If only the database is needed, return the manager
# If only the database object (media_search_manager populated by title_search) is needed
if get_onlyDatabase:
return media_search_manager
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
process_search_result(select_title, selections, proxy)
# *** THE FIX IS HERE: Added len_database as the third argument ***
select_title = get_select_title(table_show_manager, media_search_manager, len_database)
process_search_result(select_title, selections, proxy) # Pass proxy
else:
# If no results are found, ask again
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
search()
# No results found
no_results_message = f"No results found for: '{actual_search_query}'"
if bot:
bot.send_message(no_results_message, None)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{actual_search_query}")
# Do not call search() recursively here to avoid infinite loops on no results.
# The flow should return to the caller (e.g., main menu in run.py).
return

View File

@ -120,4 +120,4 @@ def title_search(query: str, proxy: str) -> int:
bot.send_message(f"Lista dei risultati:", choices)
# Return the number of titles found
return media_search_manager.get_length()
return media_search_manager.get_length()

View File

@ -87,7 +87,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
select_title = get_select_title(table_show_manager, media_search_manager,len_database)
process_search_result(select_title, selections, proxy)
else:

View File

@ -7,78 +7,123 @@ import sys
from rich.console import Console
# Internal utilities
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
# Variable
console = Console()
available_colors = ['red', 'magenta', 'yellow', 'cyan', 'green', 'blue', 'white']
column_to_hide = ['Slug', 'Sub_ita', 'Last_air_date', 'Seasons_count', 'Url', 'Image', 'Path_id']
def get_select_title(table_show_manager, media_search_manager):
def get_select_title(table_show_manager, media_search_manager, num_results_available):
"""
Display a selection of titles and prompt the user to choose one.
Handles both console and Telegram bot input.
Parameters:
table_show_manager: Manager for console table display.
media_search_manager: Manager holding the list of media items.
num_results_available (int): The number of media items available for selection.
Returns:
MediaItem: The selected media item.
MediaItem: The selected media item, or None if no selection is made or an error occurs.
"""
# Determine column_info dynamically for (search site)
if not media_search_manager.media_list:
console.print("\n[red]No media items available.")
# console.print("\n[red]No media items available.")
return None
# Example of available colors for columns
available_colors = ['red', 'magenta', 'yellow', 'cyan', 'green', 'blue', 'white']
# Retrieve the keys of the first media item as column headers
first_media_item = media_search_manager.media_list[0]
column_info = {"Index": {'color': available_colors[0]}} # Always include Index with a fixed color
# Assign colors to the remaining keys dynamically
color_index = 1
for key in first_media_item.__dict__.keys():
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
prompt_message = f"Inserisci il numero del titolo che vuoi selezionare (da 0 a {num_results_available - 1}):"
user_input_str = bot.ask(
"select_title_from_list_number",
prompt_message,
None
)
if key.capitalize() in column_to_hide:
continue
if user_input_str is None:
bot.send_message("Timeout: nessuna selezione ricevuta.", None)
return None
if key in ('id', 'type', 'name', 'score'): # Custom prioritization of colors
if key == 'type':
column_info["Type"] = {'color': 'yellow'}
elif key == 'name':
column_info["Name"] = {'color': 'magenta'}
elif key == 'score':
column_info["Score"] = {'color': 'cyan'}
try:
chosen_index = int(user_input_str)
if 0 <= chosen_index < num_results_available:
selected_item = media_search_manager.get(chosen_index)
if selected_item:
return selected_item
else:
bot.send_message(f"Errore interno: Impossibile recuperare il titolo con indice {chosen_index}.", None)
return None
else:
bot.send_message(f"Selezione '{chosen_index}' non valida. Inserisci un numero compreso tra 0 e {num_results_available - 1}.", None)
return None
except ValueError:
bot.send_message(f"Input '{user_input_str}' non valido. Devi inserire un numero.", None)
return None
except Exception as e:
bot.send_message(f"Si è verificato un errore durante la selezione: {e}", None)
return None
else:
column_info[key.capitalize()] = {'color': available_colors[color_index % len(available_colors)]}
color_index += 1
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
media_dict = {'Index': str(i)}
else:
# Logica originale per la console
if not media_search_manager.media_list:
console.print("\n[red]No media items available.")
return None
first_media_item = media_search_manager.media_list[0]
column_info = {"Index": {'color': available_colors[0]}}
color_index = 1
for key in first_media_item.__dict__.keys():
if key.capitalize() in column_to_hide:
continue
if key in ('id', 'type', 'name', 'score'):
if key == 'type': column_info["Type"] = {'color': 'yellow'}
elif key == 'name': column_info["Name"] = {'color': 'magenta'}
elif key == 'score': column_info["Score"] = {'color': 'cyan'}
else:
column_info[key.capitalize()] = {'color': available_colors[color_index % len(available_colors)]}
color_index += 1
# Ensure all values are strings for rich add table
media_dict[key.capitalize()] = str(getattr(media, key))
table_show_manager.clear()
table_show_manager.add_column(column_info)
table_show_manager.add_tv_show(media_dict)
for i, media in enumerate(media_search_manager.media_list):
media_dict = {'Index': str(i)}
for key in first_media_item.__dict__.keys():
if key.capitalize() in column_to_hide:
continue
media_dict[key.capitalize()] = str(getattr(media, key))
table_show_manager.add_tv_show(media_dict)
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
last_command_str = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q" or last_command == "quit":
console.print("\n[red]Quit ...")
sys.exit(0)
if last_command_str is None or last_command_str.lower() in ["q", "quit"]:
console.print("\n[red]Selezione annullata o uscita.")
return None
# Check if the selected index is within range
if 0 <= int(last_command) < len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)
try:
selected_index = int(last_command_str)
if 0 <= selected_index < len(media_search_manager.media_list):
return media_search_manager.get(selected_index)
else:
console.print("\n[red]Indice errato o non valido.")
# sys.exit(0)
return None
except ValueError:
console.print("\n[red]Input non numerico ricevuto dalla tabella.")
# sys.exit(0)
return None

View File

@ -0,0 +1,62 @@
{
"DEFAULT": {
"debug": false,
"show_message": true,
"clean_console": true,
"show_trending": true,
"use_api": true,
"not_close": false,
"telegram_bot": true,
"download_site_data": true,
"validate_github_config": true
},
"OUT_FOLDER": {
"root_path": "/mnt/data/media/",
"movie_folder_name": "films",
"serie_folder_name": "serie_tv",
"anime_folder_name": "Anime",
"map_episode_name": "E%(episode)_%(episode_name)",
"add_siteName": false
},
"QBIT_CONFIG": {
"host": "192.168.1.51",
"port": "6666",
"user": "admin",
"pass": "adminadmin"
},
"M3U8_DOWNLOAD": {
"tqdm_delay": 0.01,
"default_video_workser": 12,
"default_audio_workser": 12,
"segment_timeout": 8,
"download_audio": true,
"merge_audio": true,
"specific_list_audio": [
"ita"
],
"download_subtitle": true,
"merge_subs": true,
"specific_list_subtitles": [
"ita",
"eng"
],
"cleanup_tmp_folder": true
},
"M3U8_CONVERSION": {
"use_codec": false,
"use_vcodec": true,
"use_acodec": true,
"use_bitrate": true,
"use_gpu": false,
"default_preset": "ultrafast"
},
"M3U8_PARSER": {
"force_resolution": "Best",
"get_only_link": false
},
"REQUESTS": {
"verify": false,
"timeout": 20,
"max_retry": 8
}
}

View File

@ -575,6 +575,10 @@ class TelegramBot:
cleaned_output = cleaned_output.replace(
"\n\n", "\n"
) # Rimuovi newline multipli
# Inizializza le variabili
cleaned_output_0 = None # o ""
cleaned_output_1 = None # o ""
# Dentro cleaned_output c'è una stringa recupero quello che si trova tra ## ##
download_section = re.search(r"##(.*?)##", cleaned_output, re.DOTALL)

View File

@ -315,7 +315,7 @@ class InternManager():
for provider, ips in dns_providers.items():
if server in ips:
return True
return False
return True
except Exception:
return False