Fix 1337x dont work, add get_only_link

This commit is contained in:
Lovi 2024-11-01 15:57:11 +01:00
parent 727ee59066
commit 62dc037a89
15 changed files with 151 additions and 76 deletions

View File

@ -187,6 +187,9 @@ You can change some behaviors by tweaking the configuration file.
- **Default Value**: `-1`
- **Example Value**: `1080`
* **get_only_link**: Print hls m3u8 link and path file.
- **Default Value**: `false`
</details>
> [!IMPORTANT]

View File

@ -36,7 +36,7 @@ def google_search(query):
return first_result
def get_final_redirect_url(initial_url):
def get_final_redirect_url(initial_url, max_timeout):
"""
Follow redirects from the initial URL and return the final URL after all redirects.
@ -48,13 +48,19 @@ def get_final_redirect_url(initial_url):
"""
# Create a client with redirects enabled
with httpx.Client(follow_redirects=True) as client:
response = client.get(initial_url)
try:
with httpx.Client(follow_redirects=True, timeout=max_timeout, headers={'user-agent': get_headers()}) as client:
response = client.get(initial_url)
response.raise_for_status()
# Capture the final URL after all redirects
final_url = response.url
# Capture the final URL after all redirects
final_url = response.url
return final_url
return final_url
except Exception as e:
console.print(f"[cyan]Test url[white]: [red]{initial_url}, [cyan]error[white]: [red]{e}")
return None
def search_domain(site_name: str, base_url: str):
"""
@ -72,7 +78,7 @@ def search_domain(site_name: str, base_url: str):
# Extract config domain
max_timeout = config_manager.get_int("REQUESTS", "timeout")
domain = str(config_manager.get_dict("SITE", site_name)['domain'])
console.print(f"[cyan]Test site[white]: [red]{base_url}.{domain}")
#console.print(f"[cyan]Test site[white]: [red]{base_url}.{domain}")
try:
@ -82,32 +88,35 @@ def search_domain(site_name: str, base_url: str):
response_follow.raise_for_status()
except Exception as e:
console.print(f"[cyan]Change domain for site[white]: [red]{base_url}.{domain}, [cyan]error[white]: [red]{e}")
console.print(f"[cyan]Test url[white]: [red]{base_url}.{domain}, [cyan]error[white]: [red]{e}")
query = base_url.split("/")[-1]
first_url = google_search(query)
console.print(f"[green]First url from google seach[white]: [red]{first_url}")
if first_url:
final_url = get_final_redirect_url(first_url)
console.print(f"\n[bold yellow]Suggestion:[/bold yellow] [white](Experimental)\n"
f"[cyan]New final URL[white]: [green]{final_url}")
def extract_domain(url):
parsed_url = urlparse(url)
domain = parsed_url.netloc
return domain.split(".")[-1]
final_url = get_final_redirect_url(first_url, max_timeout)
new_domain_extract = extract_domain(str(final_url))
if msg.ask(f"[red]Do you want to auto update config.json - '[green]{site_name}[red]' with domain: [green]{new_domain_extract}", choices=["y", "n"], default="y").lower() == "y":
if final_url != None:
console.print(f"\n[bold yellow]Suggestion:[/bold yellow] [white](Experimental)\n"
f"[cyan]New final URL[white]: [green]{final_url}")
# Update domain in config.json
config_manager.config['SITE'][site_name]['domain'] = new_domain_extract
config_manager.write_config()
def extract_domain(url):
parsed_url = urlparse(url)
domain = parsed_url.netloc
return domain.split(".")[-1]
# Return config domain
console.print(f"[cyan]Return domain: [red]{new_domain_extract} \n")
return new_domain_extract, f"{base_url}.{new_domain_extract}"
new_domain_extract = extract_domain(str(final_url))
if msg.ask(f"[red]Do you want to auto update config.json - '[green]{site_name}[red]' with domain: [green]{new_domain_extract}", choices=["y", "n"], default="y").lower() == "y":
# Update domain in config.json
config_manager.config['SITE'][site_name]['domain'] = new_domain_extract
config_manager.write_config()
# Return config domain
console.print(f"[cyan]Return domain: [red]{new_domain_extract} \n")
return new_domain_extract, f"{base_url}.{new_domain_extract}"
else:
console.print("[bold red]\nManually change the domain in the JSON file.[/bold red]")

View File

@ -90,9 +90,6 @@ def map_episode_title(tv_name: str, number_season: int, episode_number: int, epi
map_episode_temp = map_episode_temp.replace("%(episode)", dynamic_format_number(episode_number))
map_episode_temp = map_episode_temp.replace("%(episode_name)", remove_special_characters(episode_name))
# Additional fix
map_episode_temp = map_episode_temp.replace(".", "_")
logging.info(f"Map episode string return: {map_episode_temp}")
return map_episode_temp

View File

@ -6,6 +6,7 @@ import time
# Internal utilities
from Src.Util.console import console, msg
from Src.Util.os import remove_special_characters
from Src.Util.message import start_message
from Src.Util.call_stack import get_call_stack
from Src.Lib.Downloader import HLS_Downloader
@ -38,17 +39,23 @@ def download_film(select_title: MediaItem):
video_source = VideoSource(select_title.url)
# Define output path
mp4_name = select_title.name + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, select_title.name)
mp4_name = remove_special_characters(select_title.name) + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(select_title.name))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
if HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() == 404:
r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start()
if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)

View File

@ -12,7 +12,7 @@ from .film import download_film
# Variable
indice = 9
_use_for = "film"
_deprecate = False
_deprecate = True
def search():

View File

@ -1,14 +1,16 @@
# 03.07.24
import os
import sys
import logging
import time
# Internal utilities
from Src.Util.console import console
from Src.Util.console import console, msg
from Src.Util.os import remove_special_characters
from Src.Util.message import start_message
from Src.Util.call_stack import get_call_stack
from Src.Lib.Downloader import HLS_Downloader
from ..Template import execute_search
# Logic class
@ -36,14 +38,23 @@ def download_film(select_title: MediaItem):
video_source = VideoSource(select_title.url)
# Define output path
mp4_name = select_title.name +".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, select_title.name)
mp4_name = remove_special_characters(select_title.name) +".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(select_title.name))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
HLS_Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_name)
).start()
r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start()
if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)

View File

@ -54,7 +54,10 @@ def download_video(scape_info_serie: GetSerieInfo, index_season_selected: int, i
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
if HLS_Downloader(os.path.join(mp4_path, mp4_name), master_playlist).start() == 404:
# Download the film using the m3u8 playlist, and output filename
r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start()
if r_proc == 404:
time.sleep(2)
# Re call search function
@ -62,6 +65,11 @@ def download_video(scape_info_serie: GetSerieInfo, index_season_selected: int, i
frames = get_call_stack()
execute_search(frames[-4])
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
def download_episode(scape_info_serie: GetSerieInfo, index_season_selected: int, download_all: bool = False) -> None:
"""

