core: Fix strip www

None 2025-06-19 11:21:46 +02:00
parent a1dde783f0
commit 817b0027d0
2 changed files with 50 additions and 76 deletions


@@ -25,7 +25,7 @@ def get_headers():
 def get_tld(url_str):
     try:
         parsed = urlparse(unquote(url_str))
-        domain = parsed.netloc.lower().lstrip('www.')
+        domain = parsed.netloc.lower()
         parts = domain.split('.')
         return parts[-1] if len(parts) >= 2 else None
@@ -35,7 +35,7 @@ def get_tld(url_str):
 def get_base_domain(url_str):
     try:
         parsed = urlparse(url_str)
-        domain = parsed.netloc.lower().lstrip('www.')
+        domain = parsed.netloc.lower()
        parts = domain.split('.')
         return '.'.join(parts[:-1]) if len(parts) > 2 else parts[0]
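
The two hunks above remove .lstrip('www.') outright rather than replacing it, because str.lstrip strips a leading run of characters drawn from a set, not a literal prefix: 'www.' is treated as the character set {'w', '.'}. A standalone demonstration (examples mine, not part of the commit), with a prefix-safe alternative via str.removeprefix (Python 3.9+) should stripping ever be wanted again:

# str.lstrip('www.') removes any leading 'w' or '.' characters,
# not the literal prefix "www." -- which is the bug this commit fixes.
print("www.example.com".lstrip('www.'))   # example.com    (happens to look right)
print("web.example.com".lstrip('www.'))   # eb.example.com (domain mangled)

# Prefix-safe stripping, if it were desired (Python 3.9+):
def strip_www(netloc: str) -> str:
    return netloc.removeprefix('www.')

print(strip_www("web.example.com"))  # web.example.com (left intact)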
@@ -89,8 +89,10 @@ def parse_url(url):
     try:
         extracted = tldextract.extract(url)
         parsed = urlparse(url)
+
+        # Keep the original netloc (including www if present)
         clean_url = f"{parsed.scheme}://{parsed.netloc}/"
-        full_domain = f"{extracted.domain}.{extracted.suffix}" if extracted.domain else extracted.suffix
+        full_domain = parsed.netloc
         domain_tld = extracted.suffix
         result = {
             'url': clean_url,
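
With this change, full_domain keeps whatever netloc the URL actually carries (www included) instead of the registered domain reassembled by tldextract. A minimal sketch of the difference (example URL mine, assuming the tldextract package is installed):

from urllib.parse import urlparse
import tldextract  # third-party: pip install tldextract

url = "https://www.example.co.uk/watch"
extracted = tldextract.extract(url)
parsed = urlparse(url)

# Old behaviour: registered domain only; www (and any other subdomain) is dropped
print(f"{extracted.domain}.{extracted.suffix}")  # example.co.uk

# New behaviour: the netloc exactly as it appears in the URL
print(parsed.netloc)                             # www.example.co.uk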


@@ -1,11 +1,9 @@
 # 10.12.23
-import logging
 
 # External libraries
+import urllib.parse
 import httpx
-from bs4 import BeautifulSoup
+from curl_cffi import requests
 from rich.console import Console
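
BeautifulSoup drops out because the rewritten get_token below no longer scrapes a csrf-token meta tag from HTML, and curl_cffi comes in for its browser impersonation: it mimics a real browser's TLS/HTTP2 fingerprint, which plain httpx cannot do. A minimal usage sketch (URL hypothetical, assuming curl_cffi is installed):

from curl_cffi import requests

# impersonate= selects a browser fingerprint profile; Cloudflare-style
# checks that block generic Python clients will often accept this.
response = requests.get("https://example.com", impersonate="chrome120")
print(response.status_code)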
@@ -20,72 +18,42 @@ from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
 from StreamingCommunity.Api.Template.config_loader import site_constant
 from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
 
-# Variable
 console = Console()
 media_search_manager = MediaManager()
 table_show_manager = TVShowManager()
 max_timeout = config_manager.get_int("REQUESTS", "timeout")
 
-def get_token() -> dict:
+def get_token(user_agent: str) -> dict:
     """
-    Function to retrieve session tokens from a specified website.
-
-    Parameters:
-    - site_name (str): The name of the site.
-    - domain (str): The domain of the site.
-
-    Returns:
-    - dict: A dictionary containing session tokens. The keys are 'XSRF_TOKEN', 'animeunity_session', and 'csrf_token'.
+    Retrieve session cookies from the site.
     """
-    response = httpx.get(
-        url=site_constant.FULL_URL,
-        timeout=max_timeout
+    response = requests.get(
+        site_constant.FULL_URL,
+        headers={'user-agent': user_agent},
+        impersonate="chrome120"
     )
     response.raise_for_status()
 
-    # Initialize variables to store CSRF token
-    find_csrf_token = None
-    soup = BeautifulSoup(response.text, "html.parser")
-
-    for html_meta in soup.find_all("meta"):
-        if html_meta.get('name') == "csrf-token":
-            find_csrf_token = html_meta.get('content')
-
-    return {
-        'animeunity_session': response.cookies['animeunity_session'],
-        'csrf_token': find_csrf_token
-    }
+    all_cookies = {name: value for name, value in response.cookies.items()}
+    return {k: urllib.parse.unquote(v) for k, v in all_cookies.items()}
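
The rewritten get_token returns every cookie from the first response, URL-decoded. The decoding step matters because Laravel-style backends send the XSRF-TOKEN cookie percent-encoded (the trailing '=' of its base64 payload arrives as '%3D'), and the raw value would fail validation when echoed back. A small sketch with a made-up cookie value:

import urllib.parse

# Hypothetical cookie jar as returned by the server:
raw = {'XSRF-TOKEN': 'eyJpdiI6IkFCQyJ9%3D%3D', 'animeunity_session': 'abc123'}

decoded = {k: urllib.parse.unquote(v) for k, v in raw.items()}
print(decoded['XSRF-TOKEN'])  # eyJpdiI6IkFCQyJ9== -- usable in a header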
-def get_real_title(record):
+def get_real_title(record: dict) -> str:
     """
-    Get the real title from a record.
-
-    Parameters:
-    - record (dict): A dictionary representing a row of JSON data.
-
-    Returns:
-    - str: The title found in the record. If no title is found, returns None.
+    Return the most appropriate title from the record.
     """
-    if record['title_eng'] is not None:
+    if record.get('title_eng'):
         return record['title_eng']
-    elif record['title'] is not None:
+    elif record.get('title'):
         return record['title']
     else:
-        return record['title_it']
+        return record.get('title_it', '')
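
Switching from record['title_eng'] is not None to record.get('title_eng') makes the lookup tolerate records where the key is missing entirely (and also skips empty strings, since .get here relies on truthiness). A quick illustration with a made-up record:

record = {'title': 'Naruto'}  # no 'title_eng' key at all

# Old check: record['title_eng'] would raise KeyError here.
# New check: missing, None and '' all fall through to the next field.
print(record.get('title_eng'))  # None -> falsy, so 'title' is used instead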
 def title_search(query: str) -> int:
     """
-    Function to perform an anime search using both APIs and combine results.
-
-    Parameters:
-    - query (str): The query to search for.
-
-    Returns:
-    - int: A number containing the length of media search manager.
+    Perform anime search on animeunity.so.
     """
     if site_constant.TELEGRAM_BOT:
         bot = get_bot_instance()
@@ -95,17 +63,22 @@ def title_search(query: str) -> int:
     seen_titles = set()
     choices = [] if site_constant.TELEGRAM_BOT else None
 
-    # Create parameter for request
-    data = get_token()
+    user_agent = get_userAgent()
+    data = get_token(user_agent)
+
     cookies = {
-        'animeunity_session': data.get('animeunity_session')
+        'XSRF-TOKEN': data.get('XSRF-TOKEN', ''),
+        'animeunity_session': data.get('animeunity_session', ''),
     }
     headers = {
-        'user-agent': get_userAgent(),
-        'x-csrf-token': data.get('csrf_token')
+        'origin': site_constant.FULL_URL,
+        'referer': f"{site_constant.FULL_URL}/",
+        'user-agent': user_agent,
+        'x-xsrf-token': data.get('XSRF-TOKEN', ''),
     }
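
The new cookie/header pair follows the double-submit pattern used by Laravel: the server compares the x-xsrf-token request header against the XSRF-TOKEN cookie and, on mismatch, typically answers HTTP 419 instead of serving results. A minimal sketch of a request built that way (endpoint and token value hypothetical):

import httpx

token = "decoded-xsrf-token"  # hypothetical, as returned by get_token()

response = httpx.post(
    "https://example.com/livesearch",  # hypothetical endpoint
    cookies={'XSRF-TOKEN': token},     # token sent as the cookie...
    headers={'x-xsrf-token': token},   # ...and the same value as the header
    json={'title': 'naruto'},
)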
 
-    # First API call - livesearch
+    # First call: /livesearch
     try:
         response1 = httpx.post(
             f'{site_constant.FULL_URL}/livesearch',
@@ -114,15 +87,14 @@ def title_search(query: str) -> int:
             json={'title': query},
             timeout=max_timeout
         )
         response1.raise_for_status()
-        process_results(response1.json()['records'], seen_titles, media_search_manager, choices)
+        process_results(response1.json().get('records', []), seen_titles, media_search_manager, choices)
     except Exception as e:
         console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
         return 0
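
Note the hardening inside the call: response1.json().get('records', []) degrades to an empty list when the payload carries no 'records' key, whereas the old ['records'] access raised KeyError and aborted the whole search. In short (made-up payload):

payload = {}                       # response body missing 'records'
# payload['records']               # old code: KeyError, search aborted
print(payload.get('records', []))  # new code: [] -> nothing is added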
 
-    # Second API call - archivio
+    # Second call: /archivio/get-animes
     try:
         json_data = {
             'title': query,
@@ -135,7 +107,6 @@ def title_search(query: str) -> int:
             'dubbed': False,
             'season': False
         }
-
         response2 = httpx.post(
             f'{site_constant.FULL_URL}/archivio/get-animes',
             cookies=cookies,
@@ -143,15 +114,14 @@ def title_search(query: str) -> int:
             json=json_data,
             timeout=max_timeout
         )
         response2.raise_for_status()
-        process_results(response2.json()['records'], seen_titles, media_search_manager, choices)
+        process_results(response2.json().get('records', []), seen_titles, media_search_manager, choices)
     except Exception as e:
         console.print(f"Site: {site_constant.SITE_NAME}, archivio search error: {e}")
 
     if site_constant.TELEGRAM_BOT and choices and len(choices) > 0:
-        bot.send_message(f"Lista dei risultati:", choices)
+        bot.send_message("List of results:", choices)
 
     result_count = media_search_manager.get_length()
     if result_count == 0:
@@ -159,8 +129,11 @@ def title_search(query: str) -> int:
     return result_count
 
 def process_results(records: list, seen_titles: set, media_manager: MediaManager, choices: list = None) -> None:
-    """Helper function to process search results and add unique entries."""
+    """
+    Add unique results to the media manager and to choices.
+    """
     for dict_title in records:
         try:
             title_id = dict_title.get('id')
@@ -181,8 +154,7 @@ def process_results(records: list, seen_titles: set, media_manager: MediaManager
             })
 
             if choices is not None:
-                choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodi: {dict_title.get('episodes_count')}"
+                choice_text = f"{len(choices)} - {dict_title.get('name')} ({dict_title.get('type')}) - Episodes: {dict_title.get('episodes_count')}"
                 choices.append(choice_text)
         except Exception as e:
             print(f"Error parsing a title entry: {e}")