diff --git a/.gitignore b/.gitignore index 86f4bf1..231113a 100644 --- a/.gitignore +++ b/.gitignore @@ -49,5 +49,5 @@ note.txt list_proxy.txt cmd.txt bot_config.json -script.json +scripts.json active_requests.json \ No newline at end of file diff --git a/StreamingCommunity/Api/Site/1337xx/__init__.py b/StreamingCommunity/Api/Site/1337xx/__init__.py index 55886d6..bad0b7c 100644 --- a/StreamingCommunity/Api/Site/1337xx/__init__.py +++ b/StreamingCommunity/Api/Site/1337xx/__init__.py @@ -12,7 +12,7 @@ from .site import title_search, run_get_select_title, media_search_manager from .title import download_title # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') import sys diff --git a/StreamingCommunity/Api/Site/1337xx/site.py b/StreamingCommunity/Api/Site/1337xx/site.py index f8133e5..a472616 100644 --- a/StreamingCommunity/Api/Site/1337xx/site.py +++ b/StreamingCommunity/Api/Site/1337xx/site.py @@ -18,7 +18,7 @@ from StreamingCommunity.Api.Template.Util import search_domain from StreamingCommunity.Api.Template.Class.SearchType import MediaManager # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/1337xx/title.py b/StreamingCommunity/Api/Site/1337xx/title.py index d81bfcd..c299f40 100644 --- a/StreamingCommunity/Api/Site/1337xx/title.py +++ b/StreamingCommunity/Api/Site/1337xx/title.py @@ -24,8 +24,8 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaItem from .costant import DOMAIN_NOW, SITE_NAME, MOVIE_FOLDER # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance -from session import get_session, updateScriptId, deleteScriptId +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance +from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/altadefinizionegratis/__init__.py b/StreamingCommunity/Api/Site/altadefinizionegratis/__init__.py index f7d88d4..40c86df 100644 --- a/StreamingCommunity/Api/Site/altadefinizionegratis/__init__.py +++ b/StreamingCommunity/Api/Site/altadefinizionegratis/__init__.py @@ -12,7 +12,7 @@ from .site import title_search, run_get_select_title, media_search_manager from .film import download_film # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/altadefinizionegratis/film.py b/StreamingCommunity/Api/Site/altadefinizionegratis/film.py index fa6007c..7a91b53 100644 --- a/StreamingCommunity/Api/Site/altadefinizionegratis/film.py +++ b/StreamingCommunity/Api/Site/altadefinizionegratis/film.py @@ -25,8 +25,8 @@ from StreamingCommunity.Api.Player.supervideo import VideoSource from .costant import MOVIE_FOLDER # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance -from session import get_session, updateScriptId, deleteScriptId +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance +from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/altadefinizionegratis/site.py b/StreamingCommunity/Api/Site/altadefinizionegratis/site.py index bf94b67..8c03bbe 100644 --- a/StreamingCommunity/Api/Site/altadefinizionegratis/site.py +++ b/StreamingCommunity/Api/Site/altadefinizionegratis/site.py @@ -26,7 +26,7 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout") disable_searchDomain = config_manager.get_bool("DEFAULT", "disable_searchDomain") # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/animeunity/__init__.py b/StreamingCommunity/Api/Site/animeunity/__init__.py index d139bd4..6c83ca5 100644 --- a/StreamingCommunity/Api/Site/animeunity/__init__.py +++ b/StreamingCommunity/Api/Site/animeunity/__init__.py @@ -12,7 +12,7 @@ from .site import title_search, run_get_select_title, media_search_manager from .film_serie import download_film, download_series # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') import sys diff --git a/StreamingCommunity/Api/Site/animeunity/film_serie.py b/StreamingCommunity/Api/Site/animeunity/film_serie.py index 1a0c976..84bd5af 100644 --- a/StreamingCommunity/Api/Site/animeunity/film_serie.py +++ b/StreamingCommunity/Api/Site/animeunity/film_serie.py @@ -27,8 +27,8 @@ from .costant import SITE_NAME, ANIME_FOLDER, MOVIE_FOLDER KILL_HANDLER = bool(False) # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance -from session import get_session, updateScriptId, deleteScriptId +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance +from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/animeunity/site.py b/StreamingCommunity/Api/Site/animeunity/site.py index 9cf8b6f..1ffc46b 100644 --- a/StreamingCommunity/Api/Site/animeunity/site.py +++ b/StreamingCommunity/Api/Site/animeunity/site.py @@ -20,7 +20,7 @@ from StreamingCommunity.Api.Template.Util import search_domain from StreamingCommunity.Api.Template.Class.SearchType import MediaManager # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/streamingcommunity/__init__.py b/StreamingCommunity/Api/Site/streamingcommunity/__init__.py index e86b087..a0ad193 100644 --- a/StreamingCommunity/Api/Site/streamingcommunity/__init__.py +++ b/StreamingCommunity/Api/Site/streamingcommunity/__init__.py @@ -15,7 +15,7 @@ from .series import download_series # Telegram bot instance from StreamingCommunity.Util._jsonConfig import config_manager -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') # Variable diff --git a/StreamingCommunity/Api/Site/streamingcommunity/film.py b/StreamingCommunity/Api/Site/streamingcommunity/film.py index 209f761..80e1bc4 100644 --- a/StreamingCommunity/Api/Site/streamingcommunity/film.py +++ b/StreamingCommunity/Api/Site/streamingcommunity/film.py @@ -25,8 +25,8 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource from .costant import SITE_NAME, MOVIE_FOLDER # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance -from session import get_session, updateScriptId, deleteScriptId +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance +from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/streamingcommunity/series.py b/StreamingCommunity/Api/Site/streamingcommunity/series.py index 6d28123..50a3a73 100644 --- a/StreamingCommunity/Api/Site/streamingcommunity/series.py +++ b/StreamingCommunity/Api/Site/streamingcommunity/series.py @@ -27,8 +27,8 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource from .costant import SITE_NAME, SERIES_FOLDER # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance -from session import get_session, updateScriptId, deleteScriptId +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance +from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Api/Site/streamingcommunity/site.py b/StreamingCommunity/Api/Site/streamingcommunity/site.py index c8bca62..be84384 100644 --- a/StreamingCommunity/Api/Site/streamingcommunity/site.py +++ b/StreamingCommunity/Api/Site/streamingcommunity/site.py @@ -28,7 +28,7 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaManager from .costant import SITE_NAME, DOMAIN_NOW # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') # Variable diff --git a/StreamingCommunity/HelpTg/__init__.py b/StreamingCommunity/HelpTg/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/StreamingCommunity/HelpTg/config.json b/StreamingCommunity/HelpTg/config.json new file mode 100644 index 0000000..836d7c6 --- /dev/null +++ b/StreamingCommunity/HelpTg/config.json @@ -0,0 +1,91 @@ +{ + "DEFAULT": { + "debug": false, + "log_file": "app.log", + "log_to_file": true, + "show_message": true, + "clean_console": true, + "root_path": "Video", + "movie_folder_name": "Movie", + "serie_folder_name": "TV", + "anime_folder_name": "Anime", + "map_episode_name": "E%(episode)_%(episode_name)", + "config_qbit_tor": { + "host": "192.168.1.99", + "port": "7060", + "user": "admin", + "pass": "adminadmin" + }, + "add_siteName": false, + "disable_searchDomain": false, + "not_close": false, + "telegram_bot": true + }, + "REQUESTS": { + "timeout": 30, + "max_retry": 8, + "proxy_start_min": 0.1, + "proxy_start_max": 0.5 + }, + "M3U8_DOWNLOAD": { + "tqdm_delay": 0.12, + "default_video_workser": 12, + "default_audio_workser": 12, + "merge_audio": true, + "specific_list_audio": [ + "ita" + ], + "merge_subs": true, + "specific_list_subtitles": [ + "eng", + "spa" + ], + "cleanup_tmp_folder": true + }, + "M3U8_CONVERSION": { + "use_codec": false, + "use_vcodec": true, + "use_acodec": true, + "use_bitrate": true, + "use_gpu": false, + "default_preset": "ultrafast" + }, + "M3U8_PARSER": { + "force_resolution": -1, + "get_only_link": false + }, + "SITE": { + "streamingcommunity": { + "domain": "paris" + }, + "altadefinizionegratis": { + "domain": "site" + }, + "guardaserie": { + "domain": "meme" + }, + "mostraguarda": { + "domain": "stream" + }, + "ddlstreamitaly": { + "domain": "co", + "extra": { + "ips4_device_key": "", + "ips4_member_id": "", + "ips4_login_key": "" + } + }, + "animeunity": { + "domain": "so" + }, + "cb01new": { + "domain": "mobi" + }, + "1337xx": { + "domain": "to" + }, + "ilcorsaronero": { + "domain": "link" + } + } +} \ No newline at end of file diff --git a/StreamingCommunity/HelpTg/requirements.txt b/StreamingCommunity/HelpTg/requirements.txt new file mode 100644 index 0000000..01f23e8 --- /dev/null +++ b/StreamingCommunity/HelpTg/requirements.txt @@ -0,0 +1,14 @@ +httpx +bs4 +rich +tqdm +m3u8 +psutil +unidecode +jsbeautifier +pathvalidate +pycryptodomex +googlesearch-python +fake-useragent +qbittorrent-api +python-qbittorrent \ No newline at end of file diff --git a/StreamingCommunity/HelpTg/telegram_bot.py b/StreamingCommunity/HelpTg/telegram_bot.py index 7ea8dfb..c0908a8 100644 --- a/StreamingCommunity/HelpTg/telegram_bot.py +++ b/StreamingCommunity/HelpTg/telegram_bot.py @@ -5,11 +5,12 @@ import uuid import subprocess import os, re, sys import json -from request_manager import RequestManager +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) +from StreamingCommunity.HelpTg.request_manager import RequestManager import threading # Funzione per caricare variabili da un file .env -def load_env(file_path=".env"): +def load_env(file_path="../../.env"): if os.path.exists(file_path): with open(file_path) as f: for line in f: @@ -22,7 +23,7 @@ load_env() class TelegramBot: _instance = None - _config_file = "bot_config.json" + _config_file = "../../bot_config.json" @classmethod def get_instance(cls): @@ -54,7 +55,7 @@ class TelegramBot: def monitor_scripts(): while True: try: - with open("scripts.json", "r") as f: + with open("../../scripts.json", "r") as f: scripts_data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): scripts_data = [] @@ -79,7 +80,7 @@ class TelegramBot: scripts_data_to_save.append(script) # Salva la lista aggiornata, senza gli script scaduti - with open("scripts.json", "w") as f: + with open("../../scripts.json", "w") as f: json.dump(scripts_data_to_save, f, indent=4) time.sleep(60) # Controlla ogni minuto @@ -195,7 +196,7 @@ class TelegramBot: verbose = debug_mode if debug_mode == "True": - subprocess.Popen(["python3", "test_run.py", screen_id]) + subprocess.Popen(["python3", "../../test_run.py", screen_id]) else: # Verifica se lo screen con il nome esiste già try: @@ -209,7 +210,7 @@ class TelegramBot: pass # Se il comando fallisce, significa che non ci sono screen attivi. # Crea la sessione screen e avvia lo script al suo interno - command = ["screen", "-dmS", screen_id, "python3", "test_run.py", screen_id] + command = ["screen", "-dmS", screen_id, "python3", "../../test_run.py", screen_id] # Avvia il comando tramite subprocess subprocess.Popen(command) @@ -223,7 +224,7 @@ class TelegramBot: } # Salvataggio nel file JSON - json_file = "scripts.json" + json_file = "../../scripts.json" # Carica i dati esistenti o crea una nuova lista try: @@ -246,7 +247,7 @@ class TelegramBot: return try: - with open("scripts.json", "r") as f: + with open("../../scripts.json", "r") as f: scripts_data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): scripts_data = [] @@ -305,7 +306,7 @@ class TelegramBot: parts = message.text.split() if len(parts) < 2: try: - with open("scripts.json", "r") as f: + with open("../../scripts.json", "r") as f: scripts_data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): scripts_data = [] @@ -328,7 +329,7 @@ class TelegramBot: screen_id = parts[1] try: - with open("scripts.json", "r") as f: + with open("../../scripts.json", "r") as f: scripts_data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): scripts_data = [] @@ -352,7 +353,7 @@ class TelegramBot: return # Salva la lista aggiornata senza lo script eliminato - with open("scripts.json", "w") as f: + with open("../../scripts.json", "w") as f: json.dump(new_scripts_data, f, indent=4) print(f"✅ Script `{screen_id}` terminato con successo!") @@ -508,8 +509,8 @@ class TelegramBot: def run(self): print("🚀 Avvio del bot...") - # svuoto il file scripts.json - with open("scripts.json", "w") as f: + # svuoto il file ../../scripts.json + with open("../../scripts.json", "w") as f: json.dump([], f) self.bot.infinity_polling() diff --git a/StreamingCommunity/Lib/Downloader/HLS/downloader.py b/StreamingCommunity/Lib/Downloader/HLS/downloader.py index e4d3a6e..601f4c7 100644 --- a/StreamingCommunity/Lib/Downloader/HLS/downloader.py +++ b/StreamingCommunity/Lib/Downloader/HLS/downloader.py @@ -38,7 +38,7 @@ from ...M3U8 import ( from .segments import M3U8_Segments # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') # Config diff --git a/StreamingCommunity/Lib/Downloader/MP4/downloader.py b/StreamingCommunity/Lib/Downloader/MP4/downloader.py index 8076a78..9afd4b4 100644 --- a/StreamingCommunity/Lib/Downloader/MP4/downloader.py +++ b/StreamingCommunity/Lib/Downloader/MP4/downloader.py @@ -30,7 +30,7 @@ import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/Util/table.py b/StreamingCommunity/Util/table.py index 16673e5..67acb5c 100644 --- a/StreamingCommunity/Util/table.py +++ b/StreamingCommunity/Util/table.py @@ -19,7 +19,7 @@ from .message import start_message from .call_stack import get_call_stack # Telegram bot instance -StreamingCommunity.HelpTg. import get_bot_instance +from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance from StreamingCommunity.Util._jsonConfig import config_manager TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') diff --git a/StreamingCommunity/old_run.py b/StreamingCommunity/old_run.py deleted file mode 100644 index f1a616e..0000000 --- a/StreamingCommunity/old_run.py +++ /dev/null @@ -1,245 +0,0 @@ -# 10.12.23 - -import os -import sys -import time -import glob -import logging -import platform -import argparse -import importlib -from typing import Callable - - -# Internal utilities -from StreamingCommunity.Util.message import start_message -from StreamingCommunity.Util.console import console, msg -from StreamingCommunity.Util._jsonConfig import config_manager -from StreamingCommunity.Upload.update import update as git_update -from StreamingCommunity.Util.os import os_summary -from StreamingCommunity.Util.logger import Logger - - -# Config -CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close') - - -def run_function(func: Callable[..., None], close_console: bool = False) -> None: - """ - Run a given function indefinitely or once, depending on the value of close_console. - - Parameters: - func (Callable[..., None]): The function to run. - close_console (bool, optional): Whether to close the console after running the function once. Defaults to False. - """ - if close_console: - while 1: - func() - else: - func() - - -def load_search_functions(): - modules = [] - loaded_functions = {} - - # Find api home directory - if getattr(sys, 'frozen', False): # Modalità PyInstaller - base_path = os.path.join(sys._MEIPASS, "StreamingCommunity") - else: - base_path = os.path.dirname(__file__) - - api_dir = os.path.join(base_path, 'Api', 'Site') - init_files = glob.glob(os.path.join(api_dir, '*', '__init__.py')) - - # Retrieve modules and their indices - for init_file in init_files: - - # Get folder name as module name - module_name = os.path.basename(os.path.dirname(init_file)) - logging.info(f"Load module name: {module_name}") - - try: - # Dynamically import the module - mod = importlib.import_module(f'StreamingCommunity.Api.Site.{module_name}') - - # Get 'indice' from the module - indice = getattr(mod, 'indice', 0) - is_deprecate = bool(getattr(mod, '_deprecate', True)) - use_for = getattr(mod, '_useFor', 'other') - - if not is_deprecate: - modules.append((module_name, indice, use_for)) - - except Exception as e: - console.print(f"[red]Failed to import module {module_name}: {str(e)}") - - # Sort modules by 'indice' - modules.sort(key=lambda x: x[1]) - - # Load search functions in the sorted order - for module_name, _, use_for in modules: - - # Construct a unique alias for the module - module_alias = f'{module_name}_search' - - try: - - # Dynamically import the module - mod = importlib.import_module(f'StreamingCommunity.Api.Site.{module_name}') - - # Get the search function from the module (assuming the function is named 'search' and defined in __init__.py) - search_function = getattr(mod, 'search') - - # Add the function to the loaded functions dictionary - loaded_functions[module_alias] = (search_function, use_for) - - except Exception as e: - console.print(f"[red]Failed to load search function from module {module_name}: {str(e)}") - - return loaded_functions - - -def initialize(): - - # Get start message - start_message() - - # Get system info - os_summary.get_system_summary() - - # Set terminal size for win 7 - if platform.system() == "Windows" and "7" in platform.version(): - os.system('mode 120, 40') - - # Check python version - if sys.version_info < (3, 7): - console.log("[red]Install python version > 3.7.16") - sys.exit(0) - - # Attempting GitHub update - try: - git_update() - except: - console.log("[red]Error with loading github.") - - -def main(): - start = time.time() - - # Create logger - log_not = Logger() - initialize() - - # Load search functions - search_functions = load_search_functions() - logging.info(f"Load module in: {time.time() - start} s") - - # Create argument parser - parser = argparse.ArgumentParser( - description='Script to download movies and series from the internet. Use these commands to configure the script and control its behavior.' - ) - - # Add arguments for the main configuration parameters - parser.add_argument( - '--add_siteName', type=bool, help='Enable or disable adding the site name to the file name (e.g., true/false).' - ) - parser.add_argument( - '--disable_searchDomain', type=bool, help='Enable or disable searching in configured domains (e.g., true/false).' - ) - parser.add_argument( - '--not_close', type=bool, help='If set to true, the script will not close the console after execution (e.g., true/false).' - ) - - # Add arguments for M3U8 configuration - parser.add_argument( - '--default_video_worker', type=int, help='Number of workers for video during M3U8 download (default: 12).' - ) - parser.add_argument( - '--default_audio_worker', type=int, help='Number of workers for audio during M3U8 download (default: 12).' - ) - - # Add options for audio and subtitles - parser.add_argument( - '--specific_list_audio', type=str, help='Comma-separated list of specific audio languages to download (e.g., ita,eng).' - ) - parser.add_argument( - '--specific_list_subtitles', type=str, help='Comma-separated list of specific subtitle languages to download (e.g., eng,spa).' - ) - - # Add arguments for search functions - color_map = { - "anime": "red", - "film_serie": "yellow", - "film": "blue", - "serie": "green", - "other": "white" - } - - # Add dynamic arguments based on loaded search modules - for alias, (_, use_for) in search_functions.items(): - short_option = alias[:3].upper() - long_option = alias - parser.add_argument(f'-{short_option}', f'--{long_option}', action='store_true', help=f'Search for {alias.split("_")[0]} on streaming platforms.') - - # Parse command-line arguments - args = parser.parse_args() - - # Map command-line arguments to the config values - config_updates = {} - - if args.add_siteName is not None: - config_updates['DEFAULT.add_siteName'] = args.add_siteName - if args.disable_searchDomain is not None: - config_updates['DEFAULT.disable_searchDomain'] = args.disable_searchDomain - if args.not_close is not None: - config_updates['DEFAULT.not_close'] = args.not_close - if args.default_video_worker is not None: - config_updates['M3U8_DOWNLOAD.default_video_worker'] = args.default_video_worker - if args.default_audio_worker is not None: - config_updates['M3U8_DOWNLOAD.default_audio_worker'] = args.default_audio_worker - if args.specific_list_audio is not None: - config_updates['M3U8_DOWNLOAD.specific_list_audio'] = args.specific_list_audio.split(',') - if args.specific_list_subtitles is not None: - config_updates['M3U8_DOWNLOAD.specific_list_subtitles'] = args.specific_list_subtitles.split(',') - - # Apply the updates to the config file - for key, value in config_updates.items(): - section, option = key.split('.') - config_manager.set_key(section, option, value) - - config_manager.write_config() - - # Map command-line arguments to functions - arg_to_function = {alias: func for alias, (func, _) in search_functions.items()} - - # Check which argument is provided and run the corresponding function - for arg, func in arg_to_function.items(): - if getattr(args, arg): - run_function(func) - return - - # Mapping user input to functions - input_to_function = {str(i): func for i, (alias, (func, _)) in enumerate(search_functions.items())} - - # Create dynamic prompt message and choices - choice_labels = {str(i): (alias.split("_")[0].capitalize(), use_for) for i, (alias, (_, use_for)) in enumerate(search_functions.items())} - - # Display the category legend in a single line - legend_text = " | ".join([f"[{color}]{category.capitalize()}[/{color}]" for category, color in color_map.items()]) - console.print(f"\n[bold green]Category Legend:[/bold green] {legend_text}") - - # Construct the prompt message with color-coded site names - prompt_message = "[green]Insert category [white](" + ", ".join( - [f"{key}: [{color_map[label[1]]}]{label[0]}[/{color_map[label[1]]}]" for key, label in choice_labels.items()] - ) + "[white])" - - # Ask the user for input - category = msg.ask(prompt_message, choices=list(choice_labels.keys()), default="0", show_choices=False, show_default=False) - - # Run the corresponding function based on user input - if category in input_to_function: - run_function(input_to_function[category]) - else: - console.print("[red]Invalid category.") - sys.exit(0) \ No newline at end of file diff --git a/StreamingCommunity/run.py b/StreamingCommunity/run.py index 1b4de19..4339452 100644 --- a/StreamingCommunity/run.py +++ b/StreamingCommunity/run.py @@ -26,7 +26,7 @@ from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance # Config CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close') TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot') -from session import get_session, deleteScriptId +from StreamingCommunity.HelpTg.session import get_session, deleteScriptId def run_function(func: Callable[..., None], close_console: bool = False) -> None: diff --git a/request_manager.py b/request_manager.py deleted file mode 100644 index 963bbad..0000000 --- a/request_manager.py +++ /dev/null @@ -1,79 +0,0 @@ -import json -import time -from typing import Optional - -class RequestManager: - _instance = None - - def __new__(cls, *args, **kwargs): - if not cls._instance: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self, json_file: str = "active_requests.json"): - if not hasattr(self, 'initialized'): - self.json_file = json_file - self.initialized = True - self.on_response_callback = None # Aggiungi un campo per il callback - - def create_request(self, type: str) -> str: - request_data = { - "type": type, - "response": None, - "timestamp": time.time() - } - - # Aggiungi il tipo al salvataggio della richiesta - with open(self.json_file, "w") as f: - json.dump(request_data, f) - - return "Ok" - - def save_response(self, message_text: str) -> bool: - try: - # Carica il file JSON - with open(self.json_file, "r") as f: - data = json.load(f) - - # Controlla se esiste la chiave 'type' e se la risposta è presente - if "type" in data and "response" in data: - data["response"] = message_text # Aggiorna la risposta - - # Scrivi il file JSON aggiornato - with open(self.json_file, "w") as f: - json.dump(data, f, indent=4) # Formatta il file JSON - - return True - else: - return False - - except (FileNotFoundError, json.JSONDecodeError) as e: - print(f"⚠️ save_response - errore: {e}") - return False - - def get_response(self) -> Optional[str]: - try: - with open(self.json_file, "r") as f: - data = json.load(f) - # Verifica se esiste la chiave "response" - if "response" in data: - response = data["response"] # Ottieni la risposta direttamente - if response is not None and self.on_response_callback: - # Se la risposta è disponibile, chiama il callback - self.on_response_callback(response) - return response - - except (FileNotFoundError, json.JSONDecodeError) as e: - print(f"get_response - errore: {e}") - return None - - def clear_file(self) -> bool: - try: - # Svuota il file JSON scrivendo un oggetto vuoto - with open(self.json_file, "w") as f: - json.dump({}, f) - print(f"File {self.json_file} è stato svuotato con successo.") - return True - except Exception as e: - print(f"⚠️ clear_file - errore: {e}") - return False \ No newline at end of file diff --git a/scripts.json b/scripts.json index c03fbb1..21a1e24 100644 --- a/scripts.json +++ b/scripts.json @@ -1,16 +1,8 @@ [ { - "screen_id": "0e830205", - "start_time": 1738674260.611711, + "screen_id": "cfd83841", + "start_time": 1738682482.202467, "status": "running", - "user_id": 676749122, - "titolo": "Doctor Strange" - }, - { - "screen_id": "29057a57", - "start_time": 1738674321.6443458, - "status": "running", - "user_id": 676749122, - "titolo": "Guardiani della Galassia Vol. 2" + "user_id": 676749122 } ] \ No newline at end of file diff --git a/session.py b/session.py deleted file mode 100755 index b81add8..0000000 --- a/session.py +++ /dev/null @@ -1,62 +0,0 @@ -import json - -session_data = {} - -def set_session(value): - # salvo script_id in session_data - session_data['script_id'] = value - -def get_session(): - # controllo se script_id è presente in session_data - return session_data.get('script_id', 'unknown') - -def updateScriptId(screen_id, titolo): - # definisco il nome del file json - json_file = "scripts.json" - try: - # apro il file json - with open(json_file, 'r') as f: - scripts_data = json.load(f) - except FileNotFoundError: - # Se il file non esiste, inizializzo la lista vuota - scripts_data = [] - - # cerco lo script con lo screen_id - for script in scripts_data: - if script["screen_id"] == screen_id: - # se trovo il match, aggiorno il titolo - script["titolo"] = titolo - # aggiorno il file json - with open(json_file, 'w') as f: - json.dump(scripts_data, f, indent=4) - #print(f"Titolo aggiornato per screen_id {screen_id}") - return - - # se non trovo nessuno script con lo screen_id - print(f"Screen_id {screen_id} non trovato.") - -# creo la funzione che elimina lo script con lo screen_id specificato -def deleteScriptId(screen_id): - # definisco il nome del file json - json_file = "scripts.json" - try: - # apro il file json - with open(json_file, 'r') as f: - scripts_data = json.load(f) - except FileNotFoundError: - # Se il file non esiste, inizializzo la lista vuota - scripts_data = [] - - # cerco lo script con lo screen_id - for script in scripts_data: - if script["screen_id"] == screen_id: - # se trovo il match, elimino lo script - scripts_data.remove(script) - # aggiorno il file json - with open(json_file, 'w') as f: - json.dump(scripts_data, f, indent=4) - print(f"Script eliminato per screen_id {screen_id}") - return - - # se non trovo nessuno script con lo screen_id - print(f"Screen_id {screen_id} non trovato.") \ No newline at end of file diff --git a/telegram_bot.py b/telegram_bot.py deleted file mode 100644 index 7ea8dfb..0000000 --- a/telegram_bot.py +++ /dev/null @@ -1,537 +0,0 @@ -import telebot -from telebot import types -import time -import uuid -import subprocess -import os, re, sys -import json -from request_manager import RequestManager -import threading - -# Funzione per caricare variabili da un file .env -def load_env(file_path=".env"): - if os.path.exists(file_path): - with open(file_path) as f: - for line in f: - if line.strip() and not line.startswith("#"): - key, value = line.strip().split("=", 1) - os.environ[key] = value - -# Carica le variabili -load_env() - -class TelegramBot: - _instance = None - _config_file = "bot_config.json" - - @classmethod - def get_instance(cls): - if cls._instance is None: - # Prova a caricare la configurazione e inizializzare il bot - if os.path.exists(cls._config_file): - with open(cls._config_file, 'r') as f: - config = json.load(f) - cls._instance = cls.init_bot(config['token'], config['authorized_user_id']) - else: - raise Exception("Bot non ancora inizializzato. Chiamare prima init_bot() con token e authorized_user_id") - return cls._instance - - @classmethod - def init_bot(cls, token, authorized_user_id): - if cls._instance is None: - cls._instance = cls(token, authorized_user_id) - # Salva la configurazione - config = { - 'token': token, - 'authorized_user_id': authorized_user_id - } - with open(cls._config_file, 'w') as f: - json.dump(config, f) - return cls._instance - - def __init__(self, token, authorized_user_id): - - def monitor_scripts(): - while True: - try: - with open("scripts.json", "r") as f: - scripts_data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - scripts_data = [] - - current_time = time.time() - - # Crea una nuova lista senza gli script che sono scaduti - scripts_data_to_save = [] - - for script in scripts_data: - if "titolo" not in script and script["status"] == "running" and (current_time - script["start_time"]) > 600: - # Prova a terminare la sessione screen - try: - subprocess.check_output(["screen", "-S", script["screen_id"], "-X", "quit"]) - print(f"✅ La sessione screen con ID {script['screen_id']} è stata fermata automaticamente.") - except subprocess.CalledProcessError: - print(f"⚠️ Impossibile fermare la sessione screen con ID {script['screen_id']}.") - - # Aggiungi solo gli script che non sono scaduti - print(f"⚠️ Lo script con ID {script['screen_id']} ha superato i 10 minuti e verrà rimosso.") - else: - scripts_data_to_save.append(script) - - # Salva la lista aggiornata, senza gli script scaduti - with open("scripts.json", "w") as f: - json.dump(scripts_data_to_save, f, indent=4) - - time.sleep(60) # Controlla ogni minuto - - # Avvia il thread di monitoraggio - monitor_thread = threading.Thread(target=monitor_scripts, daemon=True) - monitor_thread.start() - - - if TelegramBot._instance is not None: - raise Exception("Questa classe è un singleton! Usa get_instance() per ottenere l'istanza.") - - self.token = token - self.authorized_user_id = authorized_user_id - self.chat_id = authorized_user_id - self.bot = telebot.TeleBot(token) - self.request_manager = RequestManager() - - # Registra gli handler - self.register_handlers() - - def register_handlers(self): - - """ @self.bot.message_handler(commands=['start']) - def start(message): - self.handle_start(message) """ - - @self.bot.message_handler(commands=['get_id']) - def get_id(message): - self.handle_get_id(message) - - @self.bot.message_handler(commands=['start']) - def start_script(message): - self.handle_start_script(message) - - @self.bot.message_handler(commands=['list']) - def list_scripts(message): - self.handle_list_scripts(message) - - @self.bot.message_handler(commands=['stop']) - def stop_script(message): - self.handle_stop_script(message) - - @self.bot.message_handler(commands=['screen']) - def screen_status(message): - self.handle_screen_status(message) - - """ @self.bot.message_handler(commands=['replay']) - def send_welcome(message): - # Crea una tastiera personalizzata - markup = types.ReplyKeyboardMarkup(row_width=2) - itembtn1 = types.KeyboardButton('Start') - itembtn2 = types.KeyboardButton('Lista') - markup.add(itembtn1, itembtn2) - - # Invia un messaggio con la tastiera - self.bot.send_message(message.chat.id, "Scegli un'opzione:", reply_markup=markup) """ - - """@self.bot.message_handler(commands=['inline']) - def send_welcome(message): - # Crea una tastiera inline - markup = types.InlineKeyboardMarkup() - itembtn1 = types.InlineKeyboardButton('Azione 1', callback_data='action1') - itembtn2 = types.InlineKeyboardButton('Azione 2', callback_data='action2') - itembtn3 = types.InlineKeyboardButton('Azione 3', callback_data='action3') - markup.add(itembtn1, itembtn2, itembtn3) - - # Invia un messaggio con la tastiera inline - self.bot.send_message(message.chat.id, "Scegli un'opzione:", reply_markup=markup) - - # Gestisce le callback delle tastiere inline - @self.bot.callback_query_handler(func=lambda call: True) - def handle_callback(call): - if call.data == 'action1': - self.bot.answer_callback_query(call.id, "Hai scelto Azione 1!") - self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 1!") - elif call.data == 'action2': - self.bot.answer_callback_query(call.id, "Hai scelto Azione 2!") - self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 2!") - elif call.data == 'action3': - self.bot.answer_callback_query(call.id, "Hai scelto Azione 3!") - self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 3!") - """ - - @self.bot.message_handler(func=lambda message: True) - def handle_all_messages(message): - self.handle_response(message) - - def is_authorized(self, user_id): - return user_id == self.authorized_user_id - - def handle_get_id(self, message): - if not self.is_authorized(message.from_user.id): - print(f"❌ Non sei autorizzato.") - self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.") - return - - print(f"Il tuo ID utente è: `{message.from_user.id}`") - self.bot.send_message(message.chat.id, f"Il tuo ID utente è: `{message.from_user.id}`", parse_mode="Markdown") - - def handle_start_script(self, message): - if not self.is_authorized(message.from_user.id): - print(f"❌ Non sei autorizzato. {message.from_user.id}") - self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.") - return - - screen_id = str(uuid.uuid4())[:8] - #screen_id = '0000' - - # Impostare a True per avviare il test_run.py in modalità verbose per il debug - - debug_mode = os.getenv("DEBUG") - verbose = debug_mode - - if debug_mode == "True": - subprocess.Popen(["python3", "test_run.py", screen_id]) - else: - # Verifica se lo screen con il nome esiste già - try: - subprocess.check_output(["screen", "-list"]) - existing_screens = subprocess.check_output(["screen", "-list"]).decode('utf-8') - if screen_id in existing_screens: - print(f"⚠️ Lo script con ID {screen_id} è già in esecuzione.") - self.bot.send_message(message.chat.id, f"⚠️ Lo script con ID {screen_id} è già in esecuzione.") - return - except subprocess.CalledProcessError: - pass # Se il comando fallisce, significa che non ci sono screen attivi. - - # Crea la sessione screen e avvia lo script al suo interno - command = ["screen", "-dmS", screen_id, "python3", "test_run.py", screen_id] - - # Avvia il comando tramite subprocess - subprocess.Popen(command) - - # Creazione oggetto script info - script_info = { - "screen_id": screen_id, - "start_time": time.time(), - "status": "running", - "user_id": message.from_user.id - } - - # Salvataggio nel file JSON - json_file = "scripts.json" - - # Carica i dati esistenti o crea una nuova lista - try: - with open(json_file, 'r') as f: - scripts_data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - scripts_data = [] - - # Aggiungi il nuovo script - scripts_data.append(script_info) - - # Scrivi il file aggiornato - with open(json_file, 'w') as f: - json.dump(scripts_data, f, indent=4) - - def handle_list_scripts(self, message): - if not self.is_authorized(message.from_user.id): - print(f"❌ Non sei autorizzato.") - self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.") - return - - try: - with open("scripts.json", "r") as f: - scripts_data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - scripts_data = [] - - if not scripts_data: - print(f"⚠️ Nessuno script registrato.") - self.bot.send_message(message.chat.id, "⚠️ Nessuno script registrato.") - return - - current_time = time.time() - msg = ["🖥️ **Script Registrati:**\n"] - - for script in scripts_data: - # Calcola la durata - duration = current_time - script["start_time"] - if "end_time" in script: - duration = script["end_time"] - script["start_time"] - - # Formatta la durata - hours, rem = divmod(duration, 3600) - minutes, seconds = divmod(rem, 60) - duration_str = f"{int(hours)}h {int(minutes)}m {int(seconds)}s" - - # Icona stato - status_icons = { - "running": "🟢", - "stopped": "🔴", - "completed": "⚪" - } - - # Costruisci riga - line = ( - f"• ID: `{script['screen_id']}`\n" - f"• Stato: {status_icons.get(script['status'], '⚫')}\n" - f"• Stop: `/stop {script['screen_id']}`\n" - f"• Screen: `/screen {script['screen_id']}`\n" - f"• Durata: {duration_str}\n" - f"• Download:\n{script.get('titolo', 'N/A')}\n" - ) - msg.append(line) - - # Formatta la risposta finale - final_msg = "\n".join(msg) - if len(final_msg) > 4000: - final_msg = final_msg[:4000] + "\n[...] (messaggio troncato)" - - print(f"{final_msg}") - self.bot.send_message(message.chat.id, final_msg, parse_mode="Markdown") - - def handle_stop_script(self, message): - if not self.is_authorized(message.from_user.id): - print(f"❌ Non sei autorizzato.") - self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.") - return - - parts = message.text.split() - if len(parts) < 2: - try: - with open("scripts.json", "r") as f: - scripts_data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - scripts_data = [] - - running_scripts = [s for s in scripts_data if s["status"] == "running"] - - if not running_scripts: - print(f"⚠️ Nessuno script attivo da fermare.") - self.bot.send_message(message.chat.id, "⚠️ Nessuno script attivo da fermare.") - return - - msg = "🖥️ **Script Attivi:**\n" - for script in running_scripts: - msg += f"🔹 `/stop {script['screen_id']}` per fermarlo\n" - - print(f"{msg}") - self.bot.send_message(message.chat.id, msg, parse_mode="Markdown") - - elif len(parts) == 2: - screen_id = parts[1] - - try: - with open("scripts.json", "r") as f: - scripts_data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - scripts_data = [] - - # Filtra la lista eliminando lo script con l'ID specificato - new_scripts_data = [script for script in scripts_data if script["screen_id"] != screen_id] - - if len(new_scripts_data) == len(scripts_data): - # Nessun elemento rimosso, quindi ID non trovato - print(f"⚠️ Nessuno script attivo con ID `{screen_id}`.") - self.bot.send_message(message.chat.id, f"⚠️ Nessuno script attivo con ID `{screen_id}`.", parse_mode="Markdown") - return - - # Terminare la sessione screen - try: - subprocess.check_output(["screen", "-S", screen_id, "-X", "quit"]) - print(f"✅ La sessione screen con ID {screen_id} è stata fermata.") - except subprocess.CalledProcessError: - print(f"⚠️ Impossibile fermare la sessione screen con ID `{screen_id}`.") - self.bot.send_message(message.chat.id, f"⚠️ Impossibile fermare la sessione screen con ID `{screen_id}`.", parse_mode="Markdown") - return - - # Salva la lista aggiornata senza lo script eliminato - with open("scripts.json", "w") as f: - json.dump(new_scripts_data, f, indent=4) - - print(f"✅ Script `{screen_id}` terminato con successo!") - self.bot.send_message(message.chat.id, f"✅ Script `{screen_id}` terminato con successo!", parse_mode="Markdown") - - def handle_response(self, message): - """ if message.text == 'Start': - self.handle_start_script(self, message) - elif message.text == 'Lista': - self.handle_list_scripts(self, message) - elif message.text == 'Azione 3': - self.bot.send_message(message.chat.id, "Hai scelto Azione 3!") - else: - self.bot.send_message(message.chat.id, "Comando non riconosciuto.") - - if not self.is_authorized(message.from_user.id): - self.bot.reply_to(message, "❌ Non sei autorizzato.") - return """ - - text = message.text - if self.request_manager.save_response(text): - print(f"📥 Risposta salvata correttamente per il tipo {text}") - else: - print("⚠️ Nessuna richiesta attiva.") - self.bot.reply_to(message, "❌ Nessuna richiesta attiva.") - - def handle_screen_status(self, message): - command_parts = message.text.split() - if len(command_parts) < 2: - print(f"⚠️ ID mancante nel comando. Usa: /screen ") - self.bot.send_message(message.chat.id, "⚠️ ID mancante nel comando. Usa: /screen ") - return - - screen_id = command_parts[1] - temp_file = f"/tmp/screen_output_{screen_id}.txt" - - try: - # Cattura l'output della screen - subprocess.run(["screen", "-X", "-S", screen_id, "hardcopy", "-h", temp_file], check=True) - except subprocess.CalledProcessError as e: - print(f"❌ Errore durante la cattura dell'output della screen: {e}") - self.bot.send_message(message.chat.id, f"❌ Errore durante la cattura dell'output della screen: {e}") - return - - if not os.path.exists(temp_file): - print(f"❌ Impossibile catturare l'output della screen.") - self.bot.send_message(message.chat.id, f"❌ Impossibile catturare l'output della screen.") - return - - try: - # Leggi il file con la codifica corretta - with open(temp_file, 'r', encoding='latin-1') as file: - screen_output = file.read() - - # Pulisci l'output - cleaned_output = re.sub(r'[\x00-\x1F\x7F]', '', screen_output) # Rimuovi caratteri di controllo - cleaned_output = cleaned_output.replace('\n\n', '\n') # Rimuovi newline multipli - - # Estrarre tutte le parti da "Download:" fino a "Video" o "Subtitle", senza includerli - download_matches = re.findall(r"Download: (.*?)(?:Video|Subtitle)", cleaned_output) - if download_matches: - # Serie TV e Film StreamingCommunity - - proc_matches = re.findall(r"Proc: ([\d\.]+%)", cleaned_output) - - # Creare una stringa unica con tutti i risultati - result_string = "\n".join([f"Download: {download_matches[i].strip()}\nDownload al {proc_matches[i]}" for i in range(len(download_matches)) if i < len(proc_matches)]) - - if result_string != "": - cleaned_output = result_string - else: - print(f"❌ La parola 'Download:' non è stata trovata nella stringa.") - else: - - download_list = [] - - # Estrai tutte le righe che iniziano con "Download:" fino al prossimo "Download" o alla fine della riga - matches = re.findall(r"Download:\s*(.*?)(?=Download|$)", cleaned_output) - - # Se sono stati trovati download, stampali - if matches: - for i, match in enumerate(matches, 1): - # rimuovo solo la parte "downloader.py:57Result:400" se esiste - match = re.sub(r"downloader.py:\d+Result:400", "", match) - match = match.strip() # Rimuovo gli spazi bianchi in eccesso - if match: # Assicurati che la stringa non sia vuota - print(f"Download {i}: {match}") - - # Aggiungi il risultato modificato alla lista - download_list.append(f"Download {i}: {match}") - - # Creare una stringa unica con tutti i risultati - cleaned_output = "\n".join(download_list) - else: - print("❌ Nessun download trovato") - - # Invia l'output pulito - print(f"📄 Output della screen {screen_id}:\n{cleaned_output}") - self._send_long_message(message.chat.id, f"📄 Output della screen {screen_id}:\n{cleaned_output}") - - except Exception as e: - print(f"❌ Errore durante la lettura o l'invio dell'output della screen: {e}") - self.bot.send_message(message.chat.id, f"❌ Errore durante la lettura o l'invio dell'output della screen: {e}") - - # Cancella il file temporaneo - os.remove(temp_file) - - def send_message(self, message, choices): - if choices is None: - if self.chat_id: - print(f"{message}") - self.bot.send_message(self.chat_id, message) - else: - formatted_choices = "\n".join(choices) - message = f"{message}\n\n{formatted_choices}" - if self.chat_id: - print(f"{message}") - self.bot.send_message(self.chat_id, message) - - def _send_long_message(self, chat_id, text, chunk_size=4096): - """Suddivide e invia un messaggio troppo lungo in più parti.""" - for i in range(0, len(text), chunk_size): - print(f"{text[i:i+chunk_size]}") - self.bot.send_message(chat_id, text[i:i+chunk_size]) - - def ask(self, type, prompt_message, choices, timeout=60): - self.request_manager.create_request(type) - - if choices is None: - print(f"{prompt_message}") - self.bot.send_message( - self.chat_id, - f"{prompt_message}", - ) - else: - print(f"{prompt_message}\n\nOpzioni: {', '.join(choices)}") - self.bot.send_message( - self.chat_id, - f"{prompt_message}\n\nOpzioni: {', '.join(choices)}", - ) - - start_time = time.time() - while time.time() - start_time < timeout: - response = self.request_manager.get_response() - if response is not None: - return response - time.sleep(1) - - print(f"⚠️ Timeout: nessuna risposta ricevuta.") - self.bot.send_message(self.chat_id, "⚠️ Timeout: nessuna risposta ricevuta.") - self.request_manager.clear_file() - return None - - def run(self): - print("🚀 Avvio del bot...") - # svuoto il file scripts.json - with open("scripts.json", "w") as f: - json.dump([], f) - self.bot.infinity_polling() - -def get_bot_instance(): - return TelegramBot.get_instance() - -# Esempio di utilizzo -if __name__ == "__main__": - - # Usa le variabili - token = os.getenv("TOKEN_TELEGRAM") - authorized_user_id = os.getenv("AUTHORIZED_USER_ID") - - TOKEN = token # Inserisci il token del tuo bot Telegram sul file .env - AUTHORIZED_USER_ID = int(authorized_user_id) # Inserisci il tuo ID utente Telegram sul file .env - - # Inizializza il bot - bot = TelegramBot.init_bot(TOKEN, AUTHORIZED_USER_ID) - bot.run() - -""" -start - Avvia lo script -list - Lista script attivi -get - Mostra ID utente Telegram -"""