View File

@ -13,6 +13,7 @@ from bs4 import BeautifulSoup
# Internal utilities
from Src.Util.console import console, msg
from Src.Util.os import remove_special_characters
from Src.Util.message import start_message
from Src.Util.call_stack import get_call_stack
from Src.Util.headers import get_headers
@ -62,17 +63,23 @@ def download_film(movie_details: Json_film):
video_source.setup(supervideo_url)
# Define output path
mp4_name = movie_details.title + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, movie_details.title)
mp4_name = remove_special_characters(movie_details.title) + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, remove_special_characters(movie_details.title))
# Get m3u8 master playlist
master_playlist = video_source.get_playlist()
# Download the film using the m3u8 playlist, and output filename
if HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start() == 404:
r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_name)).start()
if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)

View File

@ -1,14 +1,16 @@
# 3.12.23
import os
import sys
import logging
import time
# Internal utilities
from Src.Util.console import console
from Src.Util.console import console, msg
from Src.Util.os import remove_special_characters
from Src.Util.message import start_message
from Src.Util.call_stack import get_call_stack
from Src.Lib.Downloader import HLS_Downloader
from ..Template import execute_search
# Logic class
@ -43,11 +45,20 @@ def download_film(select_title: MediaItem, domain: str, version: str):
master_playlist = video_source.get_playlist()
# Define the filename and path for the downloaded film
mp4_format = (select_title.slug) + ".mp4"
mp4_format = remove_special_characters(select_title.slug) + ".mp4"
mp4_path = os.path.join(ROOT_PATH, SITE_NAME, MOVIE_FOLDER, select_title.slug)
# Download the film using the m3u8 playlist, and output filename
HLS_Downloader(
m3u8_playlist = master_playlist,
output_filename = os.path.join(mp4_path, mp4_format)
).start()
r_proc = HLS_Downloader(m3u8_playlist = master_playlist, output_filename = os.path.join(mp4_path, mp4_format)).start()
if r_proc == 404:
time.sleep(2)
# Re call search function
if msg.ask("[green]Do you want to continue [white]([red]y[white])[green] or return at home[white]([red]n[white]) ", choices=['y', 'n'], default='y', show_choices=True) == "n":
frames = get_call_stack()
execute_search(frames[-4])
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)

View File

