From e576a200da383ee9489baa24714946cd8c1ebfb3 Mon Sep 17 00:00:00 2001 From: Giova <62809003+Ghost6446@users.noreply.github.com> Date: Fri, 24 May 2024 12:04:20 +0200 Subject: [PATCH] Delete Src/Lib/Hls/M3U8 directory --- Src/Lib/Hls/M3U8/__init__.py | 6 - Src/Lib/Hls/M3U8/decryption.py | 127 ------ Src/Lib/Hls/M3U8/lib_parser/__init__.py | 38 -- Src/Lib/Hls/M3U8/lib_parser/_util.py | 28 -- Src/Lib/Hls/M3U8/lib_parser/model.py | 359 ---------------- Src/Lib/Hls/M3U8/lib_parser/parser.py | 338 --------------- Src/Lib/Hls/M3U8/lib_parser/protocol.py | 17 - Src/Lib/Hls/M3U8/math_calc.py | 41 -- Src/Lib/Hls/M3U8/parser.py | 547 ------------------------ Src/Lib/Hls/M3U8/url_fix.py | 54 --- 10 files changed, 1555 deletions(-) delete mode 100644 Src/Lib/Hls/M3U8/__init__.py delete mode 100644 Src/Lib/Hls/M3U8/decryption.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/__init__.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/_util.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/model.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/parser.py delete mode 100644 Src/Lib/Hls/M3U8/lib_parser/protocol.py delete mode 100644 Src/Lib/Hls/M3U8/math_calc.py delete mode 100644 Src/Lib/Hls/M3U8/parser.py delete mode 100644 Src/Lib/Hls/M3U8/url_fix.py diff --git a/Src/Lib/Hls/M3U8/__init__.py b/Src/Lib/Hls/M3U8/__init__.py deleted file mode 100644 index a0bacd5..0000000 --- a/Src/Lib/Hls/M3U8/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# 02.04.24 - -from .decryption import M3U8_Decryption -from .math_calc import M3U8_Ts_Files -from .parser import M3U8_Parser, M3U8_Codec -from .url_fix import m3u8_url_fix \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/decryption.py b/Src/Lib/Hls/M3U8/decryption.py deleted file mode 100644 index b02946c..0000000 --- a/Src/Lib/Hls/M3U8/decryption.py +++ /dev/null @@ -1,127 +0,0 @@ -# 03.04.24 - -import sys -import logging -import subprocess -import importlib.util - - -# Internal utilities -from Src.Util.console import console - - -# Check if Crypto module is installed -crypto_spec = importlib.util.find_spec("Crypto") -crypto_installed = crypto_spec is not None - - -if crypto_installed: - logging.info("Decrypy use: Crypto") - from Crypto.Cipher import AES # type: ignore - from Crypto.Util.Padding import unpad # type: ignore - - class M3U8_Decryption: - """ - Class for decrypting M3U8 playlist content using AES encryption when the Crypto module is available. - """ - def __init__(self, key: bytes, iv: bytes, method: str) -> None: - """ - Initialize the M3U8_Decryption object. - - Args: - - key (bytes): The encryption key. - - iv (bytes): The initialization vector (IV). - - method (str): The encryption method. - """ - self.key = key - if "0x" in str(iv): - self.iv = bytes.fromhex(iv.replace("0x", "")) - else: - self.iv = iv - self.method = method - logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})") - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt the ciphertext using the specified encryption method. - - Args: - - ciphertext (bytes): The encrypted content to decrypt. - - Returns: - bytes: The decrypted content. - """ - if self.method == "AES": - cipher = AES.new(self.key, AES.MODE_ECB) - decrypted_data = cipher.decrypt(ciphertext) - return unpad(decrypted_data, AES.block_size) - - elif self.method == "AES-128": - cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv) - decrypted_data = cipher.decrypt(ciphertext) - return unpad(decrypted_data, AES.block_size) - - elif self.method == "AES-128-CTR": - cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv) - return cipher.decrypt(ciphertext) - - else: - raise ValueError("Invalid or unsupported method") - -else: - - # Check if openssl command is available - openssl_available = subprocess.run(["openssl", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0 - logging.info("Decrypy use: OPENSSL") - - if not openssl_available: - console.log("[red]Neither Crypto nor openssl is installed. Please install either one of them.") - sys.exit(0) - - class M3U8_Decryption: - """ - Class for decrypting M3U8 playlist content using OpenSSL when the Crypto module is not available. - """ - def __init__(self, key: bytes, iv: bytes, method: str) -> None: - """ - Initialize the M3U8_Decryption object. - - Args: - - key (bytes): The encryption key. - - iv (bytes): The initialization vector (IV). - - method (str): The encryption method. - """ - self.key = key - if "0x" in str(iv): - self.iv = bytes.fromhex(iv.replace("0x", "")) - else: - self.iv = iv - self.method = method - logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})") - - def decrypt(self, ciphertext: bytes) -> bytes: - """ - Decrypt the ciphertext using the specified encryption method. - - Args: - - ciphertext (bytes): The encrypted content to decrypt. - - Returns: - bytes: The decrypted content. - """ - if self.method == "AES": - openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt' - decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) - - elif self.method == "AES-128": - openssl_cmd = f'openssl enc -d -aes-128-cbc -K {self.key[:16].hex()} -iv {self.iv.hex()}' - decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) - - elif self.method == "AES-128-CTR": - openssl_cmd = f'openssl enc -d -aes-128-ctr -K {self.key[:16].hex()} -iv {self.iv.hex()}' - decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext) - - else: - raise ValueError("Invalid or unsupported method") - - return decrypted_data diff --git a/Src/Lib/Hls/M3U8/lib_parser/__init__.py b/Src/Lib/Hls/M3U8/lib_parser/__init__.py deleted file mode 100644 index 1d99d3d..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -# 15.04.24 - -import os - - -# Internal utilities -from .model import M3U8 - - -def load(raw_content, uri): - """ - Parses the content of an M3U8 playlist and returns an M3U8 object. - - Args: - raw_content (str): The content of the M3U8 playlist as a string. - uri (str): The URI of the M3U8 playlist file or stream. - - Returns: - M3U8: An object representing the parsed M3U8 playlist. - - Raises: - IOError: If the raw_content is empty or if the URI cannot be accessed. - ValueError: If the raw_content is not a valid M3U8 playlist format. - - Example: - >>> m3u8_content = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:10\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:10.0,\nhttp://example.com/segment0.ts\n#EXTINF:10.0,\nhttp://example.com/segment1.ts\n" - >>> uri = "http://example.com/playlist.m3u8" - >>> playlist = load(m3u8_content, uri) - """ - - if not raw_content: - raise IOError("Empty content provided.") - - if not uri: - raise IOError("Empty URI provided.") - - base_uri = os.path.dirname(uri) - return M3U8(raw_content, base_uri=base_uri) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/lib_parser/_util.py b/Src/Lib/Hls/M3U8/lib_parser/_util.py deleted file mode 100644 index 48b5b0a..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/_util.py +++ /dev/null @@ -1,28 +0,0 @@ -# 19.04.24 - -import itertools - - -def remove_quotes_parser(*attrs): - """ - Returns a dictionary mapping attribute names to a function that removes quotes from their values. - """ - return dict(zip(attrs, itertools.repeat(remove_quotes))) - - -def remove_quotes(string): - """ - Removes quotes from a string. - """ - quotes = ('"', "'") - if string and string[0] in quotes and string[-1] in quotes: - return string[1:-1] - return string - - -def normalize_attribute(attribute): - """ - Normalizes an attribute name by converting hyphens to underscores and converting to lowercase. - """ - return attribute.replace('-', '_').lower().strip() - diff --git a/Src/Lib/Hls/M3U8/lib_parser/model.py b/Src/Lib/Hls/M3U8/lib_parser/model.py deleted file mode 100644 index c1a89ef..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/model.py +++ /dev/null @@ -1,359 +0,0 @@ -# 15.04.24 - -import os -from collections import namedtuple - - -# Internal utilities -from ..lib_parser import parser - - -# Variable -StreamInfo = namedtuple('StreamInfo', ['bandwidth', 'program_id', 'resolution', 'codecs']) -Media = namedtuple('Media', ['uri', 'type', 'group_id', 'language', 'name','default', 'autoselect', 'forced', 'characteristics']) - - - -class M3U8: - """ - Represents a single M3U8 playlist. Should be instantiated with the content as string. - - Args: - - content: the m3u8 content as string - - base_path: all urls (key and segments url) will be updated with this base_path, - ex: base_path = "http://videoserver.com/hls" - - base_uri: uri the playlist comes from. it is propagated to SegmentList and Key - ex: http://example.com/path/to - - Attribute: - - key: it's a `Key` object, the EXT-X-KEY from m3u8. Or None - - segments: a `SegmentList` object, represents the list of `Segment`s from this playlist - - is_variant: Returns true if this M3U8 is a variant playlist, with links to other M3U8s with different bitrates. - If true, `playlists` is a list of the playlists available, and `iframe_playlists` is a list of the i-frame playlists available. - - is_endlist: Returns true if EXT-X-ENDLIST tag present in M3U8. - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8 - - playlists: If this is a variant playlist (`is_variant` is True), returns a list of Playlist objects - - iframe_playlists: If this is a variant playlist (`is_variant` is True), returns a list of IFramePlaylist objects - - playlist_type: A lower-case string representing the type of the playlist, which can be one of VOD (video on demand) or EVENT. - - media: If this is a variant playlist (`is_variant` is True), returns a list of Media objects - - target_duration: Returns the EXT-X-TARGETDURATION as an integer - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2 - - media_sequence: Returns the EXT-X-MEDIA-SEQUENCE as an integer - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3 - - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a string - Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5 - - version: Return the EXT-X-VERSION as is - - allow_cache: Return the EXT-X-ALLOW-CACHE as is - - files: Returns an iterable with all files from playlist, in order. This includes segments and key uri, if present. - - base_uri: It is a property (getter and setter) used by SegmentList and Key to have absolute URIs. - - is_i_frames_only: Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8. - Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12 - - """ - - # Mapping of simple attributes (obj attribute, parser attribute) - SIMPLE_ATTRIBUTES = ( - ('is_variant', 'is_variant'), - ('is_endlist', 'is_endlist'), - ('is_i_frames_only', 'is_i_frames_only'), - ('target_duration', 'targetduration'), - ('media_sequence', 'media_sequence'), - ('program_date_time', 'program_date_time'), - ('version', 'version'), - ('allow_cache', 'allow_cache'), - ('playlist_type', 'playlist_type') - ) - - def __init__(self, content=None, base_path=None, base_uri=None): - """ - Initialize the M3U8 object. - - Parameters: - - content: M3U8 content (string). - - base_path: Base path for relative URIs (string). - - base_uri: Base URI for absolute URIs (string). - """ - if content is not None: - self.data = parser.parse(content) - else: - self.data = {} - self._base_uri = base_uri - self.base_path = base_path - self._initialize_attributes() - - def _initialize_attributes(self): - """ - Initialize attributes based on parsed data. - """ - # Initialize key and segments - self.key = Key(base_uri=self.base_uri, **self.data.get('key', {})) if 'key' in self.data else None - self.segments = SegmentList([Segment(base_uri=self.base_uri, **params) for params in self.data.get('segments', [])]) - - # Initialize simple attributes - for attr, param in self.SIMPLE_ATTRIBUTES: - setattr(self, attr, self.data.get(param)) - - # Initialize files, media, playlists, and iframe_playlists - self.files = [] - if self.key: - self.files.append(self.key.uri) - self.files.extend(self.segments.uri) - - self.media = [Media( - uri = media.get('uri'), - type = media.get('type'), - group_id = media.get('group_id'), - language = media.get('language'), - name = media.get('name'), - default = media.get('default'), - autoselect = media.get('autoselect'), - forced = media.get('forced'), - characteristics = media.get('characteristics')) - for media in self.data.get('media', []) - ] - self.playlists = PlaylistList([Playlist( - base_uri = self.base_uri, - media = self.media, - **playlist - )for playlist in self.data.get('playlists', []) - ]) - self.iframe_playlists = PlaylistList() - for ifr_pl in self.data.get('iframe_playlists', []): - self.iframe_playlists.append( - IFramePlaylist( - base_uri = self.base_uri, - uri = ifr_pl['uri'], - iframe_stream_info=ifr_pl['iframe_stream_info']) - ) - - @property - def base_uri(self): - """ - Get the base URI. - """ - return self._base_uri - - @base_uri.setter - def base_uri(self, new_base_uri): - """ - Set the base URI. - """ - self._base_uri = new_base_uri - self.segments.base_uri = new_base_uri - - -class BasePathMixin: - """ - Mixin class for managing base paths. - """ - @property - def base_path(self): - """ - Get the base path. - """ - return os.path.dirname(self.uri) - - @base_path.setter - def base_path(self, newbase_path): - """ - Set the base path. - """ - if not self.base_path: - self.uri = "%s/%s" % (newbase_path, self.uri) - self.uri = self.uri.replace(self.base_path, newbase_path) - - -class GroupedBasePathMixin: - """ - Mixin class for managing base paths across a group of items. - """ - - def _set_base_uri(self, new_base_uri): - """ - Set the base URI for each item in the group. - """ - for item in self: - item.base_uri = new_base_uri - - base_uri = property(None, _set_base_uri) - - def _set_base_path(self, new_base_path): - """ - Set the base path for each item in the group. - """ - for item in self: - item.base_path = new_base_path - - base_path = property(None, _set_base_path) - - -class Segment(BasePathMixin): - """ - Class representing a segment in an M3U8 playlist. - Inherits from BasePathMixin for managing base paths. - """ - - def __init__(self, uri, base_uri, program_date_time=None, duration=None, - title=None, byterange=None, discontinuity=False, key=None): - """ - Initialize a Segment object. - - Args: - - uri: URI of the segment. - - base_uri: Base URI for the segment. - - program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a datetime - Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5 - - duration: Duration of the segment (optional). - - title: Title attribute from EXTINF parameter - - byterange: Byterange information of the segment (optional). - - discontinuity: Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists - Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11 - - key: Key for encryption (optional). - """ - self.uri = uri - self.duration = duration - self.title = title - self.base_uri = base_uri - self.byterange = byterange - self.program_date_time = program_date_time - self.discontinuity = discontinuity - #self.key = key - - -class SegmentList(list, GroupedBasePathMixin): - """ - Class representing a list of segments in an M3U8 playlist. - Inherits from list and GroupedBasePathMixin for managing base paths across a group of items. - """ - - @property - def uri(self): - """ - Get the URI of each segment in the SegmentList. - - Returns: - - List of URIs of segments in the SegmentList. - """ - return [seg.uri for seg in self] - - -class Key(BasePathMixin): - """ - Class representing a key used for encryption in an M3U8 playlist. - Inherits from BasePathMixin for managing base paths. - """ - - def __init__(self, method, uri, base_uri, iv=None): - """ - Initialize a Key object. - - Args: - - method: Encryption method. - ex: "AES-128" - - uri: URI of the key. - ex: "https://priv.example.com/key.php?r=52" - - base_uri: Base URI for the key. - ex: http://example.com/path/to - - iv: Initialization vector (optional). - ex: 0X12A - """ - self.method = method - self.uri = uri - self.iv = iv - self.base_uri = base_uri - - -class Playlist(BasePathMixin): - """ - Playlist object representing a link to a variant M3U8 with a specific bitrate. - - More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10 - """ - - def __init__(self, uri, stream_info, media, base_uri): - """ - Initialize a Playlist object. - - Args: - - uri: URI of the playlist. - - stream_info: is a named tuple containing the attributes: `program_id`, - - media: List of Media objects associated with the playlist. - - base_uri: Base URI for the playlist. - """ - self.uri = uri - self.base_uri = base_uri - - # Extract resolution information from stream_info - resolution = stream_info.get('resolution') - if resolution is not None: - values = resolution.split('x') - resolution_pair = (int(values[0]), int(values[1])) - else: - resolution_pair = None - - # Create StreamInfo object - self.stream_info = StreamInfo( - bandwidth = stream_info['bandwidth'], - program_id = stream_info.get('program_id'), - resolution = resolution_pair, - codecs = stream_info.get('codecs') - ) - - # Filter media based on group ID and media type - self.media = [] - for media_type in ('audio', 'video', 'subtitles'): - group_id = stream_info.get(media_type) - if group_id: - self.media += filter(lambda m: m.group_id == group_id, media) - - -class IFramePlaylist(BasePathMixin): - """ - Class representing an I-Frame playlist in an M3U8 playlist. - Inherits from BasePathMixin for managing base paths. - """ - - def __init__(self, base_uri, uri, iframe_stream_info): - """ - Initialize an IFramePlaylist object. - - Args: - - base_uri: Base URI for the I-Frame playlist. - - uri: URI of the I-Frame playlist. - - iframe_stream_info, is a named tuple containing the attributes: - `program_id`, `bandwidth`, `codecs` and `resolution` which is a tuple (w, h) of integers - """ - self.uri = uri - self.base_uri = base_uri - - # Extract resolution information from iframe_stream_info - resolution = iframe_stream_info.get('resolution') - if resolution is not None: - values = resolution.split('x') - resolution_pair = (int(values[0]), int(values[1])) - else: - resolution_pair = None - - # Create StreamInfo object for I-Frame playlist - self.iframe_stream_info = StreamInfo( - bandwidth = iframe_stream_info.get('bandwidth'), - program_id = iframe_stream_info.get('program_id'), - resolution = resolution_pair, - codecs = iframe_stream_info.get('codecs') - ) - -class PlaylistList(list, GroupedBasePathMixin): - """ - Class representing a list of playlists in an M3U8 playlist. - Inherits from list and GroupedBasePathMixin for managing base paths across a group of items. - """ - - def __str__(self): - """ - Return a string representation of the PlaylistList. - - Returns: - - String representation of the PlaylistList. - """ - output = [str(playlist) for playlist in self] - return '\n'.join(output) diff --git a/Src/Lib/Hls/M3U8/lib_parser/parser.py b/Src/Lib/Hls/M3U8/lib_parser/parser.py deleted file mode 100644 index dd49f96..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/parser.py +++ /dev/null @@ -1,338 +0,0 @@ -# 15.04.24 - -import re -import logging -import datetime - - -# Internal utilities -from ..lib_parser import protocol -from ._util import ( - remove_quotes, - remove_quotes_parser, - normalize_attribute -) - - -# External utilities -from Src.Util._jsonConfig import config_manager - - -# Variable -REMOVE_EMPTY_ROW = config_manager.get_bool('M3U8_PARSER', 'skip_empty_row_playlist') -ATTRIBUTELISTPATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''') - - -def parse(content): - """ - Given an M3U8 playlist content, parses the content and extracts metadata. - - Args: - content (str): The M3U8 playlist content. - - Returns: - dict: A dictionary containing the parsed metadata. - """ - - # Initialize data dictionary with default values - data = { - 'is_variant': False, - 'is_endlist': False, - 'is_i_frames_only': False, - 'playlist_type': None, - 'playlists': [], - 'iframe_playlists': [], - 'segments': [], - 'media': [], - } - - # Initialize state dictionary for tracking parsing state - state = { - 'expect_segment': False, - 'expect_playlist': False, - } - - # Iterate over lines in the content - content = content.split("\n") - content_length = len(content) - i = 0 - - while i < content_length: - line = content[i] - line_stripped = line.strip() - is_end = i + 1 == content_length - 2 - - if REMOVE_EMPTY_ROW: - if i < content_length - 2: - actual_row = extract_params(line_stripped) - next_row = extract_params(content[i + 2].strip()) - - if actual_row is not None and next_row is None and not is_end: - logging.info(f"Skip row: {line_stripped}") - i += 1 - continue - - i += 1 - - if line.startswith(protocol.ext_x_byterange): - _parse_byterange(line, state) - state['expect_segment'] = True - - elif state['expect_segment']: - _parse_ts_chunk(line, data, state) - state['expect_segment'] = False - - elif state['expect_playlist']: - _parse_variant_playlist(line, data, state) - state['expect_playlist'] = False - - elif line.startswith(protocol.ext_x_targetduration): - _parse_simple_parameter(line, data, float) - elif line.startswith(protocol.ext_x_media_sequence): - _parse_simple_parameter(line, data, int) - elif line.startswith(protocol.ext_x_discontinuity): - state['discontinuity'] = True - elif line.startswith(protocol.ext_x_version): - _parse_simple_parameter(line, data) - elif line.startswith(protocol.ext_x_allow_cache): - _parse_simple_parameter(line, data) - - elif line.startswith(protocol.ext_x_key): - state['current_key'] = _parse_key(line) - data['key'] = data.get('key', state['current_key']) - - elif line.startswith(protocol.extinf): - _parse_extinf(line, data, state) - state['expect_segment'] = True - - elif line.startswith(protocol.ext_x_stream_inf): - state['expect_playlist'] = True - _parse_stream_inf(line, data, state) - - elif line.startswith(protocol.ext_x_i_frame_stream_inf): - _parse_i_frame_stream_inf(line, data) - - elif line.startswith(protocol.ext_x_media): - _parse_media(line, data, state) - - elif line.startswith(protocol.ext_x_playlist_type): - _parse_simple_parameter(line, data) - - elif line.startswith(protocol.ext_i_frames_only): - data['is_i_frames_only'] = True - - elif line.startswith(protocol.ext_x_endlist): - data['is_endlist'] = True - - return data - - -def extract_params(line): - """ - Extracts parameters from a formatted input string. - - Args: - - line (str): The string containing the parameters to extract. - - Returns: - dict or None: A dictionary containing the extracted parameters with their respective values. - """ - params = {} - matches = re.findall(r'([A-Z\-]+)=("[^"]*"|[^",\s]*)', line) - if not matches: - return None - for match in matches: - param, value = match - params[param] = value.strip('"') - return params - -def _parse_key(line): - """ - Parses the #EXT-X-KEY line and extracts key attributes. - - Args: - - line (str): The #EXT-X-KEY line from the playlist. - - Returns: - dict: A dictionary containing the key attributes. - """ - params = ATTRIBUTELISTPATTERN.split(line.replace(protocol.ext_x_key + ':', ''))[1::2] - key = {} - for param in params: - name, value = param.split('=', 1) - key[normalize_attribute(name)] = remove_quotes(value) - return key - -def _parse_extinf(line, data, state): - """ - Parses the #EXTINF line and extracts segment duration and title. - - Args: - - line (str): The #EXTINF line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - duration, title = line.replace(protocol.extinf + ':', '').split(',') - state['segment'] = {'duration': float(duration), 'title': remove_quotes(title)} - -def _parse_ts_chunk(line, data, state): - """ - Parses a segment URI line and adds it to the segment list. - - Args: - line (str): The segment URI line from the playlist. - data (dict): The dictionary to store the parsed data. - state (dict): The parsing state. - """ - segment = state.pop('segment') - if state.get('current_program_date_time'): - segment['program_date_time'] = state['current_program_date_time'] - state['current_program_date_time'] += datetime.timedelta(seconds=segment['duration']) - segment['uri'] = line - segment['discontinuity'] = state.pop('discontinuity', False) - if state.get('current_key'): - segment['key'] = state['current_key'] - data['segments'].append(segment) - -def _parse_attribute_list(prefix, line, atribute_parser): - """ - Parses a line containing a list of attributes and their values. - - Args: - - prefix (str): The prefix to identify the line. - - line (str): The line containing the attributes. - - atribute_parser (dict): A dictionary mapping attribute names to parsing functions. - - Returns: - dict: A dictionary containing the parsed attributes. - """ - params = ATTRIBUTELISTPATTERN.split(line.replace(prefix + ':', ''))[1::2] - - attributes = {} - for param in params: - name, value = param.split('=', 1) - name = normalize_attribute(name) - - if name in atribute_parser: - value = atribute_parser[name](value) - - attributes[name] = value - - return attributes - -def _parse_stream_inf(line, data, state): - """ - Parses the #EXT-X-STREAM-INF line and extracts stream information. - - Args: - - line (str): The #EXT-X-STREAM-INF line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - data['is_variant'] = True - atribute_parser = remove_quotes_parser('codecs', 'audio', 'video', 'subtitles') - atribute_parser["program_id"] = int - atribute_parser["bandwidth"] = int - state['stream_info'] = _parse_attribute_list(protocol.ext_x_stream_inf, line, atribute_parser) - -def _parse_i_frame_stream_inf(line, data): - """ - Parses the #EXT-X-I-FRAME-STREAM-INF line and extracts I-frame stream information. - - Args: - - line (str): The #EXT-X-I-FRAME-STREAM-INF line from the playlist. - - data (dict): The dictionary to store the parsed data. - """ - atribute_parser = remove_quotes_parser('codecs', 'uri') - atribute_parser["program_id"] = int - atribute_parser["bandwidth"] = int - iframe_stream_info = _parse_attribute_list(protocol.ext_x_i_frame_stream_inf, line, atribute_parser) - iframe_playlist = {'uri': iframe_stream_info.pop('uri'), - 'iframe_stream_info': iframe_stream_info} - - data['iframe_playlists'].append(iframe_playlist) - -def _parse_media(line, data, state): - """ - Parses the #EXT-X-MEDIA line and extracts media attributes. - - Args: - - line (str): The #EXT-X-MEDIA line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - quoted = remove_quotes_parser('uri', 'group_id', 'language', 'name', 'characteristics') - media = _parse_attribute_list(protocol.ext_x_media, line, quoted) - data['media'].append(media) - -def _parse_variant_playlist(line, data, state): - """ - Parses a variant playlist line and extracts playlist information. - - Args: - - line (str): The variant playlist line from the playlist. - - data (dict): The dictionary to store the parsed data. - - state (dict): The parsing state. - """ - playlist = {'uri': line, 'stream_info': state.pop('stream_info')} - - data['playlists'].append(playlist) - -def _parse_byterange(line, state): - """ - Parses the #EXT-X-BYTERANGE line and extracts byte range information. - - Args: - - line (str): The #EXT-X-BYTERANGE line from the playlist. - - state (dict): The parsing state. - """ - state['segment']['byterange'] = line.replace(protocol.ext_x_byterange + ':', '') - -def _parse_simple_parameter_raw_value(line, cast_to=str, normalize=False): - """ - Parses a line containing a simple parameter and its value. - - Args: - - line (str): The line containing the parameter and its value. - - cast_to (type): The type to which the value should be cast. - - normalize (bool): Whether to normalize the parameter name. - - Returns: - tuple: A tuple containing the parameter name and its value. - """ - param, value = line.split(':', 1) - param = normalize_attribute(param.replace('#EXT-X-', '')) - if normalize: - value = normalize_attribute(value) - return param, cast_to(value) - -def _parse_and_set_simple_parameter_raw_value(line, data, cast_to=str, normalize=False): - """ - Parses a line containing a simple parameter and its value, and sets it in the data dictionary. - - Args: - - line (str): The line containing the parameter and its value. - - data (dict): The dictionary to store the parsed data. - - cast_to (type): The type to which the value should be cast. - - normalize (bool): Whether to normalize the parameter name. - - Returns: - The parsed value. - """ - param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize) - data[param] = value - return data[param] - -def _parse_simple_parameter(line, data, cast_to=str): - """ - Parses a line containing a simple parameter and its value, and sets it in the data dictionary. - - Args: - line (str): The line containing the parameter and its value. - data (dict): The dictionary to store the parsed data. - cast_to (type): The type to which the value should be cast. - - Returns: - The parsed value. - """ - return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/lib_parser/protocol.py b/Src/Lib/Hls/M3U8/lib_parser/protocol.py deleted file mode 100644 index 7fcf5a5..0000000 --- a/Src/Lib/Hls/M3U8/lib_parser/protocol.py +++ /dev/null @@ -1,17 +0,0 @@ -# 15.04.24 - -ext_x_targetduration = '#EXT-X-TARGETDURATION' -ext_x_media_sequence = '#EXT-X-MEDIA-SEQUENCE' -ext_x_program_date_time = '#EXT-X-PROGRAM-DATE-TIME' -ext_x_media = '#EXT-X-MEDIA' -ext_x_playlist_type = '#EXT-X-PLAYLIST-TYPE' -ext_x_key = '#EXT-X-KEY' -ext_x_stream_inf = '#EXT-X-STREAM-INF' -ext_x_version = '#EXT-X-VERSION' -ext_x_allow_cache = '#EXT-X-ALLOW-CACHE' -ext_x_endlist = '#EXT-X-ENDLIST' -extinf = '#EXTINF' -ext_i_frames_only = '#EXT-X-I-FRAMES-ONLY' -ext_x_byterange = '#EXT-X-BYTERANGE' -ext_x_i_frame_stream_inf = '#EXT-X-I-FRAME-STREAM-INF' -ext_x_discontinuity = '#EXT-X-DISCONTINUITY' diff --git a/Src/Lib/Hls/M3U8/math_calc.py b/Src/Lib/Hls/M3U8/math_calc.py deleted file mode 100644 index 46aaccb..0000000 --- a/Src/Lib/Hls/M3U8/math_calc.py +++ /dev/null @@ -1,41 +0,0 @@ -# 20.02.24 - -# Internal utilities -from Src.Util.os import format_size - - -class M3U8_Ts_Files: - def __init__(self): - """ - Initialize the TSFileSizeCalculator object. - - Args: - - num_segments (int): The number of segments. - """ - self.ts_file_sizes = [] - - def add_ts_file_size(self, size: int): - """ - Add a file size to the list of file sizes. - - Args: - - size (float): The size of the ts file to be added. - """ - self.ts_file_sizes.append(size) - - def calculate_total_size(self): - """ - Calculate the total size of the files. - - Returns: - float: The mean size of the files in a human-readable format. - """ - - if len(self.ts_file_sizes) == 0: - return 0 - - total_size = sum(self.ts_file_sizes) - mean_size = total_size / len(self.ts_file_sizes) - - # Return format mean - return format_size(mean_size) \ No newline at end of file diff --git a/Src/Lib/Hls/M3U8/parser.py b/Src/Lib/Hls/M3U8/parser.py deleted file mode 100644 index b387891..0000000 --- a/Src/Lib/Hls/M3U8/parser.py +++ /dev/null @@ -1,547 +0,0 @@ -# 20.04.25 - -import logging - - -# Internal utilities -from .lib_parser import load - - -# External libraries -from Src.Lib.Request.my_requests import requests - - -# Costant -CODEC_MAPPINGS = { - "video": { - "avc1": "libx264", - "avc2": "libx264", - "avc3": "libx264", - "avc4": "libx264", - "hev1": "libx265", - "hev2": "libx265", - "hvc1": "libx265", - "hvc2": "libx265", - "vp8": "libvpx", - "vp9": "libvpx-vp9", - "vp10": "libvpx-vp9" - }, - "audio": { - "mp4a": "aac", - "mp3": "libmp3lame", - "ac-3": "ac3", - "ec-3": "eac3", - "opus": "libopus", - "vorbis": "libvorbis" - } -} - -RESOLUTIONS = [ - (7680, 4320), - (3840, 2160), - (2560, 1440), - (1920, 1080), - (1280, 720), - (640, 480) -] - - - -class M3U8_Codec: - """ - Represents codec information for an M3U8 playlist. - """ - - def __init__(self, bandwidth, resolution, codecs): - """ - Initializes the M3U8Codec object with the provided parameters. - - Args: - - bandwidth (int): Bandwidth of the codec. - - resolution (str): Resolution of the codec. - - codecs (str): Codecs information in the format "avc1.xxxxxx,mp4a.xx". - """ - self.bandwidth = bandwidth - self.resolution = resolution - self.codecs = codecs - self.audio_codec = None - self.video_codec = None - self.extract_codecs() - self.parse_codecs() - - def extract_codecs(self): - """ - Parses the codecs information to extract audio and video codecs. - Extracted codecs are set as attributes: audio_codec and video_codec. - """ - - # Split the codecs string by comma - codecs_list = self.codecs.split(',') - - # Separate audio and video codecs - for codec in codecs_list: - if codec.startswith('avc'): - self.video_codec = codec - elif codec.startswith('mp4a'): - self.audio_codec = codec - - def convert_video_codec(self, video_codec_identifier) -> str: - - """ - Convert video codec identifier to codec name. - - Args: - - video_codec_identifier (str): Identifier of the video codec. - - Returns: - str: Codec name corresponding to the identifier. - """ - - # Extract codec type from the identifier - codec_type = video_codec_identifier.split('.')[0] - - # Retrieve codec mapping from the provided mappings or fallback to static mappings - video_codec_mapping = CODEC_MAPPINGS.get('video', {}) - codec_name = video_codec_mapping.get(codec_type) - - if codec_name: - return codec_name - - else: - logging.warning(f"No corresponding video codec found for {video_codec_identifier}. Using default codec libx264.") - return "libx264" # Default - - def convert_audio_codec(self, audio_codec_identifier) -> str: - - """ - Convert audio codec identifier to codec name. - - Args: - - audio_codec_identifier (str): Identifier of the audio codec. - - Returns: - str: Codec name corresponding to the identifier. - """ - - # Extract codec type from the identifier - codec_type = audio_codec_identifier.split('.')[0] - - # Retrieve codec mapping from the provided mappings or fallback to static mappings - audio_codec_mapping = CODEC_MAPPINGS.get('audio', {}) - codec_name = audio_codec_mapping.get(codec_type) - - if codec_name: - return codec_name - - else: - logging.warning(f"No corresponding audio codec found for {audio_codec_identifier}. Using default codec aac.") - return "aac" # Default - - def parse_codecs(self): - """ - Parse video and audio codecs. - This method updates `video_codec_name` and `audio_codec_name` attributes. - """ - - self.video_codec_name = self.convert_video_codec(self.video_codec) - self.audio_codec_name = self.convert_audio_codec(self.audio_codec) - - def __str__(self): - """ - Returns a string representation of the M3U8Codec object. - """ - return f"BANDWIDTH={self.bandwidth},RESOLUTION={self.resolution},CODECS=\"{self.codecs}\"" - - -class M3U8_Video: - def __init__(self, video_playlist) -> None: - """ - Initializes an M3U8_Video object with the provided video playlist. - - Args: - - video_playlist (M3U8): An M3U8 object representing the video playlist. - """ - self.video_playlist = video_playlist - - def get_best_uri(self): - """ - Returns the URI with the highest resolution from the video playlist. - - Returns: - tuple or None: A tuple containing the URI with the highest resolution and its resolution value, or None if the video list is empty. - """ - if not self.video_playlist: - return None - - best_uri = max(self.video_playlist, key=lambda x: x['resolution']) - return best_uri['uri'], best_uri['resolution'] - - def get_worst_uri(self): - """ - Returns the URI with the lowest resolution from the video playlist. - - Returns: - - tuple or None: A tuple containing the URI with the lowest resolution and its resolution value, or None if the video list is empty. - """ - if not self.video_playlist: - return None - - worst_uri = min(self.video_playlist, key=lambda x: x['resolution']) - return worst_uri['uri'], worst_uri['resolution'] - - def get_custom_uri(self, y_resolution): - """ - Returns the URI corresponding to a custom resolution from the video list. - - Args: - - video_list (list): A list of dictionaries containing video URIs and resolutions. - - custom_resolution (tuple): A tuple representing the custom resolution. - - Returns: - str or None: The URI corresponding to the custom resolution, or None if not found. - """ - for video in self.video_playlist: - logging.info(f"Check resolution from playlist: {int(video['resolution'][1])}, with input: {int(y_resolution)}") - - if int(video['resolution'][1]) == int(y_resolution): - return video['uri'], video['resolution'] - - return None, None - - def get_list_resolution(self): - """ - Retrieve a list of resolutions from the video playlist. - - Returns: - list: A list of resolutions extracted from the video playlist. - """ - return [video['resolution'] for video in self.video_playlist] - - -class M3U8_Audio: - def __init__(self, audio_playlist) -> None: - """ - Initializes an M3U8_Audio object with the provided audio playlist. - - Args: - - audio_playlist (M3U8): An M3U8 object representing the audio playlist. - """ - self.audio_playlist = audio_playlist - - def get_uri_by_language(self, language): - """ - Returns a dictionary with 'name' and 'uri' given a specific language. - - Args: - - audio_list (list): List of dictionaries containing audio information. - - language (str): The desired language. - - Returns: - dict or None: Dictionary with 'name', 'language', and 'uri' for the specified language, or None if not found. - """ - for audio in self.audio_playlist: - if audio['language'] == language: - return {'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} - return None - - def get_all_uris_and_names(self): - """ - Returns a list of dictionaries containing all URIs and names. - - Args: - - audio_list (list): List of dictionaries containing audio information. - - Returns: - list: List of dictionaries containing 'name', 'language', and 'uri' for all audio in the list. - """ - return [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist] - - def get_default_uri(self): - """ - Returns the dictionary with 'default' equal to 'YES'. - - Args: - - audio_list (list): List of dictionaries containing audio information. - - Returns: - dict or None: Dictionary with 'default' equal to 'YES', or None if not found. - """ - for audio in self.audio_playlist: - if audio['default'] == 'YES': - return audio.get('uri') - return None - - -class M3U8_Subtitle: - def __init__(self, subtitle_playlist) -> None: - """ - Initializes an M3U8_Subtitle object with the provided subtitle playlist. - - Args: - - subtitle_playlist (M3U8): An M3U8 object representing the subtitle playlist. - """ - self.subtitle_playlist = subtitle_playlist - - def get_uri_by_language(self, language): - """ - Returns a dictionary with 'name' and 'uri' given a specific language for subtitles. - - Args: - - subtitle_list (list): List of dictionaries containing subtitle information. - - language (str): The desired language. - - Returns: - dict or None: Dictionary with 'name' and 'uri' for the specified language for subtitles, or None if not found. - """ - for subtitle in self.subtitle_playlist: - if subtitle['language'] == language: - return {'name': subtitle['name'], 'uri': subtitle['uri']} - return None - - def get_all_uris_and_names(self): - """ - Returns a list of dictionaries containing all URIs and names of subtitles. - - Args: - - subtitle_list (list): List of dictionaries containing subtitle information. - - Returns: - list: List of dictionaries containing 'name' and 'uri' for all subtitles in the list. - """ - return [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist] - - def get_default_uri(self): - """ - Returns the dictionary with 'default' equal to 'YES' for subtitles. - - Args: - - subtitle_list (list): List of dictionaries containing subtitle information. - - Returns: - dict or None: Dictionary with 'default' equal to 'YES' for subtitles, or None if not found. - """ - for subtitle in self.subtitle_playlist: - if subtitle['default'] == 'YES': - return subtitle - return None - - def download_all(self, custom_subtitle): - """ - Download all subtitles listed in the object's attributes, filtering based on a provided list of custom subtitles. - - Args: - - custom_subtitle (list): A list of custom subtitles to download. - - Returns: - list: A list containing dictionaries with subtitle information including name, language, and URI. - """ - - output = [] # Initialize an empty list to store subtitle information - - # Iterate through all available subtitles - for obj_subtitle in self.subtitle_get_all_uris_and_names(): - - # Check if the subtitle name is not in the list of custom subtitles, and skip if not found - if obj_subtitle.get('name') not in custom_subtitle: - continue - - # Send a request to retrieve the subtitle content - logging.info(f"Download subtitle: {obj_subtitle.get('name')}") - response_subitle = requests.get(obj_subtitle.get('uri')) - - try: - # Try to extract the VTT URL from the subtitle content - sub_parse = M3U8_Parser() - sub_parse.parse_data(obj_subtitle.get('uri'), response_subitle.text) - url_subititle = sub_parse.subtitle[0] - - output.append({ - 'name': obj_subtitle.get('name'), - 'language': obj_subtitle.get('language'), - 'uri': url_subititle - }) - - except Exception as e: - logging.error(f"Cant donwload: {obj_subtitle.get('name')}, error: {e}") - - return output - - -class M3U8_Parser: - def __init__(self): - self.segments = [] - self.video_playlist = [] - self.keys = None - self.subtitle_playlist = [] - self.subtitle = [] - self.audio_playlist = [] - self.codec: M3U8_Codec = None - self._video: M3U8_Video = None - self._audio: M3U8_Audio = None - self._subtitle: M3U8_Subtitle = None - - self.__create_variable__() - - def parse_data(self, uri, raw_content) -> None: - """ - Extracts all information present in the provided M3U8 content. - - Args: - - m3u8_content (str): The content of the M3U8 file. - """ - - - # Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles - m3u8_obj = load(raw_content, uri) - - self.__parse_video_info__(m3u8_obj) - self.__parse_encryption_keys__(m3u8_obj) - self.__parse_subtitles_and_audio__(m3u8_obj) - self.__parse_segments__(m3u8_obj) - - @staticmethod - def extract_resolution(uri: str) -> int: - """ - Extracts the video resolution from the given URI. - - Args: - - uri (str): The URI containing video information. - - Returns: - int: The video resolution if found, otherwise 0. - """ - - # Log - logging.info(f"Try extract resolution from: {uri}") - - for resolution in RESOLUTIONS: - if "http" in str(uri): - if str(resolution[1]) in uri: - return resolution - - # Default resolution return (not best) - logging.error("No resolution found with custom parsing.") - logging.warning("Try set remove duplicate line to TRUE.") - return (0, 0) - - def __parse_video_info__(self, m3u8_obj) -> None: - """ - Extracts video information from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing video playlists. - """ - - try: - for playlist in m3u8_obj.playlists: - - # Direct access resolutions in m3u8 obj - if playlist.stream_info.resolution is not None: - - self.video_playlist.append({ - "uri": playlist.uri, - "resolution": playlist.stream_info.resolution - }) - - # Find resolutions in uri - else: - - self.video_playlist.append({ - "uri": playlist.uri, - "resolution": M3U8_Parser.extract_resolution(playlist.uri) - }) - - # Dont stop - continue - - # Check if all key is present to create codec - try: - self.codec = M3U8_Codec( - playlist.stream_info.bandwidth, - playlist.stream_info.resolution, - playlist.stream_info.codecs - ) - except: - logging.error(f"Error parsing codec: {e}") - - except Exception as e: - logging.error(f"Error parsing video info: {e}") - - def __parse_encryption_keys__(self, m3u8_obj) -> None: - """ - Extracts encryption keys from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing encryption keys. - """ - try: - - if m3u8_obj.key is not None: - if self.keys is None: - self.keys = { - 'method': m3u8_obj.key.method, - 'iv': m3u8_obj.key.iv, - 'uri': m3u8_obj.key.uri - } - - - except Exception as e: - logging.error(f"Error parsing encryption keys: {e}") - pass - - def __parse_subtitles_and_audio__(self, m3u8_obj) -> None: - """ - Extracts subtitles and audio information from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing subtitles and audio data. - """ - try: - for media in m3u8_obj.media: - if media.type == "SUBTITLES": - self.subtitle_playlist.append({ - "type": media.type, - "name": media.name, - "default": media.default, - "language": media.language, - "uri": media.uri - }) - - if media.type == "AUDIO": - self.audio_playlist.append({ - "type": media.type, - "name": media.name, - "default": media.default, - "language": media.language, - "uri": media.uri - }) - - except Exception as e: - logging.error(f"Error parsing subtitles and audio: {e}") - - def __parse_segments__(self, m3u8_obj) -> None: - """ - Extracts segment information from the M3U8 object. - - Args: - - m3u8_obj: The M3U8 object containing segment data. - """ - - try: - for segment in m3u8_obj.segments: - if "vtt" not in segment.uri: - self.segments.append(segment.uri) - else: - self.subtitle.append(segment.uri) - - except Exception as e: - logging.error(f"Error parsing segments: {e}") - - def __create_variable__(self): - """ - Initialize variables for video, audio, and subtitle playlists. - """ - - self._video = M3U8_Video(self.video_playlist) - self._audio = M3U8_Audio(self.audio_playlist) - self._subtitle = M3U8_Subtitle(self.subtitle_playlist) diff --git a/Src/Lib/Hls/M3U8/url_fix.py b/Src/Lib/Hls/M3U8/url_fix.py deleted file mode 100644 index 17f6f7d..0000000 --- a/Src/Lib/Hls/M3U8/url_fix.py +++ /dev/null @@ -1,54 +0,0 @@ -# 20.03.24 - -import logging -from urllib.parse import urlparse, urljoin - - -class M3U8_UrlFix: - def __init__(self, url: str = None) -> None: - """ - Initializes an M3U8_UrlFix object with the provided playlist URL. - - Args: - - url (str, optional): The URL of the playlist. Defaults to None. - """ - self.url_playlist: str = url - - def set_playlist(self, url: str) -> None: - """ - Set the M3U8 playlist URL. - - Args: - - url (str): The M3U8 playlist URL. - """ - self.url_playlist = url - - def generate_full_url(self, url_resource: str) -> str: - """ - Generate a full URL for a given resource using the base URL from the playlist. - - Args: - - url_resource (str): The relative URL of the resource within the playlist. - - Returns: - str: The full URL for the specified resource. - """ - - # Check if m3u8 url playlist is present - if self.url_playlist == None: - logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present") - raise - - # Parse the playlist URL to extract the base URL components - parsed_playlist_url = urlparse(self.url_playlist) - - # Construct the base URL using the scheme, netloc, and path from the playlist URL - base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}" - - # Join the base URL with the relative resource URL to get the full URL - full_url = urljoin(base_url, url_resource) - - return full_url - -# Output -m3u8_url_fix = M3U8_UrlFix() \ No newline at end of file