Fix requests for mac

This commit is contained in:
Ghost 2024-05-09 18:57:00 +02:00
parent e985b31e74
commit 58382ea13a

View File

@ -1,24 +1,26 @@
# 04.4.24
# Note: verify_ssl need to be set to false for macOs
import os
import sys
import base64
import json
import logging
import ssl
import time
import re
import subprocess
import urllib.parse
import urllib.request
import urllib.error
from typing import Dict, Optional, Union, TypedDict, Any
from typing import Dict, Optional, Union, Any
try:
from typing import Unpack
from typing import Unpack, TypedDict
except ImportError:
# (Python <= 3.10),
try:
from typing_extensions import Unpack
from typing_extensions import Unpack, TypedDict # type: ignore
except ImportError:
raise ImportError("Unable to import Unpack from typing or typing_extensions. "
"Please make sure you have the necessary libraries installed.")
@ -190,7 +192,9 @@ class ManageRequests:
self.headers[key] = value
def send(self) -> Response:
"""Send the HTTP request."""
"""
Send the HTTP request.
"""
start_time = time.time()
self.attempt = 0
@ -207,9 +211,30 @@ class ManageRequests:
self._handle_error(e)
self.attempt += 1
def _build_request(self) -> urllib.request.Request:
"""Build the urllib Request object."""
def log_request(self):
"""
Constructs a log message based on the request parameters and logs it.
"""
log_message = "Request: ("
if self.url:
log_message += f"'url': {self.url}, "
if self.headers:
log_message += f"'headers': {self.headers}, "
if self.cookies:
log_message += f"'cookies': {self.cookies}, "
if self.json_data:
log_message += f"'body': {json.dumps(self.json_data).encode('utf-8')}, "
# Remove the trailing comma and add parentheses
log_message = log_message.rstrip(", ") + ")"
logging.info(log_message)
def _build_request(self) -> urllib.request.Request:
"""
Build the urllib Request object.
"""
# Make a copy of headers to avoid modifying the original dictionary
headers = self.headers.copy()
@ -241,19 +266,27 @@ class ManageRequests:
default_user_agent = 'Mozilla/5.0'
req.add_header('user-agent', default_user_agent)
self.log_request()
return req
def _perform_request(self, req: urllib.request.Request) -> urllib.response.addinfourl:
"""Perform the HTTP request."""
"""
Perform the HTTP request.
"""
if self.proxy:
proxy_handler = urllib.request.ProxyHandler({'http': self.proxy, 'https': self.proxy})
opener = urllib.request.build_opener(proxy_handler)
urllib.request.install_opener(opener)
if not self.verify_ssl:
# Create SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Build the request with SSL context
response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl_context)
else:
@ -262,7 +295,9 @@ class ManageRequests:
return response
def _process_response(self, response: urllib.response.addinfourl, start_time: float, redirect_url: Optional[str]) -> Response:
"""Process the HTTP response."""
"""
Process the HTTP response.
"""
response_data = response.read()
content_type = response.headers.get('Content-Type', '').lower()
@ -276,14 +311,16 @@ class ManageRequests:
return self._build_response(response, response_data, start_time, redirect_url, content_type)
def _build_response(self, response: urllib.response.addinfourl, response_data: bytes, start_time: float, redirect_url: Optional[str], content_type: str) -> Response:
"""Build the Response object."""
"""
Build the Response object.
"""
response_time = time.time() - start_time
response_headers = dict(response.headers)
response_cookies = {}
for cookie in response.headers.get_all('Set-Cookie', []):
cookie_parts = cookie.split(';')
cookie_name, cookie_value = cookie_parts[0].split('=', 1)
cookie_name, cookie_value = cookie_parts[0].split('=', 1) # Only the first
response_cookies[cookie_name.strip()] = cookie_value.strip()
return Response(
@ -299,7 +336,9 @@ class ManageRequests:
)
def _handle_error(self, e: Union[urllib.error.URLError, urllib.error.HTTPError]) -> None:
"""Handle request error."""
"""
Handle request error.
"""
logging.error(f"Request failed for URL '{self.url}': {str(e)}")
if self.attempt < self.retries:
@ -361,13 +400,44 @@ class KwargsRequest(TypedDict, total = False):
retries: int = HTTP_RETRIES
params: Optional[Dict[str, str]] = None
cookies: Optional[Dict[str, str]] = None
verify_ssl: bool = True
json_data: Optional[Dict[str, Any]] = None
class Request:
"""Class for making HTTP requests."""
def __init__(self) -> None:
pass
# Ensure SSL certificate is set up
self.__setup_ssl_certificate__()
def __setup_ssl_certificate__(self):
"""
Set up SSL certificate environment variables.
"""
try:
# Determine the Python executable
python_executable = sys.executable
logging.info("Python path: ", python_executable)
# Check if certifi package is installed, install it if not
if subprocess.run([python_executable, "-c", "import certifi"], capture_output=True).returncode != 0:
subprocess.run(["pip", "install", "certifi"], check=True)
logging.info("Installed certifi package.")
# Get path to SSL certificate
cert_path = subprocess.run([python_executable, "-c", "import certifi; print(certifi.where())"], capture_output=True, text=True, check=True).stdout.strip()
logging.info("Path cert: ", cert_path)
if not cert_path:
raise ValueError("Unable to determine the path to the SSL certificate.")
# Set SSL certificate environment variables
os.environ['SSL_CERT_FILE'] = cert_path
os.environ['REQUESTS_CA_BUNDLE'] = cert_path
except subprocess.CalledProcessError as e:
raise ValueError(f"Error executing subprocess: {e}") from e
def get(self, url: str, **kwargs: Unpack[KwargsRequest])-> 'Response':
"""