Fix proxy __init__

Lovi 2025-05-18 14:09:03 +02:00
parent 8de72fabfc
commit 7f7f35612e
9 changed files with 46 additions and 46 deletions

View File

@@ -1,6 +1,6 @@
 # 01.03.24
-import sys
+import time
 import logging
 from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
@@ -24,7 +24,7 @@ console = Console()
 class VideoSource:
-    def __init__(self, url: str, is_series: bool, media_id: int = None):
+    def __init__(self, url: str, is_series: bool, media_id: int = None, proxy: str = None):
         """
         Initialize video source for streaming site.
@@ -35,6 +35,7 @@ class VideoSource:
         """
         self.headers = {'user-agent': get_userAgent()}
         self.url = url
+        self.proxy = proxy
         self.is_series = is_series
         self.media_id = media_id
         self.iframe_src = None
@@ -55,7 +56,7 @@ class VideoSource:
         }
         try:
-            response = httpx.get(f"{self.url}/iframe/{self.media_id}", params=params, timeout=MAX_TIMEOUT)
+            response = httpx.get(f"{self.url}/iframe/{self.media_id}", headers=self.headers, params=params, timeout=MAX_TIMEOUT, proxy=self.proxy)
             response.raise_for_status()

             # Parse response with BeautifulSoup to get iframe source
@@ -81,6 +82,7 @@ class VideoSource:
             self.window_video = WindowVideo(converter.get('video'))
             self.window_streams = StreamsCollection(converter.get('streams'))
             self.window_parameter = WindowParameter(converter.get('masterPlaylist'))
+            time.sleep(0.5)
         except Exception as e:
             logging.error(f"Error parsing script: {e}")

View File

@@ -24,7 +24,7 @@ indice = 3
 _useFor = "Torrent"
 _priority = 0
 _engineDownload = "Torrent"
-_deprecate = False
+_deprecate = True
 console = Console()
 msg = Prompt()

View File

@@ -43,40 +43,38 @@ class ScrapeSerieAnime:
     def get_count_episodes(self):
         """
         Retrieve total number of episodes for the selected media.
+        This includes partial episodes (like episode 6.5).

         Returns:
-            int: Total episode count
+            int: Total episode count including partial episodes
         """
-        try:
-            response = httpx.get(
-                url=f"{self.url}/info_api/{self.media_id}/",
-                headers=self.headers,
-                timeout=max_timeout
-            )
-            response.raise_for_status()
-            # Parse JSON response and return episode count
-            return response.json()["episodes_count"]
-        except Exception as e:
-            logging.error(f"Error fetching episode count: {e}")
-            return None
+        if self.episodes_cache is None:
+            self._fetch_all_episodes()
+        if self.episodes_cache:
+            return len(self.episodes_cache)
+        return None

     def _fetch_all_episodes(self):
         """
         Fetch all episodes data at once and cache it
         """
         try:
-            all_episodes = []
-            count = self.get_count_episodes()
-            if not count:
-                return
+            # Get initial episode count
+            response = httpx.get(
+                url=f"{self.url}/info_api/{self.media_id}/",
+                headers=self.headers,
+                timeout=max_timeout
+            )
+            response.raise_for_status()
+            initial_count = response.json()["episodes_count"]

-            # Fetch episodes
+            all_episodes = []
             start_range = 1
-            while start_range <= count:
-                end_range = min(start_range + 119, count)
+            # Fetch episodes in chunks
+            while start_range <= initial_count:
+                end_range = min(start_range + 119, initial_count)
                 response = httpx.get(
                     url=f"{self.url}/info_api/{self.media_id}/1",

View File

@@ -33,6 +33,7 @@ _deprecate = False
 msg = Prompt()
 console = Console()
+proxy = None

 def get_user_input(string_to_search: str = None):
@@ -101,8 +102,8 @@ def process_search_result(select_title, selections=None, proxy=None):
         download_series(select_title, season_selection, episode_selection, proxy)
-    else: # 'movie' or other types assumed to be film-like
-        download_film(select_title, proxy) # Assuming download_film might also need proxy
+    else:
+        download_film(select_title, proxy)

 def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None, selections: dict = None):
     """
@@ -122,12 +123,12 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
     if direct_item:
         select_title_obj = MediaItem(**direct_item)
-        # Note: If direct_item processing requires a proxy, it should be fetched here.
-        # For now, assuming process_search_result handles proxy=None if not provided.
+        finder = ProxyFinder(site_constant.FULL_URL) # Get proxy for direct item too
+        proxy = finder.find_fast_proxy()
         process_search_result(select_title_obj, selections, proxy)
         return

+    # Check proxy if not already set
+    finder = ProxyFinder(site_constant.FULL_URL)
+    proxy = finder.find_fast_proxy()

     actual_search_query = get_user_input(string_to_search)
@@ -136,12 +137,9 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
         if bot:
             if actual_search_query is None: # Specifically for timeout from bot.ask or failed restart
                 bot.send_message("Search term not provided or operation cancelled. Returning.", None)
-        # If not bot, or empty string, just return; will likely lead to no results or previous menu.
         return

     # Perform search on the database using the obtained query
-    finder = ProxyFinder(site_constant.FULL_URL)
-    proxy = finder.find_fast_proxy()
     len_database = title_search(actual_search_query, proxy)

     # If only the database object (media_search_manager populated by title_search) is needed
@@ -149,12 +147,10 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
         return media_search_manager

     if len_database > 0:
-        # *** THE FIX IS HERE: Added len_database as the third argument ***
         select_title = get_select_title(table_show_manager, media_search_manager, len_database)
-        process_search_result(select_title, selections, proxy) # Pass proxy
+        process_search_result(select_title, selections, proxy)
     else:
-        # No results found
         no_results_message = f"No results found for: '{actual_search_query}'"
         if bot:
             bot.send_message(no_results_message, None)
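The net effect in this module: the proxy is resolved once, right after input handling (and separately for the `direct_item` shortcut), then handed down the call chain instead of being rediscovered later. Roughly, with the surrounding module globals assumed:

```python
# search() -> process_search_result() -> download_film()/download_series() -> VideoSource
finder = ProxyFinder(site_constant.FULL_URL)
proxy = finder.find_fast_proxy()  # may be None if no working proxy is found

len_database = title_search(actual_search_query, proxy)
if len_database > 0:
    select_title = get_select_title(table_show_manager, media_search_manager, len_database)
    process_search_result(select_title, selections, proxy)  # proxy rides along from here down
```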

View File

@@ -27,7 +27,7 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource
 console = Console()

-def download_film(select_title: MediaItem) -> str:
+def download_film(select_title: MediaItem, proxy: str = None) -> str:
     """
     Downloads a film using the provided film ID, title name, and domain.
@@ -55,7 +55,7 @@ def download_film(select_title: MediaItem, proxy: str = None) -> str:
     console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [cyan]{select_title.name}[/cyan] \n")

     # Init class
-    video_source = VideoSource(f"{site_constant.FULL_URL}/it", False, select_title.id)
+    video_source = VideoSource(f"{site_constant.FULL_URL}/it", False, select_title.id, proxy)

     # Retrieve scws and if available master playlist
     video_source.get_iframe(select_title.id)

View File

@@ -154,7 +154,7 @@ def download_series(select_season: MediaItem, season_selection: str = None, epis
     start_message()

     # Init class
-    video_source = VideoSource(f"{site_constant.FULL_URL}/it", True, select_season.id)
+    video_source = VideoSource(f"{site_constant.FULL_URL}/it", True, select_season.id, proxy)
     scrape_serie = GetSerieInfo(f"{site_constant.FULL_URL}/it", select_season.id, select_season.slug, proxy)

     # Collect information about season
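Both download helpers now forward the proxy positionally as VideoSource's fourth argument. A hypothetical pair of call sites, assuming `download_series` gained a matching `proxy` parameter (its full signature is truncated in the hunk header) and that the proxy was already resolved by ProxyFinder:

```python
proxy = "http://127.0.0.1:8080"  # placeholder value

download_film(select_title, proxy)
download_series(select_season, season_selection=None, episode_selection=None, proxy=proxy)
```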

View File

@@ -27,6 +27,7 @@ _deprecate = False
 msg = Prompt()
 console = Console()
+proxy = None

 def get_user_input(string_to_search: str = None):
@@ -74,12 +75,15 @@ def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_
         select_title = MediaItem(**direct_item)
         process_search_result(select_title, selections) # DONT SUPPORT PROXY FOR NOW
         return

+    # Check proxy if not already set
+    finder = ProxyFinder(site_constant.FULL_URL)
+    proxy = finder.find_fast_proxy()

     if string_to_search is None:
         string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()

-    finder = ProxyFinder(url=f"{site_constant.FULL_URL}/serie/euphoria/")
-    proxy = finder.find_fast_proxy()

     # Perform search on the database using the obtained query
     len_database = title_search(string_to_search, proxy)

     # If only the database is needed, return the manager

View File

@@ -32,7 +32,7 @@ class ProxyFinder:
         proxy, source = proxy_info
         try:
             start = time.time()
-            print(f"[yellow]Testing proxy...")
+            print(f"[yellow]Testing proxy for URL: {self.url}...")
             with httpx.Client(proxy=proxy, timeout=self.timeout_threshold) as client:
                 response = client.get(self.url, headers=get_headers())
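For reference, a self-contained sketch of the per-candidate timing check this method performs; the function name and anything beyond what the hunk shows are assumptions:

```python
import time
import httpx

def time_proxy(url: str, proxy: str, timeout: float = 5.0):
    """Return round-trip seconds through `proxy`, or None if it fails."""
    start = time.time()
    try:
        with httpx.Client(proxy=proxy, timeout=timeout) as client:
            response = client.get(url)
            response.raise_for_status()
    except Exception:
        return None  # dead, blocked, or too slow: skip this candidate
    return time.time() - start
```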

View File

@@ -60,4 +60,4 @@
         "max_retry": 8,
         "proxy": ""
     }
-}
+}
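The config hunk appears to touch only the final brace (likely a trailing-newline fix); the `proxy` key itself stays an empty string, meaning no fixed proxy is configured. A hypothetical reader for that setting; the parent section name is not visible in the hunk and is assumed:

```python
import json

with open("config.json", encoding="utf-8") as f:
    config = json.load(f)

# Empty string -> fall back to runtime discovery via ProxyFinder.
proxy = config.get("REQUESTS", {}).get("proxy") or None
```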