Add altadefinizione

This commit is contained in:
Ghost 2024-05-26 11:08:46 +02:00
parent 762970c0ed
commit 2542b5c6d4
43 changed files with 980 additions and 2250 deletions

View File

@ -65,7 +65,7 @@ You can change some behaviors by tweaking the configuration file.
- Example Value: %(tv_name) [S%(season)] [E%(episode)] %(episode_name)
### Options (M3U8)
### Options (M3U8_DOWNLOAD)
* tdqm_workers: The number of workers that will cooperate to download .ts files. **A high value may slow down your PC**
- Default Value: 20
@ -73,10 +73,6 @@ You can change some behaviors by tweaking the configuration file.
* tqdm_show_progress: Whether to show progress during downloads or not.
- Default Value: true
* save_m3u8_content: Enabling this feature saves various playlists and indexes in the temporary folder during the download process, ensuring all necessary files are retained for playback or further processing.
- Default Value: true
* fake_proxy: Speed up download for streaming film and series. **Dont work for anime, need to set to FALSE**
- Default Value: true
@ -84,7 +80,7 @@ You can change some behaviors by tweaking the configuration file.
- Default Value: false
### Options (M3U8_OPTIONS)
### Options (M3U8_FILTER)
* cleanup_tmp_folder: Upon final conversion, this option ensures the removal of all unformatted audio, video tracks, and subtitles from the temporary folder, thereby maintaining cleanliness and efficiency.
- Default Value: true

View File

@ -0,0 +1,62 @@
# 26.05.24
from typing import List
class MediaItem:
def __init__(self, data: dict):
self.name: str = data.get('name')
self.type: str = "film"
self.score: str = data.get('score')
self.url: int = data.get('url')
def __str__(self):
return f"MediaItem(name='{self.name}', type='{self.type}', score='{self.score}', url={self.url})"
class MediaManager:
def __init__(self):
self.media_list: List[MediaItem] = []
def add_media(self, data: dict) -> None:
"""
Add media to the list.
Args:
data (dict): Media data to add.
"""
self.media_list.append(MediaItem(data))
def get(self, index: int) -> MediaItem:
"""
Get a media item from the list by index.
Args:
index (int): The index of the media item to retrieve.
Returns:
MediaItem: The media item at the specified index.
"""
return self.media_list[index]
def get_length(self) -> int:
"""
Get the number of media find with research
Returns:
int: Number of episodes.
"""
return len(self.media_list)
def clear(self) -> None:
"""
This method clears the medias list.
Args:
self: The object instance.
"""
self.media_list.clear()
def __str__(self):
return f"MediaManager(num_media={len(self.media_list)})"

View File

@ -0,0 +1,182 @@
# 26.05.24
import re
import os
import sys
import time
import logging
import subprocess
# External libraries
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.console import console
from Src.Lib.Request import requests
from Src.Util.headers import get_headers
from Src.Util.node_jjs import run_node_script
class VideoSource:
def __init__(self) -> None:
"""
Initializes the VideoSource object with default values.
Attributes:
headers (dict): An empty dictionary to store HTTP headers.
"""
self.headers = {'user-agent': get_headers()}
def setup(self, url: str) -> None:
"""
Sets up the video source with the provided URL.
Args:
url (str): The URL of the video source.
"""
self.url = url
def make_request(self, url: str) -> str:
"""
Make an HTTP GET request to the provided URL.
Args:
url (str): The URL to make the request to.
Returns:
str: The response content if successful, None otherwise.
"""
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.text
except Exception as e:
logging.error(f"Request failed: {e}")
return None
def parse_html(self, html_content: str) -> BeautifulSoup:
"""
Parse the provided HTML content using BeautifulSoup.
Args:
html_content (str): The HTML content to parse.
Returns:
BeautifulSoup: Parsed HTML content if successful, None otherwise.
"""
try:
soup = BeautifulSoup(html_content, "html.parser")
return soup
except Exception as e:
logging.error(f"Failed to parse HTML content: {e}")
return None
def get_iframe(self, soup):
"""
Extracts the source URL of the second iframe in the provided BeautifulSoup object.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML.
Returns:
str: The source URL of the second iframe, or None if not found.
"""
iframes = soup.find_all("iframe")
if iframes and len(iframes) > 1:
return iframes[1].get("src")
return None
def find_content(self, url):
"""
Makes a request to the specified URL and parses the HTML content.
Args:
url (str): The URL to fetch content from.
Returns:
BeautifulSoup: A BeautifulSoup object representing the parsed HTML content, or None if the request fails.
"""
content = self.make_request(url)
if content:
return self.parse_html(content)
return None
def get_result_node_js(self, soup):
"""
Prepares and runs a Node.js script from the provided BeautifulSoup object to retrieve the video URL.
Args:
soup (BeautifulSoup): A BeautifulSoup object representing the parsed HTML content.
Returns:
str: The output from the Node.js script, or None if the script cannot be found or executed.
"""
for script in soup.find_all("script"):
if "eval" in str(script):
new_script = str(script.text).replace("eval", "var a = ")
new_script = new_script.replace(")))", ")));console.log(a);")
return run_node_script(new_script)
return None
def get_playlist(self) -> str:
"""
Download a video from the provided URL.
Returns:
str: The URL of the downloaded video if successful, None otherwise.
"""
try:
html_content = self.make_request(self.url)
if not html_content:
logging.error("Failed to fetch HTML content.")
return None
soup = self.parse_html(html_content)
if not soup:
logging.error("Failed to parse HTML content.")
return None
iframe_src = self.get_iframe(soup)
if not iframe_src:
logging.error("No iframe found.")
return None
down_page_soup = self.find_content(iframe_src)
if not down_page_soup:
logging.error("Failed to fetch down page content.")
return None
pattern = r'data-link="(//supervideo[^"]+)"'
match = re.search(pattern, str(down_page_soup))
if not match:
logging.error("No match found for supervideo URL.")
return None
supervideo_url = "https:" + match.group(1)
supervideo_soup = self.find_content(supervideo_url)
if not supervideo_soup:
logging.error("Failed to fetch supervideo content.")
return None
result = self.get_result_node_js(supervideo_soup)
if not result:
logging.error("No video URL found in script.")
return None
master_playlist = str(result).split(":")[3].split('"}')[0]
return f"https:{master_playlist}"
except Exception as e:
logging.error(f"An error occurred: {e}")
return None

View File

@ -0,0 +1,38 @@
# 26.05.24
# Internal utilities
from Src.Util.console import console, msg
# Logic class
from .site import (
title_search,
get_select_title,
manager_clear
)
from .film import download_film
def main_film():
"""
Main function of the application for film and series.
"""
# Make request to site to get content that corrsisponde to that string
film_search = msg.ask("\n[purple]Insert word to search in all site: ").strip()
len_database = title_search(film_search)
if len_database != 0:
# Select title from list
select_title = get_select_title()
# Download only film
download_film(
title_name=select_title.name,
url=select_title.url
)
# End
console.print("\n[red]Done")

View File

@ -0,0 +1,58 @@
# 26.05.24
import os
import sys
import logging
# Internal utilities
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
from Src.Lib.Hls.downloader import Downloader
from Src.Util.message import start_message
# Logic class
from .Core.Player.supervideo import VideoSource
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
STREAMING_FOLDER = "altadefinizione"
MOVIE_FOLDER = "Movie"
# Variable
video_source = VideoSource()
def download_film(title_name: str, url: str):
"""
Downloads a film using the provided film ID, title name, and domain.
Args:
- title_name (str): The name of the film title.
- url (str): The url of the video
"""
# Start message and display film information
start_message()
console.print(f"[yellow]Download: [red]{title_name} \n")
# Set domain and media ID for the video source
video_source.setup(
url = url
)
# Define output path
mp4_name = str(title_name).replace("-", "_") + ".mp4"
mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name)
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, key, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
).start()

View File

@ -0,0 +1,140 @@
# 26.05.24
import sys
import json
import logging
# External libraries
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.table import TVShowManager
from Src.Lib.Request import requests
from Src.Util.headers import get_headers
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
from Src.Lib.Unidecode import transliterate
# Logic class
from .Core.Class.SearchType import MediaManager, MediaItem
# Config
AD_SITE_NAME = "altadefinizione"
AD_DOMAIN_NOW = config_manager.get('SITE', AD_SITE_NAME)
# Variable
media_search_manager = MediaManager()
table_show_manager = TVShowManager()
def title_search(title_search: str) -> int:
"""
Search for titles based on a search query.
Args:
- title_search (str): The title to search for.
- domain (str): The domain to search on.
Returns:
int: The number of titles found.
"""
# Send request to search for titles
response = requests.get(f"https://{AD_SITE_NAME}.{AD_DOMAIN_NOW}/page/1/?story={transliterate(title_search).replace(' ', '+')}&do=search&subaction=search&titleonly=3")
# Create soup and find table
soup = BeautifulSoup(response.text, "html.parser")
table_content = soup.find('div', id="dle-content")
# Scrape div film in table on single page
for film_div in table_content.find_all('div', class_='col-lg-3'):
title = film_div.find('h2', class_='titleFilm').get_text(strip=True)
link = film_div.find('h2', class_='titleFilm').find('a')['href']
imdb_rating = film_div.find('div', class_='imdb-rate').get_text(strip=True).split(":")[-1]
film_info = {
'name': title,
'url': link,
'score': imdb_rating
}
media_search_manager.add_media(film_info)
# Return the number of titles found
return media_search_manager.get_length()
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
MediaItem: The selected media item.
"""
# Set up table for displaying titles
table_show_manager.set_slice_end(10)
# Add columns to the table
column_info = {
"Index": {'color': 'red'},
"Name": {'color': 'magenta'},
"Type": {'color': 'yellow'},
"Score": {'color': 'cyan'},
}
table_show_manager.add_column(column_info)
# Populate the table with title information
for i, media in enumerate(media_search_manager.media_list):
# Filter for only a list of category
if type_filter is not None:
if str(media.type) not in type_filter:
continue
table_show_manager.add_tv_show({
'Index': str(i),
'Name': media.name,
'Type': media.type,
'Score': media.score,
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
if last_command == "q":
console.print("\n[red]Quit [white]...")
sys.exit(0)
# Check if the selected index is within range
if 0 <= int(last_command) <= len(media_search_manager.media_list):
return media_search_manager.get(int(last_command))
else:
console.print("\n[red]Wrong index")
sys.exit(0)
def manager_clear():
"""
Clears the data lists managed by media_search_manager and table_show_manager.
This function clears the data lists managed by global variables media_search_manager
and table_show_manager. It removes all the items from these lists, effectively
resetting them to empty lists.
"""
global media_search_manager, table_show_manager
# Clear list of data
media_search_manager.clear()
table_show_manager.clear()

View File

@ -56,14 +56,11 @@ class VideoSource:
try:
# Make a request to collect information about preview of the title
response = requests.post(f"https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}", headers = self.headers)
response.raise_for_status()
if response.ok:
# Collect all info about preview
self.obj_preview = PreviewManager(response.json())
# Collect all info about preview
self.obj_preview = PreviewManager(response.json())
except Exception as e:
logging.error(f"Error collecting preview info: {e}")
@ -78,7 +75,6 @@ class VideoSource:
"""
try:
# Fetch episode count from API endpoint
response = requests.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/")
response.raise_for_status()
@ -101,13 +97,11 @@ class VideoSource:
"""
try:
# Define parameters for API request
params = {
"start_range": index_ep,
"end_range": index_ep + 1
}
# Fetch episode information from API endpoint
response = requests.get(f"https://www.{self.base_name}.{self.domain}/info_api/{self.media_id}/{index_ep}", params = params)
response.raise_for_status()
@ -131,7 +125,6 @@ class VideoSource:
"""
try:
# Fetch embed URL from API endpoint
response = requests.get(f"https://www.{self.base_name}.{self.domain}/embed-url/{episode_id}")
response.raise_for_status()

View File

@ -17,7 +17,7 @@ def main_anime():
if len_database != 0:
# Select title from list
select_title = get_select_title(True)
select_title = get_select_title()
if select_title.type == 'TV':
donwload_series(

View File

@ -49,16 +49,17 @@ def download_episode(index_select: int):
video_source.parse_script(embed_url)
# Create output path
out_path = None
mp4_path = None
mp4_name = f"{index_select}.mp4"
if video_source.is_series:
out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, SERIES_FOLDER, video_source.series_name, f"{index_select+1}.mp4")
mp4_path = os.path.join(ROOT_PATH, ANIME_FOLDER, SERIES_FOLDER, video_source.series_name)
else:
out_path = os.path.join(ROOT_PATH, ANIME_FOLDER, MOVIE_FOLDER, video_source.series_name, f"{index_select}.mp4")
mp4_path = os.path.join(ROOT_PATH, ANIME_FOLDER, MOVIE_FOLDER, video_source.series_name)
# Crete downloader
obj_download = Downloader(
m3u8_playlist = video_source.get_playlist(),
output_filename = out_path
output_filename = os.path.join(mp4_path, mp4_name)
)
# Start downloading

View File

@ -181,12 +181,11 @@ def title_search(title: str) -> int:
def get_select_title(switch: bool = False, type_filter: list = None) -> MediaItem:
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- switch (bool): switch from film to anime
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
@ -223,7 +222,7 @@ def get_select_title(switch: bool = False, type_filter: list = None) -> MediaIte
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list), switch=switch)
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command

View File

