diff --git a/.github/.domain/domain_update.py b/.github/.domain/domain_update.py
index 2c69a48..7886ed3 100644
--- a/.github/.domain/domain_update.py
+++ b/.github/.domain/domain_update.py
@@ -25,7 +25,7 @@ def get_headers():
 def get_tld(url_str):
     try:
         parsed = urlparse(unquote(url_str))
-        domain = parsed.netloc.lower().lstrip('www.')
+        domain = parsed.netloc.lower()
 
         parts = domain.split('.')
         return parts[-1] if len(parts) >= 2 else None
@@ -35,7 +35,7 @@ def get_tld(url_str):
 def get_base_domain(url_str):
     try:
         parsed = urlparse(url_str)
-        domain = parsed.netloc.lower().lstrip('www.')
+        domain = parsed.netloc.lower()
 
         parts = domain.split('.')
         return '.'.join(parts[:-1]) if len(parts) > 2 else parts[0]
@@ -89,8 +89,10 @@ def parse_url(url):
     try:
         extracted = tldextract.extract(url)
         parsed = urlparse(url)
+
+        # Keep the original netloc (with www, if present)
         clean_url = f"{parsed.scheme}://{parsed.netloc}/"
-        full_domain = f"{extracted.domain}.{extracted.suffix}" if extracted.domain else extracted.suffix
+        full_domain = parsed.netloc
         domain_tld = extracted.suffix
         result = {
             'url': clean_url,
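A note on the `.lstrip('www.')` removal above: besides the new policy of keeping the `www` prefix (see the comment added in `parse_url`), `str.lstrip` was the wrong tool to begin with, because it strips a leading run of *characters* from the set `{'w', '.'}` rather than a literal prefix. A minimal sketch of the pitfall, with illustrative hostnames:

```python
# str.lstrip('www.') strips any leading run of the characters
# {'w', '.'}, not the literal prefix "www.":
print('www.example.com'.lstrip('www.'))   # 'example.com'    (correct by luck)
print('web.example.com'.lstrip('www.'))   # 'eb.example.com' (hostname mangled)

# If prefix stripping were ever wanted again, Python 3.9+ has a
# dedicated method that removes only an exact leading prefix:
print('web.example.com'.removeprefix('www.'))  # 'web.example.com' (untouched)
```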
""" - if record['title_eng'] is not None: + if record.get('title_eng'): return record['title_eng'] - elif record['title'] is not None: + elif record.get('title'): return record['title'] else: - return record['title_it'] + return record.get('title_it', '') def title_search(query: str) -> int: """ - Function to perform an anime search using both APIs and combine results. - - Parameters: - - query (str): The query to search for. - - Returns: - - int: A number containing the length of media search manager. + Perform anime search on animeunity.so. """ - if site_constant.TELEGRAM_BOT: + if site_constant.TELEGRAM_BOT: bot = get_bot_instance() - + media_search_manager.clear() table_show_manager.clear() seen_titles = set() choices = [] if site_constant.TELEGRAM_BOT else None - # Create parameter for request - data = get_token() + user_agent = get_userAgent() + data = get_token(user_agent) + cookies = { - 'animeunity_session': data.get('animeunity_session') - } - headers = { - 'user-agent': get_userAgent(), - 'x-csrf-token': data.get('csrf_token') + 'XSRF-TOKEN': data.get('XSRF-TOKEN', ''), + 'animeunity_session': data.get('animeunity_session', ''), } - # First API call - livesearch + headers = { + 'origin': site_constant.FULL_URL, + 'referer': f"{site_constant.FULL_URL}/", + 'user-agent': user_agent, + 'x-xsrf-token': data.get('XSRF-TOKEN', ''), + } + + # First call: /livesearch try: response1 = httpx.post( f'{site_constant.FULL_URL}/livesearch', @@ -114,15 +87,14 @@ def title_search(query: str) -> int: json={'title': query}, timeout=max_timeout ) - response1.raise_for_status() - process_results(response1.json()['records'], seen_titles, media_search_manager, choices) + process_results(response1.json().get('records', []), seen_titles, media_search_manager, choices) except Exception as e: console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}") return 0 - # Second API call - archivio + # Second call: /archivio/get-animes try: json_data = { 'title': query, @@ -135,7 +107,6 @@ def title_search(query: str) -> int: 'dubbed': False, 'season': False } - response2 = httpx.post( f'{site_constant.FULL_URL}/archivio/get-animes', cookies=cookies, @@ -143,30 +114,32 @@ def title_search(query: str) -> int: json=json_data, timeout=max_timeout ) - response2.raise_for_status() - process_results(response2.json()['records'], seen_titles, media_search_manager, choices) + process_results(response2.json().get('records', []), seen_titles, media_search_manager, choices) except Exception as e: console.print(f"Site: {site_constant.SITE_NAME}, archivio search error: {e}") if site_constant.TELEGRAM_BOT and choices and len(choices) > 0: - bot.send_message(f"Lista dei risultati:", choices) - + bot.send_message("List of results:", choices) + result_count = media_search_manager.get_length() if result_count == 0: console.print(f"Nothing matching was found for: {query}") - + return result_count + def process_results(records: list, seen_titles: set, media_manager: MediaManager, choices: list = None) -> None: - """Helper function to process search results and add unique entries.""" + """ + Add unique results to the media manager and to choices. 
+ """ for dict_title in records: try: title_id = dict_title.get('id') if title_id in seen_titles: continue - + seen_titles.add(title_id) dict_title['name'] = get_real_title(dict_title) @@ -179,10 +152,9 @@ def process_results(records: list, seen_titles: set, media_manager: MediaManager 'episodes_count': dict_title.get('episodes_count'), 'image': dict_title.get('imageurl') }) - + if choices is not None: - choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}" + choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodes: {dict_title.get('episodes_count')}" choices.append(choice_text) - except Exception as e: - print(f"Error parsing a title entry: {e}") + print(f"Error parsing a title entry: {e}") \ No newline at end of file