eliminato file nella root inutili e fixato bug

This commit is contained in:
GiuDev 2025-02-04 16:21:58 +01:00
parent 80aefb4fba
commit 9f616b11bf
27 changed files with 146 additions and 971 deletions

2
.gitignore vendored
View File

@ -49,5 +49,5 @@ note.txt
list_proxy.txt
cmd.txt
bot_config.json
script.json
scripts.json
active_requests.json

View File

@ -12,7 +12,7 @@ from .site import title_search, run_get_select_title, media_search_manager
from .title import download_title
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
import sys

View File

@ -18,7 +18,7 @@ from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -24,8 +24,8 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
from .costant import DOMAIN_NOW, SITE_NAME, MOVIE_FOLDER
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -12,7 +12,7 @@ from .site import title_search, run_get_select_title, media_search_manager
from .film import download_film
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -25,8 +25,8 @@ from StreamingCommunity.Api.Player.supervideo import VideoSource
from .costant import MOVIE_FOLDER
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -26,7 +26,7 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
disable_searchDomain = config_manager.get_bool("DEFAULT", "disable_searchDomain")
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -12,7 +12,7 @@ from .site import title_search, run_get_select_title, media_search_manager
from .film_serie import download_film, download_series
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
import sys

View File

@ -27,8 +27,8 @@ from .costant import SITE_NAME, ANIME_FOLDER, MOVIE_FOLDER
KILL_HANDLER = bool(False)
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -20,7 +20,7 @@ from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -15,7 +15,7 @@ from .series import download_series
# Telegram bot instance
from StreamingCommunity.Util._jsonConfig import config_manager
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
# Variable

View File

@ -25,8 +25,8 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource
from .costant import SITE_NAME, MOVIE_FOLDER
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -27,8 +27,8 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource
from .costant import SITE_NAME, SERIES_FOLDER
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -28,7 +28,7 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
from .costant import SITE_NAME, DOMAIN_NOW
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
# Variable

View File

View File

@ -0,0 +1,91 @@
{
"DEFAULT": {
"debug": false,
"log_file": "app.log",
"log_to_file": true,
"show_message": true,
"clean_console": true,
"root_path": "Video",
"movie_folder_name": "Movie",
"serie_folder_name": "TV",
"anime_folder_name": "Anime",
"map_episode_name": "E%(episode)_%(episode_name)",
"config_qbit_tor": {
"host": "192.168.1.99",
"port": "7060",
"user": "admin",
"pass": "adminadmin"
},
"add_siteName": false,
"disable_searchDomain": false,
"not_close": false,
"telegram_bot": true
},
"REQUESTS": {
"timeout": 30,
"max_retry": 8,
"proxy_start_min": 0.1,
"proxy_start_max": 0.5
},
"M3U8_DOWNLOAD": {
"tqdm_delay": 0.12,
"default_video_workser": 12,
"default_audio_workser": 12,
"merge_audio": true,
"specific_list_audio": [
"ita"
],
"merge_subs": true,
"specific_list_subtitles": [
"eng",
"spa"
],
"cleanup_tmp_folder": true
},
"M3U8_CONVERSION": {
"use_codec": false,
"use_vcodec": true,
"use_acodec": true,
"use_bitrate": true,
"use_gpu": false,
"default_preset": "ultrafast"
},
"M3U8_PARSER": {
"force_resolution": -1,
"get_only_link": false
},
"SITE": {
"streamingcommunity": {
"domain": "paris"
},
"altadefinizionegratis": {
"domain": "site"
},
"guardaserie": {
"domain": "meme"
},
"mostraguarda": {
"domain": "stream"
},
"ddlstreamitaly": {
"domain": "co",
"extra": {
"ips4_device_key": "",
"ips4_member_id": "",
"ips4_login_key": ""
}
},
"animeunity": {
"domain": "so"
},
"cb01new": {
"domain": "mobi"
},
"1337xx": {
"domain": "to"
},
"ilcorsaronero": {
"domain": "link"
}
}
}

View File

@ -0,0 +1,14 @@
httpx
bs4
rich
tqdm
m3u8
psutil
unidecode
jsbeautifier
pathvalidate
pycryptodomex
googlesearch-python
fake-useragent
qbittorrent-api
python-qbittorrent

View File

