mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-06-07 12:05:35 +00:00
Fix stuttering #95 and auto install of ffmpeg.
This commit is contained in:
parent
f9b92d076d
commit
e91e1b1aad
@ -10,6 +10,7 @@ from Src.Util.console import console, msg
|
||||
from Src.Util.config import config_manager
|
||||
from Src.Util.table import TVShowManager
|
||||
from Src.Util.message import start_message
|
||||
from Src.Util.os import remove_special_characters
|
||||
from Src.Lib.Unidecode import transliterate
|
||||
from Src.Lib.FFmpeg.my_m3u8 import Downloader
|
||||
from .Class import VideoSource
|
||||
@ -113,8 +114,8 @@ def donwload_video(tv_name: str, index_season_selected: int, index_episode_selec
|
||||
episode_id = video_source.obj_episode_manager.episodes[index_episode_selected - 1].id
|
||||
|
||||
# Define filename and path for the downloaded video
|
||||
mp4_name = f"{index_episode_selected}_{transliterate(video_source.obj_episode_manager.episodes[index_episode_selected - 1].name)}.mp4"
|
||||
mp4_path = os.path.join(ROOT_PATH, SERIES_FOLDER, tv_name, f"S{index_season_selected}")
|
||||
mp4_name = remove_special_characters(f"{index_episode_selected}_{transliterate(video_source.obj_episode_manager.episodes[index_episode_selected - 1].name)}.mp4")
|
||||
mp4_path = remove_special_characters(os.path.join(ROOT_PATH, SERIES_FOLDER, tv_name, f"S{index_season_selected}"))
|
||||
os.makedirs(mp4_path, exist_ok=True)
|
||||
|
||||
# Get iframe and content for the episode
|
||||
|
@ -241,7 +241,7 @@ class M3U8_Segments:
|
||||
# Get ts url and create a filename based on index
|
||||
ts_url = self.segments[index]
|
||||
ts_filename = os.path.join(self.temp_folder, f"{index}.ts")
|
||||
logging.info(f"Requesting: {ts_url}, saving to: {ts_filename}")
|
||||
#logging.info(f"Requesting: {ts_url}, saving to: {ts_filename}")
|
||||
|
||||
# If file already exists, skip download
|
||||
if os.path.exists(ts_filename):
|
||||
@ -715,6 +715,8 @@ class Downloader():
|
||||
# Sort files (1.ts, 2.ts, ...) based on their numbers
|
||||
ts_files = [f for f in os.listdir(full_path) if f.endswith(".ts")]
|
||||
ts_files.sort(key=Downloader.extract_number)
|
||||
logging.info(f"Find {len(ts_files)} stream files to join")
|
||||
logging.info(f"Using parameter: \n-c:v = {self.video_decoding} -c:a = {self.audio_decoding}])")
|
||||
|
||||
# Check if there are enough .ts files to join (at least 10)
|
||||
if len(ts_files) < 10:
|
||||
|
@ -13,6 +13,27 @@ import ffmpeg
|
||||
|
||||
# Internal utilities
|
||||
from Src.Util.console import console
|
||||
from Src.Util.config import config_manager
|
||||
|
||||
|
||||
# Variable
|
||||
DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug")
|
||||
DEBUG_FFMPEG = "debug" if DEBUG_MODE else "error"
|
||||
|
||||
|
||||
|
||||
"""
|
||||
DOC:
|
||||
|
||||
The `'c': 'copy'` option in the `output_args` dictionary indicates that ffmpeg should perform stream copying for both the audio and video streams.
|
||||
Stream copying means that the audio and video streams are copied directly from the input file(s) to the output file without any re-encoding.
|
||||
This process preserves the original quality of the streams and is much faster than re-encoding.
|
||||
|
||||
When using `'c': 'copy'`, ffmpeg simply copies the bitstream from the input file(s) to the output file without decoding or altering it.
|
||||
This is useful when you want to quickly concatenate or merge multimedia files without any loss in quality or additional processing time.
|
||||
It's particularly efficient when the input and output formats are compatible, and you don't need to make any modifications to the streams.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
def has_audio_stream(video_path: str) -> bool:
|
||||
@ -186,17 +207,10 @@ def concatenate_and_save(file_list_path: str, output_filename: str, video_decodi
|
||||
|
||||
output_args = {
|
||||
'c': 'copy',
|
||||
'loglevel': 'error',
|
||||
'loglevel': DEBUG_FFMPEG,
|
||||
'y': None
|
||||
}
|
||||
|
||||
# Add encoding parameter for video and audio
|
||||
global_args = []
|
||||
if video_decoding:
|
||||
global_args.extend(['-c:v', video_decoding])
|
||||
if audio_decoding:
|
||||
global_args.extend(['-c:a', audio_decoding])
|
||||
|
||||
# Set up the output file name by modifying the video file name
|
||||
output_file_name = os.path.splitext(output_filename)[0] + f"_{prefix}.mp4"
|
||||
|
||||
@ -208,10 +222,22 @@ def concatenate_and_save(file_list_path: str, output_filename: str, video_decodi
|
||||
|
||||
# Concatenate input files and output
|
||||
output = (
|
||||
ffmpeg.input(file_list_path, **input_args)
|
||||
.output(output_file_path, **output_args)
|
||||
.global_args(*global_args)
|
||||
ffmpeg.input(
|
||||
file_list_path,
|
||||
**input_args
|
||||
)
|
||||
.output(
|
||||
output_file_path,
|
||||
**output_args
|
||||
)
|
||||
)
|
||||
|
||||
# Overwrite output file if exists
|
||||
output = ffmpeg.overwrite_output(output)
|
||||
|
||||
# Retrieve the command that will be executed
|
||||
command = output.compile()
|
||||
logging.info(f"Execute command: {command}")
|
||||
|
||||
# Execute the process
|
||||
process = output.run()
|
||||
@ -278,11 +304,11 @@ def join_audios(video_path: str, audio_tracks: list[dict[str, str]], prefix: str
|
||||
output_args = {
|
||||
'vcodec': 'copy',
|
||||
'acodec': 'copy',
|
||||
'loglevel': 'error'
|
||||
'loglevel': DEBUG_FFMPEG
|
||||
}
|
||||
|
||||
# Combine inputs, map audio streams, and set output
|
||||
process = (
|
||||
output = (
|
||||
ffmpeg.output(
|
||||
video_stream,
|
||||
*audio_streams,
|
||||
@ -295,9 +321,17 @@ def join_audios(video_path: str, audio_tracks: list[dict[str, str]], prefix: str
|
||||
'-shortest',
|
||||
'-strict', 'experimental',
|
||||
)
|
||||
.run(overwrite_output=True)
|
||||
)
|
||||
|
||||
# Overwrite output file if exists
|
||||
output = ffmpeg.overwrite_output(output)
|
||||
|
||||
# Retrieve the command that will be executed
|
||||
command = output.compile()
|
||||
logging.info(f"Execute command: {command}")
|
||||
|
||||
# Execute the process
|
||||
process = output.run()
|
||||
logging.info("[M3U8_Downloader] Merge completed successfully.")
|
||||
|
||||
# Return
|
||||
@ -363,7 +397,7 @@ def transcode_with_subtitles(video: str, subtitles_list: list[dict[str, str]], o
|
||||
output_file = os.path.join(os.path.dirname(output_file), output_filename)
|
||||
|
||||
# Configure ffmpeg output
|
||||
output_ffmpeg = ffmpeg.output(
|
||||
output = ffmpeg.output(
|
||||
input_video,
|
||||
*(input_audio,) if has_audio_stream(video) else (), # If there is no audio stream
|
||||
*input_subtitles,
|
||||
@ -371,14 +405,18 @@ def transcode_with_subtitles(video: str, subtitles_list: list[dict[str, str]], o
|
||||
vcodec='copy',
|
||||
acodec='copy' if has_audio_stream(video) else (), # If there is no audio stream
|
||||
**metadata,
|
||||
loglevel='error'
|
||||
loglevel=DEBUG_FFMPEG
|
||||
)
|
||||
|
||||
# Overwrite output file if exists
|
||||
output_ffmpeg = ffmpeg.overwrite_output(output_ffmpeg)
|
||||
output = ffmpeg.overwrite_output(output)
|
||||
|
||||
# Retrieve the command that will be executed
|
||||
command = output.compile()
|
||||
logging.info(f"Execute command: {command}")
|
||||
|
||||
# Run ffmpeg command
|
||||
ffmpeg.run(output_ffmpeg, overwrite_output=True)
|
||||
ffmpeg.run(output, overwrite_output=True)
|
||||
|
||||
# Rename video from mkv -> mp4
|
||||
output_filename_mp4 = output_file.replace("mkv", "mp4")
|
||||
|
@ -1,32 +1,30 @@
|
||||
# 24.01.2023
|
||||
|
||||
import subprocess
|
||||
|
||||
import logging
|
||||
import os
|
||||
import requests
|
||||
import zipfile
|
||||
import sys
|
||||
import ctypes
|
||||
import shutil
|
||||
import subprocess
|
||||
import urllib.request
|
||||
from tqdm.rich import tqdm
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from Src.Util.os import decompress_file
|
||||
from Src.Util._win32 import set_env_path
|
||||
from Src.Util.console import console
|
||||
|
||||
# Constants
|
||||
FFMPEG_BUILDS = {
|
||||
'release-full': {
|
||||
'7z': ('release-full', 'full_build'),
|
||||
'zip': (None, 'full_build')
|
||||
}
|
||||
}
|
||||
INSTALL_DIR = os.path.expanduser("~")
|
||||
|
||||
def isAdmin() -> (bool):
|
||||
"""
|
||||
Check if the current user has administrative privileges.
|
||||
|
||||
Returns:
|
||||
bool: True if the user is an administrator, False otherwise.
|
||||
"""
|
||||
|
||||
try:
|
||||
is_admin = (os.getuid() == 0)
|
||||
|
||||
except AttributeError:
|
||||
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
|
||||
|
||||
return is_admin
|
||||
# Variable
|
||||
show_version = False
|
||||
|
||||
|
||||
def get_version():
|
||||
@ -55,54 +53,141 @@ def get_version():
|
||||
print("Error executing FFmpeg command:", e.output.strip())
|
||||
raise e
|
||||
|
||||
def get_ffmpeg_download_url(build: str = 'release-full', format: str = 'zip') -> str:
|
||||
'''
|
||||
Construct the URL for downloading FFMPEG build.
|
||||
|
||||
Args:
|
||||
build (str): The type of FFMPEG build.
|
||||
format (str): The format of the build (e.g., zip, 7z).
|
||||
|
||||
Returns:
|
||||
str: The URL for downloading the FFMPEG build.
|
||||
'''
|
||||
for ffbuild_name, formats in FFMPEG_BUILDS.items():
|
||||
for ffbuild_format, names in formats.items():
|
||||
if not (format is None or format == ffbuild_format):
|
||||
continue
|
||||
|
||||
if names[0]:
|
||||
return f'https://gyan.dev/ffmpeg/builds/ffmpeg-{names[0]}.{ffbuild_format}'
|
||||
if names[1]:
|
||||
github_version = urllib.request.urlopen(
|
||||
'https://www.gyan.dev/ffmpeg/builds/release-version').read().decode()
|
||||
assert github_version, 'failed to retreive latest version from github'
|
||||
return (
|
||||
'https://github.com/GyanD/codexffmpeg/releases/download/'
|
||||
f'{github_version}/ffmpeg-{github_version}-{names[1]}.{ffbuild_format}'
|
||||
)
|
||||
|
||||
raise ValueError(f'{build} as format {format} does not exist')
|
||||
|
||||
class FFMPEGDownloader:
|
||||
def __init__(self, url: str, destination: str, hash_url: str = None) -> None:
|
||||
'''
|
||||
Initialize the FFMPEGDownloader object.
|
||||
|
||||
Args:
|
||||
url (str): The URL to download the file from.
|
||||
destination (str): The path where the downloaded file will be saved.
|
||||
hash_url (str): The URL containing the file's expected hash.
|
||||
'''
|
||||
self.url = url
|
||||
self.destination = destination
|
||||
self.expected_hash = urllib.request.urlopen(hash_url).read().decode() if hash_url is not None else None
|
||||
with urllib.request.urlopen(self.url) as data:
|
||||
self.file_size = data.length
|
||||
|
||||
def download(self) -> None:
|
||||
'''
|
||||
Download the file from the provided URL.
|
||||
'''
|
||||
try:
|
||||
with urllib.request.urlopen(self.url) as response, open(self.destination, 'wb') as out_file:
|
||||
with tqdm(total=self.file_size, unit='B', unit_scale=True, unit_divisor=1024, desc='[yellow]Downloading') as pbar:
|
||||
while True:
|
||||
data = response.read(4096)
|
||||
if not data:
|
||||
break
|
||||
out_file.write(data)
|
||||
pbar.update(len(data))
|
||||
except Exception as e:
|
||||
logging.error(f"Error downloading file: {e}")
|
||||
raise
|
||||
|
||||
def move_ffmpeg_exe_to_top_level(install_dir: str) -> None:
|
||||
'''
|
||||
Move the FFMPEG executable to the top-level directory.
|
||||
|
||||
Args:
|
||||
install_dir (str): The directory to search for the executable.
|
||||
'''
|
||||
try:
|
||||
for root, _, files in os.walk(install_dir):
|
||||
for file in files:
|
||||
if file == 'ffmpeg.exe':
|
||||
base_path = os.path.abspath(os.path.join(root, '..'))
|
||||
to_remove = os.listdir(install_dir)
|
||||
|
||||
# Move ffmpeg.exe to the top level
|
||||
for item in os.listdir(base_path):
|
||||
shutil.move(os.path.join(base_path, item), install_dir)
|
||||
|
||||
# Remove other files from the top level
|
||||
for item in to_remove:
|
||||
item = os.path.join(install_dir, item)
|
||||
if os.path.isdir(item):
|
||||
shutil.rmtree(item)
|
||||
else:
|
||||
os.remove(item)
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(f"Error moving ffmpeg executable: {e}")
|
||||
raise
|
||||
|
||||
def add_install_dir_to_environment_path(install_dir: str) -> None:
|
||||
'''
|
||||
Add the install directory to the environment PATH variable.
|
||||
|
||||
Args:
|
||||
install_dir (str): The directory to be added to the environment PATH variable.
|
||||
'''
|
||||
|
||||
install_dir = os.path.abspath(os.path.join(install_dir, 'bin'))
|
||||
set_env_path(install_dir)
|
||||
|
||||
def download_ffmpeg():
|
||||
"""
|
||||
Download FFmpeg binary for Windows and add it to the system PATH.
|
||||
|
||||
This function downloads the FFmpeg binary zip file from the specified URL,
|
||||
extracts it to a directory named 'ffmpeg', and adds the 'bin' directory of
|
||||
FFmpeg to the system PATH so that it can be accessed from the command line.
|
||||
"""
|
||||
# Get FFMPEG download URL
|
||||
ffmpeg_url = get_ffmpeg_download_url()
|
||||
|
||||
# SInizializate start variable
|
||||
ffmpeg_url = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-git-full.7z"
|
||||
ffmpeg_dir = "ffmpeg"
|
||||
# Generate install directory path
|
||||
install_dir = os.path.join(INSTALL_DIR, 'FFMPEG')
|
||||
|
||||
print("[yellow]Downloading FFmpeg...[/yellow]")
|
||||
console.print(f"[cyan]f'Making install directory: [red]{install_dir!r}")
|
||||
logging.info(f'Making install directory {install_dir!r}')
|
||||
os.makedirs(install_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
response = requests.get(ffmpeg_url)
|
||||
# Download FFMPEG
|
||||
console.print(f'[cyan]Downloading: [red]{ffmpeg_url!r} [cyan]to [red]{os.path.join(install_dir, os.path.basename(ffmpeg_url))!r}')
|
||||
logging.info(f'Downloading {ffmpeg_url!r} to {os.path.join(install_dir, os.path.basename(ffmpeg_url))!r}')
|
||||
downloader = FFMPEGDownloader(ffmpeg_url, os.path.join(install_dir, os.path.basename(ffmpeg_url)))
|
||||
downloader.download()
|
||||
|
||||
# Create the directory to extract FFmpeg if it doesn't exist
|
||||
os.makedirs(ffmpeg_dir, exist_ok=True)
|
||||
# Decompress downloaded file
|
||||
console.print(f'[cyan]Decompressing downloaded file to: [red]{install_dir!r}')
|
||||
logging.info(f'Decompressing downloaded file to {install_dir!r}')
|
||||
decompress_file(os.path.join(install_dir, os.path.basename(ffmpeg_url)), install_dir)
|
||||
|
||||
# Save the zip file
|
||||
zip_file_path = os.path.join(ffmpeg_dir, "ffmpeg.zip")
|
||||
with open(zip_file_path, "wb") as zip_file:
|
||||
zip_file.write(response.content)
|
||||
|
||||
# Extract the zip file
|
||||
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
|
||||
zip_ref.extractall(ffmpeg_dir)
|
||||
|
||||
# Add the FFmpeg directory to the system PATH
|
||||
ffmpeg_bin_dir = os.path.join(os.getcwd(), ffmpeg_dir, "bin")
|
||||
os.environ["PATH"] += os.pathsep + ffmpeg_bin_dir
|
||||
|
||||
# Remove the downloaded zip file
|
||||
os.remove(zip_file_path)
|
||||
|
||||
except requests.RequestException as e:
|
||||
# If there's an issue with downloading FFmpeg
|
||||
print(f"Failed to download FFmpeg: {e}")
|
||||
raise e
|
||||
|
||||
except zipfile.BadZipFile as e:
|
||||
# If the downloaded file is not a valid zip file
|
||||
print(f"Failed to extract FFmpeg zip file: {e}")
|
||||
raise e
|
||||
# Move ffmpeg executable to top level
|
||||
console.print(f'[cyan]Moving ffmpeg executable to top level of [red]{install_dir!r}')
|
||||
logging.info(f'Moving ffmpeg executable to top level of {install_dir!r}')
|
||||
move_ffmpeg_exe_to_top_level(install_dir)
|
||||
|
||||
# Add install directory to environment PATH variable
|
||||
console.print(f'[cyan]Adding [red]{install_dir} [cyan]to environment PATH variable')
|
||||
logging.info(f'Adding {install_dir} to environment PATH variable')
|
||||
add_install_dir_to_environment_path(install_dir)
|
||||
|
||||
def check_ffmpeg():
|
||||
"""
|
||||
@ -117,29 +202,26 @@ def check_ffmpeg():
|
||||
|
||||
try:
|
||||
# Try running the FFmpeg command to check if it exists
|
||||
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
|
||||
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
console.print("[blue]FFmpeg is installed. \n")
|
||||
|
||||
# Get and print FFmpeg version
|
||||
#get_version()
|
||||
if show_version:
|
||||
get_version()
|
||||
|
||||
except:
|
||||
|
||||
except subprocess.CalledProcessError:
|
||||
try:
|
||||
# If FFmpeg is not found, attempt to download and add it to the PATH
|
||||
console.print("[cyan]FFmpeg is not found in the PATH. Downloading and adding to the PATH...[/cyan]")
|
||||
|
||||
# Check if user has admin privileges
|
||||
if not isAdmin():
|
||||
console.log("[red]You need to be admin to proceed!")
|
||||
sys.exit(0)
|
||||
|
||||
# Download FFmpeg and add it to the PATH
|
||||
download_ffmpeg()
|
||||
sys.exit(0)
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
|
||||
# If unable to download or add FFmpeg to the PATH
|
||||
console.print("[red]Unable to download or add FFmpeg to the PATH.[/red]")
|
||||
console.print(f"Error: {e}")
|
||||
sys.exit(0)
|
||||
raise
|
||||
|
131
Src/Util/_win32.py
Normal file
131
Src/Util/_win32.py
Normal file
@ -0,0 +1,131 @@
|
||||
# 07.04.24
|
||||
|
||||
|
||||
# run somehwere backup
|
||||
# add config to trace if ffmpeg is install, using config in local or temp
|
||||
|
||||
import winreg
|
||||
import os
|
||||
import logging
|
||||
|
||||
|
||||
# Define Windows registry key for user environment variables
|
||||
env_keys = winreg.HKEY_CURRENT_USER, "Environment"
|
||||
|
||||
|
||||
def get_env(name: str) -> str:
|
||||
"""
|
||||
Retrieve the value of the specified environment variable from the Windows registry.
|
||||
|
||||
Args:
|
||||
name (str): The name of the environment variable to retrieve.
|
||||
|
||||
Returns:
|
||||
str: The value of the specified environment variable.
|
||||
"""
|
||||
try:
|
||||
with winreg.OpenKey(*env_keys, 0, winreg.KEY_READ) as key:
|
||||
return winreg.QueryValueEx(key, name)[0]
|
||||
|
||||
except FileNotFoundError:
|
||||
return ""
|
||||
|
||||
|
||||
def set_env_path(dir: str) -> None:
|
||||
"""
|
||||
Add a directory to the user's PATH environment variable.
|
||||
|
||||
Args:
|
||||
dir (str): The directory to add to the PATH environment variable.
|
||||
"""
|
||||
user_path = get_env("Path")
|
||||
|
||||
if dir not in user_path:
|
||||
new_path = user_path + os.pathsep + dir
|
||||
|
||||
try:
|
||||
with winreg.OpenKey(*env_keys, 0, winreg.KEY_WRITE) as key:
|
||||
winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ, new_path)
|
||||
logging.info(f"Added {dir} to PATH.")
|
||||
print("Path set successfully.")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to set PATH: {e}")
|
||||
print("Failed to set PATH.")
|
||||
|
||||
else:
|
||||
print("Directory already exists in the Path.")
|
||||
|
||||
|
||||
def remove_from_path(dir) -> None:
|
||||
"""
|
||||
Remove a directory from the user's PATH environment variable.
|
||||
|
||||
Args:
|
||||
dir (str): The directory to remove from the PATH environment variable.
|
||||
"""
|
||||
user_path = get_env("Path")
|
||||
|
||||
if dir in user_path:
|
||||
new_path = user_path.replace(dir + os.pathsep, "").replace(os.pathsep + dir, "")
|
||||
|
||||
try:
|
||||
with winreg.OpenKey(*env_keys, 0, winreg.KEY_WRITE) as key:
|
||||
winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ, new_path)
|
||||
logging.info(f"Removed {dir} from PATH.")
|
||||
print("Directory removed from Path.")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to remove directory from PATH: {e}")
|
||||
print("Failed to remove directory from Path.")
|
||||
|
||||
else:
|
||||
print("Directory does not exist in the Path.")
|
||||
|
||||
|
||||
def backup_path():
|
||||
"""
|
||||
Backup the original state of the PATH environment variable.
|
||||
"""
|
||||
original_path = get_env("Path")
|
||||
|
||||
try:
|
||||
script_dir = os.path.dirname(__file__)
|
||||
backup_file = os.path.join(script_dir, "path_backup.txt")
|
||||
|
||||
with open(backup_file, "w") as f:
|
||||
for path in original_path.split("\n"):
|
||||
if len(path) > 3:
|
||||
f.write(f"{path}; \n")
|
||||
|
||||
logging.info("Backup of PATH variable created.")
|
||||
print("Backup of PATH variable created.")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to create backup of PATH variable: {e}")
|
||||
print(f"Failed to create backup of PATH variable: {e}")
|
||||
|
||||
|
||||
def restore_path():
|
||||
"""
|
||||
Restore the original state of the PATH environment variable.
|
||||
"""
|
||||
try:
|
||||
backup_file = "path_backup.txt"
|
||||
|
||||
if os.path.isfile(backup_file):
|
||||
with open(backup_file, "r") as f:
|
||||
new_path = f.read()
|
||||
with winreg.OpenKey(*env_keys, 0, winreg.KEY_WRITE) as key:
|
||||
winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ, new_path)
|
||||
|
||||
logging.info("Restored original PATH variable.")
|
||||
print("Restored original PATH variable.")
|
||||
os.remove(backup_file)
|
||||
|
||||
else:
|
||||
logging.warning("No backup file found.")
|
||||
print("No backup file found.")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to restore PATH variable: {e}")
|
||||
print("Failed to restore PATH variable.")
|
@ -18,5 +18,5 @@ def get_headers() -> str:
|
||||
# Get a random user agent string from the user agent rotator
|
||||
random_headers = ua.get_random_user_agent("firefox")
|
||||
|
||||
logging.info(f"Use headers: {random_headers}")
|
||||
#logging.info(f"Use headers: {random_headers}")
|
||||
return random_headers
|
@ -7,6 +7,36 @@ import json
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
import zipfile
|
||||
|
||||
|
||||
# Costant
|
||||
special_chars_to_remove = [
|
||||
'!',
|
||||
'@',
|
||||
'#',
|
||||
'$',
|
||||
'%',
|
||||
'^',
|
||||
'&',
|
||||
'*',
|
||||
'(',
|
||||
')',
|
||||
'[',
|
||||
']',
|
||||
'{',
|
||||
'}',
|
||||
'<',
|
||||
'|',
|
||||
'`',
|
||||
'~',
|
||||
"'",
|
||||
'"',
|
||||
';',
|
||||
':',
|
||||
',',
|
||||
'?'
|
||||
]
|
||||
|
||||
|
||||
def remove_folder(folder_path: str) -> None:
|
||||
@ -41,24 +71,24 @@ def remove_file(file_path: str) -> None:
|
||||
print(f"Error removing file '{file_path}': {e}")
|
||||
|
||||
|
||||
def remove_special_characters(filename) -> str:
|
||||
def remove_special_characters(input_string):
|
||||
"""
|
||||
Removes special characters from a filename to make it suitable for creating a filename in Windows.
|
||||
Remove specified special characters from a string.
|
||||
|
||||
Args:
|
||||
filename (str): The original filename containing special characters.
|
||||
Parameters:
|
||||
input_string (str): The input string containing special characters.
|
||||
special_chars (list): List of special characters to be removed.
|
||||
|
||||
Returns:
|
||||
str: The cleaned filename without special characters.
|
||||
str: A new string with specified special characters removed.
|
||||
"""
|
||||
# Compile regular expression pattern to match special characters
|
||||
pattern = re.compile('[' + re.escape(''.join(special_chars_to_remove)) + ']')
|
||||
|
||||
# Define the regex pattern to match special characters
|
||||
pattern = r'[^\w\-_\. ]'
|
||||
# Use compiled pattern to replace special characters with an empty string
|
||||
cleaned_string = pattern.sub('', input_string)
|
||||
|
||||
# Replace special characters with an empty string
|
||||
cleaned_filename = re.sub(pattern, '', filename)
|
||||
|
||||
return cleaned_filename
|
||||
return cleaned_string
|
||||
|
||||
|
||||
def move_file_one_folder_up(file_path) -> None:
|
||||
@ -85,6 +115,22 @@ def move_file_one_folder_up(file_path) -> None:
|
||||
os.rename(file_path, new_path)
|
||||
|
||||
|
||||
def decompress_file(downloaded_file_path: str, destination: str) -> None:
|
||||
'''
|
||||
Decompress one file.
|
||||
|
||||
Args:
|
||||
downloaded_file_path (str): The path to the downloaded file.
|
||||
destination (str): The directory where the file will be decompressed.
|
||||
'''
|
||||
try:
|
||||
with zipfile.ZipFile(downloaded_file_path) as zip_file:
|
||||
zip_file.extractall(destination)
|
||||
except Exception as e:
|
||||
logging.error(f"Error decompressing file: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def read_json(path: str):
|
||||
"""Reads JSON file and returns its content.
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user