github: Update domains

Lovi 2025-05-31 11:28:38 +02:00
parent 884bcf656c
commit 1776538c6c
2 changed files with 180 additions and 169 deletions

File: .github/.domain/domain_update.py

@@ -1,17 +1,14 @@
 # 20.04.2024
-import os
 import re
-import time
+import os
 import json
 from datetime import datetime
 from urllib.parse import urlparse, urlunparse

 import httpx
 import ua_generator

 JSON_FILE_PATH = os.path.join(".github", ".domain", "domains.json")
@@ -50,69 +47,137 @@ def get_new_tld(full_url):
     return None

-def try_url_with_retries(url_to_try, headers, timeout=15, retries=3, backoff_factor=0.5):
-    for attempt in range(retries):
-        try:
-            with httpx.Client(headers=headers, timeout=timeout, follow_redirects=True) as client:
-                response = client.get(url_to_try)
-                response.raise_for_status()
-                return response
-        except (httpx.TimeoutException, httpx.ConnectError) as e:
-            print(f" [!] Attempt {attempt + 1}/{retries} for {url_to_try}: Network error ({type(e).__name__}). Retrying in {backoff_factor * (2 ** attempt)}s...")
-            if attempt + 1 == retries:
-                print(f" [!] Failed all {retries} attempts for {url_to_try} due to {type(e).__name__}.")
-                return None
-            time.sleep(backoff_factor * (2 ** attempt))
-        except httpx.HTTPStatusError as http_err:
-            if http_err.response.status_code in [403, 429, 503]:
-                print(f" [!] HTTP error {http_err.response.status_code} for {url_to_try}. Suspected Cloudflare, checking for <base href>...")
-                try:
-                    with httpx.Client(headers=headers, timeout=timeout, follow_redirects=False) as cf_client:
-                        cf_page_response = cf_client.get(url_to_try)
-                        if cf_page_response.status_code != http_err.response.status_code and not (200 <= cf_page_response.status_code < 300):
-                            cf_page_response.raise_for_status()
-                        match = re.search(r'<base\s+href="([^"]+)"', cf_page_response.text, re.IGNORECASE)
-                        if match:
-                            base_href_url = match.group(1)
-                            parsed_base_href = urlparse(base_href_url)
-                            if not parsed_base_href.scheme or not parsed_base_href.netloc:
-                                original_parsed_url = urlparse(url_to_try)
-                                base_href_url = urlunparse(original_parsed_url._replace(path=base_href_url if base_href_url.startswith('/') else '/' + base_href_url, query='', fragment=''))
-                            print(f" [+] Found <base href>: {base_href_url}")
-                            try:
-                                print(f" [] Attempting request to <base href> URL: {base_href_url}")
-                                with httpx.Client(headers=headers, timeout=timeout, follow_redirects=True) as base_client:
-                                    final_response_from_base = base_client.get(base_href_url)
-                                    final_response_from_base.raise_for_status()
-                                    print(f" [+] Successfully fetched from <base href> URL.")
-                                    return final_response_from_base
-                            except httpx.RequestError as base_req_e:
-                                print(f" [!] Error requesting <base href> URL {base_href_url}: {base_req_e}")
-                                return None
-                        else:
-                            print(f" [!] No <base href> found in page content for {url_to_try}.")
-                            return None
-                except httpx.RequestError as cf_req_e:
-                    print(f" [!] Error fetching Cloudflare-like page content for {url_to_try}: {cf_req_e}")
-                    return None
-            else:
-                print(f" [!] HTTP error {http_err.response.status_code} for {url_to_try}. No retry.")
-                return None
-        except httpx.RequestError as e:
-            print(f" [!] Generic error for {url_to_try}: {e}. No retry.")
-            return None
-    return None
+def extract_domain_from_response(response, original_url):
+    if 'location' in response.headers:
+        return response.headers['location']
+
+    if str(response.url) != original_url:
+        return str(response.url)
+
+    try:
+        content_type = response.headers.get('content-type', '').lower()
+        if 'text/html' in content_type or 'text/plain' in content_type:
+            response_text = response.text
+
+            js_redirect_patterns = [
+                r'window\.location\.href\s*=\s*["\']([^"\']+)["\']',
+                r'window\.location\s*=\s*["\']([^"\']+)["\']',
+                r'location\.href\s*=\s*["\']([^"\']+)["\']',
+                r'document\.location\s*=\s*["\']([^"\']+)["\']'
+            ]
+
+            for pattern in js_redirect_patterns:
+                js_match = re.search(pattern, response_text, re.IGNORECASE)
+                if js_match:
+                    return js_match.group(1)
+
+            meta_patterns = [
+                r'<meta[^>]*http-equiv=["\']?refresh["\']?[^>]*content=["\'][^"\']*url=([^"\'>\s]+)',
+                r'<meta[^>]*content=["\'][^"\']*url=([^"\'>\s]+)[^>]*http-equiv=["\']?refresh["\']?'
+            ]
+
+            for pattern in meta_patterns:
+                meta_match = re.search(pattern, response_text, re.IGNORECASE)
+                if meta_match:
+                    return meta_match.group(1)
+
+            canonical_match = re.search(r'<link[^>]*rel=["\']?canonical["\']?[^>]*href=["\']([^"\']+)["\']', response_text, re.IGNORECASE)
+            if canonical_match:
+                return canonical_match.group(1)
+
+            base_match = re.search(r'<base[^>]*href=["\']([^"\']+)["\']', response_text, re.IGNORECASE)
+            if base_match:
+                return base_match.group(1)
+
+            error_redirect_patterns = [
+                r'[Rr]edirect(?:ed)?\s+to:?\s*([^\s<>"\']+)',
+                r'[Nn]ew\s+[Uu][Rr][Ll]:?\s*([^\s<>"\']+)',
+                r'[Mm]oved\s+to:?\s*([^\s<>"\']+)',
+                r'[Ff]ound\s+at:?\s*([^\s<>"\']+)'
+            ]
+
+            for pattern in error_redirect_patterns:
+                error_match = re.search(pattern, response_text)
+                if error_match:
+                    potential_url = error_match.group(1)
+                    if potential_url.startswith(('http://', 'https://', '//')):
+                        return potential_url
+
+    except Exception as e:
+        print(f" [!] Error extracting from response content: {e}")
+
+    return None
+
+def try_url(url_to_try, headers, timeout=15):
+    try:
+        with httpx.Client(headers=headers, timeout=timeout, follow_redirects=False) as client:
+            response = client.get(url_to_try)
+
+            if response.status_code in [301, 302, 303, 307, 308]:
+                location = response.headers.get('location')
+                if location:
+                    print(f" [+] Found redirect ({response.status_code}) to: {location}")
+                    try:
+                        final_response = client.get(location)
+                        if 200 <= final_response.status_code < 400:
+                            return final_response
+                        else:
+                            return httpx.Response(
+                                status_code=200,
+                                headers={"location": location},
+                                content=b"",
+                                request=response.request
+                            )
+                    except Exception:
+                        return httpx.Response(
+                            status_code=200,
+                            headers={"location": location},
+                            content=b"",
+                            request=response.request
+                        )
+
+            elif response.status_code in [403, 409, 429, 503]:
+                print(f" [!] HTTP {response.status_code} - attempting to extract redirect info")
+
+                location = response.headers.get('location')
+                if location:
+                    print(f" [+] Found location header in error response: {location}")
+                    return httpx.Response(
+                        status_code=200,
+                        headers={"location": location},
+                        content=b"",
+                        request=response.request
+                    )
+
+                new_url = extract_domain_from_response(response, url_to_try)
+                if new_url and new_url != url_to_try:
+                    print(f" [+] Found redirect URL in error response content: {new_url}")
+                    return httpx.Response(
+                        status_code=200,
+                        headers={"location": new_url},
+                        content=b"",
+                        request=response.request
+                    )
+
+            if 200 <= response.status_code < 400:
+                return response
+
+            print(f" [!] HTTP {response.status_code} for {url_to_try}")
+
+    except httpx.HTTPStatusError as http_err:
+        new_url = extract_domain_from_response(http_err.response, url_to_try)
+        if new_url:
+            print(f" [+] Found new URL from HTTPStatusError response: {new_url}")
+            return httpx.Response(
+                status_code=200,
+                headers={"location": new_url},
+                content=b"",
+                request=http_err.request
+            )
+    except Exception as e:
+        print(f" [!] Error for {url_to_try}: {type(e).__name__}")
+
+    return None

 def update_domain_entries(data):
     if not data:
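
Note: a quick, self-contained way to sanity-check the new content-based fallback (not part of the commit; it assumes extract_domain_from_response() from the hunk above is in scope, and the domain names are made up):

    # Synthetic 403 whose body carries a meta-refresh redirect.
    import httpx

    original_url = "https://old-domain.example/"
    html = '<html><head><meta http-equiv="refresh" content="0; url=https://new-domain.example/"></head></html>'
    resp = httpx.Response(
        status_code=403,
        headers={"content-type": "text/html"},
        content=html.encode(),
        request=httpx.Request("GET", original_url),
    )
    # No 'location' header and the response URL equals the original,
    # so the meta-refresh pattern is the branch that fires here.
    print(extract_domain_from_response(resp, original_url))  # https://new-domain.example/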
@@ -135,100 +200,47 @@ def update_domain_entries(data):
         print(f" [] Stored URL: {original_full_url}")
         if original_domain_in_entry:
             print(f" [] Stored Domain (TLD): {original_domain_in_entry}")

-        potential_urls_to_try = []
-        potential_urls_to_try.append(("Original", original_full_url))
-        try:
-            parsed_original = urlparse(original_full_url)
-            current_netloc = parsed_original.netloc
-            if current_netloc.startswith("www."):
-                varied_netloc = current_netloc[4:]
-                potential_urls_to_try.append(("Without www", urlunparse(parsed_original._replace(netloc=varied_netloc))))
-            else:
-                varied_netloc = "www." + current_netloc
-                potential_urls_to_try.append(("With www", urlunparse(parsed_original._replace(netloc=varied_netloc))))
-            current_path = parsed_original.path
-            if not current_path:
-                potential_urls_to_try.append(("With trailing slash", urlunparse(parsed_original._replace(path='/'))))
-            elif current_path.endswith('/'):
-                potential_urls_to_try.append(("Without trailing slash", urlunparse(parsed_original._replace(path=current_path[:-1]))))
-            else:
-                potential_urls_to_try.append(("With trailing slash", urlunparse(parsed_original._replace(path=current_path + '/'))))
-        except Exception as e:
-            print(f" [!] Error generating URL variations: {e}")
-
-        entry_updated_in_this_run = False
-        seen_urls_for_entry = set()
-        unique_potential_urls = []
-        for label, url_val in potential_urls_to_try:
-            if url_val not in seen_urls_for_entry:
-                unique_potential_urls.append((label, url_val))
-                seen_urls_for_entry.add(url_val)
-
-        parsed_original_for_http_check = urlparse(original_full_url)
-        if parsed_original_for_http_check.scheme == 'https':
-            http_url = urlunparse(parsed_original_for_http_check._replace(scheme='http'))
-            if http_url not in seen_urls_for_entry:
-                unique_potential_urls.append(("HTTP Fallback", http_url))
-
-        for label, url_to_check in unique_potential_urls:
-            if entry_updated_in_this_run:
-                break
-            print(f" [] Testing URL ({label}): {url_to_check}")
-            response = try_url_with_retries(url_to_check, current_headers)
-            if response:
-                final_url_from_request = str(response.url)
-                print(f" [+] Redirect/Response to: {final_url_from_request}")
-                parsed_final_url = urlparse(final_url_from_request)
-                normalized_full_url = urlunparse(parsed_final_url._replace(path='/', params='', query='', fragment=''))
-                if parsed_final_url.path == '' and not normalized_full_url.endswith('/'):
-                    normalized_full_url += '/'
-                if normalized_full_url != final_url_from_request:
-                    print(f" [+] Normalized URL: {normalized_full_url}")
-                if normalized_full_url != original_full_url:
-                    new_tld_val = get_new_tld(final_url_from_request)
-                    if new_tld_val:
-                        entry["full_url"] = normalized_full_url
-                        if new_tld_val != original_domain_in_entry:
-                            print(f" [-] Domain TLD Changed: '{original_domain_in_entry}' -> '{new_tld_val}'")
-                            entry["old_domain"] = original_domain_in_entry if original_domain_in_entry else entry.get("old_domain", "")
-                            entry["domain"] = new_tld_val
-                            entry["time_change"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-                            print(f" [-] Domain & URL Updated: New TLD '{new_tld_val}', New URL '{normalized_full_url}'")
-                        else:
-                            entry["domain"] = new_tld_val
-                            print(f" [-] URL Updated (TLD Unchanged '{new_tld_val}'): New URL '{normalized_full_url}'")
-                        updated_count += 1
-                        entry_updated_in_this_run = True
-                    else:
-                        print(f" [!] Could not extract TLD from {final_url_from_request}. URL not updated despite potential change.")
-                else:
-                    if final_url_from_request != original_full_url:
-                        print(f" [] Same Domain (after normalization): {final_url_from_request} -> {normalized_full_url}")
-                    else:
-                        print(f" [] Same Domain: {final_url_from_request}")
-                    if label == "Original" or normalized_full_url == original_full_url:
-                        entry_updated_in_this_run = True
-
-        if not entry_updated_in_this_run:
-            print(f" [-] No Update for {key} after {len(unique_potential_urls)} attempts.")
+        print(f" [] Testing URL: {original_full_url}")
+        response = try_url(original_full_url, current_headers)
+
+        if response:
+            final_url_from_request = str(response.url)
+            print(f" [+] Redirect/Response to: {final_url_from_request}")
+
+            parsed_final_url = urlparse(final_url_from_request)
+            normalized_full_url = urlunparse(parsed_final_url._replace(path='/', params='', query='', fragment=''))
+            if parsed_final_url.path == '' and not normalized_full_url.endswith('/'):
+                normalized_full_url += '/'
+
+            if normalized_full_url != final_url_from_request:
+                print(f" [+] Normalized URL: {normalized_full_url}")
+
+            if normalized_full_url != original_full_url:
+                new_tld_val = get_new_tld(final_url_from_request)
+
+                if new_tld_val:
+                    entry["full_url"] = normalized_full_url
+
+                    if new_tld_val != original_domain_in_entry:
+                        print(f" [-] Domain TLD Changed: '{original_domain_in_entry}' -> '{new_tld_val}'")
+                        entry["old_domain"] = original_domain_in_entry if original_domain_in_entry else entry.get("old_domain", "")
+                        entry["domain"] = new_tld_val
+                        entry["time_change"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                        print(f" [-] Domain & URL Updated: New TLD '{new_tld_val}', New URL '{normalized_full_url}'")
+                    else:
+                        entry["domain"] = new_tld_val
+                        print(f" [-] URL Updated (TLD Unchanged '{new_tld_val}'): New URL '{normalized_full_url}'")
+                    updated_count += 1
+                else:
+                    print(f" [!] Could not extract TLD from {final_url_from_request}. URL not updated despite potential change.")
+            else:
+                print(f" [] Same Domain: {final_url_from_request}")
+        else:
+            print(f" [-] No response for {key}")

     return updated_count > 0
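
Note: the diff never shows the schema of .github/.domain/domains.json itself; from the reads and writes in update_domain_entries() above, each entry appears to carry at least the keys below (values are illustrative, not real data):

    # Shape inferred from the entry.get()/entry[...] accesses; hypothetical values.
    entry = {
        "full_url": "https://site.example/",   # replaced with the normalized URL on change
        "domain": "example",                   # current TLD, refreshed via get_new_tld()
        "old_domain": "old",                   # previous TLD, preserved when the TLD changes
        "time_change": "2025-05-31 11:28:38",  # datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }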
@@ -240,10 +252,8 @@ def main():
         if update_domain_entries(domain_data):
             save_domains(JSON_FILE_PATH, domain_data)
             print("\nUpdate complete. Some entries were modified.")
         else:
             print("\nUpdate complete. No domains were modified.")
     else:
         print("\nCannot proceed without domain data.")

File: GitHub Actions workflow

@@ -1,8 +1,8 @@
-name: Aggiorna Domini Periodicamente
+name: Update domains

 on:
   schedule:
-    - cron: "*/45 * * * *"
+    - cron: "0 */2 * * *"
   workflow_dispatch:

 jobs:
@@ -12,37 +12,38 @@ jobs:
       contents: write
     steps:
-      - name: Checkout del codice
+      - name: Checkout code
        uses: actions/checkout@v4

       - name: Setup Python
         uses: actions/setup-python@v5
         with:
           python-version: '3.12'

-      - name: Installa dipendenze
-        run: pip install httpx ua-generator
+      - name: Install dependencies
+        run: |
+          pip install httpx ua-generator requests
+          pip install --upgrade pip setuptools wheel

-      - name: Configura DNS
+      - name: Configure DNS
         run: |
           sudo sh -c 'echo "nameserver 9.9.9.9" > /etc/resolv.conf'
+          sudo sh -c 'echo "nameserver 149.112.112.122" >> /etc/resolv.conf'
           cat /etc/resolv.conf

-      - name: Esegui lo script di aggiornamento domini
-        run: python domain_updater.py
+      - name: Execute domain update script
+        run: python .github/.domain/domain_update.py

-      - name: Commit e Push delle modifiche (se presenti)
+      - name: Commit and push changes (if any)
         run: |
           git config --global user.name 'github-actions[bot]'
           git config --global user.email 'github-actions[bot]@users.noreply.github.com'
-          # Controlla se domain.json è stato modificato
-          if ! git diff --quiet domain.json; then
-            git add domain.json
-            git commit -m "Aggiornamento automatico domini [skip ci]"
-            echo "Modifiche committate. Tentativo di push..."
+          # Check if domains.json was modified
+          if ! git diff --quiet .github/.domain/domains.json; then
+            git add .github/.domain/domains.json
+            git commit -m "Automatic domain update [skip ci]"
+            echo "Changes committed. Attempting to push..."
             git push
           else
-            echo "Nessuna modifica a domain.json da committare."
+            echo "No changes to .github/.domain/domains.json to commit."
           fi