@ -7,7 +7,6 @@ from typing import List
# Internal utilities
from Src.Util._jsonConfig import config_manager
from Src.Lib.Unidecode import transliterate
# Logic class
@ -70,8 +69,6 @@ def map_episode_title(tv_name: str, episode: Episode, number_season: int):
# Additional fix
map_episode_temp = map_episode_temp.replace(".", "_")
#map_episode_temp = map_episode_temp.replace(" ", "_")
map_episode_temp = transliterate(map_episode_temp)
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp

View File

@ -61,14 +61,11 @@ class VideoSource:
try:
# Make a request to collect information about preview of the title
response = requests.post(f"https://{self.base_name}.{self.domain}/api/titles/preview/{self.media_id}", headers = self.headers)
response.raise_for_status()
if response.ok:
# Collect all info about preview
self.obj_preview = PreviewManager(response.json())
# Collect all info about preview
self.obj_preview = PreviewManager(response.json())
except Exception as e:
logging.error(f"Error collecting preview info: {e}")
@ -78,6 +75,7 @@ class VideoSource:
"""
Collect information about seasons.
"""
self.headers = {
'user-agent': get_headers(),
'x-inertia': 'true',
@ -86,18 +84,15 @@ class VideoSource:
try:
# Make a request to collect information about seasons
response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers = self.headers)
response.raise_for_status()
if response.ok:
# Extract JSON response if available
json_response = response.json().get('props', {}).get('title', {}).get('seasons', [])
# Extract JSON response if available
json_response = response.json().get('props', {}).get('title', {}).get('seasons', [])
# Iterate over JSON data and add titles to the manager
for dict_season in json_response:
self.obj_title_manager.add_title(dict_season)
# Iterate over JSON data and add titles to the manager
for dict_season in json_response:
self.obj_title_manager.add_title(dict_season)
except Exception as e:
logging.error(f"Error collecting season info: {e}")
@ -116,14 +111,12 @@ class VideoSource:
response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers = self.headers)
response.raise_for_status()
if response.ok:
# Extract JSON response if available
json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', [])
# Extract JSON response if available
json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', [])
# Iterate over JSON data and add episodes to the manager
for dict_episode in json_response:
self.obj_episode_manager.add_episode(dict_episode)
# Iterate over JSON data and add episodes to the manager
for dict_episode in json_response:
self.obj_episode_manager.add_episode(dict_episode)
except Exception as e:
logging.error(f"Error collecting title season info: {e}")
@ -150,11 +143,9 @@ class VideoSource:
response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params = params)
response.raise_for_status()
if response.ok:
# Parse response with BeautifulSoup to get iframe source
soup = BeautifulSoup(response.text, "html.parser")
self.iframe_src: str = soup.find("iframe").get("src")
# Parse response with BeautifulSoup to get iframe source
soup = BeautifulSoup(response.text, "html.parser")
self.iframe_src: str = soup.find("iframe").get("src")
except Exception as e:
logging.error(f"Error getting iframe source: {e}")

View File

@ -9,10 +9,7 @@ import logging
from Src.Util.console import console
from Src.Util._jsonConfig import config_manager
from Src.Lib.Hls.downloader import Downloader
from Src.Util.file_validator import can_create_file
from Src.Util.message import start_message
from Src.Util.os import remove_special_characters
from Src.Lib.Unidecode import transliterate
# Logic class
@ -21,7 +18,7 @@ from .Core.Vix_player.player import VideoSource
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
STREAMING_FOLDER = config_manager.get('SITE', 'streamingcommunity')
STREAMING_FOLDER = "streamingcommunity"
MOVIE_FOLDER = "Movie"
@ -54,17 +51,13 @@ def download_film(id_film: str, title_name: str, domain: str):
video_source.get_content()
master_playlist = video_source.get_playlist()
# Define the filename and path for the downloaded film
mp4_name = title_name.replace("-", "_")
mp4_format = remove_special_characters(transliterate(mp4_name) + ".mp4")
if not can_create_file(mp4_format):
logging.error("Invalid mp4 name.")
sys.exit(0)
mp4_format = (mp4_name) + ".mp4"
mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name)
# Download the film using the m3u8 playlist, key, and output filename
Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(ROOT_PATH, STREAMING_FOLDER, MOVIE_FOLDER, title_name, mp4_format)
output_filename = os.path.join(mp4_path, mp4_format)
).start()

View File

