Merge branch 'test_httpx' into test_httpx

This commit is contained in:
Francesco Grazioso 2024-06-12 14:55:08 +02:00 committed by GitHub
commit 35a410c3c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 146 additions and 14 deletions

View File

@@ -46,7 +46,7 @@ class VideoSource:
""" """
try: try:
response = httpx.get(url, headers=self.headers) response = httpx.get(url, headers=self.headers, follow_redirects=True)
response.raise_for_status() response.raise_for_status()
with open('index.html', 'w', encoding='utf-8') as file: with open('index.html', 'w', encoding='utf-8') as file:

View File

@@ -8,7 +8,7 @@ import threading
import logging import logging
import binascii import binascii
from queue import PriorityQueue from queue import PriorityQueue
from urllib.parse import urljoin from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
@@ -19,7 +19,7 @@ from tqdm import tqdm
# Internal utilities # Internal utilities
from Src.Util.console import console from Src.Util.console import console
from Src.Util.headers import get_headers from Src.Util.headers import get_headers, random_headers
from Src.Util.color import Colors from Src.Util.color import Colors
from Src.Util._jsonConfig import config_manager from Src.Util._jsonConfig import config_manager
from Src.Util.os import check_file_existence from Src.Util.os import check_file_existence
@@ -50,7 +50,6 @@ THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
# Variable # Variable
headers_index = config_manager.get_dict('REQUESTS', 'index') headers_index = config_manager.get_dict('REQUESTS', 'index')
headers_segments = config_manager.get_dict('REQUESTS', 'segments')
class M3U8_Segments: class M3U8_Segments:
@@ -88,9 +87,10 @@ class M3U8_Segments:
""" """
headers_index['user-agent'] = get_headers() headers_index['user-agent'] = get_headers()
# Construct the full URL of the key # Construct the full URL of the key
key_uri = urljoin(self.url, m3u8_parser.keys.get('uri')) key_uri = urljoin(self.url, m3u8_parser.keys.get('uri'))
parsed_url = urlparse(key_uri)
self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
logging.info(f"Uri key: {key_uri}") logging.info(f"Uri key: {key_uri}")
try: try:
@@ -191,15 +191,21 @@ class M3U8_Segments:
# Generate headers # Generate headers
start_time = time.time() start_time = time.time()
headers_segments['user-agent'] = get_headers()
# Make request to get content # Make request to get content
if THERE_IS_PROXY_LIST: if THERE_IS_PROXY_LIST:
proxy = self.valid_proxy[index % len(self.valid_proxy)] proxy = self.valid_proxy[index % len(self.valid_proxy)]
logging.info(f"Use proxy: {proxy}") logging.info(f"Use proxy: {proxy}")
response = httpx.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, proxies=proxy, verify=False)
if 'key_base_url' in self.__dict__:
response = httpx.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT, proxies=proxy, verify=False)
else:
response = httpx.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT, proxies=proxy, verify=False)
else: else:
response = httpx.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, verify=False) if 'key_base_url' in self.__dict__:
response = httpx.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT, verify=False)
else:
response = httpx.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT, verify=False)
# Get response content # Get response content
response.raise_for_status() response.raise_for_status()
@@ -290,7 +296,6 @@ class M3U8_Segments:
delay = max(0.5, min(1, 1 / (num_proxies + 1))) delay = max(0.5, min(1, 1 / (num_proxies + 1)))
else: else:
delay = TQDM_DELAY_WORKER delay = TQDM_DELAY_WORKER
else: else:
delay = TQDM_DELAY_WORKER delay = TQDM_DELAY_WORKER

View File

@@ -1,5 +1,7 @@
# 4.04.24 # 4.04.24
import re
import random
import logging import logging
@@ -11,6 +13,133 @@ import fake_useragent
useragent = fake_useragent.UserAgent() useragent = fake_useragent.UserAgent()
def extract_versions(user_agent):
    """
    Pull the browser version components out of a user agent string.

    Args:
        user_agent (str): User agent of the browser.

    Returns:
        list: Version number components as strings; ['99', '0', '0', '0']
        when no known browser pattern is found.
    """

    # Version patterns for the recognised browsers, tried in this order.
    # NOTE(review): Edge/CriOS user agents also contain "Chrome/...", so the
    # chrome pattern matches them first — kept to preserve behavior.
    version_patterns = (
        ('chrome', re.compile(r'Chrome/(\d+)\.(\d+)\.(\d+)\.(\d+)')),
        ('firefox', re.compile(r'Firefox/(\d+)\.?(\d+)?\.?(\d+)?')),
        ('safari', re.compile(r'Version/(\d+)\.(\d+)\.(\d+) Safari/(\d+)\.(\d+)\.(\d+)')),
        ('edge', re.compile(r'Edg/(\d+)\.(\d+)\.(\d+)\.(\d+)')),
        ('edgios', re.compile(r'EdgiOS/(\d+)\.(\d+)\.(\d+)\.(\d+)')),
        ('crios', re.compile(r'CriOS/(\d+)\.(\d+)\.(\d+)\.(\d+)')),
    )

    for _browser, pattern in version_patterns:
        found = pattern.search(user_agent)
        if found:
            # Collect every capture group up to the last one that matched.
            return [found.group(idx) for idx in range(1, found.lastindex + 1)]

    # Fallback values if specific versions are not found
    return ['99', '0', '0', '0']