@ -53,7 +53,9 @@ def download_video(tv_name: str, index_season_selected: int, index_episode_selec
master_playlist = video_source.get_playlist()
# Download the episode
if HLS_Downloader(os.path.join(mp4_path, mp4_name), master_playlist).start() == 404:
r_proc = HLS_Downloader(os.path.join(mp4_path, mp4_name), master_playlist).start()
if r_proc == 404:
time.sleep(2)
# Re call search function
@ -61,6 +63,10 @@ def download_video(tv_name: str, index_season_selected: int, index_episode_selec
frames = get_call_stack()
execute_search(frames[-4])
if r_proc != None:
console.print("[green]Result: ")
console.print(r_proc)
def download_episode(tv_name: str, index_season_selected: int, download_all: bool = False) -> None:
"""
Download episodes of a selected season.

View File

@ -52,6 +52,7 @@ DOWNLOAD_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'download_sub')
MERGE_SUBTITLE = config_manager.get_bool('M3U8_DOWNLOAD', 'merge_subs')
REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8_DOWNLOAD', 'cleanup_tmp_folder')
FILTER_CUSTOM_REOLUTION = config_manager.get_int('M3U8_PARSER', 'force_resolution')
GET_ONLY_LINK = config_manager.get_bool('M3U8_PARSER', 'get_only_link')
# Variable
@ -736,7 +737,8 @@ class HLS_Downloader:
def start(self):
"""
Initiates the downloading process. Checks if the output file already exists and proceeds with processing the playlist or index.
"""
"""
if os.path.exists(self.output_filename):
console.log("[red]Output file already exists.")
return
@ -745,13 +747,30 @@ class HLS_Downloader:
# Determine whether to process a playlist or index
if self.m3u8_playlist:
r_proc = self._process_playlist()
if not GET_ONLY_LINK:
r_proc = self._process_playlist()
if r_proc == 404:
return 404
if r_proc == 404:
return 404
else:
return None
else:
return {
'path': self.output_filename,
'url': self.m3u8_playlist
}
elif self.m3u8_index:
self._process_index()
if not GET_ONLY_LINK:
self._process_index()
return None
else:
return {
'path': self.output_filename,
'url': self.m3u8_index
}
def _clean(self, out_path: str) -> None:
"""

View File

@ -25,14 +25,11 @@ import unicodedata
# Internal utilities
from .console import console
from ._jsonConfig import config_manager
# --> OS FILE ASCII
special_chars_to_remove = [
'!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '[', ']', '{', '}', '<',
'>', '|', '`', '~', "'", '"', ';', ':', ',', '?', '\\', '/', '\t', ' ', '=',
'+', '\n', '\r', '\0', ':'
]
special_chars_to_remove = config_manager.get("DEFAULT", "special_chars_to_remove")
def get_max_length_by_os(system: str) -> int:
@ -91,13 +88,12 @@ def remove_special_characters(input_string):
Parameters:
- input_string (str): The input string containing special characters.
- special_chars (list): List of special characters to be removed.
Returns:
str: A new string with specified special characters removed.
"""
# Compile regular expression pattern to match special characters
pattern = re.compile('[' + re.escape(''.join(special_chars_to_remove)) + ']')
pattern = re.compile('[' + re.escape(special_chars_to_remove) + ']')
# Use compiled pattern to replace special characters with an empty string
cleaned_string = pattern.sub('', input_string)
@ -322,7 +318,7 @@ def get_system_summary():
ffprobe_version = get_executable_version(['ffprobe', '-version'])
console.print(f"[cyan]Exe versions[white]: [bold red]ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}[/bold red]")
logging.info(f"Exe versions: ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}")
logging.info(f"Dependencies: ffmpeg {ffmpeg_version}, ffprobe {ffprobe_version}")
# Optional libraries versions
optional_libraries = [line.strip() for line in open('requirements.txt', 'r', encoding='utf-8-sig')]

View File

@ -2,11 +2,12 @@
"DEFAULT": {
"debug": false,
"log_file": "app.log",
"log_to_file": true,
"show_message": true,
"clean_console": true,
"log_to_file": false,
"show_message": false,
"clean_console": false,
"root_path": "Video",
"map_episode_name": "%(tv_name)_S%(season)E%(episode)_%(episode_name)",
"special_chars_to_remove": "!@#$%^&*()[]{}<>|`~'\";:,.?=+\u00e2\u20ac\u00a6",
"config_qbit_tor": {
"host": "192.168.1.58",
"port": "8080",
@ -53,7 +54,8 @@
"default_preset": "ultrafast"
},
"M3U8_PARSER": {
"force_resolution": -1
"force_resolution": -1,
"get_only_link": false
},
"SITE": {
"streamingcommunity": {
@ -64,12 +66,12 @@
"altadefinizione": {
"video_workers": 12,
"audio_workers": 12,
"domain": "my"
"domain": "now"
},
"guardaserie": {
"video_workers": 12,
"audio_workers": 12,
"domain": "dev"
"domain": "academy"
},
"mostraguarda": {
"video_workers": 12,
@ -94,7 +96,7 @@
"domain": "to"
},
"cb01": {
"domain": "nexus"
"domain": "vet"
}
}
}

View File

@ -6,7 +6,6 @@ m3u8
psutil
unidecode
jsbeautifier
seleniumbase
fake-useragent
qbittorrent-api
python-qbittorrent

2
run.py
View File

@ -199,5 +199,5 @@ def main():
if __name__ == '__main__':
initialize()
#initialize()
main()