Ea porsea gha fato i porsei

This commit is contained in:
Lovi 2024-12-16 22:18:04 +01:00
parent 284d13bc22
commit 837bcf3110
43 changed files with 543 additions and 510 deletions

View File

@ -1,4 +1,4 @@
# StreamingCommunity Downloader 🎬
# StreamingCommunity Downloader
![Project Logo](https://i.ibb.co/f4h5Y2m/min-logo.png)
@ -26,6 +26,9 @@ Chat, contribute, and have fun in our **Git_StreamingCommunity** Discord [Server
- [Docker](#docker)
- [Tutorial](#tutorials)
- [To Do](#to-do)
- [Support](#support)
- [Contribute](#contributing)
- [Disclaimer](#disclaimer)
@ -372,12 +375,11 @@ The `run-container` command mounts also the `config.json` file, so any change to
| 1337xx | ✅ |
| Altadefinizione | ✅ |
| AnimeUnity | ✅ |
| BitSearch | ✅ |
| Ilcorsaronero | ✅ |
| CB01New | ✅ |
| DDLStreamItaly | ✅ |
| GuardaSerie | ✅ |
| MostraGuarda | ✅ |
| PirateBays | ✅ |
| StreamingCommunity | ✅ |
# Tutorials
@ -391,7 +393,7 @@ The `run-container` command mounts also the `config.json` file, so any change to
- Create website API -> https://github.com/Lovi-0/StreamingCommunity/tree/test_gui_1
# SUPPORT
# Support
If you'd like to support this project, consider making a donation!

View File

@ -1,6 +1,6 @@
# 02.07.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -30,7 +30,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -48,4 +48,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -34,6 +34,8 @@ def title_search(word_to_search: str) -> int:
Returns:
- int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")

View File

@ -1,6 +1,6 @@
# 26.05.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -30,7 +30,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -48,4 +48,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -25,13 +25,16 @@ from StreamingCommunity.Api.Player.supervideo import VideoSource
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER
def download_film(select_title: MediaItem):
def download_film(select_title: MediaItem) -> str:
"""
Downloads a film using the provided film ID, title name, and domain.
Parameters:
- title_name (str): The name of the film title.
- url (str): The url of the video
Return:
- str: output path
"""
# Start message and display film information
@ -56,14 +59,16 @@ def download_film(select_title: MediaItem):
output_filename=os.path.join(mp4_path, title_name)
).start()
if r_proc == 404:
"""if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
execute_search(frames[-4])"""
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, title_name)

View File

@ -34,13 +34,16 @@ def title_search(title_search: str) -> int:
Returns:
int: The number of titles found.
"""
client = httpx.Client()
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")
domain_to_use, _ = search_domain(SITE_NAME, f"https://{SITE_NAME}")
# Send request to search for title
client = httpx.Client()
try:
response = client.get(
url=f"https://{SITE_NAME}.{domain_to_use}/?story={title_search.replace(' ', '+')}&do=search&subaction=search&titleonly=3",
@ -83,4 +86,4 @@ def run_get_select_title():
"""
Display a selection of titles and prompt the user to choose one.
"""
return get_select_title(table_show_manager, media_search_manager)
return get_select_title(table_show_manager, media_search_manager)

View File

@ -1,6 +1,6 @@
# 21.05.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -27,7 +27,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -48,4 +48,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -27,12 +27,15 @@ from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER, MOVIE_FOLDER
def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_source: VideoSourceAnime):
def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_source: VideoSourceAnime) -> str:
"""
Downloads the selected episode.
Parameters:
- index_select (int): Index of the episode to download.
Return:
- str: output path
"""
# Get information about the selected episode
@ -63,14 +66,16 @@ def download_episode(index_select: int, scrape_serie: ScrapeSerieAnime, video_so
# Start downloading
r_proc = MP4_downloader(
url = str(video_source.src_mp4).strip(),
path = os.path.join(mp4_path, title_name)
url=str(video_source.src_mp4).strip(),
path=os.path.join(mp4_path, title_name)
)
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, title_name)
else:
logging.error(f"Skip index: {index_select} cant find info with api.")

View File

@ -99,6 +99,8 @@ def title_search(title: str) -> int:
Returns:
- int: A number containing the length of media search manager.
"""
media_search_manager.clear()
table_show_manager.clear()
# Get token and session value from configuration
max_timeout = config_manager.get_int("REQUESTS", "timeout")

View File

@ -1,52 +0,0 @@
# 01.07.24
from unidecode import unidecode
# Internal utilities
from StreamingCommunity.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title, media_search_manager
from .title import download_title
# Variable
indice = 7
_useFor = "film_serie"
_deprecate = False
_priority = 2
_engineDownload = "tor"
from .costant import SITE_NAME
def search(string_to_search: str = None, get_onylDatabase:bool = False):
"""
Main function of the application for film and series.

Parameters:
- string_to_search (str): Query text; when None the user is prompted interactively.
- get_onylDatabase (bool): When True, return the populated media manager instead of
  starting the interactive select/download flow.
  # NOTE(review): looks like a typo of "get_onlyDatabase" — kept for caller compatibility.
"""
if string_to_search is None:
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list
select_title = run_get_select_title()
# Download title
download_title(select_title)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry: recursive re-prompt; unbounded if searches keep returning nothing
search()

View File

@ -1,15 +0,0 @@
# 01.07.24
import os
# Internal utilities
from StreamingCommunity.Util._jsonConfig import config_manager
SITE_NAME = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
DOMAIN_NOW = config_manager.get_dict('SITE', SITE_NAME)['domain']
SERIES_FOLDER = config_manager.get('DEFAULT', 'serie_folder_name')
MOVIE_FOLDER = config_manager.get('DEFAULT', 'movie_folder_name')

View File

@ -1,84 +0,0 @@
# 01.07.24
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
from .costant import SITE_NAME
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(word_to_search: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- title_search (str): The title to search for.
Returns:
- int: The number of titles found.
"""
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")
domain_to_use, _ = search_domain(SITE_NAME, f"https://{SITE_NAME}")
# Construct the full site URL and load the search page
try:
response = httpx.get(
url=f"https://{SITE_NAME}.{domain_to_use}/search?q={word_to_search}&category=1&subcat=2&page=1",
headers={'user-agent': get_headers()},
timeout=max_timeout
)
response.raise_for_status()
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
for title_div in soup.find_all("li", class_ = "card"):
try:
div_stats = title_div.find("div", class_ = "stats")
title_info = {
'name': title_div.find("a").get_text(strip=True),
'url': title_div.find_all("a")[-1].get("href"),
#'nDownload': div_stats.find_all("div")[0].get_text(strip=True),
'size': div_stats.find_all("div")[1].get_text(strip=True),
'seader': div_stats.find_all("div")[2].get_text(strip=True),
'leacher': div_stats.find_all("div")[3].get_text(strip=True),
'date': div_stats.find_all("div")[4].get_text(strip=True)
}
media_search_manager.add_media(title_info)
except:
pass
# Return the number of titles found
return media_search_manager.get_length()
def run_get_select_title():
"""
Display a selection of titles and prompt the user to choose one.
"""
return get_select_title(table_show_manager, media_search_manager)

View File

@ -1,6 +1,6 @@
# 09.06.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -30,7 +30,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -49,4 +49,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -1,11 +1,10 @@
# 03.07.24
import os
import time
# Internal utilities
from StreamingCommunity.Util.console import console, msg
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.call_stack import get_call_stack
@ -25,12 +24,15 @@ from StreamingCommunity.Api.Player.maxstream import VideoSource
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER
def download_film(select_title: MediaItem):
def download_film(select_title: MediaItem) -> str:
"""
Downloads a film using the provided obj.
Parameters:
- select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.
Return:
- str: output path
"""
# Start message and display film information
@ -56,14 +58,16 @@ def download_film(select_title: MediaItem):
output_filename=os.path.join(mp4_path, title_name)
).start()
if r_proc == 404:
"""if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
execute_search(frames[-4])"""
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, title_name)

View File

@ -33,6 +33,8 @@ def title_search(word_to_search: str) -> int:
Returns:
- int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")
@ -71,4 +73,4 @@ def run_get_select_title():
"""
Display a selection of titles and prompt the user to choose one.
"""
return get_select_title(table_show_manager, media_search_manager)
return get_select_title(table_show_manager, media_search_manager)

View File

@ -1,7 +1,7 @@
# 09.06.24
import logging
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -33,7 +33,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -55,4 +55,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -29,13 +29,16 @@ table_show_manager = TVShowManager()
def download_video(index_episode_selected: int, scape_info_serie: GetSerieInfo, video_source: VideoSource) -> None:
def download_video(index_episode_selected: int, scape_info_serie: GetSerieInfo, video_source: VideoSource) -> str:
"""
Download a single episode video.
Parameters:
- tv_name (str): Name of the TV series.
- index_episode_selected (int): Index of the selected episode.
Return:
- str: output path
"""
start_message()
@ -65,15 +68,17 @@ def download_video(index_episode_selected: int, scape_info_serie: GetSerieInfo,
# Start download
r_proc = MP4_downloader(
url = master_playlist,
path = os.path.join(mp4_path, title_name),
referer = f"{parsed_url.scheme}://{parsed_url.netloc}/",
url=master_playlist,
path=os.path.join(mp4_path, title_name),
referer=f"{parsed_url.scheme}://{parsed_url.netloc}/",
)
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, title_name)
def download_thread(dict_serie: MediaItem):
"""

View File

@ -37,6 +37,8 @@ def title_search(word_to_search: str) -> int:
Returns:
- int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")

View File

@ -1,6 +1,6 @@
# 09.06.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -32,7 +32,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = title_search(quote_plus(string_to_search))
# Return list of elements
if get_onylDatabase:
@ -50,4 +50,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -29,7 +29,7 @@ table_show_manager = TVShowManager()
def download_video(index_season_selected: int, index_episode_selected: int, scape_info_serie: GetSerieInfo) -> None:
def download_video(index_season_selected: int, index_episode_selected: int, scape_info_serie: GetSerieInfo) -> str:
"""
Download a single episode video.
@ -37,6 +37,9 @@ def download_video(index_season_selected: int, index_episode_selected: int, scap
- tv_name (str): Name of the TV series.
- index_season_selected (int): Index of the selected season.
- index_episode_selected (int): Index of the selected episode.
Return:
- str: output path
"""
start_message()
@ -62,18 +65,19 @@ def download_video(index_season_selected: int, index_episode_selected: int, scap
output_filename=os.path.join(mp4_path, mp4_name)
).start()
if r_proc == 404:
"""if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
execute_search(frames[-4])"""
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, mp4_name)
def download_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, download_all: bool = False) -> None:

View File

@ -34,6 +34,8 @@ def title_search(word_to_search: str) -> int:
Returns:
- int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")
@ -81,4 +83,4 @@ def run_get_select_title():
"""
Display a selection of titles and prompt the user to choose one.
"""
return get_select_title(table_show_manager, media_search_manager)
return get_select_title(table_show_manager, media_search_manager)

View File

@ -1,6 +1,7 @@
# 02.07.24
from unidecode import unidecode
import asyncio
from urllib.parse import quote_plus
# Internal utilities
@ -13,7 +14,7 @@ from .title import download_title
# Variable
indice = 8
indice = 9
_useFor = "film_serie"
_deprecate = False
_priority = 2
@ -30,7 +31,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
string_to_search = msg.ask(f"\n[purple]Insert word to search in [red]{SITE_NAME}").strip()
# Search on database
len_database = title_search(unidecode(string_to_search))
len_database = asyncio.run(title_search(quote_plus(string_to_search)))
# Return list of elements
if get_onylDatabase:
@ -48,4 +49,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -0,0 +1,63 @@
# 02.07.24
# Internal utilities
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Api.Template.Util import search_domain
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
from .util.ilCorsarScraper import IlCorsaroNeroScraper
# Variable
from .costant import SITE_NAME
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
async def title_search(word_to_search: str) -> int:
    """
    Search for titles based on a search query.

    Parameters:
        word_to_search (str): The title to search for (callers pass it
            already URL-quoted via quote_plus).

    Returns:
        int: The number of titles found.
    """
    # Reset state from any previous search before collecting new results.
    media_search_manager.clear()
    table_show_manager.clear()

    # Find new domain if prev dont work
    domain_to_use, _ = search_domain(SITE_NAME, f"https://{SITE_NAME}")

    # Create scraper and collect result
    print("\n")
    scraper = IlCorsaroNeroScraper(f"https://{SITE_NAME}.{domain_to_use}/", 1)
    results = await scraper.search(word_to_search)

    # Add every result to the media manager.
    # (The original used enumerate() but never read the index.)
    for torrent in results:
        media_search_manager.add_media({
            'name': torrent['name'],
            'type': torrent['type'],
            'seed': torrent['seed'],
            'leech': torrent['leech'],
            'size': torrent['size'],
            'date': torrent['date'],
            'url': torrent['url']
        })

    # Return the number of titles found
    return media_search_manager.get_length()
def run_get_select_title():
    """Show the collected titles in a table and return the user's selection."""
    selection = get_select_title(table_show_manager, media_search_manager)
    return selection

View File

@ -1,12 +1,12 @@
# 01.07.24
# 02.07.24
import os
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Lib.Downloader import TOR_downloader
@ -27,18 +27,17 @@ def download_title(select_title: MediaItem):
"""
start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n")
print()
print()
# Define output path
title_name = os_manager.get_sanitize_file(select_title.name.replace("-", "_") + ".mp4")
title_name = os_manager.get_sanitize_file(select_title.name)
mp4_path = os_manager.get_sanitize_path(
os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", ""))
)
# Create output folder
os_manager.create_path(mp4_path)
os_manager.create_path(mp4_path)
# Tor manager
manager = TOR_downloader()

View File

@ -0,0 +1,139 @@
# 12.14.24
import logging
import asyncio
from typing import List, Dict, Optional
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.console import console
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class IlCorsaroNeroScraper:
    """
    Async scraper for the ilcorsaronero torrent index.

    Fetches search result pages, parses the result table, and resolves
    each result's detail page to its magnet link.
    """

    def __init__(self, base_url: str, max_page: int = 1):
        """
        Parameters:
            base_url (str): Site root including trailing slash,
                e.g. "https://ilcorsaronero.link/".
            max_page (int): Number of result pages to scan.
        """
        self.base_url = base_url
        self.max_page = max_page

        # Browser-like headers to avoid trivial bot blocking.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    async def fetch_url(self, url: str) -> Optional[str]:
        """
        Fetch the HTML content of a given URL.

        Returns:
            Optional[str]: Page body, or None on any network/HTTP error
            (the error is logged, never raised).
        """
        try:
            console.print(f"[cyan]Fetching url[white]: [red]{url}")

            async with httpx.AsyncClient(headers=self.headers, follow_redirects=True, timeout=max_timeout) as client:
                response = await client.get(url)

                # If the request was successful, return the HTML content
                response.raise_for_status()
                return response.text

        except Exception as e:
            logging.error(f"Error fetching {url}: {e}")
            return None

    def parse_torrents(self, html: str) -> List[Dict[str, str]]:
        """
        Parse the HTML content and extract torrent details.

        Returns:
            List[Dict[str, str]]: One dict per result row ('type', 'name',
            'seed', 'leech', 'size', 'date', 'url'); empty when the page
            has no result table.
        """
        torrents = []
        soup = BeautifulSoup(html, "html.parser")

        # FIX: a page with no results (or a block page) has no <tbody>;
        # the original crashed with AttributeError on table.find_all(None).
        table = soup.find("tbody")
        if table is None:
            return torrents

        for row in table.find_all("tr"):
            try:
                columns = row.find_all("td")

                torrents.append({
                    'type': columns[0].get_text(strip=True),
                    'name': row.find("th").find("a").get_text(strip=True),
                    'seed': columns[1].get_text(strip=True),
                    'leech': columns[2].get_text(strip=True),
                    'size': columns[3].get_text(strip=True),
                    'date': columns[4].get_text(strip=True),
                    'url': "https://ilcorsaronero.link" + row.find("th").find("a").get("href")
                })

            except Exception as e:
                # Skip malformed rows but keep parsing the rest of the page.
                logging.error(f"Error parsing row: {e}")
                continue

        return torrents

    async def fetch_real_url(self, url: str) -> Optional[str]:
        """
        Fetch the real torrent URL (magnet link) from the detail page.

        Returns:
            Optional[str]: The first magnet href found, or None when the
            page could not be fetched or contains no magnet link.
        """
        response_html = await self.fetch_url(url)
        if not response_html:
            return None

        soup = BeautifulSoup(response_html, "html.parser")

        # Find and return the first magnet link on the page
        for link in soup.find_all("a"):
            if "magnet" in str(link):
                return link.get("href")

        return None

    async def search(self, query: str) -> List[Dict[str, str]]:
        """
        Search for torrents based on the query string.

        Parameters:
            query (str): Search terms (callers pass them already URL-quoted).

        Returns:
            List[Dict[str, str]]: All parsed results across pages; each
            result's 'url' is replaced by its magnet link (may be None
            when the detail page yielded no magnet).
        """
        all_torrents = []

        # Loop through each result page; stop early on a fetch failure
        # or an empty page.
        for page in range(self.max_page):
            url = f'{self.base_url}search?q={query}&page={page}'

            html = await self.fetch_url(url)
            if not html:
                console.print(f"[bold red]No HTML content for page {page}[/bold red]")
                break

            torrents = self.parse_torrents(html)
            if not torrents:
                console.print(f"[bold red]No torrents found on page {page}[/bold red]")
                break

            # Use asyncio.gather to fetch all detail pages concurrently
            tasks = [self.fetch_real_url(result['url']) for result in torrents]
            real_urls = await asyncio.gather(*tasks)

            # Attach real URLs to the torrent data
            for i, result in enumerate(torrents):
                result['url'] = real_urls[i]

            all_torrents.extend(torrents)

        return all_torrents
async def main():
    """Demo entry point: run a sample search and print every result."""
    demo_scraper = IlCorsaroNeroScraper("https://ilcorsaronero.link/")
    found = await demo_scraper.search("cars")

    # Guard clause: report and exit when nothing came back.
    if not found:
        console.print("[bold red]No torrents found.[/bold red]")
        return

    for idx, entry in enumerate(found):
        console.print(f"[bold green]{idx} = {entry}[/bold green] \n")


if __name__ == '__main__':
    asyncio.run(main())

View File

@ -1,6 +1,6 @@
# 26.05.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -34,7 +34,7 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
return 0
# Search on database
movie_id = tmdb.search_movie(unidecode(string_to_search))
movie_id = tmdb.search_movie(quote_plus(string_to_search))
if movie_id is not None:
movie_details: Json_film = tmdb.get_movie_details(tmdb_id=movie_id)
@ -46,4 +46,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -36,12 +36,15 @@ from StreamingCommunity.Lib.TMBD import Json_film
from .costant import ROOT_PATH, SITE_NAME, DOMAIN_NOW, MOVIE_FOLDER
def download_film(movie_details: Json_film):
def download_film(movie_details: Json_film) -> str:
"""
Downloads a film using the provided tmbd id.
Parameters:
- movie_details (Json_film): Class with info about film title.
Return:
- str: output path
"""
# Start message and display film information
@ -81,14 +84,16 @@ def download_film(movie_details: Json_film):
output_filename=os.path.join(mp4_path, title_name)
).start()
if r_proc == 404:
"""if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
execute_search(frames[-4])"""
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, title_name)

View File

@ -1,89 +0,0 @@
# 02.07.24
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util._jsonConfig import config_manager
from StreamingCommunity.Util.headers import get_headers
from StreamingCommunity.Util.table import TVShowManager
# Logic class
from StreamingCommunity.Api.Template import get_select_title
from StreamingCommunity.Api.Template.Class.SearchType import MediaManager
# Variable
from .costant import SITE_NAME, DOMAIN_NOW
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(word_to_search: str) -> int:
"""
Search for titles based on a search query.
Parameters:
- title_search (str): The title to search for.
Returns:
- int: The number of titles found.
"""
# Find new domain if prev dont work
max_timeout = config_manager.get_int("REQUESTS", "timeout")
# Construct the full site URL and load the search page
try:
response = httpx.get(
url=f"https://1.{SITE_NAME}.{DOMAIN_NOW}/s/?q={word_to_search}&video=on",
headers={
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7',
'referer': 'https://wwv.thepiratebay3.co/',
'user-agent': get_headers()
},
follow_redirects=True,
timeout=max_timeout
)
response.raise_for_status()
except Exception as e:
console.print(f"Site: {SITE_NAME}, request search error: {e}")
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table = soup.find("tbody")
# Scrape div film in table on single page
for tr in table.find_all('tr'):
try:
title_info = {
'name': tr.find_all("a")[1].get_text(strip=True),
'url': tr.find_all("td")[3].find("a").get("href"),
'upload': tr.find_all("td")[2].get_text(strip=True),
'size': tr.find_all("td")[4].get_text(strip=True),
'seader': tr.find_all("td")[5].get_text(strip=True),
'leacher': tr.find_all("td")[6].get_text(strip=True),
'by': tr.find_all("td")[7].get_text(strip=True),
}
media_search_manager.add_media(title_info)
except:
continue
# Return the number of titles found
return media_search_manager.get_length()
def run_get_select_title():
"""
Display a selection of titles and prompt the user to choose one.
"""
return get_select_title(table_show_manager, media_search_manager)

View File

@ -1,45 +0,0 @@
# 02.07.24
import os
import sys
# Internal utilities
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util.message import start_message
from StreamingCommunity.Util.os import os_manager
from StreamingCommunity.Lib.Downloader import TOR_downloader
# Logic class
from StreamingCommunity.Api.Template.Class.SearchType import MediaItem
# Config
from .costant import ROOT_PATH, DOMAIN_NOW, SITE_NAME, MOVIE_FOLDER
def download_title(select_title: MediaItem):
"""
Downloads a media item and saves it as an MP4 file.
Parameters:
- select_title (MediaItem): The media item to be downloaded. This should be an instance of the MediaItem class, containing attributes like `name` and `url`.
"""
start_message()
console.print(f"[yellow]Download: [red]{select_title.name} \n")
print()
# Define output path (sanitize name; dashes become underscores for the folder name)
title_name = os_manager.get_sanitize_file(select_title.name.replace("-", "_") + ".mp4")
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, title_name.replace(".mp4", ""))
# Create output folder
os_manager.create_path(mp4_path)
# Tor manager
# NOTE(review): assumes select_title.url is a magnet URI — confirm against the site scraper
manager = TOR_downloader()
manager.add_magnet_link(select_title.url)
manager.start_download()
manager.move_downloaded_files(mp4_path)

View File

@ -1,6 +1,6 @@
# 21.05.24
from unidecode import unidecode
from urllib.parse import quote_plus
# Internal utilities
@ -32,12 +32,12 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
# Get site domain and version and get result of the search
site_version, domain = get_version_and_domain()
len_database = title_search(unidecode(string_to_search), domain)
len_database = title_search(quote_plus(string_to_search), domain)
# Return list of elements
if get_onylDatabase:
return media_search_manager
if len_database > 0:
# Select title from list
@ -53,4 +53,4 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")
# Retry
search()
search()

View File

@ -25,13 +25,16 @@ from StreamingCommunity.Api.Player.vixcloud import VideoSource
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER
def download_film(select_title: MediaItem):
def download_film(select_title: MediaItem) -> str:
"""
Downloads a film using the provided film ID, title name, and domain.
Parameters:
- domain (str): The domain of the site
- version (str): Version of site.
Return:
- str: output path
"""
# Start message and display film information
@ -57,14 +60,16 @@ def download_film(select_title: MediaItem):
output_filename=os.path.join(mp4_path, title_name)
).start()
if r_proc == 404:
"""if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
execute_search(frames[-4])"""
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
return os.path.join(mp4_path, title_name)

View File

@ -29,16 +29,17 @@ table_show_manager = TVShowManager()
def download_video(tv_name: str, index_season_selected: int, index_episode_selected: int, scrape_serie: ScrapeSerie, video_source: VideoSource) -> None:
def download_video(index_season_selected: int, index_episode_selected: int, scrape_serie: ScrapeSerie, video_source: VideoSource) -> str:
"""
Download a single episode video.
Parameters:
- tv_name (str): Name of the TV series.
- index_season_selected (int): Index of the selected season.
- index_episode_selected (int): Index of the selected episode.
"""
Return:
- str: output path
"""
start_message()
# Get info about episode
@ -47,8 +48,8 @@ def download_video(tv_name: str, index_season_selected: int, index_episode_selec
print()
# Define filename and path for the downloaded video
mp4_name = f"{map_episode_title(tv_name, index_season_selected, index_episode_selected, obj_episode.name)}.mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, tv_name, f"S{index_season_selected}")
mp4_name = f"{map_episode_title(scrape_serie.series_name, index_season_selected, index_episode_selected, obj_episode.name)}.mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scrape_serie.series_name, f"S{index_season_selected}")
# Retrieve scws and if available master playlist
video_source.get_iframe(obj_episode.id)
@ -61,24 +62,25 @@ def download_video(tv_name: str, index_season_selected: int, index_episode_selec
output_filename=os.path.join(mp4_path, mp4_name)
).start()
if r_proc == 404:
"""if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
execute_search(frames[-4])"""
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
def download_episode(tv_name: str, index_season_selected: int, scrape_serie: ScrapeSerie, video_source: VideoSource, download_all: bool = False) -> None:
return os.path.join(mp4_path, mp4_name)
def download_episode(index_season_selected: int, scrape_serie: ScrapeSerie, video_source: VideoSource, download_all: bool = False) -> None:
"""
Download episodes of a selected season.
Parameters:
- tv_name (str): Name of the TV series.
- index_season_selected (int): Index of the selected season.
- download_all (bool): Download all episodes in the season.
"""
@ -95,7 +97,7 @@ def download_episode(tv_name: str, index_season_selected: int, scrape_serie: Scr
# Download all episodes without asking
for i_episode in range(1, episodes_count + 1):
download_video(tv_name, index_season_selected, i_episode, scrape_serie, video_source)
download_video(index_season_selected, i_episode, scrape_serie, video_source)
console.print(f"\n[red]End downloaded [yellow]season: [red]{index_season_selected}.")
else:
@ -112,7 +114,7 @@ def download_episode(tv_name: str, index_season_selected: int, scrape_serie: Scr
# Download selected episodes
for i_episode in list_episode_select:
download_video(tv_name, index_season_selected, i_episode, scrape_serie, video_source)
download_video(index_season_selected, i_episode, scrape_serie, video_source)
def download_series(select_season: MediaItem, version: str) -> None:
"""
@ -160,11 +162,11 @@ def download_series(select_season: MediaItem, version: str) -> None:
if len(list_season_select) > 1 or index_season_selected == "*":
# Download all episodes if multiple seasons are selected or if '*' is used
download_episode(select_season.slug, i_season, scrape_serie, video_source, download_all=True)
download_episode(i_season, scrape_serie, video_source, download_all=True)
else:
# Otherwise, let the user select specific episodes for the single season
download_episode(select_season.slug, i_season, scrape_serie, video_source, download_all=False)
download_episode(i_season, scrape_serie, video_source, download_all=False)
def display_episodes_list(scrape_serie) -> str:

View File

@ -101,6 +101,9 @@ def title_search(title_search: str, domain: str) -> int:
Returns:
int: The number of titles found.
"""
media_search_manager.clear()
table_show_manager.clear()
try:
response = httpx.get(
url=f"https://{SITE_NAME}.{domain}/api/search?q={title_search.replace(' ', '+')}",

View File

@ -826,11 +826,6 @@ class HLS_Downloader:
Args:
out_path (str): The path of the output file to be cleaned up.
"""
def dict_to_seconds(d):
"""Converts a dictionary of time components to total seconds."""
if d is not None:
return d['h'] * 3600 + d['m'] * 60 + d['s']
return 0
# Check if the final output file exists
logging.info(f"Check if end file converted exists: {out_path}")
@ -861,7 +856,8 @@ class HLS_Downloader:
panel_content = (
f"[bold green]Download completed![/bold green]\n"
f"[cyan]File size: [bold red]{formatted_size}[/bold red]\n"
f"[cyan]Duration: [bold]{formatted_duration}[/bold]"
f"[cyan]Duration: [bold]{formatted_duration}[/bold]\n"
f"[cyan]Output: [bold]{self.output_filename}[/bold]"
)
if missing_ts:

View File

@ -558,7 +558,7 @@ class M3U8_Segments:
console.print(
"[yellow]⚠ Warning:[/yellow] Too many retries detected! "
"Consider reducing the number of [cyan]workers[/cyan] in the [magenta]config.json[/magenta] file. "
"This will impact [bold]performance[/bold]."
"This will impact [bold]performance[/bold]. \n"
)
# Info to return

View File

@ -1,15 +1,18 @@
# 23.06.24
import os
import re
import sys
import time
import shutil
import psutil
import logging
# Internal utilities
from StreamingCommunity.Util.color import Colors
from StreamingCommunity.Util.os import internet_manager
from StreamingCommunity.Util.console import console
from StreamingCommunity.Util._jsonConfig import config_manager
@ -42,16 +45,18 @@ class TOR_downloader:
- username (str): Username for logging into qBittorrent.
- password (str): Password for logging into qBittorrent.
"""
try:
try:
console.print(f"[cyan]Connect to: [green]{HOST}:{PORT}")
self.qb = Client(f'http://{HOST}:{PORT}/')
except:
logging.error("Start qbitorrent first.")
sys.exit(0)
self.username = USERNAME
self.password = PASSWORD
self.logged_in = False
self.save_path = None
self.torrent_name = None
self.latest_torrent_hash = None
self.output_file = None
self.file_name = None
self.login()
@ -68,155 +73,226 @@ class TOR_downloader:
logging.error(f"Failed to log in: {str(e)}")
self.logged_in = False
def delete_magnet(self, torrent_info):
if (int(torrent_info.get('dl_speed')) == 0 and
int(torrent_info.get('peers')) == 0 and
int(torrent_info.get('seeds')) == 0):
# Elimina il torrent appena aggiunto
console.print(f"[bold red]⚠️ Torrent non scaricabile. Rimozione in corso...[/bold red]")
try:
# Rimuovi il torrent
self.qb.delete_permanently(torrent_info['hash'])
except Exception as delete_error:
logging.error(f"Errore durante la rimozione del torrent: {delete_error}")
# Resetta l'ultimo hash
self.latest_torrent_hash = None
def add_magnet_link(self, magnet_link):
"""
Adds a torrent via magnet link to qBittorrent.
Aggiunge un magnet link e recupera le informazioni dettagliate.
Parameters:
- magnet_link (str): Magnet link of the torrent to be added.
Args:
magnet_link (str): Magnet link da aggiungere
Returns:
dict: Informazioni del torrent aggiunto, o None se fallisce
"""
try:
self.qb.download_from_link(magnet_link)
logging.info("Added magnet link to qBittorrent.")
# Get the hash of the latest added torrent
torrents = self.qb.torrents()
if torrents:
self.latest_torrent_hash = torrents[-1]['hash']
logging.info(f"Latest torrent hash: {self.latest_torrent_hash}")
# Estrai l'hash dal magnet link
magnet_hash_match = re.search(r'urn:btih:([0-9a-fA-F]+)', magnet_link)
if not magnet_hash_match:
raise ValueError("Hash del magnet link non trovato")
magnet_hash = magnet_hash_match.group(1).lower()
# Estrai il nome del file dal magnet link (se presente)
name_match = re.search(r'dn=([^&]+)', magnet_link)
torrent_name = name_match.group(1).replace('+', ' ') if name_match else "Nome non disponibile"
# Salva il timestamp prima di aggiungere il torrent
before_add_time = time.time()
# Aggiungi il magnet link
console.print(f"[cyan]Aggiunta magnet link[/cyan]: [red]{magnet_link}")
self.qb.download_from_link(magnet_link)
# Aspetta un attimo per essere sicuri che il torrent sia stato aggiunto
time.sleep(1)
# Cerca il torrent
torrents = self.qb.torrents()
matching_torrents = [
t for t in torrents
if (t['hash'].lower() == magnet_hash) or (t.get('added_on', 0) > before_add_time)
]
if not matching_torrents:
raise ValueError("Nessun torrent corrispondente trovato")
# Prendi il primo torrent corrispondente
torrent_info = matching_torrents[0]
# Formatta e stampa le informazioni
console.print("\n[bold green]🔗 Dettagli Torrent Aggiunto:[/bold green]")
console.print(f"[yellow]Nome:[/yellow] {torrent_info.get('name', torrent_name)}")
console.print(f"[yellow]Hash:[/yellow] {torrent_info['hash']}")
console.print(f"[yellow]Dimensione:[/yellow] {torrent_info.get('size', 'Non disponibile'):,} bytes")
console.print(f"[yellow]Stato:[/yellow] {torrent_info.get('state', 'Sconosciuto')}")
print()
except Exception as e:
logging.error(f"Failed to add magnet link: {str(e)}")
# Salva l'hash per usi successivi e il path
self.latest_torrent_hash = torrent_info['hash']
self.output_file = torrent_info['content_path']
self.file_name = torrent_info['name']
# Controlla che sia possibile il download
time.sleep(5)
self.delete_magnet(self.qb.get_torrent(self.latest_torrent_hash))
return torrent_info
def start_download(self):
"""
Starts downloading the latest added torrent and monitors progress.
"""
try:
if self.latest_torrent_hash is not None:
try:
torrents = self.qb.torrents()
if not torrents:
logging.error("No torrents found.")
return
# Sleep to load magnet to qbit app
time.sleep(10)
latest_torrent = torrents[-1]
torrent_hash = latest_torrent['hash']
# Custom bar for mobile and pc
if TQDM_USE_LARGE_BAR:
bar_format = (
f"{Colors.YELLOW}[TOR] {Colors.WHITE}({Colors.CYAN}video{Colors.WHITE}): "
f"{Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ "
f"{Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
)
else:
bar_format = (
f"{Colors.YELLOW}Proc{Colors.WHITE}: "
f"{Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| "
f"{Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
)
# Custom bar for mobile and pc
if TQDM_USE_LARGE_BAR:
bar_format = (
f"{Colors.YELLOW}[TOR] {Colors.WHITE}({Colors.CYAN}video{Colors.WHITE}): "
f"{Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ "
f"{Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
)
else:
bar_format = (
f"{Colors.YELLOW}Proc{Colors.WHITE}: "
f"{Colors.RED}{{percentage:.2f}}% {Colors.WHITE}| "
f"{Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]"
progress_bar = tqdm(
total=100,
ascii='░▒█',
bar_format=bar_format,
unit_scale=True,
unit_divisor=1024,
mininterval=0.05
)
progress_bar = tqdm(
total=100,
ascii='░▒█',
bar_format=bar_format,
unit_scale=True,
unit_divisor=1024,
mininterval=0.05
)
with progress_bar as pbar:
while True:
with progress_bar as pbar:
while True:
# Get variable from qtorrent
torrent_info = self.qb.get_torrent(self.latest_torrent_hash)
self.save_path = torrent_info['save_path']
self.torrent_name = torrent_info['name']
# Get variable from qtorrent
torrent_info = self.qb.get_torrent(torrent_hash)
self.save_path = torrent_info['save_path']
self.torrent_name = torrent_info['name']
# Fetch important variable
pieces_have = torrent_info['pieces_have']
pieces_num = torrent_info['pieces_num']
progress = (pieces_have / pieces_num) * 100 if pieces_num else 0
pbar.n = progress
# Fetch important variable
pieces_have = torrent_info['pieces_have']
pieces_num = torrent_info['pieces_num']
progress = (pieces_have / pieces_num) * 100 if pieces_num else 0
pbar.n = progress
download_speed = torrent_info['dl_speed']
total_size = torrent_info['total_size']
downloaded_size = torrent_info['total_downloaded']
download_speed = torrent_info['dl_speed']
total_size = torrent_info['total_size']
downloaded_size = torrent_info['total_downloaded']
# Format variable
downloaded_size_str = internet_manager.format_file_size(downloaded_size)
downloaded_size = downloaded_size_str.split(' ')[0]
# Format variable
downloaded_size_str = internet_manager.format_file_size(downloaded_size)
downloaded_size = downloaded_size_str.split(' ')[0]
total_size_str = internet_manager.format_file_size(total_size)
total_size = total_size_str.split(' ')[0]
total_size_unit = total_size_str.split(' ')[1]
total_size_str = internet_manager.format_file_size(total_size)
total_size = total_size_str.split(' ')[0]
total_size_unit = total_size_str.split(' ')[1]
average_internet_str = internet_manager.format_transfer_speed(download_speed)
average_internet = average_internet_str.split(' ')[0]
average_internet_unit = average_internet_str.split(' ')[1]
average_internet_str = internet_manager.format_transfer_speed(download_speed)
average_internet = average_internet_str.split(' ')[0]
average_internet_unit = average_internet_str.split(' ')[1]
# Update the progress bar's postfix
if TQDM_USE_LARGE_BAR:
pbar.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size} {Colors.WHITE}< {Colors.GREEN}{total_size} {Colors.RED}{total_size_unit} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet} {Colors.RED}{average_internet_unit}"
)
else:
pbar.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size}{Colors.RED} {total_size} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet} {Colors.RED}{average_internet_unit}"
)
pbar.refresh()
time.sleep(0.2)
# Update the progress bar's postfix
if TQDM_USE_LARGE_BAR:
pbar.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size} {Colors.WHITE}< {Colors.GREEN}{total_size} {Colors.RED}{total_size_unit} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet} {Colors.RED}{average_internet_unit}"
)
else:
pbar.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size}{Colors.RED} {total_size} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet} {Colors.RED}{average_internet_unit}"
)
pbar.refresh()
time.sleep(0.2)
# Break at the end
if int(progress) == 100:
break
# Break at the end
if int(progress) == 100:
break
except KeyboardInterrupt:
logging.info("Download process interrupted.")
except KeyboardInterrupt:
logging.info("Download process interrupted.")
def is_file_in_use(self, file_path: str) -> bool:
"""Check if a file is in use by any process."""
for proc in psutil.process_iter(['open_files']):
try:
if any(file_path == f.path for f in proc.info['open_files'] or []):
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except Exception as e:
logging.error(f"Download error: {str(e)}")
sys.exit(0)
return False
def move_downloaded_files(self, destination=None):
def move_downloaded_files(self, destination: str):
"""
Moves downloaded files of the latest torrent to another location.
Parameters:
- save_path (str): Current save path (output directory) of the torrent.
- destination (str, optional): Destination directory to move files. If None, moves to current directory.
- destination (str): Destination directory to move files.
Returns:
- bool: True if files are moved successfully, False otherwise.
"""
video_extensions = {'.mp4', '.mkv', 'avi'}
time.sleep(2)
console.print(f"[cyan]Destination folder: [red]{destination}")
# List directories in the save path
dirs = [d for d in os.listdir(self.save_path) if os.path.isdir(os.path.join(self.save_path, d))]
for dir_name in dirs:
if self.torrent_name.split(" ")[0] in dir_name:
dir_path = os.path.join(self.save_path, dir_name)
try:
# Ensure destination is set; if not, use current directory
destination = destination or os.getcwd()
# Ensure the file is not in use
timeout = 3
elapsed = 0
while self.is_file_in_use(self.output_file) and elapsed < timeout:
time.sleep(1)
elapsed += 1
if elapsed == timeout:
raise Exception(f"File '{self.output_file}' is in use and could not be moved.")
# Move only video files
for file_name in os.listdir(dir_path):
file_path = os.path.join(dir_path, file_name)
# Check if it's a file and if it has a video extension
if os.path.isfile(file_path) and os.path.splitext(file_name)[1] in video_extensions:
shutil.move(file_path, os.path.join(destination, file_name))
logging.info(f"Moved file {file_name} to {destination}")
# Ensure destination directory exists
os.makedirs(destination, exist_ok=True)
time.sleep(2)
self.qb.delete_permanently(self.qb.torrents()[-1]['hash'])
return True
# Perform the move operation
try:
shutil.move(self.output_file, destination)
except OSError as e:
if e.errno == 17: # Cross-disk move error
# Perform copy and delete manually
shutil.copy2(self.output_file, destination)
os.remove(self.output_file)
else:
raise
# Delete the torrent data
#self.qb.delete_permanently(self.qb.torrents()[-1]['hash'])
return True
except Exception as e:
print(f"Error moving file: {e}")
return False

