Fix proxy (#319)

* Add ENABLE_VIDEO

* Fix proxy

* Add error proxy

* Update config.json
None 2025-05-17 09:54:41 +02:00 committed by GitHub
parent c0f3d8619b
commit dfcc29078f
15 changed files with 444 additions and 234 deletions

View File

@ -470,7 +470,11 @@ To enable qBittorrent integration, follow the setup guide [here](https://github.
"REQUESTS": {
"verify": false,
"timeout": 20,
"max_retry": 8
"max_retry": 8,
"proxy": {
"http": "http://username:password@host:port",
"https": "https://username:password@host:port"
}
}
}
```
@ -478,6 +482,22 @@ To enable qBittorrent integration, follow the setup guide [here](https://github.
- `verify`: Verifies SSL certificates
- `timeout`: Maximum timeout (in seconds) for each request
- `max_retry`: Number of retry attempts per segment during M3U8 index download
- `proxy`: Proxy configuration for HTTP/HTTPS requests (a short usage sketch follows the examples below)
* Set to empty string `""` to disable proxies (default)
* Example with authentication:
```json
"proxy": {
"http": "http://username:password@host:port",
"https": "https://username:password@host:port"
}
```
* Example without authentication:
```json
"proxy": {
"http": "http://host:port",
"https": "https://host:port"
}
```
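As a rough illustration, here is how this setting ends up being consumed at runtime, a simplified sketch based on the `ProxyFinder` change further down in this commit (assuming httpx 0.26+, where a single proxy URL is passed via the `proxy` keyword):
```python
import httpx

from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent

# Read the "proxy" entry from config.json (names taken from this commit's diffs)
proxy_config = config_manager.get("REQUESTS", "proxy")

# A dict with an 'http' entry enables the proxy; the default "" leaves it disabled
proxy = proxy_config['http'] if isinstance(proxy_config, dict) and 'http' in proxy_config else None

response = httpx.get(
    "https://example.org",                      # placeholder URL
    headers={'user-agent': get_userAgent()},
    timeout=20,
    proxy=proxy,
)
```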
</details>
<details>

View File

@ -0,0 +1,141 @@
# 05.07.24
# NOTE: NOT USED
import re
import logging
# External libraries
import httpx
import jsbeautifier
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent
# Variable
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
def __init__(self, url: str):
"""
Sets up the video source with the provided URL.
Parameters:
- url (str): The URL of the video.
"""
self.url = url
self.redirect_url = None
self.maxstream_url = None
self.m3u8_url = None
self.headers = {'user-agent': get_userAgent()}
def get_redirect_url(self):
"""
Sends a request to the initial URL and extracts the redirect URL.
"""
try:
response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract the redirect URL from the HTML
soup = BeautifulSoup(response.text, "html.parser")
self.redirect_url = soup.find("div", id="iframen1").get("data-src")
logging.info(f"Redirect URL: {self.redirect_url}")
return self.redirect_url
except Exception as e:
logging.error(f"Error parsing HTML: {e}")
raise
def get_maxstream_url(self):
"""
Sends a request to the redirect URL and extracts the Maxstream URL.
"""
try:
response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract the Maxstream URL from the HTML
soup = BeautifulSoup(response.text, "html.parser")
maxstream_url = soup.find("a")
if maxstream_url is None:
# If no anchor tag is found, try the alternative method
logging.warning("Anchor tag not found. Trying the alternative method.")
headers = {
'origin': 'https://stayonline.pro',
'user-agent': get_userAgent(),
'x-requested-with': 'XMLHttpRequest',
}
# Make request to stayonline api
data = {'id': self.redirect_url.split("/")[-2], 'ref': ''}
response = httpx.post('https://stayonline.pro/ajax/linkEmbedView.php', headers=headers, data=data)
response.raise_for_status()
uprot_url = response.json()['data']['value']
# Retry getting maxstream URL
response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
maxstream_url = soup.find("a").get("href")
else:
maxstream_url = maxstream_url.get("href")
self.maxstream_url = maxstream_url
logging.info(f"Maxstream URL: {self.maxstream_url}")
return self.maxstream_url
except Exception as e:
logging.error(f"Error during the request: {e}")
raise
def get_m3u8_url(self):
"""
Sends a request to the Maxstream URL and extracts the .m3u8 file URL.
"""
try:
response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Iterate over all script tags in the HTML
for script in soup.find_all("script"):
if "eval(function(p,a,c,k,e,d)" in script.text:
# Unpack the obfuscated script with jsbeautifier
data_js = jsbeautifier.beautify(script.text)
# Extract the .m3u8 URL from the script's output
match = re.search(r'sources:\s*\[\{\s*src:\s*"([^"]+)"', data_js)
if match:
self.m3u8_url = match.group(1)
logging.info(f"M3U8 URL: {self.m3u8_url}")
break
else:
logging.error("Failed to find M3U8 URL: No match found")
return self.m3u8_url
except Exception as e:
logging.error(f"Error executing the Node.js script: {e}")
raise
def get_playlist(self):
"""
Executes the entire flow to obtain the final .m3u8 file URL.
"""
self.get_redirect_url()
self.get_maxstream_url()
return self.get_m3u8_url()
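A minimal usage sketch for this class (the module is marked `NOT USED` above, and the URL here is a hypothetical placeholder):
```python
# Hypothetical example: resolve a hosted video page to its M3U8 playlist URL
source = VideoSource("https://example.com/watch/12345")   # placeholder URL
playlist_url = source.get_playlist()                      # redirect -> maxstream -> m3u8
print(playlist_url)
```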

View File

@ -27,7 +27,7 @@ indice = 2
_useFor = "Film_&_Serie"
_priority = 0
_engineDownload = "hls"
_deprecate = True
_deprecate = False
msg = Prompt()
console = Console()

View File

@ -1,6 +1,7 @@
# 16.03.25
import os
import re
# External library
@ -56,51 +57,30 @@ def download_film(select_title: MediaItem) -> str:
start_message()
console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [cyan]{select_title.name}[/cyan] \n")
# Extract mostraguarda link
# Extract mostraguarda URL
try:
response = httpx.get(select_title.url, headers=get_headers(), timeout=10)
response.raise_for_status()
except Exception as e:
console.print(f"[red]Error fetching the page: {e}")
if site_constant.TELEGRAM_BOT:
bot.send_message(f"ERRORE\n\nErrore durante il recupero della pagina.\n\n{e}", None)
return None
soup = BeautifulSoup(response.text, 'html.parser')
iframes = soup.find_all('iframe')
mostraguarda = iframes[0]['src']
# Create mostraguarda url
soup = BeautifulSoup(response.text, "html.parser")
iframe_tag = soup.find_all("iframe")
url_mostraGuarda = iframe_tag[0].get('data-src')
if not url_mostraGuarda:
console.print("Error: data-src attribute not found in iframe.")
if site_constant.TELEGRAM_BOT:
bot.send_message(f"ERRORE\n\nErrore: attributo data-src non trovato nell'iframe", None)
except Exception as e:
console.print(f"[red]Site: {site_constant.SITE_NAME}, request error: {e}, get mostraguarda")
# Extract supervideo URL
try:
response = httpx.get(url_mostraGuarda, headers=get_headers(), timeout=10)
response.raise_for_status()
response = httpx.get(mostraguarda, headers=get_headers(), timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
pattern = r'//supervideo\.[^/]+/[a-z]/[a-zA-Z0-9]+'
supervideo_match = re.search(pattern, response.text)
supervideo_url = 'https:' + supervideo_match.group(0)
except Exception as e:
console.print(f"[red]Error fetching mostraguarda link: {e}")
console.print("[yellow]Missing access credentials. This part of the code is still under development.")
if site_constant.TELEGRAM_BOT:
bot.send_message(f"ERRORE\n\nErrore durante il recupero del link mostra/guarda.\n\n{e}", None)
bot.send_message(f"ERRORE\n\nCredenziali di accesso mancanti.\nQuesta parte del codice è ancora in fase di sviluppo.", None)
return None
# Create supervideo URL
soup = BeautifulSoup(response.text, "html.parser")
player_links = soup.find("ul", class_="_player-mirrors")
player_items = player_links.find_all("li")
supervideo_url = "https:" + player_items[0].get("data-link")
if not supervideo_url:
return None
console.print(f"[red]Site: {site_constant.SITE_NAME}, request error: {e}, get supervideo URL")
# Init class
video_source = VideoSource(url=supervideo_url)
video_source = VideoSource(supervideo_url)
master_playlist = video_source.get_playlist()
# Define the filename and path for the downloaded film

View File

@ -0,0 +1,72 @@
# 09.06.24
from urllib.parse import quote_plus
# External library
from rich.console import Console
from rich.prompt import Prompt
# Internal utilities
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Logic class
from .site import title_search, media_search_manager, table_show_manager
from .film import download_film
# Variable
indice = -1
_useFor = "Film"
_priority = 0
_engineDownload = "mp4"
_deprecate = True
msg = Prompt()
console = Console()
def process_search_result(select_title):
"""
Handles the search result and initiates the download for either a film or series.
"""
# !!! TODO: type handling does not work for series
download_film(select_title)
def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None):
"""
Main function of the application for search.
Parameters:
string_to_search (str, optional): String to search for
get_onlyDatabase (bool, optional): If True, return only the database object
direct_item (dict, optional): Direct item to process (bypass search)
"""
if direct_item:
select_title = MediaItem(**direct_item)
process_search_result(select_title)
return
if string_to_search is None:
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{site_constant.SITE_NAME}").strip()
# Search on database
len_database = title_search(quote_plus(string_to_search))
# If only the database is needed, return the manager
if get_onlyDatabase:
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
process_search_result(select_title)
else:
# If no results are found, ask again
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
search()

View File

@ -0,0 +1,62 @@
# 03.07.24
import os
# External library
from rich.console import Console
# Internal utilities
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import HLS_Downloader
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Player
from StreamingCommunity.Api.Player.maxstream import VideoSource
# Variable
console = Console()
def download_film(select_title: MediaItem) -> str:
"""
Downloads a film using the provided MediaItem.
Parameters:
- select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.
Return:
- str: output path
"""
start_message()
console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [cyan]{select_title.name}[/cyan] \n")
# Setup video source
video_source = VideoSource(select_title.url)
# Define output path
title_name = os_manager.get_sanitize_file(select_title.name) +".mp4"
mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
r_proc = HLS_Downloader(
m3u8_url=master_playlist,
output_path=os.path.join(mp4_path, title_name)
).start()
if r_proc['error'] is not None:
try: os.remove(r_proc['path'])
except: pass
return r_proc['path']
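A rough usage sketch for this function; the `MediaItem` field values below are hypothetical, since the item is normally produced by the search flow in the neighbouring files of this commit:
```python
# Hypothetical call: download a single film entry returned by title_search
item = MediaItem(name="Example Movie", url="https://example.org/example-movie/", type="film")
output_path = download_film(item)
print(output_path)
```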

View File

@ -0,0 +1,78 @@
# 03.07.24
# External libraries
import httpx
from bs4 import BeautifulSoup
from rich.console import Console
# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
console = Console()
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
def title_search(query: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- query (str): The query to search for.
Returns:
- int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
search_url = f"{site_constant.FULL_URL}/?s={query}"
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
response = httpx.get(
search_url,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
follow_redirects=True,
verify=False
)
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
for card in soup.find_all("div", class_=["card", "mp-post", "horizontal"]):
try:
title_tag = card.find("h3", class_="card-title").find("a")
url = title_tag.get("href")
title = title_tag.get_text(strip=True)
title_info = {
'name': title,
'url': url,
'type': 'film'
}
media_search_manager.add_media(title_info)
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()

View File

@ -12,6 +12,7 @@ from rich.prompt import Prompt
# Internal utilities
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Lib.Proxies.proxy import ProxyFinder
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance
@ -58,7 +59,7 @@ def get_user_input(string_to_search: str = None):
return string_to_search
def process_search_result(select_title, selections=None):
def process_search_result(select_title, selections=None, proxy=None):
"""
Handles the search result and initiates the download for either a film or series.
@ -75,7 +76,7 @@ def process_search_result(select_title, selections=None):
season_selection = selections.get('season')
episode_selection = selections.get('episode')
download_series(select_title, season_selection, episode_selection)
download_series(select_title, season_selection, episode_selection, proxy)
else:
download_film(select_title)
@ -114,7 +115,9 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
# Search on database
len_database = title_search(string_to_search)
finder = ProxyFinder(site_constant.FULL_URL)
proxy = finder.find_fast_proxy()
len_database = title_search(string_to_search, proxy)
# If only the database is needed, return the manager
if get_onlyDatabase:
@ -122,7 +125,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
process_search_result(select_title, selections)
process_search_result(select_title, selections, proxy)
else:
# If no results are found, ask again

View File

@ -142,7 +142,7 @@ def download_episode(index_season_selected: int, scrape_serie: GetSerieInfo, vid
break
def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None) -> None:
def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None, proxy = None) -> None:
"""
Handle downloading a complete series.
@ -155,7 +155,7 @@ def download_series(select_season: MediaItem, season_selection: str = None, epis
# Init class
video_source = VideoSource(f"{site_constant.FULL_URL}/it", True, select_season.id)
scrape_serie = GetSerieInfo(f"{site_constant.FULL_URL}/it", select_season.id, select_season.slug)
scrape_serie = GetSerieInfo(f"{site_constant.FULL_URL}/it", select_season.id, select_season.slug, proxy)
# Collect information about season
scrape_serie.getNumberSeason()
@ -219,4 +219,4 @@ def download_series(select_season: MediaItem, season_selection: str = None, epis
# Get script_id
script_id = TelegramSession.get_session()
if script_id != "unknown":
TelegramSession.deleteScriptId(script_id)
TelegramSession.deleteScriptId(script_id)

View File

@ -28,7 +28,7 @@ table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
def title_search(query: str) -> int:
def title_search(query: str, proxy: str) -> int:
"""
Search for titles based on a search query.
@ -48,7 +48,8 @@ def title_search(query: str) -> int:
response = httpx.get(
f"{site_constant.FULL_URL}/it",
headers={'user-agent': get_userAgent()},
timeout=max_timeout
timeout=max_timeout,
proxy=proxy
)
response.raise_for_status()
@ -56,6 +57,7 @@ def title_search(query: str) -> int:
version = json.loads(soup.find('div', {'id': "app"}).get("data-page"))['version']
except Exception as e:
if "WinError" in str(e) or "Errno" in str(e): console.print("\n[bold yellow]Please make sure you have enabled and configured a valid proxy.[/bold yellow]")
console.print(f"[red]Site: {site_constant.SITE_NAME} version, request error: {e}")
return 0
@ -71,7 +73,8 @@ def title_search(query: str) -> int:
'x-inertia': 'true',
'x-inertia-version': version
},
timeout=max_timeout
timeout=max_timeout,
proxy=proxy
)
response.raise_for_status()

View File

@ -20,7 +20,7 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
class GetSerieInfo:
def __init__(self, url, media_id: int = None, series_name: str = None):
def __init__(self, url, media_id: int = None, series_name: str = None, proxy = None):
"""
Initialize the GetSerieInfo class for scraping TV series information.
@ -32,6 +32,7 @@ class GetSerieInfo:
self.is_series = False
self.headers = {'user-agent': get_userAgent()}
self.url = url
self.proxy = proxy
self.media_id = media_id
self.seasons_manager = SeasonManager()
@ -50,7 +51,8 @@ class GetSerieInfo:
response = httpx.get(
url=f"{self.url}/titles/{self.media_id}-{self.series_name}",
headers=self.headers,
timeout=max_timeout
timeout=max_timeout,
proxy=self.proxy
)
response.raise_for_status()
@ -104,7 +106,8 @@ class GetSerieInfo:
'x-inertia': 'true',
'x-inertia-version': self.version,
},
timeout=max_timeout
timeout=max_timeout,
proxy=self.proxy
)
# Extract episodes from JSON response

View File

@ -21,7 +21,7 @@ from .series import download_series
# Variable
indice = 7
_useFor = "Film_&_Serie"
_priority = 10 # !!! MOLTO LENTO
_priority = 0
_engineDownload = "hls"
_deprecate = False
@ -79,8 +79,8 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
finder = ProxyFinder(url=f"{site_constant.FULL_URL}/serie/euphoria/")
proxy, response_serie, _ = finder.find_fast_proxy()
len_database = title_search(string_to_search, [proxy, response_serie])
proxy = finder.find_fast_proxy()
len_database = title_search(string_to_search, proxy)
# If only the database is needed, return the manager
if get_onlyDatabase:

View File

@ -27,9 +27,16 @@ table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
def extract_nonce(response_) -> str:
def extract_nonce(proxy) -> str:
"""Extract nonce value from the page script"""
soup = BeautifulSoup(response_.content, 'html.parser')
response = httpx.get(
site_constant.FULL_URL,
headers={'user-agent': get_userAgent()},
timeout=max_timeout,
proxy=proxy
)
soup = BeautifulSoup(response.content, 'html.parser')
script = soup.find('script', id='live-search-js-extra')
if script:
match = re.search(r'"admin_ajax_nonce":"([^"]+)"', script.text)
@ -38,7 +45,7 @@ def extract_nonce(response_) -> str:
return ""
def title_search(query: str, additionalData: list) -> int:
def title_search(query: str, proxy: str) -> int:
"""
Search for titles based on a search query.
@ -51,12 +58,11 @@ def title_search(query: str, additionalData: list) -> int:
media_search_manager.clear()
table_show_manager.clear()
proxy, response_serie = additionalData
search_url = f"{site_constant.FULL_URL}/wp-admin/admin-ajax.php"
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
_wpnonce = extract_nonce(response_serie)
_wpnonce = extract_nonce(proxy)
if not _wpnonce:
console.print("[red]Error: Failed to extract nonce")
@ -82,6 +88,7 @@ def title_search(query: str, additionalData: list) -> int:
soup = BeautifulSoup(response.text, 'html.parser')
except Exception as e:
if "WinError" in str(e) or "Errno" in str(e): console.print("\n[bold yellow]Please make sure you have enabled and configured a valid proxy.[/bold yellow]")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0

View File

@ -1,20 +1,15 @@
# 29.04.25
import os
import sys
import time
import json
import signal
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
# External library
import httpx
from rich import print
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeRemainingColumn
# Internal utilities
@ -27,118 +22,18 @@ MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class ProxyFinder:
def __init__(self, url, timeout_threshold: float = 7.0, max_proxies: int = 150, max_workers: int = 12):
def __init__(self, url, timeout_threshold: float = 7.0):
self.url = url
self.timeout_threshold = timeout_threshold
self.max_proxies = max_proxies
self.max_workers = max_workers
self.found_proxy = None
self.shutdown_flag = False
self.json_file = os.path.join(os.path.dirname(__file__), 'working_proxies.json')
signal.signal(signal.SIGINT, self._handle_interrupt)
def load_saved_proxies(self) -> tuple:
"""Load saved proxies if they're not expired (2 hours old)"""
try:
if not os.path.exists(self.json_file):
return None, None
with open(self.json_file, 'r') as f:
data = json.load(f)
if not data.get('proxies') or not data.get('last_update'):
return None, None
last_update = datetime.fromisoformat(data['last_update'])
if datetime.now() - last_update > timedelta(hours=2):
return None, None
return data['proxies'], last_update
except Exception:
return None, None
def save_working_proxy(self, proxy: str, response_time: float):
"""Save working proxy to JSON file"""
data = {
'proxies': [{'proxy': proxy, 'response_time': response_time}],
'last_update': datetime.now().isoformat()
}
try:
with open(self.json_file, 'w') as f:
json.dump(data, f, indent=4)
except Exception as e:
print(f"[bold red]Error saving proxy:[/bold red] {str(e)}")
def fetch_geonode(self) -> list:
proxies = []
try:
response = httpx.get(
"https://proxylist.geonode.com/api/proxy-list?protocols=http%2Chttps&limit=100&page=1&sort_by=speed&sort_type=asc",
headers=get_headers(),
timeout=MAX_TIMEOUT
)
data = response.json()
proxies = [(f"http://{p['ip']}:{p['port']}", "Geonode") for p in data.get('data', [])]
except Exception as e:
print(f"[bold red]Error in Geonode:[/bold red] {str(e)[:100]}")
return proxies
def fetch_proxyscrape(self) -> list:
proxies = []
try:
response = httpx.get(
"https://api.proxyscrape.com/v4/free-proxy-list/get?request=get_proxies&protocol=http&skip=0&proxy_format=protocolipport&format=json&limit=100&timeout=1000",
headers=get_headers(),
timeout=MAX_TIMEOUT
)
data = response.json()
if 'proxies' in data and isinstance(data['proxies'], list):
proxies = [(proxy_data['proxy'], "ProxyScrape") for proxy_data in data['proxies'] if 'proxy' in proxy_data]
except Exception as e:
print(f"[bold red]Error in ProxyScrape:[/bold red] {str(e)[:100]}")
return proxies
def fetch_proxies_from_sources(self) -> list:
#print("[cyan]Fetching proxies from sources...[/cyan]")
with ThreadPoolExecutor(max_workers=3) as executor:
proxyscrape_future = executor.submit(self.fetch_proxyscrape)
geonode_future = executor.submit(self.fetch_geonode)
sources_proxies = {}
try:
proxyscrape_result = proxyscrape_future.result()
sources_proxies["proxyscrape"] = proxyscrape_result[:int(self.max_proxies/2)]
except Exception as e:
print(f"[bold red]Error fetching from proxyscrape:[/bold red] {str(e)[:100]}")
sources_proxies["proxyscrape"] = []
try:
geonode_result = geonode_future.result()
sources_proxies["geonode"] = geonode_result[:int(self.max_proxies/2)]
except Exception as e:
print(f"[bold red]Error fetching from geonode:[/bold red] {str(e)[:100]}")
sources_proxies["geonode"] = []
merged_proxies = []
if "proxyscrape" in sources_proxies:
merged_proxies.extend(sources_proxies["proxyscrape"])
if "geonode" in sources_proxies:
merged_proxies.extend(sources_proxies["geonode"])
proxy_list = merged_proxies[:self.max_proxies]
return proxy_list
def _test_single_request(self, proxy_info: tuple) -> tuple:
proxy, source = proxy_info
try:
start = time.time()
print(f"[yellow]Testing proxy...")
with httpx.Client(proxy=proxy, timeout=self.timeout_threshold) as client:
response = client.get(self.url, headers=get_headers())
if response.status_code == 200:
@ -161,72 +56,17 @@ class ProxyFinder:
return (proxy, success2 and time2 <= self.timeout_threshold, avg_time, text1, source)
def _handle_interrupt(self, sig, frame):
print("\n[bold yellow]Received keyboard interrupt. Terminating...[/bold yellow]")
print("\n[red]Received keyboard interrupt. Terminating...")
self.shutdown_flag = True
sys.exit(0)
def find_fast_proxy(self) -> tuple:
saved_proxies, last_update = self.load_saved_proxies()
if saved_proxies:
print("[cyan]Testing saved proxy...[/cyan]")
for proxy_data in saved_proxies:
result = self.test_proxy((proxy_data['proxy'], 'cached'))
if result[1]:
return proxy_data['proxy'], result[3], result[2]
else:
print(f"[red]Saved proxy {proxy_data['proxy']} failed - response time: {result[2]:.2f}s[/red]")
proxies = self.fetch_proxies_from_sources()
if not proxies:
print("[bold red]No proxies fetched to test.[/bold red]")
return (None, None, None)
found_proxy = None
response_text = None
source = None
failed_count = 0
success_count = 0
#print(f"[cyan]Testing {len(proxies)} proxies...[/cyan]")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self.test_proxy, p): p for p in proxies}
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("[cyan]{task.fields[success]}[/cyan]/[red]{task.fields[failed]}[/red]"),
TimeRemainingColumn(),
) as progress:
task = progress.add_task(
"[cyan]Testing Proxies",
total=len(futures),
success=success_count,
failed=failed_count
)
for future in as_completed(futures):
if self.shutdown_flag:
break
try:
proxy, success, elapsed, response, proxy_source = future.result()
if success:
success_count += 1
print(f"[bold green]Found valid proxy:[/bold green] {proxy} ({elapsed:.2f}s)")
found_proxy = proxy
response_text = response
self.save_working_proxy(proxy, elapsed)
self.shutdown_flag = True
break
else:
failed_count += 1
except Exception:
failed_count += 1
progress.update(task, advance=1, success=success_count, failed=failed_count)
if not found_proxy:
print("[bold red]No working proxies found[/bold red]")
return (found_proxy, response_text, source)
def find_fast_proxy(self) -> str:
try:
proxy_config = config_manager.get("REQUESTS", "proxy")
if proxy_config and isinstance(proxy_config, dict) and 'http' in proxy_config:
print("[cyan]Using configured proxy from config.json...[/cyan]")
return proxy_config['http']
except Exception as e:
print(f"[red]Error getting configured proxy: {str(e)}[/red]")
return None
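For context, a sketch of the call pattern the site modules in this commit now follow (adapted from the `__init__.py` and `site.py` diffs above; `proxy=None` simply disables proxying):
```python
# Resolve the configured proxy once per search, then forward it to every request
finder = ProxyFinder(site_constant.FULL_URL)
proxy = finder.find_fast_proxy()        # configured proxy string, or None

response = httpx.get(
    f"{site_constant.FULL_URL}/it",
    headers={'user-agent': get_userAgent()},
    timeout=max_timeout,
    proxy=proxy,
)
```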

View File

@ -57,6 +57,7 @@
"REQUESTS": {
"verify": false,
"timeout": 20,
"max_retry": 8
"max_retry": 8,
"proxy": ""
}
}
}