@ -10,8 +10,6 @@ from Src.Util.console import console, msg
from Src.Util._jsonConfig import config_manager
from Src.Util.table import TVShowManager
from Src.Util.message import start_message
from Src.Util.os import remove_special_characters
from Src.Util.file_validator import can_create_file
from Src.Lib.Hls.downloader import Downloader
@ -87,13 +85,8 @@ def donwload_video(tv_name: str, index_season_selected: int, index_episode_selec
print()
# Define filename and path for the downloaded video
mp4_name = remove_special_characters(f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4")
mp4_name = f"{map_episode_title(tv_name, obj_episode, index_season_selected)}.mp4"
mp4_path = os.path.join(ROOT_PATH, STREAMING_FOLDER, SERIES_FOLDER, tv_name, f"S{index_season_selected}")
os.makedirs(mp4_path, exist_ok=True)
if not can_create_file(mp4_name):
logging.error("Invalid mp4 name.")
sys.exit(0)
# Retrieve scws and if available master playlist
video_source.get_iframe(obj_episode.id)

View File

@ -58,8 +58,6 @@ def get_version(text: str) -> tuple[str, list]:
version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version']
sliders = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['props']['sliders']
title_trending = sliders[0]
title_lates = sliders[1]
title_top_10 = sliders[2]
# Collect info about only top 10 title
@ -149,12 +147,11 @@ def title_search(title_search: str, domain: str) -> int:
return media_search_manager.get_length()
def get_select_title(switch: bool = False, type_filter: list = None) -> MediaItem:
def get_select_title(type_filter: list = None) -> MediaItem:
"""
Display a selection of titles and prompt the user to choose one.
Args:
- switch (bool): switch from film to anime
- type_filter (list): A list of media types to filter. Can include 'film', 'tv', 'ova'. Ex. ['tv', 'film']
Returns:
@ -191,7 +188,7 @@ def get_select_title(switch: bool = False, type_filter: list = None) -> MediaIte
})
# Run the table and handle user input
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list), switch=switch)
last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list))
table_show_manager.clear()
# Handle user's quit command
@ -220,4 +217,3 @@ def manager_clear():
# Clear list of data
media_search_manager.clear()
table_show_manager.clear()

View File

@ -11,7 +11,7 @@ from Src.Util._jsonConfig import config_manager
# Variable
CREATE_REPORT = config_manager.get_bool('M3U8', 'create_report')
CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report')
CREATE_JOB_DB = config_manager.get_bool('DEFAULT', 'create_job_database')

View File

@ -16,6 +16,7 @@ except: pass
# Internal utilities
from Src.Util.os import check_file_existence
from Src.Util._jsonConfig import config_manager
from .util import has_audio_stream
from .capture import capture_ffmpeg_real_time
@ -24,10 +25,9 @@ from .capture import capture_ffmpeg_real_time
# Variable
DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug")
DEBUG_FFMPEG = "debug" if DEBUG_MODE else "error"
terminate_flag = threading.Event()
USE_CODECS = config_manager.get_bool("M3U8_OPTIONS", "use_codec")
USE_GPU = config_manager.get_bool("M3U8_OPTIONS", "use_gpu")
FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_OPTIONS", "default_preset")
USE_CODECS = config_manager.get_bool("M3U8_FILTER", "use_codec")
USE_GPU = config_manager.get_bool("M3U8_FILTER", "use_gpu")
FFMPEG_DEFAULT_PRESET = config_manager.get("M3U8_FILTER", "default_preset")
@ -271,6 +271,9 @@ def join_video(video_path: str, out_path: str, vcodec: str = None, acodec: str =
- force_ts (bool): Force video path to be mpegts as input.
"""
if not check_file_existence(video_path):
sys.exit(0)
# Start command
ffmpeg_cmd = ['ffmpeg']
@ -324,6 +327,9 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s
- preset (str): The preset for encoding. Defaults to 'ultrafast'.
"""
if not check_file_existence(video_path):
sys.exit(0)
# Start command
ffmpeg_cmd = ['ffmpeg', '-i', video_path]
@ -331,6 +337,9 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s
for i, audio_track in enumerate(audio_tracks):
ffmpeg_cmd.extend(['-i', audio_track.get('path')])
if not check_file_existence(audio_track.get('path')):
sys.exit(0)
# Add output args
if USE_CODECS:
ffmpeg_cmd.extend(['-c:v', vcodec, '-c:a', acodec, '-b:a', str(bitrate), '-preset', FFMPEG_DEFAULT_PRESET])
@ -348,7 +357,7 @@ def join_audios(video_path: str, audio_tracks: List[Dict[str, str]], out_path: s
capture_ffmpeg_real_time(ffmpeg_cmd, "[cyan]Join audio")
print()
def join_subtitle(video: str, subtitles_list: List[Dict[str, str]], output_file: str):
def join_subtitle(video_path: str, subtitles_list: List[Dict[str, str]], output_file: str):
"""
Joins subtitles with a video file using FFmpeg.
@ -359,9 +368,13 @@ def join_subtitle(video: str, subtitles_list: List[Dict[str, str]], output_file:
- output_file (str): The path to save the output file.
"""
if not check_file_existence(video_path):
sys.exit(0)
# Start command
added_subtitle_names = set() # Remove subtitle with same name
ffmpeg_cmd = ["ffmpeg", "-i", video]
ffmpeg_cmd = ["ffmpeg", "-i", video_path]
# Add subtitle with language
for idx, subtitle in enumerate(subtitles_list):
@ -375,6 +388,9 @@ def join_subtitle(video: str, subtitles_list: List[Dict[str, str]], output_file:
ffmpeg_cmd += ["-map", "0:v", "-map", "0:a", "-map", f"{idx + 1}:s"]
ffmpeg_cmd += ["-metadata:s:s:{}".format(idx), "title={}".format(subtitle['name'])]
if not check_file_existence(subtitle['path']):
sys.exit(0)
# Add output args
if USE_CODECS:
ffmpeg_cmd.extend(['-c:v', 'copy', '-c:a', 'copy', '-c:s', 'mov_text'])

View File

@ -1,6 +0,0 @@
# 02.04.24
from .decryption import M3U8_Decryption
from .math_calc import M3U8_Ts_Files
from .parser import M3U8_Parser, M3U8_Codec
from .url_fix import m3u8_url_fix

View File

@ -1,127 +0,0 @@
# 03.04.24
import sys
import logging
import subprocess
import importlib.util
# Internal utilities
from Src.Util.console import console
# Check if Crypto module is installed
crypto_spec = importlib.util.find_spec("Crypto")
crypto_installed = crypto_spec is not None
if crypto_installed:
logging.info("Decrypy use: Crypto")
from Crypto.Cipher import AES # type: ignore
from Crypto.Util.Padding import unpad # type: ignore
class M3U8_Decryption:
"""
Class for decrypting M3U8 playlist content using AES encryption when the Crypto module is available.
"""
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
"""
Initialize the M3U8_Decryption object.
Args:
- key (bytes): The encryption key.
- iv (bytes): The initialization vector (IV).
- method (str): The encryption method.
"""
self.key = key
if "0x" in str(iv):
self.iv = bytes.fromhex(iv.replace("0x", ""))
else:
self.iv = iv
self.method = method
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt the ciphertext using the specified encryption method.
Args:
- ciphertext (bytes): The encrypted content to decrypt.
Returns:
bytes: The decrypted content.
"""
if self.method == "AES":
cipher = AES.new(self.key, AES.MODE_ECB)
decrypted_data = cipher.decrypt(ciphertext)
return unpad(decrypted_data, AES.block_size)
elif self.method == "AES-128":
cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv)
decrypted_data = cipher.decrypt(ciphertext)
return unpad(decrypted_data, AES.block_size)
elif self.method == "AES-128-CTR":
cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv)
return cipher.decrypt(ciphertext)
else:
raise ValueError("Invalid or unsupported method")
else:
# Check if openssl command is available
openssl_available = subprocess.run(["openssl", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
logging.info("Decrypy use: OPENSSL")
if not openssl_available:
console.log("[red]Neither Crypto nor openssl is installed. Please install either one of them.")
sys.exit(0)
class M3U8_Decryption:
"""
Class for decrypting M3U8 playlist content using OpenSSL when the Crypto module is not available.
"""
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
"""
Initialize the M3U8_Decryption object.
Args:
- key (bytes): The encryption key.
- iv (bytes): The initialization vector (IV).
- method (str): The encryption method.
"""
self.key = key
if "0x" in str(iv):
self.iv = bytes.fromhex(iv.replace("0x", ""))
else:
self.iv = iv
self.method = method
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypt the ciphertext using the specified encryption method.
Args:
- ciphertext (bytes): The encrypted content to decrypt.
Returns:
bytes: The decrypted content.
"""
if self.method == "AES":
openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt'
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext)
elif self.method == "AES-128":
openssl_cmd = f'openssl enc -d -aes-128-cbc -K {self.key[:16].hex()} -iv {self.iv.hex()}'
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext)
elif self.method == "AES-128-CTR":
openssl_cmd = f'openssl enc -d -aes-128-ctr -K {self.key[:16].hex()} -iv {self.iv.hex()}'
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext)
else:
raise ValueError("Invalid or unsupported method")
return decrypted_data

View File

@ -1,38 +0,0 @@
# 15.04.24
import os
# Internal utilities
from .model import M3U8
def load(raw_content, uri):
"""
Parses the content of an M3U8 playlist and returns an M3U8 object.
Args:
raw_content (str): The content of the M3U8 playlist as a string.
uri (str): The URI of the M3U8 playlist file or stream.
Returns:
M3U8: An object representing the parsed M3U8 playlist.
Raises:
IOError: If the raw_content is empty or if the URI cannot be accessed.
ValueError: If the raw_content is not a valid M3U8 playlist format.
Example:
>>> m3u8_content = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:10\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:10.0,\nhttp://example.com/segment0.ts\n#EXTINF:10.0,\nhttp://example.com/segment1.ts\n"
>>> uri = "http://example.com/playlist.m3u8"
>>> playlist = load(m3u8_content, uri)
"""
if not raw_content:
raise IOError("Empty content provided.")
if not uri:
raise IOError("Empty URI provided.")
base_uri = os.path.dirname(uri)
return M3U8(raw_content, base_uri=base_uri)

View File

@ -1,28 +0,0 @@
# 19.04.24
import itertools
def remove_quotes_parser(*attrs):
"""
Returns a dictionary mapping attribute names to a function that removes quotes from their values.
"""
return dict(zip(attrs, itertools.repeat(remove_quotes)))
def remove_quotes(string):
"""
Removes quotes from a string.
"""
quotes = ('"', "'")
if string and string[0] in quotes and string[-1] in quotes:
return string[1:-1]
return string
def normalize_attribute(attribute):
"""
Normalizes an attribute name by converting hyphens to underscores and converting to lowercase.
"""
return attribute.replace('-', '_').lower().strip()

View File

@ -1,359 +0,0 @@
# 15.04.24
import os
from collections import namedtuple
# Internal utilities
from ..lib_parser import parser
# Variable
StreamInfo = namedtuple('StreamInfo', ['bandwidth', 'program_id', 'resolution', 'codecs'])
Media = namedtuple('Media', ['uri', 'type', 'group_id', 'language', 'name','default', 'autoselect', 'forced', 'characteristics'])
class M3U8:
"""
Represents a single M3U8 playlist. Should be instantiated with the content as string.
Args:
- content: the m3u8 content as string
- base_path: all urls (key and segments url) will be updated with this base_path,
ex: base_path = "http://videoserver.com/hls"
- base_uri: uri the playlist comes from. it is propagated to SegmentList and Key
ex: http://example.com/path/to
Attribute:
- key: it's a `Key` object, the EXT-X-KEY from m3u8. Or None
- segments: a `SegmentList` object, represents the list of `Segment`s from this playlist
- is_variant: Returns true if this M3U8 is a variant playlist, with links to other M3U8s with different bitrates.
If true, `playlists` is a list of the playlists available, and `iframe_playlists` is a list of the i-frame playlists available.
- is_endlist: Returns true if EXT-X-ENDLIST tag present in M3U8.
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8
- playlists: If this is a variant playlist (`is_variant` is True), returns a list of Playlist objects
- iframe_playlists: If this is a variant playlist (`is_variant` is True), returns a list of IFramePlaylist objects
- playlist_type: A lower-case string representing the type of the playlist, which can be one of VOD (video on demand) or EVENT.
- media: If this is a variant playlist (`is_variant` is True), returns a list of Media objects
- target_duration: Returns the EXT-X-TARGETDURATION as an integer
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2
- media_sequence: Returns the EXT-X-MEDIA-SEQUENCE as an integer
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3
- program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a string
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
- version: Return the EXT-X-VERSION as is
- allow_cache: Return the EXT-X-ALLOW-CACHE as is
- files: Returns an iterable with all files from playlist, in order. This includes segments and key uri, if present.
- base_uri: It is a property (getter and setter) used by SegmentList and Key to have absolute URIs.
- is_i_frames_only: Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8.
Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12
"""
# Mapping of simple attributes (obj attribute, parser attribute)
SIMPLE_ATTRIBUTES = (
('is_variant', 'is_variant'),
('is_endlist', 'is_endlist'),
('is_i_frames_only', 'is_i_frames_only'),
('target_duration', 'targetduration'),
('media_sequence', 'media_sequence'),
('program_date_time', 'program_date_time'),
('version', 'version'),
('allow_cache', 'allow_cache'),
('playlist_type', 'playlist_type')
)
def __init__(self, content=None, base_path=None, base_uri=None):
"""
Initialize the M3U8 object.
Parameters:
- content: M3U8 content (string).
- base_path: Base path for relative URIs (string).
- base_uri: Base URI for absolute URIs (string).
"""
if content is not None:
self.data = parser.parse(content)
else:
self.data = {}
self._base_uri = base_uri
self.base_path = base_path
self._initialize_attributes()
def _initialize_attributes(self):
"""
Initialize attributes based on parsed data.
"""
# Initialize key and segments
self.key = Key(base_uri=self.base_uri, **self.data.get('key', {})) if 'key' in self.data else None
self.segments = SegmentList([Segment(base_uri=self.base_uri, **params) for params in self.data.get('segments', [])])
# Initialize simple attributes
for attr, param in self.SIMPLE_ATTRIBUTES:
setattr(self, attr, self.data.get(param))
# Initialize files, media, playlists, and iframe_playlists
self.files = []
if self.key:
self.files.append(self.key.uri)
self.files.extend(self.segments.uri)
self.media = [Media(
uri = media.get('uri'),
type = media.get('type'),
group_id = media.get('group_id'),
language = media.get('language'),
name = media.get('name'),
default = media.get('default'),
autoselect = media.get('autoselect'),
forced = media.get('forced'),
characteristics = media.get('characteristics'))
for media in self.data.get('media', [])
]
self.playlists = PlaylistList([Playlist(
base_uri = self.base_uri,
media = self.media,
**playlist
)for playlist in self.data.get('playlists', [])
])
self.iframe_playlists = PlaylistList()
for ifr_pl in self.data.get('iframe_playlists', []):
self.iframe_playlists.append(
IFramePlaylist(
base_uri = self.base_uri,
uri = ifr_pl['uri'],
iframe_stream_info=ifr_pl['iframe_stream_info'])
)
@property
def base_uri(self):
"""
Get the base URI.
"""
return self._base_uri
@base_uri.setter
def base_uri(self, new_base_uri):
"""
Set the base URI.
"""
self._base_uri = new_base_uri
self.segments.base_uri = new_base_uri
class BasePathMixin:
"""
Mixin class for managing base paths.
"""
@property
def base_path(self):
"""
Get the base path.
"""
return os.path.dirname(self.uri)
@base_path.setter
def base_path(self, newbase_path):
"""
Set the base path.
"""
if not self.base_path:
self.uri = "%s/%s" % (newbase_path, self.uri)
self.uri = self.uri.replace(self.base_path, newbase_path)
class GroupedBasePathMixin:
"""
Mixin class for managing base paths across a group of items.
"""
def _set_base_uri(self, new_base_uri):
"""
Set the base URI for each item in the group.
"""
for item in self:
item.base_uri = new_base_uri
base_uri = property(None, _set_base_uri)
def _set_base_path(self, new_base_path):
"""
Set the base path for each item in the group.
"""
for item in self:
item.base_path = new_base_path
base_path = property(None, _set_base_path)
class Segment(BasePathMixin):
"""
Class representing a segment in an M3U8 playlist.
Inherits from BasePathMixin for managing base paths.
"""
def __init__(self, uri, base_uri, program_date_time=None, duration=None,
title=None, byterange=None, discontinuity=False, key=None):
"""
Initialize a Segment object.
Args:
- uri: URI of the segment.
- base_uri: Base URI for the segment.
- program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a datetime
Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
- duration: Duration of the segment (optional).
- title: Title attribute from EXTINF parameter
- byterange: Byterange information of the segment (optional).
- discontinuity: Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists
Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11
- key: Key for encryption (optional).
"""
self.uri = uri
self.duration = duration
self.title = title
self.base_uri = base_uri
self.byterange = byterange
self.program_date_time = program_date_time
self.discontinuity = discontinuity
#self.key = key
class SegmentList(list, GroupedBasePathMixin):
"""
Class representing a list of segments in an M3U8 playlist.
Inherits from list and GroupedBasePathMixin for managing base paths across a group of items.
"""
@property
def uri(self):
"""
Get the URI of each segment in the SegmentList.
Returns:
- List of URIs of segments in the SegmentList.
"""
return [seg.uri for seg in self]
class Key(BasePathMixin):
"""
Class representing a key used for encryption in an M3U8 playlist.
Inherits from BasePathMixin for managing base paths.
"""
def __init__(self, method, uri, base_uri, iv=None):
"""
Initialize a Key object.
Args:
- method: Encryption method.
ex: "AES-128"
- uri: URI of the key.
ex: "https://priv.example.com/key.php?r=52"
- base_uri: Base URI for the key.
ex: http://example.com/path/to
- iv: Initialization vector (optional).
ex: 0X12A
"""
self.method = method
self.uri = uri
self.iv = iv
self.base_uri = base_uri
class Playlist(BasePathMixin):
"""
Playlist object representing a link to a variant M3U8 with a specific bitrate.
More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10
"""
def __init__(self, uri, stream_info, media, base_uri):
"""
Initialize a Playlist object.
Args:
- uri: URI of the playlist.
- stream_info: is a named tuple containing the attributes: `program_id`,
- media: List of Media objects associated with the playlist.
- base_uri: Base URI for the playlist.
"""
self.uri = uri
self.base_uri = base_uri
# Extract resolution information from stream_info
resolution = stream_info.get('resolution')
if resolution is not None:
values = resolution.split('x')
resolution_pair = (int(values[0]), int(values[1]))
else:
resolution_pair = None
# Create StreamInfo object
self.stream_info = StreamInfo(
bandwidth = stream_info['bandwidth'],
program_id = stream_info.get('program_id'),
resolution = resolution_pair,
codecs = stream_info.get('codecs')
)
# Filter media based on group ID and media type
self.media = []
for media_type in ('audio', 'video', 'subtitles'):
group_id = stream_info.get(media_type)
if group_id:
self.media += filter(lambda m: m.group_id == group_id, media)
class IFramePlaylist(BasePathMixin):
"""
Class representing an I-Frame playlist in an M3U8 playlist.
Inherits from BasePathMixin for managing base paths.
"""
def __init__(self, base_uri, uri, iframe_stream_info):
"""
Initialize an IFramePlaylist object.
Args:
- base_uri: Base URI for the I-Frame playlist.
- uri: URI of the I-Frame playlist.
- iframe_stream_info, is a named tuple containing the attributes:
`program_id`, `bandwidth`, `codecs` and `resolution` which is a tuple (w, h) of integers
"""
self.uri = uri
self.base_uri = base_uri
# Extract resolution information from iframe_stream_info
resolution = iframe_stream_info.get('resolution')
if resolution is not None:
values = resolution.split('x')
resolution_pair = (int(values[0]), int(values[1]))
else:
resolution_pair = None
# Create StreamInfo object for I-Frame playlist
self.iframe_stream_info = StreamInfo(
bandwidth = iframe_stream_info.get('bandwidth'),
program_id = iframe_stream_info.get('program_id'),
resolution = resolution_pair,
codecs = iframe_stream_info.get('codecs')
)
class PlaylistList(list, GroupedBasePathMixin):
"""
Class representing a list of playlists in an M3U8 playlist.
Inherits from list and GroupedBasePathMixin for managing base paths across a group of items.
"""
def __str__(self):
"""
Return a string representation of the PlaylistList.
Returns:
- String representation of the PlaylistList.
"""
output = [str(playlist) for playlist in self]
return '\n'.join(output)

View File

@ -1,338 +0,0 @@
# 15.04.24
import re
import logging
import datetime
# Internal utilities
from ..lib_parser import protocol
from ._util import (
remove_quotes,
remove_quotes_parser,
normalize_attribute
)
# External utilities
from Src.Util._jsonConfig import config_manager
# Variable
REMOVE_EMPTY_ROW = config_manager.get_bool('M3U8_PARSER', 'skip_empty_row_playlist')
ATTRIBUTELISTPATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''')
def parse(content):
"""
Given an M3U8 playlist content, parses the content and extracts metadata.
Args:
content (str): The M3U8 playlist content.
Returns:
dict: A dictionary containing the parsed metadata.
"""
# Initialize data dictionary with default values
data = {
'is_variant': False,
'is_endlist': False,
'is_i_frames_only': False,
'playlist_type': None,
'playlists': [],
'iframe_playlists': [],
'segments': [],
'media': [],
}
# Initialize state dictionary for tracking parsing state
state = {
'expect_segment': False,
'expect_playlist': False,
}
# Iterate over lines in the content
content = content.split("\n")
content_length = len(content)
i = 0
while i < content_length:
line = content[i]
line_stripped = line.strip()
is_end = i + 1 == content_length - 2
if REMOVE_EMPTY_ROW:
if i < content_length - 2:
actual_row = extract_params(line_stripped)
next_row = extract_params(content[i + 2].strip())
if actual_row is not None and next_row is None and not is_end:
logging.info(f"Skip row: {line_stripped}")
i += 1
continue
i += 1
if line.startswith(protocol.ext_x_byterange):
_parse_byterange(line, state)
state['expect_segment'] = True
elif state['expect_segment']:
_parse_ts_chunk(line, data, state)
state['expect_segment'] = False
elif state['expect_playlist']:
_parse_variant_playlist(line, data, state)
state['expect_playlist'] = False
elif line.startswith(protocol.ext_x_targetduration):
_parse_simple_parameter(line, data, float)
elif line.startswith(protocol.ext_x_media_sequence):
_parse_simple_parameter(line, data, int)
elif line.startswith(protocol.ext_x_discontinuity):
state['discontinuity'] = True
elif line.startswith(protocol.ext_x_version):
_parse_simple_parameter(line, data)
elif line.startswith(protocol.ext_x_allow_cache):
_parse_simple_parameter(line, data)
elif line.startswith(protocol.ext_x_key):
state['current_key'] = _parse_key(line)
data['key'] = data.get('key', state['current_key'])
elif line.startswith(protocol.extinf):
_parse_extinf(line, data, state)
state['expect_segment'] = True
elif line.startswith(protocol.ext_x_stream_inf):
state['expect_playlist'] = True
_parse_stream_inf(line, data, state)
elif line.startswith(protocol.ext_x_i_frame_stream_inf):
_parse_i_frame_stream_inf(line, data)
elif line.startswith(protocol.ext_x_media):
_parse_media(line, data, state)
elif line.startswith(protocol.ext_x_playlist_type):
_parse_simple_parameter(line, data)
elif line.startswith(protocol.ext_i_frames_only):
data['is_i_frames_only'] = True
elif line.startswith(protocol.ext_x_endlist):
data['is_endlist'] = True
return data
def extract_params(line):
"""
Extracts parameters from a formatted input string.
Args:
- line (str): The string containing the parameters to extract.
Returns:
dict or None: A dictionary containing the extracted parameters with their respective values.
"""
params = {}
matches = re.findall(r'([A-Z\-]+)=("[^"]*"|[^",\s]*)', line)
if not matches:
return None
for match in matches:
param, value = match
params[param] = value.strip('"')
return params
def _parse_key(line):
"""
Parses the #EXT-X-KEY line and extracts key attributes.
Args:
- line (str): The #EXT-X-KEY line from the playlist.
Returns:
dict: A dictionary containing the key attributes.
"""
params = ATTRIBUTELISTPATTERN.split(line.replace(protocol.ext_x_key + ':', ''))[1::2]
key = {}
for param in params:
name, value = param.split('=', 1)
key[normalize_attribute(name)] = remove_quotes(value)
return key
def _parse_extinf(line, data, state):
"""
Parses the #EXTINF line and extracts segment duration and title.
Args:
- line (str): The #EXTINF line from the playlist.
- data (dict): The dictionary to store the parsed data.
- state (dict): The parsing state.
"""
duration, title = line.replace(protocol.extinf + ':', '').split(',')
state['segment'] = {'duration': float(duration), 'title': remove_quotes(title)}
def _parse_ts_chunk(line, data, state):
"""
Parses a segment URI line and adds it to the segment list.
Args:
line (str): The segment URI line from the playlist.
data (dict): The dictionary to store the parsed data.
state (dict): The parsing state.
"""
segment = state.pop('segment')
if state.get('current_program_date_time'):
segment['program_date_time'] = state['current_program_date_time']
state['current_program_date_time'] += datetime.timedelta(seconds=segment['duration'])
segment['uri'] = line
segment['discontinuity'] = state.pop('discontinuity', False)
if state.get('current_key'):
segment['key'] = state['current_key']
data['segments'].append(segment)
def _parse_attribute_list(prefix, line, atribute_parser):
"""
Parses a line containing a list of attributes and their values.
Args:
- prefix (str): The prefix to identify the line.
- line (str): The line containing the attributes.
- atribute_parser (dict): A dictionary mapping attribute names to parsing functions.
Returns:
dict: A dictionary containing the parsed attributes.
"""
params = ATTRIBUTELISTPATTERN.split(line.replace(prefix + ':', ''))[1::2]
attributes = {}
for param in params:
name, value = param.split('=', 1)
name = normalize_attribute(name)
if name in atribute_parser:
value = atribute_parser[name](value)
attributes[name] = value
return attributes
def _parse_stream_inf(line, data, state):
"""
Parses the #EXT-X-STREAM-INF line and extracts stream information.
Args:
- line (str): The #EXT-X-STREAM-INF line from the playlist.
- data (dict): The dictionary to store the parsed data.
- state (dict): The parsing state.
"""
data['is_variant'] = True
atribute_parser = remove_quotes_parser('codecs', 'audio', 'video', 'subtitles')
atribute_parser["program_id"] = int
atribute_parser["bandwidth"] = int
state['stream_info'] = _parse_attribute_list(protocol.ext_x_stream_inf, line, atribute_parser)
def _parse_i_frame_stream_inf(line, data):
"""
Parses the #EXT-X-I-FRAME-STREAM-INF line and extracts I-frame stream information.
Args:
- line (str): The #EXT-X-I-FRAME-STREAM-INF line from the playlist.
- data (dict): The dictionary to store the parsed data.
"""
atribute_parser = remove_quotes_parser('codecs', 'uri')
atribute_parser["program_id"] = int
atribute_parser["bandwidth"] = int
iframe_stream_info = _parse_attribute_list(protocol.ext_x_i_frame_stream_inf, line, atribute_parser)
iframe_playlist = {'uri': iframe_stream_info.pop('uri'),
'iframe_stream_info': iframe_stream_info}
data['iframe_playlists'].append(iframe_playlist)
def _parse_media(line, data, state):
"""
Parses the #EXT-X-MEDIA line and extracts media attributes.
Args:
- line (str): The #EXT-X-MEDIA line from the playlist.
- data (dict): The dictionary to store the parsed data.
- state (dict): The parsing state.
"""
quoted = remove_quotes_parser('uri', 'group_id', 'language', 'name', 'characteristics')
media = _parse_attribute_list(protocol.ext_x_media, line, quoted)
data['media'].append(media)
def _parse_variant_playlist(line, data, state):
"""
Parses a variant playlist line and extracts playlist information.
Args:
- line (str): The variant playlist line from the playlist.
- data (dict): The dictionary to store the parsed data.
- state (dict): The parsing state.
"""
playlist = {'uri': line, 'stream_info': state.pop('stream_info')}
data['playlists'].append(playlist)
def _parse_byterange(line, state):
"""
Parses the #EXT-X-BYTERANGE line and extracts byte range information.
Args:
- line (str): The #EXT-X-BYTERANGE line from the playlist.
- state (dict): The parsing state.
"""
state['segment']['byterange'] = line.replace(protocol.ext_x_byterange + ':', '')
def _parse_simple_parameter_raw_value(line, cast_to=str, normalize=False):
"""
Parses a line containing a simple parameter and its value.
Args:
- line (str): The line containing the parameter and its value.
- cast_to (type): The type to which the value should be cast.
- normalize (bool): Whether to normalize the parameter name.
Returns:
tuple: A tuple containing the parameter name and its value.
"""
param, value = line.split(':', 1)
param = normalize_attribute(param.replace('#EXT-X-', ''))
if normalize:
value = normalize_attribute(value)
return param, cast_to(value)
def _parse_and_set_simple_parameter_raw_value(line, data, cast_to=str, normalize=False):
"""
Parses a line containing a simple parameter and its value, and sets it in the data dictionary.
Args:
- line (str): The line containing the parameter and its value.
- data (dict): The dictionary to store the parsed data.
- cast_to (type): The type to which the value should be cast.
- normalize (bool): Whether to normalize the parameter name.
Returns:
The parsed value.
"""
param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize)
data[param] = value
return data[param]
def _parse_simple_parameter(line, data, cast_to=str):
"""
Parses a line containing a simple parameter and its value, and sets it in the data dictionary.
Args:
line (str): The line containing the parameter and its value.
data (dict): The dictionary to store the parsed data.
cast_to (type): The type to which the value should be cast.
Returns:
The parsed value.
"""
return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True)

View File

@ -1,17 +0,0 @@
# 15.04.24
ext_x_targetduration = '#EXT-X-TARGETDURATION'
ext_x_media_sequence = '#EXT-X-MEDIA-SEQUENCE'
ext_x_program_date_time = '#EXT-X-PROGRAM-DATE-TIME'
ext_x_media = '#EXT-X-MEDIA'
ext_x_playlist_type = '#EXT-X-PLAYLIST-TYPE'
ext_x_key = '#EXT-X-KEY'
ext_x_stream_inf = '#EXT-X-STREAM-INF'
ext_x_version = '#EXT-X-VERSION'
ext_x_allow_cache = '#EXT-X-ALLOW-CACHE'
ext_x_endlist = '#EXT-X-ENDLIST'
extinf = '#EXTINF'
ext_i_frames_only = '#EXT-X-I-FRAMES-ONLY'
ext_x_byterange = '#EXT-X-BYTERANGE'
ext_x_i_frame_stream_inf = '#EXT-X-I-FRAME-STREAM-INF'
ext_x_discontinuity = '#EXT-X-DISCONTINUITY'

View File

@ -1,41 +0,0 @@
# 20.02.24
# Internal utilities
from Src.Util.os import format_size
class M3U8_Ts_Files:
def __init__(self):
"""
Initialize the TSFileSizeCalculator object.
Args:
- num_segments (int): The number of segments.
"""
self.ts_file_sizes = []
def add_ts_file_size(self, size: int):
"""
Add a file size to the list of file sizes.
Args:
- size (float): The size of the ts file to be added.
"""
self.ts_file_sizes.append(size)
def calculate_total_size(self):
"""
Calculate the total size of the files.
Returns:
float: The mean size of the files in a human-readable format.
"""
if len(self.ts_file_sizes) == 0:
return 0
total_size = sum(self.ts_file_sizes)
mean_size = total_size / len(self.ts_file_sizes)
# Return format mean
return format_size(mean_size)

View File

@ -1,547 +0,0 @@
# 20.04.25
import logging
# Internal utilities
from .lib_parser import load
# External libraries
from Src.Lib.Request.my_requests import requests
# Costant
CODEC_MAPPINGS = {
"video": {
"avc1": "libx264",
"avc2": "libx264",
"avc3": "libx264",
"avc4": "libx264",
"hev1": "libx265",
"hev2": "libx265",
"hvc1": "libx265",
"hvc2": "libx265",
"vp8": "libvpx",
"vp9": "libvpx-vp9",
"vp10": "libvpx-vp9"
},
"audio": {
"mp4a": "aac",
"mp3": "libmp3lame",
"ac-3": "ac3",
"ec-3": "eac3",
"opus": "libopus",
"vorbis": "libvorbis"
}
}
RESOLUTIONS = [
(7680, 4320),
(3840, 2160),
(2560, 1440),
(1920, 1080),
(1280, 720),
(640, 480)
]
class M3U8_Codec:
"""
Represents codec information for an M3U8 playlist.
"""
def __init__(self, bandwidth, resolution, codecs):
"""
Initializes the M3U8Codec object with the provided parameters.
Args:
- bandwidth (int): Bandwidth of the codec.
- resolution (str): Resolution of the codec.
- codecs (str): Codecs information in the format "avc1.xxxxxx,mp4a.xx".
"""
self.bandwidth = bandwidth
self.resolution = resolution
self.codecs = codecs
self.audio_codec = None
self.video_codec = None
self.extract_codecs()
self.parse_codecs()
def extract_codecs(self):
"""
Parses the codecs information to extract audio and video codecs.
Extracted codecs are set as attributes: audio_codec and video_codec.
"""
# Split the codecs string by comma
codecs_list = self.codecs.split(',')
# Separate audio and video codecs
for codec in codecs_list:
if codec.startswith('avc'):
self.video_codec = codec
elif codec.startswith('mp4a'):
self.audio_codec = codec
def convert_video_codec(self, video_codec_identifier) -> str:
"""
Convert video codec identifier to codec name.
Args:
- video_codec_identifier (str): Identifier of the video codec.
Returns:
str: Codec name corresponding to the identifier.
"""
# Extract codec type from the identifier
codec_type = video_codec_identifier.split('.')[0]
# Retrieve codec mapping from the provided mappings or fallback to static mappings
video_codec_mapping = CODEC_MAPPINGS.get('video', {})
codec_name = video_codec_mapping.get(codec_type)
if codec_name:
return codec_name
else:
logging.warning(f"No corresponding video codec found for {video_codec_identifier}. Using default codec libx264.")
return "libx264" # Default
def convert_audio_codec(self, audio_codec_identifier) -> str:
"""
Convert audio codec identifier to codec name.
Args:
- audio_codec_identifier (str): Identifier of the audio codec.
Returns:
str: Codec name corresponding to the identifier.
"""
# Extract codec type from the identifier
codec_type = audio_codec_identifier.split('.')[0]
# Retrieve codec mapping from the provided mappings or fallback to static mappings
audio_codec_mapping = CODEC_MAPPINGS.get('audio', {})
codec_name = audio_codec_mapping.get(codec_type)
if codec_name:
return codec_name
else:
logging.warning(f"No corresponding audio codec found for {audio_codec_identifier}. Using default codec aac.")
return "aac" # Default
def parse_codecs(self):
"""
Parse video and audio codecs.
This method updates `video_codec_name` and `audio_codec_name` attributes.
"""
self.video_codec_name = self.convert_video_codec(self.video_codec)
self.audio_codec_name = self.convert_audio_codec(self.audio_codec)
def __str__(self):
"""
Returns a string representation of the M3U8Codec object.
"""
return f"BANDWIDTH={self.bandwidth},RESOLUTION={self.resolution},CODECS=\"{self.codecs}\""
class M3U8_Video:
def __init__(self, video_playlist) -> None:
"""
Initializes an M3U8_Video object with the provided video playlist.
Args:
- video_playlist (M3U8): An M3U8 object representing the video playlist.
"""
self.video_playlist = video_playlist
def get_best_uri(self):
"""
Returns the URI with the highest resolution from the video playlist.
Returns:
tuple or None: A tuple containing the URI with the highest resolution and its resolution value, or None if the video list is empty.
"""
if not self.video_playlist:
return None
best_uri = max(self.video_playlist, key=lambda x: x['resolution'])
return best_uri['uri'], best_uri['resolution']
def get_worst_uri(self):
"""
Returns the URI with the lowest resolution from the video playlist.
Returns:
- tuple or None: A tuple containing the URI with the lowest resolution and its resolution value, or None if the video list is empty.
"""
if not self.video_playlist:
return None
worst_uri = min(self.video_playlist, key=lambda x: x['resolution'])
return worst_uri['uri'], worst_uri['resolution']
def get_custom_uri(self, y_resolution):
"""
Returns the URI corresponding to a custom resolution from the video list.
Args:
- video_list (list): A list of dictionaries containing video URIs and resolutions.
- custom_resolution (tuple): A tuple representing the custom resolution.
Returns:
str or None: The URI corresponding to the custom resolution, or None if not found.
"""
for video in self.video_playlist:
logging.info(f"Check resolution from playlist: {int(video['resolution'][1])}, with input: {int(y_resolution)}")
if int(video['resolution'][1]) == int(y_resolution):
return video['uri'], video['resolution']
return None, None
def get_list_resolution(self):
"""
Retrieve a list of resolutions from the video playlist.
Returns:
list: A list of resolutions extracted from the video playlist.
"""
return [video['resolution'] for video in self.video_playlist]
class M3U8_Audio:
def __init__(self, audio_playlist) -> None:
"""
Initializes an M3U8_Audio object with the provided audio playlist.
Args:
- audio_playlist (M3U8): An M3U8 object representing the audio playlist.
"""
self.audio_playlist = audio_playlist
def get_uri_by_language(self, language):
"""
Returns a dictionary with 'name' and 'uri' given a specific language.
Args:
- audio_list (list): List of dictionaries containing audio information.
- language (str): The desired language.
Returns:
dict or None: Dictionary with 'name', 'language', and 'uri' for the specified language, or None if not found.
"""
for audio in self.audio_playlist:
if audio['language'] == language:
return {'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']}
return None
def get_all_uris_and_names(self):
"""
Returns a list of dictionaries containing all URIs and names.
Args:
- audio_list (list): List of dictionaries containing audio information.
Returns:
list: List of dictionaries containing 'name', 'language', and 'uri' for all audio in the list.
"""
return [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist]
def get_default_uri(self):
"""
Returns the dictionary with 'default' equal to 'YES'.
Args:
- audio_list (list): List of dictionaries containing audio information.
Returns:
dict or None: Dictionary with 'default' equal to 'YES', or None if not found.
"""
for audio in self.audio_playlist:
if audio['default'] == 'YES':
return audio.get('uri')
return None
class M3U8_Subtitle:
def __init__(self, subtitle_playlist) -> None:
"""
Initializes an M3U8_Subtitle object with the provided subtitle playlist.
Args:
- subtitle_playlist (M3U8): An M3U8 object representing the subtitle playlist.
"""
self.subtitle_playlist = subtitle_playlist
def get_uri_by_language(self, language):
"""
Returns a dictionary with 'name' and 'uri' given a specific language for subtitles.
Args:
- subtitle_list (list): List of dictionaries containing subtitle information.
- language (str): The desired language.
Returns:
dict or None: Dictionary with 'name' and 'uri' for the specified language for subtitles, or None if not found.
"""
for subtitle in self.subtitle_playlist:
if subtitle['language'] == language:
return {'name': subtitle['name'], 'uri': subtitle['uri']}
return None
def get_all_uris_and_names(self):
"""
Returns a list of dictionaries containing all URIs and names of subtitles.
Args:
- subtitle_list (list): List of dictionaries containing subtitle information.
Returns:
list: List of dictionaries containing 'name' and 'uri' for all subtitles in the list.
"""
return [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist]
def get_default_uri(self):
"""
Returns the dictionary with 'default' equal to 'YES' for subtitles.
Args:
- subtitle_list (list): List of dictionaries containing subtitle information.
Returns:
dict or None: Dictionary with 'default' equal to 'YES' for subtitles, or None if not found.
"""
for subtitle in self.subtitle_playlist:
if subtitle['default'] == 'YES':
return subtitle
return None
def download_all(self, custom_subtitle):
"""
Download all subtitles listed in the object's attributes, filtering based on a provided list of custom subtitles.
Args:
- custom_subtitle (list): A list of custom subtitles to download.
Returns:
list: A list containing dictionaries with subtitle information including name, language, and URI.
"""
output = [] # Initialize an empty list to store subtitle information
# Iterate through all available subtitles
for obj_subtitle in self.subtitle_get_all_uris_and_names():
# Check if the subtitle name is not in the list of custom subtitles, and skip if not found
if obj_subtitle.get('name') not in custom_subtitle:
continue
# Send a request to retrieve the subtitle content
logging.info(f"Download subtitle: {obj_subtitle.get('name')}")
response_subitle = requests.get(obj_subtitle.get('uri'))
try:
# Try to extract the VTT URL from the subtitle content
sub_parse = M3U8_Parser()
sub_parse.parse_data(obj_subtitle.get('uri'), response_subitle.text)
url_subititle = sub_parse.subtitle[0]
output.append({
'name': obj_subtitle.get('name'),
'language': obj_subtitle.get('language'),
'uri': url_subititle
})
except Exception as e:
logging.error(f"Cant donwload: {obj_subtitle.get('name')}, error: {e}")
return output
class M3U8_Parser:
def __init__(self):
self.segments = []
self.video_playlist = []
self.keys = None
self.subtitle_playlist = []
self.subtitle = []
self.audio_playlist = []
self.codec: M3U8_Codec = None
self._video: M3U8_Video = None
self._audio: M3U8_Audio = None
self._subtitle: M3U8_Subtitle = None
self.__create_variable__()
def parse_data(self, uri, raw_content) -> None:
"""
Extracts all information present in the provided M3U8 content.
Args:
- m3u8_content (str): The content of the M3U8 file.
"""
# Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles
m3u8_obj = load(raw_content, uri)
self.__parse_video_info__(m3u8_obj)
self.__parse_encryption_keys__(m3u8_obj)
self.__parse_subtitles_and_audio__(m3u8_obj)
self.__parse_segments__(m3u8_obj)
@staticmethod
def extract_resolution(uri: str) -> int:
"""
Extracts the video resolution from the given URI.
Args:
- uri (str): The URI containing video information.
Returns:
int: The video resolution if found, otherwise 0.
"""
# Log
logging.info(f"Try extract resolution from: {uri}")
for resolution in RESOLUTIONS:
if "http" in str(uri):
if str(resolution[1]) in uri:
return resolution
# Default resolution return (not best)
logging.error("No resolution found with custom parsing.")
logging.warning("Try set remove duplicate line to TRUE.")
return (0, 0)
def __parse_video_info__(self, m3u8_obj) -> None:
"""
Extracts video information from the M3U8 object.
Args:
- m3u8_obj: The M3U8 object containing video playlists.
"""
try:
for playlist in m3u8_obj.playlists:
# Direct access resolutions in m3u8 obj
if playlist.stream_info.resolution is not None:
self.video_playlist.append({
"uri": playlist.uri,
"resolution": playlist.stream_info.resolution
})
# Find resolutions in uri
else:
self.video_playlist.append({
"uri": playlist.uri,
"resolution": M3U8_Parser.extract_resolution(playlist.uri)
})
# Dont stop
continue
# Check if all key is present to create codec
try:
self.codec = M3U8_Codec(
playlist.stream_info.bandwidth,
playlist.stream_info.resolution,
playlist.stream_info.codecs
)
except:
logging.error(f"Error parsing codec: {e}")
except Exception as e:
logging.error(f"Error parsing video info: {e}")
def __parse_encryption_keys__(self, m3u8_obj) -> None:
"""
Extracts encryption keys from the M3U8 object.
Args:
- m3u8_obj: The M3U8 object containing encryption keys.
"""
try:
if m3u8_obj.key is not None:
if self.keys is None:
self.keys = {
'method': m3u8_obj.key.method,
'iv': m3u8_obj.key.iv,
'uri': m3u8_obj.key.uri
}
except Exception as e:
logging.error(f"Error parsing encryption keys: {e}")
pass
def __parse_subtitles_and_audio__(self, m3u8_obj) -> None:
"""
Extracts subtitles and audio information from the M3U8 object.
Args:
- m3u8_obj: The M3U8 object containing subtitles and audio data.
"""
try:
for media in m3u8_obj.media:
if media.type == "SUBTITLES":
self.subtitle_playlist.append({
"type": media.type,
"name": media.name,
"default": media.default,
"language": media.language,
"uri": media.uri
})
if media.type == "AUDIO":
self.audio_playlist.append({
"type": media.type,
"name": media.name,
"default": media.default,
"language": media.language,
"uri": media.uri
})
except Exception as e:
logging.error(f"Error parsing subtitles and audio: {e}")
def __parse_segments__(self, m3u8_obj) -> None:
"""
Extracts segment information from the M3U8 object.
Args:
- m3u8_obj: The M3U8 object containing segment data.
"""
try:
for segment in m3u8_obj.segments:
if "vtt" not in segment.uri:
self.segments.append(segment.uri)
else:
self.subtitle.append(segment.uri)
except Exception as e:
logging.error(f"Error parsing segments: {e}")
def __create_variable__(self):
"""
Initialize variables for video, audio, and subtitle playlists.
"""
self._video = M3U8_Video(self.video_playlist)
self._audio = M3U8_Audio(self.audio_playlist)
self._subtitle = M3U8_Subtitle(self.subtitle_playlist)

View File

@ -1,23 +1,29 @@
# 5.01.24
import os
import sys
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
# Internal utilities
from Src.Util.console import console, Panel
from Src.Lib.Request.my_requests import requests
from Src.Util.headers import get_headers
from Src.Util.color import Colors
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console, Panel
from Src.Util.color import Colors
from Src.Util.os import (
remove_folder,
delete_files_except_one,
compute_sha1_hash,
format_size
format_size,
create_folder,
reduce_base_name,
remove_special_characters
)
from Src.Lib.Unidecode import transliterate
from Src.Util.file_validator import can_create_file
# Logic class
@ -30,23 +36,23 @@ from ..FFmpeg import (
from ..M3U8 import (
M3U8_Parser,
M3U8_Codec,
m3u8_url_fix
M3U8_UrlFix
)
from .segments import M3U8_Segments
from ..E_Table import report_table
# Config
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_OPTIONS', 'specific_list_audio')
DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_OPTIONS', 'specific_list_subtitles')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_OPTIONS', 'cleanup_tmp_folder')
DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_FILTER', 'specific_list_audio')
DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_FILTER', 'specific_list_subtitles')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_FILTER', 'cleanup_tmp_folder')
FORCE_TS = config_manager.get_dict('M3U8_FILTER', 'force_ts')
FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution')
CREATE_REPORT = config_manager.get_bool('M3U8', 'create_report')
CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report')
# Variable
headers_index = config_manager.get_dict('M3U8_REQUESTS', 'index')
FORCE_TS = config_manager.get_dict('M3U8_OPTIONS', 'force_ts')
class Downloader():
@ -71,6 +77,17 @@ class Downloader():
self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_playlist))
else:
self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_index))
else:
folder, base_name = os.path.split(self.output_filename) # Split file_folder output
base_name = reduce_base_name(remove_special_characters(transliterate(base_name))) # Remove special char
create_folder(folder) # Create folder and check if exist
if not can_create_file(base_name): # Check if folder file name can be create
logging.error("Invalid mp4 name.")
sys.exit(0)
self.output_filename = os.path.join(folder, base_name)
logging.info(f"Output filename: {self.output_filename}")
# Initialize temp base path
@ -93,6 +110,9 @@ class Downloader():
# Path converted ts files
self.path_video_audio = None
self.path_video_subtitle = None
# Class
self.m3u8_url_fixer = M3U8_UrlFix()
def __df_make_req__(self, url: str) -> str:
"""
@ -112,18 +132,15 @@ class Downloader():
headers_index['user-agent'] = get_headers()
response = requests.get(url, headers=headers_index)
# Check status response of request
response.raise_for_status()
if response.ok:
return response.text
else:
logging.error(f"Request to {url} failed with status code: {response.status_code}")
logging.error(f"Test request to {url} failed with status code: {response.status_code}")
return None
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
logging.error(f"An unexpected error occurred with test request: {e}")
return None
def __manage_playlist__(self, m3u8_playlist_text):
@ -188,7 +205,7 @@ class Downloader():
if "http" not in self.m3u8_index:
# Generate full URL
self.m3u8_index = m3u8_url_fix.generate_full_url(self.m3u8_index)
self.m3u8_index = self.m3u8_url_fixer.generate_full_url(self.m3u8_index)
logging.info(f"Generate index url: {self.m3u8_index}")
# Check if a valid HTTPS URL is obtained
@ -457,11 +474,11 @@ class Downloader():
logging.info("Download from PLAYLIST")
# Fetch the M3U8 playlist content
if not len(str(self.m3u8_playlist).split("\n")) > 2:
if not len(str(self.m3u8_playlist).split("\n")) > 2: # Is a single link
m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist)
# Add full URL of the M3U8 playlist to fix next .ts without https if necessary
m3u8_url_fix.set_playlist(self.m3u8_playlist) # !!!!!!!!!!!!!!!!!! to fix for playlist with text
self.m3u8_url_fixer.set_playlist(self.m3u8_playlist) # !!!!!!!!!!!!!!!!!! to fix for playlist with text
else:
logging.warning("M3U8 master url not set.") # TO DO
@ -506,7 +523,7 @@ class Downloader():
logging.info("Download from INDEX")
# Add full URL of the M3U8 playlist to fix next .ts without https if necessary
m3u8_url_fix.set_playlist(self.m3u8_index)
self.m3u8_url_fixer.set_playlist(self.m3u8_index)
# Start all download ...
self.__donwload_video__()

View File

@ -22,31 +22,24 @@ from Src.Util.headers import get_headers
from Src.Util.color import Colors
from Src.Lib.Request.my_requests import requests
from Src.Util._jsonConfig import config_manager
from Src.Util.os import (
format_size
)
# Logic class
from ..M3U8 import (
M3U8_Decryption,
M3U8_Ts_Files,
M3U8_Ts_Estimator,
M3U8_Parser,
m3u8_url_fix
M3U8_UrlFix
)
# Config
TQDM_MAX_WORKER = config_manager.get_int('M3U8', 'tdqm_workers')
DELAY_START_WORKER = config_manager.get_float('M3U8', 'delay_start_workers')
TQDM_PROGRESS_TIMEOUT = config_manager.get_int('M3U8', 'tqdm_progress_timeout')
REQUESTS_TIMEOUT = config_manager.get_int('M3U8', 'requests_timeout')
ENABLE_TIME_TIMEOUT = config_manager.get_bool('M3U8', 'enable_time_quit')
TQDM_SHOW_PROGRESS = config_manager.get_bool('M3U8', 'tqdm_show_progress')
LIMIT_DONWLOAD_PERCENTAGE = config_manager.get_float('M3U8', 'download_percentage')
SAVE_M3U8_FILE = config_manager.get_float('M3U8', 'save_m3u8_content')
FAKE_PROXY = config_manager.get_float('M3U8', 'fake_proxy')
FAKE_PROXY_IP = config_manager.get_list('M3U8', 'fake_proxy_ip')
TQDM_MAX_WORKER = config_manager.get_int('M3U8_DOWNLOAD', 'tdqm_workers')
TQDM_SHOW_PROGRESS = config_manager.get_int('M3U8_DOWNLOAD', 'tqdm_show_progress')
FAKE_PROXY = config_manager.get_float('M3U8_DOWNLOAD', 'fake_proxy')
FAKE_PROXY_IP = config_manager.get_list('M3U8_DOWNLOAD', 'fake_proxy_ip')
REQUEST_TIMEOUT = config_manager.get_int('M3U8_REQUESTS', 'timeout')
REQUEST_VERIFY_SSL = config_manager.get_bool('M3U8_REQUESTS', 'verify_ssl')
REQUEST_DISABLE_ERROR = config_manager.get_bool('M3U8_REQUESTS', 'disable_error')
# Variable
@ -66,9 +59,7 @@ class M3U8_Segments:
"""
self.url = url
self.tmp_folder = tmp_folder
self.downloaded_size = 0
self.decryption: M3U8_Decryption = None # Initialize decryption as None
self.class_ts_files_size = M3U8_Ts_Files() # Initialize the TS files size class
self.segment_queue = queue.PriorityQueue() # Priority queue to maintain the order of segments
self.current_index = 0 # Index of the current segment to be written
self.tmp_file_path = os.path.join(self.tmp_folder, "0.ts") # Path to the temporary file
@ -76,8 +67,8 @@ class M3U8_Segments:
self.ctrl_c_detected = False # Global variable to track Ctrl+C detection
os.makedirs(self.tmp_folder, exist_ok=True) # Create the temporary folder if it does not exist
self.list_speeds = []
self.average_over = int(TQDM_MAX_WORKER / 3)
self.class_ts_estimator = M3U8_Ts_Estimator(TQDM_MAX_WORKER)
self.class_url_fixer = M3U8_UrlFix(url)
def __get_key__(self, m3u8_parser: M3U8_Parser) -> bytes:
"""
@ -143,6 +134,27 @@ class M3U8_Segments:
# Store the segment information parsed from the playlist
self.segments = m3u8_parser.segments
# Fix URL if it is incomplete (missing 'http')
for i in range(len(self.segments)):
segment_url = self.segments[i]
if "http" not in segment_url:
self.segments[i] = self.class_url_fixer.generate_full_url(segment_url)
logging.info(f"Generated new URL: {self.segments[i]}, from: {segment_url}")
# Change IP address of server
if FAKE_PROXY:
for i in range(len(self.segments)):
segment_url = self.segments[i]
self.segments[i] = self.__gen_proxy__(segment_url, self.segments.index(segment_url))
# Save new playlist of segment
path_m3u8_file = os.path.join(self.tmp_folder, "playlist_fix.m3u8")
with open(path_m3u8_file, "w") as file:
for item in self.segments:
file.write(f"{item}\n")
def get_info(self) -> None:
"""
Makes a request to the index M3U8 file to get information about segments.
@ -154,9 +166,8 @@ class M3U8_Segments:
response.raise_for_status() # Raise an exception for HTTP errors
# Save the M3U8 file to the temporary folder
if SAVE_M3U8_FILE:
path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8")
open(path_m3u8_file, "w+").write(response.text)
path_m3u8_file = os.path.join(self.tmp_folder, "playlist.m3u8")
open(path_m3u8_file, "w+").write(response.text)
# Parse the text from the M3U8 index file
self.parse_data(response.text)
@ -176,9 +187,39 @@ class M3U8_Segments:
# Parse the original URL and replace the hostname with the new IP address
parsed_url = urlparse(url)._replace(netloc=new_ip_address)
return urlunparse(parsed_url)
def make_requests_stream(self, ts_url: str, index: int, stop_event: threading.Event, progress_counter: tqdm, add_desc: str) -> None:
def update_progress_bar(self, segment_content: bytes, duration: float, progress_counter: tqdm) -> None:
"""
Updates the progress bar with information about the TS segment download.
Args:
segment_content (bytes): The content of the downloaded TS segment.
duration (float): The duration of the segment download in seconds.
progress_counter (tqdm): The tqdm object representing the progress bar.
"""
if TQDM_SHOW_PROGRESS:
total_downloaded = len(segment_content)
# Add the size of the downloaded segment to the estimator
self.class_ts_estimator.add_ts_file(total_downloaded * len(self.segments), total_downloaded, duration)
# Get downloaded size and total estimated size
downloaded_file_size_str = self.class_ts_estimator.get_downloaded_size().split(' ')[0]
file_total_size = self.class_ts_estimator.calculate_total_size()
number_file_total_size = file_total_size.split(' ')[0]
units_file_total_size = file_total_size.split(' ')[1]
average_internet_speed = self.class_ts_estimator.get_average_speed()
# Update the progress bar's postfix
progress_counter.set_postfix_str(
f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_file_size_str} {Colors.WHITE}< {Colors.GREEN}{number_file_total_size} {Colors.RED}{units_file_total_size} "
f"{Colors.WHITE}| {Colors.CYAN}{average_internet_speed:.2f} {Colors.RED}MB/s"
)
def make_requests_stream(self, ts_url: str, index: int, stop_event: threading.Event, progress_bar: tqdm) -> None:
"""
Downloads a TS segment and adds it to the segment queue.
@ -186,72 +227,47 @@ class M3U8_Segments:
- ts_url (str): The URL of the TS segment.
- index (int): The index of the segment.
- stop_event (threading.Event): Event to signal the stop of downloading.
- progress_counter (tqdm): Progress counter for tracking download progress.
- progress_bar (tqdm): Progress counter for tracking download progress.
- add_desc (str): Additional description for the progress bar.
"""
if stop_event.is_set():
return # Exit if the stop event is set
headers_segments['user-agent'] = get_headers()
# Fix URL if it is incomplete (missing 'http')
if "http" not in ts_url:
ts_url = m3u8_url_fix.generate_full_url(ts_url)
logging.info(f"Generated new URL: {ts_url}")
# Generate new user agent
headers_segments['user-agent'] = get_headers()
try:
# Change IP address if FAKE_PROXY is enabled
if FAKE_PROXY:
ts_url = self.__gen_proxy__(ts_url, self.segments.index(ts_url))
# Make request and calculate time duration
start_time = time.time()
response = requests.get(ts_url, headers=headers_segments, timeout=REQUESTS_TIMEOUT, verify_ssl=False) # Send GET request for the segment
response = requests.get(ts_url, headers=headers_segments, timeout=REQUEST_TIMEOUT, verify_ssl=REQUEST_VERIFY_SSL)
duration = time.time() - start_time
if response.ok:
# Get the content of the segment
segment_content = response.content
total_downloaded = len(response.content)
# Calculate mbps
speed_mbps = (total_downloaded * 8) / (duration * 1_000_000) * TQDM_MAX_WORKER
self.list_speeds.append(speed_mbps)
# Get average speed after (average_over)
if len(self.list_speeds) > self.average_over:
self.list_speeds.pop(0)
average_speed = ( sum(self.list_speeds) / len(self.list_speeds) ) / 10 # MB/s
#print(f"{average_speed:.2f} MB/s")
#progress_counter.set_postfix_str(f"{average_speed:.2f} MB/s")
if TQDM_SHOW_PROGRESS:
self.downloaded_size += len(response.content) # Update the downloaded size
self.class_ts_files_size.add_ts_file_size(len(response.content) * len(self.segments)) # Update the TS file size class
downloaded_size_str = format_size(self.downloaded_size) # Format the downloaded size
estimate_total_size = self.class_ts_files_size.calculate_total_size() # Calculate the estimated total size
progress_counter.set_postfix_str(f"{Colors.WHITE}[ {Colors.GREEN}{downloaded_size_str.split(' ')[0]} {Colors.WHITE}< {Colors.GREEN}{estimate_total_size.split(' ')[0]} {Colors.RED}MB {Colors.WHITE}| {Colors.CYAN}{average_speed:.2f} {Colors.RED}MB/s")
self.update_progress_bar(segment_content, duration, progress_bar)
# Decrypt the segment content if decryption is needed
if self.decryption is not None:
segment_content = self.decryption.decrypt(segment_content)
with self.condition:
self.segment_queue.put((index, segment_content)) # Add the segment to the queue
self.condition.notify() # Notify the writer thread that a new segment is available
progress_counter.update(1) # Update the progress counter
else:
logging.warning(f"Failed to download segment: {ts_url}")
if not REQUEST_DISABLE_ERROR:
logging.error(f"Failed to download segment: {ts_url}")
except Exception as e:
logging.error(f"Exception while downloading segment: {e}")
if not REQUEST_DISABLE_ERROR:
logging.error(f"Exception while downloading segment: {e}")
# Update bar
progress_bar.update(1)
def write_segments_to_file(self, stop_event: threading.Event):
"""
@ -265,7 +281,10 @@ class M3U8_Segments:
while not stop_event.is_set() or not self.segment_queue.empty():
with self.condition:
while self.segment_queue.empty() and not stop_event.is_set():
self.condition.wait() # Wait until a new segment is available or stop_event is set
self.condition.wait(timeout=1) # Wait until a new segment is available or stop_event is set
if stop_event.is_set():
break
if not self.segment_queue.empty():
@ -280,7 +299,7 @@ class M3U8_Segments:
else:
self.segment_queue.put((index, segment_content)) # Requeue the segment if it is not the next to be written
self.condition.notify() # Notify that a segment has been requeued # Notify that a segment has been requeued
self.condition.notify() # Notify that a segment has been requeued
def download_streams(self, add_desc):
"""
@ -316,21 +335,8 @@ class M3U8_Segments:
writer_thread = threading.Thread(target=self.write_segments_to_file, args=(stop_event,))
writer_thread.start()
# Start progress monitor thread
progress_thread = threading.Thread(target=self.timer, args=(progress_bar, stop_event))
progress_thread.start()
# Delay the start of each worker
for index, segment_url in enumerate(self.segments):
time.sleep(DELAY_START_WORKER)
# da 0.0 a 100.00
if int(LIMIT_DONWLOAD_PERCENTAGE) != 0:
score_percentage = (progress_bar.n / progress_bar.total) * 100
if score_percentage>= LIMIT_DONWLOAD_PERCENTAGE:
#progress_bar.refresh()
break
# Check for Ctrl+C before starting each download task
time.sleep(0.025)
@ -345,7 +351,7 @@ class M3U8_Segments:
break
# Submit the download task to the executor
executor.submit(self.make_requests_stream, segment_url, index, stop_event, progress_bar, add_desc)
executor.submit(self.make_requests_stream, segment_url, index, stop_event, progress_bar)
# Wait for all segments to be downloaded
executor.shutdown(wait=True)
@ -353,51 +359,3 @@ class M3U8_Segments:
with self.condition:
self.condition.notify_all() # Wake up the writer thread if it's waiting
writer_thread.join() # Wait for the writer thread to finish
def timer(self, progress_counter: tqdm, quit_event: threading.Event):
"""
Function to monitor progress and quit if no progress is made within a certain time
Args:
- progress_counter (tqdm): The progress counter object.
- quit_event (threading.Event): The event to signal when to quit.
"""
# If timer is disabled, return immediately without starting it, to reduce cpu use
if not ENABLE_TIME_TIMEOUT:
return
start_time = time.time()
last_count = 0
# Loop until quit event is set
while not quit_event.is_set():
current_count = progress_counter.n
# Update start time when progress is made
if current_count != last_count:
start_time = time.time()
last_count = current_count
# Calculate elapsed time
elapsed_time = time.time() - start_time
# Check if elapsed time exceeds progress timeout
if elapsed_time > TQDM_PROGRESS_TIMEOUT:
console.log(f"[red]No progress for {TQDM_PROGRESS_TIMEOUT} seconds. Stopping.")
# Set quit event to break the loop
quit_event.set()
break
# Calculate remaining time until timeout
remaining_time = max(0, TQDM_PROGRESS_TIMEOUT - elapsed_time)
# Determine sleep interval dynamically based on remaining time
sleep_interval = min(1, remaining_time)
# Wait for the calculated sleep interval
time.sleep(sleep_interval)
# Refresh progress bar
#progress_counter.refresh()

View File

@ -1,6 +1,6 @@
# 02.04.24
from .decryption import M3U8_Decryption
from .math_calc import M3U8_Ts_Files
from .estimator import M3U8_Ts_Estimator
from .parser import M3U8_Parser, M3U8_Codec
from .url_fix import m3u8_url_fix
from .url_fixer import M3U8_UrlFix

81
Src/Lib/M3U8/estimator.py Normal file
View File

@ -0,0 +1,81 @@
# 20.02.24
from collections import deque
# Internal utilities
from Src.Util.os import format_size
class M3U8_Ts_Estimator:
def __init__(self, workers: int):
"""
Initialize the TSFileSizeCalculator object.
Args:
- workers (int): The number of workers using with ThreadPool.
"""
self.ts_file_sizes = []
self.now_downloaded_size = 0
self.average_over = 5
self.list_speeds = deque(maxlen=self.average_over)
self.smoothed_speeds = []
self.tqdm_workers = workers
def add_ts_file(self, size: int, size_download: int, duration: float):
"""
Add a file size to the list of file sizes.
Args:
- size (float): The size of the ts file to be added.
- size_download (int): Single size of the ts file.
- duration (float): Time to download segment file.
"""
self.ts_file_sizes.append(size)
self.now_downloaded_size += size_download
# Calculate mbps
speed_mbps = (size_download * 8) / (duration * 1_000_000) * self.tqdm_workers
self.list_speeds.append(speed_mbps)
# Calculate moving average
smoothed_speed = sum(self.list_speeds) / len(self.list_speeds)
self.smoothed_speeds.append(smoothed_speed)
# Update smooth speeds
if len(self.smoothed_speeds) > self.average_over:
self.smoothed_speeds.pop(0)
def calculate_total_size(self) -> str:
"""
Calculate the total size of the files.
Returns:
float: The mean size of the files in a human-readable format.
"""
if len(self.ts_file_sizes) == 0:
return 0
total_size = sum(self.ts_file_sizes)
mean_size = total_size / len(self.ts_file_sizes)
# Return format mean
return format_size(mean_size)
def get_average_speed(self) -> float:
"""
Calculate the average speed from a list of speeds and convert it to megabytes per second (MB/s).
Returns:
float: The average speed in megabytes per second (MB/s).
"""
return (sum(self.smoothed_speeds) / len(self.smoothed_speeds)) / 10 # MB/s
def get_downloaded_size(self) -> str:
"""
Get the total downloaded size formatted as a human-readable string.
Returns:
str: The total downloaded size as a human-readable string.
"""
return format_size(self.now_downloaded_size)

View File

@ -1,41 +0,0 @@
# 20.02.24
# Internal utilities
from Src.Util.os import format_size
class M3U8_Ts_Files:
def __init__(self):
"""
Initialize the TSFileSizeCalculator object.
Args:
- num_segments (int): The number of segments.
"""
self.ts_file_sizes = []
def add_ts_file_size(self, size: int):
"""
Add a file size to the list of file sizes.
Args:
- size (float): The size of the ts file to be added.
"""
self.ts_file_sizes.append(size)
def calculate_total_size(self):
"""
Calculate the total size of the files.
Returns:
float: The mean size of the files in a human-readable format.
"""
if len(self.ts_file_sizes) == 0:
return 0
total_size = sum(self.ts_file_sizes)
mean_size = total_size / len(self.ts_file_sizes)
# Return format mean
return format_size(mean_size)

View File

@ -1,54 +0,0 @@
# 20.03.24
import logging
from urllib.parse import urlparse, urljoin
class M3U8_UrlFix:
def __init__(self, url: str = None) -> None:
"""
Initializes an M3U8_UrlFix object with the provided playlist URL.
Args:
- url (str, optional): The URL of the playlist. Defaults to None.
"""
self.url_playlist: str = url
def set_playlist(self, url: str) -> None:
"""
Set the M3U8 playlist URL.
Args:
- url (str): The M3U8 playlist URL.
"""
self.url_playlist = url
def generate_full_url(self, url_resource: str) -> str:
"""
Generate a full URL for a given resource using the base URL from the playlist.
Args:
- url_resource (str): The relative URL of the resource within the playlist.
Returns:
str: The full URL for the specified resource.
"""
# Check if m3u8 url playlist is present
if self.url_playlist == None:
logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present")
raise
# Parse the playlist URL to extract the base URL components
parsed_playlist_url = urlparse(self.url_playlist)
# Construct the base URL using the scheme, netloc, and path from the playlist URL
base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}"
# Join the base URL with the relative resource URL to get the full URL
full_url = urljoin(base_url, url_resource)
return full_url
# Output
m3u8_url_fix = M3U8_UrlFix()

View File

@ -49,6 +49,4 @@ class M3U8_UrlFix:
full_url = urljoin(base_url, url_resource)
return full_url
# Output
m3u8_url_fix = M3U8_UrlFix()

View File

@ -31,10 +31,15 @@ except ImportError:
from bs4 import BeautifulSoup
# Internal utilities
from Src.Util._jsonConfig import config_manager
# Default settings
HTTP_TIMEOUT = 5
HTTP_RETRIES = 1
HTTP_DELAY = 1
HTTP_DISABLE_ERROR = config_manager.get_bool('M3U8_REQUESTS', 'disable_error')
@ -325,7 +330,7 @@ class ManageRequests:
response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl_context)
else:
response = urllib.request.urlopen(req, timeout=self.timeout)
response = urllib.request.urlopen(req, timeout=self.timeout, context=ssl.create_default_context())
return response
@ -374,7 +379,8 @@ class ManageRequests:
"""
Handle request error.
"""
logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}")
if not HTTP_DISABLE_ERROR:
logging.error(f"Request failed for URL '{self.url}': {parse_http_error(str(e))}")
if self.attempt < self.retries:
logging.info(f"Retrying request for URL '{self.url}' (attempt {self.attempt}/{self.retries})")

View File

@ -85,8 +85,10 @@ def can_create_file(file_path):
try:
with open(file_path, 'w') as file:
pass
os.remove(file_path) # Cleanup if the file was created
return True
except OSError as e:
if e.errno in (errno.EACCES, errno.ENOENT, errno.EEXIST, errno.ENOTDIR):
return False

View File

@ -15,7 +15,7 @@ CLEAN = config_manager.get_bool('DEFAULT', 'clean_console')
SHOW = config_manager.get_bool('DEFAULT', 'show_message')
def start_message(switch = False):
def start_message():
"""
Display a start message.
"""
@ -33,18 +33,6 @@ def start_message(switch = False):
'''
if switch:
msg = '''
_ _ _ _
/ \ _ __ (_)_ __ ___ ___ _ _ _ __ (_) |_ _ _
/ _ \ | '_ \| | '_ ` _ \ / _ \ | | | '_ \| | __| | | |
/ ___ \| | | | | | | | | | __/ |_| | | | | | |_| |_| |
/_/ \_\_| |_|_|_| |_| |_|\___|\__,_|_| |_|_|\__|\__, |
|___/
'''
if CLEAN:
if platform.system() == 'Windows':
os.system("cls")
@ -52,7 +40,6 @@ def start_message(switch = False):
os.system("clear")
if SHOW:
console.print(f"[bold yellow]{msg}")
console.print(f"[magenta]Created by: Ghost6446\n")

56
Src/Util/node_jjs.py Normal file
View File

@ -0,0 +1,56 @@
# 26.05.24
import subprocess
def is_node_installed() -> bool:
"""
Checks if Node.js is installed on the system.
Returns:
bool: True if Node.js is installed, False otherwise.
"""
try:
# Run the command 'node -v' to get the Node.js version
result = subprocess.run(['node', '-v'], capture_output=True, text=True, check=True)
# If the command runs successfully and returns a version number, Node.js is installed
if result.stdout.startswith('v'):
return True
except (subprocess.CalledProcessError, FileNotFoundError):
# If there is an error running the command or the command is not found, Node.js is not installed
return False
return False
def run_node_script(script_content: str) -> str:
"""
Runs a Node.js script and returns its output.
Args:
script_content (str): The content of the Node.js script to run.
Returns:
str: The output of the Node.js script.
"""
# Check if Node.js is installed
if not is_node_installed():
raise EnvironmentError("Node.js is not installed on the system.")
# Write the script content to a temporary file
with open('script.js', 'w') as file:
file.write(script_content)
try:
# Run the Node.js script using subprocess and capture the output
result = subprocess.run(['node', 'script.js'], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Error running Node.js script: {e.stderr}")
finally:
# Clean up the temporary script file
import os
os.remove('script.js')

View File

@ -1,13 +1,15 @@
# 24.01.24
import shutil
import re
import os
import time
import json
import shutil
import hashlib
import logging
import re
import zipfile
import platform
from typing import List
@ -42,6 +44,108 @@ special_chars_to_remove = [
]
def get_max_length_by_os(system: str) -> int:
"""
Determines the maximum length for a base name based on the operating system.
Args:
system (str): The operating system name.
Returns:
int: The maximum length for the base name.
"""
if system == 'windows':
return 255 # NTFS and other common Windows filesystems support 255 characters for filenames
elif system == 'darwin': # macOS
return 255 # HFS+ and APFS support 255 characters for filenames
elif system == 'linux':
return 255 # Most Linux filesystems (e.g., ext4) support 255 characters for filenames
else:
raise ValueError(f"Unsupported operating system: {system}")
def reduce_base_name(base_name: str) -> str:
"""
Splits the file path into folder and base name, and reduces the base name based on the operating system.
Args:
base_name (str): The name of the file.
Returns:
str: The reduced base name.
"""
# Determine the operating system
system = platform.system().lower()
# Get the maximum length for the base name based on the operating system
max_length = get_max_length_by_os(system)
# Reduce the base name if necessary
if len(base_name) > max_length:
if system == 'windows':
# For Windows, truncate and add a suffix if needed
base_name = base_name[:max_length - 3] + '___'
elif system == 'darwin': # macOS
# For macOS, truncate without adding suffix
base_name = base_name[:max_length]
elif system == 'linux':
# For Linux, truncate and add a numeric suffix if needed
base_name = base_name[:max_length - 2] + '___'
return base_name
def create_folder(folder_name: str) -> None:
"""
Create a directory if it does not exist, and log the result.
Args:
folder_name (str): The path of the directory to be created.
"""
try:
logging.info(f"Try create folder: {folder_name}")
os.makedirs(folder_name, exist_ok=True)
if os.path.exists(folder_name) and os.path.isdir(folder_name):
logging.info(f"Directory successfully created or already exists: {folder_name}")
else:
logging.error(f"Failed to create directory: {folder_name}")
except OSError as e:
logging.error(f"OS error occurred while creating the directory {folder_name}: {e}")
raise
except Exception as e:
logging.error(f"An unexpected error occurred while creating the directory {folder_name}: {e}")
raise
def check_file_existence(file_path):
"""
Check if a file exists at the given file path.
Args:
file_path (str): The path to the file.
Returns:
bool: True if the file exists, False otherwise.
"""
try:
logging.info(f"Check if file exists: {file_path}")
if os.path.exists(file_path):
logging.info(f"The file '{file_path}' exists.")
return True
else:
logging.warning(f"The file '{file_path}' does not exist.")
return False
except Exception as e:
logging.error(f"An error occurred while checking file existence: {e}")
return False
def remove_folder(folder_path: str) -> None:
"""
Remove a folder if it exists.

View File

@ -80,14 +80,13 @@ class TVShowManager:
self.console.print(table) # Use self.console.print instead of print
def run(self, force_int_input: bool = False, max_int_input: int = 0, switch: bool = False) -> str:
def run(self, force_int_input: bool = False, max_int_input: int = 0) -> str:
"""
Run the TV show manager application.
Args:
- force_int_input(bool): If True, only accept integer inputs from 0 to max_int_input
- max_int_input (int): range of row to show
- switch (bool): switch from film to anime
Returns:
str: Last command executed before breaking out of the loop.
@ -96,7 +95,7 @@ class TVShowManager:
last_command = "" # Variable to store the last command executed
while True:
start_message(switch)
start_message()
# Display table
self.display_data(self.tv_shows[self.slice_start:self.slice_end])

View File

@ -3,8 +3,8 @@
"debug": false,
"log_file": "app.log",
"log_to_file": true,
"show_message": true,
"clean_console": true,
"show_message": false,
"clean_console": false,
"root_path": "Video",
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
"create_job_database": false,
@ -12,26 +12,17 @@
},
"SITE": {
"streamingcommunity": "foo",
"animeunity": "epic"
"animeunity": "to",
"altadefinizione": "food"
},
"M3U8": {
"M3U8_DOWNLOAD": {
"tdqm_workers": 30,
"delay_start_workers": 0,
"requests_timeout": 10,
"enable_time_quit": false,
"tqdm_progress_timeout": 10,
"download_percentage": 0,
"tqdm_show_progress": true,
"save_m3u8_content": true,
"fake_proxy": true,
"fake_proxy_ip": ["57.129.7.85", "57.129.7.188", "57.129.7.174", "57.129.4.77", "57.129.16.196", "57.129.16.156", "57.129.16.139", "57.129.16.135", "57.129.13.175", "57.129.13.157", "51.38.112.237", "51.195.107.7", "51.195.107.230"],
"fake_proxy": false,
"fake_proxy_ip": ["57.129.7.85","57.129.7.188","57.129.7.174","57.129.4.77","57.129.16.196","57.129.16.156","57.129.16.139","57.129.16.135","57.129.13.175","57.129.13.157","51.38.112.237","51.195.107.7","51.195.107.230"],
"create_report": false
},
"M3U8_PARSER": {
"skip_empty_row_playlist": false,
"force_resolution": -1
},
"M3U8_OPTIONS": {
"M3U8_FILTER": {
"use_codec": false,
"use_gpu": false,
"force_ts": false,
@ -41,7 +32,14 @@
"specific_list_subtitles": ["eng"]
},
"M3U8_REQUESTS": {
"disable_error": false,
"timeout": 10,
"verify_ssl": false,
"index": {"user-agent": ""},
"segments": {"user-agent": ""}
},
"M3U8_PARSER": {
"skip_empty_row_playlist": false,
"force_resolution": -1
}
}

View File

@ -1,205 +0,0 @@
# 08.05.24
import logging
# Internal utilities
from Src.Api.Streamingcommunity import (
get_version_and_domain,
title_search,
manager_clear,
get_select_title
)
from Src.Util.message import start_message
from Src.Util._jsonConfig import config_manager
from Src.Util.console import console, msg
from Src.Lib.E_Table import job_database
from Src.Api.Streamingcommunity.Core.Vix_player.player import VideoSource
# Config
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
SERIES_FOLDER = config_manager.get('DEFAULT', 'series_folder_name')
STREAM_SITE_NAME = config_manager.get('SITE', 'streaming_site_name')
DOMAIN_SITE_NAME = config_manager.get('SITE', 'streaming_domain')
CREATE_JOB_DB = config_manager.get_bool('DEFAULT', 'create_job_database')
class SuppressedConsolePrint:
def __enter__(self):
self.original_print = console.print
console.print = lambda *args, **kwargs: None
def __exit__(self, exc_type, exc_value, traceback):
console.print = self.original_print
class SeriesManager:
def __init__(self):
"""
Initialize SeriesManager object.
"""
with SuppressedConsolePrint():
self.version, self.domain = get_version_and_domain()
self.video_source = VideoSource()
def add_series(self):
"""
Add a new series to the database.
"""
try:
# Ask the user to input the search term
input_title_search = msg.ask("\n[cyan]Insert word to search in all site: [/cyan]")
if input_title_search:
# Perform streaming search based on the search term
len_database = title_search(input_title_search, self.domain)
if len_database != 0:
# Get the selected title from the search results
select_title = get_select_title(type_filter=['tv'])
if select_title.type == 'tv':
# Set series name and media ID for the selected title
self.video_source.setup(
domain = DOMAIN_SITE_NAME,
series_name = select_title.slug,
media_id = select_title.id
)
# Collect info about the season
self.video_source.get_preview()
seasons_count = self.video_source.obj_preview.seasons_count
# Add the series to the database
console.print("[green]Series '[/green][bold cyan]" + select_title.slug + "[/bold cyan][green]' added successfully.[/green]")
job_database.add_row_to_database(select_title.id, select_title.slug, seasons_count)
job_database.save_database()
# Clear old data added
manager_clear()
else:
console.print("[red]No series found for the given search term.[/red]")
else:
console.print("[red]Invalid choice. Please select a valid option.[/red]")
except Exception as e:
logging.error(f"Error occurred while adding series: {str(e)}")
def check_series(self):
"""
Check for new seasons in existing series.
"""
try:
# Loop through each series in the database
for data_series in job_database.db[1:]:
self.video_source.setup(
domain = DOMAIN_SITE_NAME,
series_name = data_series[1],
media_id = data_series[0]
)
# Collect information about seasons for the series
self.video_source.get_preview()
seasons_count = self.video_source.obj_preview.seasons_count
if int(data_series[2]) < seasons_count:
# Notify if a new season is found for the series
console.print("[bold yellow]Series '[/bold yellow][bold cyan]" + data_series[1] + "[/bold cyan][bold yellow]' found new season.[/bold yellow]")
else:
# Notify if no new seasons are found for the series
console.print("[bold red]Series '[/bold red][bold cyan]" + data_series[1] + "[/bold cyan][bold red]' has no new seasons.[/bold red]")
except Exception as e:
logging.error(f"Error occurred while checking series: {str(e)}")
def list_series(self):
"""
Print the list of series in the database.
"""
try:
# Print the header for the series list
console.print("\n[bold cyan]Series List:[/bold cyan]\n")
job_database.print_database_as_sql()
except Exception as e:
logging.error(f"Error occurred while listing series: {str(e)}")
def remove_series(self):
"""
Remove a series from the database.
"""
if len(job_database.db) > 1:
# Ask the user to input the index of the series to remove
index_to_remove = msg.ask("\n[cyan]Insert [bold red]ID [cyan]to remove").strip()
if index_to_remove != 0:
# Remove the series from the database
data_row_remove = job_database.remove_row_from_database(0, index_to_remove)
job_database.save_database()
if data_row_remove:
console.print("[bold green]Series '[/bold green][bold cyan]" + data_row_remove[1] + "[/bold cyan][bold green]' removed successfully.[/bold green]")
else:
console.print("[bold red]Cannot remove columns from the database.[/bold red]")
else:
console.print("[bold yellow]No data to remove[/bold yellow]")
def run(self):
"""
Run the SeriesManager application.
"""
while True:
# Reload all database all time
start_message()
job_database.load_database()
# Prompt the user for action choice
action = msg.ask("\n[green]What would you like to do?", choices=["add", "check", "remove", "print", "quit"])
if action == "add":
self.add_series()
elif action == "check":
self.check_series()
elif action == "remove":
self.remove_series()
elif action == "print":
self.list_series()
elif action == "quit":
console.print("\n[bold magenta]Exiting Series Manager. Goodbye![/bold magenta]")
break
else:
console.print("[red]Invalid action. Please try again.[/red]")
confirmation = msg.ask("\n[blue]Press 'y' to continue, or 'n' to quit[/blue]")
if confirmation.lower() == "n":
console.print("\n[bold magenta]Exiting Series Manager. Goodbye![/bold magenta]")
break
def main():
if CREATE_JOB_DB:
manager = SeriesManager()
manager.run()
else:
console.print("[red]Set to true 'create_job_database' on config.json file.")
if __name__ == '__main__':
main()

74
run.py
View File

@ -5,6 +5,7 @@ import os
import platform
import argparse
import logging
from typing import Callable
@ -18,15 +19,16 @@ from Src.Lib.FFmpeg import check_ffmpeg
from Src.Util.logger import Logger
# Internal api
from Src.Api.Streamingcommunity import main_film_series
from Src.Api.Animeunity import main_anime
from Src.Api.Streamingcommunity import main_film_series as streamingcommunity_film_serie
from Src.Api.Animeunity import main_anime as streamingcommunity_anime
from Src.Api.Altadefinizione import main_film as altadefinizione_film
# Config
CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close')
def initialize(switch = False):
def initialize():
"""
Initialize the application.
Checks Python version, removes temporary folder, and displays start message.
@ -44,7 +46,7 @@ def initialize(switch = False):
# Removing temporary folder
start_message(switch)
start_message()
# Attempting GitHub update
@ -88,36 +90,56 @@ def run_function(func: Callable[..., None], close_console: bool = False) -> None
def main():
log_not = Logger()
# Parse command line arguments
parser = argparse.ArgumentParser(description='Script to download film and series from internet.')
parser.add_argument('-a', '--anime', action='store_true', help='Check into anime category')
parser.add_argument('-f', '--film', action='store_true', help='Check into film/tv series category')
parser = argparse.ArgumentParser(description='Script to download film and series from the internet.')
parser.add_argument('-sa', '--streaming_anime', action='store_true', help='Check into anime category')
parser.add_argument('-sf', '--streaming_film', action='store_true', help='Check into film/tv series category')
parser.add_argument('-af', '--altadefinizione_film', action='store_true', help='Check into film/tv series category')
args = parser.parse_args()
if args.anime:
run_function(main_anime, CLOSE_CONSOLE)
# Mapping command-line arguments to functions
arg_to_function = {
'streaming_anime': streamingcommunity_anime,
'streaming_film': streamingcommunity_film_serie,
'altadefinizione_film': altadefinizione_film,
}
elif args.film:
run_function(main_film_series, CLOSE_CONSOLE)
# Check which argument is provided and run the corresponding function
for arg, func in arg_to_function.items():
if getattr(args, arg):
run_function(func, CLOSE_CONSOLE)
return
# Mapping user input to functions
input_to_function = {
'0': streamingcommunity_film_serie,
'1': streamingcommunity_anime,
'2': altadefinizione_film,
}
# Create dynamic prompt message and choices
choices = list(input_to_function.keys())
choice_labels = {
'0': "Film/Series",
'1': "Anime",
'2': "Altadefinizione"
}
prompt_message = "[cyan]Insert category [white](" + ", ".join(
f"[red]{key}[white]: [bold magenta]{label}[white]" for key, label in choice_labels.items()
) + ")[white]:[/cyan]"
# Ask the user for input
category = msg.ask(prompt_message, choices=choices, default="0")
# Run the corresponding function based on user input
if category in input_to_function:
run_function(input_to_function[category], CLOSE_CONSOLE)
else:
# If no arguments are provided, ask the user to input the category, if nothing insert return 0
category = msg.ask("[cyan]Insert category [white]([red]0[white]: [bold magenta]Film/Series[white], [red]1[white]: [bold magenta]Anime[white])[white]:[/cyan]", choices={"0": "", "1": ""}, default="0")
if category == '0':
run_function(main_film_series, CLOSE_CONSOLE)
elif category == '1':
run_function(main_anime, CLOSE_CONSOLE)
else:
console.print("[red]Invalid category, you need to insert 0 or 1.")
sys.exit(0)
console.print("[red]Invalid category, you need to insert 0, 1, or 2.")
sys.exit(0)
if __name__ == '__main__':
main()

152
update.py
View File

@ -1,152 +0,0 @@
# 10.12.24
import os
import shutil
from io import BytesIO
from zipfile import ZipFile
# Internal utilities
from Src.Util._jsonConfig import config_manager
# External libraries
import requests
from rich.console import Console
# Variable
console = Console()
local_path = os.path.join(".")
ROOT_PATH = config_manager.get('DEFAULT', 'root_path')
def move_content(source: str, destination: str) :
"""
Move all content from the source folder to the destination folder.
Args:
source (str): The path to the source folder.
destination (str): The path to the destination folder.
"""
os.makedirs(destination, exist_ok=True)
# Iterate through all elements in the source folder
for element in os.listdir(source):
source_path = os.path.join(source, element)
destination_path = os.path.join(destination, element)
# If it's a directory, recursively call the function
if os.path.isdir(source_path):
move_content(source_path, destination_path)
# Otherwise, move the file, replacing if it already exists
else:
shutil.move(source_path, destination_path)
def keep_specific_items(directory: str, keep_folder: str, keep_file: str):
"""
Delete all items in the directory except for the specified folder and file.
Args:
directory (str): The path to the directory.
keep_folder (str): The name of the folder to keep.
keep_file (str): The name of the file to keep.
"""
try:
if not os.path.exists(directory) or not os.path.isdir(directory):
raise ValueError(f"Error: '{directory}' is not a valid directory.")
# Iterate through items in the directory
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
# Check if the item is the specified folder or file
if os.path.isdir(item_path) and item != keep_folder:
shutil.rmtree(item_path)
elif os.path.isfile(item_path) and item != keep_file:
os.remove(item_path)
except PermissionError as pe:
print(f"PermissionError: {pe}. Check permissions and try running the script with admin privileges.")
except Exception as e:
print(f"Error: {e}")
def download_and_extract_latest_commit(author: str, repo_name: str):
"""
Download and extract the latest commit from a GitHub repository.
Args:
author (str): The owner of the GitHub repository.
repo_name (str): The name of the GitHub repository.
"""
# Get the latest commit information using GitHub API
api_url = f'https://api.github.com/repos/{author}/{repo_name}/commits?per_page=1'
response = requests.get(api_url)
console.log("[green]Making a request to GitHub repository...")
if response.ok:
commit_info = response.json()[0]
commit_sha = commit_info['sha']
zipball_url = f'https://github.com/{author}/{repo_name}/archive/{commit_sha}.zip'
console.log("[green]Getting zip file from repository...")
# Download the zipball
response = requests.get(zipball_url)
# Extract the content of the zipball into a temporary folder
temp_path = os.path.join(os.path.dirname(os.getcwd()), 'temp_extracted')
with ZipFile(BytesIO(response.content)) as zip_ref:
zip_ref.extractall(temp_path)
console.log("[green]Extracting file ...")
# Move files from the temporary folder to the current folder
for item in os.listdir(temp_path):
item_path = os.path.join(temp_path, item)
destination_path = os.path.join(local_path, item)
shutil.move(item_path, destination_path)
# Remove the temporary folder
shutil.rmtree(temp_path)
# Move all folder to main folder
new_folder_name = f"{repo_name}-{commit_sha}"
move_content(new_folder_name, ".")
# Remove old temp folder
shutil.rmtree(new_folder_name)
console.log(f"[cyan]Latest commit downloaded and extracted successfully.")
else:
console.log(f"[red]Failed to fetch commit information. Status code: {response.status_code}")
def main_upload():
"""
Main function to upload the latest commit of a GitHub repository.
"""
repository_owner = 'Ghost6446'
repository_name = 'StreamingCommunity_api'
cmd_insert = input("Are you sure you want to delete all files? (Only videos folder will remain) [yes/no]: ")
if cmd_insert == "yes":
# Remove all old file
keep_specific_items(".", ROOT_PATH, "upload.py")
download_and_extract_latest_commit(repository_owner, repository_name)
main_upload()
# win
# pyinstaller --upx-dir="C:\Program Files\upx" --onefile run.py