View File

@ -1,5 +1,5 @@
__title__ = 'StreamingCommunity'
__version__ = '1.9.2'
__version__ = '1.9.4'
__author__ = 'Lovi-0'
__description__ = 'A command-line program to download film'
__copyright__ = 'Copyright 2024'

View File

@ -162,11 +162,8 @@ class TVShowManager:
# Get the search function
search_func = getattr(module, 'media_search_manager')
# Ask for search string
string_to_search = Prompt.ask(f"\n[purple]Insert word to search in [red]{research_func['folder_base']}").strip()
# Call the search function with the search string
search_func(string_to_search)
search_func(None)
except Exception as e:
self.console.print(f"[red]Error during search: {e}")
@ -227,13 +224,10 @@ class TVShowManager:
module = importlib.import_module(module_path)
# Get the search function
search_func = getattr(module, 'media_search_manager')
# Ask for search string
string_to_search = Prompt.ask(f"\n[purple]Insert word to search in [red]{research_func['folder_base']}").strip()
search_func = getattr(module, 'search')
# Call the search function with the search string
search_func(string_to_search)
search_func(None)
except Exception as e:
self.console.print(f"[red]Error during search: {e}")

View File

@ -120,11 +120,11 @@ def initialize():
sys.exit(0)
# Attempting GitHub update
try:
"""try:
git_update()
print()
except:
console.log("[red]Error with loading github.")
console.log("[red]Error with loading github.")"""
# Show trending film and series
if SHOW_TRENDING:

View File

@ -19,7 +19,7 @@ start_message()
logger = Logger()
manager = TOR_downloader()
magnet_link = "magnet:?x"
magnet_link = "magnet:?xt="
manager.add_magnet_link(magnet_link)
manager.start_download()
manager.move_downloaded_files()

View File

@ -10,7 +10,7 @@
"serie_folder_name": "TV",
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
"config_qbit_tor": {
"host": "192.168.1.59",
"host": "192.168.1.58",
"port": "8080",
"user": "admin",
"pass": "adminadmin"
@ -85,16 +85,13 @@
"domain": "to"
},
"cb01new": {
"domain": "club"
},
"bitsearch": {
"domain": "to"
"domain": "icu"
},
"1337xx": {
"domain": "to"
},
"piratebays": {
"domain": "to"
"ilcorsaronero": {
"domain": "link"
}
}
}

View File

@ -11,7 +11,7 @@ with open("requirements.txt", "r", encoding="utf-8-sig") as f:
setup(
name="StreamingCommunity",
version="1.9.2",
version="1.9.4",
long_description=read_readme(),
long_description_content_type="text/markdown",
author="Lovi-0",