mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-06-05 02:55:25 +00:00
Integration Telegram New (#243)
* Integration Telegram * eliminato i file inutili * fix * rest file config * stostato il file dentro la cartella HelpTg * eliminato file nella root inutili e fixato bug * disabilitato il merge dei sottotitoli, migliorata la gestione degli script scaduti e rimosso il file scripts.json
This commit is contained in:
parent
8680f39d80
commit
d7d137ed4e
5
.gitignore
vendored
5
.gitignore
vendored
@ -47,4 +47,7 @@ venv.bak/
|
||||
Video
|
||||
note.txt
|
||||
list_proxy.txt
|
||||
cmd.txt
|
||||
cmd.txt
|
||||
bot_config.json
|
||||
scripts.json
|
||||
active_requests.json
|
@ -11,6 +11,12 @@ from StreamingCommunity.Util.console import console, msg
|
||||
from .site import title_search, run_get_select_title, media_search_manager
|
||||
from .title import download_title
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
# Variable
|
||||
indice = 8
|
||||
@ -25,8 +31,23 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
"""
|
||||
Main function of the application for film and series.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
if string_to_search is None:
|
||||
# Chiedi la scelta all'utente con il bot Telegram
|
||||
string_to_search = bot.ask(
|
||||
"key_search",
|
||||
f"Inserisci la parola da cercare\noppure 🔙 back per tornare alla scelta: ",
|
||||
None
|
||||
)
|
||||
|
||||
if string_to_search == 'back':
|
||||
# Riavvia lo script
|
||||
# Chiude il processo attuale e avvia una nuova istanza dello script
|
||||
subprocess.Popen([sys.executable] + sys.argv)
|
||||
sys.exit()
|
||||
else:
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
|
||||
# Search on database
|
||||
@ -48,4 +69,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
|
||||
|
||||
# Retry
|
||||
search()
|
||||
search()
|
||||
|
@ -17,6 +17,10 @@ from StreamingCommunity.Api.Template import get_select_title
|
||||
from StreamingCommunity.Api.Template.Util import search_domain
|
||||
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
# Variable
|
||||
from .costant import SITE_NAME, DOMAIN_NOW
|
||||
@ -36,6 +40,10 @@ def title_search(word_to_search: str) -> int:
|
||||
Returns:
|
||||
- int: The number of titles found.
|
||||
"""
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
media_search_manager.clear()
|
||||
table_show_manager.clear()
|
||||
|
||||
@ -61,6 +69,10 @@ def title_search(word_to_search: str) -> int:
|
||||
# Create soup and find table
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Inizializza la lista delle scelte
|
||||
choices = []
|
||||
|
||||
for tr in soup.find_all('tr'):
|
||||
try:
|
||||
|
||||
@ -72,12 +84,23 @@ def title_search(word_to_search: str) -> int:
|
||||
'date': tr.find_all("td")[-3].get_text(strip=True).replace("'", ""),
|
||||
'size': tr.find_all("td")[-2].get_text(strip=True)
|
||||
}
|
||||
|
||||
media_search_manager.add_media(title_info)
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Crea una stringa formattata per ogni scelta con numero
|
||||
choice_text = f"{len(choices)} - {title_info.get('name')} ({title_info.get('type')}) - {title_info.get('date')}"
|
||||
choices.append(choice_text)
|
||||
|
||||
media_search_manager.add_media(title_info)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error parsing a film entry: {e}")
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Se ci sono scelte, inviale a Telegram
|
||||
if choices:
|
||||
# Invio a telegram la lista
|
||||
bot.send_message(f"Lista dei risultati:", choices)
|
||||
|
||||
# Return the number of titles found
|
||||
return media_search_manager.get_length()
|
||||
|
||||
@ -86,4 +109,4 @@ def run_get_select_title():
|
||||
"""
|
||||
Display a selection of titles and prompt the user to choose one.
|
||||
"""
|
||||
return get_select_title(table_show_manager, media_search_manager)
|
||||
return get_select_title(table_show_manager, media_search_manager)
|
||||
|
@ -23,6 +23,11 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
|
||||
# Config
|
||||
from .costant import DOMAIN_NOW, SITE_NAME, MOVIE_FOLDER
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
def download_title(select_title: MediaItem):
|
||||
"""
|
||||
@ -31,6 +36,12 @@ def download_title(select_title: MediaItem):
|
||||
Parameters:
|
||||
- select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
bot.send_message(f"Download in corso:\n{select_title.name}", None)
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
updateScriptId(script_id, select_title.name)
|
||||
|
||||
start_message()
|
||||
console.print(f"[yellow]Download: [red]{select_title.name} \n")
|
||||
@ -62,3 +73,9 @@ def download_title(select_title: MediaItem):
|
||||
manager.add_magnet_link(final_url)
|
||||
manager.start_download()
|
||||
manager.move_downloaded_files(mp4_path)
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Delete script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
deleteScriptId(script_id)
|
||||
|
@ -11,6 +11,10 @@ from StreamingCommunity.Util.console import console, msg
|
||||
from .site import title_search, run_get_select_title, media_search_manager
|
||||
from .film import download_film
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
# Variable
|
||||
indice = 2
|
||||
@ -26,8 +30,25 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
Main function of the application for film and series.
|
||||
"""
|
||||
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
if string_to_search is None:
|
||||
# Chiedi la scelta all'utente con il bot Telegram
|
||||
string_to_search = bot.ask(
|
||||
"key_search",
|
||||
f"Inserisci la parola da cercare\noppure 🔙 back per tornare alla scelta: ",
|
||||
None
|
||||
)
|
||||
|
||||
if string_to_search == 'back':
|
||||
# Riavvia lo script
|
||||
# Chiude il processo attuale e avvia una nuova istanza dello script
|
||||
subprocess.Popen([sys.executable] + sys.argv)
|
||||
sys.exit()
|
||||
else:
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
|
||||
# Search on database
|
||||
len_database = title_search(quote_plus(string_to_search))
|
||||
@ -45,6 +66,10 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
download_film(select_title)
|
||||
|
||||
else:
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Nessun risultato trovato riprova", None)
|
||||
|
||||
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
|
||||
|
||||
# Retry
|
||||
|
@ -24,6 +24,12 @@ from StreamingCommunity.Api.Player.supervideo import VideoSource
|
||||
# Config
|
||||
from .costant import MOVIE_FOLDER
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
def download_film(select_title: MediaItem) -> str:
|
||||
"""
|
||||
@ -37,6 +43,17 @@ def download_film(select_title: MediaItem) -> str:
|
||||
- str: output path
|
||||
"""
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
# Invio a telegram
|
||||
bot.send_message(f"Download in corso:\n{select_title.name}", None)
|
||||
|
||||
# Get script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
updateScriptId(script_id, select_title.name)
|
||||
|
||||
# Start message and display film information
|
||||
start_message()
|
||||
console.print(f"[yellow]Download: [red]{select_title.name} \n")
|
||||
@ -65,7 +82,13 @@ def download_film(select_title: MediaItem) -> str:
|
||||
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
|
||||
frames = get_call_stack()
|
||||
execute_search(frames[-4])"""
|
||||
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Delete script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
deleteScriptId(script_id)
|
||||
|
||||
if r_proc != None:
|
||||
console.print("[green]Result: ")
|
||||
console.print(r_proc)
|
||||
|
@ -25,6 +25,11 @@ table_show_manager = TVShowManager()
|
||||
max_timeout = config_manager.get_int("REQUESTS", "timeout")
|
||||
disable_searchDomain = config_manager.get_bool("DEFAULT", "disable_searchDomain")
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
def title_search(title_search: str) -> int:
|
||||
"""
|
||||
@ -36,6 +41,9 @@ def title_search(title_search: str) -> int:
|
||||
Returns:
|
||||
int: The number of titles found.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
media_search_manager.clear()
|
||||
table_show_manager.clear()
|
||||
|
||||
@ -63,6 +71,10 @@ def title_search(title_search: str) -> int:
|
||||
# Create soup and find table
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Inizializza la lista delle scelte
|
||||
choices = []
|
||||
|
||||
for row in soup.find_all('div', class_='col-lg-3 col-md-3 col-xs-4'):
|
||||
try:
|
||||
|
||||
@ -81,8 +93,20 @@ def title_search(title_search: str) -> int:
|
||||
|
||||
media_search_manager.add_media(film_info)
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Crea una stringa formattata per ogni scelta con numero
|
||||
choice_text = f"{len(choices)} - {film_info.get('name')} ({film_info.get('url')}) {film_info.get('score')}"
|
||||
choices.append(choice_text)
|
||||
|
||||
except AttributeError as e:
|
||||
print(f"Error parsing a film entry: {e}")
|
||||
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Se ci sono scelte, inviale a Telegram
|
||||
if choices:
|
||||
# Invio a telegram la lista
|
||||
bot.send_message(f"Lista dei risultati:", choices)
|
||||
|
||||
# Return the number of titles found
|
||||
return media_search_manager.get_length()
|
||||
|
@ -11,6 +11,13 @@ from StreamingCommunity.Util.console import console, msg
|
||||
from .site import title_search, run_get_select_title, media_search_manager
|
||||
from .film_serie import download_film, download_series
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
|
||||
# Variable
|
||||
indice = 1
|
||||
@ -23,8 +30,25 @@ from .costant import SITE_NAME
|
||||
|
||||
def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
if string_to_search is None:
|
||||
# Chiedi la scelta all'utente con il bot Telegram
|
||||
string_to_search = bot.ask(
|
||||
"key_search",
|
||||
f"Inserisci la parola da cercare\noppure 🔙 back per tornare alla scelta: ",
|
||||
None
|
||||
)
|
||||
|
||||
if string_to_search == 'back':
|
||||
# Riavvia lo script
|
||||
# Chiude il processo attuale e avvia una nuova istanza dello script
|
||||
subprocess.Popen([sys.executable] + sys.argv)
|
||||
sys.exit()
|
||||
else:
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
|
||||
# Search on database
|
||||
len_database = title_search(string_to_search)
|
||||
@ -45,6 +69,10 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
download_series(select_title)
|
||||
|
||||
else:
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Nessun risultato trovato riprova", None)
|
||||
|
||||
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
|
||||
|
||||
# Retry
|
||||
|
@ -26,6 +26,11 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSourceAnime
|
||||
from .costant import SITE_NAME, ANIME_FOLDER, MOVIE_FOLDER
|
||||
KILL_HANDLER = bool(False)
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_source: VideoSourceAnime) -> tuple[str,bool]:
|
||||
"""
|
||||
@ -38,6 +43,9 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
|
||||
- str: output path
|
||||
- bool: kill handler status
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
|
||||
# Get information about the selected episode
|
||||
obj_episode = scrape_serie.get_info_episode(index_select)
|
||||
@ -48,11 +56,20 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
|
||||
console.print(f"[yellow]Download: [red]EP_{obj_episode.number} \n")
|
||||
console.print("[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Invio a telegram
|
||||
bot.send_message(f"Download in corso:\nTitolo:{scrape_serie.series_name}\nEpisodio: {obj_episode.number}", None)
|
||||
|
||||
# Get script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
updateScriptId(script_id, f"{scrape_serie.series_name} - E{obj_episode.number}")
|
||||
# Collect mp4 url
|
||||
video_source.get_embed(obj_episode.id)
|
||||
|
||||
# Create output path
|
||||
title_name = f"{obj_episode.number}.mp4"
|
||||
title_name = f"{scrape_serie.series_name}_EP_{obj_episode.number}.mp4"
|
||||
|
||||
if scrape_serie.is_series:
|
||||
mp4_path = os_manager.get_sanitize_path(
|
||||
@ -91,6 +108,8 @@ def download_series(select_title: MediaItem):
|
||||
- tv_id (int): The ID of the TV series.
|
||||
- tv_name (str): The name of the TV series.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
scrape_serie = ScrapeSerieAnime(SITE_NAME)
|
||||
video_source = VideoSourceAnime(SITE_NAME)
|
||||
|
||||
@ -101,8 +120,20 @@ def download_series(select_title: MediaItem):
|
||||
episoded_count = scrape_serie.get_count_episodes()
|
||||
console.print(f"[cyan]Episodes find: [red]{episoded_count}")
|
||||
|
||||
# Prompt user to select an episode index
|
||||
last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]or [red][3-*] [cyan]for a range of media")
|
||||
if TELEGRAM_BOT:
|
||||
console.print(f"\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]or [red][3-*] [cyan]for a range of media")
|
||||
|
||||
# Invio a telegram
|
||||
bot.send_message(f"Episodi trovati: {episoded_count}", None)
|
||||
|
||||
last_command = bot.ask(
|
||||
"select_title",
|
||||
f"Inserisci l'indice del media o (*) per scaricare tutti i media, oppure [1-2] o [3-*] per un intervallo di media.",
|
||||
None
|
||||
)
|
||||
else:
|
||||
# Prompt user to select an episode index
|
||||
last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]or [red][3-*] [cyan]for a range of media")
|
||||
|
||||
# Manage user selection
|
||||
list_episode_select = manage_selection(last_command, episoded_count)
|
||||
@ -118,6 +149,14 @@ def download_series(select_title: MediaItem):
|
||||
if kill_handler:
|
||||
break
|
||||
kill_handler= download_episode(i_episode-1, scrape_serie, video_source)[1]
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Finito di scaricare tutte le serie e episodi", None)
|
||||
|
||||
# Get script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
deleteScriptId(script_id)
|
||||
|
||||
|
||||
def download_film(select_title: MediaItem):
|
||||
|
@ -19,6 +19,11 @@ from StreamingCommunity.Api.Template import get_select_title
|
||||
from StreamingCommunity.Api.Template.Util import search_domain
|
||||
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
# Variable
|
||||
from .costant import SITE_NAME, DOMAIN_NOW
|
||||
@ -103,6 +108,9 @@ def title_search(title: str) -> int:
|
||||
Returns:
|
||||
- int: A number containing the length of media search manager.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
media_search_manager.clear()
|
||||
table_show_manager.clear()
|
||||
|
||||
@ -146,6 +154,10 @@ def title_search(title: str) -> int:
|
||||
except Exception as e:
|
||||
console.print(f"Site: {SITE_NAME}, request search error: {e}")
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Inizializza la lista delle scelte
|
||||
choices = []
|
||||
|
||||
for dict_title in response.json()['records']:
|
||||
try:
|
||||
|
||||
@ -161,8 +173,18 @@ def title_search(title: str) -> int:
|
||||
'episodes_count': dict_title.get('episodes_count')
|
||||
})
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Crea una stringa formattata per ogni scelta con numero
|
||||
choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}"
|
||||
choices.append(choice_text)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error parsing a film entry: {e}")
|
||||
if TELEGRAM_BOT:
|
||||
# Se ci sono scelte, inviale a Telegram
|
||||
if choices:
|
||||
# Invio a telegram la lista
|
||||
bot.send_message(f"Lista dei risultati:", choices)
|
||||
|
||||
# Return the length of media search manager
|
||||
return media_search_manager.get_length()
|
||||
|
@ -1,7 +1,8 @@
|
||||
# 21.05.24
|
||||
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
# Internal utilities
|
||||
from StreamingCommunity.Util.console import console, msg
|
||||
@ -12,6 +13,10 @@ from .site import get_version_and_domain, title_search, run_get_select_title, me
|
||||
from .film import download_film
|
||||
from .series import download_series
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
# Variable
|
||||
indice = 0
|
||||
@ -27,8 +32,25 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
Main function of the application for film and series.
|
||||
"""
|
||||
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
if string_to_search is None:
|
||||
# Chiedi la scelta all'utente con il bot Telegram
|
||||
string_to_search = bot.ask(
|
||||
"key_search",
|
||||
f"Inserisci la parola da cercare\noppure 🔙 back per tornare alla scelta: ",
|
||||
None
|
||||
)
|
||||
|
||||
if string_to_search == 'back':
|
||||
# Riavvia lo script
|
||||
# Chiude il processo attuale e avvia una nuova istanza dello script
|
||||
subprocess.Popen([sys.executable] + sys.argv)
|
||||
sys.exit()
|
||||
else:
|
||||
if string_to_search is None:
|
||||
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{SITE_NAME}").strip()
|
||||
|
||||
# Get site domain and version and get result of the search
|
||||
site_version, domain = get_version_and_domain()
|
||||
@ -52,5 +74,8 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
|
||||
else:
|
||||
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Nessun risultato trovato riprova", None)
|
||||
|
||||
# Retry
|
||||
search()
|
@ -23,7 +23,13 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource
|
||||
|
||||
# Variable
|
||||
from .costant import SITE_NAME, MOVIE_FOLDER
|
||||
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
def download_film(select_title: MediaItem) -> str:
|
||||
"""
|
||||
@ -36,11 +42,23 @@ def download_film(select_title: MediaItem) -> str:
|
||||
Return:
|
||||
- str: output path
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
# Invio a telegram
|
||||
bot.send_message(f"Download in corso:\n{select_title.name}", None)
|
||||
|
||||
# Viene usato per lo screen
|
||||
console.print(f"## Download: [red]{select_title.name} ##")
|
||||
|
||||
# Get script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
updateScriptId(script_id, select_title.name)
|
||||
|
||||
# Start message and display film information
|
||||
start_message()
|
||||
console.print(f"[yellow]Download: [red]{select_title.name} \n")
|
||||
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
|
||||
|
||||
# Init class
|
||||
video_source = VideoSource(SITE_NAME, False)
|
||||
@ -61,6 +79,12 @@ def download_film(select_title: MediaItem) -> str:
|
||||
output_filename=os.path.join(mp4_path, title_name)
|
||||
).start()
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Delete script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
deleteScriptId(script_id)
|
||||
|
||||
"""if r_proc == 404:
|
||||
time.sleep(2)
|
||||
|
||||
|
@ -26,6 +26,11 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource
|
||||
# Variable
|
||||
from .costant import SITE_NAME, SERIES_FOLDER
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.HelpTg.session import get_session, updateScriptId, deleteScriptId
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: ScrapeSerie, video_source: VideoSource) -> tuple[str,bool]:
|
||||
@ -45,9 +50,24 @@ def download_video(index_season_selected: int, index_episode_selected: int, scra
|
||||
|
||||
# Get info about episode
|
||||
obj_episode = scrape_serie.episode_manager.get(index_episode_selected - 1)
|
||||
console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.name}\n")
|
||||
console.print(f"[cyan]You can safely stop the download with [bold]Ctrl+c[bold] [cyan] \n")
|
||||
|
||||
console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.name}")
|
||||
print()
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
# Invio a telegram
|
||||
bot.send_message(
|
||||
f"Download in corso\nSerie: {scrape_serie.series_name}\nStagione: {index_season_selected}\nEpisodio: {index_episode_selected}\nTitolo: {obj_episode.name}",
|
||||
None
|
||||
)
|
||||
|
||||
# Get script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
# Update script_id
|
||||
updateScriptId(script_id, f"{scrape_serie.series_name} - S{index_season_selected} - E{index_episode_selected} - {obj_episode.name}")
|
||||
|
||||
# Define filename and path for the downloaded video
|
||||
mp4_name = f"{map_episode_title(scrape_serie.series_name, index_season_selected, index_episode_selected, obj_episode.name)}.mp4"
|
||||
mp4_path = os.path.join(SERIES_FOLDER, scrape_serie.series_name, f"S{index_season_selected}")
|
||||
@ -62,7 +82,9 @@ def download_video(index_season_selected: int, index_episode_selected: int, scra
|
||||
m3u8_playlist=master_playlist,
|
||||
output_filename=os.path.join(mp4_path, mp4_name)
|
||||
).start()
|
||||
|
||||
|
||||
#bot.send_message(f"Serie scaricata tutta", None)
|
||||
|
||||
"""if r_proc == 404:
|
||||
time.sleep(2)
|
||||
|
||||
@ -74,6 +96,7 @@ def download_video(index_season_selected: int, index_episode_selected: int, scra
|
||||
if r_proc != None:
|
||||
console.print("[green]Result: ")
|
||||
console.print(r_proc)
|
||||
#bot.send_message(f"Episodio scaricato", None)
|
||||
|
||||
return os.path.join(mp4_path, mp4_name)
|
||||
|
||||
@ -86,6 +109,8 @@ def download_episode(index_season_selected: int, scrape_serie: ScrapeSerie, vide
|
||||
- download_all (bool): Download all episodes in the season.
|
||||
"""
|
||||
|
||||
#bot = get_bot_instance()
|
||||
|
||||
# Clean memory of all episodes and get the number of the season
|
||||
scrape_serie.episode_manager.clear()
|
||||
|
||||
@ -101,6 +126,8 @@ def download_episode(index_season_selected: int, scrape_serie: ScrapeSerie, vide
|
||||
download_video(index_season_selected, i_episode, scrape_serie, video_source)
|
||||
console.print(f"\n[red]End downloaded [yellow]season: [red]{index_season_selected}.")
|
||||
|
||||
#bot.send_message(f"Finito di scaricare la stagione: {index_season_selected}", None)
|
||||
|
||||
else:
|
||||
|
||||
# Display episodes list and manage user selection
|
||||
@ -111,6 +138,7 @@ def download_episode(index_season_selected: int, scrape_serie: ScrapeSerie, vide
|
||||
list_episode_select = validate_episode_selection(list_episode_select, episodes_count)
|
||||
except ValueError as e:
|
||||
console.print(f"[red]{str(e)}")
|
||||
#bot.send_message(f"{str(e)}", None)
|
||||
return
|
||||
|
||||
# Download selected episodes if not stopped
|
||||
@ -129,6 +157,8 @@ def download_series(select_season: MediaItem, version: str) -> None:
|
||||
- domain (str): Domain from which to download.
|
||||
- version (str): Version of the site.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
# Start message and set up video source
|
||||
start_message()
|
||||
@ -147,11 +177,24 @@ def download_series(select_season: MediaItem, version: str) -> None:
|
||||
|
||||
# Prompt user for season selection and download episodes
|
||||
console.print(f"\n[green]Seasons found: [red]{seasons_count}")
|
||||
index_season_selected = msg.ask(
|
||||
"\n[cyan]Insert season number [yellow](e.g., 1), [red]* [cyan]to download all seasons, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of seasons, or [yellow](e.g., 3-*) [cyan]to download from a specific season to the end"
|
||||
)
|
||||
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
console.print("\n[cyan]Insert season number [yellow](e.g., 1), [red]* [cyan]to download all seasons, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of seasons, or [yellow](e.g., 3-*) [cyan]to download from a specific season to the end")
|
||||
|
||||
bot.send_message(f"Stagioni trovate: {seasons_count}", None)
|
||||
|
||||
index_season_selected = bot.ask(
|
||||
"select_title_episode",
|
||||
"Inserisci il numero della stagione (es. 1), * per scaricare tutte le stagioni, (es. 1-2) per un intervallo di stagioni, o (es. 3-*) per scaricare dalla stagione specificata fino alla fine",
|
||||
None
|
||||
)
|
||||
else:
|
||||
index_season_selected = msg.ask(
|
||||
"\n[cyan]Insert season number [yellow](e.g., 1), [red]* [cyan]to download all seasons, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of seasons, or [yellow](e.g., 3-*) [cyan]to download from a specific season to the end"
|
||||
)
|
||||
|
||||
# Manage and validate the selection
|
||||
list_season_select = manage_selection(index_season_selected, seasons_count)
|
||||
|
||||
@ -172,6 +215,13 @@ def download_series(select_season: MediaItem, version: str) -> None:
|
||||
# Otherwise, let the user select specific episodes for the single season
|
||||
download_episode(i_season, scrape_serie, video_source, download_all=False)
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Finito di scaricare tutte le serie e episodi", None)
|
||||
# Get script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
deleteScriptId(script_id)
|
||||
|
||||
|
||||
def display_episodes_list(scrape_serie) -> str:
|
||||
"""
|
||||
@ -180,6 +230,8 @@ def display_episodes_list(scrape_serie) -> str:
|
||||
Returns:
|
||||
last_command (str): Last command entered by the user.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
# Set up table for displaying episodes
|
||||
table_show_manager = TVShowManager()
|
||||
@ -194,6 +246,8 @@ def display_episodes_list(scrape_serie) -> str:
|
||||
table_show_manager.add_column(column_info)
|
||||
|
||||
# Populate the table with episodes information
|
||||
if TELEGRAM_BOT:
|
||||
choices = []
|
||||
for i, media in enumerate(scrape_serie.episode_manager.episodes):
|
||||
table_show_manager.add_tv_show({
|
||||
'Index': str(media.number),
|
||||
@ -201,6 +255,22 @@ def display_episodes_list(scrape_serie) -> str:
|
||||
'Duration': str(media.duration)
|
||||
})
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Creazione della stringa per il messaggio Telegram
|
||||
choice_text = f"{media.number} - {media.name} ({media.duration} min)"
|
||||
choices.append(choice_text)
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# creo episoded_count
|
||||
#episoded_count = len(scrape_serie.episode_manager.episodes)
|
||||
|
||||
# Invio a telegram
|
||||
#bot.send_message(f"Episodi trovati: {episoded_count}", None)
|
||||
|
||||
# Invia la lista degli episodi al bot Telegram
|
||||
if choices:
|
||||
bot.send_message(f"Lista episodi:", choices)
|
||||
|
||||
# Run the table and handle user input
|
||||
last_command = table_show_manager.run()
|
||||
|
||||
|
@ -27,6 +27,9 @@ from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
|
||||
# Config
|
||||
from .costant import SITE_NAME, DOMAIN_NOW
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
# Variable
|
||||
media_search_manager = MediaManager()
|
||||
@ -95,7 +98,7 @@ def get_version_and_domain():
|
||||
def title_search(title_search: str, domain: str) -> int:
|
||||
"""
|
||||
Search for titles based on a search query.
|
||||
|
||||
|
||||
Parameters:
|
||||
- title_search (str): The title to search for.
|
||||
- domain (str): The domain to search on.
|
||||
@ -103,6 +106,9 @@ def title_search(title_search: str, domain: str) -> int:
|
||||
Returns:
|
||||
int: The number of titles found.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
media_search_manager.clear()
|
||||
table_show_manager.clear()
|
||||
|
||||
@ -117,7 +123,11 @@ def title_search(title_search: str, domain: str) -> int:
|
||||
except Exception as e:
|
||||
console.print(f"Site: {SITE_NAME}, request search error: {e}")
|
||||
|
||||
for dict_title in response.json()['data']:
|
||||
if TELEGRAM_BOT:
|
||||
# Prepara le scelte per l'utente
|
||||
choices = []
|
||||
|
||||
for i, dict_title in enumerate(response.json()['data']):
|
||||
try:
|
||||
|
||||
media_search_manager.add_media({
|
||||
@ -128,10 +138,19 @@ def title_search(title_search: str, domain: str) -> int:
|
||||
'date': dict_title.get('last_air_date'),
|
||||
'score': dict_title.get('score')
|
||||
})
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Crea una stringa formattata per ogni scelta con numero
|
||||
choice_text = f"{i} - {dict_title.get('name')} ({dict_title.get('type')}) - {dict_title.get('last_air_date')}"
|
||||
choices.append(choice_text)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error parsing a film entry: {e}")
|
||||
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
if choices:
|
||||
# Invio a telegram la lista
|
||||
bot.send_message(f"Lista dei risultati:", choices)
|
||||
|
||||
# Return the number of titles found
|
||||
return media_search_manager.get_length()
|
||||
|
||||
|
0
StreamingCommunity/HelpTg/__init__.py
Normal file
0
StreamingCommunity/HelpTg/__init__.py
Normal file
91
StreamingCommunity/HelpTg/config.json
Normal file
91
StreamingCommunity/HelpTg/config.json
Normal file
@ -0,0 +1,91 @@
|
||||
{
|
||||
"DEFAULT": {
|
||||
"debug": false,
|
||||
"log_file": "app.log",
|
||||
"log_to_file": true,
|
||||
"show_message": true,
|
||||
"clean_console": true,
|
||||
"root_path": "Video",
|
||||
"movie_folder_name": "Movie",
|
||||
"serie_folder_name": "TV",
|
||||
"anime_folder_name": "Anime",
|
||||
"map_episode_name": "E%(episode)_%(episode_name)",
|
||||
"config_qbit_tor": {
|
||||
"host": "192.168.1.99",
|
||||
"port": "7060",
|
||||
"user": "admin",
|
||||
"pass": "adminadmin"
|
||||
},
|
||||
"add_siteName": false,
|
||||
"disable_searchDomain": false,
|
||||
"not_close": false,
|
||||
"telegram_bot": true
|
||||
},
|
||||
"REQUESTS": {
|
||||
"timeout": 30,
|
||||
"max_retry": 8,
|
||||
"proxy_start_min": 0.1,
|
||||
"proxy_start_max": 0.5
|
||||
},
|
||||
"M3U8_DOWNLOAD": {
|
||||
"tqdm_delay": 0.12,
|
||||
"default_video_workser": 12,
|
||||
"default_audio_workser": 12,
|
||||
"merge_audio": true,
|
||||
"specific_list_audio": [
|
||||
"ita"
|
||||
],
|
||||
"merge_subs": false,
|
||||
"specific_list_subtitles": [
|
||||
"eng",
|
||||
"spa"
|
||||
],
|
||||
"cleanup_tmp_folder": true
|
||||
},
|
||||
"M3U8_CONVERSION": {
|
||||
"use_codec": false,
|
||||
"use_vcodec": true,
|
||||
"use_acodec": true,
|
||||
"use_bitrate": true,
|
||||
"use_gpu": false,
|
||||
"default_preset": "ultrafast"
|
||||
},
|
||||
"M3U8_PARSER": {
|
||||
"force_resolution": -1,
|
||||
"get_only_link": false
|
||||
},
|
||||
"SITE": {
|
||||
"streamingcommunity": {
|
||||
"domain": "paris"
|
||||
},
|
||||
"altadefinizionegratis": {
|
||||
"domain": "site"
|
||||
},
|
||||
"guardaserie": {
|
||||
"domain": "meme"
|
||||
},
|
||||
"mostraguarda": {
|
||||
"domain": "stream"
|
||||
},
|
||||
"ddlstreamitaly": {
|
||||
"domain": "co",
|
||||
"extra": {
|
||||
"ips4_device_key": "",
|
||||
"ips4_member_id": "",
|
||||
"ips4_login_key": ""
|
||||
}
|
||||
},
|
||||
"animeunity": {
|
||||
"domain": "so"
|
||||
},
|
||||
"cb01new": {
|
||||
"domain": "mobi"
|
||||
},
|
||||
"1337xx": {
|
||||
"domain": "to"
|
||||
},
|
||||
"ilcorsaronero": {
|
||||
"domain": "link"
|
||||
}
|
||||
}
|
||||
}
|
79
StreamingCommunity/HelpTg/request_manager.py
Normal file
79
StreamingCommunity/HelpTg/request_manager.py
Normal file
@ -0,0 +1,79 @@
|
||||
import json
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
class RequestManager:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if not cls._instance:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, json_file: str = "active_requests.json"):
|
||||
if not hasattr(self, 'initialized'):
|
||||
self.json_file = json_file
|
||||
self.initialized = True
|
||||
self.on_response_callback = None # Aggiungi un campo per il callback
|
||||
|
||||
def create_request(self, type: str) -> str:
|
||||
request_data = {
|
||||
"type": type,
|
||||
"response": None,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
|
||||
# Aggiungi il tipo al salvataggio della richiesta
|
||||
with open(self.json_file, "w") as f:
|
||||
json.dump(request_data, f)
|
||||
|
||||
return "Ok"
|
||||
|
||||
def save_response(self, message_text: str) -> bool:
|
||||
try:
|
||||
# Carica il file JSON
|
||||
with open(self.json_file, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Controlla se esiste la chiave 'type' e se la risposta è presente
|
||||
if "type" in data and "response" in data:
|
||||
data["response"] = message_text # Aggiorna la risposta
|
||||
|
||||
# Scrivi il file JSON aggiornato
|
||||
with open(self.json_file, "w") as f:
|
||||
json.dump(data, f, indent=4) # Formatta il file JSON
|
||||
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
except (FileNotFoundError, json.JSONDecodeError) as e:
|
||||
print(f"⚠️ save_response - errore: {e}")
|
||||
return False
|
||||
|
||||
def get_response(self) -> Optional[str]:
|
||||
try:
|
||||
with open(self.json_file, "r") as f:
|
||||
data = json.load(f)
|
||||
# Verifica se esiste la chiave "response"
|
||||
if "response" in data:
|
||||
response = data["response"] # Ottieni la risposta direttamente
|
||||
if response is not None and self.on_response_callback:
|
||||
# Se la risposta è disponibile, chiama il callback
|
||||
self.on_response_callback(response)
|
||||
return response
|
||||
|
||||
except (FileNotFoundError, json.JSONDecodeError) as e:
|
||||
print(f"get_response - errore: {e}")
|
||||
return None
|
||||
|
||||
def clear_file(self) -> bool:
|
||||
try:
|
||||
# Svuota il file JSON scrivendo un oggetto vuoto
|
||||
with open(self.json_file, "w") as f:
|
||||
json.dump({}, f)
|
||||
print(f"File {self.json_file} è stato svuotato con successo.")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"⚠️ clear_file - errore: {e}")
|
||||
return False
|
14
StreamingCommunity/HelpTg/requirements.txt
Normal file
14
StreamingCommunity/HelpTg/requirements.txt
Normal file
@ -0,0 +1,14 @@
|
||||
httpx
|
||||
bs4
|
||||
rich
|
||||
tqdm
|
||||
m3u8
|
||||
psutil
|
||||
unidecode
|
||||
jsbeautifier
|
||||
pathvalidate
|
||||
pycryptodomex
|
||||
googlesearch-python
|
||||
fake-useragent
|
||||
qbittorrent-api
|
||||
python-qbittorrent
|
62
StreamingCommunity/HelpTg/session.py
Executable file
62
StreamingCommunity/HelpTg/session.py
Executable file
@ -0,0 +1,62 @@
|
||||
import json
|
||||
|
||||
session_data = {}
|
||||
|
||||
def set_session(value):
|
||||
# salvo script_id in session_data
|
||||
session_data['script_id'] = value
|
||||
|
||||
def get_session():
|
||||
# controllo se script_id è presente in session_data
|
||||
return session_data.get('script_id', 'unknown')
|
||||
|
||||
def updateScriptId(screen_id, titolo):
|
||||
# definisco il nome del file json
|
||||
json_file = "scripts.json"
|
||||
try:
|
||||
# apro il file json
|
||||
with open(json_file, 'r') as f:
|
||||
scripts_data = json.load(f)
|
||||
except FileNotFoundError:
|
||||
# Se il file non esiste, inizializzo la lista vuota
|
||||
scripts_data = []
|
||||
|
||||
# cerco lo script con lo screen_id
|
||||
for script in scripts_data:
|
||||
if script["screen_id"] == screen_id:
|
||||
# se trovo il match, aggiorno il titolo
|
||||
script["titolo"] = titolo
|
||||
# aggiorno il file json
|
||||
with open(json_file, 'w') as f:
|
||||
json.dump(scripts_data, f, indent=4)
|
||||
#print(f"Titolo aggiornato per screen_id {screen_id}")
|
||||
return
|
||||
|
||||
# se non trovo nessuno script con lo screen_id
|
||||
print(f"Screen_id {screen_id} non trovato.")
|
||||
|
||||
# creo la funzione che elimina lo script con lo screen_id specificato
|
||||
def deleteScriptId(screen_id):
|
||||
# definisco il nome del file json
|
||||
json_file = "scripts.json"
|
||||
try:
|
||||
# apro il file json
|
||||
with open(json_file, 'r') as f:
|
||||
scripts_data = json.load(f)
|
||||
except FileNotFoundError:
|
||||
# Se il file non esiste, inizializzo la lista vuota
|
||||
scripts_data = []
|
||||
|
||||
# cerco lo script con lo screen_id
|
||||
for script in scripts_data:
|
||||
if script["screen_id"] == screen_id:
|
||||
# se trovo il match, elimino lo script
|
||||
scripts_data.remove(script)
|
||||
# aggiorno il file json
|
||||
with open(json_file, 'w') as f:
|
||||
json.dump(scripts_data, f, indent=4)
|
||||
print(f"Script eliminato per screen_id {screen_id}")
|
||||
return
|
||||
|
||||
# se non trovo nessuno script con lo screen_id
|
||||
print(f"Screen_id {screen_id} non trovato.")
|
547
StreamingCommunity/HelpTg/telegram_bot.py
Normal file
547
StreamingCommunity/HelpTg/telegram_bot.py
Normal file
@ -0,0 +1,547 @@
|
||||
import telebot
|
||||
from telebot import types
|
||||
import time
|
||||
import uuid
|
||||
import subprocess
|
||||
import os, re, sys
|
||||
import json
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
|
||||
from StreamingCommunity.HelpTg.request_manager import RequestManager
|
||||
import threading
|
||||
|
||||
# Funzione per caricare variabili da un file .env
|
||||
def load_env(file_path="../../.env"):
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path) as f:
|
||||
for line in f:
|
||||
if line.strip() and not line.startswith("#"):
|
||||
key, value = line.strip().split("=", 1)
|
||||
os.environ[key] = value
|
||||
|
||||
# Carica le variabili
|
||||
load_env()
|
||||
|
||||
class TelegramBot:
|
||||
_instance = None
|
||||
_config_file = "../../bot_config.json"
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls):
|
||||
if cls._instance is None:
|
||||
# Prova a caricare la configurazione e inizializzare il bot
|
||||
if os.path.exists(cls._config_file):
|
||||
with open(cls._config_file, 'r') as f:
|
||||
config = json.load(f)
|
||||
cls._instance = cls.init_bot(config['token'], config['authorized_user_id'])
|
||||
else:
|
||||
raise Exception("Bot non ancora inizializzato. Chiamare prima init_bot() con token e authorized_user_id")
|
||||
return cls._instance
|
||||
|
||||
@classmethod
|
||||
def init_bot(cls, token, authorized_user_id):
|
||||
if cls._instance is None:
|
||||
cls._instance = cls(token, authorized_user_id)
|
||||
# Salva la configurazione
|
||||
config = {
|
||||
'token': token,
|
||||
'authorized_user_id': authorized_user_id
|
||||
}
|
||||
with open(cls._config_file, 'w') as f:
|
||||
json.dump(config, f)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, token, authorized_user_id):
|
||||
|
||||
def monitor_scripts():
|
||||
while True:
|
||||
try:
|
||||
with open("../../scripts.json", "r") as f:
|
||||
scripts_data = json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
scripts_data = []
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# Crea una nuova lista senza gli script che sono scaduti o le screen che non esistono
|
||||
scripts_data_to_save = []
|
||||
|
||||
for script in scripts_data:
|
||||
screen_exists = False
|
||||
try:
|
||||
existing_screens = subprocess.check_output(["screen", "-list"]).decode('utf-8')
|
||||
if script["screen_id"] in existing_screens:
|
||||
screen_exists = True
|
||||
except subprocess.CalledProcessError:
|
||||
pass # Se il comando fallisce, significa che non ci sono screen attivi.
|
||||
|
||||
if screen_exists:
|
||||
if "titolo" not in script and script["status"] == "running" and (current_time - script["start_time"]) > 600:
|
||||
# Prova a terminare la sessione screen
|
||||
try:
|
||||
subprocess.check_output(["screen", "-S", script["screen_id"], "-X", "quit"])
|
||||
print(f"✅ La sessione screen con ID {script['screen_id']} è stata fermata automaticamente.")
|
||||
except subprocess.CalledProcessError:
|
||||
print(f"⚠️ Impossibile fermare la sessione screen con ID {script['screen_id']}.")
|
||||
print(f"⚠️ Lo script con ID {script['screen_id']} ha superato i 10 minuti e verrà rimosso.")
|
||||
else:
|
||||
scripts_data_to_save.append(script)
|
||||
else:
|
||||
print(f"⚠️ La sessione screen con ID {script['screen_id']} non esiste più e verrà rimossa.")
|
||||
|
||||
# Salva la lista aggiornata, senza gli script scaduti o le screen non esistenti
|
||||
with open("../../scripts.json", "w") as f:
|
||||
json.dump(scripts_data_to_save, f, indent=4)
|
||||
|
||||
time.sleep(60) # Controlla ogni minuto
|
||||
|
||||
# Avvia il thread di monitoraggio
|
||||
monitor_thread = threading.Thread(target=monitor_scripts, daemon=True)
|
||||
monitor_thread.start()
|
||||
|
||||
|
||||
if TelegramBot._instance is not None:
|
||||
raise Exception("Questa classe è un singleton! Usa get_instance() per ottenere l'istanza.")
|
||||
|
||||
self.token = token
|
||||
self.authorized_user_id = authorized_user_id
|
||||
self.chat_id = authorized_user_id
|
||||
self.bot = telebot.TeleBot(token)
|
||||
self.request_manager = RequestManager()
|
||||
|
||||
# Registra gli handler
|
||||
self.register_handlers()
|
||||
|
||||
def register_handlers(self):
|
||||
|
||||
""" @self.bot.message_handler(commands=['start'])
|
||||
def start(message):
|
||||
self.handle_start(message) """
|
||||
|
||||
@self.bot.message_handler(commands=['get_id'])
|
||||
def get_id(message):
|
||||
self.handle_get_id(message)
|
||||
|
||||
@self.bot.message_handler(commands=['start'])
|
||||
def start_script(message):
|
||||
self.handle_start_script(message)
|
||||
|
||||
@self.bot.message_handler(commands=['list'])
|
||||
def list_scripts(message):
|
||||
self.handle_list_scripts(message)
|
||||
|
||||
@self.bot.message_handler(commands=['stop'])
|
||||
def stop_script(message):
|
||||
self.handle_stop_script(message)
|
||||
|
||||
@self.bot.message_handler(commands=['screen'])
|
||||
def screen_status(message):
|
||||
self.handle_screen_status(message)
|
||||
|
||||
""" @self.bot.message_handler(commands=['replay'])
|
||||
def send_welcome(message):
|
||||
# Crea una tastiera personalizzata
|
||||
markup = types.ReplyKeyboardMarkup(row_width=2)
|
||||
itembtn1 = types.KeyboardButton('Start')
|
||||
itembtn2 = types.KeyboardButton('Lista')
|
||||
markup.add(itembtn1, itembtn2)
|
||||
|
||||
# Invia un messaggio con la tastiera
|
||||
self.bot.send_message(message.chat.id, "Scegli un'opzione:", reply_markup=markup) """
|
||||
|
||||
"""@self.bot.message_handler(commands=['inline'])
|
||||
def send_welcome(message):
|
||||
# Crea una tastiera inline
|
||||
markup = types.InlineKeyboardMarkup()
|
||||
itembtn1 = types.InlineKeyboardButton('Azione 1', callback_data='action1')
|
||||
itembtn2 = types.InlineKeyboardButton('Azione 2', callback_data='action2')
|
||||
itembtn3 = types.InlineKeyboardButton('Azione 3', callback_data='action3')
|
||||
markup.add(itembtn1, itembtn2, itembtn3)
|
||||
|
||||
# Invia un messaggio con la tastiera inline
|
||||
self.bot.send_message(message.chat.id, "Scegli un'opzione:", reply_markup=markup)
|
||||
|
||||
# Gestisce le callback delle tastiere inline
|
||||
@self.bot.callback_query_handler(func=lambda call: True)
|
||||
def handle_callback(call):
|
||||
if call.data == 'action1':
|
||||
self.bot.answer_callback_query(call.id, "Hai scelto Azione 1!")
|
||||
self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 1!")
|
||||
elif call.data == 'action2':
|
||||
self.bot.answer_callback_query(call.id, "Hai scelto Azione 2!")
|
||||
self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 2!")
|
||||
elif call.data == 'action3':
|
||||
self.bot.answer_callback_query(call.id, "Hai scelto Azione 3!")
|
||||
self.bot.send_message(call.message.chat.id, "Hai eseguito Azione 3!")
|
||||
"""
|
||||
|
||||
@self.bot.message_handler(func=lambda message: True)
|
||||
def handle_all_messages(message):
|
||||
self.handle_response(message)
|
||||
|
||||
def is_authorized(self, user_id):
|
||||
return user_id == self.authorized_user_id
|
||||
|
||||
def handle_get_id(self, message):
|
||||
if not self.is_authorized(message.from_user.id):
|
||||
print(f"❌ Non sei autorizzato.")
|
||||
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
|
||||
return
|
||||
|
||||
print(f"Il tuo ID utente è: `{message.from_user.id}`")
|
||||
self.bot.send_message(message.chat.id, f"Il tuo ID utente è: `{message.from_user.id}`", parse_mode="Markdown")
|
||||
|
||||
def handle_start_script(self, message):
|
||||
if not self.is_authorized(message.from_user.id):
|
||||
print(f"❌ Non sei autorizzato. {message.from_user.id}")
|
||||
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
|
||||
return
|
||||
|
||||
screen_id = str(uuid.uuid4())[:8]
|
||||
#screen_id = '0000'
|
||||
|
||||
# Impostare a True per avviare il test_run.py in modalità verbose per il debug
|
||||
|
||||
debug_mode = os.getenv("DEBUG")
|
||||
verbose = debug_mode
|
||||
|
||||
if debug_mode == "True":
|
||||
subprocess.Popen(["python3", "../../test_run.py", screen_id])
|
||||
else:
|
||||
# Verifica se lo screen con il nome esiste già
|
||||
try:
|
||||
subprocess.check_output(["screen", "-list"])
|
||||
existing_screens = subprocess.check_output(["screen", "-list"]).decode('utf-8')
|
||||
if screen_id in existing_screens:
|
||||
print(f"⚠️ Lo script con ID {screen_id} è già in esecuzione.")
|
||||
self.bot.send_message(message.chat.id, f"⚠️ Lo script con ID {screen_id} è già in esecuzione.")
|
||||
return
|
||||
except subprocess.CalledProcessError:
|
||||
pass # Se il comando fallisce, significa che non ci sono screen attivi.
|
||||
|
||||
# Crea la sessione screen e avvia lo script al suo interno
|
||||
command = ["screen", "-dmS", screen_id, "python3", "../../test_run.py", screen_id]
|
||||
|
||||
# Avvia il comando tramite subprocess
|
||||
subprocess.Popen(command)
|
||||
|
||||
# Creazione oggetto script info
|
||||
script_info = {
|
||||
"screen_id": screen_id,
|
||||
"start_time": time.time(),
|
||||
"status": "running",
|
||||
"user_id": message.from_user.id
|
||||
}
|
||||
|
||||
# Salvataggio nel file JSON
|
||||
json_file = "../../scripts.json"
|
||||
|
||||
# Carica i dati esistenti o crea una nuova lista
|
||||
try:
|
||||
with open(json_file, 'r') as f:
|
||||
scripts_data = json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
scripts_data = []
|
||||
|
||||
# Aggiungi il nuovo script
|
||||
scripts_data.append(script_info)
|
||||
|
||||
# Scrivi il file aggiornato
|
||||
with open(json_file, 'w') as f:
|
||||
json.dump(scripts_data, f, indent=4)
|
||||
|
||||
def handle_list_scripts(self, message):
|
||||
if not self.is_authorized(message.from_user.id):
|
||||
print(f"❌ Non sei autorizzato.")
|
||||
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
|
||||
return
|
||||
|
||||
try:
|
||||
with open("../../scripts.json", "r") as f:
|
||||
scripts_data = json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
scripts_data = []
|
||||
|
||||
if not scripts_data:
|
||||
print(f"⚠️ Nessuno script registrato.")
|
||||
self.bot.send_message(message.chat.id, "⚠️ Nessuno script registrato.")
|
||||
return
|
||||
|
||||
current_time = time.time()
|
||||
msg = ["🖥️ **Script Registrati:**\n"]
|
||||
|
||||
for script in scripts_data:
|
||||
# Calcola la durata
|
||||
duration = current_time - script["start_time"]
|
||||
if "end_time" in script:
|
||||
duration = script["end_time"] - script["start_time"]
|
||||
|
||||
# Formatta la durata
|
||||
hours, rem = divmod(duration, 3600)
|
||||
minutes, seconds = divmod(rem, 60)
|
||||
duration_str = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
|
||||
|
||||
# Icona stato
|
||||
status_icons = {
|
||||
"running": "🟢",
|
||||
"stopped": "🔴",
|
||||
"completed": "⚪"
|
||||
}
|
||||
|
||||
# Costruisci riga
|
||||
line = (
|
||||
f"• ID: `{script['screen_id']}`\n"
|
||||
f"• Stato: {status_icons.get(script['status'], '⚫')}\n"
|
||||
f"• Stop: `/stop {script['screen_id']}`\n"
|
||||
f"• Screen: `/screen {script['screen_id']}`\n"
|
||||
f"• Durata: {duration_str}\n"
|
||||
f"• Download:\n{script.get('titolo', 'N/A')}\n"
|
||||
)
|
||||
msg.append(line)
|
||||
|
||||
# Formatta la risposta finale
|
||||
final_msg = "\n".join(msg)
|
||||
if len(final_msg) > 4000:
|
||||
final_msg = final_msg[:4000] + "\n[...] (messaggio troncato)"
|
||||
|
||||
print(f"{final_msg}")
|
||||
self.bot.send_message(message.chat.id, final_msg, parse_mode="Markdown")
|
||||
|
||||
def handle_stop_script(self, message):
|
||||
if not self.is_authorized(message.from_user.id):
|
||||
print(f"❌ Non sei autorizzato.")
|
||||
self.bot.send_message(message.chat.id, "❌ Non sei autorizzato.")
|
||||
return
|
||||
|
||||
parts = message.text.split()
|
||||
if len(parts) < 2:
|
||||
try:
|
||||
with open("../../scripts.json", "r") as f:
|
||||
scripts_data = json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
scripts_data = []
|
||||
|
||||
running_scripts = [s for s in scripts_data if s["status"] == "running"]
|
||||
|
||||
if not running_scripts:
|
||||
print(f"⚠️ Nessuno script attivo da fermare.")
|
||||
self.bot.send_message(message.chat.id, "⚠️ Nessuno script attivo da fermare.")
|
||||
return
|
||||
|
||||
msg = "🖥️ **Script Attivi:**\n"
|
||||
for script in running_scripts:
|
||||
msg += f"🔹 `/stop {script['screen_id']}` per fermarlo\n"
|
||||
|
||||
print(f"{msg}")
|
||||
self.bot.send_message(message.chat.id, msg, parse_mode="Markdown")
|
||||
|
||||
elif len(parts) == 2:
|
||||
screen_id = parts[1]
|
||||
|
||||
try:
|
||||
with open("../../scripts.json", "r") as f:
|
||||
scripts_data = json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
scripts_data = []
|
||||
|
||||
# Filtra la lista eliminando lo script con l'ID specificato
|
||||
new_scripts_data = [script for script in scripts_data if script["screen_id"] != screen_id]
|
||||
|
||||
if len(new_scripts_data) == len(scripts_data):
|
||||
# Nessun elemento rimosso, quindi ID non trovato
|
||||
print(f"⚠️ Nessuno script attivo con ID `{screen_id}`.")
|
||||
self.bot.send_message(message.chat.id, f"⚠️ Nessuno script attivo con ID `{screen_id}`.", parse_mode="Markdown")
|
||||
return
|
||||
|
||||
# Terminare la sessione screen
|
||||
try:
|
||||
subprocess.check_output(["screen", "-S", screen_id, "-X", "quit"])
|
||||
print(f"✅ La sessione screen con ID {screen_id} è stata fermata.")
|
||||
except subprocess.CalledProcessError:
|
||||
print(f"⚠️ Impossibile fermare la sessione screen con ID `{screen_id}`.")
|
||||
self.bot.send_message(message.chat.id, f"⚠️ Impossibile fermare la sessione screen con ID `{screen_id}`.", parse_mode="Markdown")
|
||||
return
|
||||
|
||||
# Salva la lista aggiornata senza lo script eliminato
|
||||
with open("../../scripts.json", "w") as f:
|
||||
json.dump(new_scripts_data, f, indent=4)
|
||||
|
||||
print(f"✅ Script `{screen_id}` terminato con successo!")
|
||||
self.bot.send_message(message.chat.id, f"✅ Script `{screen_id}` terminato con successo!", parse_mode="Markdown")
|
||||
|
||||
def handle_response(self, message):
|
||||
""" if message.text == 'Start':
|
||||
self.handle_start_script(self, message)
|
||||
elif message.text == 'Lista':
|
||||
self.handle_list_scripts(self, message)
|
||||
elif message.text == 'Azione 3':
|
||||
self.bot.send_message(message.chat.id, "Hai scelto Azione 3!")
|
||||
else:
|
||||
self.bot.send_message(message.chat.id, "Comando non riconosciuto.")
|
||||
|
||||
if not self.is_authorized(message.from_user.id):
|
||||
self.bot.reply_to(message, "❌ Non sei autorizzato.")
|
||||
return """
|
||||
|
||||
text = message.text
|
||||
if self.request_manager.save_response(text):
|
||||
print(f"📥 Risposta salvata correttamente per il tipo {text}")
|
||||
else:
|
||||
print("⚠️ Nessuna richiesta attiva.")
|
||||
self.bot.reply_to(message, "❌ Nessuna richiesta attiva.")
|
||||
|
||||
def handle_screen_status(self, message):
|
||||
command_parts = message.text.split()
|
||||
if len(command_parts) < 2:
|
||||
print(f"⚠️ ID mancante nel comando. Usa: /screen <ID>")
|
||||
self.bot.send_message(message.chat.id, "⚠️ ID mancante nel comando. Usa: /screen <ID>")
|
||||
return
|
||||
|
||||
screen_id = command_parts[1]
|
||||
temp_file = f"/tmp/screen_output_{screen_id}.txt"
|
||||
|
||||
try:
|
||||
# Cattura l'output della screen
|
||||
subprocess.run(["screen", "-X", "-S", screen_id, "hardcopy", "-h", temp_file], check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Errore durante la cattura dell'output della screen: {e}")
|
||||
self.bot.send_message(message.chat.id, f"❌ Errore durante la cattura dell'output della screen: {e}")
|
||||
return
|
||||
|
||||
if not os.path.exists(temp_file):
|
||||
print(f"❌ Impossibile catturare l'output della screen.")
|
||||
self.bot.send_message(message.chat.id, f"❌ Impossibile catturare l'output della screen.")
|
||||
return
|
||||
|
||||
try:
|
||||
# Leggi il file con la codifica corretta
|
||||
with open(temp_file, 'r', encoding='latin-1') as file:
|
||||
screen_output = file.read()
|
||||
|
||||
# Pulisci l'output
|
||||
cleaned_output = re.sub(r'[\x00-\x1F\x7F]', '', screen_output) # Rimuovi caratteri di controllo
|
||||
cleaned_output = cleaned_output.replace('\n\n', '\n') # Rimuovi newline multipli
|
||||
|
||||
# Estrarre tutte le parti da "Download:" fino a "Video" o "Subtitle", senza includerli
|
||||
download_matches = re.findall(r"Download: (.*?)(?:Video|Subtitle)", cleaned_output)
|
||||
if download_matches:
|
||||
# Serie TV e Film StreamingCommunity
|
||||
|
||||
proc_matches = re.findall(r"Proc: ([\d\.]+%)", cleaned_output)
|
||||
|
||||
# Creare una stringa unica con tutti i risultati
|
||||
result_string = "\n".join([f"Download: {download_matches[i].strip()}\nDownload al {proc_matches[i]}" for i in range(len(download_matches)) if i < len(proc_matches)])
|
||||
|
||||
if result_string != "":
|
||||
cleaned_output = result_string
|
||||
else:
|
||||
print(f"❌ La parola 'Download:' non è stata trovata nella stringa.")
|
||||
else:
|
||||
|
||||
download_list = []
|
||||
|
||||
# Estrai tutte le righe che iniziano con "Download:" fino al prossimo "Download" o alla fine della riga
|
||||
matches = re.findall(r"Download:\s*(.*?)(?=Download|$)", cleaned_output)
|
||||
|
||||
# Se sono stati trovati download, stampali
|
||||
if matches:
|
||||
for i, match in enumerate(matches, 1):
|
||||
# rimuovo solo la parte "downloader.py:57Result:400" se esiste
|
||||
match = re.sub(r"downloader.py:\d+Result:400", "", match)
|
||||
match = match.strip() # Rimuovo gli spazi bianchi in eccesso
|
||||
if match: # Assicurati che la stringa non sia vuota
|
||||
print(f"Download {i}: {match}")
|
||||
|
||||
# Aggiungi il risultato modificato alla lista
|
||||
download_list.append(f"Download {i}: {match}")
|
||||
|
||||
# Creare una stringa unica con tutti i risultati
|
||||
cleaned_output = "\n".join(download_list)
|
||||
else:
|
||||
print("❌ Nessun download trovato")
|
||||
|
||||
# Invia l'output pulito
|
||||
print(f"📄 Output della screen {screen_id}:\n{cleaned_output}")
|
||||
self._send_long_message(message.chat.id, f"📄 Output della screen {screen_id}:\n{cleaned_output}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Errore durante la lettura o l'invio dell'output della screen: {e}")
|
||||
self.bot.send_message(message.chat.id, f"❌ Errore durante la lettura o l'invio dell'output della screen: {e}")
|
||||
|
||||
# Cancella il file temporaneo
|
||||
os.remove(temp_file)
|
||||
|
||||
def send_message(self, message, choices):
|
||||
if choices is None:
|
||||
if self.chat_id:
|
||||
print(f"{message}")
|
||||
self.bot.send_message(self.chat_id, message)
|
||||
else:
|
||||
formatted_choices = "\n".join(choices)
|
||||
message = f"{message}\n\n{formatted_choices}"
|
||||
if self.chat_id:
|
||||
print(f"{message}")
|
||||
self.bot.send_message(self.chat_id, message)
|
||||
|
||||
def _send_long_message(self, chat_id, text, chunk_size=4096):
|
||||
"""Suddivide e invia un messaggio troppo lungo in più parti."""
|
||||
for i in range(0, len(text), chunk_size):
|
||||
print(f"{text[i:i+chunk_size]}")
|
||||
self.bot.send_message(chat_id, text[i:i+chunk_size])
|
||||
|
||||
def ask(self, type, prompt_message, choices, timeout=60):
|
||||
self.request_manager.create_request(type)
|
||||
|
||||
if choices is None:
|
||||
print(f"{prompt_message}")
|
||||
self.bot.send_message(
|
||||
self.chat_id,
|
||||
f"{prompt_message}",
|
||||
)
|
||||
else:
|
||||
print(f"{prompt_message}\n\nOpzioni: {', '.join(choices)}")
|
||||
self.bot.send_message(
|
||||
self.chat_id,
|
||||
f"{prompt_message}\n\nOpzioni: {', '.join(choices)}",
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
response = self.request_manager.get_response()
|
||||
if response is not None:
|
||||
return response
|
||||
time.sleep(1)
|
||||
|
||||
print(f"⚠️ Timeout: nessuna risposta ricevuta.")
|
||||
self.bot.send_message(self.chat_id, "⚠️ Timeout: nessuna risposta ricevuta.")
|
||||
self.request_manager.clear_file()
|
||||
return None
|
||||
|
||||
def run(self):
|
||||
print("🚀 Avvio del bot...")
|
||||
# svuoto il file ../../scripts.json
|
||||
with open("../../scripts.json", "w") as f:
|
||||
json.dump([], f)
|
||||
self.bot.infinity_polling()
|
||||
|
||||
def get_bot_instance():
|
||||
return TelegramBot.get_instance()
|
||||
|
||||
# Esempio di utilizzo
|
||||
if __name__ == "__main__":
|
||||
|
||||
# Usa le variabili
|
||||
token = os.getenv("TOKEN_TELEGRAM")
|
||||
authorized_user_id = os.getenv("AUTHORIZED_USER_ID")
|
||||
|
||||
TOKEN = token # Inserisci il token del tuo bot Telegram sul file .env
|
||||
AUTHORIZED_USER_ID = int(authorized_user_id) # Inserisci il tuo ID utente Telegram sul file .env
|
||||
|
||||
# Inizializza il bot
|
||||
bot = TelegramBot.init_bot(TOKEN, AUTHORIZED_USER_ID)
|
||||
bot.run()
|
||||
|
||||
"""
|
||||
start - Avvia lo script
|
||||
list - Lista script attivi
|
||||
get - Mostra ID utente Telegram
|
||||
"""
|
@ -1,9 +1,11 @@
|
||||
# 17.10.24
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
|
||||
# External libraries
|
||||
@ -35,6 +37,9 @@ from ...M3U8 import (
|
||||
)
|
||||
from .segments import M3U8_Segments
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
# Config
|
||||
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_DOWNLOAD', 'specific_list_audio')
|
||||
@ -749,9 +754,14 @@ class HLS_Downloader:
|
||||
def start(self):
|
||||
"""
|
||||
Initiates the downloading process. Checks if the output file already exists and proceeds with processing the playlist or index.
|
||||
"""
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
if os.path.exists(self.output_filename):
|
||||
console.log("[red]Output file already exists.")
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Contenuto già scaricato!", None)
|
||||
return 400
|
||||
|
||||
self.path_manager.create_directories()
|
||||
@ -825,6 +835,8 @@ class HLS_Downloader:
|
||||
Args:
|
||||
out_path (str): The path of the output file to be cleaned up.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
# Check if the final output file exists
|
||||
logging.info(f"Check if end file converted exists: {out_path}")
|
||||
@ -869,6 +881,13 @@ class HLS_Downloader:
|
||||
border_style="green"
|
||||
))
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
message = f"Download completato\nDimensione: {formatted_size}\nDurata: {formatted_duration}\nPercorso: {os.path.abspath(self.output_filename)}"
|
||||
# Rimuovere i tag di colore usando una regex
|
||||
clean_message = re.sub(r'\[[a-zA-Z]+\]', '', message)
|
||||
# Invio a telegram
|
||||
bot.send_message(clean_message, None)
|
||||
|
||||
# Handle missing segments
|
||||
if missing_ts:
|
||||
os.rename(self.output_filename, self.output_filename.replace(".mp4", "_failed.mp4"))
|
||||
|
@ -47,6 +47,7 @@ PROXY_START_MAX = config_manager.get_float('REQUESTS', 'proxy_start_max')
|
||||
DEFAULT_VIDEO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_video_workser')
|
||||
DEFAULT_AUDIO_WORKERS = config_manager.get_int('M3U8_DOWNLOAD', 'default_audio_workser')
|
||||
MAX_TIMEOOUT = config_manager.get_int("REQUESTS", "timeout")
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
|
||||
@ -295,6 +296,11 @@ class M3U8_Segments:
|
||||
- description: Description to insert on tqdm bar
|
||||
- type (str): Type of download: 'video' or 'audio'
|
||||
"""
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
# Viene usato per lo screen
|
||||
console.log("####")
|
||||
|
||||
self.setup_interrupt_handler()
|
||||
|
||||
progress_bar = tqdm(
|
||||
|
@ -1,7 +1,7 @@
|
||||
# 09.06.24
|
||||
|
||||
import os
|
||||
import signal
|
||||
import signal, re
|
||||
import sys
|
||||
import ssl
|
||||
import certifi
|
||||
@ -29,6 +29,10 @@ from ...FFmpeg import print_duration_table
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
# Config
|
||||
GET_ONLY_LINK = config_manager.get_bool('M3U8_PARSER', 'get_only_link')
|
||||
@ -49,6 +53,16 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = No
|
||||
- referer (str, optional): The referer header value.
|
||||
- headers_ (dict, optional): Custom headers for the request.
|
||||
"""
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
# Viene usato per lo screen
|
||||
console.log("####")
|
||||
|
||||
if os.path.exists(path):
|
||||
console.log("[red]Output file already exists.")
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Contenuto già scaricato!", None)
|
||||
return 400
|
||||
|
||||
# Early return for link-only mode
|
||||
if GET_ONLY_LINK:
|
||||
@ -100,7 +114,7 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = No
|
||||
# Create progress bar
|
||||
progress_bar = tqdm(
|
||||
total=total,
|
||||
ascii='░▒█',
|
||||
ascii='âââ',
|
||||
bar_format=f"{Colors.YELLOW}[MP4] {Colors.WHITE}({Colors.CYAN}video{Colors.WHITE}): "
|
||||
f"{Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ "
|
||||
f"{Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] "
|
||||
@ -156,8 +170,16 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = No
|
||||
title=f"{os.path.basename(path.replace('.mp4', ''))}",
|
||||
border_style="green"
|
||||
))
|
||||
return path,KILL_HANDLER
|
||||
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
message = f"Download completato\nDimensione: {internet_manager.format_file_size(os.path.getsize(path))}\nDurata: {print_duration_table(path, description=False, return_string=True)}\nTitolo: {os.path.basename(path.replace('.mp4', ''))}"
|
||||
# Rimuovere i tag di colore usando una regex
|
||||
clean_message = re.sub(r'\[[a-zA-Z]+\]', '', message)
|
||||
# Invio a telegram
|
||||
bot.send_message(clean_message, None)
|
||||
|
||||
return path
|
||||
|
||||
else:
|
||||
console.print("[bold red]Download failed or file is empty.[/bold red]")
|
||||
return None
|
||||
|
@ -18,6 +18,11 @@ from typing import Dict, List, Any
|
||||
from .message import start_message
|
||||
from .call_stack import get_call_stack
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
from StreamingCommunity.Util._jsonConfig import config_manager
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
|
||||
|
||||
class TVShowManager:
|
||||
def __init__(self):
|
||||
@ -149,6 +154,9 @@ class TVShowManager:
|
||||
total_items = len(self.tv_shows)
|
||||
last_command = "" # Variable to store the last command executed
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
|
||||
while True:
|
||||
start_message()
|
||||
|
||||
@ -167,7 +175,16 @@ class TVShowManager:
|
||||
self.console.print(f"\n[green]Press [red]Enter [green]for next page, [red]'q' [green]to quit, or [red]'back' [green]to search.")
|
||||
|
||||
if not force_int_input:
|
||||
key = Prompt.ask(
|
||||
if TELEGRAM_BOT:
|
||||
self.console.print(f"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end")
|
||||
key = bot.ask(
|
||||
"select_title_episode",
|
||||
f"Inserisci l'indice dei media (ad esempio, 1), * per scaricare tutti i media, (ad esempio, 1-2) per un intervallo di media, o (ad esempio, 3-*) per scaricare dal un indice specifico fino alla fine.",
|
||||
None
|
||||
)
|
||||
else:
|
||||
key = Prompt.ask(
|
||||
"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end"
|
||||
)
|
||||
@ -175,8 +192,16 @@ class TVShowManager:
|
||||
else:
|
||||
choices = [str(i) for i in range(0, max_int_input)]
|
||||
choices.extend(["q", "quit", "b", "back"])
|
||||
|
||||
key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False)
|
||||
if TELEGRAM_BOT:
|
||||
self.console.print(f"[cyan]Insert media [red]index")
|
||||
key = bot.ask(
|
||||
"select_title",
|
||||
f"Scegli il contenuto da scaricare:\n📺 Serie TV - 🎞️ Film - 🌀 Anime\noppure `back` per tornare indietro",
|
||||
None
|
||||
)
|
||||
else:
|
||||
key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False)
|
||||
|
||||
last_command = key
|
||||
|
||||
if key.lower() == "q" or key.lower() == "quit":
|
||||
@ -198,16 +223,34 @@ class TVShowManager:
|
||||
# Last slice, ensure all remaining items are shown
|
||||
self.console.print(f"\n [green]You've reached the end. [red]Enter [green]for first page, [red]'q' [green]to quit, or [red]'back' [green]to search.")
|
||||
if not force_int_input:
|
||||
key = Prompt.ask(
|
||||
"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end"
|
||||
)
|
||||
if TELEGRAM_BOT:
|
||||
self.console.print(f"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end")
|
||||
key = bot.ask(
|
||||
"select_title_episode",
|
||||
f"Inserisci l'indice dei media (ad esempio, 1), * per scaricare tutti i media, (ad esempio, 1-2) per un intervallo di media, o (ad esempio, 3-*) per scaricare dal un indice specifico fino alla fine.",
|
||||
None
|
||||
)
|
||||
else:
|
||||
key = Prompt.ask(
|
||||
"\n[cyan]Insert media index [yellow](e.g., 1), [red]* [cyan]to download all media, "
|
||||
"[yellow](e.g., 1-2) [cyan]for a range of media, or [yellow](e.g., 3-*) [cyan]to download from a specific index to the end"
|
||||
)
|
||||
|
||||
else:
|
||||
choices = [str(i) for i in range(0, max_int_input)]
|
||||
choices.extend(["q", "quit", "b", "back"])
|
||||
|
||||
key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False)
|
||||
if TELEGRAM_BOT:
|
||||
self.console.print(f"[cyan]Insert media [red]index")
|
||||
key = bot.ask(
|
||||
"select_title",
|
||||
f"Scegli il contenuto da scaricare:\n📺 Serie TV - 🎞️ Film - 🌀 Anime\n oppure `back` per tornare indietro",
|
||||
None
|
||||
)
|
||||
else:
|
||||
key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False)
|
||||
|
||||
last_command = key
|
||||
|
||||
if key.lower() == "q" or key.lower() == "quit":
|
||||
|
@ -8,6 +8,7 @@ import logging
|
||||
import platform
|
||||
import argparse
|
||||
import importlib
|
||||
import threading, asyncio
|
||||
from typing import Callable
|
||||
|
||||
|
||||
@ -19,9 +20,13 @@ from StreamingCommunity.Upload.update import update as git_update
|
||||
from StreamingCommunity.Util.os import os_summary
|
||||
from StreamingCommunity.Util.logger import Logger
|
||||
|
||||
# Telegram bot instance
|
||||
from StreamingCommunity.HelpTg.telegram_bot import get_bot_instance
|
||||
|
||||
# Config
|
||||
CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close')
|
||||
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
|
||||
from StreamingCommunity.HelpTg.session import get_session, deleteScriptId
|
||||
|
||||
|
||||
def run_function(func: Callable[..., None], close_console: bool = False) -> None:
|
||||
@ -43,6 +48,9 @@ def load_search_functions():
|
||||
modules = []
|
||||
loaded_functions = {}
|
||||
|
||||
# Lista dei siti da escludere se TELEGRAM_BOT è attivo
|
||||
excluded_sites = {"cb01new", "ddlstreamitaly", "guardaserie", "ilcorsaronero", "mostraguarda"} if TELEGRAM_BOT else set()
|
||||
|
||||
# Find api home directory
|
||||
if getattr(sys, 'frozen', False): # Modalità PyInstaller
|
||||
base_path = os.path.join(sys._MEIPASS, "StreamingCommunity")
|
||||
@ -57,6 +65,11 @@ def load_search_functions():
|
||||
|
||||
# Get folder name as module name
|
||||
module_name = os.path.basename(os.path.dirname(init_file))
|
||||
|
||||
# Se il modulo è nella lista da escludere, saltalo
|
||||
if module_name in excluded_sites:
|
||||
continue
|
||||
|
||||
logging.info(f"Load module name: {module_name}")
|
||||
|
||||
try:
|
||||
@ -123,8 +136,48 @@ def initialize():
|
||||
except:
|
||||
console.log("[red]Error with loading github.")
|
||||
|
||||
def restart_script():
|
||||
"""Riavvia lo script con gli stessi argomenti della riga di comando."""
|
||||
print("\n🔄 Riavvio dello script...\n")
|
||||
python = sys.executable
|
||||
os.execv(python, [python] + sys.argv) # Riavvia lo stesso script con gli stessi argomenti
|
||||
|
||||
def force_exit():
|
||||
"""Forza la chiusura dello script in qualsiasi contesto."""
|
||||
|
||||
print("\n🛑 Chiusura dello script in corso...")
|
||||
|
||||
# 1️⃣ Chiudi tutti i thread tranne il principale
|
||||
for t in threading.enumerate():
|
||||
if t is not threading.main_thread():
|
||||
print(f"🔄 Chiusura thread: {t.name}")
|
||||
t.join(timeout=1)
|
||||
|
||||
# 2️⃣ Ferma asyncio, se attivo
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
print("⚡ Arresto del loop asyncio...")
|
||||
loop.stop()
|
||||
except RuntimeError:
|
||||
pass # Se non c'è un loop asyncio attivo, ignora l'errore
|
||||
|
||||
# 3️⃣ Esce con sys.exit(), se fallisce usa os._exit()
|
||||
try:
|
||||
print("✅ Uscita con sys.exit(0)")
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
pass # Se viene intercettato da un try/except, ignora
|
||||
|
||||
print("🚨 Uscita forzata con os._exit(0)")
|
||||
os._exit(0) # Se sys.exit() non funziona, esce forzatamente
|
||||
|
||||
def main(script_id):
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot = get_bot_instance()
|
||||
bot.send_message(f"🏁 Avviato script {script_id}", None)
|
||||
|
||||
def main():
|
||||
start = time.time()
|
||||
|
||||
# Create logger
|
||||
@ -140,6 +193,8 @@ def main():
|
||||
description='Script to download movies and series from the internet. Use these commands to configure the script and control its behavior.'
|
||||
)
|
||||
|
||||
parser.add_argument("script_id", nargs="?", default="unknown", help="ID dello script")
|
||||
|
||||
# Add arguments for the main configuration parameters
|
||||
parser.add_argument(
|
||||
'--add_siteName', type=bool, help='Enable or disable adding the site name to the file name (e.g., true/false).'
|
||||
@ -234,12 +289,67 @@ def main():
|
||||
[f"{key}: [{color_map[label[1]]}]{label[0]}[/{color_map[label[1]]}]" for key, label in choice_labels.items()]
|
||||
) + "[white])"
|
||||
|
||||
# Ask the user for input
|
||||
category = msg.ask(prompt_message, choices=list(choice_labels.keys()), default="0", show_choices=False, show_default=False)
|
||||
if TELEGRAM_BOT:
|
||||
|
||||
# Mappa delle emoji per i colori
|
||||
emoji_map = {
|
||||
"yellow": "🟡", # Giallo
|
||||
"red": "🔴", # Rosso
|
||||
"blue": "🔵", # Blu
|
||||
"green": "🟢" # Verde
|
||||
}
|
||||
|
||||
# Display the category legend in a single line
|
||||
category_legend_str = "Categorie: \n" + " | ".join([
|
||||
f"{emoji_map.get(color, '⚪')} {category.capitalize()}"
|
||||
for category, color in color_map.items()
|
||||
])
|
||||
|
||||
# Costruisci il messaggio con le emoji al posto dei colori
|
||||
prompt_message = "Inserisci il sito:\n" + "\n".join(
|
||||
[f"{key}: {emoji_map[color_map[label[1]]]} {label[0]}" for key, label in choice_labels.items()]
|
||||
)
|
||||
|
||||
console.print(f"\n{prompt_message}")
|
||||
|
||||
# Chiedi la scelta all'utente con il bot Telegram
|
||||
category = bot.ask(
|
||||
"select_provider",
|
||||
f"{category_legend_str}\n\n{prompt_message}",
|
||||
None # Passiamo la lista delle chiavi come scelte
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
#console.print(f"\n{prompt_message}")
|
||||
|
||||
# Ask the user for input
|
||||
category = msg.ask(prompt_message, choices=list(choice_labels.keys()), default="0", show_choices=False, show_default=False)
|
||||
|
||||
|
||||
# Run the corresponding function based on user input
|
||||
if category in input_to_function:
|
||||
run_function(input_to_function[category])
|
||||
else:
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Categoria non valida", None)
|
||||
|
||||
console.print("[red]Invalid category.")
|
||||
sys.exit(0)
|
||||
|
||||
if CLOSE_CONSOLE:
|
||||
restart_script() # Riavvia lo script invece di uscire
|
||||
# Riavvia lo script
|
||||
# Chiude il processo attuale e avvia una nuova istanza dello script
|
||||
""" subprocess.Popen([sys.executable] + sys.argv)
|
||||
sys.exit() """
|
||||
else:
|
||||
force_exit() # Usa la funzione per chiudere sempre
|
||||
|
||||
if TELEGRAM_BOT:
|
||||
bot.send_message(f"Chiusura in corso", None)
|
||||
# Delete script_id
|
||||
script_id = get_session()
|
||||
if script_id != "unknown":
|
||||
deleteScriptId(script_id)
|
||||
|
21
config.json
21
config.json
@ -2,23 +2,24 @@
|
||||
"DEFAULT": {
|
||||
"debug": false,
|
||||
"log_file": "app.log",
|
||||
"log_to_file": true,
|
||||
"show_message": true,
|
||||
"log_to_file": false,
|
||||
"show_message": false,
|
||||
"clean_console": true,
|
||||
"root_path": "Video",
|
||||
"root_path": "/home/giuseppepiccolo/Develop/docker/jellyfin/media/",
|
||||
"movie_folder_name": "Movie",
|
||||
"serie_folder_name": "TV",
|
||||
"serie_folder_name": "Serie",
|
||||
"anime_folder_name": "Anime",
|
||||
"map_episode_name": "E%(episode)_%(episode_name)",
|
||||
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
|
||||
"config_qbit_tor": {
|
||||
"host": "192.168.1.99",
|
||||
"port": "7060",
|
||||
"host": "192.168.5.172",
|
||||
"port": "8080",
|
||||
"user": "admin",
|
||||
"pass": "adminadmin"
|
||||
},
|
||||
"add_siteName": false,
|
||||
"disable_searchDomain": false,
|
||||
"not_close": false
|
||||
"not_close": false,
|
||||
"telegram_bot": true
|
||||
},
|
||||
"REQUESTS": {
|
||||
"timeout": 30,
|
||||
@ -34,7 +35,7 @@
|
||||
"specific_list_audio": [
|
||||
"ita"
|
||||
],
|
||||
"merge_subs": true,
|
||||
"merge_subs": false,
|
||||
"specific_list_subtitles": [
|
||||
"eng",
|
||||
"spa"
|
||||
@ -58,7 +59,7 @@
|
||||
"domain": "paris"
|
||||
},
|
||||
"altadefinizionegratis": {
|
||||
"domain": "site"
|
||||
"domain": "pro"
|
||||
},
|
||||
"guardaserie": {
|
||||
"domain": "meme"
|
||||
|
@ -11,4 +11,6 @@ pycryptodomex
|
||||
googlesearch-python
|
||||
fake-useragent
|
||||
qbittorrent-api
|
||||
python-qbittorrent
|
||||
python-qbittorrent
|
||||
pyTelegramBotAPI
|
||||
Pillow
|
12
test_run.py
12
test_run.py
@ -1,5 +1,15 @@
|
||||
# 26.11.24
|
||||
|
||||
import sys
|
||||
from StreamingCommunity.run import main
|
||||
from StreamingCommunity.HelpTg.request_manager import RequestManager
|
||||
from StreamingCommunity.HelpTg.session import set_session
|
||||
|
||||
main()
|
||||
# Svuoto il file
|
||||
request_manager = RequestManager()
|
||||
request_manager.clear_file()
|
||||
script_id = sys.argv[1] if len(sys.argv) > 1 else "unknown"
|
||||
|
||||
set_session(script_id)
|
||||
|
||||
main(script_id)
|
Loading…
x
Reference in New Issue
Block a user