Update headers

This commit is contained in:
Lovi 2024-06-11 21:12:36 +02:00
parent 9c9a46c2a8
commit 28ff3bca92
4 changed files with 150 additions and 14 deletions

View File

@ -46,7 +46,7 @@ class VideoSource:
"""
try:
response = httpx.get(url, headers=self.headers)
response = httpx.get(url, headers=self.headers, follow_redirects=True)
response.raise_for_status()
with open('index.html', 'w', encoding='utf-8') as file:

View File

@ -8,7 +8,7 @@ import threading
import logging
import binascii
from queue import PriorityQueue
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
@ -19,7 +19,7 @@ from tqdm import tqdm
# Internal utilities
from Src.Util.console import console
from Src.Util.headers import get_headers
from Src.Util.headers import get_headers, random_headers
from Src.Util.color import Colors
from Src.Util._jsonConfig import config_manager
from Src.Util.os import check_file_existence
@ -50,7 +50,6 @@ THERE_IS_PROXY_LIST = check_file_existence("list_proxy.txt")
# Variable
headers_index = config_manager.get_dict('REQUESTS', 'index')
headers_segments = config_manager.get_dict('REQUESTS', 'segments')
class M3U8_Segments:
@ -88,9 +87,10 @@ class M3U8_Segments:
"""
headers_index['user-agent'] = get_headers()
# Construct the full URL of the key
key_uri = urljoin(self.url, m3u8_parser.keys.get('uri'))
parsed_url = urlparse(key_uri)
self.key_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
logging.info(f"Uri key: {key_uri}")
try:
@ -191,15 +191,21 @@ class M3U8_Segments:
# Generate headers
start_time = time.time()
headers_segments['user-agent'] = get_headers()
# Make request to get content
if THERE_IS_PROXY_LIST:
proxy = self.valid_proxy[index % len(self.valid_proxy)]
logging.info(f"Use proxy: {proxy}")
response = httpx.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, proxies=proxy, verify=False)
if 'key_base_url' in self.__dict__:
response = httpx.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT, proxies=proxy, verify=False)
else:
response = httpx.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT, proxies=proxy, verify=False)
else:
response = httpx.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, verify=False)
if 'key_base_url' in self.__dict__:
response = httpx.get(ts_url, headers=random_headers(self.key_base_url), timeout=REQUEST_TIMEOUT, verify=False)
else:
response = httpx.get(ts_url, headers={'user-agent': get_headers()}, timeout=REQUEST_TIMEOUT, verify=False)
# Get response content
response.raise_for_status()
@ -290,7 +296,10 @@ class M3U8_Segments:
delay = max(0.5, min(1, 1 / (num_proxies + 1)))
else:
delay = TQDM_DELAY_WORKER
<<<<<<< Updated upstream
=======
>>>>>>> Stashed changes
else:
delay = TQDM_DELAY_WORKER

View File

@ -1,5 +1,7 @@
# 4.04.24
import re
import random
import logging
@ -11,6 +13,133 @@ import fake_useragent
useragent = fake_useragent.UserAgent()
def extract_versions(user_agent):
"""
Extract browser versions from the user agent.
Args:
user_agent (str): User agent of the browser.
Returns:
list: List of browser versions.
"""
# Patterns to extract versions from various user agents
patterns = {
'chrome': re.compile(r'Chrome/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
'firefox': re.compile(r'Firefox/(\d+)\.?(\d+)?\.?(\d+)?'),
'safari': re.compile(r'Version/(\d+)\.(\d+)\.(\d+) Safari/(\d+)\.(\d+)\.(\d+)'),
'edge': re.compile(r'Edg/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
'edgios': re.compile(r'EdgiOS/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
'crios': re.compile(r'CriOS/(\d+)\.(\d+)\.(\d+)\.(\d+)'),
}
for key, pattern in patterns.items():
match = pattern.search(user_agent)
if match:
return [match.group(i+1) for i in range(match.lastindex)]
# Fallback values if specific versions are not found
return ['99', '0', '0', '0']
def get_platform(user_agent):
"""
Determine the device platform from the user agent.
Args:
user_agent (str): User agent of the browser.
Returns:
str: Device platform.
"""
if 'Windows' in user_agent:
return '"Windows"'
elif 'Mac OS X' in user_agent:
return '"macOS"'
elif 'Android' in user_agent:
return '"Android"'
elif 'iPhone' in user_agent or 'iPad' in user_agent:
return '"iOS"'
elif 'Linux' in user_agent:
return '"Linux"'
return '"Unknown"'
def get_model(user_agent):
"""
Determine the device model from the user agent.
Args:
user_agent (str): User agent of the browser.
Returns:
str: Device model.
"""
if 'iPhone' in user_agent:
return '"iPhone"'
elif 'iPad' in user_agent:
return '"iPad"'
elif 'Android' in user_agent:
return '"Android"'
elif 'Windows' in user_agent:
return '"PC"'
elif 'Mac OS X' in user_agent:
return '"Mac"'
elif 'Linux' in user_agent:
return '"Linux"'
return '"Unknown"'
def random_headers(referer: str = None):
"""
Generate random HTTP headers to simulate human-like behavior.
Returns:
dict: Generated HTTP headers.
"""
user_agent = useragent.random
versions = extract_versions(user_agent)
platform = get_platform(user_agent)
model = get_model(user_agent)
is_mobile = 'Mobi' in user_agent or 'Android' in user_agent
# Generate sec-ch-ua string based on the browser
if 'Chrome' in user_agent or 'CriOS' in user_agent:
sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Chromium";v="{versions[0]}", "Google Chrome";v="{versions[0]}"'
elif 'Edg' in user_agent or 'EdgiOS' in user_agent:
sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Chromium";v="{versions[0]}", "Microsoft Edge";v="{versions[0]}"'
elif 'Firefox' in user_agent:
sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Firefox";v="{versions[0]}"'
elif 'Safari' in user_agent:
sec_ch_ua = f'" Not;A Brand";v="{versions[0]}", "Safari";v="{versions[0]}"'
else:
sec_ch_ua = f'" Not;A Brand";v="{versions[0]}"'
headers = {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': random.choice(['en-US', 'en-GB', 'fr-FR', 'es-ES', 'de-DE']),
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Cache-Control': 'max-age=0',
'TE': 'Trailers',
'Pragma': 'no-cache',
'DNT': '1',
'sec-ch-ua-mobile': '?1' if is_mobile else '?0',
'sec-ch-ua-platform': platform,
'sec-ch-ua': sec_ch_ua,
'sec-ch-ua-model': model
}
if referer:
headers['Origin'] = referer
headers['Referer'] = referer
return headers
def get_headers() -> str:
"""
Generate a random user agent to use in HTTP requests.
@ -20,4 +149,4 @@ def get_headers() -> str:
"""
# Get a random user agent string from the user agent rotator
return useragent.firefox
return useragent.random

View File

@ -12,11 +12,10 @@
"REQUESTS": {
"timeout": 5,
"verify_ssl": false,
"index": {"user-agent": ""},
"segments": {"user-agent": ""}
"index": {"user-agent": ""}
},
"M3U8_DOWNLOAD": {
"tdqm_workers": 2,
"tdqm_workers": 4,
"tqdm_delay": 0.01,
"tqdm_use_large_bar": true,
"download_video": true,
@ -39,7 +38,6 @@
"check_output_after_ffmpeg": false
},
"M3U8_PARSER": {
"skip_empty_row_playlist": false,
"force_resolution": -1
},
"SITE": {