Remove pycachce

This commit is contained in:
Lovi 2024-12-10 17:00:32 +01:00
parent 5969fb2f00
commit 1101d8eae6
51 changed files with 516 additions and 76 deletions

4
.gitignore vendored
View File

@ -39,9 +39,13 @@ key.t
# Cache
__pycache__/
**/__pycache__/
# Ignore node_modules directory in the client dashboard to avoid committing dependencies
/client/dashboard/node_modules
# Ignore build directory in the client dashboard to avoid committing build artifacts
/client/dashboard/build
# PER PYCACHE -> pyclean .

View File

@ -22,7 +22,6 @@ from StreamingCommunity.Util.os import (
# Logic class
from ...FFmpeg import (
print_duration_table,
get_video_duration_s,
join_video,
join_audios,
join_subtitle
@ -52,6 +51,7 @@ GET_ONLY_LINK = config_manager.get_bool('M3U8_PARSER', 'get_only_link')
max_timeout = config_manager.get_int("REQUESTS", "timeout")
headers_index = config_manager.get_dict('REQUESTS', 'user-agent')
m3u8_url_fixer = M3U8_UrlFix()
list_MissingTs = []
@ -397,7 +397,8 @@ class ContentDownloader:
self.expected_real_time = video_m3u8.expected_real_time
# Download the video streams and print status
video_m3u8.download_streams(f"{Colors.MAGENTA}video")
info_dw = video_m3u8.download_streams(f"{Colors.MAGENTA}video", "video")
list_MissingTs.append(info_dw)
# Print duration information of the downloaded video
#print_duration_table(downloaded_video[0].get('path'))
@ -427,7 +428,8 @@ class ContentDownloader:
audio_m3u8.get_info()
# Download the audio segments and print status
audio_m3u8.download_streams(f"{Colors.MAGENTA}audio {Colors.RED}{obj_audio.get('language')}")
info_dw = audio_m3u8.download_streams(f"{Colors.MAGENTA}audio {Colors.RED}{obj_audio.get('language')}", f"audio_{obj_audio.get('language')}")
list_MissingTs.append(info_dw)
# Print duration information of the downloaded audio
#print_duration_table(obj_audio.get('path'))
@ -838,40 +840,33 @@ class HLS_Downloader:
# Rename the output file to the desired output filename if it does not already exist
if not os.path.exists(self.output_filename):
missing_ts = False
missing_info = ""
# Rename the converted file to the specified output filename
os.rename(out_path, self.output_filename)
# Get duration information for the output file
end_output_time = print_duration_table(self.output_filename, description=False, return_string=False)
# Calculate file size and duration for reporting
formatted_size = internet_manager.format_file_size(os.path.getsize(self.output_filename))
formatted_duration = print_duration_table(self.output_filename, description=False, return_string=True)
expected_real_seconds = dict_to_seconds(self.content_downloader.expected_real_time)
end_output_seconds = dict_to_seconds(end_output_time)
# Check if the downloaded content is complete based on expected duration
if expected_real_seconds is not None:
missing_ts = not (expected_real_seconds - 3 <= end_output_seconds <= expected_real_seconds + 3)
else:
missing_ts = "Undefined"
# Second check for missing segments
if not missing_ts:
if get_video_duration_s(self.output_filename) < int(expected_real_seconds) - 5:
# Collect info about type missing
for item in list_MissingTs:
if int(item['nFailed']) >= 1:
missing_ts = True
missing_info += f"[red]TS Failed: {item['nFailed']} {item['type']} tracks[/red]\n"
# Prepare the report panel content
print("")
panel_content = (
f"[bold green]Download completed![/bold green]\n"
f"[cyan]File size: [bold red]{formatted_size}[/bold red]\n"
f"[cyan]Duration: [bold]{formatted_duration}[/bold]\n"
f"[cyan]Missing TS: [bold red]{missing_ts}[/bold red]"
f"[cyan]Duration: [bold]{formatted_duration}[/bold]"
)
if missing_ts:
panel_content += f"\n{missing_info}"
# Display the download completion message
console.print(Panel(
panel_content,

View File

@ -4,10 +4,11 @@ import os
import sys
import time
import queue
import signal
import logging
import binascii
import threading
import signal
from queue import PriorityQueue
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
@ -34,6 +35,7 @@ from ...M3U8 import (
M3U8_Parser,
M3U8_UrlFix
)
from ...FFmpeg.util import print_duration_table, format_duration
from .proxyes import main_test_proxy
# Config
@ -93,6 +95,11 @@ class M3U8_Segments:
self.interrupt_flag = threading.Event()
self.download_interrupted = False
# OTHER INFO
self.info_maxRetry = 0
self.info_nRetry = 0
self.info_nFailed = 0
def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes:
"""
Retrieves the encryption key from the M3U8 playlist.
@ -127,6 +134,7 @@ class M3U8_Segments:
hex_content = binascii.hexlify(response.content).decode('utf-8')
byte_content = bytes.fromhex(hex_content)
#console.print(f"[cyan]Find key: [red]{hex_content}")
return byte_content
def parse_data(self, m3u8_content: str) -> None:
@ -221,11 +229,10 @@ class M3U8_Segments:
self.download_interrupted = True
self.stop_event.set()
"""if threading.current_thread() is threading.main_thread():
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGINT, interrupt_handler)
else:
console.log("[red]Signal handler must be set in the main thread !!")"""
signal.signal(signal.SIGINT, interrupt_handler)
print("Signal handler must be set in the main thread")
def make_requests_stream(self, ts_url: str, index: int, progress_bar: tqdm, backoff_factor: float = 1.5) -> None:
"""
@ -318,11 +325,18 @@ class M3U8_Segments:
except Exception as e:
logging.info(f"Attempt {attempt + 1} failed for segment {index} - '{ts_url}': {e}")
# Update stat variable class
if attempt > self.info_maxRetry:
self.info_maxRetry = ( attempt + 1 )
self.info_nRetry += 1
if attempt + 1 == REQUEST_MAX_RETRY:
console.log(f"[red]Final retry failed for segment: {index}")
self.queue.put((index, None)) # Marker for failed segment
progress_bar.update(1)
break
self.info_nFailed += 1
#break
sleep_time = backoff_factor * (2 ** attempt)
logging.info(f"Retrying segment {index} in {sleep_time} seconds...")
@ -383,12 +397,13 @@ class M3U8_Segments:
except Exception as e:
logging.error(f"Error writing segment {index}: {str(e)}")
def download_streams(self, add_desc):
def download_streams(self, description: str, type: str):
"""
Downloads all TS segments in parallel and writes them to a file.
Parameters:
- add_desc (str): Additional description for the progress bar.
- description: Description to insert on tqdm bar
- type (str): Type of download: 'video' or 'audio'
"""
self.setup_interrupt_handler()
@ -415,15 +430,18 @@ class M3U8_Segments:
AUDIO_WORKERS = DEFAULT_AUDIO_WORKERS
# Differnt workers for audio and video
if "video" in str(add_desc):
if "video" in str(type):
TQDM_MAX_WORKER = VIDEO_WORKERS
if "audio" in str(add_desc):
if "audio" in str(type):
TQDM_MAX_WORKER = AUDIO_WORKERS
console.print(f"[cyan]Video workers[white]: [green]{VIDEO_WORKERS} [white]| [cyan]Audio workers[white]: [green]{AUDIO_WORKERS}")
# Custom bar for mobile and pc
if TQDM_USE_LARGE_BAR:
bar_format = (
f"{Colors.YELLOW}[HLS] {Colors.WHITE}({Colors.CYAN}{add_desc}{Colors.WHITE}): "
f"{Colors.YELLOW}[HLS] {Colors.WHITE}({Colors.CYAN}{description}{Colors.WHITE}): "
f"{Colors.RED}{{percentage:.2f}}% "
f"{Colors.MAGENTA}{{bar}} "
f"{Colors.WHITE}[ {Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] "
@ -512,11 +530,6 @@ class M3U8_Segments:
if self.download_interrupted:
console.log("[red] Download was manually stopped.")
# Optional: Delete partial download
if os.path.exists(self.tmp_file_path):
os.remove(self.tmp_file_path)
sys.exit(0)
# Clean up
self.stop_event.set()
writer_thread.join(timeout=30)
@ -535,5 +548,18 @@ class M3U8_Segments:
file_size = os.path.getsize(self.tmp_file_path)
if file_size == 0:
raise Exception("Output file is empty")
logging.info(f"Download completed. File size: {file_size} bytes")
# Get expected time
ex_hours, ex_minutes, ex_seconds = format_duration(self.expected_real_time_s)
ex_formatted_duration = f"[yellow]{int(ex_hours)}[red]h [yellow]{int(ex_minutes)}[red]m [yellow]{int(ex_seconds)}[red]s"
console.print(f"[cyan]Max retry per URL[white]: [green]{self.info_maxRetry}[green] [white]| [cyan]Total retry done[white]: [green]{self.info_nRetry}[green] [white]| [cyan]Missing TS: [red]{self.info_nFailed} [white]| [cyan]Duration: {print_duration_table(self.tmp_file_path, None, True)} [white]| [cyan]Expected duation: {ex_formatted_duration} \n")
if self.info_nRetry >= len(self.segments) * (1/3.33):
console.print(
"[yellow]⚠ Warning:[/yellow] Too many retries detected! "
"Consider reducing the number of [cyan]workers[/cyan] in the [magenta]config.json[/magenta] file. "
"This will impact [bold]performance[/bold]."
)
# Info to return
return {'type': type, 'nFailed': self.info_nFailed}

View File

@ -1,4 +1,4 @@
# 18.04.24
from .command import join_video, join_audios, join_subtitle
from .util import print_duration_table, get_video_duration_s
from .util import print_duration_table, get_video_duration

View File

@ -9,8 +9,11 @@ from typing import List, Dict
# Internal utilities
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.os import os_manager, suppress_output
from StreamingCommunity.Util.os import os_manager, os_summary, suppress_output
from StreamingCommunity.Util.console import console
# Logic class
from .util import need_to_force_to_ts, check_duration_v_a
from .capture import capture_ffmpeg_real_time
from ..M3U8 import M3U8_Codec
@ -29,6 +32,7 @@ FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_CONVERSION", "default_preset")
# Variable
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
FFMPEG_PATH = os_summary.ffmpeg_path
def join_video(video_path: str, out_path: str, codec: M3U8_Codec = None):
@ -49,8 +53,8 @@ def join_video(video_path: str, out_path: str, codec: M3U8_Codec = None):
logging.error("Missing input video for ffmpeg conversion.")
sys.exit(0)
# Start command
ffmpeg_cmd = ['ffmpeg']
# Start command with locate ffmpeg
ffmpeg_cmd = [FFMPEG_PATH]
# Enabled the use of gpu
if USE_GPU:
@ -140,8 +144,8 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s
video_audio_same_duration = check_duration_v_a(video_path, audio_tracks[0].get('path'))
# Start command
ffmpeg_cmd = ['ffmpeg']
# Start command with locate ffmpeg
ffmpeg_cmd = [FFMPEG_PATH]
# Enabled the use of gpu
if USE_GPU:
@ -242,8 +246,8 @@ def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], out_pat
logging.error("Missing input video for ffmpeg conversion.")
sys.exit(0)
# Start command
ffmpeg_cmd = ["ffmpeg", "-i", video_path]
# Start command with locate ffmpeg
ffmpeg_cmd = [FFMPEG_PATH, "-i", video_path]
# Add subtitle input files first
for subtitle in subtitles_list:

View File

@ -10,6 +10,12 @@ from typing import Tuple
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.os import os_summary
# Variable
FFPROB_PATH = os_summary.ffprobe_path
def has_audio_stream(video_path: str) -> bool:
@ -23,7 +29,7 @@ def has_audio_stream(video_path: str) -> bool:
has_audio (bool): True if the input video has an audio stream, False otherwise.
"""
try:
ffprobe_cmd = ['ffprobe', '-v', 'error', '-print_format', 'json', '-select_streams', 'a', '-show_streams', video_path]
ffprobe_cmd = [FFPROB_PATH, '-v', 'error', '-print_format', 'json', '-select_streams', 'a', '-show_streams', video_path]
logging.info(f"FFmpeg command: {ffprobe_cmd}")
with subprocess.Popen(ffprobe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
@ -47,12 +53,11 @@ def get_video_duration(file_path: str) -> float:
- file_path (str): The path to the video file.
Returns:
(float): The duration of the video in seconds if successful,
None if there's an error.
(float): The duration of the video in seconds if successful, None if there's an error.
"""
try:
ffprobe_cmd = ['ffprobe', '-v', 'error', '-show_format', '-print_format', 'json', file_path]
ffprobe_cmd = [FFPROB_PATH, '-v', 'error', '-show_format', '-print_format', 'json', file_path]
logging.info(f"FFmpeg command: {ffprobe_cmd}")
# Use a with statement to ensure the subprocess is cleaned up properly
@ -74,18 +79,19 @@ def get_video_duration(file_path: str) -> float:
return 1
except Exception as e:
logging.error(f"Error get video duration: {e}")
logging.error(f"Get video duration error: {e}")
sys.exit(0)
def get_video_duration_s(filename):
"""
Get the duration of a video file using ffprobe.
Parameters:
- filename (str): Path to the video file (e.g., 'sim.mp4')
- filename (str): Path to the video file (e.g., 'sim.mp4')
Returns:
- duration (float): Duration of the video in seconds, or None if an error occurs.
- duration (float): Duration of the video in seconds, or None if an error occurs.
"""
ffprobe_cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', filename]
@ -138,7 +144,6 @@ def print_duration_table(file_path: str, description: str = "Duration", return_s
- str: The formatted duration string if return_string is True.
- dict: A dictionary with keys 'h', 'm', 's' representing hours, minutes, and seconds if return_string is False.
"""
video_duration = get_video_duration(file_path)
if video_duration is not None:
@ -160,14 +165,14 @@ def get_ffprobe_info(file_path):
Get format and codec information for a media file using ffprobe.
Parameters:
file_path (str): Path to the media file.
- file_path (str): Path to the media file.
Returns:
dict: A dictionary containing the format name and a list of codec names.
"""
try:
result = subprocess.run(
['ffprobe', '-v', 'error', '-show_format', '-show_streams', '-print_format', 'json', file_path],
[FFPROB_PATH, '-v', 'error', '-show_format', '-show_streams', '-print_format', 'json', file_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
output = result.stdout
@ -195,7 +200,7 @@ def is_png_format_or_codec(file_info):
Check if the format is 'png_pipe' or if any codec is 'png'.
Parameters:
file_info (dict): The dictionary containing file information.
- file_info (dict): The dictionary containing file information.
Returns:
bool: True if the format is 'png_pipe' or any codec is 'png', otherwise False.
@ -210,7 +215,7 @@ def need_to_force_to_ts(file_path):
Get if a file to TS format if it is in PNG format or contains a PNG codec.
Parameters:
file_path (str): Path to the input media file.
- file_path (str): Path to the input media file.
"""
logging.info(f"Processing file: {file_path}")
file_info = get_ffprobe_info(file_path)
@ -225,11 +230,11 @@ def check_duration_v_a(video_path, audio_path):
Check if the duration of the video and audio matches.
Parameters:
- video_path (str): Path to the video file.
- audio_path (str): Path to the audio file.
- video_path (str): Path to the video file.
- audio_path (str): Path to the audio file.
Returns:
- bool: True if the duration of the video and audio matches, False otherwise.
- bool: True if the duration of the video and audio matches, False otherwise.
"""
# Ottieni la durata del video

View File

@ -0,0 +1,2 @@
from .tmdb import tmdb
from .obj_tmbd import Json_film

View File

@ -0,0 +1,39 @@
# 17.09.24
from typing import Dict
class Json_film:
def __init__(self, data: Dict):
self.adult = data.get('adult', False)
self.backdrop_path = data.get('backdrop_path')
self.budget = data.get('budget', 0)
self.homepage = data.get('homepage')
self.id = data.get('id', 0)
self.imdb_id = data.get('imdb_id')
self.origin_country = data.get('origin_country', [])
self.original_language = data.get('original_language')
self.original_title = data.get('original_title')
self.overview = data.get('overview')
self.popularity = data.get('popularity', 0.0)
self.poster_path = data.get('poster_path')
self.release_date = data.get('release_date')
self.revenue = data.get('revenue', 0)
self.runtime = data.get('runtime', 0)
self.status = data.get('status')
self.tagline = data.get('tagline')
self.title = data.get('title')
self.video = data.get('video', False)
self.vote_average = data.get('vote_average', 0.0)
self.vote_count = data.get('vote_count', 0)
def __repr__(self):
return (f"Film(adult={self.adult}, backdrop_path='{self.backdrop_path}', "
f"budget={self.budget}, "
f"homepage='{self.homepage}', id={self.id}, "
f"imdb_id='{self.imdb_id}', origin_country={self.origin_country}, "
f"original_language='{self.original_language}', original_title='{self.original_title}', "
f"overview='{self.overview}', popularity={self.popularity}, poster_path='{self.poster_path}', "
f"release_date='{self.release_date}', revenue={self.revenue}, runtime={self.runtime}, "
f"status='{self.status}', tagline='{self.tagline}', "
f"title='{self.title}', video={self.video}, vote_average={self.vote_average}, vote_count={self.vote_count})")

View File

@ -0,0 +1,346 @@
# 24.08.24
import sys
from typing import Dict
# External libraries
import httpx
from rich.console import Console
# Internal utilities
from .obj_tmbd import Json_film
from StreamingCommunity.Util.table import TVShowManager
# Variable
table_show_manager = TVShowManager()
api_key = "a800ed6c93274fb857ea61bd9e7256c5"
def get_select_title(table_show_manager, generic_obj):
"""
Display a selection of titles and prompt the user to choose one.
Returns:
dict: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Check if the generic_obj list is empty
if not generic_obj:
Console.print("\n[red]No media items available.")
return None
# Example of available colors for columns
available_colors = ['red', 'magenta', 'yellow', 'cyan', 'green', 'blue', 'white']
# Retrieve the keys of the first item as column headers
first_item = generic_obj[0]
column_info = {"Index": {'color': available_colors[0]}} # Always include Index with a fixed color
# Assign colors to the remaining keys dynamically
color_index = 1
for key in first_item.keys():
if key in ('name', 'date', 'number'): # Custom prioritization of colors
if key == 'name':
column_info["Name"] = {'color': 'magenta'}
elif key == 'date':
column_info["Date"] = {'color': 'cyan'}
elif key == 'number':
column_info["Number"] = {'color': 'yellow'}
else:
column_info[key.capitalize()] = {'color': available_colors[color_index % len(available_colors)]}
color_index += 1
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, item in enumerate(generic_obj):
item_dict = {'Index': str(i)}
for key in item.keys():
# Ensure all values are strings for rich add table
item_dict[key.capitalize()] = str(item[key])
table_show_manager.add_tv_show(item_dict)
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(generic_obj))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
Console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) < len(generic_obj):
return generic_obj[int(last_command)]
else:
Console.print("\n[red]Wrong index")
sys.exit(0)
class TheMovieDB:
def __init__(self, api_key):
"""
Initialize the class with the API key.
Parameters:
- api_key (str): The API key for authenticating requests to TheMovieDB.
"""
self.api_key = api_key
self.base_url = "https://api.themoviedb.org/3"
self.console = Console()
#self.genres = self._fetch_genres()
def _make_request(self, endpoint, params=None):
"""
Make a request to the given API endpoint with optional parameters.
Parameters:
- endpoint (str): The API endpoint to hit.
- params (dict): Additional parameters for the request.
Returns:
dict: JSON response as a dictionary.
"""
if params is None:
params = {}
params['api_key'] = self.api_key
url = f"{self.base_url}/{endpoint}"
response = httpx.get(url, params=params)
response.raise_for_status()
return response.json()
def _fetch_genres(self) -> Dict[int, str]:
"""
Fetch and return the genre names from TheMovieDB.
Returns:
Dict[int, str]: A dictionary mapping genre IDs to genre names.
"""
genres = self._make_request("genre/movie/list")
return {genre['id']: genre['name'] for genre in genres.get('genres', [])}
def _process_and_add_tv_shows(self, data, columns):
"""
Process TV show data and add it to the TV show manager.
Parameters:
- data (list): List of dictionaries containing the data to process.
- columns (list): A list of tuples, where each tuple contains the column name and the key to fetch the data from the dictionary.
"""
# Define column styles with colors
tv_show_manager = TVShowManager()
column_info = {
col[0]: {'color': col[2] if len(col) > 2 else 'white'}
for col in columns
}
tv_show_manager.add_column(column_info)
# Add each item to the TV show manager, including rank
for index, item in enumerate(data):
# Convert genre IDs to genre names
genre_names = [self.genres.get(genre_id, 'Unknown') for genre_id in item.get('genre_ids', [])]
tv_show = {
col[0]: str(item.get(col[1], 'N/A')) if col[1] != 'genre_ids' else ', '.join(genre_names)
for col in columns
}
tv_show_manager.add_tv_show(tv_show)
# Display the processed TV show data
tv_show_manager.display_data(tv_show_manager.tv_shows[tv_show_manager.slice_start:tv_show_manager.slice_end])
def _display_with_title(self, title: str, data, columns):
"""
Display data with a title.
Parameters:
- title (str): The title to display.
- data (list): List of dictionaries containing the data to process.
- columns (list): A list of tuples, where each tuple contains the column name and the key to fetch the data from the dictionary.
"""
self.console.print(f"\n{title}", style="bold underline")
self._process_and_add_tv_shows(data, columns)
def display_trending_tv_shows(self):
"""
Fetch and display the trending TV shows of the week.
"""
data = self._make_request("trending/tv/week").get("results", [])
columns = [
("Title", "name", 'cyan'),
("First Air Date", "first_air_date", 'green'),
("Popularity", "popularity", 'magenta'),
("Genres", "genre_ids", 'blue'),
("Origin Country", "origin_country", 'red'),
("Vote Average", "vote_average", 'yellow')
]
self._display_with_title("Trending TV Shows of the Week", data, columns)
def display_trending_films(self):
"""
Fetch and display the trending films of the week.
"""
data = self._make_request("trending/movie/week").get("results", [])
columns = [
("Title", "title", 'cyan'),
("Release Date", "release_date", 'green'),
("Popularity", "popularity", 'magenta'),
("Genres", "genre_ids", 'blue'),
("Vote Average", "vote_average", 'yellow')
]
self._display_with_title("Trending Films of the Week", data, columns)
def search_movie(self, movie_name: str):
"""
Search for a movie by name and return its TMDB ID.
Parameters:
- movie_name (str): The name of the movie to search for.
Returns:
int: The TMDB ID of the selected movie.
"""
generic_obj = []
data = self._make_request("search/movie", {"query": movie_name}).get("results", [])
if not data:
self.console.print("No movies found with that name.", style="red")
return None
self.console.print("\nSelect a Movie:")
for i, movie in enumerate(data, start=1):
generic_obj.append({
'name': movie['title'],
'date': movie.get('release_date', 'N/A'),
'id': movie['id']
})
choice = get_select_title(table_show_manager, generic_obj)
return choice["id"]
def get_movie_details(self, tmdb_id: int) -> Json_film:
"""
Fetch and display details for a specific movie using its TMDB ID.
Parameters:
- tmdb_id (int): The TMDB ID of the movie.
Returns:
- Json_film: The movie details as a class.
"""
movie = self._make_request(f"movie/{tmdb_id}")
if not movie:
self.console.print("Movie not found.", style="red")
return None
return Json_film(movie)
def search_tv_show(self, tv_name: str):
"""
Search for a TV show by name and return its TMDB ID.
Parameters:
- tv_name (str): The name of the TV show to search for.
Returns:
int: The TMDB ID of the selected TV show.
"""
data = self._make_request("search/tv", {"query": tv_name}).get("results", [])
if not data:
self.console.print("No TV shows found with that name.", style="red")
return None
self.console.print("\nSelect a TV Show:")
for i, show in enumerate(data, start=1):
self.console.print(f"{i}. {show['name']} (First Air Date: {show.get('first_air_date', 'N/A')})")
choice = int(input("Enter the number of the show you want: ")) - 1
selected_show = data[choice]
return selected_show["id"] # Return the TMDB ID of the selected TV show
def get_seasons(self, tv_show_id: int):
"""
Get seasons for a given TV show.
Parameters:
- tv_show_id (int): The TMDB ID of the TV show.
Returns:
int: The season number selected by the user.
"""
data = self._make_request(f"tv/{tv_show_id}").get("seasons", [])
if not data:
self.console.print("No seasons found for this TV show.", style="red")
return None
self.console.print("\nSelect a Season:")
for i, season in enumerate(data, start=1):
self.console.print(f"{i}. {season['name']} (Episodes: {season['episode_count']})")
choice = int(input("Enter the number of the season you want: ")) - 1
return data[choice]["season_number"]
def get_episodes(self, tv_show_id: int, season_number: int):
"""
Get episodes for a given season of a TV show.
Parameters:
- tv_show_id (int): The TMDB ID of the TV show.
- season_number (int): The season number.
Returns:
dict: The details of the selected episode.
"""
data = self._make_request(f"tv/{tv_show_id}/season/{season_number}").get("episodes", [])
if not data:
self.console.print("No episodes found for this season.", style="red")
return None
self.console.print("\nSelect an Episode:")
for i, episode in enumerate(data, start=1):
self.console.print(f"{i}. {episode['name']} (Air Date: {episode.get('air_date', 'N/A')})")
choice = int(input("Enter the number of the episode you want: ")) - 1
return data[choice]
# Output
tmdb = TheMovieDB(api_key)
"""
Example:
@ movie
movie_name = "Interstellar"
movie_id = tmdb.search_movie(movie_name)
if movie_id:
movie_details = tmdb.get_movie_details(tmdb_id=movie_id)
print(movie_details)
@ series
tv_name = "Game of Thrones"
tv_show_id = tmdb.search_tv_show(tv_name)
if tv_show_id:
season_number = tmdb.get_seasons(tv_show_id=tv_show_id)
if season_number:
episode = tmdb.get_episodes(tv_show_id=tv_show_id, season_number=season_number)
print(episode)
"""

View File

@ -306,7 +306,11 @@ class InternManager():
print()
class OsSummary():
class OsSummary:
def __init__(self):
self.ffmpeg_path = None
self.ffprobe_path = None
def get_executable_version(self, command: list):
"""
@ -413,7 +417,7 @@ class OsSummary():
console.print("Please install the official Python from [bold blue]https://www.python.org[/bold blue] and try again.", style="bold yellow")
sys.exit(0)
async def get_system_summary(self):
def get_system_summary(self):
"""
Generate a summary of the system environment.
@ -441,19 +445,33 @@ class OsSummary():
console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({glibc_version})[/bold red]")
logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({glibc_version})")
# ffmpeg and ffprobe versions
ffmpeg_path, ffprobe_path = check_ffmpeg()
# Locate ffmpeg and ffprobe
if "binary" not in ffmpeg_path:
ffmpeg_path = self.check_ffmpeg_location(['where', 'ffmpeg'])
if "binary" not in ffprobe_path:
ffprobe_path = self.check_ffmpeg_location(['where', 'ffprobe'])
# Usa il comando 'where' su Windows
if platform.system() == "Windows":
command = 'where'
ffmpeg_version = self.get_executable_version([ffprobe_path, '-version'])
ffprobe_version = self.get_executable_version([ffprobe_path, '-version'])
# Usa il comando 'which' su Unix/Linux
else:
command = 'which'
console.print(f"[cyan]Path[white]: [red]ffmpeg [bold yellow]'{ffmpeg_path}'[/bold yellow][white], [red]ffprobe '[bold yellow]{ffprobe_path}'[/bold yellow]")
# Locate ffmpeg and ffprobe from path enviroment
if self.ffmpeg_path != None and "binary" not in self.ffmpeg_path:
self.ffmpeg_path = self.check_ffmpeg_location([command, 'ffmpeg'])
if self.ffprobe_path != None and "binary" not in self.ffprobe_path:
self.ffprobe_path = self.check_ffmpeg_location([command, 'ffprobe'])
# Locate ffmpeg from bin installation
if self.ffmpeg_path is None or self.ffprobe_path is None:
self.ffmpeg_path, self.ffprobe_path = check_ffmpeg()
if self.ffmpeg_path is None or self.ffprobe_path is None:
console.log("[red]Cant locate ffmpeg or ffprobe")
sys.exit(0)
ffmpeg_version = self.get_executable_version([self.ffprobe_path, '-version'])
ffprobe_version = self.get_executable_version([self.ffprobe_path, '-version'])
console.print(f"[cyan]Path[white]: [red]ffmpeg [bold yellow]'{self.ffmpeg_path}'[/bold yellow][white], [red]ffprobe '[bold yellow]{self.ffprobe_path}'[/bold yellow]")
console.print(f"[cyan]Exe versions[white]: [bold red]ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}[/bold red]")
# Check if requirements.txt exists, if not on pyinstaller
@ -486,12 +504,12 @@ class OsSummary():
logging.info(f"Libraries: {', '.join([self.get_library_version(lib) for lib in optional_libraries])}")
# OTHER
os_manager = OsManager()
internet_manager = InternManager()
os_summary = OsSummary()
@contextlib.contextmanager
def suppress_output():
with contextlib.redirect_stdout(io.StringIO()):

View File

@ -53,7 +53,7 @@
}
},
"EXTRA": {
"mongodb": "mongodb+srv://admin:admin@cluster0.hwk1q.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0",
"mongodb": "mongodb+srv://...",
"database": "StreamingCommunity"
}
}

View File

@ -13,4 +13,5 @@ qbittorrent-api
python-qbittorrent
googlesearch-python
pymongo
fastapi
fastapi
pyclean