Update get_domain

This commit is contained in:
Lovi 2025-01-06 22:17:24 +01:00
parent 5a091d94d7
commit 7b94b17d2d
12 changed files with 121 additions and 130 deletions

View File

@ -406,7 +406,7 @@ The `run-container` command mounts also the `config.json` file, so any change to
| [Ilcorsaronero](https://ilcorsaronero.link/) | ✅ |
| [CB01New](https://cb01new.quest/) | ✅ |
| [DDLStreamItaly](https://ddlstreamitaly.co/) | ✅ |
| [GuardaSerie](https://guardaserie.com/) | ✅ |
| [GuardaSerie](https://guardaserie.academy/) | ✅ |
| [MostraGuarda](https://mostraguarda.stream/) | ✅ |
| [StreamingCommunity](https://streamingcommunity.prof/) | ✅ |

View File

@ -43,7 +43,7 @@ def title_search(word_to_search: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
# Construct the full site URL and load the search page
try:

View File

@ -43,7 +43,7 @@ def title_search(title_search: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
# Send request to search for title
client = httpx.Client()

View File

@ -110,7 +110,7 @@ def title_search(title: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://www.{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://www.{SITE_NAME}.{DOMAIN_NOW}")
data = get_token(SITE_NAME, domain_to_use)

View File

@ -42,7 +42,7 @@ def title_search(word_to_search: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
response = httpx.get(
url=f"https://{SITE_NAME}.{domain_to_use}/?s={word_to_search}",

View File

@ -46,7 +46,7 @@ def title_search(word_to_search: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
# Send request to search for titles
try:

View File

@ -43,7 +43,7 @@ def title_search(word_to_search: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
# Send request to search for titles
try:

View File

@ -38,7 +38,7 @@ async def title_search(word_to_search: str) -> int:
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
# Create scraper and collect result
print("\n")

View File

@ -81,7 +81,7 @@ def get_version_and_domain():
domain_to_use = DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}")
domain_to_use, base_url = search_domain(SITE_NAME, f"https://{SITE_NAME}.{DOMAIN_NOW}")
version = get_version(domain_to_use)

View File

@ -1,5 +1,8 @@
# 18.06.24
import ssl
import time
import certifi
from urllib.parse import urlparse, unquote
@ -26,158 +29,146 @@ base_headers = {
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': None
'user-agent': ''
}
def get_tld(url_str):
"""Extract the TLD (Top-Level Domain) from the URL without using external libraries."""
url_str = unquote(url_str)
parsed = urlparse(url_str)
domain = parsed.netloc.lower()
if domain.startswith('www.'):
domain = domain[4:]
parts = domain.split('.')
if len(parts) >= 2:
return parts[-1]
return None
"""Extract the TLD (Top-Level Domain) from the URL."""
try:
url_str = unquote(url_str)
parsed = urlparse(url_str)
domain = parsed.netloc.lower()
if domain.startswith('www.'):
domain = domain[4:]
parts = domain.split('.')
return parts[-1] if len(parts) >= 2 else None
except Exception:
return None
def get_base_domain(url_str):
"""Extract base domain without protocol, www and path"""
parsed = urlparse(url_str)
domain = parsed.netloc.lower()
if domain.startswith('www.'):
domain = domain[4:]
return domain.split('.')[0]
"""Extract base domain without protocol, www and path."""
try:
parsed = urlparse(url_str)
domain = parsed.netloc.lower()
if domain.startswith('www.'):
domain = domain[4:]
# Check if domain has multiple parts separated by dots
parts = domain.split('.')
if len(parts) > 2: # Handle subdomains
return '.'.join(parts[:-1]) # Return everything except TLD
return parts[0] # Return base domain
except Exception:
return None
def validate_url(url, base_url, max_timeout, max_retries=5):
"""
Validate if URL is accessible and matches expected base domain, with retry mechanism for 403 errors.
"""
"""Validate if URL is accessible and matches expected base domain."""
console.print(f"\n[cyan]Starting validation for URL[white]: [yellow]{url}")
# Verify URL structure matches base_url structure
base_domain = get_base_domain(base_url)
url_domain = get_base_domain(url)
base_headers['user-agent'] = get_headers()
if base_domain != url_domain:
console.print(f"[red]Domain structure mismatch: {url_domain} != {base_domain}")
return False, None
# Count dots to ensure we don't have extra subdomains
base_dots = base_url.count('.')
url_dots = url.count('.')
if url_dots > base_dots + 1: # Allow for one extra dot for TLD change
console.print(f"[red]Too many subdomains in URL")
return False, None
def check_response(response, check_num):
if response.status_code == 403:
console.print(f"[red]Check {check_num} failed: Access forbidden (403)")
return False
if response.status_code >= 400:
console.print(f"[red]Check {check_num} failed: HTTP {response.status_code}")
return False
console.print(f"[green]Check {check_num} passed: HTTP {response.status_code}")
return True
client = httpx.Client(
verify=certifi.where(),
headers=base_headers,
timeout=max_timeout
)
retries = 0
while retries < max_retries:
for retry in range(max_retries):
try:
# Check 1: Initial request without following redirects
#console.print("[cyan]Performing initial connection check...")
base_headers['user-agent'] = get_headers()
with httpx.Client(
headers=base_headers,
follow_redirects=False,
timeout=max_timeout
) as client:
response = client.get(url)
if not check_response(response, 1):
if response.status_code == 403:
retries += 1
console.print(f"[yellow]Retrying... Attempt {retries}/{max_retries}")
continue # Retry on 403 error
return False, None
# Check 2: Follow redirects and verify final domain
#console.print("[cyan]Checking redirect destination...")
with httpx.Client(
headers=base_headers,
follow_redirects=True,
timeout=max_timeout
) as client:
response = client.get(url)
if not check_response(response, 2):
return False, None
# Compare base domains
original_base = get_base_domain(url)
final_base = get_base_domain(str(response.url))
"""console.print(f"[cyan]Comparing domains:")
console.print(f"Original base domain: [yellow]{original_base}.{get_tld(str(url))}")
console.print(f"Final base domain: [yellow]{final_base}.{get_tld(str(response.url))}")"""
if original_base != final_base:
return False, None
expected_base = get_base_domain(base_url)
if final_base != expected_base:
return False, None
if get_tld(str(url)) != get_tld(str(response.url)):
return True, get_tld(str(response.url))
#console.print(f"[green]All checks passed: URL is valid and matches expected domain")
return True, None
except Exception as e:
console.print(f"[red]Error during validation: {str(e)}")
return False, None
console.print(f"[red]Maximum retries reached for URL: {url}")
time.sleep(2) # Add delay between retries
# Initial check without redirects
response = client.get(url, follow_redirects=False)
if response.status_code == 403:
console.print(f"[red]Check failed (403) - Attempt {retry + 1}/{max_retries}")
continue
if response.status_code >= 400:
console.print(f"[red]Check failed: HTTP {response.status_code}")
return False, None
# Follow redirects and verify final domain
final_response = client.get(url, follow_redirects=True)
final_domain = get_base_domain(str(final_response.url))
console.print(f"[cyan]Redirect url: [red]{final_response.url}")
if final_domain != base_domain:
console.print(f"[red]Final domain mismatch: {final_domain} != {base_domain}")
return False, None
new_tld = get_tld(str(final_response.url))
if new_tld != get_tld(url):
return True, new_tld
return True, None
except (httpx.RequestError, ssl.SSLError) as e:
console.print(f"[red]Connection error: {str(e)}")
time.sleep(2) # Add delay after error
continue
return False, None
def search_domain(site_name: str, base_url: str, get_first: bool = False):
"""
Search for valid domain matching site name and base URL.
"""
"""Search for valid domain matching site name and base URL."""
max_timeout = config_manager.get_int("REQUESTS", "timeout")
domain = str(config_manager.get_dict("SITE", site_name)['domain'])
# Test initial URL
try:
is_correct, redirect_tld = validate_url(base_url, base_url, max_timeout, max_retries=5)
if is_correct and redirect_tld is not None:
config_manager.config['SITE'][site_name]['domain'] = redirect_tld
config_manager.write_config()
console.print(f"[green]Successfully validated initial URL")
return redirect_tld, base_url
is_correct, redirect_tld = validate_url(base_url, base_url, max_timeout)
if is_correct:
parsed_url = urlparse(base_url)
tld = parsed_url.netloc.split('.')[-1]
tld = redirect_tld or get_tld(base_url)
config_manager.config['SITE'][site_name]['domain'] = tld
config_manager.write_config()
console.print(f"[green]Successfully validated initial URL")
return tld, base_url
except Exception as e:
console.print(f"[red]Error testing initial URL: {str(e)}")
# Google search phase
query = base_url.split("/")[-1]
console.print(f"\n[cyan]Performing Google search for[white]: [yellow]{query}")
search_results = list(search(query, num_results=20, lang="it"))
for idx, result_url in enumerate(search_results, 1):
if get_base_domain(result_url) == get_base_domain(base_url):
console.print(f"\n[cyan]Checking Google result {idx}/20[white]: [yellow]{result_url}")
if validate_url(result_url, base_url, max_timeout):
parsed_result = urlparse(result_url)
new_domain = parsed_result.netloc.split(".")[-1]
base_domain = get_base_domain(base_url)
console.print(f"\n[cyan]Searching for alternate domains for[white]: [yellow]{base_domain}")
try:
search_results = list(search(base_domain, num_results=20, lang="it"))
filtered_results = [
url for url in search_results
if get_base_domain(url) == base_domain
and url.count('.') <= base_url.count('.') + 1
]
for idx, result_url in enumerate(filtered_results, 1):
console.print(f"\n[cyan]Checking result {idx}/{len(filtered_results)}[white]: [yellow]{result_url}")
is_valid, new_tld = validate_url(result_url, base_url, max_timeout)
if is_valid:
final_tld = new_tld or get_tld(result_url)
if get_first or msg.ask(
f"\n[cyan]Do you want to update site[white] [red]'{site_name}'[cyan] with domain[white] [red]'{new_domain}'",
f"\n[cyan]Update site[white] [red]'{site_name}'[cyan] with domain[white] [red]'{final_tld}'",
choices=["y", "n"],
default="y"
).lower() == "y":
config_manager.config['SITE'][site_name]['domain'] = new_domain
config_manager.config['SITE'][site_name]['domain'] = final_tld
config_manager.write_config()
return new_domain, f"{base_url}.{new_domain}"
return final_tld, f"{base_url}.{final_tld}"
except Exception as e:
console.print(f"[red]Error during search: {str(e)}")
console.print("[bold red]No valid URLs found matching the base URL.")
return domain, f"{base_url}.{domain}"

View File

@ -123,4 +123,4 @@ if __name__ == "__main__":
update_readme(alias, domain_to_use)
print("------------------------------------")
time.sleep(3)
time.sleep(2)

View File

@ -64,7 +64,7 @@
"domain": "prof"
},
"guardaserie": {
"domain": "com"
"domain": "academy"
},
"mostraguarda": {
"domain": "stream"