Bump v3.0.2

* Add API "StreamingWatch"

* Add hdplayer

---------

Co-authored-by: Lovi <62809003+Lovi-0@users.noreply.github.com>
Committed via GitHub on 2025-05-01 14:22:47 +02:00
parent bd922afde2
commit 782b03d248
23 changed files with 1040 additions and 177 deletions

.gitignore vendored
View File

@@ -52,4 +52,5 @@ cmd.txt
bot_config.json
scripts.json
active_requests.json
-domains.json
+domains.json
+working_proxies.json

View File

@@ -797,13 +797,27 @@ Contributions are welcome! Steps:
4. Push to branch (`git push origin feature/AmazingFeature`)
5. Open Pull Request
# Disclaimer
This software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose, and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages, or other liability, whether in an action of contract, tort, or otherwise, arising from, out of, or in connection with the software or the use or other dealings in the software.
## Useful Projects
### 🎯 [Unit3Dup](https://github.com/31December99/Unit3Dup)
Python bot for automatically generating and uploading torrents to Unit3D-based trackers.
### 🇮🇹 [MammaMia](https://github.com/UrloMythus/MammaMia)
Stremio addon that provides HTTPS streaming of films, series, anime, and live TV in Italian.
### 🧩 [streamingcommunity-unofficialapi](https://github.com/Blu-Tiger/streamingcommunity-unofficialapi)
Unofficial API for accessing content from the Italian StreamingCommunity site.
### 🎥 [stream-buddy](https://github.com/Bbalduzz/stream-buddy)
Tool for watching or downloading films from the StreamingCommunity platform.
## Contributors
<a href="https://github.com/Arrowar/StreamingCommunity/graphs/contributors" alt="View Contributors">
<img src="https://contrib.rocks/image?repo=Arrowar/StreamingCommunity&max=1000&columns=10" alt="Contributors" />
-</a>
+</a>

View File

@@ -0,0 +1,65 @@
# 29.04.25
import re
# External library
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.config_json import config_manager
# Variable
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
def __init__(self, proxy=None):
self.client = httpx.Client(headers=get_headers(), timeout=MAX_TIMEOUT, proxy=proxy)
def extractLinkHdPlayer(self, response):
"""Extract iframe source from the page."""
soup = BeautifulSoup(response.content, 'html.parser')
iframes = soup.find_all("iframe")
if iframes:
return iframes[0].get('data-lazy-src')
return None
def get_m3u8_url(self, page_url):
"""
Extract m3u8 URL from hdPlayer page.
"""
try:
# Get the page content
response = self.client.get(page_url)
# Extract HDPlayer iframe URL
iframe_url = self.extractLinkHdPlayer(response)
if not iframe_url:
return None
# Get HDPlayer page content
response_hdplayer = self.client.get(iframe_url)
if response_hdplayer.status_code != 200:
return None
soup = BeautifulSoup(response_hdplayer.text, 'html.parser')
# Find m3u8 URL in scripts
for script in soup.find_all("script"):
match = re.search(r'sources:\s*\[\{\s*file:\s*"([^"]+)"', script.text)
if match:
return match.group(1)
return None
except Exception as e:
print(f"Error in HDPlayer: {str(e)}")
return None
finally:
self.client.close()
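
For orientation, a minimal usage sketch of the new hdplayer VideoSource, mirroring how the film and series downloaders added later in this commit call it (the page URL is a placeholder and the proxy is optional):

from StreamingCommunity.Api.Player.hdplayer import VideoSource

# Placeholder title page URL; a real call passes the URL of the selected title.
video_source = VideoSource(proxy=None)
master_playlist = video_source.get_m3u8_url("https://example.org/title-page")
print(master_playlist)  # .m3u8 URL, or None if no iframe or source was found

Note that get_m3u8_url closes its httpx client in the finally block, so each VideoSource instance serves a single extraction; the series module below builds a fresh instance per episode.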

View File

@@ -1,140 +0,0 @@
# 05.07.24
import re
import logging
# External libraries
import httpx
import jsbeautifier
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent
# Variable
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
def __init__(self, url: str):
"""
Sets up the video source with the provided URL.
Parameters:
- url (str): The URL of the video.
"""
self.url = url
self.redirect_url = None
self.maxstream_url = None
self.m3u8_url = None
self.headers = {'user-agent': get_userAgent()}
def get_redirect_url(self):
"""
Sends a request to the initial URL and extracts the redirect URL.
"""
try:
response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract the redirect URL from the HTML
soup = BeautifulSoup(response.text, "html.parser")
self.redirect_url = soup.find("div", id="iframen1").get("data-src")
logging.info(f"Redirect URL: {self.redirect_url}")
return self.redirect_url
except Exception as e:
logging.error(f"Error parsing HTML: {e}")
raise
def get_maxstream_url(self):
"""
Sends a request to the redirect URL and extracts the Maxstream URL.
"""
try:
response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
# Extract the Maxstream URL from the HTML
soup = BeautifulSoup(response.text, "html.parser")
maxstream_url = soup.find("a")
if maxstream_url is None:
# If no anchor tag is found, try the alternative method
logging.warning("Anchor tag not found. Trying the alternative method.")
headers = {
'origin': 'https://stayonline.pro',
'user-agent': get_userAgent(),
'x-requested-with': 'XMLHttpRequest',
}
# Make request to stayonline api
data = {'id': self.redirect_url.split("/")[-2], 'ref': ''}
response = httpx.post('https://stayonline.pro/ajax/linkEmbedView.php', headers=headers, data=data)
response.raise_for_status()
uprot_url = response.json()['data']['value']
# Retry getting maxtstream url
response = httpx.get(uprot_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
maxstream_url = soup.find("a").get("href")
else:
maxstream_url = maxstream_url.get("href")
self.maxstream_url = maxstream_url
logging.info(f"Maxstream URL: {self.maxstream_url}")
return self.maxstream_url
except Exception as e:
logging.error(f"Error during the request: {e}")
raise
def get_m3u8_url(self):
"""
Sends a request to the Maxstream URL and extracts the .m3u8 file URL.
"""
try:
response = httpx.get(self.maxstream_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Iterate over all script tags in the HTML
for script in soup.find_all("script"):
if "eval(function(p,a,c,k,e,d)" in script.text:
# Execute the script using
data_js = jsbeautifier.beautify(script.text)
# Extract the .m3u8 URL from the script's output
match = re.search(r'sources:\s*\[\{\s*src:\s*"([^"]+)"', data_js)
if match:
self.m3u8_url = match.group(1)
logging.info(f"M3U8 URL: {self.m3u8_url}")
break
else:
logging.error("Failed to find M3U8 URL: No match found")
return self.m3u8_url
except Exception as e:
logging.error(f"Error executing the Node.js script: {e}")
raise
def get_playlist(self):
"""
Executes the entire flow to obtain the final .m3u8 file URL.
"""
self.get_redirect_url()
self.get_maxstream_url()
return self.get_m3u8_url()

View File

@@ -0,0 +1,145 @@
# 05.07.24
import re
import logging
# External libraries
import httpx
import jsbeautifier
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent
# Variable
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
STAYONLINE_BASE_URL = "https://stayonline.pro"
MIXDROP_BASE_URL = "https://mixdrop.sb"
def __init__(self, url: str):
self.url = url
self.redirect_url: str | None = None
self._init_headers()
def _init_headers(self) -> None:
"""Initialize the base headers used for requests."""
self.headers = {
'origin': self.STAYONLINE_BASE_URL,
'user-agent': get_userAgent(),
}
def _get_mixdrop_headers(self) -> dict:
"""Get headers specifically for MixDrop requests."""
return {
'referer': 'https://mixdrop.club/',
'user-agent': get_userAgent()
}
def get_redirect_url(self) -> str:
"""Extract the stayonline redirect URL from the initial page."""
try:
response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for link in soup.find_all('a'):
href = link.get('href')
if href and 'stayonline' in href:
self.redirect_url = href
logging.info(f"Redirect URL: {self.redirect_url}")
return self.redirect_url
raise ValueError("Stayonline URL not found")
except Exception as e:
logging.error(f"Error getting redirect URL: {e}")
raise
def get_link_id(self) -> str:
"""Extract the link ID from the redirect page."""
if not self.redirect_url:
raise ValueError("Redirect URL not set. Call get_redirect_url first.")
try:
response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for script in soup.find_all('script'):
match = re.search(r'var\s+linkId\s*=\s*"([^"]+)"', script.text)
if match:
return match.group(1)
raise ValueError("LinkId not found")
except Exception as e:
logging.error(f"Error getting link ID: {e}")
raise
def get_final_url(self, link_id: str) -> str:
"""Get the final URL using the link ID."""
try:
self.headers['referer'] = f'{self.STAYONLINE_BASE_URL}/l/{link_id}/'
data = {'id': link_id, 'ref': ''}
response = httpx.post(f'{self.STAYONLINE_BASE_URL}/ajax/linkView.php', headers=self.headers, data=data, timeout=MAX_TIMEOUT)
response.raise_for_status()
return response.json()['data']['value']
except Exception as e:
logging.error(f"Error getting final URL: {e}")
raise
def _extract_video_id(self, final_url: str) -> str:
"""Extract video ID from the final URL."""
parts = final_url.split('/')
if len(parts) < 5:
raise ValueError("Invalid final URL format")
return parts[4]
def _extract_delivery_url(self, script_text: str) -> str:
"""Extract delivery URL from beautified JavaScript."""
beautified = jsbeautifier.beautify(script_text)
for line in beautified.splitlines():
if 'MDCore.wurl' in line:
url = line.split('= ')[1].strip('"').strip(';')
return f"https:{url}"
raise ValueError("Delivery URL not found in script")
def get_playlist(self) -> str:
"""
Execute the entire flow to obtain the final video URL.
Returns:
str: The final video delivery URL
"""
self.get_redirect_url()
link_id = self.get_link_id()
final_url = self.get_final_url(link_id)
video_id = self._extract_video_id(final_url)
response = httpx.get(
f'{self.MIXDROP_BASE_URL}/e/{video_id}',
headers=self._get_mixdrop_headers(),
timeout=MAX_TIMEOUT
)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
script_text = next(
(script.text for script in soup.find_all('script')
if "eval" in str(script.text)),
None
)
if not script_text:
raise ValueError("Required script not found")
return self._extract_delivery_url(script_text).replace('"', '')
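
As a quick, hedged sketch of the rewritten mixdrop flow (the page URL is a placeholder; the updated test at the end of this commit uses a real title page), get_playlist chains the stayonline redirect, link ID lookup, and MixDrop embed parsing:

from StreamingCommunity.Api.Player.mixdrop import VideoSource

# Placeholder page URL that embeds a stayonline link.
video_source = VideoSource("https://example.org/some-title/")
delivery_url = video_source.get_playlist()  # final MixDrop delivery URL
print(delivery_url)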

View File

@@ -52,7 +52,7 @@ def title_search(query: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Create soup and find table

View File

@@ -55,7 +55,7 @@ def title_search(query: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
if site_constant.TELEGRAM_BOT:
bot.send_message(f"ERRORE\n\nErrore nella richiesta di ricerca:\n\n{e}", None)
return 0

View File

@@ -119,7 +119,8 @@ def title_search(query: str) -> int:
process_results(response1.json()['records'], seen_titles, media_search_manager, choices)
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, livesearch error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Second API call - archivio
try:

View File

@@ -78,7 +78,7 @@ def title_search(query: str) -> int:
)
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Create soup instance

View File

@@ -53,7 +53,7 @@ def title_search(query: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Create soup and find table

View File

@@ -54,7 +54,7 @@ def title_search(query: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Create soup and find table

View File

@@ -26,7 +26,7 @@ console = Console()
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
-MAX_THREADS = 4
+MAX_THREADS = 12
def determine_media_type(title):
@@ -134,7 +134,7 @@ def title_search(query: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
# Limit to only 15 results for performance

View File

@@ -155,7 +155,7 @@ def download_series(select_season: MediaItem, season_selection: str = None, epis
# Init class
video_source = VideoSource(site_constant.FULL_URL, True, select_season.id)
-scrape_serie = GetSerieInfo(site_constant.FULL_URL, select_season.id, select_season.name)
+scrape_serie = GetSerieInfo(site_constant.FULL_URL, select_season.id, select_season.slug)
# Collect information about season
scrape_serie.getNumberSeason()
@@ -219,4 +219,4 @@ def download_series(select_season: MediaItem, season_selection: str = None, epis
# Get script_id
script_id = TelegramSession.get_session()
if script_id != "unknown":
-TelegramSession.deleteScriptId(script_id)
+TelegramSession.deleteScriptId(script_id)

View File

@@ -55,7 +55,7 @@ def title_search(query: str) -> int:
response.raise_for_status()
except Exception as e:
console.print(f"Site: {site_constant.SITE_NAME}, request search error: {e}")
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
if site_constant.TELEGRAM_BOT:
bot.send_message(f"ERRORE\n\nErrore nella richiesta di ricerca:\n\n{e}", None)
return 0

View File

@@ -0,0 +1,95 @@
# 29.04.25
# External library
from rich.console import Console
from rich.prompt import Prompt
# Internal utilities
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Lib.Proxies.proxy import ProxyFinder
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Logic class
from .site import title_search, table_show_manager, media_search_manager
from .film import download_film
from .series import download_series
# Variable
indice = 8
_useFor = "film_serie"
_priority = 10 # !!! VERY SLOW
_engineDownload = "hls"
msg = Prompt()
console = Console()
def get_user_input(string_to_search: str = None):
"""
Asks the user to input a search term.
"""
string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
return string_to_search
def process_search_result(select_title, selections=None, proxy=None):
"""
Handles the search result and initiates the download for either a film or series.
Parameters:
select_title (MediaItem): The selected media item
selections (dict, optional): Dictionary containing selection inputs that bypass manual input
{'season': season_selection, 'episode': episode_selection}
"""
if select_title.type == 'tv':
season_selection = None
episode_selection = None
if selections:
season_selection = selections.get('season')
episode_selection = selections.get('episode')
download_series(select_title, season_selection, episode_selection, proxy)
else:
download_film(select_title, proxy)
def search(string_to_search: str = None, get_onlyDatabase: bool = False, direct_item: dict = None, selections: dict = None):
"""
Main function of the application for search.
Parameters:
string_to_search (str, optional): String to search for
get_onlyDatabase (bool, optional): If True, return only the database object
direct_item (dict, optional): Direct item to process (bypass search)
selections (dict, optional): Dictionary containing selection inputs that bypass manual input
{'season': season_selection, 'episode': episode_selection}
"""
if direct_item:
select_title = MediaItem(**direct_item)
process_search_result(select_title, selections)  # proxy not yet supported for direct items
return
if string_to_search is None:
string_to_search = msg.ask(f"\n[purple]Insert a word to search in [green]{site_constant.SITE_NAME}").strip()
finder = ProxyFinder(url=f"{site_constant.FULL_URL}/serie/euphoria/")
proxy, response_serie, _ = finder.find_fast_proxy()
len_database = title_search(string_to_search, [proxy, response_serie])
# If only the database is needed, return the manager
if get_onlyDatabase:
return media_search_manager
if len_database > 0:
select_title = get_select_title(table_show_manager, media_search_manager)
process_search_result(select_title, selections, proxy)
else:
# If no results are found, ask again
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
search()
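
A hedged example of driving this entry point non-interactively with the selections dictionary described in the docstrings (query and selection values are placeholders):

# Placeholder query; season/episode use the same selection grammar as the prompts.
search("euphoria", selections={'season': '1', 'episode': '1-3'})

# Or return only the populated media database without downloading:
database = search("euphoria", get_onlyDatabase=True)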

View File

@@ -0,0 +1,61 @@
# 29.04.25
import os
# External library
from rich.console import Console
# Internal utilities
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import HLS_Downloader
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Player
from StreamingCommunity.Api.Player.hdplayer import VideoSource
# Variable
console = Console()
def download_film(select_title: MediaItem, proxy) -> str:
"""
Downloads a film from the selected media item.
Parameters:
- select_title (MediaItem): The media item to download
- proxy (str): Proxy to use for the requests (may be None)
Return:
- str: output path
"""
start_message()
console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [cyan]{select_title.name}[/cyan] \n")
# Get master playlists
video_source = VideoSource(proxy)
master_playlist = video_source.get_m3u8_url(select_title.url)
# Define the filename and path for the downloaded film
title_name = os_manager.get_sanitize_file(select_title.name) + ".mp4"
mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))
# Download the film using the m3u8 playlist, and output filename
r_proc = HLS_Downloader(
m3u8_url=master_playlist,
output_path=os.path.join(mp4_path, title_name)
).start()
if r_proc['error'] is not None:
try: os.remove(r_proc['path'])
except: pass
return r_proc['path']
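
A worked example of the path construction above, with made-up values: if MOVIE_FOLDER is "Movies" and the sanitized title is "Euphoria", then title_name becomes "Euphoria.mp4", mp4_path becomes "Movies/Euphoria", and HLS_Downloader writes to "Movies/Euphoria/Euphoria.mp4".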

View File

@@ -0,0 +1,160 @@
# 29.04.25
import os
from typing import Tuple
# External library
from rich.console import Console
from rich.prompt import Prompt
# Internal utilities
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import HLS_Downloader
# Logic class
from .util.ScrapeSerie import GetSerieInfo
from StreamingCommunity.Api.Template.Util import (
manage_selection,
map_episode_title,
validate_selection,
validate_episode_selection,
display_episodes_list
)
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Player
from StreamingCommunity.Api.Player.hdplayer import VideoSource
# Variable
msg = Prompt()
console = Console()
def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: GetSerieInfo, proxy=None) -> Tuple[str,bool]:
"""
Downloads a specific episode from a specified season.
Parameters:
- index_season_selected (int): Season number
- index_episode_selected (int): Episode index
- scrape_serie (GetSerieInfo): Scraper object with series information
- proxy (str, optional): Proxy to use for the requests
Returns:
- str: Path to downloaded file
- bool: Whether download was stopped
"""
start_message()
# Get episode information
obj_episode = scrape_serie.selectEpisode(index_season_selected, index_episode_selected-1)
console.print(f"[bold yellow]Download:[/bold yellow] [red]{site_constant.SITE_NAME}[/red] → [bold magenta]{obj_episode.name}[/bold magenta] ([cyan]S{index_season_selected}E{index_episode_selected}[/cyan]) \n")
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(scrape_serie.series_name, index_season_selected, index_episode_selected, obj_episode.name)}.mp4"
mp4_path = os.path.join(site_constant.SERIES_FOLDER, scrape_serie.series_name, f"S{index_season_selected}")
# Retrieve the master playlist from the hdplayer page
video_source = VideoSource(proxy)
master_playlist = video_source.get_m3u8_url(obj_episode.url)
# Download the episode
r_proc = HLS_Downloader(
m3u8_url=master_playlist,
output_path=os.path.join(mp4_path, mp4_name)
).start()
if r_proc['error'] is not None:
try: os.remove(r_proc['path'])
except: pass
return r_proc['path'], r_proc['stopped']
def download_episode(index_season_selected: int, scrape_serie: GetSerieInfo, download_all: bool = False, episode_selection: str = None, proxy = None) -> None:
"""
Handle downloading episodes for a specific season.
Parameters:
- index_season_selected (int): Season number
- scrape_serie (GetSerieInfo): Scraper object with series information
- download_all (bool): Whether to download all episodes
- episode_selection (str, optional): Pre-defined episode selection that bypasses manual input
- proxy (str, optional): Proxy to use for the requests
"""
# Get episodes for the selected season
episodes = scrape_serie.getEpisodeSeasons(index_season_selected)
episodes_count = len(episodes)
if download_all:
for i_episode in range(1, episodes_count + 1):
path, stopped = download_video(index_season_selected, i_episode, scrape_serie, proxy)
if stopped:
break
console.print(f"\n[red]End downloaded [yellow]season: [red]{index_season_selected}.")
else:
if episode_selection is not None:
last_command = episode_selection
console.print(f"\n[cyan]Using provided episode selection: [yellow]{episode_selection}")
else:
last_command = display_episodes_list(episodes)
# Prompt user for episode selection
list_episode_select = manage_selection(last_command, episodes_count)
list_episode_select = validate_episode_selection(list_episode_select, episodes_count)
# Download selected episodes if not stopped
for i_episode in list_episode_select:
path, stopped = download_video(index_season_selected, i_episode, scrape_serie, proxy)
if stopped:
break
def download_series(select_season: MediaItem, season_selection: str = None, episode_selection: str = None, proxy = None) -> None:
"""
Handle downloading a complete series.
Parameters:
- select_season (MediaItem): Series metadata from search
- season_selection (str, optional): Pre-defined season selection that bypasses manual input
- episode_selection (str, optional): Pre-defined episode selection that bypasses manual input
- proxy (str, optional): Proxy to use for the requests
"""
scrape_serie = GetSerieInfo(select_season.url, proxy)
# Get total number of seasons
seasons_count = scrape_serie.getNumberSeason()
# Prompt user for season selection and download episodes
console.print(f"\n[green]Seasons found: [red]{seasons_count}")
# If season_selection is provided, use it instead of asking for input
if season_selection is None:
index_season_selected = msg.ask(
"\n[cyan]Insert season number [yellow](e.g., 1), [red]* [cyan]to download all seasons, "
"[yellow](e.g., 1-2) [cyan]for a range of seasons, or [yellow](e.g., 3-*) [cyan]to download from a specific season to the end"
)
else:
index_season_selected = season_selection
console.print(f"\n[cyan]Using provided season selection: [yellow]{season_selection}")
# Validate the selection
list_season_select = manage_selection(index_season_selected, seasons_count)
list_season_select = validate_selection(list_season_select, seasons_count)
# Loop through the selected seasons and download episodes
for i_season in list_season_select:
if len(list_season_select) > 1 or index_season_selected == "*":
# Download all episodes if multiple seasons are selected or if '*' is used
download_episode(i_season, scrape_serie, download_all=True, proxy=proxy)
else:
# Otherwise, let the user select specific episodes for the single season
download_episode(i_season, scrape_serie, download_all=False, episode_selection=episode_selection, proxy=proxy)
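
For reference, the season prompt above feeds manage_selection; a few illustrative inputs, assuming a five-season series:

# Illustrative season selections (assumed results, matching the prompt text above):
# "1"    -> season 1 only
# "1-2"  -> seasons 1 and 2
# "3-*"  -> seasons 3 through 5
# "*"    -> all seasons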

View File

@@ -0,0 +1,111 @@
# 29.04.25
import re
# External libraries
import httpx
from bs4 import BeautifulSoup
from rich.console import Console
# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_userAgent
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
console = Console()
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
def extract_nonce(response_) -> str:
"""Extract nonce value from the page script"""
soup = BeautifulSoup(response_.content, 'html.parser')
script = soup.find('script', id='live-search-js-extra')
if script:
match = re.search(r'"admin_ajax_nonce":"([^"]+)"', script.text)
if match:
return match.group(1)
return ""
def title_search(query: str, additionalData: list) -> int:
"""
Search for titles based on a search query.
Parameters:
- query (str): The query to search for.
- additionalData (list): [proxy, response] pair produced by ProxyFinder for the series page
Returns:
int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
proxy, response_serie = additionalData
search_url = f"{site_constant.FULL_URL}/wp-admin/admin-ajax.php"
console.print(f"[cyan]Search url: [yellow]{search_url}")
try:
_wpnonce = extract_nonce(response_serie)
if not _wpnonce:
console.print("[red]Error: Failed to extract nonce")
return 0
data = {
'action': 'data_fetch',
'keyword': query,
'_wpnonce': _wpnonce
}
response = httpx.post(
search_url,
headers={
'origin': site_constant.FULL_URL,
'user-agent': get_userAgent()
},
data=data,
timeout=max_timeout,
proxy=proxy
)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
except Exception as e:
console.print(f"[red]Site: {site_constant.SITE_NAME}, request search error: {e}")
return 0
for item in soup.find_all('div', class_='searchelement'):
try:
title = item.find_all("a")[-1].get_text(strip=True) if item.find_all("a") else 'N/A'
url = item.find('a').get('href', '')
year = item.find('div', id='search-cat-year')
year = year.get_text(strip=True) if year else 'N/A'
if any(keyword in year.lower() for keyword in ['stagione', 'episodio', 'ep.', 'season', 'episode']):
continue
media_search_manager.add_media({
'name': title,
'type': 'tv' if '/serie/' in url else 'Film',
'date': year,
'image': item.find('img').get('src', ''),
'url': url
})
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()
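
As a small illustration of what extract_nonce expects, a made-up page fragment carrying the nonce inside the live-search-js-extra script (the nonce value is a placeholder):

# Hypothetical input; only the script id and the "admin_ajax_nonce" key matter.
class _FakeResponse:
    content = b'<script id="live-search-js-extra">var live = {"admin_ajax_nonce":"0123456789ab"};</script>'

print(extract_nonce(_FakeResponse()))  # -> "0123456789ab"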

View File

@@ -0,0 +1,118 @@
# 29.04.25
import re
import logging
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.headers import get_userAgent
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Api.Player.Helper.Vixcloud.util import SeasonManager, Episode
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class GetSerieInfo:
def __init__(self, url, proxy: str = None):
self.headers = {'user-agent': get_userAgent()}
self.url = url
self.seasons_manager = SeasonManager()
self.series_name = None
self.client = httpx.Client(headers=self.headers, proxy=proxy, timeout=max_timeout)
def collect_info_season(self) -> None:
"""
Retrieve all series information including episodes and seasons.
"""
try:
response = self.client.get(self.url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
if not self.series_name:
title_tag = soup.find('h1', class_='title-border')
self.series_name = title_tag.get_text(strip=True) if title_tag else 'N/A'
# Extract episodes and organize by season
episodes = {}
for ep in soup.find_all('div', class_='bolumust'):
a_tag = ep.find('a')
if not a_tag:
continue
ep_url = a_tag.get('href', '')
episode_title = a_tag.get_text(strip=True)
# Clean up episode title by removing season info and date
clean_title = re.sub(r'Stagione \d+ Episodio \d+\s*\(?([^)]+)\)?\s*\d+\s*\w+\s*\d+', r'\1', episode_title)
season_match = re.search(r'stagione-(\d+)', ep_url)
if season_match:
season_num = int(season_match.group(1))
if season_num not in episodes:
episodes[season_num] = []
episodes[season_num].append({
'id': len(episodes[season_num]) + 1,
'number': len(episodes[season_num]) + 1,
'name': clean_title.strip(),
'url': ep_url
})
# Add seasons to SeasonManager
for season_num, eps in episodes.items():
season = self.seasons_manager.add_season({
'id': season_num,
'number': season_num,
'name': f'Stagione {season_num}'
})
# Add episodes to season's EpisodeManager
for ep in eps:
season.episodes.add(ep)
except Exception as e:
logging.error(f"Error collecting series info: {str(e)}")
raise
# ------------- FOR GUI -------------
def getNumberSeason(self) -> int:
"""
Get the total number of seasons available for the series.
"""
if not self.seasons_manager.seasons:
self.collect_info_season()
return len(self.seasons_manager.seasons)
def getEpisodeSeasons(self, season_number: int) -> list:
"""
Get all episodes for a specific season.
"""
if not self.seasons_manager.seasons:
self.collect_info_season()
season = self.seasons_manager.get_season_by_number(season_number)
if not season:
logging.error(f"Season {season_number} not found")
return []
return season.episodes.episodes
def selectEpisode(self, season_number: int, episode_index: int) -> Episode:
"""
Get information for a specific episode in a specific season.
"""
episodes = self.getEpisodeSeasons(season_number)
if not episodes or episode_index < 0 or episode_index >= len(episodes):
logging.error(f"Episode index {episode_index} is out of range for season {season_number}")
return None
return episodes[episode_index]
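
A minimal sketch of how the series downloader above drives GetSerieInfo (the series URL is a placeholder and proxy may be None):

scraper = GetSerieInfo("https://example.org/serie/some-title/", proxy=None)
print(scraper.getNumberSeason())         # triggers collect_info_season on first use
episodes = scraper.getEpisodeSeasons(1)  # episode list for season 1
first_ep = scraper.selectEpisode(1, 0)   # first episode, or None if out of range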

View File

@@ -0,0 +1,232 @@
# 29.04.25
import os
import sys
import time
import json
import signal
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
# External library
import httpx
from rich import print
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeRemainingColumn
# Internal utilities
from StreamingCommunity.Util.config_json import config_manager
from StreamingCommunity.Util.headers import get_headers
# Variable
MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout")
class ProxyFinder:
def __init__(self, url, timeout_threshold: float = 7.0, max_proxies: int = 150, max_workers: int = 12):
self.url = url
self.timeout_threshold = timeout_threshold
self.max_proxies = max_proxies
self.max_workers = max_workers
self.found_proxy = None
self.shutdown_flag = False
self.json_file = os.path.join(os.path.dirname(__file__), 'working_proxies.json')
signal.signal(signal.SIGINT, self._handle_interrupt)
def load_saved_proxies(self) -> tuple:
"""Load saved proxies if they're not expired (2 hours old)"""
try:
if not os.path.exists(self.json_file):
return None, None
with open(self.json_file, 'r') as f:
data = json.load(f)
if not data.get('proxies') or not data.get('last_update'):
return None, None
last_update = datetime.fromisoformat(data['last_update'])
if datetime.now() - last_update > timedelta(hours=2):
return None, None
return data['proxies'], last_update
except Exception:
return None, None
def save_working_proxy(self, proxy: str, response_time: float):
"""Save working proxy to JSON file"""
data = {
'proxies': [{'proxy': proxy, 'response_time': response_time}],
'last_update': datetime.now().isoformat()
}
try:
with open(self.json_file, 'w') as f:
json.dump(data, f, indent=4)
except Exception as e:
print(f"[bold red]Error saving proxy:[/bold red] {str(e)}")
def fetch_geonode(self) -> list:
proxies = []
try:
response = httpx.get(
"https://proxylist.geonode.com/api/proxy-list?protocols=http%2Chttps&limit=100&page=1&sort_by=speed&sort_type=asc",
headers=get_headers(),
timeout=MAX_TIMEOUT
)
data = response.json()
proxies = [(f"http://{p['ip']}:{p['port']}", "Geonode") for p in data.get('data', [])]
except Exception as e:
print(f"[bold red]Error in Geonode:[/bold red] {str(e)[:100]}")
return proxies
def fetch_proxyscrape(self) -> list:
proxies = []
try:
response = httpx.get(
"https://api.proxyscrape.com/v4/free-proxy-list/get?request=get_proxies&protocol=http&skip=0&proxy_format=protocolipport&format=json&limit=100&timeout=1000",
headers=get_headers(),
timeout=MAX_TIMEOUT
)
data = response.json()
if 'proxies' in data and isinstance(data['proxies'], list):
proxies = [(proxy_data['proxy'], "ProxyScrape") for proxy_data in data['proxies'] if 'proxy' in proxy_data]
except Exception as e:
print(f"[bold red]Error in ProxyScrape:[/bold red] {str(e)[:100]}")
return proxies
def fetch_proxies_from_sources(self) -> list:
print("[cyan]Fetching proxies from sources...[/cyan]")
with ThreadPoolExecutor(max_workers=3) as executor:
proxyscrape_future = executor.submit(self.fetch_proxyscrape)
geonode_future = executor.submit(self.fetch_geonode)
sources_proxies = {}
try:
proxyscrape_result = proxyscrape_future.result()
sources_proxies["proxyscrape"] = proxyscrape_result[:int(self.max_proxies/2)]
except Exception as e:
print(f"[bold red]Error fetching from proxyscrape:[/bold red] {str(e)[:100]}")
sources_proxies["proxyscrape"] = []
try:
geonode_result = geonode_future.result()
sources_proxies["geonode"] = geonode_result[:int(self.max_proxies/2)]
except Exception as e:
print(f"[bold red]Error fetching from geonode:[/bold red] {str(e)[:100]}")
sources_proxies["geonode"] = []
merged_proxies = []
if "proxyscrape" in sources_proxies:
merged_proxies.extend(sources_proxies["proxyscrape"])
if "geonode" in sources_proxies:
merged_proxies.extend(sources_proxies["geonode"])
proxy_list = merged_proxies[:self.max_proxies]
return proxy_list
def _test_single_request(self, proxy_info: tuple) -> tuple:
proxy, source = proxy_info
try:
start = time.time()
with httpx.Client(proxy=proxy, timeout=self.timeout_threshold) as client:
response = client.get(self.url, headers=get_headers())
if response.status_code == 200:
return (True, time.time() - start, response, source)
except Exception:
pass
return (False, self.timeout_threshold + 1, None, source)
def test_proxy(self, proxy_info: tuple) -> tuple:
proxy, source = proxy_info
if self.shutdown_flag:
return (proxy, False, 0, None, source)
success1, time1, text1, source = self._test_single_request(proxy_info)
if not success1 or time1 > self.timeout_threshold:
return (proxy, False, time1, None, source)
success2, time2, _, source = self._test_single_request(proxy_info)
avg_time = (time1 + time2) / 2
return (proxy, success2 and time2 <= self.timeout_threshold, avg_time, text1, source)
def _handle_interrupt(self, sig, frame):
print("\n[bold yellow]Received keyboard interrupt. Terminating...[/bold yellow]")
self.shutdown_flag = True
sys.exit(0)
def find_fast_proxy(self) -> tuple:
saved_proxies, last_update = self.load_saved_proxies()
if saved_proxies:
print("[cyan]Testing saved proxy...[/cyan]")
for proxy_data in saved_proxies:
result = self.test_proxy((proxy_data['proxy'], 'cached'))
if result[1]:
return proxy_data['proxy'], result[3], result[2]
else:
print(f"[red]Saved proxy {proxy_data['proxy']} failed - response time: {result[2]:.2f}s[/red]")
proxies = self.fetch_proxies_from_sources()
if not proxies:
print("[bold red]No proxies fetched to test.[/bold red]")
return (None, None, None)
found_proxy = None
response_text = None
source = None
failed_count = 0
success_count = 0
#print(f"[cyan]Testing {len(proxies)} proxies...[/cyan]")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self.test_proxy, p): p for p in proxies}
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("[cyan]{task.fields[success]}[/cyan]/[red]{task.fields[failed]}[/red]"),
TimeRemainingColumn(),
) as progress:
task = progress.add_task(
"[cyan]Testing Proxies",
total=len(futures),
success=success_count,
failed=failed_count
)
for future in as_completed(futures):
if self.shutdown_flag:
break
try:
proxy, success, elapsed, response, proxy_source = future.result()
if success:
success_count += 1
print(f"[bold green]Found valid proxy:[/bold green] {proxy} ({elapsed:.2f}s)")
found_proxy = proxy
response_text = response
self.save_working_proxy(proxy, elapsed)
self.shutdown_flag = True
break
else:
failed_count += 1
except Exception:
failed_count += 1
progress.update(task, advance=1, success=success_count, failed=failed_count)
if not found_proxy:
print("[bold red]No working proxies found[/bold red]")
return (found_proxy, response_text, source)
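
A usage sketch matching how the StreamingWatch search module calls ProxyFinder (the probe URL is a placeholder; find_fast_proxy returns a (proxy, response, source) tuple, with None values when nothing passes):

finder = ProxyFinder(url="https://example.org/serie/euphoria/")
proxy, response, _ = finder.find_fast_proxy()
if proxy:
    print(f"Reusing proxy {proxy} for the search request")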

View File

@@ -1,5 +1,5 @@
__title__ = 'StreamingCommunity'
-__version__ = '3.0.1'
+__version__ = '3.0.2'
__author__ = 'Arrowar'
__description__ = 'A command-line program to download film'
__copyright__ = 'Copyright 2024'

View File

@@ -1,22 +1,22 @@
# 23.11.24
# Fix import
import sys
import os
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(src_path)
# Import
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.logger import Logger
from StreamingCommunity.Api.Player.maxstream import VideoSource
# Test
start_message()
logger = Logger()
video_source = VideoSource("https://cb01new.biz/what-the-waters-left-behind-scars-hd-2023")
master_playlist = video_source.get_playlist()
# 23.11.24
# Fix import
import sys
import os
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(src_path)
# Import
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.logger import Logger
from StreamingCommunity.Api.Player.mixdrop import VideoSource
# Test
start_message()
logger = Logger()
video_source = VideoSource("https://cb01net.uno/pino-daniele-nero-a-meta-hd-2024/")
master_playlist = video_source.get_playlist()
print(master_playlist)

View File

@@ -10,7 +10,7 @@ with open(os.path.join(os.path.dirname(__file__), "requirements.txt"), "r", enco
setup(
name="StreamingCommunity",
version="3.0.1",
version="3.0.2",
long_description=read_readme(),
long_description_content_type="text/markdown",
author="Lovi-0",