Update get_domain

Lovi 2025-01-03 11:51:28 +01:00
parent 5d71f81b61
commit 3727b5dea7
10 changed files with 152 additions and 201 deletions

View File

@@ -57,7 +57,6 @@ def title_search(word_to_search: str) -> int:
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
# Scrape div film in table on single page
for tr in soup.find_all('tr'):
try:
@@ -72,8 +71,8 @@ def title_search(word_to_search: str) -> int:
media_search_manager.add_media(title_info)
except:
continue
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()
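Across the HTML scrapers touched by this commit the change follows one pattern: each result row is parsed inside its own try/except, so a single malformed entry is reported and skipped instead of aborting the whole search. A minimal sketch of that pattern, reusing the names from the diff (soup, media_search_manager); the selectors are illustrative and vary per site:

from bs4 import BeautifulSoup

def parse_rows(soup: BeautifulSoup, media_search_manager) -> int:
    # Parse each row independently so one broken entry cannot stop the loop.
    for tr in soup.find_all('tr'):
        try:
            link = tr.find('a')  # illustrative selector, not a real site's markup
            media_search_manager.add_media({
                'name': link.get_text(strip=True),
                'url': link['href']
            })
        except Exception as e:
            # Report the failure and continue with the next row.
            print(f"Error parsing a film entry: {e}")
    # Return the number of titles found
    return media_search_manager.get_length()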

View File

@@ -62,21 +62,27 @@ def title_search(title_search: str) -> int:
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('div', id="dle-content")
# Scrape div film in table on single page
for film_div in table_content.find_all('div', class_='col-lg-3'):
title = film_div.find('h2', class_='titleFilm').get_text(strip=True)
link = film_div.find('h2', class_='titleFilm').find('a')['href']
imdb_rating = film_div.find('div', class_='imdb-rate').get_text(strip=True).split(":")[-1]
for row in soup.find_all('div', class_='col-lg-3 col-md-3 col-xs-4'):
try:
title_element = row.find('h2', class_='titleFilm').find('a')
title = title_element.get_text(strip=True)
link = title_element['href']
film_info = {
'name': title,
'url': link,
'score': imdb_rating
}
imdb_element = row.find('div', class_='imdb-rate')
imdb_rating = imdb_element.get_text(strip=True).split(":")[-1]
media_search_manager.add_media(film_info)
film_info = {
'name': title,
'url': link,
'score': imdb_rating
}
media_search_manager.add_media(film_info)
except AttributeError as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()
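This scraper catches AttributeError specifically: when a div is missing the expected h2.titleFilm or div.imdb-rate element, BeautifulSoup's .find() returns None, and the chained attribute access is what actually raises. A small illustration of that failure mode (the markup is a made-up fragment; the class names are the ones from the diff):

from bs4 import BeautifulSoup

row = BeautifulSoup('<div class="col-lg-3 col-md-3 col-xs-4"></div>', "html.parser").div

try:
    # .find() returns None when the element is missing, so calling .find('a') on it raises AttributeError.
    title_element = row.find('h2', class_='titleFilm').find('a')
except AttributeError as e:
    print(f"Error parsing a film entry: {e}")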

View File

@@ -140,21 +140,23 @@ def title_search(title: str) -> int:
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
# Process each record returned in the response
for dict_title in response.json()['records']:
try:
# Rename keys for consistency
dict_title['name'] = get_real_title(dict_title)
# Rename keys for consistency
dict_title['name'] = get_real_title(dict_title)
# Add the record to media search manager if the name is not None
media_search_manager.add_media({
'id': dict_title.get('id'),
'slug': dict_title.get('slug'),
'name': dict_title.get('name'),
'type': dict_title.get('type'),
'score': dict_title.get('score'),
'episodes_count': dict_title.get('episodes_count')
})
media_search_manager.add_media({
'id': dict_title.get('id'),
'slug': dict_title.get('slug'),
'name': dict_title.get('name'),
'type': dict_title.get('type'),
'score': dict_title.get('score'),
'episodes_count': dict_title.get('episodes_count')
})
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the length of media search manager
return media_search_manager.get_length()
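A note on the .get() calls used to build each record: unlike bracket indexing, dict.get() returns None for a missing key, so a record without, say, episodes_count is still added rather than raising KeyError inside the try block. A tiny illustration with a made-up record:

record = {'id': 1, 'slug': 'example-show', 'name': 'Example Show'}  # hypothetical API record

print(record.get('episodes_count'))   # None: missing key, no exception
# print(record['episodes_count'])     # would raise KeyError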

View File

@@ -50,20 +50,23 @@ def title_search(word_to_search: str) -> int:
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
# For all element in table
for div in soup.find_all("div", class_ = "card-content"):
try:
url = div.find("h3").find("a").get("href")
title = div.find("h3").find("a").get_text(strip=True)
desc = div.find("p").find("strong").text
url = div.find("h3").find("a").get("href")
title = div.find("h3").find("a").get_text(strip=True)
desc = div.find("p").find("strong").text
title_info = {
'name': title,
'desc': desc,
'url': url
}
title_info = {
'name': title,
'desc': desc,
'url': url
}
media_search_manager.add_media(title_info)
media_search_manager.add_media(title_info)
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()

View File

@@ -75,9 +75,9 @@ def title_search(word_to_search: str) -> int:
}
media_search_manager.add_media(title_info)
except Exception as e:
logging.error(f"Error processing title div: {e}")
print(f"Error parsing a film entry: {e}")
return media_search_manager.get_length()

View File

@@ -58,8 +58,8 @@ def title_search(word_to_search: str) -> int:
table_content = soup.find('div', class_="mlnew-list")
for serie_div in table_content.find_all('div', class_='mlnew'):
try:
title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True)
link = serie_div.find('div', class_='mlnh-2').find('a')['href']
imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True)
@@ -72,8 +72,8 @@ def title_search(word_to_search: str) -> int:
media_search_manager.add_media(serie_info)
except:
pass
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()

View File

@@ -39,18 +39,21 @@ async def title_search(word_to_search: str) -> int:
scraper = IlCorsaroNeroScraper(f"https://{SITE_NAME}.{domain_to_use}/", 1)
results = await scraper.search(word_to_search)
# Add all result to media manager
for i, torrent in enumerate(results):
media_search_manager.add_media({
'name': torrent['name'],
'type': torrent['type'],
'seed': torrent['seed'],
'leech': torrent['leech'],
'size': torrent['size'],
'date': torrent['date'],
'url': torrent['url']
})
try:
media_search_manager.add_media({
'name': torrent['name'],
'type': torrent['type'],
'seed': torrent['seed'],
'leech': torrent['leech'],
'size': torrent['size'],
'date': torrent['date'],
'url': torrent['url']
})
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()
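Unlike the other sites, this title_search is a coroutine (it awaits scraper.search), so it has to be driven from an event loop. A hedged sketch of a possible call site; run_search and the query string are invented for illustration:

import asyncio

async def run_search(word: str) -> int:
    # Await the async search and return the number of titles it collected.
    return await title_search(word)

if __name__ == "__main__":
    count = asyncio.run(run_search("example query"))
    print(f"Titles found: {count}")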

View File

@@ -116,16 +116,20 @@ def title_search(title_search: str, domain: str) -> int:
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
# Add found titles to media search manager
for dict_title in response.json()['data']:
media_search_manager.add_media({
'id': dict_title.get('id'),
'slug': dict_title.get('slug'),
'name': dict_title.get('name'),
'type': dict_title.get('type'),
'date': dict_title.get('last_air_date'),
'score': dict_title.get('score')
})
try:
media_search_manager.add_media({
'id': dict_title.get('id'),
'slug': dict_title.get('slug'),
'name': dict_title.get('name'),
'type': dict_title.get('type'),
'date': dict_title.get('last_air_date'),
'score': dict_title.get('score')
})
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()

View File

@@ -1,6 +1,5 @@
# 18.06.24
import sys
from urllib.parse import urlparse
@@ -15,43 +14,34 @@ from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Util._jsonConfig import config_manager
def google_search(query):
"""
Perform a Google search and return the first result.
Args:
query (str): Search query to execute
Returns:
str: First URL from search results, None if no results found
"""
def get_base_domain(url_str):
"""Extract base domain without protocol, www and path"""
parsed = urlparse(url_str)
domain = parsed.netloc.lower()
if domain.startswith('www.'):
domain = domain[4:]
return domain.split('.')[0]
# Perform search with single result limit
search_results = search(query, num_results=1)
first_result = next(search_results, None)
def validate_url(url, base_url, max_timeout):
"""
Validate if URL is accessible and matches expected base domain
"""
console.print(f"\n[cyan]Starting validation for URL[white]: [yellow]{url}")
if not first_result:
console.print("[red]No results found.[/red]")
return first_result
def check_response(response, check_num):
if response.status_code == 403:
console.print(f"[red]Check {check_num} failed: Access forbidden (403)")
return False
if response.status_code >= 400:
console.print(f"[red]Check {check_num} failed: HTTP {response.status_code}")
return False
console.print(f"[green]Check {check_num} passed: HTTP {response.status_code}")
return True
def validate_url(url, max_timeout):
"""
Validate if a URL is accessible and check if its redirect destination is significantly different.
Args:
url (str): URL to validate
max_timeout (int): Maximum timeout for request
Returns:
bool: True if URL is valid, accessible and redirect destination is acceptable
"""
def get_domain_parts(url_str):
parsed = urlparse(url_str)
return parsed.netloc.lower().split('.')[-2:] # Get last two parts of domain
try:
# First check without following redirects
# Check 1: Initial request without following redirects
console.print("[cyan]Performing initial connection check...")
with httpx.Client(
headers={
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
@@ -62,11 +52,11 @@ def validate_url(url, max_timeout):
timeout=max_timeout
) as client:
response = client.get(url)
if response.status_code == 403:
if not check_response(response, 1):
return False
response.raise_for_status()
# Then check with redirects enabled
# Check 2: Follow redirects and verify final domain
console.print("[cyan]Checking redirect destination...")
with httpx.Client(
headers={
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
@@ -77,131 +67,76 @@ def validate_url(url, max_timeout):
timeout=max_timeout
) as client:
response = client.get(url)
if response.status_code == 403:
if not check_response(response, 2):
return False
response.raise_for_status()
# Compare original and final URLs
original_domain = get_domain_parts(url)
final_domain = get_domain_parts(str(response.url))
# Compare base domains
original_base = get_base_domain(url)
final_base = get_base_domain(str(response.url))
# Check if domains are significantly different
if original_domain != final_domain:
console.print(f"[yellow]Warning: URL redirects to different domain: {response.url}[/yellow]")
console.print(f"[cyan]Comparing domains:")
console.print(f"Original base domain: [yellow]{original_base}")
console.print(f"Final base domain: [yellow]{final_base}")
if original_base != final_base:
console.print(f"[red]Domain mismatch: Redirected to different base domain")
return False
# Verify against expected base_url
expected_base = get_base_domain(base_url)
if final_base != expected_base:
console.print(f"[red]Domain mismatch: Final domain does not match expected base URL")
console.print(f"Expected: [yellow]{expected_base}")
return False
console.print(f"[green]All checks passed: URL is valid and matches expected domain")
return True
except Exception:
return False
def get_final_redirect_url(initial_url, max_timeout):
"""
Follow all redirects for a URL and return final destination.
Args:
initial_url (str): Starting URL to follow redirects from
max_timeout (int): Maximum timeout for request
Returns:
str: Final URL after all redirects, None if error occurs
"""
try:
with httpx.Client(
headers={
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7',
'User-Agent': get_headers()
},
follow_redirects=True,
timeout=max_timeout
) as client:
# Follow redirects and get response
response = client.get(initial_url)
if response.status_code == 403:
console.print("[bold red]The owner of this website has banned your IP[/bold red]")
raise
response.raise_for_status()
return response.url
except Exception as e:
console.print(f"\n[cyan]Test url[white]: [red]{initial_url}, [cyan]error[white]: [red]{e}")
return None
console.print(f"[red]Error during validation: {str(e)}")
return False
def search_domain(site_name: str, base_url: str, get_first: bool = False):
"""
Search for valid domain matching site name and base URL.
Args:
site_name (str): Name of site to find domain for
base_url (str): Base URL to construct complete URLs
get_first (bool): Auto-update config with first valid match if True
Returns:
tuple: (found_domain, complete_url)
"""
# Get configuration values
max_timeout = config_manager.get_int("REQUESTS", "timeout")
domain = str(config_manager.get_dict("SITE", site_name)['domain'])
test_url = f"{base_url}.{domain}"
console.print(f"\n[cyan]Testing initial URL[white]: [yellow]{test_url}")
try:
if validate_url(test_url, max_timeout):
if validate_url(test_url, base_url, max_timeout):
parsed_url = urlparse(test_url)
tld = parsed_url.netloc.split('.')[-1]
config_manager.config['SITE'][site_name]['domain'] = tld
config_manager.write_config()
console.print(f"[green]Successfully validated initial URL")
return tld, test_url
except Exception as e:
console.print(f"[red]Error testing initial URL: {str(e)}")
except Exception:
pass
# Perform Google search if current domain fails
# Google search phase
query = base_url.split("/")[-1]
console.print(f"\n[cyan]Performing Google search for[white]: [yellow]{query}")
search_results = list(search(query, num_results=15, lang="it"))
console.print(f"Google search: {search_results}")
def normalize_for_comparison(url):
"""Normalize URL by removing protocol, www, and trailing slashes"""
url = url.lower()
url = url.replace("https://", "").replace("http://", "")
url = url.replace("www.", "")
return url.rstrip("/")
target_url = normalize_for_comparison(base_url)
# Check each search result
for result_url in search_results:
#console.print(f"[green]Checking url[white]: [red]{result_url}")
# Skip invalid URLs
if not validate_url(result_url, max_timeout):
#console.print(f"[red]URL validation failed for: {result_url}")
continue
parsed_result = urlparse(result_url)
result_domain = normalize_for_comparison(parsed_result.netloc)
# Check if domain matches target
if result_domain.startswith(target_url.split("/")[-1]):
final_url = get_final_redirect_url(result_url, max_timeout)
for idx, result_url in enumerate(search_results, 1):
console.print(f"\n[cyan]Checking Google result {idx}/15[white]: [yellow]{result_url}")
if validate_url(result_url, base_url, max_timeout):
parsed_result = urlparse(result_url)
new_domain = parsed_result.netloc.split(".")[-1]
if final_url is not None:
new_domain = urlparse(str(final_url)).netloc.split(".")[-1]
if get_first or msg.ask(
f"\n[cyan]Do you want to update site[white] [red]'{site_name}'[cyan] with domain[white] [red]'{new_domain}'",
choices=["y", "n"],
default="y"
).lower() == "y":
config_manager.config['SITE'][site_name]['domain'] = new_domain
config_manager.write_config()
return new_domain, f"{base_url}.{new_domain}"
# Update config if auto-update enabled or user confirms
if get_first or msg.ask(
f"\n[cyan]Do you want to auto update site[white] [red]'{site_name}'[cyan] with domain[white] [red]'{new_domain}'.",
choices=["y", "n"],
default="y"
).lower() == "y":
config_manager.config['SITE'][site_name]['domain'] = new_domain
config_manager.write_config()
return new_domain, f"{base_url}.{new_domain}"
# Return original domain if no valid matches found
console.print("[bold red]No valid URL found matching the base URL.[/bold red]")
console.print("[bold red]No valid URLs found matching the base URL.")
return domain, f"{base_url}.{domain}"
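The heart of the rewrite is get_base_domain, which validate_url and search_domain use to decide whether a redirect still points at the "same" site: the protocol, a leading www, the path and the TLD are all ignored, and only the first label of the host is compared. A sketch of that comparison in isolation, with invented example URLs; the function body is the one introduced by this commit:

from urllib.parse import urlparse

def get_base_domain(url_str: str) -> str:
    # Strip the protocol, a leading "www." and everything after the first dot of the host.
    domain = urlparse(url_str).netloc.lower()
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain.split('.')[0]

# A TLD change passes the check, a different site name does not (invented domains).
print(get_base_domain("https://www.example.foo") == get_base_domain("https://example.bar"))  # True
print(get_base_domain("https://example.foo") == get_base_domain("https://other.foo"))        # False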

View File

@@ -79,7 +79,7 @@ def load_site_names():
return site_names
def update_readme(site_names):
def update_readme(site_names, domain_to_use):
if not os.path.exists(README_PATH):
console.print(f"[red]README file not found at {README_PATH}")
return
@@ -95,7 +95,6 @@ def update_readme(site_names):
alias = f"{site_name.lower()}"
if alias in site_names:
domain_to_use, _ = search_domain(site_name=alias, base_url=f"https://{alias}", get_first=True)
print("Update line: ", line)
if site_name == "animeunity":
@@ -126,4 +125,4 @@ if __name__ == "__main__":
# Update readme
print("\n")
print("Return domain: ", domain_to_use)
update_readme(alias)
update_readme(alias, domain_to_use)
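With the new signature, update_readme receives the already-resolved domain from its caller. A rough sketch of the updated call order, hedged because only these lines are visible in the diff; the alias value is a placeholder:

# Hypothetical values: the real alias comes from the enclosing loop over configured sites.
domain_to_use, full_url = search_domain(site_name="examplesite", base_url="https://examplesite", get_first=True)
print("Return domain: ", domain_to_use)
update_readme("examplesite", domain_to_use)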