@ -5,11 +5,12 @@ import uuid
import subprocess
import os, re, sys
import json
from request_manager import RequestManager
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from StreamingCommunity.HelpTg.request_manager import RequestManager
import threading
# Funzione per caricare variabili da un file .env
def load_env(file_path=".env"):
def load_env(file_path="../../.env"):
if os.path.exists(file_path):
with open(file_path) as f:
for line in f:
@ -22,7 +23,7 @@ load_env()
class TelegramBot:
_instance = None
_config_file = "bot_config.json"
_config_file = "../../bot_config.json"
@classmethod
def get_instance(cls):
@ -54,7 +55,7 @@ class TelegramBot:
def monitor_scripts():
while True:
try:
with open("scripts.json", "r") as f:
with open("../../scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
@ -79,7 +80,7 @@ class TelegramBot:
scripts_data_to_save.append(script)
# Salva la lista aggiornata, senza gli script scaduti
with open("scripts.json", "w") as f:
with open("../../scripts.json", "w") as f:
json.dump(scripts_data_to_save, f, indent=4)
time.sleep(60) # Controlla ogni minuto
@ -195,7 +196,7 @@ class TelegramBot:
verbose = debug_mode
if debug_mode == "True":
subprocess.Popen(["python3", "test_run.py", screen_id])
subprocess.Popen(["python3", "../../test_run.py", screen_id])
else:
# Verifica se lo screen con il nome esiste già
try:
@ -209,7 +210,7 @@ class TelegramBot:
pass # Se il comando fallisce, significa che non ci sono screen attivi.
# Crea la sessione screen e avvia lo script al suo interno
command = ["screen", "-dmS", screen_id, "python3", "test_run.py", screen_id]
command = ["screen", "-dmS", screen_id, "python3", "../../test_run.py", screen_id]
# Avvia il comando tramite subprocess
subprocess.Popen(command)
@ -223,7 +224,7 @@ class TelegramBot:
}
# Salvataggio nel file JSON
json_file = "scripts.json"
json_file = "../../scripts.json"
# Carica i dati esistenti o crea una nuova lista
try:
@ -246,7 +247,7 @@ class TelegramBot:
return
try:
with open("scripts.json", "r") as f:
with open("../../scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
@ -305,7 +306,7 @@ class TelegramBot:
parts = message.text.split()
if len(parts) < 2:
try:
with open("scripts.json", "r") as f:
with open("../../scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
@ -328,7 +329,7 @@ class TelegramBot:
screen_id = parts[1]
try:
with open("scripts.json", "r") as f:
with open("../../scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
@ -352,7 +353,7 @@ class TelegramBot:
return
# Salva la lista aggiornata senza lo script eliminato
with open("scripts.json", "w") as f:
with open("../../scripts.json", "w") as f:
json.dump(new_scripts_data, f, indent=4)
print(f"✅ Script `{screen_id}` terminato con successo!")
@ -508,8 +509,8 @@ class TelegramBot:
def run(self):
print("🚀 Avvio del bot...")
# svuoto il file scripts.json
with open("scripts.json", "w") as f:
# svuoto il file ../../scripts.json
with open("../../scripts.json", "w") as f:
json.dump([], f)
self.bot.infinity_polling()

View File

@ -38,7 +38,7 @@ from ...M3U8 import (
from .segments import M3U8_Segments
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
# Config

View File

@ -30,7 +30,7 @@ import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -19,7 +19,7 @@ from .message import start_message
from .call_stack import get_call_stack
# Telegram bot instance
StreamingCommunity.HelpTg. import get_bot_instance
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
from StreamingCommunity.Util._jsonConfig import config_manager
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')

View File

@ -1,245 +0,0 @@
# 10.12.23
import os
import sys
import time
import glob
import logging
import platform
import argparse
import importlib
from typing import Callable
# Internal utilities
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Upload.update import update as git_update
from StreamingCommunity.Util.os import os_summary
from StreamingCommunity.Util.logger import Logger
# Config
CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close')
def run_function(func: Callable[..., None], close_console: bool = False) -> None:
"""
Run a given function indefinitely or once, depending on the value of close_console.
Parameters:
func (Callable[..., None]): The function to run.
close_console (bool, optional): Whether to close the console after running the function once. Defaults to False.
"""
if close_console:
while 1:
func()
else:
func()
def load_search_functions():
modules = []
loaded_functions = {}
# Find api home directory
if getattr(sys, 'frozen', False): # Modalità PyInstaller
base_path = os.path.join(sys._MEIPASS, "StreamingCommunity")
else:
base_path = os.path.dirname(__file__)
api_dir = os.path.join(base_path, 'Api', 'Site')
init_files = glob.glob(os.path.join(api_dir, '*', '__init__.py'))
# Retrieve modules and their indices
for init_file in init_files:
# Get folder name as module name
module_name = os.path.basename(os.path.dirname(init_file))
logging.info(f"Load module name: {module_name}")
try:
# Dynamically import the module
mod = importlib.import_module(f'StreamingCommunity.Api.Site.{module_name}')
# Get 'indice' from the module
indice = getattr(mod, 'indice', 0)
is_deprecate = bool(getattr(mod, '_deprecate', True))
use_for = getattr(mod, '_useFor', 'other')
if not is_deprecate:
modules.append((module_name, indice, use_for))
except Exception as e:
console.print(f"[red]Failed to import module {module_name}: {str(e)}")
# Sort modules by 'indice'
modules.sort(key=lambda x: x[1])
# Load search functions in the sorted order
for module_name, _, use_for in modules:
# Construct a unique alias for the module
module_alias = f'{module_name}_search'
try:
# Dynamically import the module
mod = importlib.import_module(f'StreamingCommunity.Api.Site.{module_name}')
# Get the search function from the module (assuming the function is named 'search' and defined in __init__.py)
search_function = getattr(mod, 'search')
# Add the function to the loaded functions dictionary
loaded_functions[module_alias] = (search_function, use_for)
except Exception as e:
console.print(f"[red]Failed to load search function from module {module_name}: {str(e)}")
return loaded_functions
def initialize():
# Get start message
start_message()
# Get system info
os_summary.get_system_summary()
# Set terminal size for win 7
if platform.system() == "Windows" and "7" in platform.version():
os.system('mode 120, 40')
# Check python version
if sys.version_info < (3, 7):
console.log("[red]Install python version > 3.7.16")
sys.exit(0)
# Attempting GitHub update
try:
git_update()
except:
console.log("[red]Error with loading github.")
def main():
start = time.time()
# Create logger
log_not = Logger()
initialize()
# Load search functions
search_functions = load_search_functions()
logging.info(f"Load module in: {time.time() - start} s")
# Create argument parser
parser = argparse.ArgumentParser(
description='Script to download movies and series from the internet. Use these commands to configure the script and control its behavior.'
)
# Add arguments for the main configuration parameters
parser.add_argument(
'--add_siteName', type=bool, help='Enable or disable adding the site name to the file name (e.g., true/false).'
)
parser.add_argument(
'--disable_searchDomain', type=bool, help='Enable or disable searching in configured domains (e.g., true/false).'
)
parser.add_argument(
'--not_close', type=bool, help='If set to true, the script will not close the console after execution (e.g., true/false).'
)
# Add arguments for M3U8 configuration
parser.add_argument(
'--default_video_worker', type=int, help='Number of workers for video during M3U8 download (default: 12).'
)
parser.add_argument(
'--default_audio_worker', type=int, help='Number of workers for audio during M3U8 download (default: 12).'
)
# Add options for audio and subtitles
parser.add_argument(
'--specific_list_audio', type=str, help='Comma-separated list of specific audio languages to download (e.g., ita,eng).'
)
parser.add_argument(
'--specific_list_subtitles', type=str, help='Comma-separated list of specific subtitle languages to download (e.g., eng,spa).'
)
# Add arguments for search functions
color_map = {
"anime": "red",
"film_serie": "yellow",
"film": "blue",
"serie": "green",
"other": "white"
}
# Add dynamic arguments based on loaded search modules
for alias, (_, use_for) in search_functions.items():
short_option = alias[:3].upper()
long_option = alias
parser.add_argument(f'-{short_option}', f'--{long_option}', action='store_true', help=f'Search for {alias.split("_")[0]} on streaming platforms.')
# Parse command-line arguments
args = parser.parse_args()
# Map command-line arguments to the config values
config_updates = {}
if args.add_siteName is not None:
config_updates['DEFAULT.add_siteName'] = args.add_siteName
if args.disable_searchDomain is not None:
config_updates['DEFAULT.disable_searchDomain'] = args.disable_searchDomain
if args.not_close is not None:
config_updates['DEFAULT.not_close'] = args.not_close
if args.default_video_worker is not None:
config_updates['M3U8_DOWNLOAD.default_video_worker'] = args.default_video_worker
if args.default_audio_worker is not None:
config_updates['M3U8_DOWNLOAD.default_audio_worker'] = args.default_audio_worker
if args.specific_list_audio is not None:
config_updates['M3U8_DOWNLOAD.specific_list_audio'] = args.specific_list_audio.split(',')
if args.specific_list_subtitles is not None:
config_updates['M3U8_DOWNLOAD.specific_list_subtitles'] = args.specific_list_subtitles.split(',')
# Apply the updates to the config file
for key, value in config_updates.items():
section, option = key.split('.')
config_manager.set_key(section, option, value)
config_manager.write_config()
# Map command-line arguments to functions
arg_to_function = {alias: func for alias, (func, _) in search_functions.items()}
# Check which argument is provided and run the corresponding function
for arg, func in arg_to_function.items():
if getattr(args, arg):
run_function(func)
return
# Mapping user input to functions
input_to_function = {str(i): func for i, (alias, (func, _)) in enumerate(search_functions.items())}
# Create dynamic prompt message and choices
choice_labels = {str(i): (alias.split("_")[0].capitalize(), use_for) for i, (alias, (_, use_for)) in enumerate(search_functions.items())}
# Display the category legend in a single line
legend_text = " | ".join([f"[{color}]{category.capitalize()}[/{color}]" for category, color in color_map.items()])
console.print(f"\n[bold green]Category Legend:[/bold green] {legend_text}")
# Construct the prompt message with color-coded site names
prompt_message = "[green]Insert category [white](" + ", ".join(
[f"{key}: [{color_map[label[1]]}]{label[0]}[/{color_map[label[1]]}]" for key, label in choice_labels.items()]
) + "[white])"
# Ask the user for input
category = msg.ask(prompt_message, choices=list(choice_labels.keys()), default="0", show_choices=False, show_default=False)
# Run the corresponding function based on user input
if category in input_to_function:
run_function(input_to_function[category])
else:
console.print("[red]Invalid category.")
sys.exit(0)

View File

@ -26,7 +26,7 @@ from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
# Config
CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close')
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
from session import get_session, deleteScriptId
from StreamingCommunity.HelpTg.session import get_session, deleteScriptId
def run_function(func: Callable[..., None], close_console: bool = False) -> None:

View File

@ -1,79 +0,0 @@
import json
import time
from typing import Optional
class RequestManager:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, json_file: str = "active_requests.json"):
if not hasattr(self, 'initialized'):
self.json_file = json_file
self.initialized = True
self.on_response_callback = None # Aggiungi un campo per il callback
def create_request(self, type: str) -> str:
request_data = {
"type": type,
"response": None,
"timestamp": time.time()
}
# Aggiungi il tipo al salvataggio della richiesta
with open(self.json_file, "w") as f:
json.dump(request_data, f)
return "Ok"
def save_response(self, message_text: str) -> bool:
try:
# Carica il file JSON
with open(self.json_file, "r") as f:
data = json.load(f)
# Controlla se esiste la chiave 'type' e se la risposta è presente
if "type" in data and "response" in data:
data["response"] = message_text # Aggiorna la risposta
# Scrivi il file JSON aggiornato
with open(self.json_file, "w") as f:
json.dump(data, f, indent=4) # Formatta il file JSON
return True
else:
return False
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"⚠️ save_response - errore: {e}")
return False
def get_response(self) -> Optional[str]:
try:
with open(self.json_file, "r") as f:
data = json.load(f)
# Verifica se esiste la chiave "response"
if "response" in data:
response = data["response"] # Ottieni la risposta direttamente
if response is not None and self.on_response_callback:
# Se la risposta è disponibile, chiama il callback
self.on_response_callback(response)
return response
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"get_response - errore: {e}")
return None
def clear_file(self) -> bool:
try:
# Svuota il file JSON scrivendo un oggetto vuoto
with open(self.json_file, "w") as f:
json.dump({}, f)
print(f"File {self.json_file} è stato svuotato con successo.")
return True
except Exception as e:
print(f"⚠️ clear_file - errore: {e}")
return False

View File

@ -1,16 +1,8 @@
[
{
"screen_id": "0e830205",
"start_time": 1738674260.611711,
"screen_id": "cfd83841",
"start_time": 1738682482.202467,
"status": "running",
"user_id": 676749122,
"titolo": "Doctor Strange"
},
{
"screen_id": "29057a57",
"start_time": 1738674321.6443458,
"status": "running",
"user_id": 676749122,
"titolo": "Guardiani della Galassia Vol. 2"
"user_id": 676749122
}
]

View File

@ -1,62 +0,0 @@
import json
session_data = {}
def set_session(value):
# salvo script_id in session_data
session_data['script_id'] = value
def get_session():
# controllo se script_id è presente in session_data
return session_data.get('script_id', 'unknown')
def updateScriptId(screen_id, titolo):
# definisco il nome del file json
json_file = "scripts.json"
try:
# apro il file json
with open(json_file, 'r') as f:
scripts_data = json.load(f)
except FileNotFoundError:
# Se il file non esiste, inizializzo la lista vuota
scripts_data = []
# cerco lo script con lo screen_id
for script in scripts_data:
if script["screen_id"] == screen_id:
# se trovo il match, aggiorno il titolo
script["titolo"] = titolo
# aggiorno il file json
with open(json_file, 'w') as f:
json.dump(scripts_data, f, indent=4)
#print(f"Titolo aggiornato per screen_id {screen_id}")
return
# se non trovo nessuno script con lo screen_id
print(f"Screen_id {screen_id} non trovato.")
# creo la funzione che elimina lo script con lo screen_id specificato
def deleteScriptId(screen_id):
# definisco il nome del file json
json_file = "scripts.json"
try:
# apro il file json
with open(json_file, 'r') as f:
scripts_data = json.load(f)
except FileNotFoundError:
# Se il file non esiste, inizializzo la lista vuota
scripts_data = []
# cerco lo script con lo screen_id
for script in scripts_data:
if script["screen_id"] == screen_id:
# se trovo il match, elimino lo script
scripts_data.remove(script)
# aggiorno il file json
with open(json_file, 'w') as f:
json.dump(scripts_data, f, indent=4)
print(f"Script eliminato per screen_id {screen_id}")
return
# se non trovo nessuno script con lo screen_id
print(f"Screen_id {screen_id} non trovato.")

View File

@ -1,537 +0,0 @@
import telebot
from telebot import types
import time
import uuid
import subprocess
import os, re, sys
import json
from request_manager import RequestManager
import threading
# Funzione per caricare variabili da un file .env
def load_env(file_path=".env"):
if os.path.exists(file_path):
with open(file_path) as f:
for line in f:
if line.strip() and not line.startswith("#"):
key, value = line.strip().split("=", 1)
os.environ[key] = value
# Carica le variabili
load_env()
class TelegramBot:
_instance = None
_config_file = "bot_config.json"
@classmethod
def get_instance(cls):
if cls._instance is None:
# Prova a caricare la configurazione e inizializzare il bot
if os.path.exists(cls._config_file):
with open(cls._config_file, 'r') as f:
config = json.load(f)
cls._instance = cls.init_bot(config['token'], config['authorized_user_id'])
else:
raise Exception("Bot non ancora inizializzato. Chiamare prima init_bot() con token e authorized_user_id")
return cls._instance
@classmethod
def init_bot(cls, token, authorized_user_id):
if cls._instance is None:
cls._instance = cls(token, authorized_user_id)
# Salva la configurazione
config = {
'token': token,
'authorized_user_id': authorized_user_id
}
with open(cls._config_file, 'w') as f:
json.dump(config, f)
return cls._instance
def __init__(self, token, authorized_user_id):
def monitor_scripts():
while True:
try:
with open("scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
current_time = time.time()
# Crea una nuova lista senza gli script che sono scaduti
scripts_data_to_save = []
for script in scripts_data:
if "titolo" not in script and script["status"] == "running" and (current_time - script["start_time"]) > 600:
# Prova a terminare la sessione screen
try:
subprocess.check_output(["screen", "-S", script["screen_id"], "-X", "quit"])
print(f"✅ La sessione screen con ID {script['screen_id']} è stata fermata automaticamente.")
except subprocess.CalledProcessError:
print(f"⚠️ Impossibile fermare la sessione screen con ID {script['screen_id']}.")
# Aggiungi solo gli script che non sono scaduti
print(f"⚠️ Lo script con ID {script['screen_id']} ha superato i 10 minuti e verrà rimosso.")
else:
scripts_data_to_save.append(script)
# Salva la lista aggiornata, senza gli script scaduti
with open("scripts.json", "w") as f:
json.dump(scripts_data_to_save, f, indent=4)
time.sleep(60) # Controlla ogni minuto
# Avvia il thread di monitoraggio
monitor_thread = threading.Thread(target=monitor_scripts, daemon=True)
monitor_thread.start()
if TelegramBot._instance is not None:
raise Exception("Questa classe è un singleton! Usa get_instance() per ottenere l'istanza.")
self.token = token
self.authorized_user_id = authorized_user_id
self.chat_id = authorized_user_id
self.bot = telebot.TeleBot(token)
self.request_manager = RequestManager()
# Registra gli handler
self.register_handlers()
def register_handlers(self):
""" @self.bot.message_handler(commands=['start'])
def start(message):
self.handle_start(message) """
@self.bot.message_handler(commands=['get_id'])
def get_id(message):
self.handle_get_id(message)
@self.bot.message_handler(commands=['start'])
def start_script(message):
self.handle_start_script(message)
@self.bot.message_handler(commands=['list'])
def list_scripts(message):
self.handle_list_scripts(message)
@self.bot.message_handler(commands=['stop'])
def stop_script(message):
self.handle_stop_script(message)
@self.bot.message_handler(commands=['screen'])
def screen_status(message):
self.handle_screen_status(message)
""" @self.bot.message_handler(commands=['replay'])
def send_welcome(message):
# Crea una tastiera personalizzata
markup = types.ReplyKeyboardMarkup(row_width=2)
itembtn1 = types.KeyboardButton('Start')
itembtn2 = types.KeyboardButton('Lista')
markup.add(itembtn1, itembtn2)
# Invia un messaggio con la tastiera
self.bot.send_message(message.chat.id, "Scegli un'opzione:", reply_markup=markup) """
"""@self.bot.message_handler(commands=['inline'])
def send_welcome(message):
# Crea una tastiera inline
markup = types.InlineKeyboardMarkup()
itembtn1 = types.InlineKeyboardButton('Azione 1', callback_data='action1')
itembtn2 = types.InlineKeyboardButton('Azione 2', callback_data='action2')
itembtn3 = types.InlineKeyboardButton('Azione 3', callback_data='action3')
markup.add(itembtn1, itembtn2, itembtn3)
# Invia un messaggio con la tastiera inline
self.bot.send_message(message.chat.id, "Scegli un'opzione:", reply_markup=markup)
# Gestisce le callback delle tastiere inline
@self.bot.callback_query_handler(func=lambda call: True)
def handle_callback(call):
if call.data == 'action1':
self.bot.answer_callback_query(call.id, "Hai scelto Azione 1!")
self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 1!")
elif call.data == 'action2':
self.bot.answer_callback_query(call.id, "Hai scelto Azione 2!")
self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 2!")
elif call.data == 'action3':
self.bot.answer_callback_query(call.id, "Hai scelto Azione 3!")
self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 3!")
"""
@self.bot.message_handler(func=lambda message: True)
def handle_all_messages(message):
self.handle_response(message)
def is_authorized(self, user_id):
return user_id == self.authorized_user_id
def handle_get_id(self, message):
if not self.is_authorized(message.from_user.id):
print(f"❌ Non sei autorizzato.")
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
return
print(f"Il tuo ID utente è: `{message.from_user.id}`")
self.bot.send_message(message.chat.id, f"Il tuo ID utente è: `{message.from_user.id}`", parse_mode="Markdown")
def handle_start_script(self, message):
if not self.is_authorized(message.from_user.id):
print(f"❌ Non sei autorizzato. {message.from_user.id}")
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
return
screen_id = str(uuid.uuid4())[:8]
#screen_id = '0000'
# Impostare a True per avviare il test_run.py in modalità verbose per il debug
debug_mode = os.getenv("DEBUG")
verbose = debug_mode
if debug_mode == "True":
subprocess.Popen(["python3", "test_run.py", screen_id])
else:
# Verifica se lo screen con il nome esiste già
try:
subprocess.check_output(["screen", "-list"])
existing_screens = subprocess.check_output(["screen", "-list"]).decode('utf-8')
if screen_id in existing_screens:
print(f"⚠️ Lo script con ID {screen_id} è già in esecuzione.")
self.bot.send_message(message.chat.id, f"⚠️ Lo script con ID {screen_id} è già in esecuzione.")
return
except subprocess.CalledProcessError:
pass # Se il comando fallisce, significa che non ci sono screen attivi.
# Crea la sessione screen e avvia lo script al suo interno
command = ["screen", "-dmS", screen_id, "python3", "test_run.py", screen_id]
# Avvia il comando tramite subprocess
subprocess.Popen(command)
# Creazione oggetto script info
script_info = {
"screen_id": screen_id,
"start_time": time.time(),
"status": "running",
"user_id": message.from_user.id
}
# Salvataggio nel file JSON
json_file = "scripts.json"
# Carica i dati esistenti o crea una nuova lista
try:
with open(json_file, 'r') as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
# Aggiungi il nuovo script
scripts_data.append(script_info)
# Scrivi il file aggiornato
with open(json_file, 'w') as f:
json.dump(scripts_data, f, indent=4)
def handle_list_scripts(self, message):
if not self.is_authorized(message.from_user.id):
print(f"❌ Non sei autorizzato.")
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
return
try:
with open("scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
if not scripts_data:
print(f"⚠️ Nessuno script registrato.")
self.bot.send_message(message.chat.id, "⚠️ Nessuno script registrato.")
return
current_time = time.time()
msg = ["🖥️ **Script Registrati:**\n"]
for script in scripts_data:
# Calcola la durata
duration = current_time - script["start_time"]
if "end_time" in script:
duration = script["end_time"] - script["start_time"]
# Formatta la durata
hours, rem = divmod(duration, 3600)
minutes, seconds = divmod(rem, 60)
duration_str = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
# Icona stato
status_icons = {
"running": "🟢",
"stopped": "🔴",
"completed": ""
}
# Costruisci riga
line = (
f"• ID: `{script['screen_id']}`\n"
f"• Stato: {status_icons.get(script['status'], '')}\n"
f"• Stop: `/stop {script['screen_id']}`\n"
f"• Screen: `/screen {script['screen_id']}`\n"
f"• Durata: {duration_str}\n"
f"• Download:\n{script.get('titolo', 'N/A')}\n"
)
msg.append(line)
# Formatta la risposta finale
final_msg = "\n".join(msg)
if len(final_msg) > 4000:
final_msg = final_msg[:4000] + "\n[...] (messaggio troncato)"
print(f"{final_msg}")
self.bot.send_message(message.chat.id, final_msg, parse_mode="Markdown")
def handle_stop_script(self, message):
if not self.is_authorized(message.from_user.id):
print(f"❌ Non sei autorizzato.")
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
return
parts = message.text.split()
if len(parts) < 2:
try:
with open("scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
running_scripts = [s for s in scripts_data if s["status"] == "running"]
if not running_scripts:
print(f"⚠️ Nessuno script attivo da fermare.")
self.bot.send_message(message.chat.id, "⚠️ Nessuno script attivo da fermare.")
return
msg = "🖥️ **Script Attivi:**\n"
for script in running_scripts:
msg += f"🔹 `/stop {script['screen_id']}` per fermarlo\n"
print(f"{msg}")
self.bot.send_message(message.chat.id, msg, parse_mode="Markdown")
elif len(parts) == 2:
screen_id = parts[1]
try:
with open("scripts.json", "r") as f:
scripts_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
scripts_data = []
# Filtra la lista eliminando lo script con l'ID specificato
new_scripts_data = [script for script in scripts_data if script["screen_id"] != screen_id]
if len(new_scripts_data) == len(scripts_data):
# Nessun elemento rimosso, quindi ID non trovato
print(f"⚠️ Nessuno script attivo con ID `{screen_id}`.")
self.bot.send_message(message.chat.id, f"⚠️ Nessuno script attivo con ID `{screen_id}`.", parse_mode="Markdown")
return
# Terminare la sessione screen
try:
subprocess.check_output(["screen", "-S", screen_id, "-X", "quit"])
print(f"✅ La sessione screen con ID {screen_id} è stata fermata.")
except subprocess.CalledProcessError:
print(f"⚠️ Impossibile fermare la sessione screen con ID `{screen_id}`.")
self.bot.send_message(message.chat.id, f"⚠️ Impossibile fermare la sessione screen con ID `{screen_id}`.", parse_mode="Markdown")
return
# Salva la lista aggiornata senza lo script eliminato
with open("scripts.json", "w") as f:
json.dump(new_scripts_data, f, indent=4)
print(f"✅ Script `{screen_id}` terminato con successo!")
self.bot.send_message(message.chat.id, f"✅ Script `{screen_id}` terminato con successo!", parse_mode="Markdown")
def handle_response(self, message):
""" if message.text == 'Start':
self.handle_start_script(self, message)
elif message.text == 'Lista':
self.handle_list_scripts(self, message)
elif message.text == 'Azione 3':
self.bot.send_message(message.chat.id, "Hai scelto Azione 3!")
else:
self.bot.send_message(message.chat.id, "Comando non riconosciuto.")
if not self.is_authorized(message.from_user.id):
self.bot.reply_to(message, "❌ Non sei autorizzato.")
return """
text = message.text
if self.request_manager.save_response(text):
print(f"📥 Risposta salvata correttamente per il tipo {text}")
else:
print("⚠️ Nessuna richiesta attiva.")
self.bot.reply_to(message, "❌ Nessuna richiesta attiva.")
def handle_screen_status(self, message):
command_parts = message.text.split()
if len(command_parts) < 2:
print(f"⚠️ ID mancante nel comando. Usa: /screen <ID>")
self.bot.send_message(message.chat.id, "⚠️ ID mancante nel comando. Usa: /screen <ID>")
return
screen_id = command_parts[1]
temp_file = f"/tmp/screen_output_{screen_id}.txt"
try:
# Cattura l'output della screen
subprocess.run(["screen", "-X", "-S", screen_id, "hardcopy", "-h", temp_file], check=True)
except subprocess.CalledProcessError as e:
print(f"❌ Errore durante la cattura dell'output della screen: {e}")
self.bot.send_message(message.chat.id, f"❌ Errore durante la cattura dell'output della screen: {e}")
return
if not os.path.exists(temp_file):
print(f"❌ Impossibile catturare l'output della screen.")
self.bot.send_message(message.chat.id, f"❌ Impossibile catturare l'output della screen.")
return
try:
# Leggi il file con la codifica corretta
with open(temp_file, 'r', encoding='latin-1') as file:
screen_output = file.read()
# Pulisci l'output
cleaned_output = re.sub(r'[\x00-\x1F\x7F]', '', screen_output) # Rimuovi caratteri di controllo
cleaned_output = cleaned_output.replace('\n\n', '\n') # Rimuovi newline multipli
# Estrarre tutte le parti da "Download:" fino a "Video" o "Subtitle", senza includerli
download_matches = re.findall(r"Download: (.*?)(?:Video|Subtitle)", cleaned_output)
if download_matches:
# Serie TV e Film StreamingCommunity
proc_matches = re.findall(r"Proc: ([\d\.]+%)", cleaned_output)
# Creare una stringa unica con tutti i risultati
result_string = "\n".join([f"Download: {download_matches[i].strip()}\nDownload al {proc_matches[i]}" for i in range(len(download_matches)) if i < len(proc_matches)])
if result_string != "":
cleaned_output = result_string
else:
print(f"❌ La parola 'Download:' non è stata trovata nella stringa.")
else:
download_list = []
# Estrai tutte le righe che iniziano con "Download:" fino al prossimo "Download" o alla fine della riga
matches = re.findall(r"Download:\s*(.*?)(?=Download|$)", cleaned_output)
# Se sono stati trovati download, stampali
if matches:
for i, match in enumerate(matches, 1):
# rimuovo solo la parte "downloader.py:57Result:400" se esiste
match = re.sub(r"downloader.py:\d+Result:400", "", match)
match = match.strip() # Rimuovo gli spazi bianchi in eccesso
if match: # Assicurati che la stringa non sia vuota
print(f"Download {i}: {match}")
# Aggiungi il risultato modificato alla lista
download_list.append(f"Download {i}: {match}")
# Creare una stringa unica con tutti i risultati
cleaned_output = "\n".join(download_list)
else:
print("❌ Nessun download trovato")
# Invia l'output pulito
print(f"📄 Output della screen {screen_id}:\n{cleaned_output}")
self._send_long_message(message.chat.id, f"📄 Output della screen {screen_id}:\n{cleaned_output}")
except Exception as e:
print(f"❌ Errore durante la lettura o l'invio dell'output della screen: {e}")
self.bot.send_message(message.chat.id, f"❌ Errore durante la lettura o l'invio dell'output della screen: {e}")
# Cancella il file temporaneo
os.remove(temp_file)
def send_message(self, message, choices):
if choices is None:
if self.chat_id:
print(f"{message}")
self.bot.send_message(self.chat_id, message)
else:
formatted_choices = "\n".join(choices)
message = f"{message}\n\n{formatted_choices}"
if self.chat_id:
print(f"{message}")
self.bot.send_message(self.chat_id, message)
def _send_long_message(self, chat_id, text, chunk_size=4096):
"""Suddivide e invia un messaggio troppo lungo in più parti."""
for i in range(0, len(text), chunk_size):
print(f"{text[i:i+chunk_size]}")
self.bot.send_message(chat_id, text[i:i+chunk_size])
def ask(self, type, prompt_message, choices, timeout=60):
self.request_manager.create_request(type)
if choices is None:
print(f"{prompt_message}")
self.bot.send_message(
self.chat_id,
f"{prompt_message}",
)
else:
print(f"{prompt_message}\n\nOpzioni: {', '.join(choices)}")
self.bot.send_message(
self.chat_id,
f"{prompt_message}\n\nOpzioni: {', '.join(choices)}",
)
start_time = time.time()
while time.time() - start_time < timeout:
response = self.request_manager.get_response()
if response is not None:
return response
time.sleep(1)
print(f"⚠️ Timeout: nessuna risposta ricevuta.")
self.bot.send_message(self.chat_id, "⚠️ Timeout: nessuna risposta ricevuta.")
self.request_manager.clear_file()
return None
def run(self):
print("🚀 Avvio del bot...")
# svuoto il file scripts.json
with open("scripts.json", "w") as f:
json.dump([], f)
self.bot.infinity_polling()
def get_bot_instance():
return TelegramBot.get_instance()
# Esempio di utilizzo
if __name__ == "__main__":
# Usa le variabili
token = os.getenv("TOKEN_TELEGRAM")
authorized_user_id = os.getenv("AUTHORIZED_USER_ID")
TOKEN = token # Inserisci il token del tuo bot Telegram sul file .env
AUTHORIZED_USER_ID = int(authorized_user_id) # Inserisci il tuo ID utente Telegram sul file .env
# Inizializza il bot
bot = TelegramBot.init_bot(TOKEN, AUTHORIZED_USER_ID)
bot.run()
"""
start - Avvia lo script
list - Lista script attivi
get - Mostra ID utente Telegram
"""