Mirror of https://github.com/Arrowar/StreamingCommunity.git, synced 2025-06-05 02:55:25 +00:00

Compare commits (5 commits)
Author | SHA1 | Date
---|---|---
 | f4529e5f05 |
 | dcfd22bc2b |
 | 3cbabfb98b |
 | 6efeb96201 |
 | d0207b3669 |
634 changes: .github/.domain/domain_update.py (vendored)
```diff
@@ -1,358 +1,360 @@
-import re
 # 20.04.2024

 import os
 import json
 from datetime import datetime
-from urllib.parse import urlparse, urlunparse
+from urllib.parse import urlparse, unquote


 # External libraries
 import httpx
+import tldextract
 import ua_generator
+import dns.resolver

-JSON_FILE_PATH = os.path.join(".github", ".domain", "domains.json")
+
+# Variables
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+JSON_FILE_PATH = os.path.join(SCRIPT_DIR, "domains.json")
+ua = ua_generator.generate(device='desktop', browser=('chrome', 'edge'))


-def load_domains(file_path):
+def get_headers():
+    return ua.headers.get()
+
+def get_tld(url_str):
+    try:
+        parsed = urlparse(unquote(url_str))
+        domain = parsed.netloc.lower().lstrip('www.')
+        parts = domain.split('.')
+        return parts[-1] if len(parts) >= 2 else None
+
+    except Exception:
+        return None
+
+def get_base_domain(url_str):
+    try:
+        parsed = urlparse(url_str)
+        domain = parsed.netloc.lower().lstrip('www.')
+        parts = domain.split('.')
+        return '.'.join(parts[:-1]) if len(parts) > 2 else parts[0]
+
+    except Exception:
+        return None
+
+def get_base_url(url_str):
+    try:
+        parsed = urlparse(url_str)
+        return f"{parsed.scheme}://{parsed.netloc}"
+
+    except Exception:
+        return None
+
+def log(msg, level='INFO'):
+    levels = {
+        'INFO': '[ ]',
+        'SUCCESS': '[+]',
+        'WARNING': '[!]',
+        'ERROR': '[-]'
+    }
+    entry = f"{levels.get(level, '[?]')} {msg}"
+    print(entry)
+
+def load_json_data(file_path):
     if not os.path.exists(file_path):
-        print(f"Error: The file {file_path} was not found.")
+        log(f"Error: The file {file_path} was not found.", "ERROR")
         return None

     try:
         with open(file_path, 'r', encoding='utf-8') as f:
             return json.load(f)

     except Exception as e:
-        print(f"Error reading the file {file_path}: {e}")
+        log(f"Error reading the file {file_path}: {e}", "ERROR")
         return None

-def save_domains(file_path, data):
+def save_json_data(file_path, data):
     try:
         with open(file_path, 'w', encoding='utf-8') as f:
             json.dump(data, f, indent=2, ensure_ascii=False)
-        print(f"Data successfully saved to {file_path}")
+        log(f"Data successfully saved to {file_path}", "SUCCESS")

     except Exception as e:
-        print(f"Error saving the file {file_path}: {e}")
+        log(f"Error saving the file {file_path}: {e}", "ERROR")

-def get_new_tld(full_url):
-    try:
-        parsed_url = urlparse(full_url)
-        hostname = parsed_url.hostname
-        if hostname:
-            parts = hostname.split('.')
-            return parts[-1]
-
-    except Exception:
-        pass
-
-    return None
-
-def get_enhanced_headers():
-    ua = ua_generator.generate(device='desktop', browser='chrome')
-    headers = ua.headers.get()
-
-    additional_headers = {
-        'DNT': '1',
-        'Upgrade-Insecure-Requests': '1',
-        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
-        'Accept-Language': 'en-US,en;q=0.9,it;q=0.8',
-        'Accept-Encoding': 'gzip, deflate, br',
-        'Cache-Control': 'max-age=0',
-        'Connection': 'keep-alive',
-        'Referer': 'https://www.google.com/',
-    }
-
-    headers.update(additional_headers)
-    return headers
-
+def parse_url(url):
+    if not url.startswith(('http://', 'https://')):
+        url = 'https://' + url
+
+    try:
+        extracted = tldextract.extract(url)
+        parsed = urlparse(url)
+        clean_url = f"{parsed.scheme}://{parsed.netloc}/"
+        full_domain = f"{extracted.domain}.{extracted.suffix}" if extracted.domain else extracted.suffix
+        domain_tld = extracted.suffix
+        result = {
+            'url': clean_url,
+            'full_domain': full_domain,
+            'domain': domain_tld,
+            'suffix': extracted.suffix,
+            'subdomain': extracted.subdomain or None
+        }
+        return result
+
+    except Exception as e:
+        log(f"Error parsing URL: {e}", "ERROR")
+        return None
+
+def check_dns_resolution(domain):
+    try:
+        resolver = dns.resolver.Resolver()
+        resolver.timeout = 2
+        resolver.lifetime = 2
+
+        try:
+            answers = resolver.resolve(domain, 'A')
+            return str(answers[0])
+        except:
+            try:
+                answers = resolver.resolve(domain, 'AAAA')
+                return str(answers[0])
+            except:
+                pass
+        return None
+    except:
+        return None
+
+def find_new_domain(input_url, output_file=None, verbose=True, json_output=False):
+    log_buffer = []
+    original_info = parse_url(input_url)
+
+    if not original_info:
+        log(f"Could not parse original URL: {input_url}", "ERROR")
+        if json_output:
+            return {'full_url': input_url, 'domain': None}
+        return None
+
+    log(f"Starting analysis for: {original_info['full_domain']}")
+    orig_ip = check_dns_resolution(original_info['full_domain'])
+    if orig_ip:
+        log(f"Original domain resolves to: {orig_ip}", "SUCCESS")
+    else:
+        log(f"Original domain does not resolve to an IP address", "WARNING")
+
+    headers = get_headers()
+    new_domains = []
+    redirects = []
+    final_url = None
+    final_domain_info = None
+    url_to_test_in_loop = None
+
+    for protocol in ['https://', 'http://']:
+        try:
+            url_to_test_in_loop = f"{protocol}{original_info['full_domain']}"
+            log(f"Testing connectivity to {url_to_test_in_loop}")
+            redirect_chain = []
+            current_url = url_to_test_in_loop
+            max_redirects = 10
+            redirect_count = 0
+
+            while redirect_count < max_redirects:
+                with httpx.Client(verify=False, follow_redirects=False, timeout=5) as client:
+                    response = client.get(current_url, headers=headers)
+
+                redirect_info = {'url': current_url, 'status_code': response.status_code}
+                redirect_chain.append(redirect_info)
+                log(f"Request to {current_url} - Status: {response.status_code}")
+
+                if response.status_code in (301, 302, 303, 307, 308):
+                    if 'location' in response.headers:
+                        next_url = response.headers['location']
+                        if next_url.startswith('/'):
+                            parsed_current = urlparse(current_url)
+                            next_url = f"{parsed_current.scheme}://{parsed_current.netloc}{next_url}"
+
+                        log(f"Redirect found: {next_url} (Status: {response.status_code})")
+                        current_url = next_url
+                        redirect_count += 1
+                        redirect_domain_info_val = parse_url(next_url)
+                        if redirect_domain_info_val and redirect_domain_info_val['full_domain'] != original_info['full_domain']:
+                            new_domains.append({'domain': redirect_domain_info_val['full_domain'], 'url': next_url, 'source': 'redirect'})
+
+                    else:
+                        log(f"Redirect status code but no Location header", "WARNING")
+                        break
+                else:
+                    break
+
+            if redirect_chain:
+                final_url = redirect_chain[-1]['url']
+                final_domain_info = parse_url(final_url)
+                redirects.extend(redirect_chain)
+                log(f"Final URL after redirects: {final_url}", "SUCCESS")
+                if final_domain_info and final_domain_info['full_domain'] != original_info['full_domain']:
+                    new_domains.append({'domain': final_domain_info['full_domain'], 'url': final_url, 'source': 'final_url'})
+
+            final_status = redirect_chain[-1]['status_code'] if redirect_chain else None
+
+            if final_status and final_status < 400 and final_status != 403:
+                break
+
+            if final_status == 403 and redirect_chain and len(redirect_chain) > 1:
+                log(f"Got 403 Forbidden, but captured {len(redirect_chain)-1} redirects before that", "SUCCESS")
+                break
+
+        except httpx.RequestError as e:
+            log(f"Error connecting to {protocol}{original_info['full_domain']}: {str(e)}", "ERROR")
+
+    url_for_auto_redirect = input_url
+    if url_to_test_in_loop:
+        url_for_auto_redirect = url_to_test_in_loop
+    elif original_info and original_info.get('url'):
+        url_for_auto_redirect = original_info['url']
+
+    if not redirects or not new_domains:
+        log("Trying alternate method with automatic redirect following")
+
+        try:
+            with httpx.Client(verify=False, follow_redirects=True, timeout=5) as client:
+                response_auto = client.get(url_for_auto_redirect, headers=headers)
+
+            log(f"Connected with auto-redirects: Status {response_auto.status_code}")
+
+            if response_auto.history:
+                log(f"Found {len(response_auto.history)} redirects with auto-following", "SUCCESS")
+
+                for r_hist in response_auto.history:
+                    redirect_info_auto = {'url': str(r_hist.url), 'status_code': r_hist.status_code}
+                    redirects.append(redirect_info_auto)
+                    log(f"Auto-redirect: {r_hist.url} (Status: {r_hist.status_code})")
+
+                final_url = str(response_auto.url)
+                final_domain_info = parse_url(final_url)
+                for redirect_hist_item in response_auto.history:
+                    redirect_domain_val = parse_url(str(redirect_hist_item.url))
+                    if redirect_domain_val and original_info and redirect_domain_val['full_domain'] != original_info['full_domain']:
+                        new_domains.append({'domain': redirect_domain_val['full_domain'], 'url': str(redirect_hist_item.url), 'source': 'auto-redirect'})
+
+            current_final_url_info = parse_url(str(response_auto.url))
+
+            if current_final_url_info and original_info and current_final_url_info['full_domain'] != original_info['full_domain']:
+                is_already_added = any(d['domain'] == current_final_url_info['full_domain'] and d['source'] == 'auto-redirect' for d in new_domains)
+                if not is_already_added:
+                    new_domains.append({'domain': current_final_url_info['full_domain'], 'url': str(response_auto.url), 'source': 'final_url_auto'})
+                final_url = str(response_auto.url)
+                final_domain_info = current_final_url_info
+                log(f"Final URL from auto-redirect: {final_url}", "SUCCESS")
+
+        except httpx.RequestError as e:
+            log(f"Error with auto-redirect attempt: {str(e)}", "ERROR")
+        except NameError:
+            log(f"Error: URL for auto-redirect attempt was not defined.", "ERROR")
+
+    unique_domains = []
+    seen_domains = set()
+    for domain_info_item in new_domains:
+        if domain_info_item['domain'] not in seen_domains:
+            seen_domains.add(domain_info_item['domain'])
+            unique_domains.append(domain_info_item)
+
+    if not final_url:
+        final_url = input_url
+    if not final_domain_info:
+        final_domain_info = original_info
+
+    if final_domain_info:
+        parsed_final_url_info = parse_url(final_url)
+        if parsed_final_url_info:
+            final_url = parsed_final_url_info['url']
+            final_domain_info = parsed_final_url_info
+    else:
+        final_domain_info = original_info
+        final_url = original_info['url'] if original_info else input_url
+
+    results_original_domain = original_info['full_domain'] if original_info else None
+    results_final_domain_tld = final_domain_info['domain'] if final_domain_info and 'domain' in final_domain_info else None
+
+    results = {
+        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+        'original_url': input_url,
+        'original_domain': results_original_domain,
+        'original_ip': orig_ip,
+        'new_domains': unique_domains,
+        'redirects': redirects,
+        'log': log_buffer
+    }
+    simplified_json_output = {'full_url': final_url, 'domain': results_final_domain_tld}
+
+    if verbose:
+        log(f"DEBUG - Simplified output: {simplified_json_output}", "INFO")
+
+    if output_file:
+        try:
+            with open(output_file, 'w', encoding='utf-8') as f:
+                json.dump(results, f, indent=2, ensure_ascii=False)
+            log(f"Results saved to {output_file}", "SUCCESS")
+        except Exception as e:
+            log(f"Error writing to output file: {str(e)}", "ERROR")
+
+    if json_output:
+        return simplified_json_output
+    else:
+        return results

-def extract_redirect_from_403(response, original_url):
-    redirect_headers = ['location', 'refresh', 'x-redirect-to', 'x-location', 'redirect']
-    for header in redirect_headers:
-        if header in response.headers:
-            return response.headers[header]
-
-    try:
-        content = response.text
-
-        js_patterns = [
-            r'window\.location\.href\s*=\s*["\']([^"\']+)["\']',
-            r'window\.location\s*=\s*["\']([^"\']+)["\']',
-            r'location\.href\s*=\s*["\']([^"\']+)["\']',
-            r'document\.location\s*=\s*["\']([^"\']+)["\']',
-            r'top\.location\.href\s*=\s*["\']([^"\']+)["\']',
-            r'parent\.location\s*=\s*["\']([^"\']+)["\']'
-        ]
-
-        for pattern in js_patterns:
-            match = re.search(pattern, content, re.IGNORECASE)
-            if match:
-                return match.group(1)
-
-        meta_patterns = [
-            r'<meta[^>]*http-equiv=["\']?refresh["\']?[^>]*content=["\'][^"\']*url=([^"\'>\s]+)',
-            r'<meta[^>]*content=["\'][^"\']*url=([^"\'>\s]+)[^>]*http-equiv=["\']?refresh["\']?'
-        ]
-
-        for pattern in meta_patterns:
-            match = re.search(pattern, content, re.IGNORECASE)
-            if match:
-                return match.group(1)
-
-        text_patterns = [
-            r'[Rr]edirect(?:ed)?\s+to:?\s*([^\s<>"\']+)',
-            r'[Nn]ew\s+[Uu][Rr][Ll]:?\s*([^\s<>"\']+)',
-            r'[Mm]oved\s+to:?\s*([^\s<>"\']+)',
-            r'[Ff]ound\s+at:?\s*([^\s<>"\']+)',
-            r'[Gg]o\s+to:?\s*([^\s<>"\']+)',
-            r'[Vv]isit:?\s*([^\s<>"\']+)',
-            r'https?://[^\s<>"\']+\.[a-z]{2,}[^\s<>"\']*'
-        ]
-
-        for pattern in text_patterns:
-            match = re.search(pattern, content)
-            if match:
-                potential_url = match.group(1) if '(' in pattern else match.group(0)
-                if potential_url.startswith(('http://', 'https://', '//')):
-                    return potential_url
-
-        link_patterns = [
-            r'<a[^>]*href=["\']([^"\']+)["\'][^>]*>(?:click here|continue|proceed|go here)',
-            r'<link[^>]*rel=["\']?canonical["\']?[^>]*href=["\']([^"\']+)["\']',
-            r'<base[^>]*href=["\']([^"\']+)["\']'
-        ]
-
-        for pattern in link_patterns:
-            match = re.search(pattern, content, re.IGNORECASE)
-            if match:
-                return match.group(1)
-
-    except Exception:
-        pass
-
-    return None
-
-def extract_domain_from_response(response, original_url):
-    if 'location' in response.headers:
-        return response.headers['location']
-
-    if str(response.url) != original_url:
-        return str(response.url)
-
-    try:
-        content_type = response.headers.get('content-type', '').lower()
-        if 'text/html' in content_type or 'text/plain' in content_type:
-            response_text = response.text
-
-            js_redirect_patterns = [
-                r'window\.location\.href\s*=\s*["\']([^"\']+)["\']',
-                r'window\.location\s*=\s*["\']([^"\']+)["\']',
-                r'location\.href\s*=\s*["\']([^"\']+)["\']',
-                r'document\.location\s*=\s*["\']([^"\']+)["\']'
-            ]
-
-            for pattern in js_redirect_patterns:
-                js_match = re.search(pattern, response_text, re.IGNORECASE)
-                if js_match:
-                    return js_match.group(1)
-
-            meta_patterns = [
-                r'<meta[^>]*http-equiv=["\']?refresh["\']?[^>]*content=["\'][^"\']*url=([^"\'>\s]+)',
-                r'<meta[^>]*content=["\'][^"\']*url=([^"\'>\s]+)[^>]*http-equiv=["\']?refresh["\']?'
-            ]
-
-            for pattern in meta_patterns:
-                meta_match = re.search(pattern, response_text, re.IGNORECASE)
-                if meta_match:
-                    return meta_match.group(1)
-
-            canonical_match = re.search(r'<link[^>]*rel=["\']?canonical["\']?[^>]*href=["\']([^"\']+)["\']', response_text, re.IGNORECASE)
-            if canonical_match:
-                return canonical_match.group(1)
-
-            base_match = re.search(r'<base[^>]*href=["\']([^"\']+)["\']', response_text, re.IGNORECASE)
-            if base_match:
-                return base_match.group(1)
-
-            error_redirect_patterns = [
-                r'[Rr]edirect(?:ed)?\s+to:?\s*([^\s<>"\']+)',
-                r'[Nn]ew\s+[Uu][Rr][Ll]:?\s*([^\s<>"\']+)',
-                r'[Mm]oved\s+to:?\s*([^\s<>"\']+)',
-                r'[Ff]ound\s+at:?\s*([^\s<>"\']+)'
-            ]
-
-            for pattern in error_redirect_patterns:
-                error_match = re.search(pattern, response_text)
-                if error_match:
-                    potential_url = error_match.group(1)
-                    if potential_url.startswith(('http://', 'https://', '//')):
-                        return potential_url
-
-    except Exception as e:
-        print(f" [!] Error extracting from response content: {e}")
-
-    return None
-
-def try_url(url_to_try, headers=None, timeout=15):
-    if headers is None:
-        headers = get_enhanced_headers()
-
-    try:
-        with httpx.Client(headers=headers, timeout=timeout, follow_redirects=False) as client:
-            response = client.get(url_to_try)
-
-            if response.status_code in [301, 302, 303, 307, 308]:
-                location = response.headers.get('location')
-                if location:
-                    print(f" [+] Found redirect ({response.status_code}) to: {location}")
-                    try:
-                        final_response = client.get(location)
-                        if 200 <= final_response.status_code < 400:
-                            return final_response
-                        else:
-                            return httpx.Response(
-                                status_code=200,
-                                headers={"location": location},
-                                content=b"",
-                                request=response.request
-                            )
-                    except Exception:
-                        return httpx.Response(
-                            status_code=200,
-                            headers={"location": location},
-                            content=b"",
-                            request=response.request
-                        )
-
-            elif response.status_code == 403:
-                print(f" [!] HTTP 403 - attempting enhanced extraction")
-
-                redirect_url = extract_redirect_from_403(response, url_to_try)
-                if redirect_url:
-                    print(f" [+] Found redirect URL in 403 response: {redirect_url}")
-                    return httpx.Response(
-                        status_code=200,
-                        headers={"location": redirect_url},
-                        content=b"",
-                        request=response.request
-                    )
-
-            elif response.status_code in [409, 429, 503]:
-                print(f" [!] HTTP {response.status_code} - attempting to extract redirect info")
-
-                location = response.headers.get('location')
-                if location:
-                    print(f" [+] Found location header in error response: {location}")
-                    return httpx.Response(
-                        status_code=200,
-                        headers={"location": location},
-                        content=b"",
-                        request=response.request
-                    )
-
-                new_url = extract_domain_from_response(response, url_to_try)
-                if new_url and new_url != url_to_try:
-                    print(f" [+] Found redirect URL in error response content: {new_url}")
-                    return httpx.Response(
-                        status_code=200,
-                        headers={"location": new_url},
-                        content=b"",
-                        request=response.request
-                    )
-
-            if 200 <= response.status_code < 400:
-                return response
-
-            print(f" [!] HTTP {response.status_code} for {url_to_try}")
-
-    except httpx.HTTPStatusError as http_err:
-        new_url = extract_domain_from_response(http_err.response, url_to_try)
-        if new_url:
-            print(f" [+] Found new URL from HTTPStatusError response: {new_url}")
-            return httpx.Response(
-                status_code=200,
-                headers={"location": new_url},
-                content=b"",
-                request=http_err.request
-            )
-    except Exception as e:
-        print(f" [!] Error for {url_to_try}: {type(e).__name__}")
-
-    return None
-
-def update_domain_entries(data):
-    if not data:
-        return False
-
-    updated_count = 0
-
-    for key, entry in data.items():
-        print(f"\n--- [DOMAIN] {key} ---")
-        original_full_url = entry.get("full_url")
-        original_domain_in_entry = entry.get("domain")
-
-        if not original_full_url:
-            print(f" [!] 'full_url' missing. Skipped.")
-            continue
-
-        print(f" [] Stored URL: {original_full_url}")
-        if original_domain_in_entry:
-            print(f" [] Stored Domain (TLD): {original_domain_in_entry}")
-
-        print(f" [] Testing URL: {original_full_url}")
-        response = try_url(original_full_url)
-
-        if response:
-            final_url_from_request = str(response.url)
-            print(f" [+] Redirect/Response to: {final_url_from_request}")
-
-            parsed_final_url = urlparse(final_url_from_request)
-            normalized_full_url = urlunparse(parsed_final_url._replace(path='/', params='', query='', fragment=''))
-            if parsed_final_url.path == '' and not normalized_full_url.endswith('/'):
-                normalized_full_url += '/'
-
-            if normalized_full_url != final_url_from_request:
-                print(f" [+] Normalized URL: {normalized_full_url}")
-
-            if normalized_full_url != original_full_url:
-                new_tld_val = get_new_tld(final_url_from_request)
-
-                if new_tld_val:
-                    entry["full_url"] = normalized_full_url
-
-                    if new_tld_val != original_domain_in_entry:
-                        print(f" [-] Domain TLD Changed: '{original_domain_in_entry}' -> '{new_tld_val}'")
-                        entry["old_domain"] = original_domain_in_entry if original_domain_in_entry else entry.get("old_domain", "")
-                        entry["domain"] = new_tld_val
-                        entry["time_change"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-                        print(f" [-] Domain & URL Updated: New TLD '{new_tld_val}', New URL '{normalized_full_url}'")
-                    else:
-                        entry["domain"] = new_tld_val
-                        print(f" [-] URL Updated (TLD Unchanged '{new_tld_val}'): New URL '{normalized_full_url}'")
-
-                    updated_count += 1
-
-                else:
-                    print(f" [!] Could not extract TLD from {final_url_from_request}. URL not updated despite potential change.")
-            else:
-                print(f" [] Same Domain: {final_url_from_request}")
-
-        else:
-            print(f" [-] No response for {key}")
-
-    return updated_count > 0
+def update_site_entry(site_name: str, all_domains_data: dict):
+    site_config = all_domains_data.get(site_name, {})
+    log(f"Processing site: {site_name}", "INFO")
+    if not site_config.get('full_url'):
+        log(f"Site {site_name} has no full_url in config. Skipping.", "WARNING")
+        return False
+
+    current_full_url = site_config.get('full_url')
+    current_domain_tld = site_config.get('domain')
+    found_domain_info = find_new_domain(current_full_url, verbose=False, json_output=True)
+
+    if found_domain_info and found_domain_info.get('full_url') and found_domain_info.get('domain'):
+        new_full_url = found_domain_info['full_url']
+        new_domain_tld = found_domain_info['domain']
+
+        if new_full_url != current_full_url or new_domain_tld != current_domain_tld:
+            log(f"Update found for {site_name}: URL '{current_full_url}' -> '{new_full_url}', TLD '{current_domain_tld}' -> '{new_domain_tld}'", "SUCCESS")
+            updated_entry = site_config.copy()
+            updated_entry['full_url'] = new_full_url
+            updated_entry['domain'] = new_domain_tld
+            if new_domain_tld != current_domain_tld:
+                updated_entry['old_domain'] = current_domain_tld if current_domain_tld else ""
+            updated_entry['time_change'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            all_domains_data[site_name] = updated_entry
+            return True
+
+        log(f"No changes detected for {site_name}.", "INFO")
+        return False
+    else:
+        log(f"Could not reliably find new domain info for {site_name} from URL: {current_full_url}. No search fallback.", "WARNING")
+        return False

 def main():
-    print("Starting domain update script...")
-    domain_data = load_domains(JSON_FILE_PATH)
+    log("Starting domain update script...")
+    all_domains_data = load_json_data(JSON_FILE_PATH)
+    if not all_domains_data:
+        log("Cannot proceed: Domain data is missing or could not be loaded.", "ERROR")
+        log("Script finished.")
+        return

-    if domain_data:
-        if update_domain_entries(domain_data):
-            save_domains(JSON_FILE_PATH, domain_data)
-            print("\nUpdate complete. Some entries were modified.")
-        else:
-            print("\nUpdate complete. No domains were modified.")
-    else:
-        print("\nCannot proceed without domain data.")
-
-    print("Script finished.")
+    any_updates_made = False
+    for site_name_key in list(all_domains_data.keys()):
+        if update_site_entry(site_name_key, all_domains_data):
+            any_updates_made = True
+        print("\n")
+
+    if any_updates_made:
+        save_json_data(JSON_FILE_PATH, all_domains_data)
+        log("Update complete. Some entries were modified.", "SUCCESS")
+    else:
+        log("Update complete. No domains were modified.", "INFO")
+    log("Script finished.")

 if __name__ == "__main__":
     main()
```
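The rewritten updater walks a site's redirect chain one hop at a time instead of letting the HTTP client follow redirects silently, which is what lets it record every intermediate domain. A minimal sketch of that pattern, assuming only `httpx` and a plain headers dict (the `probe_redirects` name is illustrative; the 10-hop cap mirrors the script's `max_redirects`):

```python
import httpx

def probe_redirects(start_url: str, headers: dict, max_redirects: int = 10) -> list[dict]:
    """Follow HTTP redirects one hop at a time, recording each stop."""
    chain = []
    current = start_url
    for _ in range(max_redirects):
        with httpx.Client(verify=False, follow_redirects=False, timeout=5) as client:
            response = client.get(current, headers=headers)
        chain.append({'url': current, 'status_code': response.status_code})
        if response.status_code not in (301, 302, 303, 307, 308):
            break
        location = response.headers.get('location', '')
        if not location:
            break
        # Relative Location headers are resolved against the current URL
        current = str(httpx.URL(current).join(location))
    return chain
```

Stepping hop by hop is what allows the real script to flag a new domain as soon as it appears anywhere in the chain, not only at the final landing page.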
16 changes: .github/.domain/domains.json (vendored)
```diff
@@ -48,15 +48,15 @@
     "time_change": "2025-05-26 23:22:45"
   },
   "streamingcommunity": {
-    "domain": "bio",
-    "full_url": "https://streamingunity.bio/",
-    "old_domain": "blog",
-    "time_change": "2025-05-31 12:17:33"
+    "domain": "bid",
+    "full_url": "https://streamingunity.bid/",
+    "old_domain": "bio",
+    "time_change": "2025-06-03 15:27:02"
   },
   "altadefinizionegratis": {
-    "domain": "icu",
-    "full_url": "https://altadefinizionegratis.icu/",
-    "old_domain": "taipei",
-    "time_change": "2025-05-18 11:21:05"
+    "domain": "cc",
+    "full_url": "https://altadefinizionegratis.cc/",
+    "old_domain": "icu",
+    "time_change": "2025-06-02 10:35:25"
   }
 }
```
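Each entry keeps the current TLD, the full URL, the previous TLD, and a change timestamp. A small sketch of how an updater might rotate those fields when a domain moves, using the same four keys as above (the `rotate_domain` helper is illustrative, not a function from the repository):

```python
import json
from datetime import datetime

def rotate_domain(entry: dict, new_tld: str, new_url: str) -> dict:
    """Move the current TLD into old_domain and stamp the change."""
    updated = entry.copy()
    if new_tld != entry.get("domain"):
        updated["old_domain"] = entry.get("domain", "")
    updated["domain"] = new_tld
    updated["full_url"] = new_url
    updated["time_change"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return updated

# The bio -> bid transition recorded in this commit:
entry = {"domain": "bio", "full_url": "https://streamingunity.bio/",
         "old_domain": "blog", "time_change": "2025-05-31 12:17:33"}
print(json.dumps(rotate_domain(entry, "bid", "https://streamingunity.bid/"), indent=2))
```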
7 changes: .github/workflows/update_domain.yml (vendored)
```diff
@@ -2,7 +2,7 @@ name: Update domains
 
 on:
   schedule:
-    - cron: "0 */3 * * *"
+    - cron: "0 7-21 * * *"
   workflow_dispatch:
 
 jobs:
@@ -22,7 +22,8 @@ jobs:
 
       - name: Install dependencies
         run: |
-          pip install httpx ua-generator requests
+          pip install httpx tldextract ua-generator dnspython
+          pip install --upgrade pip setuptools wheel
 
       - name: Configure DNS
@@ -46,4 +47,4 @@ jobs:
           git push
         else
           echo "No changes to .github/.domain/domains.json to commit."
-        fi
+        fi
```
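The dependency list now matches the rewritten script (`tldextract`, `dnspython` replace `requests`), and the schedule moves from every three hours (`0 */3 * * *`) to the top of each hour between 07:00 and 21:00 UTC (`0 7-21 * * *`). A minimal sketch of the dnspython lookup the script relies on, with the same 2-second budget it configures:

```python
import dns.resolver

def resolve_first_ip(domain: str) -> str | None:
    """Return the first A (or AAAA) record for a domain, or None."""
    resolver = dns.resolver.Resolver()
    resolver.timeout = 2   # per-nameserver timeout, in seconds
    resolver.lifetime = 2  # total time budget for the whole query
    for record_type in ("A", "AAAA"):
        try:
            answers = resolver.resolve(domain, record_type)
            return str(answers[0])
        except Exception:
            continue
    return None

print(resolve_first_ip("example.com"))
```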
```diff
@@ -5,9 +5,9 @@ import logging
 
 
 # External libraries
-import httpx
 import jsbeautifier
 from bs4 import BeautifulSoup
+from curl_cffi import requests
 
 
 # Internal utilities
@@ -28,7 +28,6 @@ class VideoSource:
         - url (str): The URL of the video source.
         """
         self.headers = get_headers()
-        self.client = httpx.Client()
         self.url = url
 
     def make_request(self, url: str) -> str:
@@ -42,8 +41,10 @@ class VideoSource:
         - str: The response content if successful, None otherwise.
         """
         try:
-            response = self.client.get(url, headers=self.headers, timeout=MAX_TIMEOUT, follow_redirects=True)
-            response.raise_for_status()
+            response = requests.get(url, headers=self.headers, timeout=MAX_TIMEOUT, impersonate="chrome110")
+            if response.status_code >= 400:
+                logging.error(f"Request failed with status code: {response.status_code}")
+                return None
             return response.text
 
         except Exception as e:
```
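`curl_cffi` takes over from `httpx` here because its `impersonate` option reproduces a real browser's TLS fingerprint, which some hosts require before serving content. A reduced sketch of the new request path; `MAX_TIMEOUT` is assumed to be a small integer like the module-level constant, and error handling is trimmed to the status check the diff adds:

```python
import logging
from curl_cffi import requests

MAX_TIMEOUT = 10  # assumption: stands in for the module's constant

def make_request(url: str, headers: dict) -> str | None:
    """Fetch a page while impersonating Chrome's TLS fingerprint."""
    response = requests.get(url, headers=headers, timeout=MAX_TIMEOUT, impersonate="chrome110")
    if response.status_code >= 400:
        logging.error(f"Request failed with status code: {response.status_code}")
        return None
    return response.text
```

Note the design change: instead of `raise_for_status()`, the method now degrades to `None` so callers can treat an unreachable source as "no content" rather than an exception.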
```diff
@@ -39,6 +39,7 @@ class VideoSource:
         self.is_series = is_series
         self.media_id = media_id
         self.iframe_src = None
+        self.window_parameter = None
 
     def get_iframe(self, episode_id: int) -> None:
         """
@@ -109,41 +110,45 @@ class VideoSource:
             # Parse script to get video information
             self.parse_script(script_text=script)
 
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 404:
+                console.print("[yellow]This content will be available soon![/yellow]")
+                return
+
+            logging.error(f"Error getting content: {e}")
+            raise
+
         except Exception as e:
             logging.error(f"Error getting content: {e}")
             raise
 
-    def get_playlist(self) -> str:
+    def get_playlist(self) -> str | None:
         """
         Generate authenticated playlist URL.
 
         Returns:
-            str: Fully constructed playlist URL with authentication parameters
+            str | None: Fully constructed playlist URL with authentication parameters, or None if content unavailable
         """
+        if not self.window_parameter:
+            return None
+
         params = {}
 
         # Add 'h' parameter if video quality is 1080p
         if self.canPlayFHD:
             params['h'] = 1
 
         # Parse the original URL
         parsed_url = urlparse(self.window_parameter.url)
         query_params = parse_qs(parsed_url.query)
 
         # Check specifically for 'b=1' in the query parameters
         if 'b' in query_params and query_params['b'] == ['1']:
             params['b'] = 1
 
         # Add authentication parameters (token and expiration)
         params.update({
             "token": self.window_parameter.token,
             "expires": self.window_parameter.expires
         })
 
         # Build the updated query string
         query_string = urlencode(params)
 
         # Construct the new URL with updated query parameters
         return urlunparse(parsed_url._replace(query=query_string))
```
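The playlist URL is rebuilt from the captured `window_parameter`: optional `h` (FHD) and `b` flags plus the `token` and `expires` pair, re-encoded onto the original URL. A standalone sketch of that query rewrite using only the standard library, with placeholder values where the class reads its own state:

```python
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

def build_playlist_url(base_url: str, token: str, expires: str, can_play_fhd: bool) -> str:
    """Re-encode auth parameters onto a playlist URL."""
    params = {}
    if can_play_fhd:
        params['h'] = 1
    parsed = urlparse(base_url)
    query = parse_qs(parsed.query)
    # Preserve the 'b=1' flag only if the original URL carried it
    if query.get('b') == ['1']:
        params['b'] = 1
    params.update({"token": token, "expires": expires})
    return urlunparse(parsed._replace(query=urlencode(params)))

# Placeholder values; the real ones come from the parsed page script
print(build_playlist_url("https://cdn.example/playlist/42?b=1", "abc123", "1750000000", True))
```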
```diff
@@ -61,16 +61,22 @@ def download_film(select_title: MediaItem) -> str:
     # Extract mostraguarda URL
     try:
         response = httpx.get(select_title.url, headers=get_headers(), timeout=10)
+        response.raise_for_status()
+
         soup = BeautifulSoup(response.text, 'html.parser')
         iframes = soup.find_all('iframe')
         mostraguarda = iframes[0]['src']
 
     except Exception as e:
         console.print(f"[red]Site: {site_constant.SITE_NAME}, request error: {e}, get mostraguarda")
+        return None
 
     # Extract supervideo URL
+    supervideo_url = None
     try:
         response = httpx.get(mostraguarda, headers=get_headers(), timeout=10)
+        response.raise_for_status()
+
         soup = BeautifulSoup(response.text, 'html.parser')
         pattern = r'//supervideo\.[^/]+/[a-z]/[a-zA-Z0-9]+'
         supervideo_match = re.search(pattern, response.text)
@@ -78,7 +84,9 @@ def download_film(select_title: MediaItem) -> str:
 
     except Exception as e:
         console.print(f"[red]Site: {site_constant.SITE_NAME}, request error: {e}, get supervideo URL")
+        console.print("[yellow]This content will be available soon![/yellow]")
+        return None
 
     # Init class
     video_source = VideoSource(supervideo_url)
     master_playlist = video_source.get_playlist()
```
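The film path now fails fast: `raise_for_status()` turns HTTP errors into exceptions, and each `except` returns `None` instead of falling through with an unbound variable (`supervideo_url` is also pre-initialised). A compact sketch of the same two-step scrape in isolation, with a hypothetical entry URL standing in for the real page:

```python
import re
import httpx
from bs4 import BeautifulSoup

def find_supervideo_url(page_url: str, headers: dict) -> str | None:
    """Follow page -> first iframe -> supervideo link, or None on any failure."""
    try:
        response = httpx.get(page_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        mostraguarda = soup.find_all('iframe')[0]['src']

        response = httpx.get(mostraguarda, headers=headers, timeout=10)
        response.raise_for_status()
        match = re.search(r'//supervideo\.[^/]+/[a-z]/[a-zA-Z0-9]+', response.text)
        return match.group(0) if match else None
    except Exception:
        return None
```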
```diff
@@ -38,38 +38,52 @@ class GetSerieInfo:
         soup = BeautifulSoup(response.text, "html.parser")
         self.series_name = soup.find("title").get_text(strip=True).split(" - ")[0]
 
-        # Process all seasons
-        season_items = soup.find_all('div', class_='accordion-item')
-
-        for season_idx, season_item in enumerate(season_items, 1):
-            season_header = season_item.find('div', class_='accordion-header')
-            if not season_header:
-                continue
-
-            season_name = season_header.get_text(strip=True)
+        # Find all season dropdowns
+        seasons_dropdown = soup.find('div', class_='dropdown seasons')
+        if not seasons_dropdown:
+            return
+
+        # Get all season items
+        season_items = seasons_dropdown.find_all('span', {'data-season': True})
+
+        for season_item in season_items:
+            season_num = int(season_item['data-season'])
+            season_name = season_item.get_text(strip=True)
 
-            # Create a new season and get a reference to it
+            # Create a new season
             current_season = self.seasons_manager.add_season({
-                'number': season_idx,
+                'number': season_num,
                 'name': season_name
             })
 
-            # Find episodes for this season
-            episode_divs = season_item.find_all('div', class_='down-episode')
-            for ep_idx, ep_div in enumerate(episode_divs, 1):
-                episode_name_tag = ep_div.find('b')
-                if not episode_name_tag:
-                    continue
+            # Find all episodes for this season
+            episodes_container = soup.find('div', {'class': 'dropdown mirrors', 'data-season': str(season_num)})
+            if not episodes_container:
+                continue
 
-                episode_name = episode_name_tag.get_text(strip=True)
-                link_tag = ep_div.find('a', string=lambda text: text and "Supervideo" in text)
-                episode_url = link_tag['href'] if link_tag else None
+            # Get all episode mirrors for this season
+            episode_mirrors = soup.find_all('div', {'class': 'dropdown mirrors',
+                                                    'data-season': str(season_num)})
+
+            for mirror in episode_mirrors:
+                episode_data = mirror.get('data-episode', '').split('-')
+                if len(episode_data) != 2:
+                    continue
+
+                ep_num = int(episode_data[1])
+
+                # Find supervideo link
+                supervideo_span = mirror.find('span', {'data-id': 'supervideo'})
+                if not supervideo_span:
+                    continue
+
+                episode_url = supervideo_span.get('data-link', '')
 
                 # Add episode to the season
                 if current_season:
                     current_season.episodes.add({
-                        'number': ep_idx,
-                        'name': episode_name,
+                        'number': ep_num,
+                        'name': f"Episodio {ep_num}",
                         'url': episode_url
                     })
```
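The parser now reads the site's dropdown markup instead of accordions: seasons are `<span data-season=...>` items, and each episode mirror is a `div.dropdown.mirrors` carrying a `data-episode="<season>-<episode>"` attribute plus a `span[data-id=supervideo]` with the link. A minimal sketch against a synthetic fragment (the HTML below is illustrative, not copied from the site):

```python
from bs4 import BeautifulSoup

html = """
<div class="dropdown seasons"><span data-season="1">Stagione 1</span></div>
<div class="dropdown mirrors" data-season="1" data-episode="1-1">
  <span data-id="supervideo" data-link="https://supervideo.example/e/abc"></span>
</div>
"""
soup = BeautifulSoup(html, "html.parser")
for season in soup.find('div', class_='dropdown seasons').find_all('span', {'data-season': True}):
    season_num = int(season['data-season'])
    for mirror in soup.find_all('div', {'class': 'dropdown mirrors', 'data-season': str(season_num)}):
        s, e = mirror.get('data-episode', '').split('-')
        span = mirror.find('span', {'data-id': 'supervideo'})
        if span:
            print(f"S{s}E{e}: {span.get('data-link')}")
```

Driving the scrape off `data-*` attributes rather than visible text makes it resilient to label changes, which is presumably why episode names are now synthesised as `Episodio {n}`.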
```diff
@@ -62,6 +62,10 @@ def download_film(select_title: MediaItem, proxy: str = None) -> str:
     video_source.get_content()
     master_playlist = video_source.get_playlist()
 
+    if master_playlist is None:
+        console.print(f"[red]Site: {site_constant.SITE_NAME}, error: No master playlist found[/red]")
+        return None
+
     # Define the filename and path for the downloaded film
     title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4"
     mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))
```
```diff
@@ -4,6 +4,7 @@ import os
 import sys
 import time
 import asyncio
+import importlib.metadata
 
 # External library
 import httpx
@@ -11,7 +12,7 @@ from rich.console import Console
 
 
 # Internal utilities
-from .version import __version__, __author__, __title__
+from .version import __version__ as source_code_version, __author__, __title__
 from StreamingCommunity.Util.config_json import config_manager
 from StreamingCommunity.Util.headers import get_userAgent
 
@@ -75,7 +76,11 @@ def update():
     percentual_stars = 0
 
     # Get the current version (installed version)
-    current_version = __version__
+    try:
+        current_version = importlib.metadata.version(__title__)
+    except importlib.metadata.PackageNotFoundError:
+        #console.print(f"[yellow]Warning: Could not determine installed version for '{__title__}' via importlib.metadata. Falling back to source version.[/yellow]")
+        current_version = source_code_version
 
     # Get commit details
     latest_commit = response_commits[0] if response_commits else None
```
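Version detection now prefers the installed distribution's metadata and only falls back to the constant baked into the source tree, so a pip-installed copy reports what pip actually installed. The pattern in isolation:

```python
import importlib.metadata

source_code_version = "3.0.9"  # fallback baked into the source tree

try:
    current_version = importlib.metadata.version("StreamingCommunity")
except importlib.metadata.PackageNotFoundError:
    # Running from a checkout rather than an installed package
    current_version = source_code_version

print(current_version)
```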
```diff
@@ -2,4 +2,4 @@ __title__ = 'StreamingCommunity'
 __version__ = '3.0.9'
 __author__ = 'Arrowar'
 __description__ = 'A command-line program to download film'
-__copyright__ = 'Copyright 2024'
+__copyright__ = 'Copyright 2025'
```
```diff
@@ -6,6 +6,7 @@ m3u8
 certifi
 psutil
 unidecode
+curl_cffi
 dnspython
 jsbeautifier
 pathvalidate
@@ -13,3 +14,4 @@ pycryptodomex
 ua-generator
 qbittorrent-api
 pyTelegramBotAPI
+beautifulsoup4
```
17 changes: setup.py
```diff
@@ -1,4 +1,5 @@
 import os
+import re
 from setuptools import setup, find_packages
 
 def read_readme():
@@ -8,9 +9,21 @@ def read_readme():
 with open(os.path.join(os.path.dirname(__file__), "requirements.txt"), "r", encoding="utf-8-sig") as f:
     required_packages = f.read().splitlines()
 
+def get_version():
+    try:
+        import pkg_resources
+        return pkg_resources.get_distribution('StreamingCommunity').version
+    except:
+        version_file_path = os.path.join(os.path.dirname(__file__), "StreamingCommunity", "Upload", "version.py")
+        with open(version_file_path, "r", encoding="utf-8") as f:
+            version_match = re.search(r"^__version__\s*=\s*['\"]([^'\"]*)['\"]", f.read(), re.M)
+            if version_match:
+                return version_match.group(1)
+        raise RuntimeError("Unable to find version string in StreamingCommunity/Upload/version.py.")
+
 setup(
     name="StreamingCommunity",
-    version="3.0.9",
+    version=get_version(),
     long_description=read_readme(),
     long_description_content_type="text/markdown",
     author="Lovi-0",
@@ -29,4 +42,4 @@ setup(
         "Bug Reports": "https://github.com/Lovi-0/StreamingCommunity/issues",
         "Source": "https://github.com/Lovi-0/StreamingCommunity",
     }
-)
+)
```
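With `get_version()`, setup.py asks the installed distribution first and otherwise regex-scans `version.py`, so the version string lives in one place instead of being duplicated in `setup(version=...)`. As a hedged aside: `pkg_resources` is deprecated in recent setuptools, and the same lookup can be written with `importlib.metadata`, as sketched here (not the repository's code):

```python
import os
import re

def get_version() -> str:
    """Installed-package version if available, else parse version.py."""
    try:
        from importlib.metadata import version  # modern replacement for pkg_resources
        return version("StreamingCommunity")
    except Exception:
        path = os.path.join(os.path.dirname(__file__), "StreamingCommunity", "Upload", "version.py")
        with open(path, "r", encoding="utf-8") as f:
            match = re.search(r"^__version__\s*=\s*['\"]([^'\"]*)['\"]", f.read(), re.M)
        if match:
            return match.group(1)
        raise RuntimeError("Unable to find version string.")
```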