Switch to FULL_URL

Lovi 2025-02-20 18:16:25 +01:00
parent 774dc9403f
commit bea0055409
28 changed files with 251 additions and 646 deletions

View File

@ -84,7 +84,7 @@ jobs:
shell: pwsh
run: |
pyinstaller --onefile --hidden-import=pycryptodomex --hidden-import=ua_generator `
--hidden-import=qbittorrentapi --hidden-import=qbittorrent --hidden-import=googlesearch `
--hidden-import=qbittorrentapi --hidden-import=qbittorrent `
--hidden-import=bs4 --hidden-import=httpx --hidden-import=rich --hidden-import=tqdm `
--hidden-import=m3u8 --hidden-import=psutil --hidden-import=unidecode `
--hidden-import=jsbeautifier --hidden-import=pathvalidate `
@ -99,7 +99,7 @@ jobs:
if: matrix.os == 'ubuntu-latest'
run: |
pyinstaller --onefile --hidden-import=pycryptodomex --hidden-import=ua_generator \
--hidden-import=qbittorrentapi --hidden-import=qbittorrent --hidden-import=googlesearch \
--hidden-import=qbittorrentapi --hidden-import=qbittorrent \
--hidden-import=bs4 --hidden-import=httpx --hidden-import=rich --hidden-import=tqdm \
--hidden-import=m3u8 --hidden-import=psutil --hidden-import=unidecode \
--hidden-import=jsbeautifier --hidden-import=pathvalidate \
@ -114,7 +114,7 @@ jobs:
if: matrix.os == 'macos-latest'
run: |
pyinstaller --onefile --hidden-import=pycryptodomex --hidden-import=ua_generator \
--hidden-import=qbittorrentapi --hidden-import=qbittorrent --hidden-import=googlesearch \
--hidden-import=qbittorrentapi --hidden-import=qbittorrent \
--hidden-import=bs4 --hidden-import=httpx --hidden-import=rich --hidden-import=tqdm \
--hidden-import=m3u8 --hidden-import=psutil --hidden-import=unidecode \
--hidden-import=jsbeautifier --hidden-import=pathvalidate \

View File

@ -31,7 +31,8 @@
# 📋 Table of Contents
- 🌐 [Website available](#website-status)
- 🌐 [Website available](https://www.npoint.io/docs/e67633acc3816cc70132)
- 🔄 [Update Domains](#update-domains)
- 🛠️ [Installation](#installation)
- 📦 [PyPI Installation](#1-pypi-installation)
- 🔄 [Automatic Installation](#2-automatic-installation)
@ -58,18 +59,15 @@
# Installation
<p align="center">
<a href="https://github.com/Arrowar/StreamingCommunity/releases/latest/download/StreamingCommunity_win.exe">
<a href="https://github.com/Arrowar/StreamingCommunity/releases/latest/download/StreamingCommunity_win.exe" style="margin: 0 20px;">
<img src="https://img.shields.io/badge/-Windows-blue.svg?style=for-the-badge&logo=windows" alt="Windows">
</a>
<a href="https://github.com/Arrowar/StreamingCommunity/releases/latest/download/StreamingCommunity_mac">
<a href="https://github.com/Arrowar/StreamingCommunity/releases/latest/download/StreamingCommunity_mac" style="margin: 0 20px;">
<img src="https://img.shields.io/badge/-macOS-black.svg?style=for-the-badge&logo=apple" alt="macOS">
</a>
<a href="https://github.com/Arrowar/StreamingCommunity/releases/latest/download/StreamingCommunity_linux">
<a href="https://github.com/Arrowar/StreamingCommunity/releases/latest/download/StreamingCommunity_linux" style="margin: 0 20px;">
<img src="https://img.shields.io/badge/-Linux-orange.svg?style=for-the-badge&logo=linux" alt="Linux">
</a>
<a href="https://github.com/Arrowar/StreamingCommunity/releases">
<img src="https://img.shields.io/badge/-All_Versions-lightgrey.svg?style=for-the-badge" alt="All Versions">
</a>
</p>
@ -450,6 +448,33 @@ You can download VLC Media Player from the [official website](https://www.videol
- `get_only_link`: Return M3U8 playlist/index URL instead of downloading
## 🔄 Update Domains
To update the domains for the supported websites:
1. Visit the configuration endpoint: https://www.npoint.io/docs/e67633acc3816cc70132
2. You'll find a JSON structure similar to:
```json
{
"altadefinizione": {
"domain": "si",
"full_url": "https://altadefinizione.si/"
},
...
}
```
3. Update the following fields for each website as needed:
- `domain`: The new domain extension
- `full_url`: The complete URL including the new domain
4. Save your changes on the npoint.io interface
5. Re-run the script to use the updated domain information
Note: The script will automatically fetch the latest domain information from the configuration endpoint when executed.
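For reference, the lookup the script performs at startup can be reproduced in a few lines of Python. This is a minimal sketch that assumes the endpoint returns the JSON shape shown above; `https://api.npoint.io/e67633acc3816cc70132` is the raw API mirror of the docs page:
```python
# Minimal sketch: fetch the shared domain configuration and list each site's URL.
# Assumes the endpoint returns the JSON shape shown above.
import requests

CONFIG_ENDPOINT = "https://api.npoint.io/e67633acc3816cc70132"

response = requests.get(CONFIG_ENDPOINT, timeout=10)
response.raise_for_status()

for name, entry in response.json().items():
    print(f"{name}: {entry['full_url']} (domain: {entry['domain']})")
```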
# COMMAND
- Download a specific season by entering its number.
@ -559,20 +584,6 @@ Start the bot from the folder /StreamingCommunity/TelegramHelp
python3 telegram_bot.py
```
# Website Status
| Website | Status | Command |
|:-------------------|:------:|:--------:|
| [1337xx](https://1337xx.to/) | ✅ | -133 |
| [AnimeUnity](https://animeunity.so/) | ✅ | -ANI |
| [Ilcorsaronero](https://ilcorsaronero.link/) | ✅ | -ILC |
| [CB01New](https://cb01new.gold/) | ✅ | -CB0 |
| [DDLStreamItaly](https://ddlstreamitaly.co/) | ✅ | -DDL |
| [GuardaSerie](https://guardaserie.now/) | ✅ | -GUA |
| [MostraGuarda](https://mostraguarda.stream/) | ✅ | -MOS |
| [StreamingCommunity](https://streamingcommunity.lu/) | ✅ | -STR |
# Tutorials
- [Windows Tutorial](https://www.youtube.com/watch?v=mZGqK4wdN-k)

View File

@ -23,17 +23,16 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
def __init__(self, site_name: str, is_series: bool):
def __init__(self, url: str, is_series: bool):
"""
Initialize video source for streaming site.
Args:
site_name (str): Name of streaming site
is_series (bool): Flag for series or movie content
- url (str): The URL of the streaming site.
- is_series (bool): Flag for series or movie content
"""
self.headers = {'user-agent': get_headers()}
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
self.url = url
self.is_series = is_series
def setup(self, media_id: int):
@ -64,7 +63,7 @@ class VideoSource:
# Make a request to get iframe source
response = httpx.get(
url=f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}",
url=f"{self.url}/iframe/{self.media_id}",
params=params,
timeout=max_timeout
)
@ -185,7 +184,7 @@ class VideoSource:
}
# API request to get video details
video_api_url = f'https://{self.base_name}.{self.domain}/api/video/{scws_id}'
video_api_url = f'{self.url}/api/video/{scws_id}'
response = httpx.get(video_api_url, headers=headers)
if response.status_code == 200:
@ -197,7 +196,7 @@ class VideoSource:
# Request download link generation for each track
download_response = httpx.post(
url=f'https://{self.base_name}.{self.domain}/api/download/generate_link?scws_id={track["video_id"]}&rendition={track["quality"]}',
url=f'{self.url}/api/download/generate_link?scws_id={track["video_id"]}&rendition={track["quality"]}',
headers={
'referer': url_to_download,
'user-agent': get_headers(),
@ -220,18 +219,17 @@ class VideoSource:
class VideoSourceAnime(VideoSource):
def __init__(self, site_name: str):
def __init__(self, url: str):
"""
Initialize anime-specific video source.
Args:
site_name (str): Name of anime streaming site
- url (str): The URL of the streaming site.
Extends base VideoSource with anime-specific initialization
"""
self.headers = {'user-agent': get_headers()}
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
self.url = url
self.src_mp4 = None
def get_embed(self, episode_id: int):
@ -247,7 +245,7 @@ class VideoSourceAnime(VideoSource):
try:
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}",
url=f"{self.url}/embed-url/{episode_id}",
headers=self.headers,
timeout=max_timeout
)
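Taken together, these hunks change the constructor contract: callers now pass the full base URL instead of a site name plus a domain looked up from config. A hedged usage sketch (the media and episode ids are hypothetical):
```python
# Hypothetical usage after this change; site_constant.FULL_URL is assumed to
# resolve to something like "https://streamingcommunity.lu" (see the
# config_loader hunk later in this diff).
video_source = VideoSource(site_constant.FULL_URL, is_series=False)
video_source.setup(1234)          # hypothetical media id

anime_source = VideoSourceAnime(site_constant.FULL_URL)
anime_source.get_embed(5678)      # hypothetical episode id
```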

View File

@ -1,5 +1,7 @@
# 02.07.24
import sys
# External libraries
import httpx
from bs4 import BeautifulSoup
@ -42,12 +44,17 @@ def title_search(word_to_search: str) -> int:
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, site_constant.FULL_URL)
if domain_to_use is None or base_url is None:
console.print("[bold red]❌ Error: Unable to determine valid domain or base URL.[/bold red]")
console.print("[yellow]The service might be temporarily unavailable or the domain may have changed.[/yellow]")
sys.exit(1)
# Construct the full site URL and load the search page
try:
response = httpx.get(
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/search/{word_to_search}/1/",
url=f"{site_constant.FULL_URL}/search/{word_to_search}/1/",
headers={'user-agent': get_headers()},
follow_redirects=True,
timeout=max_timeout

View File

@ -41,9 +41,8 @@ def download_title(select_title: MediaItem):
os_manager.create_path(mp4_path)
# Make request to page with magnet
full_site_name = f"{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}"
response = httpx.get(
url="https://" + full_site_name + select_title.url,
url=f"{site_constant.FULL_URL}{select_title.url}",
headers={
'user-agent': get_headers()
},

View File

@ -85,6 +85,7 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
else:
logging.error(f"Skip index: {index_select} cant find info with api.")
return None, True
def download_series(select_title: MediaItem):
@ -100,8 +101,8 @@ def download_series(select_title: MediaItem):
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
scrape_serie = ScrapeSerieAnime(site_constant.SITE_NAME)
video_source = VideoSourceAnime(site_constant.SITE_NAME)
scrape_serie = ScrapeSerieAnime(site_constant.FULL_URL)
video_source = VideoSourceAnime(site_constant.FULL_URL)
# Set up video source
scrape_serie.setup(None, select_title.id, select_title.slug)
@ -160,8 +161,8 @@ def download_film(select_title: MediaItem):
"""
# Init class
scrape_serie = ScrapeSerieAnime(site_constant.SITE_NAME)
video_source = VideoSourceAnime(site_constant.SITE_NAME)
scrape_serie = ScrapeSerieAnime(site_constant.FULL_URL)
video_source = VideoSourceAnime(site_constant.FULL_URL)
# Set up video source
scrape_serie.setup(None, select_title.id, select_title.slug)

View File

@ -1,5 +1,6 @@
# 10.12.23
import sys
import logging
@ -42,7 +43,7 @@ def get_token(site_name: str, domain: str) -> dict:
# Send a GET request to the specified URL composed of the site name and domain
response = httpx.get(
url=f"https://www.{site_name}.{domain}",
url=site_constant.FULL_URL,
timeout=max_timeout
)
response.raise_for_status()
@ -113,7 +114,12 @@ def title_search(title: str) -> int:
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://www.{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, site_constant.FULL_URL)
if domain_to_use is None or base_url is None:
console.print("[bold red]❌ Error: Unable to determine valid domain or base URL.[/bold red]")
console.print("[yellow]The service might be temporarily unavailable or the domain may have changed.[/yellow]")
sys.exit(1)
data = get_token(site_constant.SITE_NAME, domain_to_use)
@ -138,7 +144,7 @@ def title_search(title: str) -> int:
# Send a POST request to the API endpoint for live search
try:
response = httpx.post(
url=f'https://www.{site_constant.SITE_NAME}.{domain_to_use}/livesearch',
url=f'{site_constant.FULL_URL}/livesearch',
cookies=cookies,
headers=headers,
json=json_data,

View File

@ -18,18 +18,17 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
class ScrapeSerieAnime():
def __init__(self, site_name: str):
class ScrapeSerieAnime:
def __init__(self, url: str):
"""
Initialize the media scraper for a specific website.
Args:
site_name (str): Name of the streaming site to scrape
url (str): The URL of the streaming site
"""
self.is_series = False
self.headers = {'user-agent': get_headers()}
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
self.url = url
def setup(self, version: str = None, media_id: int = None, series_name: str = None):
self.version = version
@ -50,7 +49,7 @@ class ScrapeSerieAnime():
try:
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/",
url=f"{self.url}/info_api/{self.media_id}/",
headers=self.headers,
timeout=max_timeout
)
@ -81,7 +80,7 @@ class ScrapeSerieAnime():
}
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}",
url=f"{self.url}/info_api/{self.media_id}/{index_ep}",
headers=self.headers,
params=params,
timeout=max_timeout

View File

@ -1,11 +1,14 @@
# 03.07.24
import sys
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.table import TVShowManager
@ -41,10 +44,15 @@ def title_search(word_to_search: str) -> int:
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, site_constant.FULL_URL)
if domain_to_use is None or base_url is None:
console.print("[bold red]❌ Error: Unable to determine valid domain or base URL.[/bold red]")
console.print("[yellow]The service might be temporarily unavailable or the domain may have changed.[/yellow]")
sys.exit(1)
response = httpx.get(
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/?s={word_to_search}",
url=f"{site_constant.FULL_URL}/?s={word_to_search}",
headers={'user-agent': get_headers()},
timeout=max_timeout
)

View File

@ -1,5 +1,6 @@
# 09.06.24
import sys
import logging
@ -45,12 +46,17 @@ def title_search(word_to_search: str) -> int:
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, site_constant.FULL_URL)
if domain_to_use is None or base_url is None:
console.print("[bold red]❌ Error: Unable to determine valid domain or base URL.[/bold red]")
console.print("[yellow]The service might be temporarily unavailable or the domain may have changed.[/yellow]")
sys.exit(1)
# Send request to search for titles
try:
response = httpx.get(
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/search/?&q={word_to_search}&quick=1&type=videobox_video&nodes=11",
url=f"{site_constant.FULL_URL}/search/?&q={word_to_search}&quick=1&type=videobox_video&nodes=11",
headers={'user-agent': get_headers()},
timeout=max_timeout
)

View File

@ -1,5 +1,7 @@
# 09.06.24
import sys
# External libraries
import httpx
from bs4 import BeautifulSoup
@ -42,12 +44,18 @@ def title_search(word_to_search: str) -> int:
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, site_constant.FULL_URL)
if domain_to_use is None or base_url is None:
console.print("[bold red]❌ Error: Unable to determine valid domain or base URL.[/bold red]")
console.print("[yellow]The service might be temporarily unavailable or the domain may have changed.[/yellow]")
sys.exit(1)
# Send request to search for titles
print(f"{site_constant.FULL_URL}/?story={word_to_search}&do=search&subaction=search")
try:
response = httpx.get(
url=f"https://guardaserie.{domain_to_use}/?story={word_to_search}&do=search&subaction=search",
url=f"{site_constant.FULL_URL}/?story={word_to_search}&do=search&subaction=search",
headers={'user-agent': get_headers()},
timeout=max_timeout
)
@ -58,19 +66,17 @@ def title_search(word_to_search: str) -> int:
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('div', class_="mlnew-list")
table_content = soup.find('div', class_="recent-posts")
for serie_div in table_content.find_all('div', class_='mlnew'):
for serie_div in table_content.find_all('div', class_='post-thumb'):
try:
title = serie_div.find('div', class_='mlnh-2').find("h2").get_text(strip=True)
link = serie_div.find('div', class_='mlnh-2').find('a')['href']
imdb_rating = serie_div.find('span', class_='mlnh-imdb').get_text(strip=True)
title = serie_div.find('a').get("title")
link = serie_div.find('a').get("href")
serie_info = {
'name': title,
'url': link,
'score': imdb_rating
'url': link
}
media_search_manager.add_media(serie_info)

View File

@ -49,9 +49,10 @@ class GetSerieInfo:
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('div', class_="tt_season")
seasons_number = len(table_content.find_all("li"))
self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True)
self.tv_name = soup.find("h1", class_="entry-title").get_text(strip=True)
return seasons_number

View File

@ -1,53 +0,0 @@
# 02.07.24
import asyncio
from urllib.parse import quote_plus
# Internal utilities
from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Api.Template import get_select_title
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from .site import title_search, media_search_manager, table_show_manager
from .title import download_title
# Variable
indice = 9
_useFor = "film_serie"
_deprecate = False
_priority = 2
_engineDownload = "tor"
def search(string_to_search: str = None, get_onylDatabase: bool = False):
"""
Main function of the application for film and series.
"""
if string_to_search is None:
string_to_search = msg.ask(f"\n[purple]Insert word to search in [green]{site_constant.SITE_NAME}").strip()
# Search on database
len_database = asyncio.run(title_search(quote_plus(string_to_search)))
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list
select_title = get_select_title(table_show_manager, media_search_manager)
# Download title
download_title(select_title)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()

View File

@ -1,64 +0,0 @@
# 02.07.24
# Internal utilities
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
from .util.ilCorsarScraper import IlCorsaroNeroScraper
# Variable
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
max_timeout = config_manager.get_int("REQUESTS", "timeout")
disable_searchDomain = config_manager.get_bool("DEFAULT", "disable_searchDomain")
async def title_search(word_to_search: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- title_search (str): The title to search for.
Returns:
- int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
# Create scraper and collect result
print("\n")
scraper = IlCorsaroNeroScraper(f"https://{site_constant.SITE_NAME}.{domain_to_use}/", 1)
results = await scraper.search(word_to_search)
for i, torrent in enumerate(results):
try:
media_search_manager.add_media({
'name': torrent['name'],
'type': torrent['type'],
'seed': torrent['seed'],
'leech': torrent['leech'],
'size': torrent['size'],
'date': torrent['date'],
'url': torrent['url']
})
except Exception as e:
print(f"Error parsing a film entry: {e}")
# Return the number of titles found
return media_search_manager.get_length()

View File

@ -1,42 +0,0 @@
# 02.07.24
import os
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import TOR_downloader
# Logic class
from StreamingCommunity.Api.Template.config_loader import site_constant
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
def download_title(select_title: MediaItem):
"""
Downloads a media item and saves it as an MP4 file.
Parameters:
- select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.
"""
start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n")
print()
# Define output path
title_name = os_manager.get_sanitize_file(select_title.name)
mp4_path = os.path.join(site_constant.MOVIE_FOLDER, title_name.replace(".mp4", ""))
# Create output folder
os_manager.create_path(mp4_path)
# Tor manager
manager = TOR_downloader()
manager.add_magnet_link(select_title.url)
manager.start_download()
manager.move_downloaded_files(mp4_path)

View File

@ -1,149 +0,0 @@
# 12.14.24
import logging
import asyncio
from typing import List, Dict, Optional
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.console import console
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class IlCorsaroNeroScraper:
def __init__(self, base_url: str, max_page: int = 1):
self.base_url = base_url
self.max_page = max_page
self.headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
'priority': 'u=0, i',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': get_headers()
}
async def fetch_url(self, url: str) -> Optional[str]:
"""
Fetch the HTML content of a given URL.
"""
try:
console.print(f"[cyan]Fetching url[white]: [red]{url}")
async with httpx.AsyncClient(headers=self.headers, follow_redirects=True, timeout=max_timeout) as client:
response = await client.get(url)
# If the request was successful, return the HTML content
response.raise_for_status()
return response.text
except Exception as e:
logging.error(f"Error fetching from {url}: {e}")
return None
def parse_torrents(self, html: str) -> List[Dict[str, str]]:
"""
Parse the HTML content and extract torrent details.
"""
torrents = []
soup = BeautifulSoup(html, "html.parser")
table = soup.find("tbody")
for row in table.find_all("tr"):
try:
columns = row.find_all("td")
torrents.append({
'type': columns[0].get_text(strip=True),
'name': row.find("th").find("a").get_text(strip=True),
'seed': columns[1].get_text(strip=True),
'leech': columns[2].get_text(strip=True),
'size': columns[3].get_text(strip=True),
'date': columns[4].get_text(strip=True),
'url': "https://ilcorsaronero.link" + row.find("th").find("a").get("href")
})
except Exception as e:
logging.error(f"Error parsing row: {e}")
continue
return torrents
async def fetch_real_url(self, url: str) -> Optional[str]:
"""
Fetch the real torrent URL from the detailed page.
"""
response_html = await self.fetch_url(url)
if not response_html:
return None
soup = BeautifulSoup(response_html, "html.parser")
links = soup.find_all("a")
# Find and return the magnet link
for link in links:
if "magnet" in str(link):
return link.get("href")
return None
async def search(self, query: str) -> List[Dict[str, str]]:
"""
Search for torrents based on the query string.
"""
all_torrents = []
# Loop through each page
for page in range(self.max_page):
url = f'{self.base_url}search?q={query}&page={page}'
html = await self.fetch_url(url)
if not html:
console.print(f"[bold red]No HTML content for page {page}[/bold red]")
break
torrents = self.parse_torrents(html)
if not torrents:
console.print(f"[bold red]No torrents found on page {page}[/bold red]")
break
# Use asyncio.gather to fetch all real URLs concurrently
tasks = [self.fetch_real_url(result['url']) for result in torrents]
real_urls = await asyncio.gather(*tasks)
# Attach real URLs to the torrent data
for i, result in enumerate(torrents):
result['url'] = real_urls[i]
all_torrents.extend(torrents)
return all_torrents
async def main():
scraper = IlCorsaroNeroScraper("https://ilcorsaronero.link/")
results = await scraper.search("cars")
if results:
for i, torrent in enumerate(results):
console.print(f"[bold green]{i} = {torrent}[/bold green] \n")
else:
console.print("[bold red]No torrents found.[/bold red]")
if __name__ == '__main__':
asyncio.run(main())

View File

@ -44,7 +44,7 @@ def download_film(movie_details: Json_film) -> str:
# Make request to main site
try:
url = f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}/set-movie-a/{movie_details.imdb_id}"
url = f"{site_constant.FULL_URL}/set-movie-a/{movie_details.imdb_id}"
response = httpx.get(url, headers={'User-Agent': get_headers()})
response.raise_for_status()

View File

@ -48,7 +48,7 @@ def download_film(select_title: MediaItem) -> str:
console.print(f"[yellow]Download: [red]{select_title.name} \n")
# Init class
video_source = VideoSource(site_constant.SITE_NAME, False)
video_source = VideoSource(site_constant.FULL_URL, False)
video_source.setup(select_title.id)
# Retrieve scws and if available master playlist

View File

@ -147,8 +147,8 @@ def download_series(select_season: MediaItem) -> None:
start_message()
# Init class
scrape_serie = ScrapeSerie(site_constant.SITE_NAME)
video_source = VideoSource(site_constant.SITE_NAME, True)
scrape_serie = ScrapeSerie(site_constant.FULL_URL)
video_source = VideoSource(site_constant.FULL_URL, True)
# Setup video source
scrape_serie.setup(select_season.id, select_season.slug)

View File

@ -1,5 +1,7 @@
# 10.12.23
import sys
# External libraries
import httpx
@ -40,7 +42,12 @@ def title_search(title_search: str) -> int:
domain_to_use = site_constant.DOMAIN_NOW
if not disable_searchDomain:
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, f"https://{site_constant.SITE_NAME}.{site_constant.DOMAIN_NOW}")
domain_to_use, base_url = search_domain(site_constant.SITE_NAME, site_constant.FULL_URL)
if domain_to_use is None or base_url is None:
console.print("[bold red]❌ Error: Unable to determine valid domain or base URL.[/bold red]")
console.print("[yellow]The service might be temporarily unavailable or the domain may have changed.[/yellow]")
sys.exit(1)
if site_constant.TELEGRAM_BOT:
bot = get_bot_instance()
@ -50,7 +57,7 @@ def title_search(title_search: str) -> int:
try:
response = httpx.get(
url=f"https://{site_constant.SITE_NAME}.{domain_to_use}/api/search?q={title_search.replace(' ', '+')}",
url=f"{site_constant.FULL_URL}/api/search?q={title_search.replace(' ', '+')}",
headers={'user-agent': get_headers()},
timeout=max_timeout
)

View File

@ -20,17 +20,16 @@ max_timeout = config_manager.get_int("REQUESTS", "timeout")
class ScrapeSerie:
def __init__(self, site_name: str):
def __init__(self, url: str):
"""
Initialize the ScrapeSerie class for scraping TV series information.
Args:
site_name (str): Name of the streaming site to scrape from
- url (str): The URL of the streaming site.
"""
self.is_series = False
self.headers = {'user-agent': get_headers()}
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
self.url = url
def setup(self, media_id: int = None, series_name: str = None):
"""
@ -58,7 +57,7 @@ class ScrapeSerie:
"""
try:
response = httpx.get(
url=f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}",
url=f"{self.url}/titles/{self.media_id}-{self.series_name}",
headers=self.headers,
timeout=max_timeout
)
@ -88,7 +87,7 @@ class ScrapeSerie:
"""
try:
response = httpx.get(
url=f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}',
url=f'{self.url}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}',
headers={
'User-Agent': get_headers(),
'x-inertia': 'true',

View File

@ -7,12 +7,11 @@ from urllib.parse import urlparse, unquote
# External libraries
import httpx
from googlesearch import search
# Internal utilities
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util._jsonConfig import config_manager
base_headers = {
@ -58,10 +57,10 @@ def get_base_domain(url_str):
# Check if domain has multiple parts separated by dots
parts = domain.split('.')
if len(parts) > 2: # Handle subdomains
return '.'.join(parts[:-1]) # Return everything except TLD
if len(parts) > 2:
return '.'.join(parts[:-1])
return parts[0] # Return base domain
return parts[0]
except Exception:
return None
@ -93,7 +92,7 @@ def validate_url(url, base_url, max_timeout, max_retries=2, sleep=1):
# Count dots to ensure we don't have extra subdomains
base_dots = base_url.count('.')
url_dots = url.count('.')
if url_dots > base_dots + 1: # Allow for one extra dot for TLD change
if url_dots > base_dots + 1:
console.print(f"[red]Too many subdomains in URL")
return False, None
@ -142,61 +141,19 @@ def validate_url(url, base_url, max_timeout, max_retries=2, sleep=1):
def search_domain(site_name: str, base_url: str, get_first: bool = False):
"""Search for valid domain matching site name and base URL."""
max_timeout = config_manager.get_int("REQUESTS", "timeout")
domain = str(config_manager.get_dict("SITE", site_name)['domain'])
# Test initial URL
try:
is_correct, redirect_tld = validate_url(base_url, base_url, max_timeout)
if is_correct:
tld = redirect_tld or get_tld(base_url)
config_manager.config['SITE'][site_name]['domain'] = tld
config_manager.write_config()
config_manager.configSite[site_name]['domain'] = tld
console.print(f"[green]Successfully validated initial URL")
return tld, base_url
except Exception as e:
console.print(f"[red]Error testing initial URL: {str(e)}")
# Google search phase
base_domain = get_base_domain(base_url)
console.print(f"\n[cyan]Searching for alternate domains for[white]: [yellow]{base_domain}")
try:
search_results = list(search(base_domain, num_results=20, lang="it"))
base_urls = set()
for url in search_results:
element_url = get_base_url(url)
if element_url:
base_urls.add(element_url)
else:
return None, None
# Filter URLs based on domain matching and subdomain count
filtered_results = [
url for url in base_urls
if get_base_domain(url) == base_domain
and url.count('.') <= base_url.count('.') + 1
]
for idx, result_url in enumerate(filtered_results, 1):
console.print(f"\n[cyan]Checking result {idx}/{len(filtered_results)}[white]: [yellow]{result_url}")
is_valid, new_tld = validate_url(result_url, base_url, max_timeout)
if is_valid:
final_tld = new_tld or get_tld(result_url)
if get_first or msg.ask(
f"\n[cyan]Update site[white] [red]'{site_name}'[cyan] with domain[white] [red]'{final_tld}'",
choices=["y", "n"],
default="y"
).lower() == "y":
config_manager.config['SITE'][site_name]['domain'] = final_tld
config_manager.write_config()
return final_tld, f"{base_url}.{final_tld}"
except Exception as e:
console.print(f"[red]Error during search: {str(e)}")
console.print("[bold red]No valid URLs found matching the base URL.")
return domain, f"{base_url}.{domain}"
console.print(f"[red]Error testing initial URL: {str(e)}")

View File

@ -33,7 +33,11 @@ class SiteConstant:
@property
def DOMAIN_NOW(self):
return config_manager.get_dict('SITE', self.SITE_NAME)['domain']
return config_manager.get_site(self.SITE_NAME, 'domain')
@property
def FULL_URL(self):
return config_manager.get_site(self.SITE_NAME, 'full_url').rstrip('/')
@property
def SERIES_FOLDER(self):
@ -59,7 +63,7 @@ class SiteConstant:
@property
def COOKIE(self):
try:
return config_manager.get_dict('SITE', self.SITE_NAME)['extra']
return config_manager.get_dict('SITE_EXTRA', self.SITE_NAME)
except KeyError:
return None
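The `rstrip('/')` is load-bearing: the npoint data stores URLs with a trailing slash (e.g. `"https://altadefinizione.si/"` in the README example) while every call site in this commit appends a `/`-prefixed path. A quick illustration:
```python
# Without rstrip('/') the f-strings used throughout this commit would
# produce a double slash.
full_url = "https://altadefinizione.si/".rstrip("/")
print(f"{full_url}/iframe/42")   # https://altadefinizione.si/iframe/42
```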

View File

@ -4,7 +4,7 @@ import os
import sys
import json
import logging
from pathlib import Path
import requests
from typing import Any, List
@ -21,31 +21,32 @@ class ConfigManager:
"""Initialize the ConfigManager.
Parameters:
- file_path (str, optional): The path to the configuration file. Default is 'config.json'.
- file_name (str, optional): The name of the configuration file. Default is 'config.json'.
"""
if getattr(sys, 'frozen', False):
base_path = os.path.join(".")
else:
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
self.file_path = os.path.join(base_path, file_name)
self.file_path = os.path.join(base_path, file_name)
self.config = {}
self.configSite = {}
self.cache = {}
console.print(f"[green]Configuration file path: {self.file_path}[/green]")
console.print(f"[bold cyan]📂 Configuration file path:[/bold cyan] [green]{self.file_path}[/green]")
def read_config(self) -> None:
"""Read the configuration file."""
try:
logging.info(f"Reading file: {self.file_path}")
logging.info(f"📖 Reading file: {self.file_path}")
# Check if file exists
if os.path.exists(self.file_path):
with open(self.file_path, 'r') as f:
self.config = json.load(f)
logging.info("Configuration file loaded successfully.")
console.print("[bold green]✅ Configuration file loaded successfully.[/bold green]")
# Download config.json if it doesn't exist locally
else:
logging.info("Configuration file does not exist. Downloading...")
console.print("[bold yellow]⚠️ Configuration file not found. Downloading...[/bold yellow]")
self.download_requirements(
'https://raw.githubusercontent.com/Arrowar/StreamingCommunity/refs/heads/main/config.json',
self.file_path
@ -54,60 +55,81 @@ class ConfigManager:
# Load the downloaded config.json into the config attribute
with open(self.file_path, 'r') as f:
self.config = json.load(f)
logging.info("Configuration file downloaded and saved.")
console.print("[bold green]✅ Configuration file downloaded and saved.[/bold green]")
logging.info("Configuration file processed successfully.")
# Update site configuration separately
self.update_site_config()
console.print("[bold cyan]🔧 Configuration file processing complete.[/bold cyan]")
except Exception as e:
logging.error(f"Error reading configuration file: {e}")
logging.error(f"Error reading configuration file: {e}")
def download_requirements(self, url: str, filename: str):
"""
Download the requirements.txt file from the specified URL if not found locally using requests.
Download a file from the specified URL using requests if it is not found locally.
Args:
url (str): The URL to download the requirements file from.
filename (str): The local filename to save the requirements file as.
url (str): The URL to download the file from.
filename (str): The local filename to save the file as.
"""
try:
import requests
logging.info(f"{filename} not found locally. Downloading from {url}...")
logging.info(f"🌍 Downloading {filename} from {url}...")
response = requests.get(url)
if response.status_code == 200:
with open(filename, 'wb') as f:
f.write(response.content)
console.print(f"[bold green]✅ Successfully downloaded {filename}.[/bold green]")
else:
logging.error(f"Failed to download {filename}. HTTP Status code: {response.status_code}")
logging.error(f"Failed to download {filename}. HTTP Status code: {response.status_code}")
sys.exit(0)
except Exception as e:
logging.error(f"Failed to download {filename}: {e}")
logging.error(f"Failed to download {filename}: {e}")
sys.exit(0)
def read_key(self, section: str, key: str, data_type: type = str) -> Any:
"""Read a key from the configuration file.
def update_site_config(self) -> None:
"""Fetch and update the site configuration with data from the API."""
api_url = "https://api.npoint.io/e67633acc3816cc70132"
try:
console.print("[bold cyan]🌍 Fetching SITE data from API...[/bold cyan]")
response = requests.get(api_url)
if response.status_code == 200:
self.configSite = response.json() # Store API data in separate configSite
console.print("[bold green]✅ SITE data successfully fetched.[/bold green]")
else:
console.print(f"[bold red]❌ Failed to fetch SITE data. HTTP Status code: {response.status_code}[/bold red]")
except Exception as e:
console.print(f"[bold red]❌ Error fetching SITE data: {e}[/bold red]")
def read_key(self, section: str, key: str, data_type: type = str, from_site: bool = False) -> Any:
"""Read a key from the configuration.
Parameters:
- section (str): The section in the configuration file.
- section (str): The section in the configuration.
- key (str): The key to be read.
- data_type (type, optional): The expected data type of the key's value. Default is str.
- from_site (bool, optional): Whether to read from site config. Default is False.
Returns:
The value of the key converted to the specified data type.
"""
cache_key = f"{section}.{key}"
cache_key = f"{'site' if from_site else 'config'}.{section}.{key}"
logging.info(f"Read key: {cache_key}")
if cache_key in self.cache:
return self.cache[cache_key]
if section in self.config and key in self.config[section]:
value = self.config[section][key]
config_source = self.configSite if from_site else self.config
if section in config_source and key in config_source[section]:
value = config_source[section][key]
else:
raise ValueError(f"Key '{key}' not found in section '{section}'")
raise ValueError(f"Key '{key}' not found in section '{section}' of {'site' if from_site else 'main'} config")
value = self._convert_to_data_type(value, data_type)
self.cache[cache_key] = value
@ -135,100 +157,80 @@ class ConfigManager:
else:
return value
# Main config getters
def get(self, section: str, key: str) -> Any:
"""Read a value from the configuration file.
Parameters:
- section (str): The section in the configuration file.
- key (str): The key to be read.
Returns:
The value associated with the key.
"""
"""Read a value from the main configuration."""
return self.read_key(section, key)
def get_int(self, section: str, key: str) -> int:
"""Read an integer value from the configuration file.
Parameters:
- section (str): The section in the configuration file.
- key (str): The key to be read.
Returns:
int: The integer value.
"""
"""Read an integer value from the main configuration."""
return self.read_key(section, key, int)
def get_float(self, section: str, key: str) -> int:
"""Read an float value from the configuration file.
Parameters:
- section (str): The section in the configuration file.
- key (str): The key to be read.
Returns:
float: The float value.
"""
def get_float(self, section: str, key: str) -> float:
"""Read a float value from the main configuration."""
return self.read_key(section, key, float)
def get_bool(self, section: str, key: str) -> bool:
"""Read a boolean value from the configuration file.
Parameters:
- section (str): The section in the configuration file.
- key (str): The key to be read.
Returns:
bool: The boolean value.
"""
"""Read a boolean value from the main configuration."""
return self.read_key(section, key, bool)
def get_list(self, section: str, key: str) -> List[str]:
"""Read a list value from the configuration file.
Parameters:
- section (str): The section in the configuration file.
- key (str): The key to be read.
Returns:
list: The list value.
"""
"""Read a list value from the main configuration."""
return self.read_key(section, key, list)
def get_dict(self, section: str, key: str) -> dict:
"""Read a dictionary value from the configuration file.
Parameters:
- section (str): The section in the configuration file.
- key (str): The key to be read.
Returns:
dict: The dictionary value.
"""
"""Read a dictionary value from the main configuration."""
return self.read_key(section, key, dict)
def set_key(self, section: str, key: str, value: Any) -> None:
"""Set a key in the configuration file.
# Site config getters
def get_site(self, section: str, key: str) -> Any:
"""Read a value from the site configuration."""
return self.read_key(section, key, from_site=True)
def get_site_int(self, section: str, key: str) -> int:
"""Read an integer value from the site configuration."""
return self.read_key(section, key, int, from_site=True)
def get_site_float(self, section: str, key: str) -> float:
"""Read a float value from the site configuration."""
return self.read_key(section, key, float, from_site=True)
def get_site_bool(self, section: str, key: str) -> bool:
"""Read a boolean value from the site configuration."""
return self.read_key(section, key, bool, from_site=True)
def get_site_list(self, section: str, key: str) -> List[str]:
"""Read a list value from the site configuration."""
return self.read_key(section, key, list, from_site=True)
def get_site_dict(self, section: str, key: str) -> dict:
"""Read a dictionary value from the site configuration."""
return self.read_key(section, key, dict, from_site=True)
def set_key(self, section: str, key: str, value: Any, to_site: bool = False) -> None:
"""Set a key in the configuration.
Parameters:
- section (str): The section in the configuration file.
- section (str): The section in the configuration.
- key (str): The key to be set.
- value (Any): The value to be associated with the key.
- to_site (bool, optional): Whether to set in site config. Default is False.
"""
try:
if section not in self.config:
self.config[section] = {}
config_target = self.configSite if to_site else self.config
if section not in config_target:
config_target[section] = {}
self.config[section][key] = value
cache_key = f"{section}.{key}"
config_target[section][key] = value
cache_key = f"{'site' if to_site else 'config'}.{section}.{key}"
self.cache[cache_key] = value
self.write_config()
except Exception as e:
print(f"Error setting key '{key}' in section '{section}': {e}")
print(f"Error setting key '{key}' in section '{section}' of {'site' if to_site else 'main'} config: {e}")
def write_config(self) -> None:
"""Write the configuration to the file."""
"""Write the main configuration to the file."""
try:
with open(self.file_path, 'w') as f:
json.dump(self.config, f, indent=4)
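A hedged sketch of how the two config sources are read after this commit; the import path matches the one used elsewhere in this diff:
```python
# Main config (local config.json) vs. site config (fetched from npoint at startup).
from StreamingCommunity.Util._jsonConfig import config_manager

timeout = config_manager.get_int("REQUESTS", "timeout")                 # local file
full_url = config_manager.get_site("streamingcommunity", "full_url")   # npoint data
```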

View File

@ -27,6 +27,7 @@ from StreamingCommunity.TelegramHelp.telegram_bot import get_bot_instance, Teleg
# Config
SHOW_TRENDING = config_manager.get_bool('DEFAULT', 'show_trending')
CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close')
TELEGRAM_BOT = config_manager.get_bool('DEFAULT', 'telegram_bot')
@ -134,10 +135,10 @@ def initialize():
sys.exit(0)
# Trending tmbd
print()
tmdb.display_trending_films()
tmdb.display_trending_tv_shows()
if SHOW_TRENDING:
print()
tmdb.display_trending_films()
tmdb.display_trending_tv_shows()
# Attempting GitHub update
try:

View File

@ -1,75 +0,0 @@
# 12.11.24
# Fix import
import os
import sys
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(src_path)
# Other
import time
import json
from rich.console import Console
# Util
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Api.Template.Util import search_domain
# Variable
console = Console()
README_PATH = "README.md"
def get_config():
with open("config.json", "r", encoding="utf-8") as file:
return json.load(file)
def update_readme(site_names, domain_to_use):
if not os.path.exists(README_PATH):
console.print(f"[red]README file not found at {README_PATH}")
return
with open(README_PATH, "r", encoding="utf-8") as file:
lines = file.readlines()
updated_lines = []
for line in lines:
if line.startswith("| [") and "|" in line:
site_name = line.split("[")[1].split("]")[0]
alias = f"{site_name.lower()}"
if alias in site_names:
command = f"-{site_name[:3].upper()}"
if site_name == "animeunity":
updated_line = f"| [{site_name}](https://www.{alias}.{domain_to_use}/) | ✅ | {command} |\n"
else:
updated_line = f"| [{site_name}](https://{alias}.{domain_to_use}/) | ✅ | {command} |\n"
updated_lines.append(updated_line)
continue
updated_lines.append(line)
with open(README_PATH, "w", encoding="utf-8") as file:
file.writelines(updated_lines)
if __name__ == "__main__":
for site_name, data in get_config()['SITE'].items():
original_domain = config_manager.get_dict("SITE", site_name)['domain']
if site_name != "ilcorsaronero":
if site_name == "animeunity":
domain_to_use, _ = search_domain(site_name, f"https://www.{site_name}.{original_domain}", True)
else:
domain_to_use, _ = search_domain(site_name, f"https://{site_name}.{original_domain}", True)
update_readme(site_name, domain_to_use)
print("\n------------------------------------")
time.sleep(1)

View File

@ -5,14 +5,15 @@
"log_to_file": true,
"show_message": true,
"clean_console": true,
"show_trending": true,
"root_path": "Video",
"movie_folder_name": "Movie",
"serie_folder_name": "Serie",
"anime_folder_name": "Anime",
"map_episode_name": "E%(episode)_%(episode_name)",
"config_qbit_tor": {
"host": "192.168.1.99",
"port": "7060",
"host": "192.168.1.51",
"port": "6666",
"user": "admin",
"pass": "adminadmin"
},
@ -57,35 +58,11 @@
"force_resolution": "Best",
"get_only_link": false
},
"SITE": {
"streamingcommunity": {
"domain": "lu"
},
"guardaserie": {
"domain": "now"
},
"mostraguarda": {
"domain": "stream"
},
"SITE_EXTRA": {
"ddlstreamitaly": {
"domain": "co",
"extra": {
"ips4_device_key": "",
"ips4_member_id": "",
"ips4_login_key": ""
}
},
"animeunity": {
"domain": "so"
},
"cb01new": {
"domain": "gold"
},
"1337xx": {
"domain": "to"
},
"ilcorsaronero": {
"domain": "link"
"ips4_device_key": "",
"ips4_member_id": "",
"ips4_login_key": ""
}
}
}
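With the `SITE` section moved to npoint, only per-site extras stay in the local file; the `COOKIE` property shown earlier reads them like this (hedged sketch):
```python
# Per-site extras remain local; get_dict now takes 'SITE_EXTRA' plus the site name.
cookie = config_manager.get_dict('SITE_EXTRA', 'ddlstreamitaly')
# -> {"ips4_device_key": "", "ips4_member_id": "", "ips4_login_key": ""}
```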

View File

@ -8,7 +8,6 @@ unidecode
jsbeautifier
pathvalidate
pycryptodomex
googlesearch-python
ua-generator
qbittorrent-api
python-qbittorrent