Bump v3.0.2

Lovi 2025-05-01 14:19:59 +02:00
parent ace77c6f7a
commit 9ed9b5afa5
13 changed files with 301 additions and 318 deletions

.gitignore

@@ -52,4 +52,5 @@ cmd.txt
 bot_config.json
 scripts.json
 active_requests.json
 domains.json
+working_proxies.json


@@ -1,17 +1,16 @@
 # 29.04.25

 import re
-import logging

-# External libraries
+# External library
 import httpx
 from bs4 import BeautifulSoup

 # Internal utilities
+from StreamingCommunity.Util.headers import get_headers
 from StreamingCommunity.Util.config_json import config_manager
-from StreamingCommunity.Util.headers import get_userAgent

 # Variable

@@ -19,83 +18,48 @@ MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")

 class VideoSource:
-    def __init__(self, url: str):
-        """
-        Sets up the video source with the provided URL.
-
-        Parameters:
-            - url (str): The URL of the video.
-        """
-        self.url = url
-        self.iframe_url = None
-        self.m3u8_url = None
-        self.headers = {
-            'accept': '*/*',
-            'accept-language': 'en-US,en;q=0.9',
-            'user-agent': get_userAgent(),
-            'referer': url
-        }
+    def __init__(self, proxy=None):
+        self.client = httpx.Client(headers=get_headers(), timeout=MAX_TIMEOUT, proxy=proxy)

-    def extract_iframe_sources(self, response) -> str:
+    def extractLinkHdPlayer(self, response):
         """Extract iframe source from the page."""
-        try:
-            soup = BeautifulSoup(response.content, 'html.parser')
-            iframes = soup.select("iframe[data-lazy-src]")
-
-            if not iframes:
-                iframes = soup.select("iframe[src]")
-
-            if iframes:
-                iframe_url = iframes[0].get('data-lazy-src') or iframes[0].get('src')
-                self.iframe_url = iframe_url
-                logging.info(f"Iframe URL found: {iframe_url}")
-                return iframe_url
-
-            logging.error("No iframes found in the page")
-            return None
-
-        except Exception as e:
-            logging.error(f"Error extracting iframe: {e}")
-            raise
+        soup = BeautifulSoup(response.content, 'html.parser')
+        iframes = soup.find_all("iframe")
+        if iframes:
+            return iframes[0].get('data-lazy-src')
+        return None

-    def get_m3u8_url(self) -> str:
+    def get_m3u8_url(self, page_url):
         """
         Extract m3u8 URL from hdPlayer page.
         """
         try:
-            # First request to get iframe
-            response = httpx.get(self.url, headers=self.headers, timeout=MAX_TIMEOUT)
-            response.raise_for_status()
+            # Get the page content
+            response = self.client.get(page_url)

-            iframe_url = self.extract_iframe_sources(response)
+            # Extract HDPlayer iframe URL
+            iframe_url = self.extractLinkHdPlayer(response)
             if not iframe_url:
-                raise ValueError("No iframe URL found")
+                return None

-            # Update headers for iframe request
-            self.headers['referer'] = iframe_url
-
-            # Request to iframe page
-            logging.info(f"Making request to hdPlayer: {iframe_url}")
-            response = httpx.get(iframe_url, headers=self.headers, timeout=MAX_TIMEOUT)
-            response.raise_for_status()
+            # Get HDPlayer page content
+            response_hdplayer = self.client.get(iframe_url)
+            if response_hdplayer.status_code != 200:
+                return None

-            # Find m3u8 in the script
-            soup = BeautifulSoup(response.text, 'html.parser')
-            scripts = soup.find_all("script")
+            soup = BeautifulSoup(response_hdplayer.text, 'html.parser')

-            for script in scripts:
-                if not script.string:
-                    continue
-                match = re.search(r'sources:\s*\[\{\s*file:\s*"([^"]+)"', script.string)
+            # Find m3u8 URL in scripts
+            for script in soup.find_all("script"):
+                match = re.search(r'sources:\s*\[\{\s*file:\s*"([^"]+)"', script.text)
                 if match:
-                    self.m3u8_url = match.group(1)
-                    logging.info(f"Found m3u8 URL: {self.m3u8_url}")
-                    return self.m3u8_url
+                    return match.group(1)

-            logging.error("No m3u8 URL found in scripts")
             return None

         except Exception as e:
-            logging.error(f"Error getting m3u8 URL: {e}")
-            raise
+            print(f"Error in HDPlayer: {str(e)}")
+            return None
+
+        finally:
+            self.client.close()
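
For orientation, this is roughly how the refactored player is driven from the site modules further down (a minimal sketch; the film URL and proxy are placeholders):

# Minimal usage sketch for the refactored VideoSource (URL and proxy are placeholders).
from StreamingCommunity.Api.Player.hdplayer import VideoSource

video_source = VideoSource(proxy=None)  # pass "http://host:port" to route through a proxy
master_playlist = video_source.get_m3u8_url("https://example.org/film/some-title/")
print(master_playlist)  # the m3u8 URL, or None if no iframe/source was found

# Note: get_m3u8_url() closes its httpx.Client in the finally block, so each
# VideoSource instance supports a single extraction.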


@@ -1,70 +0,0 @@
# 09.06.24

from urllib.parse import quote_plus

# External library
from rich.console import Console
from rich.prompt import Prompt

# Internal utilities
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem

# Logic class
from .site import title_search, media_search_manager, table_show_manager
from .film import download_film

# Variable
indice = 4
_useFor = "film"
_priority = 0
_engineDownload = "mp4"

msg = Prompt()
console = Console()


def process_search_result(select_title):
    """
    Handles the search result and initiates the download for either a film or series.
    """
    download_film(select_title)


def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None):
    """
    Main function of the application for search.

    Parameters:
        string_to_search (str, optional): String to search for
        get_onlyDatabase (bool, optional): If True, return only the database object
        direct_item (dict, optional): Direct item to process (bypass search)
    """
    if direct_item:
        select_title = MediaItem(**direct_item)
        process_search_result(select_title)
        return

    if string_to_search is None:
        string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{site_constant.SITE_NAME}").strip()

    # Search on database
    len_database = title_search(quote_plus(string_to_search))

    # If only the database is needed, return the manager
    if get_onlyDatabase:
        return media_search_manager

    if len_database > 0:
        select_title = get_select_title(table_show_manager, media_search_manager)
        process_search_result(select_title)

    else:
        # If no results are found, ask again
        console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
        search()


@@ -1,64 +0,0 @@
# 03.07.24

import os

# External library
from rich.console import Console

# Internal utilities
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import MP4_downloader

# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem

# Player
from StreamingCommunity.Api.Player.mixdrop import VideoSource

# Variable
console = Console()


def download_film(select_title: MediaItem) -> str:
    """
    Downloads a film using the provided obj.

    Parameters:
        - select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.

    Return:
        - str: output path
    """
    start_message()
    console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [cyan]{select_title.name}[/cyan] \n")

    # Set up API manager
    video_source = VideoSource(select_title.url)
    src_mp4 = video_source.get_playlist()
    print(src_mp4)

    # Define output path
    title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4"
    mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))

    # Start downloading
    path, kill_handler = MP4_downloader(
        url=src_mp4,
        path=mp4_path,
        headers_={
            'Connection': 'keep-alive',
            'Origin': 'https://mixdrop.sb',
            'Range': 'bytes=0-',
            'Referer': 'https://mixdrop.sb/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 OPR/118.0.0.0',
        }
    )

    return path, kill_handler


@@ -1,81 +0,0 @@
# 03.07.24

import sys

# External libraries
import httpx
from bs4 import BeautifulSoup
from rich.console import Console

# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent
from StreamingCommunity.Util.table import TVShowManager

# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager

# Variable
console = Console()
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")


def title_search(query: str) -> int:
    """
    Search for titles based on a search query.

    Parameters:
        - query (str): The query to search for.

    Returns:
        - int: The number of titles found.
    """
    media_search_manager.clear()
    table_show_manager.clear()

    search_url = f"{site_constant.FULL_URL}/?s={query}"
    console.print(f"[cyan]Search url: [yellow]{search_url}")

    try:
        response = httpx.get(
            search_url,
            headers={'user-agent': get_userAgent()},
            timeout=max_timeout,
            follow_redirects=True,
            verify=False
        )
        response.raise_for_status()

    except Exception as e:
        console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
        return 0

    # Create soup and find table
    soup = BeautifulSoup(response.text, "html.parser")

    for card in soup.find_all("div", class_=["card", "mp-post", "horizontal"]):
        try:
            title_tag = card.find("h3", class_="card-title").find("a")
            url = title_tag.get("href")
            title = title_tag.get_text(strip=True)

            title_info = {
                'name': title,
                'url': url,
                'type': 'film'
            }
            media_search_manager.add_media(title_info)

        except Exception as e:
            print(f"Error parsing a film entry: {e}")

    # Return the number of titles found
    return media_search_manager.get_length()


@@ -7,6 +7,7 @@ from rich.prompt import Prompt

 # Internal utilities
 from StreamingCommunity.Api.Template import get_select_title
+from StreamingCommunity.Lib.Proxies.proxy import ProxyFinder
 from StreamingCommunity.Api.Template.config_loader import site_constant
 from StreamingCommunity.Api.Template.Class.SearchType import MediaItem

@@ -20,7 +21,7 @@ from .series import download_series
 # Variable
 indice = 8
 _useFor = "film_serie"
-_priority = 0
+_priority = 10  # !!! MOLTO LENTO (very slow)
 _engineDownload = "hls"

 msg = Prompt()

@@ -35,7 +36,7 @@ def get_user_input(string_to_search: str = None):
         string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()

     return string_to_search

-def process_search_result(select_title, selections=None):
+def process_search_result(select_title, selections=None, proxy=None):
     """
     Handles the search result and initiates the download for either a film or series.

@@ -52,10 +53,10 @@ def process_search_result(select_title, selections=None):
             season_selection = selections.get('season')
             episode_selection = selections.get('episode')

-        download_series(select_title, season_selection, episode_selection)
+        download_series(select_title, season_selection, episode_selection, proxy)
     else:
-        download_film(select_title)
+        download_film(select_title, proxy)

 def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None, selections: dict = None):
     """

@@ -70,13 +71,15 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None, selections: dict = None):
     """
     if direct_item:
         select_title = MediaItem(**direct_item)
-        process_search_result(select_title, selections)
+        process_search_result(select_title, selections)  # DONT SUPPORT PROXY FOR NOW
         return

     if string_to_search is None:
         string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()

-    len_database = title_search(string_to_search)
+    finder = ProxyFinder(url=f"{site_constant.FULL_URL}/serie/euphoria/")
+    proxy, response_serie, _ = finder.find_fast_proxy()
+    len_database = title_search(string_to_search, [proxy, response_serie])

     # If only the database is needed, return the manager
     if get_onlyDatabase:

@@ -84,7 +87,7 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None, selections: dict = None):
     if len_database > 0:
         select_title = get_select_title(table_show_manager, media_search_manager)
-        process_search_result(select_title, selections)
+        process_search_result(select_title, selections, proxy)
     else:
         # If no results are found, ask again
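
Taken together, the search flow now discovers one proxy up front and threads it through every later call; a sketch of the hand-off (values are illustrative):

# Sketch of the proxy hand-off through this module (illustrative values).
finder = ProxyFinder(url=f"{site_constant.FULL_URL}/serie/euphoria/")
proxy, response_serie, _ = finder.find_fast_proxy()  # may block while candidate proxies are tested

len_database = title_search("euphoria", [proxy, response_serie])  # nonce comes from response_serie
# process_search_result(select_title, selections, proxy)
#   -> download_film(select_title, proxy)                     # films
#   -> download_series(select_title, season, episode, proxy)  # series, via GetSerieInfo(url, proxy)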


@@ -26,7 +26,7 @@ from StreamingCommunity.Api.Player.hdplayer import VideoSource

 console = Console()

-def download_film(select_title: MediaItem) -> str:
+def download_film(select_title: MediaItem, proxy) -> str:
     """
     Downloads a film using the provided film ID, title name, and domain.

@@ -41,8 +41,8 @@ def download_film(select_title: MediaItem) -> str:
     console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [cyan]{select_title.name}[/cyan] \n")

     # Get master playlists
-    video_source = VideoSource(select_title.url)
-    master_playlist = video_source.get_m3u8_url()
+    video_source = VideoSource(proxy)
+    master_playlist = video_source.get_m3u8_url(select_title.url)

     # Define the filename and path for the downloaded film
     title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4"


@@ -36,7 +36,7 @@ msg = Prompt()
 console = Console()

-def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: GetSerieInfo) -> Tuple[str,bool]:
+def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: GetSerieInfo, proxy=None) -> Tuple[str,bool]:
     """
     Downloads a specific episode from a specified season.

@@ -60,8 +60,8 @@ def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: GetSerieInfo) -> Tuple[str,bool]:
     mp4_path = os.path.join(site_constant.SERIES_FOLDER, scrape_serie.series_name, f"S{index_season_selected}")

     # Retrieve scws and if available master playlist
-    video_source = VideoSource(obj_episode.url)
-    master_playlist = video_source.get_m3u8_url()
+    video_source = VideoSource(proxy)
+    master_playlist = video_source.get_m3u8_url(obj_episode.url)

     # Download the episode
     r_proc = HLS_Downloader(

@@ -76,7 +76,7 @@ def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: GetSerieInfo) -> Tuple[str,bool]:
     return r_proc['path'], r_proc['stopped']

-def download_episode(index_season_selected: int, scrape_serie: GetSerieInfo, download_all: bool = False, episode_selection: str = None) -> None:
+def download_episode(index_season_selected: int, scrape_serie: GetSerieInfo, download_all: bool = False, episode_selection: str = None, proxy = None) -> None:
     """
     Handle downloading episodes for a specific season.

@@ -92,7 +92,7 @@ def download_episode(index_season_selected: int, scrape_serie: GetSerieInfo, download_all: bool = False, episode_selection: str = None) -> None:
     if download_all:
         for i_episode in range(1, episodes_count + 1):
-            path, stopped = download_video(index_season_selected, i_episode, scrape_serie)
+            path, stopped = download_video(index_season_selected, i_episode, scrape_serie, proxy)

             if stopped:
                 break

@@ -113,12 +113,12 @@ def download_episode(index_season_selected: int, scrape_serie: GetSerieInfo, download_all: bool = False, episode_selection: str = None) -> None:
         # Download selected episodes if not stopped
         for i_episode in list_episode_select:
-            path, stopped = download_video(index_season_selected, i_episode, scrape_serie)
+            path, stopped = download_video(index_season_selected, i_episode, scrape_serie, proxy)

             if stopped:
                 break

-def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None) -> None:
+def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None, proxy = None) -> None:
     """
     Handle downloading a complete series.

@@ -127,7 +127,7 @@ def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None) -> None:
     - season_selection (str, optional): Pre-defined season selection that bypasses manual input
     - episode_selection (str, optional): Pre-defined episode selection that bypasses manual input
     """
-    scrape_serie = GetSerieInfo(select_season.url)
+    scrape_serie = GetSerieInfo(select_season.url, proxy)

     # Get total number of seasons
     seasons_count = scrape_serie.getNumberSeason()

@@ -154,7 +154,7 @@ def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None) -> None:
     for i_season in list_season_select:
         if len(list_season_select) > 1 or index_season_selected == "*":
             # Download all episodes if multiple seasons are selected or if '*' is used
-            download_episode(i_season, scrape_serie, download_all=True)
+            download_episode(i_season, scrape_serie, download_all=True, proxy=proxy)
         else:
             # Otherwise, let the user select specific episodes for the single season
-            download_episode(i_season, scrape_serie, download_all=False, episode_selection=episode_selection)
+            download_episode(i_season, scrape_serie, download_all=False, episode_selection=episode_selection, proxy=proxy)


@@ -38,7 +38,7 @@ def extract_nonce(response_) -> str:
     return ""

-def title_search(query: str) -> int:
+def title_search(query: str, additionalData: list) -> int:
     """
     Search for titles based on a search query.

@@ -51,16 +51,12 @@ def title_search(query: str) -> int:
     media_search_manager.clear()
     table_show_manager.clear()

+    proxy, response_serie = additionalData
     search_url = f"{site_constant.FULL_URL}/wp-admin/admin-ajax.php"
     console.print(f"[cyan]Search url: [yellow]{search_url}")

     try:
-        nonce_response = httpx.get(
-            "https://www.streamingwatch.org/serie/euphoria/",
-            timeout=max_timeout,
-            headers={'user-agent': get_userAgent()}
-        )
-        _wpnonce = extract_nonce(nonce_response)
+        _wpnonce = extract_nonce(response_serie)

         if not _wpnonce:
             console.print("[red]Error: Failed to extract nonce")

@@ -79,7 +75,8 @@ def title_search(query: str) -> int:
                 'user-agent': get_userAgent()
             },
             data=data,
-            timeout=max_timeout
+            timeout=max_timeout,
+            proxy=proxy
         )
         response.raise_for_status()

         soup = BeautifulSoup(response.text, 'html.parser')


@@ -19,19 +19,20 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")

 class GetSerieInfo:
-    def __init__(self, url, media_id: int = None, series_name: str = None):
+    def __init__(self, url, proxy: str = None):
         self.headers = {'user-agent': get_userAgent()}
         self.url = url
-        self.media_id = media_id
         self.seasons_manager = SeasonManager()
-        self.series_name = series_name
+        self.series_name = None
+        self.client = httpx.Client(headers=self.headers, proxy=proxy, timeout=max_timeout)

     def collect_info_season(self) -> None:
         """
         Retrieve all series information including episodes and seasons.
         """
         try:
-            response = httpx.get(self.url, headers=self.headers, timeout=max_timeout)
+            response = self.client.get(self.url)
             response.raise_for_status()

             soup = BeautifulSoup(response.text, 'html.parser')
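
A hypothetical stand-alone use of the reworked GetSerieInfo, assuming getNumberSeason() populates the season data internally (URL and proxy are placeholders):

# Hypothetical direct use of GetSerieInfo (URL and proxy are placeholders).
scrape_serie = GetSerieInfo("https://example.org/serie/some-show/", proxy="http://1.2.3.4:3128")
seasons_count = scrape_serie.getNumberSeason()  # assumed to call collect_info_season() first
print(f"{scrape_serie.series_name}: {seasons_count} seasons")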


@@ -0,0 +1,232 @@
# 29.04.25

import os
import sys
import time
import json
import signal
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

# External library
import httpx
from rich import print
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeRemainingColumn

# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_headers

# Variable
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")


class ProxyFinder:
    def __init__(self, url, timeout_threshold: float = 7.0, max_proxies: int = 150, max_workers: int = 12):
        self.url = url
        self.timeout_threshold = timeout_threshold
        self.max_proxies = max_proxies
        self.max_workers = max_workers
        self.found_proxy = None
        self.shutdown_flag = False
        self.json_file = os.path.join(os.path.dirname(__file__), 'working_proxies.json')
        signal.signal(signal.SIGINT, self._handle_interrupt)

    def load_saved_proxies(self) -> tuple:
        """Load saved proxies if they're not expired (2 hours old)"""
        try:
            if not os.path.exists(self.json_file):
                return None, None

            with open(self.json_file, 'r') as f:
                data = json.load(f)

            if not data.get('proxies') or not data.get('last_update'):
                return None, None

            last_update = datetime.fromisoformat(data['last_update'])
            if datetime.now() - last_update > timedelta(hours=2):
                return None, None

            return data['proxies'], last_update

        except Exception:
            return None, None

    def save_working_proxy(self, proxy: str, response_time: float):
        """Save working proxy to JSON file"""
        data = {
            'proxies': [{'proxy': proxy, 'response_time': response_time}],
            'last_update': datetime.now().isoformat()
        }
        try:
            with open(self.json_file, 'w') as f:
                json.dump(data, f, indent=4)
        except Exception as e:
            print(f"[bold red]Error saving proxy:[/bold red] {str(e)}")

    def fetch_geonode(self) -> list:
        proxies = []
        try:
            response = httpx.get(
                "https://proxylist.geonode.com/api/proxy-list?protocols=http%2Chttps&limit=100&page=1&sort_by=speed&sort_type=asc",
                headers=get_headers(),
                timeout=MAX_TIMEOUT
            )
            data = response.json()
            proxies = [(f"http://{p['ip']}:{p['port']}", "Geonode") for p in data.get('data', [])]

        except Exception as e:
            print(f"[bold red]Error in Geonode:[/bold red] {str(e)[:100]}")

        return proxies

    def fetch_proxyscrape(self) -> list:
        proxies = []
        try:
            response = httpx.get(
                "https://api.proxyscrape.com/v4/free-proxy-list/get?request=get_proxies&protocol=http&skip=0&proxy_format=protocolipport&format=json&limit=100&timeout=1000",
                headers=get_headers(),
                timeout=MAX_TIMEOUT
            )
            data = response.json()
            if 'proxies' in data and isinstance(data['proxies'], list):
                proxies = [(proxy_data['proxy'], "ProxyScrape") for proxy_data in data['proxies'] if 'proxy' in proxy_data]

        except Exception as e:
            print(f"[bold red]Error in ProxyScrape:[/bold red] {str(e)[:100]}")

        return proxies

    def fetch_proxies_from_sources(self) -> list:
        print("[cyan]Fetching proxies from sources...[/cyan]")
        with ThreadPoolExecutor(max_workers=3) as executor:
            proxyscrape_future = executor.submit(self.fetch_proxyscrape)
            geonode_future = executor.submit(self.fetch_geonode)

            sources_proxies = {}

            try:
                proxyscrape_result = proxyscrape_future.result()
                sources_proxies["proxyscrape"] = proxyscrape_result[:int(self.max_proxies/2)]
            except Exception as e:
                print(f"[bold red]Error fetching from proxyscrape:[/bold red] {str(e)[:100]}")
                sources_proxies["proxyscrape"] = []

            try:
                geonode_result = geonode_future.result()
                sources_proxies["geonode"] = geonode_result[:int(self.max_proxies/2)]
            except Exception as e:
                print(f"[bold red]Error fetching from geonode:[/bold red] {str(e)[:100]}")
                sources_proxies["geonode"] = []

            merged_proxies = []
            if "proxyscrape" in sources_proxies:
                merged_proxies.extend(sources_proxies["proxyscrape"])
            if "geonode" in sources_proxies:
                merged_proxies.extend(sources_proxies["geonode"])

            proxy_list = merged_proxies[:self.max_proxies]
            return proxy_list

    def _test_single_request(self, proxy_info: tuple) -> tuple:
        proxy, source = proxy_info
        try:
            start = time.time()
            with httpx.Client(proxy=proxy, timeout=self.timeout_threshold) as client:
                response = client.get(self.url, headers=get_headers())
                if response.status_code == 200:
                    return (True, time.time() - start, response, source)
        except Exception:
            pass
        return (False, self.timeout_threshold + 1, None, source)

    def test_proxy(self, proxy_info: tuple) -> tuple:
        proxy, source = proxy_info
        if self.shutdown_flag:
            return (proxy, False, 0, None, source)

        success1, time1, text1, source = self._test_single_request(proxy_info)
        if not success1 or time1 > self.timeout_threshold:
            return (proxy, False, time1, None, source)

        success2, time2, _, source = self._test_single_request(proxy_info)
        avg_time = (time1 + time2) / 2
        return (proxy, success2 and time2 <= self.timeout_threshold, avg_time, text1, source)

    def _handle_interrupt(self, sig, frame):
        print("\n[bold yellow]Received keyboard interrupt. Terminating...[/bold yellow]")
        self.shutdown_flag = True
        sys.exit(0)

    def find_fast_proxy(self) -> tuple:
        saved_proxies, last_update = self.load_saved_proxies()
        if saved_proxies:
            print("[cyan]Testing saved proxy...[/cyan]")
            for proxy_data in saved_proxies:
                result = self.test_proxy((proxy_data['proxy'], 'cached'))
                if result[1]:
                    return proxy_data['proxy'], result[3], result[2]
                else:
                    print(f"[red]Saved proxy {proxy_data['proxy']} failed - response time: {result[2]:.2f}s[/red]")

        proxies = self.fetch_proxies_from_sources()
        if not proxies:
            print("[bold red]No proxies fetched to test.[/bold red]")
            return (None, None, None)

        found_proxy = None
        response_text = None
        source = None
        failed_count = 0
        success_count = 0

        #print(f"[cyan]Testing {len(proxies)} proxies...[/cyan]")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.test_proxy, p): p for p in proxies}

            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TextColumn("[cyan]{task.fields[success]}[/cyan]/[red]{task.fields[failed]}[/red]"),
                TimeRemainingColumn(),
            ) as progress:
                task = progress.add_task(
                    "[cyan]Testing Proxies",
                    total=len(futures),
                    success=success_count,
                    failed=failed_count
                )

                for future in as_completed(futures):
                    if self.shutdown_flag:
                        break

                    try:
                        proxy, success, elapsed, response, proxy_source = future.result()
                        if success:
                            success_count += 1
                            print(f"[bold green]Found valid proxy:[/bold green] {proxy} ({elapsed:.2f}s)")
                            found_proxy = proxy
                            response_text = response
                            self.save_working_proxy(proxy, elapsed)
                            self.shutdown_flag = True
                            break
                        else:
                            failed_count += 1
                    except Exception:
                        failed_count += 1

                    progress.update(task, advance=1, success=success_count, failed=failed_count)

        if not found_proxy:
            print("[bold red]No working proxies found[/bold red]")

        return (found_proxy, response_text, source)
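
A minimal sketch of ProxyFinder used on its own (the target URL is a placeholder):

# Minimal ProxyFinder sketch (target URL is a placeholder).
finder = ProxyFinder(url="https://example.org/", timeout_threshold=7.0)
proxy, probe_response, _ = finder.find_fast_proxy()

if proxy:
    # The proxy is also cached in working_proxies.json and reused for up to 2 hours.
    print(f"Usable proxy: {proxy}")
else:
    print("No working proxy found")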


@@ -1,5 +1,5 @@
 __title__ = 'StreamingCommunity'
-__version__ = '3.0.1'
+__version__ = '3.0.2'
 __author__ = 'Arrowar'
 __description__ = 'A command-line program to download film'
 __copyright__ = 'Copyright 2024'


@@ -10,7 +10,7 @@ with open(os.path.join(os.path.dirname(__file__), "requirements.txt"), "r", enco
 setup(
     name="StreamingCommunity",
-    version="3.0.1",
+    version="3.0.2",
     long_description=read_readme(),
     long_description_content_type="text/markdown",
     author="Lovi-0",