Commenti di alto spessore, fix streamingcommunity, ddlstreamitaly, guardaserie

This commit is contained in:
Lovi 2024-11-26 18:28:11 +01:00
parent b6efe5f4ea
commit 7880f56a82
30 changed files with 1129 additions and 1121 deletions

2
.gitignore vendored
View File

@ -58,5 +58,5 @@ venv.bak/
Video
note.txt
list_proxy.txt
config.json
cmd.txt
downloaded_files

View File

@ -1,5 +0,0 @@
build-container:
docker build -t streaming-community-api .
run-container:
docker run --rm -it -p 8000:8000 -v ${LOCAL_DIR}:/app/Video -v ./config.json:/app/config.json streaming-community-api

196
README.md
View File

@ -1,74 +1,89 @@
<p align="center">
<img src="https://i.ibb.co/f4h5Y2m/min-logo.png">
</p>
# StreamingCommunity Downloader 🎬
This repository provide a simple script designed to downloading films and series from a variety of supported streaming platforms. [SITE](#website-status-)
![Project Logo](https://i.ibb.co/f4h5Y2m/min-logo.png)
## Join us 🌟
A versatile script designed to download films and series from various supported streaming platforms.
You can chat, help improve this repo, or just hang around for some fun in the **Git_StreamingCommunity** Discord [Server](https://discord.com/invite/8vV68UGRc7)
## 🤝 Join our Community
# Table of Contents
Chat, contribute, and have fun in our **Git_StreamingCommunity** Discord [Server](https://discord.com/invite/8vV68UGRc7)
* [INSTALLATION](#installation)
* [Automatic Installation](#automatic-installation)
* [Usage](#usage-automatic)
* [Supported OSs for Automatic Installation](#automatic-installation-os)
* [Manual Installation](#manual-installation)
* [Requirement](#requirement)
* [Usage](#usage-manual)
* [Win 7](https://github.com/Ghost6446/StreamingCommunity_api/wiki/Installation#win-7)
* [Termux](https://github.com/Ghost6446/StreamingCommunity_api/wiki/Termux)
* [Update](#update)
* [CONFIGURATION](#configuration)
* [DOCKER](#docker)
* [TUTORIAL](#tutorial)
* [TO DO](#to-do)
## 📋 Table of Contents
# INSTALLATION
- [Installation](#installation)
- [PyPI Installation](#pypi-installation)
- [Automatic Installation](#automatic-installation)
- [Manual Installation](#manual-installation)
- [Win 7](https://github.com/Ghost6446/StreamingCommunity_api/wiki/Installation#win-7)
- [Termux](https://github.com/Ghost6446/StreamingCommunity_api/wiki/Termux)
- [Configuration](#configuration)
- [Tutorial](#tutorial)
- [To Do](#to-do)
## Automatic Installation
## 💻 Installation
### Supported OSs for Automatic Installation 💿
### 1. PyPI Installation
- Supported ✔️
- Not tested ⏳
- Not supported ❌
Install directly from PyPI:
```bash
pip install StreamingCommunity
```
#### Creating a Run Script
Create `run_streaming.py`:
```python
from StreamingCommunity.run import main
if __name__ == "__main__":
main()
```
Run the script:
```bash
python run_streaming.py
```
#### Updating via PyPI
```bash
pip install --upgrade StreamingCommunity
```
### 2. Automatic Installation
#### Supported Operating Systems 💿
| OS | Automatic Installation Support |
| :-------------- | :----------------------------: |
|:----------------|:------------------------------:|
| Windows 10/11 | ✔️ |
| Windows 7 | ❌ |
| Windows 7 | ❌ |
| Debian Linux | ✔️ |
| Arch Linux | ✔️ |
| CentOS Stream 9 | ✔️ |
| FreeBSD | ⏳ |
| FreeBSD | ⏳ |
| MacOS | ✔️ |
| Termux | ❌ |
| Termux | ❌ |
### Installation ⚙️
#### Installation Steps
Run the following command inside the main directory:
#### On Windows:
##### On Windows:
```powershell
.\win_install.bat
```
#### On Linux/MacOS/BSD:
##### On Linux/MacOS/BSD:
```bash
sudo chmod +x unix_install.sh && ./unix_install.sh
```
`<a id="usage-automatic"></a>`
#### Usage
### Usage 📚
Run the script with the following command:
#### On Windows:
##### On Windows:
```powershell
python .\run.py
@ -80,43 +95,35 @@ or
source .venv/bin/activate && python run.py && deactivate
```
#### On Linux/MacOS/BSD:
##### On Linux/MacOS/BSD:
```bash
./run.py
```
## Manual Installation
### 3. Manual Installation
`<a id="requirement"></a>`
#### Requirements 📋
### Requirement 📋
Prerequisites:
* [Python](https://www.python.org/downloads/) > 3.8
* [FFmpeg](https://www.gyan.dev/ffmpeg/builds/)
Make sure you have the following prerequisites installed on your system:
#### Install Python Dependencies
* [python](https://www.python.org/downloads/) > 3.8
* [ffmpeg](https://www.gyan.dev/ffmpeg/builds/)
* [openssl](https://www.openssl.org) or [pycryptodome](https://pypi.org/project/pycryptodome/)
### Installation ⚙️
Install the required Python libraries using the following command:
```
```bash
pip install -r requirements.txt
```
### Usage 📚
#### Usage
Run the script with the following command:
#### On Windows:
##### On Windows:
```powershell
python run.py
```
#### On Linux/MacOS:
##### On Linux/MacOS:
```bash
python3 run.py
@ -129,13 +136,13 @@ Keep your script up to date with the latest features by running:
#### On Windows:
```powershell
python update_version.py
python update.py
```
#### On Linux/MacOS:
```bash
python3 update_version.py
python3 update.py
```
## Configuration ⚙️
@ -150,7 +157,6 @@ The configuration file is divided into several main sections:
{
"root_path": "Video",
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
"special_chars_to_remove": "!@#$%^&*()[]{}<>|`~'\";:,?=+\u00e2\u20ac\u00a6",
"not_close": false,
"show_trending": false
}
@ -176,7 +182,7 @@ The configuration file is divided into several main sections:
* `%(episode)` : Is the number of the episode
* `%(episode_name)` : Is the name of the episode
`<br/><br/>`
- `special_chars_to_remove`: Special characters to be removed from filenames
- `not_close`: If true, continues running after downloading
- `show_trending`: Display trending content on startup
@ -282,46 +288,11 @@ forced-ita hin - Hindi pol - Polish tur - Turkish
- `force_resolution`: Force specific resolution (-1 for best available, or specify 1080, 720, 360)
- `get_only_link`: Return M3U8 playlist/index URL instead of downloading
## Docker 🐳
You can run the script in a docker container, to build the image just run
```
docker build -t streaming-community-api .
```
and to run it use
```
docker run -it -p 8000:8000 streaming-community-api
```
By default the videos will be saved in `/app/Video` inside the container, if you want to to save them in your machine instead of the container just run
```
docker run -it -p 8000:8000 -v /path/to/download:/app/Video streaming-community-api
```
### Docker quick setup with Make
Inside the Makefile (install `make`) are already configured two commands to build and run the container:
```
make build-container
# set your download directory as ENV variable
make LOCAL_DIR=/path/to/download run-container
```
The `run-container` command mounts also the `config.json` file, so any change to the configuration file is reflected immediately without having to rebuild the image.
### Website Status 🌐
- Working ✅
- Not Working ❌
## 🌐 Website Status
| Website | Status |
| :----------------- | :----: |
|:-------------------|:------:|
| 1337xx | ✅ |
| Altadefinizione | ✅ |
| AnimeUnity | ✅ |
@ -333,13 +304,20 @@ The `run-container` command mounts also the `config.json` file, so any change to
| PirateBays | ✅ |
| StreamingCommunity | ✅ |
## Tutorial 📖
## 📖 Tutorials
[win](https://www.youtube.com/watch?v=mZGqK4wdN-k)
[linux](https://www.youtube.com/watch?v=0qUNXPE_mTg)
- [Windows Tutorial](https://www.youtube.com/watch?v=mZGqK4wdN-k)
- [Linux Tutorial](https://www.youtube.com/watch?v=0qUNXPE_mTg)
## To do 📝
## 📝 To Do
- GUI
- Website api
- Add other site
- Create website API
## 🤝 Contributing
Contributions are welcome! Steps:
1. Fork the repository
2. Create feature branch (`git checkout -b feature/AmazingFeature`)
3. Commit changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to branch (`git push origin feature/AmazingFeature`)
5. Open Pull Request

View File

@ -0,0 +1,140 @@
# 26.11.24
# !!! DIO CANErino
import re
class JavaScriptParser:
@staticmethod
def fix_string(ss):
if ss is None:
return None
ss = str(ss)
ss = ss.encode('utf-8').decode('unicode-escape')
ss = ss.strip("\"'")
ss = ss.strip()
return ss
@staticmethod
def fix_url(url):
if url is None:
return None
url = url.replace('\\/', '/')
return url
@staticmethod
def parse_value(value):
value = JavaScriptParser.fix_string(value)
if 'http' in str(value) or 'https' in str(value):
return JavaScriptParser.fix_url(value)
if value is None or str(value).lower() == 'null':
return None
if str(value).lower() == 'true':
return True
if str(value).lower() == 'false':
return False
try:
return int(value)
except ValueError:
try:
return float(value)
except ValueError:
pass
return value
@staticmethod
def parse_object(obj_str):
obj_str = obj_str.strip('{}').strip()
result = {}
key_value_pairs = re.findall(r'([\'"]?[\w]+[\'"]?)\s*:\s*([^,{}]+|{[^}]*}|\[[^\]]*\]|\'[^\']*\'|"[^"]*")', obj_str)
for key, value in key_value_pairs:
key = JavaScriptParser.fix_string(key)
value = value.strip()
if value.startswith('{'):
result[key] = JavaScriptParser.parse_object(value)
elif value.startswith('['):
result[key] = JavaScriptParser.parse_array(value)
else:
result[key] = JavaScriptParser.parse_value(value)
return result
@staticmethod
def parse_array(arr_str):
arr_str = arr_str.strip('[]').strip()
result = []
elements = []
current_elem = ""
brace_count = 0
in_string = False
quote_type = None
for char in arr_str:
if char in ['"', "'"]:
if not in_string:
in_string = True
quote_type = char
elif quote_type == char:
in_string = False
quote_type = None
if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
elif char == ',' and brace_count == 0:
elements.append(current_elem.strip())
current_elem = ""
continue
current_elem += char
if current_elem.strip():
elements.append(current_elem.strip())
for elem in elements:
elem = elem.strip()
if elem.startswith('{'):
result.append(JavaScriptParser.parse_object(elem))
elif 'active' in elem or 'url' in elem:
key_value_match = re.search(r'([\w]+)\":([^,}]+)', elem)
if key_value_match:
key = key_value_match.group(1)
value = key_value_match.group(2)
result[-1][key] = JavaScriptParser.parse_value(value.strip('"\\'))
else:
result.append(JavaScriptParser.parse_value(elem))
return result
@classmethod
def parse(cls, js_string):
assignments = re.findall(r'window\.(\w+)\s*=\s*([^;]+);?', js_string, re.DOTALL)
result = {}
for var_name, value in assignments:
value = value.strip()
if value.startswith('{'):
result[var_name] = cls.parse_object(value)
elif value.startswith('['):
result[var_name] = cls.parse_array(value)
else:
result[var_name] = cls.parse_value(value)
return result

View File

@ -111,6 +111,29 @@ class SeasonManager:
return f"SeasonManager(num_seasons={len(self.seasons)})"
class Stream:
def __init__(self, name: str, url: str, active: bool):
self.name = name
self.url = url
self.active = active
def __repr__(self):
return f"Stream(name={self.name!r}, url={self.url!r}, active={self.active!r})"
class StreamsCollection:
def __init__(self, streams: list):
self.streams = [Stream(**stream) for stream in streams]
def __repr__(self):
return f"StreamsCollection(streams={self.streams})"
def add_stream(self, name: str, url: str, active: bool):
self.streams.append(Stream(name, url, active))
def get_streams(self):
return self.streams
class WindowVideo:
def __init__(self, data: Dict[str, Any]):
self.data = data
@ -134,133 +157,10 @@ class WindowVideo:
class WindowParameter:
def __init__(self, data: Dict[str, Any]):
self.data = data
self.token: str = data.get('token', '')
self.token360p: str = data.get('token360p', '')
self.token480p: str = data.get('token480p', '')
self.token720p: str = data.get('token720p', '')
self.token1080p: str = data.get('token1080p', '')
self.expires: str = data.get('expires', '')
params = data.get('params', {})
self.token: str = params.get('token', '')
self.expires: str = str(params.get('expires', ''))
self.url = data.get('url')
def __str__(self):
return f"WindowParameter(token='{self.token}', token360p='{self.token360p}', token480p='{self.token480p}', token720p='{self.token720p}', token1080p='{self.token1080p}', expires='{self.expires}')"
class DynamicJSONConverter:
"""
Class for converting an input string into dynamic JSON.
"""
def __init__(self, input_string: str):
"""
Initialize the converter with the input string.
Parameters:
input_string (str): The input string to convert.
"""
self.input_string = input_string
self.json_data = {}
def _parse_key_value(self, key: str, value: str):
"""
Parse a key-value pair.
Parameters:
key (str): The key.
value (str): The value.
Returns:
object: The parsed value.
"""
try:
value = value.strip()
if value.startswith('{'):
return self._parse_json_object(value)
else:
return self._parse_non_json_value(value)
except Exception as e:
logging.error(f"Error parsing key-value pair '{key}': {e}")
raise
def _parse_json_object(self, obj_str: str):
"""
Parse a JSON object.
Parameters:
obj_str (str): The string representation of the JSON object.
Returns:
dict: The parsed JSON object.
"""
try:
# Use regular expression to find key-value pairs in the JSON object string
obj_dict = dict(re.findall(r'"([^"]*)"\s*:\s*("[^"]*"|[^,]*)', obj_str))
# Strip double quotes from values and return the parsed dictionary
return {k: v.strip('"') for k, v in obj_dict.items()}
except Exception as e:
logging.error(f"Error parsing JSON object: {e}")
raise
def _parse_non_json_value(self, value: str):
"""
Parse a non-JSON value.
Parameters:
value (str): The value to parse.
Returns:
object: The parsed value.
"""
try:
# Remove extra quotes and convert to lowercase
value = value.replace('"', "").strip().lower()
if value.endswith('\n}'):
value = value.replace('\n}', '')
# Check if the value matches 'true' or 'false' using regular expressions
if re.match(r'\btrue\b', value, re.IGNORECASE):
return True
elif re.match(r'\bfalse\b', value, re.IGNORECASE):
return False
return value
except Exception as e:
logging.error(f"Error parsing non-JSON value: {e}")
raise
def convert_to_dynamic_json(self):
"""
Convert the input string into dynamic JSON.
Returns:
str: The JSON representation of the result.
"""
try:
# Replace invalid characters with valid JSON syntax
self.input_string = "{" + self.input_string.replace("'", '"').replace("=", ":").replace(";", ",").replace("}\n", "},\n") + "}"
# Find all key-value matches in the input string using regular expression
matches = re.findall(r'(\w+)\s*:\s*({[^}]*}|[^,]+)', self.input_string)
for match in matches:
key = match[0].strip()
value = match[1].strip()
# Parse each key-value pair and add it to the json_data dictionary
self.json_data[key] = self._parse_key_value(key, value)
# Convert the json_data dictionary to a formatted JSON string
return self.json_data
except Exception as e:
logging.error(f"Error converting to dynamic JSON: {e}")
raise
return (f"WindowParameter(token='{self.token}', expires='{self.expires}', url='{self.url}', data={self.data})")

View File

@ -2,7 +2,7 @@
import sys
import logging
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
from urllib.parse import urlparse, urlencode, urlunparse
# External libraries
@ -14,107 +14,43 @@ from bs4 import BeautifulSoup
from StreamingCommunity.Src.Util.headers import get_headers
from StreamingCommunity.Src.Util.console import console, Panel
from StreamingCommunity.Src.Util._jsonConfig import config_manager
from .Helper.Vixcloud.util import Episode, EpisodeManager, Season, SeasonManager, WindowVideo, WindowParameter, DynamicJSONConverter
from .Helper.Vixcloud.util import WindowVideo, WindowParameter, StreamsCollection
from .Helper.Vixcloud.js_parser import JavaScriptParser
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class VideoSource:
def __init__(self, site_name: str):
def __init__(self, site_name: str, is_series: bool):
"""
Initialize a VideoSource object.
Initialize video source for streaming site.
Args:
site_name (str): Name of streaming site
is_series (bool): Flag for series or movie content
"""
self.headers = {'user-agent': get_headers()}
self.is_series = False
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
self.is_series = is_series
def setup(self, version: str = None, domain: str = None, media_id: int = None, series_name: str = None):
def setup(self, media_id: int):
"""
Set up the class
Parameters:
- version (str): The version to set.
- media_id (int): The media ID to set.
- series_name (str): The series name to set.
Configure media-specific context.
Args:
media_id (int): Unique identifier for media item
"""
self.version = version
self.domain = domain
self.media_id = media_id
if series_name is not None:
self.is_series = True
self.series_name = series_name
self.obj_season_manager: SeasonManager = SeasonManager()
self.obj_episode_manager: EpisodeManager = EpisodeManager()
def collect_info_seasons(self) -> None:
def get_iframe(self, episode_id: int) -> None:
"""
Collect information about seasons.
"""
self.headers = {
'user-agent': get_headers(),
'x-inertia': 'true',
'x-inertia-version': self.version,
}
try:
response = httpx.get(
url=f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}",
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Extract JSON response if available
json_response = response.json().get('props', {}).get('title', {}).get('seasons', [])
# Iterate over JSON data and add titles to the manager
for dict_season in json_response:
self.obj_season_manager.add_season(dict_season)
except Exception as e:
logging.error(f"Error collecting season info: {e}")
raise
def collect_title_season(self, number_season: int) -> None:
"""
Collect information about a specific season.
Parameters:
- number_season (int): The season number.
"""
try:
# Make a request to collect information about a specific season
response = httpx.get(
url=f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}',
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Extract JSON response if available
json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', [])
# Iterate over JSON data and add episodes to the manager
for dict_episode in json_response:
self.obj_episode_manager.add_episode(dict_episode)
except Exception as e:
logging.error(f"Error collecting title season info: {e}")
raise
def get_iframe(self, episode_id: int = None) -> None:
"""
Get iframe source.
Parameters:
- episode_id (int): The episode ID, present only for series
Retrieve iframe source for specified episode.
Args:
episode_id (int): Unique identifier for episode
"""
params = {}
@ -144,19 +80,18 @@ class VideoSource:
def parse_script(self, script_text: str) -> None:
"""
Parse script text.
Parameters:
- script_text (str): The script text to parse.
Convert raw script to structured video metadata.
Args:
script_text (str): Raw JavaScript/HTML script content
"""
try:
converter = JavaScriptParser.parse(js_string=str(script_text))
converter = DynamicJSONConverter(script_text)
result = converter.convert_to_dynamic_json()
# Create window video and parameter objects
self.window_video = WindowVideo(result['video'])
self.window_parameter = WindowParameter(result['masterPlaylist'])
# Create window video, streams and parameter objects
self.window_video = WindowVideo(converter.get('video'))
self.window_streams = StreamsCollection(converter.get('streams'))
self.window_parameter = WindowParameter(converter.get('masterPlaylist'))
except Exception as e:
logging.error(f"Error parsing script: {e}")
@ -164,11 +99,14 @@ class VideoSource:
def get_content(self) -> None:
"""
Get content.
Fetch and process video content from iframe source.
Workflow:
- Validate iframe source
- Retrieve content
- Parse embedded script
"""
try:
# Check if iframe source is available
if self.iframe_src is not None:
# Make a request to get content
@ -198,134 +136,52 @@ class VideoSource:
def get_playlist(self) -> str:
"""
Get playlist.
Returns:
- str: The playlist URL, or None if there's an error.
"""
iframe_url = self.iframe_src
# Create base uri for playlist
base_url = f'https://vixcloud.co/playlist/{self.window_video.id}'
query = urlencode(list(self.window_parameter.data.items()))
master_playlist_url = urljoin(base_url, '?' + query)
# Parse the current query string and the master playlist URL query string
current_params = parse_qs(iframe_url[1:])
m = urlparse(master_playlist_url)
master_params = parse_qs(m.query)
# Create the final parameters dictionary with token and expires from the master playlist
final_params = {
"token": master_params.get("token", [""])[0],
"expires": master_params.get("expires", [""])[0]
}
# Add conditional parameters
if "b" in current_params:
final_params["b"] = "1"
if "canPlayFHD" in current_params:
final_params["h"] = "1"
# Construct the new query string and final URL
new_query = urlencode(final_params) # Encode final_params into a query string
new_url = m._replace(query=new_query) # Replace the old query string with the new one
final_url = urlunparse(new_url) # Construct the final URL from the modified parts
Generate authenticated playlist URL.
return final_url
Returns:
str: Fully constructed playlist URL with authentication parameters
"""
params = {}
if self.window_video.quality == 1080:
params['h'] = 1
if "b=1" in self.window_parameter.url:
params['b'] = 1
params.update({
"token": self.window_parameter.token,
"expires": self.window_parameter.expires
})
query_string = urlencode(params)
return urlunparse(urlparse(self.window_parameter.url)._replace(query=query_string))
class AnimeVideoSource(VideoSource):
class VideoSourceAnime(VideoSource):
def __init__(self, site_name: str):
"""
Initialize a VideoSource object.
Initialize anime-specific video source.
Args:
site_name (str): Name of anime streaming site
Extends base VideoSource with anime-specific initialization
"""
self.headers = {'user-agent': get_headers()}
self.is_series = False
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
self.src_mp4 = None
def setup(self, media_id: int = None, series_name: str = None):
"""
Set up the class
Parameters:
- media_id (int): The media ID to set.
- series_name (str): The series name to set.
"""
self.media_id = media_id
if series_name is not None:
self.is_series = True
self.series_name = series_name
self.obj_episode_manager: EpisodeManager = EpisodeManager()
def get_count_episodes(self):
"""
Fetches the total count of episodes available for the anime.
Returns:
- int or None: Total count of episodes if successful, otherwise None.
"""
try:
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/",
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Parse JSON response and return episode count
return response.json()["episodes_count"]
except Exception as e:
logging.error(f"(EpisodeDownloader) Error fetching episode count: {e}")
return None
def get_info_episode(self, index_ep: int) -> Episode:
"""
Fetches information about a specific episode.
Parameters:
- index_ep (int): Index of the episode.
Returns:
- obj Episode or None: Information about the episode if successful, otherwise None.
"""
try:
params = {
"start_range": index_ep,
"end_range": index_ep + 1
}
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}",
headers=self.headers,
params=params,
timeout=max_timeout
)
response.raise_for_status()
# Return information about the episode
json_data = response.json()["episodes"][-1]
return Episode(json_data)
except Exception as e:
logging.error(f"(EpisodeDownloader) Error fetching episode information: {e}")
return None
def get_embed(self, episode_id: int):
"""
Fetches the script text for a given episode ID.
Retrieve embed URL and extract video source.
Args:
episode_id (int): Unique identifier for episode
Parameters:
- episode_id (int): ID of the episode.
Returns:
- str or None: Script successful, otherwise None.
str: Parsed script content
"""
try:
@ -352,5 +208,5 @@ class AnimeVideoSource(VideoSource):
return script
except Exception as e:
logging.error(f"(EpisodeDownloader) Error fetching embed URL: {e}")
logging.error(f"Error fetching embed URL: {e}")
return None

View File

@ -9,7 +9,7 @@ from StreamingCommunity.Src.Util.console import console, msg
# Logic class
from .site import title_search, run_get_select_title, media_search_manager
from .anime import download_film, download_series
from .film_serie import download_film, download_series
# Variable
@ -34,10 +34,10 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
if len_database > 0:
# Select title from list
# Select title from list (type: TV \ Movie \ OVA)
select_title = run_get_select_title()
if select_title.type == 'Movie':
if select_title.type == 'Movie' or select_title.type == 'OVA':
download_film(select_title)
else:

View File

@ -0,0 +1,131 @@
# 11.03.24
import os
import sys
import logging
# Internal utilities
from StreamingCommunity.Src.Util.console import console, msg
from StreamingCommunity.Src.Util.os import os_manager
from StreamingCommunity.Src.Util.message import start_message
from StreamingCommunity.Src.Lib.Downloader import MP4_downloader
# Logic class
from .util.ScrapeSerie import ScrapeSerieAnime
from StreamingCommunity.Src.Api.Template.Util import manage_selection
from StreamingCommunity.Src.Api.Template.Class.SearchType import MediaItem
# Player
from StreamingCommunity.Src.Api.Player.vixcloud import VideoSourceAnime
# Variable
from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER, MOVIE_FOLDER
scrape_serie = ScrapeSerieAnime(SITE_NAME)
video_source = VideoSourceAnime(SITE_NAME)
def download_episode(index_select: int):
"""
Downloads the selected episode.
Parameters:
- index_select (int): Index of the episode to download.
"""
# Get information about the selected episode
obj_episode = scrape_serie.get_info_episode(index_select)
if obj_episode is not None:
start_message()
console.print(f"[yellow]Download: [red]EP_{obj_episode.number} \n")
# Collect mp4 url
video_source.get_embed(obj_episode.id)
# Get the js script from the episode
#js_script = video_source.get_embed(obj_episode.id)
# Parse parameter in embed text
#video_source.parse_script(js_script)
# Create output path
title_name = f"{obj_episode.number}.mp4"
if scrape_serie.is_series:
mp4_path = os_manager.get_sanitize_path(
os.path.join(ROOT_PATH, SITE_NAME, SERIES_FOLDER, scrape_serie.series_name)
)
else:
mp4_path = os_manager.get_sanitize_path(
os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, scrape_serie.series_name)
)
# Create output folder
os_manager.create_path(mp4_path)
# Start downloading
r_proc = MP4_downloader(
url = str(video_source.src_mp4).strip(),
path = os.path.join(mp4_path, title_name)
)
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
else:
logging.error(f"Skip index: {index_select} cant find info with api.")
def download_series(select_title: MediaItem):
"""
Function to download episodes of a TV series.
Parameters:
- tv_id (int): The ID of the TV series.
- tv_name (str): The name of the TV series.
"""
# Set up video source
scrape_serie.setup(None, select_title.id, select_title.slug)
# Get the count of episodes for the TV series
episoded_count = scrape_serie.get_count_episodes()
console.print(f"[cyan]Episodes find: [red]{episoded_count}")
# Prompt user to select an episode index
last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]or [red][3-*] [cyan]for a range of media")
# Manage user selection
list_episode_select = manage_selection(last_command, episoded_count)
# Download selected episodes
if len(list_episode_select) == 1 and last_command != "*":
download_episode(list_episode_select[0]-1)
# Download all other episodes selecter
else:
for i_episode in list_episode_select:
download_episode(i_episode-1)
def download_film(select_title: MediaItem):
"""
Function to download a film.
Parameters:
- id_film (int): The ID of the film.
- title_name (str): The title of the film.
"""
# Set up video source
scrape_serie.setup(None, select_title.id, select_title.slug)
scrape_serie.is_series = False
# Start download
download_episode(0)

View File

@ -0,0 +1,97 @@
# 01.03.24
import logging
# External libraries
import httpx
# Internal utilities
from StreamingCommunity.Src.Util.headers import get_headers
from StreamingCommunity.Src.Util._jsonConfig import config_manager
from StreamingCommunity.Src.Api.Player.Helper.Vixcloud.util import EpisodeManager, Episode
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class ScrapeSerieAnime():
def __init__(self, site_name: str):
"""
Initialize the media scraper for a specific website.
Args:
site_name (str): Name of the streaming site to scrape
"""
self.is_series = False
self.headers = {'user-agent': get_headers()}
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
def setup(self, version: str = None, media_id: int = None, series_name: str = None):
self.version = version
self.media_id = media_id
if series_name is not None:
self.is_series = True
self.series_name = series_name
self.obj_episode_manager: EpisodeManager = EpisodeManager()
def get_count_episodes(self):
"""
Retrieve total number of episodes for the selected media.
Returns:
int: Total episode count
"""
try:
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/",
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Parse JSON response and return episode count
return response.json()["episodes_count"]
except Exception as e:
logging.error(f"Error fetching episode count: {e}")
return None
def get_info_episode(self, index_ep: int) -> Episode:
"""
Fetch detailed information for a specific episode.
Args:
index_ep (int): Zero-based index of the target episode
Returns:
Episode: Detailed episode information
"""
try:
params = {
"start_range": index_ep,
"end_range": index_ep + 1
}
response = httpx.get(
url=f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}",
headers=self.headers,
params=params,
timeout=max_timeout
)
response.raise_for_status()
# Return information about the episode
json_data = response.json()["episodes"][-1]
return Episode(json_data)
except Exception as e:
logging.error(f"Error fetching episode information: {e}")
return None

View File

@ -19,7 +19,7 @@ from StreamingCommunity.Src.Api.Template.Util import manage_selection, map_episo
# Player
from .Player.ScrapeSerie import GetSerieInfo
from .util.ScrapeSerie import GetSerieInfo
from StreamingCommunity.Src.Api.Player.ddl import VideoSource
@ -92,7 +92,7 @@ def download_thread(dict_serie: MediaItem):
episodes_count = len(list_dict_episode)
# Display episodes list and manage user selection
last_command = display_episodes_list()
last_command = display_episodes_list(scape_info_serie.list_episodes)
list_episode_select = manage_selection(last_command, episodes_count)
try:

View File

@ -0,0 +1,83 @@
# 13.06.24
import sys
import logging
from typing import List, Dict
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Src.Util.headers import get_headers
# Logic class
from StreamingCommunity.Src.Api.Template.Class.SearchType import MediaItem
# Variable
from ..costant import COOKIE
class GetSerieInfo:
def __init__(self, dict_serie: MediaItem) -> None:
"""
Initializes the GetSerieInfo object with default values.
Parameters:
- dict_serie (MediaItem): Dictionary containing series information (optional).
"""
self.headers = {'user-agent': get_headers()}
self.cookies = COOKIE
self.url = dict_serie.url
self.tv_name = None
self.list_episodes = None
def get_episode_number(self) -> List[Dict[str, str]]:
"""
Retrieves the number of episodes for a specific season.
Parameters:
n_season (int): The season number.
Returns:
List[Dict[str, str]]: List of dictionaries containing episode information.
"""
try:
response = httpx.get(f"{self.url}?area=online", cookies=self.cookies, headers=self.headers, timeout=10)
response.raise_for_status()
except Exception as e:
logging.error(f"Insert value for [ips4_device_key, ips4_member_id, ips4_login_key] in config.json file SITE \\ ddlstreamitaly \\ cookie. Use browser debug and cookie request with a valid account, filter by DOC. Error: {e}")
sys.exit(0)
# Parse HTML content of the page
soup = BeautifulSoup(response.text, "html.parser")
# Get tv name
self.tv_name = soup.find("span", class_= "ipsType_break").get_text(strip=True)
# Find the container of episodes for the specified season
table_content = soup.find('div', class_='ipsMargin_bottom:half')
list_dict_episode = []
for episode_div in table_content.find_all('a', href=True):
# Get text of episode
part_name = episode_div.get_text(strip=True)
if part_name:
obj_episode = {
'name': part_name,
'url': episode_div['href']
}
list_dict_episode.append(obj_episode)
self.list_episodes = list_dict_episode
return list_dict_episode

View File

@ -19,7 +19,7 @@ from StreamingCommunity.Src.Api.Template.Class.SearchType import MediaItem
# Player
from .Player.ScrapeSerie import GetSerieInfo
from .util.ScrapeSerie import GetSerieInfo
from StreamingCommunity.Src.Api.Player.supervideo import VideoSource

View File

@ -0,0 +1,110 @@
# 13.06.24
import logging
from typing import List, Dict
# External libraries
import httpx
from bs4 import BeautifulSoup
# Internal utilities
from StreamingCommunity.Src.Util.headers import get_headers
# Logic class
from StreamingCommunity.Src.Api.Template .Class.SearchType import MediaItem
class GetSerieInfo:
def __init__(self, dict_serie: MediaItem) -> None:
"""
Initializes the GetSerieInfo object with default values.
Parameters:
dict_serie (MediaItem): Dictionary containing series information (optional).
"""
self.headers = {'user-agent': get_headers()}
self.url = dict_serie.url
self.tv_name = None
self.list_episodes = None
def get_seasons_number(self) -> int:
"""
Retrieves the number of seasons of a TV series.
Returns:
int: Number of seasons of the TV series.
"""
try:
# Make an HTTP request to the series URL
response = httpx.get(self.url, headers=self.headers, timeout=15)
response.raise_for_status()
# Parse HTML content of the page
soup = BeautifulSoup(response.text, "html.parser")
# Find the container of seasons
table_content = soup.find('div', class_="tt_season")
# Count the number of seasons
seasons_number = len(table_content.find_all("li"))
# Extract the name of the series
self.tv_name = soup.find("h1", class_="front_title").get_text(strip=True)
return seasons_number
except Exception as e:
logging.error(f"Error parsing HTML page: {e}")
return -999
def get_episode_number(self, n_season: int) -> List[Dict[str, str]]:
"""
Retrieves the number of episodes for a specific season.
Parameters:
n_season (int): The season number.
Returns:
List[Dict[str, str]]: List of dictionaries containing episode information.
"""
try:
# Make an HTTP request to the series URL
response = httpx.get(self.url, headers=self.headers, timeout=15)
response.raise_for_status()
# Parse HTML content of the page
soup = BeautifulSoup(response.text, "html.parser")
# Find the container of episodes for the specified season
table_content = soup.find('div', class_="tab-pane", id=f"season-{n_season}")
# Extract episode information
episode_content = table_content.find_all("li")
list_dict_episode = []
for episode_div in episode_content:
index = episode_div.find("a").get("data-num")
link = episode_div.find("a").get("data-link")
name = episode_div.find("a").get("data-title")
obj_episode = {
'number': index,
'name': name,
'url': link
}
list_dict_episode.append(obj_episode)
self.list_episodes = list_dict_episode
return list_dict_episode
except Exception as e:
logging.error(f"Error parsing HTML page: {e}")
return []

View File

@ -43,10 +43,10 @@ def search(string_to_search: str = None, get_onylDatabase: bool = False):
select_title = run_get_select_title()
if select_title.type == 'tv':
download_series(select_title, domain, site_version)
download_series(select_title, site_version)
else:
download_film(select_title, domain, site_version)
download_film(select_title)
else:
console.print(f"\n[red]Nothing matching was found for[white]: [purple]{string_to_search}")

View File

@ -23,10 +23,10 @@ from StreamingCommunity.Src.Api.Player.vixcloud import VideoSource
# Variable
from .costant import ROOT_PATH, SITE_NAME, MOVIE_FOLDER
video_source = VideoSource(site_name=SITE_NAME)
video_source = VideoSource(SITE_NAME, False)
def download_film(select_title: MediaItem, domain: str, version: str):
def download_film(select_title: MediaItem):
"""
Downloads a film using the provided film ID, title name, and domain.
@ -40,10 +40,10 @@ def download_film(select_title: MediaItem, domain: str, version: str):
console.print(f"[yellow]Download: [red]{select_title.slug} \n")
# Set domain and media ID for the video source
video_source.setup(version, domain, select_title.id)
video_source.setup(select_title.id)
# Retrieve scws and if available master playlist
video_source.get_iframe()
video_source.get_iframe(select_title.id)
video_source.get_content()
master_playlist = video_source.get_playlist()

View File

@ -14,6 +14,7 @@ from StreamingCommunity.Src.Lib.Downloader import HLS_Downloader
# Logic class
from .util.ScrapeSerie import ScrapeSerie
from StreamingCommunity.Src.Api.Template.Util import manage_selection, map_episode_title, validate_selection, validate_episode_selection, execute_search
from StreamingCommunity.Src.Api.Template.Class.SearchType import MediaItem
@ -24,7 +25,8 @@ from StreamingCommunity.Src.Api.Player.vixcloud import VideoSource
# Variable
from .costant import ROOT_PATH, SITE_NAME, SERIES_FOLDER
video_source = VideoSource(site_name=SITE_NAME)
scrape_serie = ScrapeSerie(SITE_NAME)
video_source = VideoSource(SITE_NAME, True)
table_show_manager = TVShowManager()
@ -42,7 +44,7 @@ def download_video(tv_name: str, index_season_selected: int, index_episode_selec
start_message()
# Get info about episode
obj_episode = video_source.obj_episode_manager.episodes[index_episode_selected - 1]
obj_episode = scrape_serie.obj_episode_manager.episodes[index_episode_selected - 1]
console.print(f"[yellow]Download: [red]{index_season_selected}:{index_episode_selected} {obj_episode.name}")
print()
@ -84,13 +86,13 @@ def download_episode(tv_name: str, index_season_selected: int, download_all: boo
"""
# Clean memory of all episodes and get the number of the season
video_source.obj_episode_manager.clear()
season_number = video_source.obj_season_manager.seasons[index_season_selected - 1].number
scrape_serie.obj_episode_manager.clear()
season_number = scrape_serie.obj_season_manager.seasons[index_season_selected - 1].number
# Start message and collect information about episodes
start_message()
video_source.collect_title_season(season_number)
episodes_count = video_source.obj_episode_manager.get_length()
scrape_serie.collect_title_season(season_number)
episodes_count = scrape_serie.obj_episode_manager.get_length()
if download_all:
@ -115,13 +117,12 @@ def download_episode(tv_name: str, index_season_selected: int, download_all: boo
for i_episode in list_episode_select:
download_video(tv_name, index_season_selected, i_episode)
def download_series(select_title: MediaItem, domain: str, version: str) -> None:
def download_series(select_season: MediaItem, version: str) -> None:
"""
Download episodes of a TV series based on user selection.
Parameters:
- select_title (MediaItem): Selected media item (TV series).
- select_season (MediaItem): Selected media item (TV series).
- domain (str): Domain from which to download.
- version (str): Version of the site.
"""
@ -130,11 +131,12 @@ def download_series(select_title: MediaItem, domain: str, version: str) -> None:
start_message()
# Setup video source
video_source.setup(version, domain, select_title.id, select_title.slug)
scrape_serie.setup(version, select_season.id, select_season.slug)
video_source.setup(select_season.id)
# Collect information about seasons
video_source.collect_info_seasons()
seasons_count = video_source.obj_season_manager.get_length()
scrape_serie.collect_info_seasons()
seasons_count = scrape_serie.obj_season_manager.get_length()
# Prompt user for season selection and download episodes
console.print(f"\n[green]Seasons found: [red]{seasons_count}")
@ -157,11 +159,11 @@ def download_series(select_title: MediaItem, domain: str, version: str) -> None:
if len(list_season_select) > 1 or index_season_selected == "*":
# Download all episodes if multiple seasons are selected or if '*' is used
download_episode(select_title.slug, i_season, download_all=True)
download_episode(select_season.slug, i_season, download_all=True)
else:
# Otherwise, let the user select specific episodes for the single season
download_episode(select_title.slug, i_season, download_all=False)
download_episode(select_season.slug, i_season, download_all=False)
def display_episodes_list() -> str:
@ -184,7 +186,7 @@ def display_episodes_list() -> str:
table_show_manager.add_column(column_info)
# Populate the table with episodes information
for i, media in enumerate(video_source.obj_episode_manager.episodes):
for i, media in enumerate(scrape_serie.obj_episode_manager.episodes):
table_show_manager.add_tv_show({
'Index': str(media.number),
'Name': media.name,

View File

@ -0,0 +1,113 @@
# 01.03.24
import logging
# External libraries
import httpx
# Internal utilities
from StreamingCommunity.Src.Util.headers import get_headers
from StreamingCommunity.Src.Util._jsonConfig import config_manager
from StreamingCommunity.Src.Api.Player.Helper.Vixcloud.util import SeasonManager, EpisodeManager
# Variable
max_timeout = config_manager.get_int("REQUESTS", "timeout")
class ScrapeSerie:
def __init__(self, site_name: str):
"""
Initialize the ScrapeSerie class for scraping TV series information.
Args:
site_name (str): Name of the streaming site to scrape from
"""
self.is_series = False
self.headers = {'user-agent': get_headers()}
self.base_name = site_name
self.domain = config_manager.get_dict('SITE', self.base_name)['domain']
def setup(self, version: str = None, media_id: int = None, series_name: str = None):
"""
Set up the scraper with specific media details.
Args:
version (str, optional): Site version for request headers
media_id (int, optional): Unique identifier for the media
series_name (str, optional): Name of the TV series
"""
self.version = version
self.media_id = media_id
# If series name is provided, initialize series-specific managers
if series_name is not None:
self.is_series = True
self.series_name = series_name
self.obj_season_manager: SeasonManager = SeasonManager()
self.obj_episode_manager: EpisodeManager = EpisodeManager()
def collect_info_seasons(self) -> None:
"""
Retrieve season information for a TV series from the streaming site.
Raises:
Exception: If there's an error fetching season information
"""
self.headers = {
'user-agent': get_headers(),
'x-inertia': 'true',
'x-inertia-version': self.version,
}
try:
response = httpx.get(
url=f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}",
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Extract seasons from JSON response
json_response = response.json().get('props', {}).get('title', {}).get('seasons', [])
# Add each season to the season manager
for dict_season in json_response:
self.obj_season_manager.add_season(dict_season)
except Exception as e:
logging.error(f"Error collecting season info: {e}")
raise
def collect_title_season(self, number_season: int) -> None:
"""
Retrieve episode information for a specific season.
Args:
number_season (int): Season number to fetch episodes for
Raises:
Exception: If there's an error fetching episode information
"""
try:
response = httpx.get(
url=f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}',
headers=self.headers,
timeout=max_timeout
)
response.raise_for_status()
# Extract episodes from JSON response
json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', [])
# Add each episode to the episode manager
for dict_episode in json_response:
self.obj_episode_manager.add_episode(dict_episode)
except Exception as e:
logging.error(f"Error collecting title season info: {e}")
raise

View File

@ -84,10 +84,22 @@ def map_episode_title(tv_name: str, number_season: int, episode_number: int, epi
str: The mapped episode title.
"""
map_episode_temp = MAP_EPISODE
map_episode_temp = map_episode_temp.replace("%(tv_name)", os_manager.get_sanitize_file(tv_name))
map_episode_temp = map_episode_temp.replace("%(season)", dynamic_format_number(number_season))
map_episode_temp = map_episode_temp.replace("%(episode)", dynamic_format_number(episode_number))
map_episode_temp = map_episode_temp.replace("%(episode_name)", os_manager.get_sanitize_file(episode_name))
if tv_name != None:
map_episode_temp = map_episode_temp.replace("%(tv_name)", os_manager.get_sanitize_file(tv_name))
if number_season != None:
map_episode_temp = map_episode_temp.replace("%(season)", dynamic_format_number(number_season))
else:
map_episode_temp = map_episode_temp.replace("%(season)", dynamic_format_number(0))
if episode_number != None:
map_episode_temp = map_episode_temp.replace("%(episode)", dynamic_format_number(episode_number))
else:
map_episode_temp = map_episode_temp.replace("%(episode)", dynamic_format_number(0))
if episode_name != None:
map_episode_temp = map_episode_temp.replace("%(episode_name)", os_manager.get_sanitize_file(episode_name))
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp

View File

@ -786,7 +786,7 @@ class HLS_Downloader:
else:
console.log("[red]Error: URL passed to M3U8_Parser is an index playlist; expected a master playlist. Crucimorfo strikes again!")
else:
console.log("[red]Error: m3u8_playlist failed request")
console.log(f"[red]Error: m3u8_playlist failed request for: {self.m3u8_playlist}")
else:
console.log("[red]Error: m3u8_playlist is None")

View File

@ -7,9 +7,10 @@ import queue
import logging
import binascii
import threading
import signal
from queue import PriorityQueue
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
# External libraries
@ -81,6 +82,10 @@ class M3U8_Segments:
self.stop_event = threading.Event()
self.downloaded_segments = set()
# Stopping
self.interrupt_flag = threading.Event()
self.download_interrupted = False
def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes:
"""
Retrieves the encryption key from the M3U8 playlist.
@ -197,6 +202,19 @@ class M3U8_Segments:
# Parser data of content of index pass in input to class
self.parse_data(self.url)
def setup_interrupt_handler(self):
"""
Set up a signal handler for graceful interruption.
"""
def interrupt_handler(signum, frame):
if not self.interrupt_flag.is_set():
console.log("\n[red] Stopping download gracefully...")
self.interrupt_flag.set()
self.download_interrupted = True
self.stop_event.set()
signal.signal(signal.SIGINT, interrupt_handler)
def make_requests_stream(self, ts_url: str, index: int, progress_bar: tqdm, retries: int = 3, backoff_factor: float = 1.5) -> None:
"""
@ -209,10 +227,16 @@ class M3U8_Segments:
- retries (int): The number of times to retry on failure (default is 3).
- backoff_factor (float): The backoff factor for exponential backoff (default is 1.5 seconds).
"""
if self.interrupt_flag.is_set():
return
need_verify = REQUEST_VERIFY
min_segment_size = 100 # Minimum acceptable size for a TS segment in bytes
for attempt in range(retries):
if self.interrupt_flag.is_set():
return
try:
start_time = time.time()
@ -317,6 +341,10 @@ class M3U8_Segments:
segments_written = set()
while not self.stop_event.is_set() or not self.queue.empty():
if self.interrupt_flag.is_set():
break
try:
index, segment_content = self.queue.get(timeout=1)
@ -365,6 +393,7 @@ class M3U8_Segments:
Parameters:
- add_desc (str): Additional description for the progress bar.
"""
self.setup_interrupt_handler()
# Get config site from prev stack
frames = get_call_stack()
@ -420,45 +449,74 @@ class M3U8_Segments:
mininterval=0.05
)
# Start writer thread
writer_thread = threading.Thread(target=self.write_segments_to_file)
writer_thread.daemon = True
writer_thread.start()
try:
# Configure workers and delay
max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER
delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (len(self.valid_proxy) + 1))) if THERE_IS_PROXY_LIST else TQDM_DELAY_WORKER
# Start writer thread
writer_thread = threading.Thread(target=self.write_segments_to_file)
writer_thread.daemon = True
writer_thread.start()
# Download segments with completion verification
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for index, segment_url in enumerate(self.segments):
time.sleep(delay)
futures.append(executor.submit(self.make_requests_stream, segment_url, index, progress_bar))
# Configure workers and delay
max_workers = len(self.valid_proxy) if THERE_IS_PROXY_LIST else TQDM_MAX_WORKER
delay = max(PROXY_START_MIN, min(PROXY_START_MAX, 1 / (len(self.valid_proxy) + 1))) if THERE_IS_PROXY_LIST else TQDM_DELAY_WORKER
# Wait for all futures to complete
for future in futures:
try:
future.result()
except Exception as e:
logging.error(f"Error in download thread: {str(e)}")
# Download segments with completion verification
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for index, segment_url in enumerate(self.segments):
# Check for interrupt before submitting each task
if self.interrupt_flag.is_set():
break
# Verify completion and retry missing segments
total_segments = len(self.segments)
completed_segments = len(self.downloaded_segments)
if completed_segments < total_segments:
missing_segments = set(range(total_segments)) - self.downloaded_segments
logging.warning(f"Missing segments: {sorted(missing_segments)}")
# Retry missing segments
for index in missing_segments:
if self.stop_event.is_set():
time.sleep(delay)
futures.append(executor.submit(self.make_requests_stream, segment_url, index, progress_bar))
# Wait for futures with interrupt handling
for future in as_completed(futures):
if self.interrupt_flag.is_set():
break
try:
self.make_requests_stream(self.segments[index], index, progress_bar)
future.result()
except Exception as e:
logging.error(f"Failed to retry segment {index}: {str(e)}")
logging.error(f"Error in download thread: {str(e)}")
# Interrupt handling for missing segments
if not self.interrupt_flag.is_set():
total_segments = len(self.segments)
completed_segments = len(self.downloaded_segments)
if completed_segments < total_segments:
missing_segments = set(range(total_segments)) - self.downloaded_segments
logging.warning(f"Missing segments: {sorted(missing_segments)}")
# Retry missing segments with interrupt check
for index in missing_segments:
if self.interrupt_flag.is_set():
break
try:
self.make_requests_stream(self.segments[index], index, progress_bar)
except Exception as e:
logging.error(f"Failed to retry segment {index}: {str(e)}")
except Exception as e:
logging.error(f"Download failed: {str(e)}")
raise
finally:
# Clean up resources
self.stop_event.set()
writer_thread.join(timeout=30)
progress_bar.close()
# Check if download was interrupted
if self.download_interrupted:
console.log("[red] Download was manually stopped.")
# Optional: Delete partial download
if os.path.exists(self.tmp_file_path):
os.remove(self.tmp_file_path)
sys.exit(0)
# Clean up
self.stop_event.set()

View File

@ -2,6 +2,8 @@
import os
import sys
import ssl
import certifi
import logging
@ -22,6 +24,11 @@ from StreamingCommunity.Src.Util.os import internet_manager
from ...FFmpeg import print_duration_table
# Suppress SSL warnings
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Config
GET_ONLY_LINK = config_manager.get_bool('M3U8_PARSER', 'get_only_link')
TQDM_USE_LARGE_BAR = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_use_large_bar')
@ -30,72 +37,96 @@ REQUEST_TIMEOUT = config_manager.get_float('REQUESTS', 'timeout')
def MP4_downloader(url: str, path: str, referer: str = None, headers_: str = None):
def MP4_downloader(url: str, path: str, referer: str = None, headers_: dict = None):
"""
Downloads an MP4 video from a given URL using the specified referer header.
Downloads an MP4 video from a given URL with robust error handling and SSL bypass.
Parameter:
Parameters:
- url (str): The URL of the MP4 video to download.
- path (str): The local path where the downloaded MP4 file will be saved.
- referer (str): The referer header value to include in the HTTP request headers.
- referer (str, optional): The referer header value.
- headers_ (dict, optional): Custom headers for the request.
"""
# Early return for link-only mode
if GET_ONLY_LINK:
return {'path': path, 'url': url}
headers = None
# Validate URL
if not (url.lower().startswith('http://') or url.lower().startswith('https://')):
logging.error(f"Invalid URL: {url}")
console.print(f"[bold red]Invalid URL: {url}[/bold red]")
return None
if "http" not in str(url).lower().strip() or "https" not in str(url).lower().strip():
logging.error(f"Invalid url: {url}")
sys.exit(0)
if referer != None:
headers = {'Referer': referer, 'user-agent': get_headers()}
if headers == None:
headers = {'user-agent': get_headers()}
else:
headers = headers_
# Make request to get content of video
with httpx.Client(verify=REQUEST_VERIFY, timeout=REQUEST_TIMEOUT) as client:
with client.stream("GET", url, headers=headers, timeout=REQUEST_TIMEOUT) as response:
total = int(response.headers.get('content-length', 0))
# Prepare headers
try:
headers = {}
if referer:
headers['Referer'] = referer
# Use custom headers if provided, otherwise use default user agent
if headers_:
headers.update(headers_)
else:
headers['User-Agent'] = get_headers()
if total != 0:
except Exception as header_err:
logging.error(f"Error preparing headers: {header_err}")
console.print(f"[bold red]Error preparing headers: {header_err}[/bold red]")
return None
# Create bar format
if TQDM_USE_LARGE_BAR:
bar_format = (f"{Colors.YELLOW}[MP4] {Colors.WHITE}({Colors.CYAN}video{Colors.WHITE}): "
f"{Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ "
f"{Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] "
f"{Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}} {Colors.WHITE}| "
f"{Colors.YELLOW}{{rate_fmt}}{{postfix}} {Colors.WHITE}]")
else:
bar_format = (f"{Colors.YELLOW}Proc{Colors.WHITE}: {Colors.RED}{{percentage:.2f}}% "
f"{Colors.WHITE}| {Colors.CYAN}{{remaining}}{{postfix}} {Colors.WHITE}]")
try:
# Create a custom transport that bypasses SSL verification
transport = httpx.HTTPTransport(
verify=False, # Disable SSL certificate verification
http2=True # Optional: enable HTTP/2 support
)
# Download with streaming and progress tracking
with httpx.Client(transport=transport, timeout=httpx.Timeout(60.0)) as client:
with client.stream("GET", url, headers=headers, timeout=REQUEST_TIMEOUT) as response:
response.raise_for_status()
# Get total file size
total = int(response.headers.get('content-length', 0))
# Handle empty streams
if total == 0:
console.print("[bold red]No video stream found.[/bold red]")
return None
# Create progress bar
progress_bar = tqdm(
total=total,
ascii='░▒█',
bar_format=bar_format,
bar_format=f"{Colors.YELLOW}[MP4] {Colors.WHITE}({Colors.CYAN}video{Colors.WHITE}): "
f"{Colors.RED}{{percentage:.2f}}% {Colors.MAGENTA}{{bar}} {Colors.WHITE}[ "
f"{Colors.YELLOW}{{n_fmt}}{Colors.WHITE} / {Colors.RED}{{total_fmt}} {Colors.WHITE}] "
f"{Colors.YELLOW}{{elapsed}} {Colors.WHITE}< {Colors.CYAN}{{remaining}} {Colors.WHITE}| "
f"{Colors.YELLOW}{{rate_fmt}}{{postfix}} {Colors.WHITE}]",
unit='iB',
unit_scale=True,
unit_divisor=1024,
desc='Downloading',
mininterval=0.05
)
# Ensure directory exists
os.makedirs(os.path.dirname(path), exist_ok=True)
# Download file
with open(path, 'wb') as file, progress_bar as bar:
downloaded = 0
for chunk in response.iter_bytes(chunk_size=1024):
if chunk:
size = file.write(chunk)
downloaded += size
bar.update(size)
else:
console.print("[red]Cant find any stream.")
# Optional: Add a check to stop download if needed
# if downloaded > MAX_DOWNLOAD_SIZE:
# break
# Get summary
if total != 0:
# Post-download processing
if os.path.exists(path) and os.path.getsize(path) > 0:
console.print(Panel(
f"[bold green]Download completed![/bold green]\n"
f"[cyan]File size: [bold red]{internet_manager.format_file_size(os.path.getsize(path))}[/bold red]\n"
@ -103,3 +134,23 @@ def MP4_downloader(url: str, path: str, referer: str = None, headers_: str = Non
title=f"{os.path.basename(path.replace('.mp4', ''))}",
border_style="green"
))
return path
else:
console.print("[bold red]Download failed or file is empty.[/bold red]")
return None
except httpx.HTTPStatusError as http_err:
logging.error(f"HTTP error occurred: {http_err}")
console.print(f"[bold red]HTTP Error: {http_err}[/bold red]")
return None
except httpx.RequestError as req_err:
logging.error(f"Request error: {req_err}")
console.print(f"[bold red]Request Error: {req_err}[/bold red]")
return None
except Exception as e:
logging.error(f"Unexpected error during download: {e}")
console.print(f"[bold red]Unexpected Error: {e}[/bold red]")
return None

View File

@ -99,7 +99,7 @@ class TheMovieDB:
self.api_key = api_key
self.base_url = "https://api.themoviedb.org/3"
self.console = Console()
self.genres = self._fetch_genres()
#self.genres = self._fetch_genres()
def _make_request(self, endpoint, params=None):
"""

View File

@ -356,7 +356,6 @@ class OsSummary():
Includes:
- Python version and implementation details.
- Operating system and architecture.
- OpenSSL and glibc versions.
- Versions of `ffmpeg` and `ffprobe` executables.
- Installed Python libraries as listed in `requirements.txt`.
"""
@ -370,11 +369,10 @@ class OsSummary():
python_implementation = platform.python_implementation()
arch = platform.machine()
os_info = platform.platform()
openssl_version = ssl.OPENSSL_VERSION
glibc_version = 'glibc ' + '.'.join(map(str, platform.libc_ver()[1]))
console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})[/bold red]")
logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({openssl_version}, {glibc_version})")
console.print(f"[cyan]Python[white]: [bold red]{python_version} ({python_implementation} {arch}) - {os_info} ({glibc_version})[/bold red]")
logging.info(f"Python: {python_version} ({python_implementation} {arch}) - {os_info} ({glibc_version})")
# ffmpeg and ffprobe versions
ffmpeg_version = self.get_executable_version(['ffmpeg', '-version'])

View File

@ -135,9 +135,9 @@ def main():
# Create logger
log_not = Logger()
#initialize()
# Load search functions
search_functions = load_search_functions()
logging.info(f"Load module in: {time.time() - start} s")
@ -194,11 +194,3 @@ def main():
else:
console.print("[red]Invalid category.")
sys.exit(0)
def run():
initialize()
main()
if __name__ == '__main__':
run()

View File

@ -0,0 +1,38 @@
# Fix import
import sys
import os
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
sys.path.append(src_path)
# Import
from urllib.parse import urlparse, urlencode, urlunparse
from StreamingCommunity.Src.Api.Player.Helper.Vixcloud.js_parser import JavaScriptParser
from StreamingCommunity.Src.Api.Player.Helper.Vixcloud.util import WindowVideo, WindowParameter, StreamsCollection
# Data
script_text = '''
window.video = {"id":271977,"name":"Smile 2","filename":"Smile.2.2024.1080p.WEB-DL.DDP5.1.H.264-FHC.mkv","size":10779891,"quality":1080,"duration":7758,"views":0,"is_viewable":1,"status":"public","fps":24,"legacy":0,"folder_id":"301e469a-786f-493a-ad2b-302248aa2d23","created_at_diff":"4 giorni fa"};
window.streams = [{"name":"Server1","active":false,"url":"https:\/\/vixcloud.co\/playlist\/271977?b=1\u0026ub=1"},{"name":"Server2","active":1,"url":"https:\/\/vixcloud.co\/playlist\/271977?b=1\u0026ab=1"}];
window.masterPlaylist = {
params: {
'token': '890a3e7db7f1c8213a11007947362b21',
'expires': '1737812156',
},
url: 'https://vixcloud.co/playlist/271977?b=1',
}
window.canPlayFHD = true
'''
# Test
converter = JavaScriptParser.parse(js_string=str(script_text))
window_video = WindowVideo(converter.get('video'))
window_streams = StreamsCollection(converter.get('streams'))
window_parameter = WindowParameter(converter.get('masterPlaylist'))
print(window_video, "\n")
print(window_streams, "\n")
print(window_parameter, "\n")

View File

@ -1,20 +0,0 @@
FROM python:3.11-slim
COPY . /app
WORKDIR /app
ENV TEMP /tmp
RUN mkdir -p $TEMP
RUN apt-get update && apt-get install -y \
ffmpeg \
build-essential \
libssl-dev \
libffi-dev \
python3-dev \
libxml2-dev \
libxslt1-dev
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python", "run.py"]

View File

@ -1,3 +1,5 @@
# 26.11.24
from StreamingCommunity.run import main
main()

View File

@ -1,200 +0,0 @@
#!/bin/sh
# Function to check if a command exists
command_exists() {
command -v "$1" > /dev/null 2>&1
}
# Install on Debian/Ubuntu-based systems
install_on_debian() {
echo "Installing $1..."
sudo apt update
sudo apt install -y "$1"
}
# Install on Red Hat/CentOS/Fedora-based systems
install_on_redhat() {
echo "Installing $1..."
sudo yum install -y "$1"
}
# Install on Arch-based systems
install_on_arch() {
echo "Installing $1..."
sudo pacman -Sy --noconfirm "$1"
}
# Install on BSD-based systems
install_on_bsd() {
echo "Installing $1..."
env ASSUME_ALWAYS_YES=yes sudo pkg install -y "$1"
}
# Install on macOS
install_on_macos() {
echo "Installing $1..."
if command_exists brew; then
brew install "$1"
else
echo "Homebrew is not installed. Installing Homebrew first..."
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
brew install "$1"
fi
}
set -e
# Check and install Python3
# if command_exists python3 > /dev/null 2>&1; then
# echo "Checking Python..."
# else
# # Detect the platform and install Python3 accordingly
# if [[ "$OSTYPE" == "linux-gnu"* ]]; then
# # Detect the package manager
# if command_exists apt; then
# install_on_debian "python3"
# elif command_exists yum; then
# install_on_redhat "python3"
# elif command_exists pacman; then
# install_on_arch "python-pip"
# else
# echo "Unsupported Linux distribution."
# exit 1
# fi
# elif [[ "$OSTYPE" == "bsd"* ]]; then
# echo "Detected BSD-based system."
# install_on_bsd "python39"
# elif [[ "$OSTYPE" == "darwin"* ]]; then
# install_on_macos "python"
# else
# echo "Unsupported operating system."
# exit 1
# fi
# fi
# Get the Python version
PYTHON_VERSION=$(python3 -c 'import sys; print(".".join(map(str, sys.version_info[:3])))')
# Compare the Python version with 3.8
REQUIRED_VERSION="3.8"
if [ "$(echo -e "$PYTHON_VERSION\n$REQUIRED_VERSION" | sort -V | head -n1)" = "$REQUIRED_VERSION" ]; then
echo "Python version $PYTHON_VERSION is >= $REQUIRED_VERSION. Continuing..."
else
echo "ERROR: Python version $PYTHON_VERSION is < $REQUIRED_VERSION. Exiting..."
exit 1
fi
if [ -d ".venv/" ]; then
echo ".venv exists. Installing requirements.txt..."
.venv/bin/pip install -r requirements.txt
else
echo "Making .venv and installing requirements.txt..."
if [ "$(uname)" = "Linux" ]; then
# Detect the package manager for venv installation check.
if command_exists apt; then
echo "Detected Debian-based system. Checking python3-venv."
if dpkg -l | grep -q "python3-venv"; then
echo "python3-venv found."
else
echo "python3-venv not found, installing..."
install_on_debian "python3-venv"
fi
fi
fi
python3 -m venv .venv
.venv/bin/pip install -r requirements.txt
fi
if command_exists ffmpeg; then
echo "ffmpeg exists."
else
echo "ffmpeg does not exist."
# Detect the platform and install ffmpeg accordingly.
case "$(uname)" in
Linux)
if command_exists apt; then
echo "Detected Debian-based system."
install_on_debian "ffmpeg"
elif command_exists yum; then
echo "Detected Red Hat-based system."
echo "Installing needed repos for ffmpeg..."
sudo yum config-manager --set-enabled crb > /dev/null 2>&1 || true
sudo yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-$(rpm -E %rhel).noarch.rpm https://dl.fedoraproject.org/pub/epel/epel-next-release-latest-$(rpm -E %rhel).noarch.rpm > /dev/null 2>&1 || true
sudo yum install -y --nogpgcheck https://mirrors.rpmfusion.org/free/el/rpmfusion-free-release-$(rpm -E %rhel).noarch.rpm https://mirrors.rpmfusion.org/nonfree/el/rpmfusion-nonfree-release-$(rpm -E %rhel).noarch.rpm > /dev/null 2>&1 || true
install_on_redhat "ffmpeg"
elif command_exists pacman; then
echo "Detected Arch-based system."
install_on_arch "ffmpeg"
else
echo "Unsupported Linux distribution."
exit 1
fi
;;
FreeBSD|NetBSD|OpenBSD)
echo "Detected BSD-based system."
install_on_bsd "ffmpeg"
;;
Darwin)
echo "Detected macOS."
install_on_macos "ffmpeg"
;;
*)
echo "Unsupported operating system."
exit 1
;;
esac
fi
if command_exists openssl || .venv/bin/pip list | grep -q pycryptodome; then
echo "openssl or pycryptodome exists."
else
echo "Please choose an option:"
echo "1) openssl"
echo "2) pycryptodome"
read -p "Enter your choice (1): " choice
case "$choice" in
2)
echo "Installing pycryptodome."
.venv/bin/pip install pycryptodome
;;
*)
# Detect the platform and install OpenSSL accordingly.
case "$(uname)" in
Linux)
if command_exists apt; then
install_on_debian openssl
elif command_exists yum; then
install_on_redhat openssl
elif command_exists pacman; then
install_on_arch openssl
else
echo "Unsupported Linux distribution."
exit 1
fi
;;
FreeBSD|NetBSD|OpenBSD)
install_on_bsd openssl
;;
Darwin)
install_on_macos openssl
;;
*)
echo "Unsupported operating system."
exit 1
;;
esac
;;
esac
fi
sed -i.bak '1s|.*|#!.venv/bin/python3|' run.py
sudo chmod +x run.py
echo 'Everything is installed!'
echo 'Run StreamingCommunity with "./run.py"'

194
update.py
View File

@ -1,194 +0,0 @@
# 15.06.24
import os
import shutil
from io import BytesIO
from zipfile import ZipFile
from datetime import datetime
# External library
import httpx
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.table import Table
# Variable
console = Console()
local_path = os.path.join(".")
from StreamingCommunity.Src.Upload.version import __author__, __title__
def move_content(source: str, destination: str):
"""
Move all content from the source folder to the destination folder.
Parameters:
- source (str): The path to the source folder.
- destination (str): The path to the destination folder.
"""
os.makedirs(destination, exist_ok=True)
# Iterate through all elements in the source folder
for element in os.listdir(source):
source_path = os.path.join(source, element)
destination_path = os.path.join(destination, element)
# If it's a directory, recursively call the function
if os.path.isdir(source_path):
move_content(source_path, destination_path)
else:
shutil.move(source_path, destination_path)
def keep_specific_items(directory: str, keep_folder: str, keep_file: str):
"""
Delete all items in the directory except for the specified folder and file.
Parameters:
- directory (str): The path to the directory.
- keep_folder (str): The name of the folder to keep.
- keep_file (str): The name of the file to keep.
"""
try:
if not os.path.exists(directory) or not os.path.isdir(directory):
raise ValueError(f"Error: '{directory}' is not a valid directory.")
# Iterate through items in the directory
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
# Check if the item is the specified folder or file
if os.path.isdir(item_path) and item != keep_folder:
shutil.rmtree(item_path)
elif os.path.isfile(item_path) and item != keep_file:
os.remove(item_path)
except PermissionError as pe:
console.print(f"[red]PermissionError: {pe}. Check permissions and try again.")
except Exception as e:
console.print(f"[red]Error: {e}")
def print_commit_info(commit_info: dict):
"""
Print detailed information about the commit in a formatted table.
Parameters:
- commit_info (dict): The commit information from GitHub API
"""
# Create a table for commit information
table = Table(title=f"[bold green]Latest Commit Information - {__title__}", show_header=False)
table.add_column("Field", style="cyan")
table.add_column("Value", style="yellow")
# Basic commit info
commit = commit_info['commit']
commit_date = datetime.strptime(commit['author']['date'], "%Y-%m-%dT%H:%M:%SZ")
formatted_date = commit_date.strftime("%Y-%m-%d %H:%M:%S")
# Add rows to the table
table.add_row("Repository", f"{__author__}/{__title__}")
table.add_row("Commit SHA", commit_info['sha'][:8])
table.add_row("Author", f"{commit['author']['name']} <{commit['author']['email']}>")
table.add_row("Date", formatted_date)
table.add_row("Committer", f"{commit['committer']['name']} <{commit['committer']['email']}>")
table.add_row("Message", commit['message'])
# Add stats if available
if 'stats' in commit_info:
stats = commit_info['stats']
table.add_row("Changes", f"+{stats['additions']} -[red]{stats['deletions']}[/red] ({stats['total']} total)")
# Add URL info
table.add_row("HTML URL", commit_info['html_url'])
# Print the table in a panel
console.print(Panel.fit(table))
def download_and_extract_latest_commit():
"""
Download and extract the latest commit from a GitHub repository.
"""
try:
# Get the latest commit information using GitHub API
api_url = f'https://api.github.com/repos/{__author__}/{__title__}/commits?per_page=1'
console.log("[green]Requesting latest commit from GitHub repository...")
headers = {
'Accept': 'application/vnd.github.v3+json',
'User-Agent': f'{__title__}-updater'
}
response = httpx.get(api_url, headers=headers, timeout=10)
if response.status_code == 200:
commit_info = response.json()[0]
commit_sha = commit_info['sha']
# Print detailed commit information
print_commit_info(commit_info)
zipball_url = f'https://github.com/{__author__}/{__title__}/archive/{commit_sha}.zip'
console.log("[green]Downloading latest commit zip file...")
# Download the zipball
response = httpx.get(zipball_url, follow_redirects=True, timeout=10)
temp_path = os.path.join(os.path.dirname(os.getcwd()), 'temp_extracted')
# Extract the content of the zipball into a temporary folder
with ZipFile(BytesIO(response.content)) as zip_ref:
zip_ref.extractall(temp_path)
console.log("[green]Extracting files...")
# Move files from the temporary folder to the current folder
for item in os.listdir(temp_path):
item_path = os.path.join(temp_path, item)
destination_path = os.path.join(local_path, item)
shutil.move(item_path, destination_path)
# Remove the temporary folder
shutil.rmtree(temp_path)
# Move all folder to main folder
new_folder_name = f"{__title__}-{commit_sha}"
move_content(new_folder_name, ".")
shutil.rmtree(new_folder_name)
console.log("[cyan]Latest commit downloaded and extracted successfully.")
else:
console.log(f"[red]Failed to fetch commit information. Status code: {response.status_code}")
except httpx.RequestError as e:
console.print(f"[red]Request failed: {e}")
except Exception as e:
console.print(f"[red]An unexpected error occurred: {e}")
def main_upload():
"""
Main function to upload the latest commit of a GitHub repository.
"""
cmd_insert = Prompt.ask(
"[bold red]Are you sure you want to delete all files? (Only 'Video' folder and 'update_version.py' will remain)",
choices=['y', 'n'],
default='y',
show_choices=True
)
if cmd_insert.lower().strip() == 'y' or cmd_insert.lower().strip() == 'yes':
console.print("[red]Deleting all files except 'Video' folder and 'update_version.py'...")
keep_specific_items(".", "Video", "upload.py")
download_and_extract_latest_commit()
else:
console.print("[red]Operation cancelled.")
if __name__ == "__main__":
main_upload()

View File

@ -1,134 +0,0 @@
@echo off
:: Check if the script is running as administrator
net session >nul 2>&1
if %errorlevel% neq 0 (
echo Running as administrator...
:: Restart the script with administrator privileges
powershell -Command "Start-Process '%~f0' -Verb RunAs"
exit /b
)
chcp 65001 > nul
SETLOCAL ENABLEDELAYEDEXPANSION
echo Script starting...
:: Check if Chocolatey is already installed
:check_choco
echo Checking if Chocolatey is installed...
choco --version >nul 2>&1
IF %ERRORLEVEL% EQU 0 (
echo Chocolatey is already installed. Skipping installation.
goto install_python
) ELSE (
echo Installing Chocolatey...
@"%SystemRoot%\System32\WindowsPowerShell\v1.0\powershell.exe" -NoProfile -InputFormat None -ExecutionPolicy Bypass -Command "iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))" || (
echo Error during Chocolatey installation.
exit /b 1
)
echo Chocolatey installed successfully.
call choco --version
echo.
)
:: Check if Python is already installed
:install_python
echo Checking if Python is installed...
python -V >nul 2>&1
IF %ERRORLEVEL% EQU 0 (
echo Python is already installed. Skipping installation.
goto install_openssl
) ELSE (
echo Installing Python...
choco install python --confirm --params="'/NoStore'" --allow-downgrade || (
echo Error during Python installation.
exit /b 1
)
echo Python installed successfully.
call python -V
echo.
)
:: Ask to restart the terminal
echo Please restart the terminal to continue...
pause
exit /b
:: Check if OpenSSL is already installed
:install_openssl
echo Checking if OpenSSL is installed...
openssl version -a >nul 2>&1
IF %ERRORLEVEL% EQU 0 (
echo OpenSSL is already installed. Skipping installation.
goto install_ffmpeg
) ELSE (
echo Installing OpenSSL...
choco install openssl --confirm || (
echo Error during OpenSSL installation.
exit /b 1
)
echo OpenSSL installed successfully.
call openssl version -a
echo.
)
:: Check if FFmpeg is already installed
:install_ffmpeg
echo Checking if FFmpeg is installed...
ffmpeg -version >nul 2>&1
IF %ERRORLEVEL% EQU 0 (
echo FFmpeg is already installed. Skipping installation.
goto create_venv
) ELSE (
echo Installing FFmpeg...
choco install ffmpeg --confirm || (
echo Error during FFmpeg installation.
exit /b 1
)
echo FFmpeg installed successfully.
call ffmpeg -version
echo.
)
:: Verify installations
:verifica_installazioni
echo Verifying installations...
call choco --version
call python -V
call openssl version -a
call ffmpeg -version
echo All programs have been successfully installed and verified.
:: Create a virtual environment .venv
:create_venv
echo Checking if the .venv virtual environment already exists...
if exist .venv (
echo The .venv virtual environment already exists. Skipping creation.
) ELSE (
echo Creating the .venv virtual environment...
python -m venv .venv || (
echo Error during virtual environment creation.
exit /b 1
)
echo Virtual environment created successfully.
)
:: Activate the virtual environment and install requirements
echo Installing requirements...
call .venv\Scripts\activate.bat
pip install -r requirements.txt || (
echo Error during requirements installation.
exit /b 1
)
:: Run run.py
echo Running run.py...
call .venv\Scripts\python .\run.py || (
echo Error during run.py execution.
exit /b 1
)
echo End of script.
ENDLOCAL