Versione 3.0.0 (#301)

* Update ScrapeSerie.py

* Update site.py

* Update util.py

* Update ffmpeg_installer.py

* Update os.py

* Update ffmpeg_installer.py

* Update setup.py

* Update version.py

* Update util.py
Committed by GitHub on 2025-04-22 15:57:43 +02:00
parent 0a03be0fae
commit 353a23d169
7 changed files with 177 additions and 114 deletions

File: site.py

@@ -51,10 +51,8 @@ def get_token() -> dict:
for html_meta in soup.find_all("meta"):
if html_meta.get('name') == "csrf-token":
find_csrf_token = html_meta.get('content')
logging.info(f"Extract: ('animeunity_session': {response.cookies['animeunity_session']}, 'csrf_token': {find_csrf_token})")
return {
'animeunity_session': response.cookies['animeunity_session'],
'csrf_token': find_csrf_token
@@ -64,9 +62,6 @@ def get_token() -> dict:
def get_real_title(record):
"""
Get the real title from a record.
This function takes a record, which is assumed to be a dictionary representing a row of JSON data.
It looks for a title in the record, prioritizing English over Italian titles if available.
Parameters:
- record (dict): A dictionary representing a row of JSON data.
@@ -84,7 +79,7 @@ def get_real_title(record):
def title_search(query: str) -> int:
"""
Function to perform an anime search using a provided query.
Function to perform an anime search using both APIs and combine results.
Parameters:
- query (str): The query to search for.
@@ -97,43 +92,85 @@ def title_search(query: str) -> int:
media_search_manager.clear()
table_show_manager.clear()
seen_titles = set()
choices = [] if site_constant.TELEGRAM_BOT else None
# Create parameter for request
data = get_token()
cookies = {'animeunity_session': data.get('animeunity_session')}
cookies = {
'animeunity_session': data.get('animeunity_session')
}
headers = {
'user-agent': get_userAgent(),
'x-csrf-token': data.get('csrf_token')
}
json_data = {'title': query}
# Send a POST request to the API endpoint for live search
# First API call - livesearch
try:
response = httpx.post(
f'{site_constant.FULL_URL}/livesearch',
cookies=cookies,
headers=headers,
response1 = httpx.post(
f'{site_constant.FULL_URL}/livesearch',
cookies=cookies,
headers=headers,
json={'title': query},
timeout=max_timeout
)
response1.raise_for_status()
process_results(response1.json()['records'], seen_titles, media_search_manager, choices)
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, livesearch error: {e}")
# Second API call - archivio
try:
json_data = {
'title': query,
'type': False,
'year': False,
'order': 'Lista A-Z',
'status': False,
'genres': False,
'offset': 0,
'dubbed': False,
'season': False
}
response2 = httpx.post(
f'{site_constant.FULL_URL}/archivio/get-animes',
cookies=cookies,
headers=headers,
json=json_data,
timeout=max_timeout
)
response.raise_for_status()
response2.raise_for_status()
process_results(response2.json()['records'], seen_titles, media_search_manager, choices)
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
console.print(f"Site: {site_constant.SITE_NAME}, archivio search error: {e}")
# Initialize the list of choices
if site_constant.TELEGRAM_BOT:
choices = []
if site_constant.TELEGRAM_BOT and choices and len(choices) > 0:
bot.send_message(f"Lista dei risultati:", choices)
result_count = media_search_manager.get_length()
if result_count == 0:
console.print(f"Nothing matching was found for: {query}")
return result_count
for dict_title in response.json()['records']:
def process_results(records: list, seen_titles: set, media_manager: MediaManager, choices: list = None) -> None:
"""Helper function to process search results and add unique entries."""
for dict_title in records:
try:
# Rename keys for consistency
title_id = dict_title.get('id')
if title_id in seen_titles:
continue
seen_titles.add(title_id)
dict_title['name'] = get_real_title(dict_title)
media_search_manager.add_media({
'id': dict_title.get('id'),
media_manager.add_media({
'id': title_id,
'slug': dict_title.get('slug'),
'name': dict_title.get('name'),
'type': dict_title.get('type'),
@@ -142,18 +179,9 @@ def title_search(query: str) -> int:
'image': dict_title.get('imageurl')
})
if site_constant.TELEGRAM_BOT:
# Create a formatted, numbered string for each choice
if choices is not None:
choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}"
choices.append(choice_text)
except Exception as e:
print(f"Error parsing a film entry: {e}")
if site_constant.TELEGRAM_BOT:
if choices:
bot.send_message(f"Lista dei risultati:", choices)
# Return the length of media search manager
return media_search_manager.get_length()
print(f"Error parsing a title entry: {e}")

File: ScrapeSerie.py

@@ -29,6 +29,7 @@ class ScrapeSerieAnime:
self.is_series = False
self.headers = {'user-agent': get_userAgent()}
self.url = url
self.episodes_cache = None
def setup(self, version: str = None, media_id: int = None, series_name: str = None):
self.version = version
@@ -62,38 +63,41 @@ class ScrapeSerieAnime:
logging.error(f"Error fetching episode count: {e}")
return None
def get_info_episode(self, index_ep: int) -> Episode:
def _fetch_all_episodes(self):
"""
Fetch detailed information for a specific episode.
Args:
index_ep (int): Zero-based index of the target episode
Returns:
Episode: Detailed episode information
Fetch all episodes data at once and cache it
"""
try:
params = {
"start_range": index_ep,
"end_range": index_ep + 1
}
count = self.get_count_episodes()
if not count:
return
response = httpx.get(
url=f"{self.url}/info_api/{self.media_id}/{index_ep}",
headers=self.headers,
params=params,
url=f"{self.url}/info_api/{self.media_id}/1",
params={
"start_range": 1,
"end_range": count
},
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Return information about the episode
json_data = response.json()["episodes"][-1]
return Episode(json_data)
self.episodes_cache = response.json()["episodes"]
except Exception as e:
logging.error(f"Error fetching episode information: {e}")
return None
logging.error(f"Error fetching all episodes: {e}")
self.episodes_cache = None
def get_info_episode(self, index_ep: int) -> Episode:
"""
Get episode info from cache
"""
if self.episodes_cache is None:
self._fetch_all_episodes()
if self.episodes_cache and 0 <= index_ep < len(self.episodes_cache):
return Episode(self.episodes_cache[index_ep])
return None
# ------------- FOR GUI -------------
@@ -108,4 +112,4 @@ class ScrapeSerieAnime:
"""
Get information for a specific episode.
"""
return self.get_info_episode(episode_index)
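The refactor above replaces one HTTP request per episode with a single bulk request whose result is cached on the instance. A minimal sketch of that lazy-caching pattern, using hypothetical names (EpisodeCache, fetch_all) rather than the class above:

class EpisodeCache:
    def __init__(self, fetch_all):
        # fetch_all is any callable that returns the full list of episode dicts
        self._fetch_all = fetch_all
        self._episodes = None

    def get(self, index):
        # Populate the cache on first access, then serve every lookup from memory
        if self._episodes is None:
            self._episodes = self._fetch_all() or []
        if 0 <= index < len(self._episodes):
            return self._episodes[index]
        return None

cache = EpisodeCache(lambda: [{'number': 1}, {'number': 2}])
print(cache.get(0))  # fetched once, reused by later lookups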

File: util.py

@@ -132,10 +132,8 @@ def print_duration_table(file_path: str, description: str = "Duration", return_s
def get_ffprobe_info(file_path):
"""
Get format and codec information for a media file using ffprobe.
Parameters:
- file_path (str): Path to the media file.
Returns:
dict: A dictionary containing the format name and a list of codec names.
Returns None if file does not exist or ffprobe crashes.
@@ -143,48 +141,58 @@ def get_ffprobe_info(file_path):
if not os.path.exists(file_path):
logging.error(f"File not found: {file_path}")
return None
# Get ffprobe path and verify it exists
ffprobe_path = get_ffprobe_path()
if not ffprobe_path or not os.path.exists(ffprobe_path):
logging.error(f"FFprobe not found at path: {ffprobe_path}")
return None
# Verify file permissions
try:
# Use subprocess.Popen instead of run to better handle crashes
cmd = [get_ffprobe_path(), '-v', 'error', '-show_format', '-show_streams', '-print_format', 'json', file_path]
logging.info(f"FFmpeg command: {cmd}")
file_stat = os.stat(file_path)
logging.info(f"File permissions: {oct(file_stat.st_mode)}")
if not os.access(file_path, os.R_OK):
logging.error(f"No read permission for file: {file_path}")
return None
except OSError as e:
logging.error(f"Cannot access file {file_path}: {e}")
return None
try:
cmd = [ffprobe_path, '-v', 'error', '-show_format', '-show_streams', '-print_format', 'json', file_path]
logging.info(f"Running FFprobe command: {' '.join(cmd)}")
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
stdout, stderr = proc.communicate()
if proc.returncode != 0:
logging.error(f"FFprobe failed with return code {proc.returncode} for file {file_path}")
if stderr:
logging.error(f"FFprobe stderr: {stderr}")
return {
'format_name': None,
'codec_names': []
}
# Make sure we have valid JSON before parsing
if not stdout or not stdout.strip():
logging.warning(f"FFprobe returned empty output for file {file_path}")
return {
'format_name': None,
'codec_names': []
}
info = json.loads(stdout)
format_name = info['format']['format_name'] if 'format' in info else None
codec_names = [stream['codec_name'] for stream in info['streams']] if 'streams' in info else []
# Use subprocess.run instead of Popen for better error handling
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False # Don't raise exception on non-zero exit
)
if result.returncode != 0:
logging.error(f"FFprobe failed with return code {result.returncode}")
logging.error(f"FFprobe stderr: {result.stderr}")
logging.error(f"FFprobe stdout: {result.stdout}")
logging.error(f"Command: {' '.join(cmd)}")
logging.error(f"FFprobe path permissions: {oct(os.stat(ffprobe_path).st_mode)}")
return None
# Parse JSON output
try:
info = json.loads(result.stdout)
return {
'format_name': format_name,
'codec_names': codec_names
'format_name': info.get('format', {}).get('format_name'),
'codec_names': [stream.get('codec_name') for stream in info.get('streams', [])]
}
except json.JSONDecodeError as e:
logging.error(f"Failed to parse FFprobe output: {e}")
return None
except Exception as e:
logging.error(f"Failed to get ffprobe info for file {file_path}: {e}")
return {
'format_name': None,
'codec_names': []
}
logging.error(f"FFprobe execution failed: {e}")
return None
def is_png_format_or_codec(file_info):
@@ -255,4 +263,4 @@ def check_duration_v_a(video_path, audio_path, tolerance=1.0):
if duration_difference <= tolerance:
return True, duration_difference
else:
return False, duration_difference
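For context, a hedged sketch of the subprocess.run-based probing used above, runnable on any machine with ffprobe on PATH; the bare 'ffprobe' command here stands in for the path returned by get_ffprobe_path().

import json
import subprocess
from typing import Optional

def probe(file_path: str) -> Optional[dict]:
    cmd = ['ffprobe', '-v', 'error', '-show_format', '-show_streams',
           '-print_format', 'json', file_path]
    # check=False: a non-zero exit code is handled here instead of raising
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    if result.returncode != 0:
        return None
    try:
        info = json.loads(result.stdout)
    except json.JSONDecodeError:
        return None
    return {
        'format_name': info.get('format', {}).get('format_name'),
        'codec_names': [s.get('codec_name') for s in info.get('streams', [])],
    }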

File: version.py

@@ -1,5 +1,5 @@
__title__ = 'StreamingCommunity'
__version__ = '2.9.9'
__version__ = '3.0.0'
__author__ = 'Arrowar'
__description__ = 'A command-line program to download film'
__copyright__ = 'Copyright 2024'

File: ffmpeg_installer.py

@@ -238,6 +238,31 @@ class FFMPEGDownloader:
Returns:
Tuple[Optional[str], Optional[str], Optional[str]]: Paths to ffmpeg, ffprobe, and ffplay executables.
"""
if self.os_name == 'linux':
try:
# Attempt to install FFmpeg using apt
console.print("[bold blue]Trying to install FFmpeg using 'sudo apt install ffmpeg'[/]")
result = subprocess.run(
['sudo', 'apt', 'install', '-y', 'ffmpeg'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if result.returncode == 0:
ffmpeg_path = shutil.which('ffmpeg')
ffprobe_path = shutil.which('ffprobe')
if ffmpeg_path and ffprobe_path:
console.print("[bold green]FFmpeg successfully installed via apt[/]")
return ffmpeg_path, ffprobe_path, None
else:
console.print("[bold yellow]Failed to install FFmpeg via apt. Proceeding with static download.[/]")
except Exception as e:
logging.error(f"Error during 'sudo apt install ffmpeg': {e}")
console.print("[bold red]Error during 'sudo apt install ffmpeg'. Proceeding with static download.[/]")
# Proceed with static download if apt installation fails or is not applicable
config = FFMPEG_CONFIGURATION[self.os_name]
executables = [exe.format(arch=self.arch) for exe in config['executables']]
successful_extractions = []
@@ -346,4 +371,4 @@ def check_ffmpeg() -> Tuple[Optional[str], Optional[str], Optional[str]]:
except Exception as e:
logging.error(f"Error checking or downloading FFmpeg executables: {e}")
return None, None, None
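The Linux branch added above tries a system package first and only falls back to the static download if that fails. A minimal sketch of that flow, assuming an apt-based distribution; download_static_build is a placeholder for the existing static-download path, not a function in the repository.

import shutil
import subprocess

def ensure_ffmpeg(download_static_build):
    result = subprocess.run(
        ['sudo', 'apt', 'install', '-y', 'ffmpeg'],
        capture_output=True, text=True
    )
    if result.returncode == 0:
        ffmpeg_path = shutil.which('ffmpeg')
        ffprobe_path = shutil.which('ffprobe')
        if ffmpeg_path and ffprobe_path:
            return ffmpeg_path, ffprobe_path, None
    # apt failed or the binaries are still missing: fall back to the static build
    return download_static_build()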

File: os.py

@@ -104,16 +104,14 @@ class OsManager:
if not path:
return path
# Decode unicode characters
# Decode unicode characters and perform basic sanitization
decoded = unidecode(path)
# Basic path sanitization
sanitized = sanitize_filepath(decoded)
if self.system == 'windows':
# Handle network paths (UNC or IP-based)
if path.startswith('\\\\') or path.startswith('//'):
parts = path.replace('/', '\\').split('\\')
if sanitized.startswith('\\\\') or sanitized.startswith('//'):
parts = sanitized.replace('/', '\\').split('\\')
# Keep server/IP and share name as is
sanitized_parts = parts[:4]
# Sanitize remaining parts
@@ -126,9 +124,9 @@ class OsManager:
return '\\'.join(sanitized_parts)
# Handle drive letters
elif len(path) >= 2 and path[1] == ':':
drive = path[:2]
rest = path[2:].lstrip('\\').lstrip('/')
elif len(sanitized) >= 2 and sanitized[1] == ':':
drive = sanitized[:2]
rest = sanitized[2:].lstrip('\\').lstrip('/')
path_parts = [drive] + [
self.get_sanitize_file(part)
for part in rest.replace('/', '\\').split('\\')
@@ -138,12 +136,12 @@ class OsManager:
# Regular path
else:
parts = path.replace('/', '\\').split('\\')
parts = sanitized.replace('/', '\\').split('\\')
return '\\'.join(p for p in parts if p)
else:
# Handle Unix-like paths (Linux and macOS)
is_absolute = path.startswith('/')
parts = path.replace('\\', '/').split('/')
is_absolute = sanitized.startswith('/')
parts = sanitized.replace('\\', '/').split('/')
sanitized_parts = [
self.get_sanitize_file(part)
for part in parts
@@ -454,4 +452,4 @@ def get_ffmpeg_path():
def get_ffprobe_path():
"""Returns the path of FFprobe."""
return os_summary.ffprobe_path
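The change above moves sanitization ahead of path splitting, so every later branch works on the decoded, sanitized string instead of the raw input. A small illustration of that ordering for Unix-like paths, assuming the unidecode and pathvalidate packages provide the unidecode and sanitize_filepath helpers used above (their imports are not visible in this hunk); sanitize_unix_path itself is an illustrative name.

from unidecode import unidecode
from pathvalidate import sanitize_filepath

def sanitize_unix_path(path: str) -> str:
    # Decode and sanitize first, then split only the sanitized string
    sanitized = sanitize_filepath(unidecode(path))
    is_absolute = sanitized.startswith('/')
    parts = [p for p in sanitized.replace('\\', '/').split('/') if p]
    return ('/' if is_absolute else '') + '/'.join(parts)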

File: setup.py

@@ -10,7 +10,7 @@ with open(os.path.join(os.path.dirname(__file__), "requirements.txt"), "r", enco
setup(
name="StreamingCommunity",
version="2.9.9",
version="3.0.0",
long_description=read_readme(),
long_description_content_type="text/markdown",
author="Lovi-0",