Mirror of https://github.com/Arrowar/StreamingCommunity.git (synced 2025-06-06 19:45:24 +00:00)

commit ae12d86d71 (parent febc16b6de)

Update library and remove my request and user agent.
@@ -1,15 +1,12 @@
 # 26.05.24

 import re
 import os
 import sys
 import time
 import logging
 import subprocess


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from bs4 import BeautifulSoup
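Editor's note: the recurring change across the import hunks below swaps the project's vendored HTTP client (Src.Lib.Request) for the third-party requests package. A minimal sketch of why the swap is nearly drop-in for these call sites, assuming only get() with timeout/verify is used (the URL is a placeholder, not from this repository):

import requests

# Same surface the vendored client exposed: get() returning an object with
# .status_code, .content, .headers and .raise_for_status()
response = requests.get("https://example.com", timeout=15, verify=True)
response.raise_for_status()
print(response.status_code, len(response.content))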
@@ -6,7 +6,7 @@ import logging


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from bs4 import BeautifulSoup
 from unidecode import unidecode
@@ -5,8 +5,8 @@ import threading
 import logging


-# Internal libraries
-from Src.Lib.Request import requests
+# External libraries
+import requests


 # Internal utilities
@@ -6,7 +6,7 @@ from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from bs4 import BeautifulSoup
@@ -5,7 +5,7 @@ import logging


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from bs4 import BeautifulSoup
 from unidecode import unidecode
@@ -5,8 +5,11 @@ import threading
 import logging


+# External library
+import requests
+
+
 # Internal utilities
-from Src.Lib.Request import requests
 from Src.Lib.Google import search as google_search
@@ -6,7 +6,7 @@ from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from bs4 import BeautifulSoup
@@ -8,12 +8,12 @@ from typing import Tuple


 # External libraries
+import requests
 from bs4 import BeautifulSoup
 from unidecode import unidecode


 # Internal utilities
-from Src.Lib.Request import requests
 from Src.Util.headers import get_headers
 from Src.Util._jsonConfig import config_manager
 from Src.Util.console import console
@@ -8,7 +8,7 @@ from typing import Generator, Optional


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from bs4 import BeautifulSoup
@@ -8,7 +8,7 @@ from concurrent.futures import ThreadPoolExecutor


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from unidecode import unidecode
@@ -12,7 +12,7 @@ from urllib.parse import urljoin, urlparse, urlunparse


 # External libraries
-from Src.Lib.Request import requests
+import requests
 from tqdm import tqdm
@@ -22,6 +22,7 @@ from Src.Util.headers import get_headers
 from Src.Util.color import Colors
 from Src.Util._jsonConfig import config_manager
+

 # Logic class
 from ..M3U8 import (
     M3U8_Decryption,
@@ -30,6 +31,11 @@ from ..M3U8 import (
     M3U8_UrlFix
 )

+
+# Warning
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
 # Config
 TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
 TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
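The added urllib3 lines silence the InsecureRequestWarning that is emitted for every unverified HTTPS request (the segment downloads pass verify=REQUEST_VERIFY_SSL, which may be False). A standalone sketch of the same pattern:

import urllib3

# Without this call, each requests.get(..., verify=False) prints a warning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# e.g. requests.get("https://self-signed.example", verify=False)  # placeholder host, no warning now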
@@ -265,7 +271,7 @@ class M3U8_Segments:

         # Make request and calculate time duration
         start_time = time.time()
-        response = requests.get(ts_url, headers=headers_segments, verify=REQUEST_VERIFY_SSL, timeout=30)
+        response = requests.get(ts_url, headers=headers_segments, verify=REQUEST_VERIFY_SSL, timeout=15)
         duration = time.time() - start_time

         logging.info(f"Make request to get segment: [{index} - {len(self.segments)}] in: {duration}, len data: {len(response.content)}")
@@ -275,7 +281,7 @@
         segment_content = response.content

         # Update bar
-        self.class_ts_estimator.update_progress_bar(segment_content, duration, progress_bar)
+        self.class_ts_estimator.update_progress_bar(int(response.headers.get('Content-Length', 0)), duration, progress_bar)

         # Decrypt the segment content if decryption is needed
         if self.decryption is not None:
@@ -31,7 +31,7 @@ class M3U8_Ts_Estimator:
         """
         self.ts_file_sizes = []
         self.now_downloaded_size = 0
-        self.average_over = 6
+        self.average_over = 3
         self.list_speeds = deque(maxlen=self.average_over)
         self.smoothed_speeds = []
         self.total_segments = total_segments
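Halving average_over shortens the sliding window that smooths the speed readout, trading stability for responsiveness. A toy illustration of the deque(maxlen=...) window the class relies on:

from collections import deque

average_over = 3
list_speeds = deque(maxlen=average_over)  # oldest sample drops out automatically

for sample_mbps in [10.0, 12.0, 50.0, 48.0, 51.0]:
    list_speeds.append(sample_mbps)
    print(round(sum(list_speeds) / len(list_speeds), 1))  # mean of at most 3 samples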
@@ -52,7 +52,7 @@

         # Calculate speed outside of the lock
         try:
-            speed_mbps = (size_download * 16) / (duration * 1_000_000)
+            speed_mbps = (size_download * 8) / (duration * 1_000_000)
         except ZeroDivisionError as e:
             logging.error("Division by zero error while calculating speed: %s", e)
             return
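The factor change from 16 to 8 corrects the bytes-to-bits conversion (1 byte = 8 bits); the old constant reported exactly twice the real throughput. Worked example:

size_download = 1_000_000  # bytes received
duration = 2.0             # seconds

speed_mbps = (size_download * 8) / (duration * 1_000_000)
print(speed_mbps)  # 4.0 Mbit/s; the old *16 factor would print 8.0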
@@ -114,16 +114,15 @@
         """
         return format_size(self.now_downloaded_size)

-    def update_progress_bar(self, segment_content: bytes, duration: float, progress_counter: tqdm) -> None:
+    def update_progress_bar(self, total_downloaded: int, duration: float, progress_counter: tqdm) -> None:
         """
         Updates the progress bar with information about the TS segment download.

         Args:
-            segment_content (bytes): The content of the downloaded TS segment.
+            total_downloaded (int): The len of the content of the downloaded TS segment.
             duration (float): The duration of the segment download in seconds.
             progress_counter (tqdm): The tqdm object representing the progress bar.
         """
-        total_downloaded = len(segment_content)

         # Add the size of the downloaded segment to the estimator
         self.add_ts_file(total_downloaded * self.total_segments, total_downloaded, duration)
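After this change the caller hands over a byte count (taken from the Content-Length header in the hunk further above) instead of the whole payload, so the estimator no longer needs to receive the segment body just for accounting. A hedged sketch of reading that header, with a placeholder URL:

import requests

response = requests.get("https://example.com/segment.ts", timeout=15)  # placeholder
# Content-Length can be absent (e.g. chunked transfer), hence the 0 fallback
total_downloaded = int(response.headers.get('Content-Length', 0))
print(total_downloaded)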
@@ -8,7 +8,7 @@ from .lib_parser import load


 # External libraries
-from Src.Lib.Request import requests
+import requests


 # Costant
@@ -1,3 +0,0 @@
-# 04.4.24
-
-from .my_requests import requests
@@ -1,541 +0,0 @@
-# 04.4.24
-
-import os
-import sys
-import base64
-import json
-import logging
-import ssl
-import time
-import re
-import subprocess
-import urllib.parse
-import urllib.request
-import urllib.error
-
-from typing import Dict, Optional, Union, Any
-
-
-try:
-    from typing import Unpack, TypedDict
-except ImportError:
-    # (Python <= 3.10),
-    try:
-        from typing_extensions import Unpack, TypedDict  # type: ignore
-    except ImportError:
-        raise ImportError("Unable to import Unpack from typing or typing_extensions. "
-                          "Please make sure you have the necessary libraries installed.")
-
-
-# External library
-from bs4 import BeautifulSoup
-
-
-# Internal utilities
-from Src.Util._jsonConfig import config_manager
-
-
-# Default settings
-HTTP_TIMEOUT = config_manager.get_int('REQUESTS', 'timeout')
-HTTP_RETRIES = config_manager.get_int('REQUESTS', 'max_retry')
-HTTP_DELAY = 1
-
-
-class RequestError(Exception):
-    """Custom exception class for request errors."""
-
-    def __init__(self, message: str, original_exception: Optional[Exception] = None) -> None:
-        """
-        Initialize a RequestError instance.
-
-        Args:
-            - message (str): The error message.
-            - original_exception (Optional[Exception], optional): The original exception that occurred. Defaults to None.
-        """
-        super().__init__(message)
-        self.original_exception = original_exception
-
-    def __str__(self) -> str:
-        """Return a string representation of the exception."""
-        if self.original_exception:
-            return f"{super().__str__()} Original Exception: {type(self.original_exception).__name__}: {str(self.original_exception)}"
-        else:
-            return super().__str__()
-
-
-def parse_http_error(error_string: str):
-    """
-    Parse the HTTP error string to extract the error code and message.
-
-    Args:
-        - error_string (str): The error string from an HTTP response.
-
-    Returns:
-        dict: A dictionary with 'error_code' and 'message' if the string is parsed successfully, or None if parsing fails.
-    """
-
-    # Regular expression to match the error pattern
-    error_pattern = re.compile(r"HTTP Error (\d{3}): (.+)")
-    match = error_pattern.search(error_string)
-
-    if match:
-        error_code = match.group(1)
-        message = match.group(2)
-        return {'error_code': error_code, 'message': message}
-
-    else:
-        logging.error(f"Error string does not match expected format: {error_string}")
-        return None
-
-
-class Response:
-    """
-    Class representing an HTTP response.
-    """
-    def __init__(
-        self,
-        status: int,
-        text: str,
-        is_json: bool = False,
-        content: bytes = b"",
-        headers: Optional[Dict[str, str]] = None,
-        cookies: Optional[Dict[str, str]] = None,
-        redirect_url: Optional[str] = None,
-        response_time: Optional[float] = None,
-        timeout: Optional[float] = None,
-    ):
-        """
-        Initialize a Response object.
-
-        Args:
-            - status (int): The HTTP status code of the response.
-            - text (str): The response content as text.
-            - is_json (bool, optional): Indicates if the response content is JSON. Defaults to False.
-            - content (bytes, optional): The response content as bytes. Defaults to b"".
-            - headers (Optional[Dict[str, str]], optional): The response headers. Defaults to None.
-            - cookies (Optional[Dict[str, str]], optional): The cookies set in the response. Defaults to None.
-            - redirect_url (Optional[str], optional): The URL if a redirection occurred. Defaults to None.
-            - response_time (Optional[float], optional): The time taken to receive the response. Defaults to None.
-            - timeout (Optional[float], optional): The request timeout. Defaults to None.
-        """
-        self.status_code = status
-        self.text = text
-        self.is_json = is_json
-        self.content = content
-        self.headers = headers or {}
-        self.cookies = cookies or {}
-        self.redirect_url = redirect_url
-        self.response_time = response_time
-        self.timeout = timeout
-        self.ok = 200 <= status < 300
-
-    def raise_for_status(self):
-        """
-        Raise an error if the response status code is not in the 2xx range.
-        """
-        if not self.ok:
-            raise RequestError(f"Request failed with status code {self.status_code}")
-
-    def json(self):
-        """
-        Return the response content as JSON if it is JSON.
-
-        Returns:
-            dict or list or None: A Python dictionary or list parsed from JSON if the response content is JSON, otherwise None.
-        """
-        if self.is_json:
-            return json.loads(self.text)
-        else:
-            return None
-
-    def get_redirects(self):
-        """
-        Extracts unique site URLs from HTML <link> elements within the <head> section.
-
-        Returns:
-            list or None: A list of unique site URLs if found, otherwise None.
-        """
-
-        site_find = []
-
-        if self.text:
-            soup = BeautifulSoup(self.text, "html.parser")
-
-            for links in soup.find("head").find_all('link'):
-                if links is not None:
-                    parsed_url = urllib.parse.urlparse(links.get('href'))
-                    site = parsed_url.scheme + "://" + parsed_url.netloc
-
-                    if site not in site_find:
-                        site_find.append(site)
-
-        if site_find:
-            return site_find
-        else:
-            return None
-
-
-class ManageRequests:
-    """
-    Class for managing HTTP requests.
-    """
-    def __init__(
-        self,
-        url: str,
-        method: str = 'GET',
-        headers: Optional[Dict[str, str]] = None,
-        timeout: float = HTTP_TIMEOUT,
-        retries: int = HTTP_RETRIES,
-        params: Optional[Dict[str, str]] = None,
-        verify: bool = True,
-        auth: Optional[tuple] = None,
-        proxy: Optional[str] = None,
-        cookies: Optional[Dict[str, str]] = None,
-        json_data: Optional[Dict[str, Any]] = None,
-        redirection_handling: bool = True,
-    ):
-        """
-        Initialize a ManageRequests object.
-
-        Args:
-            - url (str): The URL to which the request will be sent.
-            - method (str, optional): The HTTP method to be used for the request. Defaults to 'GET'.
-            - headers (Optional[Dict[str, str]], optional): The request headers. Defaults to None.
-            - timeout (float, optional): The request timeout. Defaults to HTTP_TIMEOUT.
-            - retries (int, optional): The number of retries in case of request failure. Defaults to HTTP_RETRIES.
-            - params (Optional[Dict[str, str]], optional): The query parameters for the request. Defaults to None.
-            - verify (bool, optional): Indicates whether SSL certificate verification should be performed. Defaults to True.
-            - auth (Optional[tuple], optional): Tuple containing the username and password for basic authentication. Defaults to None.
-            - proxy (Optional[str], optional): The proxy URL. Defaults to None.
-            - cookies (Optional[Dict[str, str]], optional): The cookies to be included in the request. Defaults to None.
-            - redirection_handling (bool, optional): Indicates whether redirections should be followed. Defaults to True.
-        """
-        self.url = url
-        self.method = method
-        self.headers = headers or {}
-        self.timeout = timeout
-        self.retries = retries
-        self.params = params
-        self.verify_ssl = verify
-        self.auth = auth
-        self.proxy = proxy
-        self.cookies = cookies
-        self.json_data = json_data
-        self.redirection_handling = redirection_handling
-
-    def add_header(self, key: str, value: str) -> None:
-        """
-        Add a header to the request.
-        """
-        self.headers[key] = value
-
-    def send(self) -> Response:
-        """
-        Send the HTTP request.
-        """
-
-        start_time = time.time()
-        self.attempt = 0
-        redirect_url = None
-
-        while self.attempt < self.retries:
-            try:
-                req = self._build_request()
-                response = self._perform_request(req)
-
-                return self._process_response(response, start_time, redirect_url)
-
-            except (urllib.error.URLError, urllib.error.HTTPError) as e:
-                self._handle_error(e)
-                self.attempt += 1
-
-    def log_request(self):
-        """
-        Constructs a log message based on the request parameters and logs it.
-        """
-        log_message = "Request: ("
-
-        if self.url:
-            log_message += f"'url': {self.url}, "
-        if self.headers:
-            log_message += f"'headers': {self.headers}, "
-        if self.cookies:
-            log_message += f"'cookies': {self.cookies}, "
-        if self.json_data:
-            log_message += f"'body': {json.dumps(self.json_data).encode('utf-8')}, "
-
-        # Remove the trailing comma and add parentheses
-        log_message = log_message.rstrip(", ") + ")"
-        logging.info(log_message)
-
-    def _build_request(self) -> urllib.request.Request:
-        """
-        Build the urllib Request object.
-        """
-
-        # Make a copy of headers to avoid modifying the original dictionary
-        headers = self.headers.copy()
-
-        # Construct the URL with query parameters if present
-        if self.params:
-            url = self.url + '?' + urllib.parse.urlencode(self.params)
-        else:
-            url = self.url
-
-        # Create the initial Request object
-        req = urllib.request.Request(url, headers=headers, method=self.method)
-
-        # Add JSON data if provided
-        if self.json_data:
-            req.add_header('Content-Type', 'application/json')
-            req.data = json.dumps(self.json_data).encode('utf-8')
-
-        # Add authorization header if provided
-        if self.auth:
-            req.add_header('Authorization', 'Basic ' + base64.b64encode(f"{self.auth[0]}:{self.auth[1]}".encode()).decode())
-
-        # Add cookies if provided
-        if self.cookies:
-            cookie_str = '; '.join([f"{name}={value}" for name, value in self.cookies.items()])
-            req.add_header('Cookie', cookie_str)
-
-        # Add default user agent if not already present
-        if 'user-agent' not in headers:
-            default_user_agent = 'Mozilla/5.0'
-            req.add_header('user-agent', default_user_agent)
-
-        self.log_request()
-        return req
-
-    def _perform_request(self, req: urllib.request.Request) -> urllib.response.addinfourl:
-        """
-        Perform the HTTP request.
-        """
-        if self.proxy:
-            proxy_handler = urllib.request.ProxyHandler({'http': self.proxy, 'https': self.proxy})
-            opener = urllib.request.build_opener(proxy_handler)
-            urllib.request.install_opener(opener)
-
-        if not self.verify_ssl:
-
-            # Create SSL context
-            ssl_context = ssl.create_default_context()
-            ssl_context.check_hostname = False
-            ssl_context.verify_mode = ssl.CERT_NONE
-
-            # Build the request with SSL context
-            response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl_context)
-
-        else:
-            response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl.create_default_context())
-
-        return response
-
-    def _process_response(self, response: urllib.response.addinfourl, start_time: float, redirect_url: Optional[str]) -> Response:
-        """
-        Process the HTTP response.
-        """
-        response_data = response.read()
-        content_type = response.headers.get('Content-Type', '').lower()
-
-        if self.redirection_handling and response.status in (301, 302, 303, 307, 308):
-            location = response.headers.get('Location')
-            logging.info(f"Redirecting to: {location}")
-            redirect_url = location
-            self.url = location
-            return self.send()
-
-        return self._build_response(response, response_data, start_time, redirect_url, content_type)
-
-    def _build_response(self, response: urllib.response.addinfourl, response_data: bytes, start_time: float, redirect_url: Optional[str], content_type: str) -> Response:
-        """
-        Build the Response object.
-        """
-        response_time = time.time() - start_time
-        response_headers = dict(response.headers)
-        response_cookies = {}
-
-        for cookie in response.headers.get_all('Set-Cookie', []):
-            cookie_parts = cookie.split(';')
-            cookie_name, cookie_value = cookie_parts[0].split('=', 1)  # Only the first
-            response_cookies[cookie_name.strip()] = cookie_value.strip()
-
-        return Response(
-            status=response.status,
-            text=response_data.decode('latin-1'),
-            is_json=("json" in content_type),
-            content=response_data,
-            headers=response_headers,
-            cookies=response_cookies,
-            redirect_url=redirect_url,
-            response_time=response_time,
-            timeout=self.timeout,
-        )
-
-    def _handle_error(self, e: Union[urllib.error.URLError, urllib.error.HTTPError]) -> None:
-        """
-        Handle request error.
-        """
-        logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}")
-
-        if self.attempt < self.retries:
-            logging.error(f"Retry request for URL '{self.url}' (attempt {self.attempt}/{self.retries})")
-            time.sleep(HTTP_DELAY)
-
-        else:
-            logging.error(f"Maximum retries reached for URL '{self.url}'")
-            raise RequestError(str(e))
-
-
-class ValidateRequest:
-    """
-    Class for validating request inputs.
-    """
-    @staticmethod
-    def validate_url(url: str) -> bool:
-        """Validate URL format."""
-
-        url_regex = re.compile(
-            r'^(?:http|ftp)s?://'  # http:// or https://
-            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain...
-            r'localhost|'  # localhost...
-            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', re.IGNORECASE)
-        return re.match(url_regex, url) is not None
-
-    @staticmethod
-    def validate_headers(headers: Dict[str, str]) -> bool:
-        """Validate header values."""
-
-        for key, value in headers.items():
-            if not isinstance(key, str) or not isinstance(value, str):
-                return False
-        return True
-
-
-class ValidateResponse:
-    """
-    Class for validating response data.
-    """
-    @staticmethod
-    def is_valid_json(data: str) -> bool:
-        """Check if response data is a valid JSON."""
-
-        try:
-            json.loads(data)
-            return True
-        except ValueError:
-            return False
-
-
-class SSLHandler:
-    """Class for handling SSL certificates."""
-    @staticmethod
-    def load_certificate(custom_cert_path: str) -> None:
-        """Load custom SSL certificate."""
-        ssl_context = ssl.create_default_context(cafile=custom_cert_path)
-        ssl_context.check_hostname = False
-        ssl_context.verify_mode = ssl.CERT_NONE
-
-
-class KwargsRequest(TypedDict, total = False):
-    url: str
-    headers: Optional[Dict[str, str]] = None
-    timeout: float = HTTP_TIMEOUT
-    retries: int = HTTP_RETRIES
-    params: Optional[Dict[str, str]] = None
-    cookies: Optional[Dict[str, str]] = None
-    verify_ssl: bool = True
-    json_data: Optional[Dict[str, Any]] = None
-
-
-class Request:
-    """
-    Class for making HTTP requests.
-    """
-    def __init__(self) -> None:
-
-        # Ensure SSL certificate is set up
-        self.__setup_ssl_certificate__()
-
-    def __setup_ssl_certificate__(self):
-        """
-        Set up SSL certificate environment variables.
-        """
-        try:
-            # Determine the Python executable
-            python_executable = sys.executable
-            logging.info("Python path: ", python_executable)
-
-            # Check if certifi package is installed, install it if not
-            if subprocess.run([python_executable, "-c", "import certifi"], capture_output=True).returncode != 0:
-                subprocess.run(["pip", "install", "certifi"], check=True)
-                logging.info("Installed certifi package.")
-
-            # Get path to SSL certificate
-            cert_path = subprocess.run([python_executable, "-c", "import certifi; print(certifi.where())"], capture_output=True, text=True, check=True).stdout.strip()
-            logging.info("Path cert: ", cert_path)
-
-            if not cert_path:
-                raise ValueError("Unable to determine the path to the SSL certificate.")
-
-            # Set SSL certificate environment variables
-            os.environ['SSL_CERT_FILE'] = cert_path
-            os.environ['REQUESTS_CA_BUNDLE'] = cert_path
-
-        except subprocess.CalledProcessError as e:
-            raise ValueError(f"Error executing subprocess: {e}") from e
-
-    def get(self, url: str, **kwargs: Unpack[KwargsRequest]) -> 'Response':
-        """
-        Send a GET request.
-
-        Args:
-            - url (str): The URL to which the request will be sent.
-            **kwargs: Additional keyword arguments for the request.
-
-        Returns:
-            Response: The response object.
-        """
-        return self._send_request(url, 'GET', **kwargs)
-
-    def post(self, url: str, **kwargs: Unpack[KwargsRequest]) -> 'Response':
-        """
-        Send a POST request.
-
-        Args:
-            - url (str): The URL to which the request will be sent.
-            **kwargs: Additional keyword arguments for the request.
-
-        Returns:
-            Response: The response object.
-        """
-        return self._send_request(url, 'POST', **kwargs)
-
-    def head(self, url: str, **kwargs: Unpack[KwargsRequest]) -> 'Response':
-        """
-        Send a HEAD request.
-
-        Args:
-            - url (str): The URL to which the request will be sent.
-            **kwargs: Additional keyword arguments for the request.
-
-        Returns:
-            Response: The response object.
-        """
-        return self._send_request(url, 'HEAD', **kwargs)
-
-    def _send_request(self, url: str, method: str, **kwargs: Unpack[KwargsRequest]) -> 'Response':
-        """Send an HTTP request."""
-        if not ValidateRequest.validate_url(url):
-            raise ValueError("Invalid URL format")
-
-        if 'headers' in kwargs and not ValidateRequest.validate_headers(kwargs['headers']):
-            raise ValueError("Invalid header values")
-
-        return ManageRequests(url, method, **kwargs).send()
-
-
-# Output
-requests: Request = Request()
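The deleted my_requests.py rebuilt a requests-like API on top of urllib. For reference, a sketch (not code from this commit) of how its hand-rolled features map onto the requests library that replaces it; the URL and credentials are placeholders:

import requests

response = requests.post(
    "https://example.com/api",                   # placeholder URL
    json={"key": "value"},                       # json_data= (Content-Type set for you)
    auth=("user", "pass"),                       # Authorization header built manually above
    proxies={"https": "http://127.0.0.1:8080"},  # urllib ProxyHandler equivalent
    cookies={"session": "abc"},                  # Cookie header built manually above
    timeout=10,
    allow_redirects=True,                        # redirection_handling
)
print(response.status_code, response.ok)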
@@ -1,3 +0,0 @@
-# 04.4.24
-
-from .user_agent import ua
@@ -1,112 +0,0 @@
-# 04.4.24
-
-import logging
-import re
-import os
-import random
-import threading
-import json
-import tempfile
-
-from typing import Dict, List
-
-
-# Internal libraries
-from Src.Lib.Request import requests
-
-
-def get_browser_user_agents_online(browser: str) -> List[str]:
-    """
-    Retrieve browser user agent strings from a website.
-
-    Args:
-        - browser (str): The name of the browser (e.g., 'chrome', 'firefox', 'safari').
-
-    Returns:
-        List[str]: List of user agent strings for the specified browser.
-    """
-    url = f"https://useragentstring.com/pages/{browser}/"
-
-    try:
-
-        # Make request and find all user agents
-        html = requests.get(url).text
-        browser_user_agents = re.findall(r"<a href=\'/.*?>(.+?)</a>", html, re.UNICODE)
-        return [ua for ua in browser_user_agents if "more" not in ua.lower()]
-
-    except Exception as e:
-        logging.error(f"Failed to fetch user agents for '{browser}': {str(e)}")
-        return []
-
-
-def update_user_agents(browser_name: str, browser_user_agents: Dict[str, List[str]]) -> None:
-    """
-    Update browser user agents dictionary with new requests.
-
-    Args:
-        - browser_name (str): Name of the browser.
-        - browser_user_agents (Dict[str, List[str]]): Dictionary to store browser user agents.
-    """
-    browser_user_agents[browser_name] = get_browser_user_agents_online(browser_name)
-
-
-def create_or_update_user_agent_file() -> None:
-    """
-    Create or update the user agent file with browser user agents.
-    """
-    user_agent_file = os.path.join(tempfile.gettempdir(), 'fake_user_agent.json')
-    logging.info(f"Upload file: {user_agent_file}")
-
-    if not os.path.exists(user_agent_file):
-        browser_user_agents: Dict[str, List[str]] = {}
-        threads = []
-
-        for browser_name in ['chrome', 'firefox', 'safari']:
-            t = threading.Thread(target=update_user_agents, args=(browser_name, browser_user_agents))
-            threads.append(t)
-            t.start()
-
-        for t in threads:
-            t.join()
-
-        with open(user_agent_file, 'w') as f:
-            json.dump(browser_user_agents, f, indent=4)
-            logging.info(f"User agent file created at: {user_agent_file}")
-
-    else:
-        logging.info("User agent file already exists.")
-
-
-class UserAgentManager:
-    """
-    Manager class to access browser user agents from a file.
-    """
-    def __init__(self):
-
-        # Get path to temp file where save all user agents
-        self.user_agent_file = os.path.join(tempfile.gettempdir(), 'fake_user_agent.json')
-        logging.info(f"Check file: {self.user_agent_file}")
-
-        # If file dont exist, creaet it
-        if not os.path.exists(self.user_agent_file):
-            create_or_update_user_agent_file()
-            logging.info(f"Create file: {self.user_agent_file}")
-
-    def get_random_user_agent(self, browser: str) -> str:
-        """
-        Get a random user agent for the specified browser.
-
-        Args:
-            browser (str): The name of the browser ('chrome', 'firefox', 'safari').
-
-        Returns:
-            Optional[str]: Random user agent string for the specified browser.
-        """
-        with open(self.user_agent_file, 'r') as f:
-            browser_user_agents = json.load(f)
-            return random.choice(browser_user_agents.get(browser.lower(), []))
-
-
-# Output
-ua: UserAgentManager = UserAgentManager()
@@ -10,7 +10,7 @@ from Src.Util.console import console


 # External library
-from Src.Lib.Request import requests
+import requests


 # Variable
@@ -3,8 +3,12 @@
 import logging


-# Internal utilities
-from Src.Lib.UserAgent import ua
+# External library
+import fake_useragent
+
+
+# Variable
+useragent = fake_useragent.UserAgent()


 def get_headers() -> str:
@@ -12,11 +16,8 @@ def get_headers() -> str:
     Generate a random user agent to use in HTTP requests.

     Returns:
-        str: A random user agent string.
+        - str: A random user agent string.
     """

-    # Get a random user agent string from the user agent rotator
-    random_headers = ua.get_random_user_agent("firefox")
-
-    #logging.info(f"Use headers: {random_headers}")
-    return random_headers
+    return useragent.firefox
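get_headers() now delegates user-agent rotation to the fake-useragent package instead of scraping useragentstring.com into a temp file. A minimal sketch of the new dependency:

import fake_useragent

useragent = fake_useragent.UserAgent()
print(useragent.firefox)  # random Firefox user-agent string, new one per access
print(useragent.chrome)   # same idea for Chrome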
@@ -11,7 +11,7 @@ warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning)


 # Variable
-url_test = "https://sc-b1-18.scws-content.net/hls/170/3/25/32503b5b-4646-4376-ad47-7766c65be7e2/audio/ita/0004-0100.ts"
+url_test = "https://sc-b1-18.scws-content.net/hls/100/b/d3/bd3a430d-0a13-4bec-8fcc-ea41af183555/audio/ita/0010-0100.ts?token=CiEPTIyvEoTkGk3szgDu9g&expires=1722801022"


 def get_ip_from_url(url):
@@ -81,5 +81,6 @@ def main():

     print(f"Valid IP addresses: {sorted(valid_ip, reverse=True)}")

+
 if __name__ == '__main__':
     main()
@@ -1,5 +1,7 @@
+requests
 bs4
 certifi
 tqdm
 rich
 unidecode
+fake-useragent
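Taken together: the two vendored packages are gone and requirements.txt gains requests and fake-useragent. A short sketch of the pair working together, mirroring the new get_headers() (placeholder URL):

import requests
import fake_useragent

headers = {'user-agent': fake_useragent.UserAgent().firefox}
response = requests.get("https://example.com", headers=headers, timeout=15)  # placeholder
print(response.status_code)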