def get_platform(user_agent):
    """
    Determine the device platform from the user agent.

    Args:
        user_agent (str): User agent of the browser.

    Returns:
        str: Device platform, pre-quoted for use as a sec-ch-ua-platform value
        (e.g. '"Windows"'); '"Unknown"' when no marker is recognised.
    """

    # BUGFIX: iOS must be detected before macOS — iPhone/iPad user agents
    # contain the substring "like Mac OS X" and were previously misreported
    # as macOS when the Mac check ran first.
    if 'iPhone' in user_agent or 'iPad' in user_agent:
        return '"iOS"'
    elif 'Windows' in user_agent:
        return '"Windows"'
    elif 'Mac OS X' in user_agent:
        return '"macOS"'
    elif 'Android' in user_agent:
        return '"Android"'
    elif 'Linux' in user_agent:
        return '"Linux"'
    return '"Unknown"'
def get_model(user_agent):
    """
    Determine the device model from the user agent.

    Args:
        user_agent (str): User agent of the browser.

    Returns:
        str: Device model, pre-quoted for use as a sec-ch-ua-model value
        (e.g. '"iPhone"'); '"Unknown"' when no marker is recognised.
    """

    # Ordered (marker, model) pairs; the first marker found wins, preserving
    # the precedence iPhone > iPad > Android > Windows > macOS > Linux.
    model_by_marker = (
        ('iPhone', '"iPhone"'),
        ('iPad', '"iPad"'),
        ('Android', '"Android"'),
        ('Windows', '"PC"'),
        ('Mac OS X', '"Mac"'),
        ('Linux', '"Linux"'),
    )
    for marker, model in model_by_marker:
        if marker in user_agent:
            return model
    return '"Unknown"'
def random_headers(referer: str = None):
    """
    Generate random HTTP headers to simulate human-like behavior.

    Args:
        referer (str, optional): When provided, used as the value of both
            the Origin and Referer headers.

    Returns:
        dict: Generated HTTP headers.
    """

    # Pick a random user agent and derive browser/device details from it.
    agent = useragent.random
    agent_versions = extract_versions(agent)
    agent_platform = get_platform(agent)
    agent_model = get_model(agent)
    mobile_agent = 'Mobi' in agent or 'Android' in agent

    # Only the major version is used in the client-hint string.
    major = agent_versions[0]

    # Build the sec-ch-ua value for the detected browser family.
    if 'Chrome' in agent or 'CriOS' in agent:
        sec_ch_ua = f'" Not;A Brand";v="{major}", "Chromium";v="{major}", "Google Chrome";v="{major}"'
    elif 'Edg' in agent or 'EdgiOS' in agent:
        sec_ch_ua = f'" Not;A Brand";v="{major}", "Chromium";v="{major}", "Microsoft Edge";v="{major}"'
    elif 'Firefox' in agent:
        sec_ch_ua = f'" Not;A Brand";v="{major}", "Firefox";v="{major}"'
    elif 'Safari' in agent:
        sec_ch_ua = f'" Not;A Brand";v="{major}", "Safari";v="{major}"'
    else:
        sec_ch_ua = f'" Not;A Brand";v="{major}"'

    headers = {
        'User-Agent': agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': random.choice(['en-US', 'en-GB', 'fr-FR', 'es-ES', 'de-DE']),
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Cache-Control': 'max-age=0',
        'TE': 'Trailers',
        'Pragma': 'no-cache',
        'DNT': '1',
        'sec-ch-ua-mobile': '?1' if mobile_agent else '?0',
        'sec-ch-ua-platform': agent_platform,
        'sec-ch-ua': sec_ch_ua,
        'sec-ch-ua-model': agent_model,
    }

    # Attach origin information only when a referer was supplied.
    if referer:
        headers['Origin'] = referer
        headers['Referer'] = referer

    return headers
def get_headers() -> str: def get_headers() -> str:
""" """
Generate a random user agent to use in HTTP requests. Generate a random user agent to use in HTTP requests.
@@ -20,4 +149,4 @@ def get_headers() -> str:
""" """
# Get a random user agent string from the user agent rotator # Get a random user agent string from the user agent rotator
return useragent.firefox return useragent.random

View File

@@ -12,8 +12,7 @@
"REQUESTS": { "REQUESTS": {
"timeout": 5, "timeout": 5,
"verify_ssl": false, "verify_ssl": false,
"index": {"user-agent": ""}, "index": {"user-agent": ""}
"segments": {"user-agent": ""}
}, },
"M3U8_DOWNLOAD": { "M3U8_DOWNLOAD": {
"tdqm_workers": 2, "tdqm_workers": 2,
@@ -39,7 +38,6 @@
"check_output_after_ffmpeg": false "check_output_after_ffmpeg": false
}, },
"M3U8_PARSER": { "M3U8_PARSER": {
"skip_empty_row_playlist": false,
"force_resolution": -1 "force_resolution": -1
}, },
"SITE": { "SITE": {