2024-06-10 21:36:42 +02:00

114 lines
3.3 KiB
Python

# 09.06.24
import os
import time
import logging
from concurrent.futures import ThreadPoolExecutor
# External libraries
import httpx
# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Util.os import check_file_existence
class ProxyManager:
def __init__(self, proxy_list=None, url=None):
"""
Initialize ProxyManager with a list of proxies and timeout.
Args:
- proxy_list: List of proxy strings
- timeout: Timeout for proxy requests
"""
self.proxy_list = proxy_list or []
self.verified_proxies = []
self.failed_proxies = {}
self.timeout = config_manager.get_float('REQUESTS', 'timeout')
self.url = url
def _check_proxy(self, proxy):
"""
Check if a single proxy is working by making a request to Google.
Args:
- proxy: Proxy string to be checked
Returns:
- Proxy string if working, None otherwise
"""
protocol = proxy.split(":")[0].lower()
protocol = f'{protocol}://'
try:
response = httpx.get(self.url, proxies={protocol: proxy}, timeout=self.timeout)
if response.status_code == 200:
logging.info(f"Proxy {proxy} is working.")
return proxy
except Exception as e:
logging.error(f"Proxy {proxy} failed: {e}")
self.failed_proxies[proxy] = time.time()
return None
def verify_proxies(self):
"""
Verify all proxies in the list and store the working ones.
"""
logging.info("Starting proxy verification...")
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
self.verified_proxies = list(executor.map(self._check_proxy, self.proxy_list))
self.verified_proxies = [proxy for proxy in self.verified_proxies if proxy]
logging.info(f"Verification complete. {len(self.verified_proxies)} proxies are working.")
def get_verified_proxies(self):
"""
Get validate proxies.
"""
validate_proxy = []
for proxy in self.verified_proxies:
protocol = proxy.split(":")[0].lower()
protocol = f'{protocol}://' # For httpx
validate_proxy.append({protocol: proxy})
return validate_proxy
def main_test_proxy(url_test):
path_file_proxt_list = "list_proxy.txt"
if check_file_existence(path_file_proxt_list):
# Write test to pass THERE IS PROXY on config.json for segments
config_manager.set_key("REQUESTS", "proxy", ["192.168.1.1"])
# Read file
with open(path_file_proxt_list, 'r') as file:
ip_addresses = file.readlines()
# Formatt ip
ip_addresses = [ip.strip() for ip in ip_addresses]
formatted_ips = [f"http://{ip}" for ip in ip_addresses]
# Get list of proxy from config.json
proxy_list = formatted_ips
# Verify proxy
manager = ProxyManager(proxy_list, url_test)
manager.verify_proxies()
# Write valid ip in txt file
with open(path_file_proxt_list, 'w') as file:
for ip in ip_addresses:
file.write(f"{ip}\n")
# Return valid proxy
return manager.get_verified_proxies()