mirror of
https://github.com/Arrowar/StreamingCommunity.git
synced 2025-06-06 11:35:29 +00:00
Delete Src/Lib/Hls/M3U8 directory
This commit is contained in:
parent
762970c0ed
commit
e576a200da
@ -1,6 +0,0 @@
|
||||
# 02.04.24
|
||||
|
||||
from .decryption import M3U8_Decryption
|
||||
from .math_calc import M3U8_Ts_Files
|
||||
from .parser import M3U8_Parser, M3U8_Codec
|
||||
from .url_fix import m3u8_url_fix
|
@ -1,127 +0,0 @@
|
||||
# 03.04.24
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import subprocess
|
||||
import importlib.util
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from Src.Util.console import console
|
||||
|
||||
|
||||
# Check if Crypto module is installed
|
||||
crypto_spec = importlib.util.find_spec("Crypto")
|
||||
crypto_installed = crypto_spec is not None
|
||||
|
||||
|
||||
if crypto_installed:
|
||||
logging.info("Decrypy use: Crypto")
|
||||
from Crypto.Cipher import AES # type: ignore
|
||||
from Crypto.Util.Padding import unpad # type: ignore
|
||||
|
||||
class M3U8_Decryption:
|
||||
"""
|
||||
Class for decrypting M3U8 playlist content using AES encryption when the Crypto module is available.
|
||||
"""
|
||||
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
|
||||
"""
|
||||
Initialize the M3U8_Decryption object.
|
||||
|
||||
Args:
|
||||
- key (bytes): The encryption key.
|
||||
- iv (bytes): The initialization vector (IV).
|
||||
- method (str): The encryption method.
|
||||
"""
|
||||
self.key = key
|
||||
if "0x" in str(iv):
|
||||
self.iv = bytes.fromhex(iv.replace("0x", ""))
|
||||
else:
|
||||
self.iv = iv
|
||||
self.method = method
|
||||
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
|
||||
|
||||
def decrypt(self, ciphertext: bytes) -> bytes:
|
||||
"""
|
||||
Decrypt the ciphertext using the specified encryption method.
|
||||
|
||||
Args:
|
||||
- ciphertext (bytes): The encrypted content to decrypt.
|
||||
|
||||
Returns:
|
||||
bytes: The decrypted content.
|
||||
"""
|
||||
if self.method == "AES":
|
||||
cipher = AES.new(self.key, AES.MODE_ECB)
|
||||
decrypted_data = cipher.decrypt(ciphertext)
|
||||
return unpad(decrypted_data, AES.block_size)
|
||||
|
||||
elif self.method == "AES-128":
|
||||
cipher = AES.new(self.key[:16], AES.MODE_CBC, iv=self.iv)
|
||||
decrypted_data = cipher.decrypt(ciphertext)
|
||||
return unpad(decrypted_data, AES.block_size)
|
||||
|
||||
elif self.method == "AES-128-CTR":
|
||||
cipher = AES.new(self.key[:16], AES.MODE_CTR, nonce=self.iv)
|
||||
return cipher.decrypt(ciphertext)
|
||||
|
||||
else:
|
||||
raise ValueError("Invalid or unsupported method")
|
||||
|
||||
else:
|
||||
|
||||
# Check if openssl command is available
|
||||
openssl_available = subprocess.run(["openssl", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
|
||||
logging.info("Decrypy use: OPENSSL")
|
||||
|
||||
if not openssl_available:
|
||||
console.log("[red]Neither Crypto nor openssl is installed. Please install either one of them.")
|
||||
sys.exit(0)
|
||||
|
||||
class M3U8_Decryption:
|
||||
"""
|
||||
Class for decrypting M3U8 playlist content using OpenSSL when the Crypto module is not available.
|
||||
"""
|
||||
def __init__(self, key: bytes, iv: bytes, method: str) -> None:
|
||||
"""
|
||||
Initialize the M3U8_Decryption object.
|
||||
|
||||
Args:
|
||||
- key (bytes): The encryption key.
|
||||
- iv (bytes): The initialization vector (IV).
|
||||
- method (str): The encryption method.
|
||||
"""
|
||||
self.key = key
|
||||
if "0x" in str(iv):
|
||||
self.iv = bytes.fromhex(iv.replace("0x", ""))
|
||||
else:
|
||||
self.iv = iv
|
||||
self.method = method
|
||||
logging.info(f"Decrypt add: ('key': {self.key}, 'iv': {self.iv}, 'method': {self.method})")
|
||||
|
||||
def decrypt(self, ciphertext: bytes) -> bytes:
|
||||
"""
|
||||
Decrypt the ciphertext using the specified encryption method.
|
||||
|
||||
Args:
|
||||
- ciphertext (bytes): The encrypted content to decrypt.
|
||||
|
||||
Returns:
|
||||
bytes: The decrypted content.
|
||||
"""
|
||||
if self.method == "AES":
|
||||
openssl_cmd = f'openssl enc -d -aes-256-ecb -K {self.key.hex()} -nosalt'
|
||||
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext)
|
||||
|
||||
elif self.method == "AES-128":
|
||||
openssl_cmd = f'openssl enc -d -aes-128-cbc -K {self.key[:16].hex()} -iv {self.iv.hex()}'
|
||||
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext)
|
||||
|
||||
elif self.method == "AES-128-CTR":
|
||||
openssl_cmd = f'openssl enc -d -aes-128-ctr -K {self.key[:16].hex()} -iv {self.iv.hex()}'
|
||||
decrypted_data = subprocess.check_output(openssl_cmd.split(), input=ciphertext)
|
||||
|
||||
else:
|
||||
raise ValueError("Invalid or unsupported method")
|
||||
|
||||
return decrypted_data
|
@ -1,38 +0,0 @@
|
||||
# 15.04.24
|
||||
|
||||
import os
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from .model import M3U8
|
||||
|
||||
|
||||
def load(raw_content, uri):
|
||||
"""
|
||||
Parses the content of an M3U8 playlist and returns an M3U8 object.
|
||||
|
||||
Args:
|
||||
raw_content (str): The content of the M3U8 playlist as a string.
|
||||
uri (str): The URI of the M3U8 playlist file or stream.
|
||||
|
||||
Returns:
|
||||
M3U8: An object representing the parsed M3U8 playlist.
|
||||
|
||||
Raises:
|
||||
IOError: If the raw_content is empty or if the URI cannot be accessed.
|
||||
ValueError: If the raw_content is not a valid M3U8 playlist format.
|
||||
|
||||
Example:
|
||||
>>> m3u8_content = "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:10\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:10.0,\nhttp://example.com/segment0.ts\n#EXTINF:10.0,\nhttp://example.com/segment1.ts\n"
|
||||
>>> uri = "http://example.com/playlist.m3u8"
|
||||
>>> playlist = load(m3u8_content, uri)
|
||||
"""
|
||||
|
||||
if not raw_content:
|
||||
raise IOError("Empty content provided.")
|
||||
|
||||
if not uri:
|
||||
raise IOError("Empty URI provided.")
|
||||
|
||||
base_uri = os.path.dirname(uri)
|
||||
return M3U8(raw_content, base_uri=base_uri)
|
@ -1,28 +0,0 @@
|
||||
# 19.04.24
|
||||
|
||||
import itertools
|
||||
|
||||
|
||||
def remove_quotes_parser(*attrs):
|
||||
"""
|
||||
Returns a dictionary mapping attribute names to a function that removes quotes from their values.
|
||||
"""
|
||||
return dict(zip(attrs, itertools.repeat(remove_quotes)))
|
||||
|
||||
|
||||
def remove_quotes(string):
|
||||
"""
|
||||
Removes quotes from a string.
|
||||
"""
|
||||
quotes = ('"', "'")
|
||||
if string and string[0] in quotes and string[-1] in quotes:
|
||||
return string[1:-1]
|
||||
return string
|
||||
|
||||
|
||||
def normalize_attribute(attribute):
|
||||
"""
|
||||
Normalizes an attribute name by converting hyphens to underscores and converting to lowercase.
|
||||
"""
|
||||
return attribute.replace('-', '_').lower().strip()
|
||||
|
@ -1,359 +0,0 @@
|
||||
# 15.04.24
|
||||
|
||||
import os
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from ..lib_parser import parser
|
||||
|
||||
|
||||
# Variable
|
||||
StreamInfo = namedtuple('StreamInfo', ['bandwidth', 'program_id', 'resolution', 'codecs'])
|
||||
Media = namedtuple('Media', ['uri', 'type', 'group_id', 'language', 'name','default', 'autoselect', 'forced', 'characteristics'])
|
||||
|
||||
|
||||
|
||||
class M3U8:
|
||||
"""
|
||||
Represents a single M3U8 playlist. Should be instantiated with the content as string.
|
||||
|
||||
Args:
|
||||
- content: the m3u8 content as string
|
||||
- base_path: all urls (key and segments url) will be updated with this base_path,
|
||||
ex: base_path = "http://videoserver.com/hls"
|
||||
- base_uri: uri the playlist comes from. it is propagated to SegmentList and Key
|
||||
ex: http://example.com/path/to
|
||||
|
||||
Attribute:
|
||||
- key: it's a `Key` object, the EXT-X-KEY from m3u8. Or None
|
||||
- segments: a `SegmentList` object, represents the list of `Segment`s from this playlist
|
||||
- is_variant: Returns true if this M3U8 is a variant playlist, with links to other M3U8s with different bitrates.
|
||||
If true, `playlists` is a list of the playlists available, and `iframe_playlists` is a list of the i-frame playlists available.
|
||||
- is_endlist: Returns true if EXT-X-ENDLIST tag present in M3U8.
|
||||
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8
|
||||
- playlists: If this is a variant playlist (`is_variant` is True), returns a list of Playlist objects
|
||||
- iframe_playlists: If this is a variant playlist (`is_variant` is True), returns a list of IFramePlaylist objects
|
||||
- playlist_type: A lower-case string representing the type of the playlist, which can be one of VOD (video on demand) or EVENT.
|
||||
- media: If this is a variant playlist (`is_variant` is True), returns a list of Media objects
|
||||
- target_duration: Returns the EXT-X-TARGETDURATION as an integer
|
||||
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2
|
||||
- media_sequence: Returns the EXT-X-MEDIA-SEQUENCE as an integer
|
||||
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3
|
||||
- program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a string
|
||||
Info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
|
||||
- version: Return the EXT-X-VERSION as is
|
||||
- allow_cache: Return the EXT-X-ALLOW-CACHE as is
|
||||
- files: Returns an iterable with all files from playlist, in order. This includes segments and key uri, if present.
|
||||
- base_uri: It is a property (getter and setter) used by SegmentList and Key to have absolute URIs.
|
||||
- is_i_frames_only: Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8.
|
||||
Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12
|
||||
|
||||
"""
|
||||
|
||||
# Mapping of simple attributes (obj attribute, parser attribute)
|
||||
SIMPLE_ATTRIBUTES = (
|
||||
('is_variant', 'is_variant'),
|
||||
('is_endlist', 'is_endlist'),
|
||||
('is_i_frames_only', 'is_i_frames_only'),
|
||||
('target_duration', 'targetduration'),
|
||||
('media_sequence', 'media_sequence'),
|
||||
('program_date_time', 'program_date_time'),
|
||||
('version', 'version'),
|
||||
('allow_cache', 'allow_cache'),
|
||||
('playlist_type', 'playlist_type')
|
||||
)
|
||||
|
||||
def __init__(self, content=None, base_path=None, base_uri=None):
|
||||
"""
|
||||
Initialize the M3U8 object.
|
||||
|
||||
Parameters:
|
||||
- content: M3U8 content (string).
|
||||
- base_path: Base path for relative URIs (string).
|
||||
- base_uri: Base URI for absolute URIs (string).
|
||||
"""
|
||||
if content is not None:
|
||||
self.data = parser.parse(content)
|
||||
else:
|
||||
self.data = {}
|
||||
self._base_uri = base_uri
|
||||
self.base_path = base_path
|
||||
self._initialize_attributes()
|
||||
|
||||
def _initialize_attributes(self):
|
||||
"""
|
||||
Initialize attributes based on parsed data.
|
||||
"""
|
||||
# Initialize key and segments
|
||||
self.key = Key(base_uri=self.base_uri, **self.data.get('key', {})) if 'key' in self.data else None
|
||||
self.segments = SegmentList([Segment(base_uri=self.base_uri, **params) for params in self.data.get('segments', [])])
|
||||
|
||||
# Initialize simple attributes
|
||||
for attr, param in self.SIMPLE_ATTRIBUTES:
|
||||
setattr(self, attr, self.data.get(param))
|
||||
|
||||
# Initialize files, media, playlists, and iframe_playlists
|
||||
self.files = []
|
||||
if self.key:
|
||||
self.files.append(self.key.uri)
|
||||
self.files.extend(self.segments.uri)
|
||||
|
||||
self.media = [Media(
|
||||
uri = media.get('uri'),
|
||||
type = media.get('type'),
|
||||
group_id = media.get('group_id'),
|
||||
language = media.get('language'),
|
||||
name = media.get('name'),
|
||||
default = media.get('default'),
|
||||
autoselect = media.get('autoselect'),
|
||||
forced = media.get('forced'),
|
||||
characteristics = media.get('characteristics'))
|
||||
for media in self.data.get('media', [])
|
||||
]
|
||||
self.playlists = PlaylistList([Playlist(
|
||||
base_uri = self.base_uri,
|
||||
media = self.media,
|
||||
**playlist
|
||||
)for playlist in self.data.get('playlists', [])
|
||||
])
|
||||
self.iframe_playlists = PlaylistList()
|
||||
for ifr_pl in self.data.get('iframe_playlists', []):
|
||||
self.iframe_playlists.append(
|
||||
IFramePlaylist(
|
||||
base_uri = self.base_uri,
|
||||
uri = ifr_pl['uri'],
|
||||
iframe_stream_info=ifr_pl['iframe_stream_info'])
|
||||
)
|
||||
|
||||
@property
|
||||
def base_uri(self):
|
||||
"""
|
||||
Get the base URI.
|
||||
"""
|
||||
return self._base_uri
|
||||
|
||||
@base_uri.setter
|
||||
def base_uri(self, new_base_uri):
|
||||
"""
|
||||
Set the base URI.
|
||||
"""
|
||||
self._base_uri = new_base_uri
|
||||
self.segments.base_uri = new_base_uri
|
||||
|
||||
|
||||
class BasePathMixin:
|
||||
"""
|
||||
Mixin class for managing base paths.
|
||||
"""
|
||||
@property
|
||||
def base_path(self):
|
||||
"""
|
||||
Get the base path.
|
||||
"""
|
||||
return os.path.dirname(self.uri)
|
||||
|
||||
@base_path.setter
|
||||
def base_path(self, newbase_path):
|
||||
"""
|
||||
Set the base path.
|
||||
"""
|
||||
if not self.base_path:
|
||||
self.uri = "%s/%s" % (newbase_path, self.uri)
|
||||
self.uri = self.uri.replace(self.base_path, newbase_path)
|
||||
|
||||
|
||||
class GroupedBasePathMixin:
|
||||
"""
|
||||
Mixin class for managing base paths across a group of items.
|
||||
"""
|
||||
|
||||
def _set_base_uri(self, new_base_uri):
|
||||
"""
|
||||
Set the base URI for each item in the group.
|
||||
"""
|
||||
for item in self:
|
||||
item.base_uri = new_base_uri
|
||||
|
||||
base_uri = property(None, _set_base_uri)
|
||||
|
||||
def _set_base_path(self, new_base_path):
|
||||
"""
|
||||
Set the base path for each item in the group.
|
||||
"""
|
||||
for item in self:
|
||||
item.base_path = new_base_path
|
||||
|
||||
base_path = property(None, _set_base_path)
|
||||
|
||||
|
||||
class Segment(BasePathMixin):
|
||||
"""
|
||||
Class representing a segment in an M3U8 playlist.
|
||||
Inherits from BasePathMixin for managing base paths.
|
||||
"""
|
||||
|
||||
def __init__(self, uri, base_uri, program_date_time=None, duration=None,
|
||||
title=None, byterange=None, discontinuity=False, key=None):
|
||||
"""
|
||||
Initialize a Segment object.
|
||||
|
||||
Args:
|
||||
- uri: URI of the segment.
|
||||
- base_uri: Base URI for the segment.
|
||||
- program_date_time: Returns the EXT-X-PROGRAM-DATE-TIME as a datetime
|
||||
Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
|
||||
- duration: Duration of the segment (optional).
|
||||
- title: Title attribute from EXTINF parameter
|
||||
- byterange: Byterange information of the segment (optional).
|
||||
- discontinuity: Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists
|
||||
Guide: http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11
|
||||
- key: Key for encryption (optional).
|
||||
"""
|
||||
self.uri = uri
|
||||
self.duration = duration
|
||||
self.title = title
|
||||
self.base_uri = base_uri
|
||||
self.byterange = byterange
|
||||
self.program_date_time = program_date_time
|
||||
self.discontinuity = discontinuity
|
||||
#self.key = key
|
||||
|
||||
|
||||
class SegmentList(list, GroupedBasePathMixin):
|
||||
"""
|
||||
Class representing a list of segments in an M3U8 playlist.
|
||||
Inherits from list and GroupedBasePathMixin for managing base paths across a group of items.
|
||||
"""
|
||||
|
||||
@property
|
||||
def uri(self):
|
||||
"""
|
||||
Get the URI of each segment in the SegmentList.
|
||||
|
||||
Returns:
|
||||
- List of URIs of segments in the SegmentList.
|
||||
"""
|
||||
return [seg.uri for seg in self]
|
||||
|
||||
|
||||
class Key(BasePathMixin):
|
||||
"""
|
||||
Class representing a key used for encryption in an M3U8 playlist.
|
||||
Inherits from BasePathMixin for managing base paths.
|
||||
"""
|
||||
|
||||
def __init__(self, method, uri, base_uri, iv=None):
|
||||
"""
|
||||
Initialize a Key object.
|
||||
|
||||
Args:
|
||||
- method: Encryption method.
|
||||
ex: "AES-128"
|
||||
- uri: URI of the key.
|
||||
ex: "https://priv.example.com/key.php?r=52"
|
||||
- base_uri: Base URI for the key.
|
||||
ex: http://example.com/path/to
|
||||
- iv: Initialization vector (optional).
|
||||
ex: 0X12A
|
||||
"""
|
||||
self.method = method
|
||||
self.uri = uri
|
||||
self.iv = iv
|
||||
self.base_uri = base_uri
|
||||
|
||||
|
||||
class Playlist(BasePathMixin):
|
||||
"""
|
||||
Playlist object representing a link to a variant M3U8 with a specific bitrate.
|
||||
|
||||
More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10
|
||||
"""
|
||||
|
||||
def __init__(self, uri, stream_info, media, base_uri):
|
||||
"""
|
||||
Initialize a Playlist object.
|
||||
|
||||
Args:
|
||||
- uri: URI of the playlist.
|
||||
- stream_info: is a named tuple containing the attributes: `program_id`,
|
||||
- media: List of Media objects associated with the playlist.
|
||||
- base_uri: Base URI for the playlist.
|
||||
"""
|
||||
self.uri = uri
|
||||
self.base_uri = base_uri
|
||||
|
||||
# Extract resolution information from stream_info
|
||||
resolution = stream_info.get('resolution')
|
||||
if resolution is not None:
|
||||
values = resolution.split('x')
|
||||
resolution_pair = (int(values[0]), int(values[1]))
|
||||
else:
|
||||
resolution_pair = None
|
||||
|
||||
# Create StreamInfo object
|
||||
self.stream_info = StreamInfo(
|
||||
bandwidth = stream_info['bandwidth'],
|
||||
program_id = stream_info.get('program_id'),
|
||||
resolution = resolution_pair,
|
||||
codecs = stream_info.get('codecs')
|
||||
)
|
||||
|
||||
# Filter media based on group ID and media type
|
||||
self.media = []
|
||||
for media_type in ('audio', 'video', 'subtitles'):
|
||||
group_id = stream_info.get(media_type)
|
||||
if group_id:
|
||||
self.media += filter(lambda m: m.group_id == group_id, media)
|
||||
|
||||
|
||||
class IFramePlaylist(BasePathMixin):
|
||||
"""
|
||||
Class representing an I-Frame playlist in an M3U8 playlist.
|
||||
Inherits from BasePathMixin for managing base paths.
|
||||
"""
|
||||
|
||||
def __init__(self, base_uri, uri, iframe_stream_info):
|
||||
"""
|
||||
Initialize an IFramePlaylist object.
|
||||
|
||||
Args:
|
||||
- base_uri: Base URI for the I-Frame playlist.
|
||||
- uri: URI of the I-Frame playlist.
|
||||
- iframe_stream_info, is a named tuple containing the attributes:
|
||||
`program_id`, `bandwidth`, `codecs` and `resolution` which is a tuple (w, h) of integers
|
||||
"""
|
||||
self.uri = uri
|
||||
self.base_uri = base_uri
|
||||
|
||||
# Extract resolution information from iframe_stream_info
|
||||
resolution = iframe_stream_info.get('resolution')
|
||||
if resolution is not None:
|
||||
values = resolution.split('x')
|
||||
resolution_pair = (int(values[0]), int(values[1]))
|
||||
else:
|
||||
resolution_pair = None
|
||||
|
||||
# Create StreamInfo object for I-Frame playlist
|
||||
self.iframe_stream_info = StreamInfo(
|
||||
bandwidth = iframe_stream_info.get('bandwidth'),
|
||||
program_id = iframe_stream_info.get('program_id'),
|
||||
resolution = resolution_pair,
|
||||
codecs = iframe_stream_info.get('codecs')
|
||||
)
|
||||
|
||||
class PlaylistList(list, GroupedBasePathMixin):
|
||||
"""
|
||||
Class representing a list of playlists in an M3U8 playlist.
|
||||
Inherits from list and GroupedBasePathMixin for managing base paths across a group of items.
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
"""
|
||||
Return a string representation of the PlaylistList.
|
||||
|
||||
Returns:
|
||||
- String representation of the PlaylistList.
|
||||
"""
|
||||
output = [str(playlist) for playlist in self]
|
||||
return '\n'.join(output)
|
@ -1,338 +0,0 @@
|
||||
# 15.04.24
|
||||
|
||||
import re
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from ..lib_parser import protocol
|
||||
from ._util import (
|
||||
remove_quotes,
|
||||
remove_quotes_parser,
|
||||
normalize_attribute
|
||||
)
|
||||
|
||||
|
||||
# External utilities
|
||||
from Src.Util._jsonConfig import config_manager
|
||||
|
||||
|
||||
# Variable
|
||||
REMOVE_EMPTY_ROW = config_manager.get_bool('M3U8_PARSER', 'skip_empty_row_playlist')
|
||||
ATTRIBUTELISTPATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''')
|
||||
|
||||
|
||||
def parse(content):
|
||||
"""
|
||||
Given an M3U8 playlist content, parses the content and extracts metadata.
|
||||
|
||||
Args:
|
||||
content (str): The M3U8 playlist content.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the parsed metadata.
|
||||
"""
|
||||
|
||||
# Initialize data dictionary with default values
|
||||
data = {
|
||||
'is_variant': False,
|
||||
'is_endlist': False,
|
||||
'is_i_frames_only': False,
|
||||
'playlist_type': None,
|
||||
'playlists': [],
|
||||
'iframe_playlists': [],
|
||||
'segments': [],
|
||||
'media': [],
|
||||
}
|
||||
|
||||
# Initialize state dictionary for tracking parsing state
|
||||
state = {
|
||||
'expect_segment': False,
|
||||
'expect_playlist': False,
|
||||
}
|
||||
|
||||
# Iterate over lines in the content
|
||||
content = content.split("\n")
|
||||
content_length = len(content)
|
||||
i = 0
|
||||
|
||||
while i < content_length:
|
||||
line = content[i]
|
||||
line_stripped = line.strip()
|
||||
is_end = i + 1 == content_length - 2
|
||||
|
||||
if REMOVE_EMPTY_ROW:
|
||||
if i < content_length - 2:
|
||||
actual_row = extract_params(line_stripped)
|
||||
next_row = extract_params(content[i + 2].strip())
|
||||
|
||||
if actual_row is not None and next_row is None and not is_end:
|
||||
logging.info(f"Skip row: {line_stripped}")
|
||||
i += 1
|
||||
continue
|
||||
|
||||
i += 1
|
||||
|
||||
if line.startswith(protocol.ext_x_byterange):
|
||||
_parse_byterange(line, state)
|
||||
state['expect_segment'] = True
|
||||
|
||||
elif state['expect_segment']:
|
||||
_parse_ts_chunk(line, data, state)
|
||||
state['expect_segment'] = False
|
||||
|
||||
elif state['expect_playlist']:
|
||||
_parse_variant_playlist(line, data, state)
|
||||
state['expect_playlist'] = False
|
||||
|
||||
elif line.startswith(protocol.ext_x_targetduration):
|
||||
_parse_simple_parameter(line, data, float)
|
||||
elif line.startswith(protocol.ext_x_media_sequence):
|
||||
_parse_simple_parameter(line, data, int)
|
||||
elif line.startswith(protocol.ext_x_discontinuity):
|
||||
state['discontinuity'] = True
|
||||
elif line.startswith(protocol.ext_x_version):
|
||||
_parse_simple_parameter(line, data)
|
||||
elif line.startswith(protocol.ext_x_allow_cache):
|
||||
_parse_simple_parameter(line, data)
|
||||
|
||||
elif line.startswith(protocol.ext_x_key):
|
||||
state['current_key'] = _parse_key(line)
|
||||
data['key'] = data.get('key', state['current_key'])
|
||||
|
||||
elif line.startswith(protocol.extinf):
|
||||
_parse_extinf(line, data, state)
|
||||
state['expect_segment'] = True
|
||||
|
||||
elif line.startswith(protocol.ext_x_stream_inf):
|
||||
state['expect_playlist'] = True
|
||||
_parse_stream_inf(line, data, state)
|
||||
|
||||
elif line.startswith(protocol.ext_x_i_frame_stream_inf):
|
||||
_parse_i_frame_stream_inf(line, data)
|
||||
|
||||
elif line.startswith(protocol.ext_x_media):
|
||||
_parse_media(line, data, state)
|
||||
|
||||
elif line.startswith(protocol.ext_x_playlist_type):
|
||||
_parse_simple_parameter(line, data)
|
||||
|
||||
elif line.startswith(protocol.ext_i_frames_only):
|
||||
data['is_i_frames_only'] = True
|
||||
|
||||
elif line.startswith(protocol.ext_x_endlist):
|
||||
data['is_endlist'] = True
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def extract_params(line):
|
||||
"""
|
||||
Extracts parameters from a formatted input string.
|
||||
|
||||
Args:
|
||||
- line (str): The string containing the parameters to extract.
|
||||
|
||||
Returns:
|
||||
dict or None: A dictionary containing the extracted parameters with their respective values.
|
||||
"""
|
||||
params = {}
|
||||
matches = re.findall(r'([A-Z\-]+)=("[^"]*"|[^",\s]*)', line)
|
||||
if not matches:
|
||||
return None
|
||||
for match in matches:
|
||||
param, value = match
|
||||
params[param] = value.strip('"')
|
||||
return params
|
||||
|
||||
def _parse_key(line):
|
||||
"""
|
||||
Parses the #EXT-X-KEY line and extracts key attributes.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-KEY line from the playlist.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the key attributes.
|
||||
"""
|
||||
params = ATTRIBUTELISTPATTERN.split(line.replace(protocol.ext_x_key + ':', ''))[1::2]
|
||||
key = {}
|
||||
for param in params:
|
||||
name, value = param.split('=', 1)
|
||||
key[normalize_attribute(name)] = remove_quotes(value)
|
||||
return key
|
||||
|
||||
def _parse_extinf(line, data, state):
|
||||
"""
|
||||
Parses the #EXTINF line and extracts segment duration and title.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXTINF line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
duration, title = line.replace(protocol.extinf + ':', '').split(',')
|
||||
state['segment'] = {'duration': float(duration), 'title': remove_quotes(title)}
|
||||
|
||||
def _parse_ts_chunk(line, data, state):
|
||||
"""
|
||||
Parses a segment URI line and adds it to the segment list.
|
||||
|
||||
Args:
|
||||
line (str): The segment URI line from the playlist.
|
||||
data (dict): The dictionary to store the parsed data.
|
||||
state (dict): The parsing state.
|
||||
"""
|
||||
segment = state.pop('segment')
|
||||
if state.get('current_program_date_time'):
|
||||
segment['program_date_time'] = state['current_program_date_time']
|
||||
state['current_program_date_time'] += datetime.timedelta(seconds=segment['duration'])
|
||||
segment['uri'] = line
|
||||
segment['discontinuity'] = state.pop('discontinuity', False)
|
||||
if state.get('current_key'):
|
||||
segment['key'] = state['current_key']
|
||||
data['segments'].append(segment)
|
||||
|
||||
def _parse_attribute_list(prefix, line, atribute_parser):
|
||||
"""
|
||||
Parses a line containing a list of attributes and their values.
|
||||
|
||||
Args:
|
||||
- prefix (str): The prefix to identify the line.
|
||||
- line (str): The line containing the attributes.
|
||||
- atribute_parser (dict): A dictionary mapping attribute names to parsing functions.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the parsed attributes.
|
||||
"""
|
||||
params = ATTRIBUTELISTPATTERN.split(line.replace(prefix + ':', ''))[1::2]
|
||||
|
||||
attributes = {}
|
||||
for param in params:
|
||||
name, value = param.split('=', 1)
|
||||
name = normalize_attribute(name)
|
||||
|
||||
if name in atribute_parser:
|
||||
value = atribute_parser[name](value)
|
||||
|
||||
attributes[name] = value
|
||||
|
||||
return attributes
|
||||
|
||||
def _parse_stream_inf(line, data, state):
|
||||
"""
|
||||
Parses the #EXT-X-STREAM-INF line and extracts stream information.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-STREAM-INF line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
data['is_variant'] = True
|
||||
atribute_parser = remove_quotes_parser('codecs', 'audio', 'video', 'subtitles')
|
||||
atribute_parser["program_id"] = int
|
||||
atribute_parser["bandwidth"] = int
|
||||
state['stream_info'] = _parse_attribute_list(protocol.ext_x_stream_inf, line, atribute_parser)
|
||||
|
||||
def _parse_i_frame_stream_inf(line, data):
|
||||
"""
|
||||
Parses the #EXT-X-I-FRAME-STREAM-INF line and extracts I-frame stream information.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-I-FRAME-STREAM-INF line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
"""
|
||||
atribute_parser = remove_quotes_parser('codecs', 'uri')
|
||||
atribute_parser["program_id"] = int
|
||||
atribute_parser["bandwidth"] = int
|
||||
iframe_stream_info = _parse_attribute_list(protocol.ext_x_i_frame_stream_inf, line, atribute_parser)
|
||||
iframe_playlist = {'uri': iframe_stream_info.pop('uri'),
|
||||
'iframe_stream_info': iframe_stream_info}
|
||||
|
||||
data['iframe_playlists'].append(iframe_playlist)
|
||||
|
||||
def _parse_media(line, data, state):
|
||||
"""
|
||||
Parses the #EXT-X-MEDIA line and extracts media attributes.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-MEDIA line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
quoted = remove_quotes_parser('uri', 'group_id', 'language', 'name', 'characteristics')
|
||||
media = _parse_attribute_list(protocol.ext_x_media, line, quoted)
|
||||
data['media'].append(media)
|
||||
|
||||
def _parse_variant_playlist(line, data, state):
|
||||
"""
|
||||
Parses a variant playlist line and extracts playlist information.
|
||||
|
||||
Args:
|
||||
- line (str): The variant playlist line from the playlist.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
playlist = {'uri': line, 'stream_info': state.pop('stream_info')}
|
||||
|
||||
data['playlists'].append(playlist)
|
||||
|
||||
def _parse_byterange(line, state):
|
||||
"""
|
||||
Parses the #EXT-X-BYTERANGE line and extracts byte range information.
|
||||
|
||||
Args:
|
||||
- line (str): The #EXT-X-BYTERANGE line from the playlist.
|
||||
- state (dict): The parsing state.
|
||||
"""
|
||||
state['segment']['byterange'] = line.replace(protocol.ext_x_byterange + ':', '')
|
||||
|
||||
def _parse_simple_parameter_raw_value(line, cast_to=str, normalize=False):
|
||||
"""
|
||||
Parses a line containing a simple parameter and its value.
|
||||
|
||||
Args:
|
||||
- line (str): The line containing the parameter and its value.
|
||||
- cast_to (type): The type to which the value should be cast.
|
||||
- normalize (bool): Whether to normalize the parameter name.
|
||||
|
||||
Returns:
|
||||
tuple: A tuple containing the parameter name and its value.
|
||||
"""
|
||||
param, value = line.split(':', 1)
|
||||
param = normalize_attribute(param.replace('#EXT-X-', ''))
|
||||
if normalize:
|
||||
value = normalize_attribute(value)
|
||||
return param, cast_to(value)
|
||||
|
||||
def _parse_and_set_simple_parameter_raw_value(line, data, cast_to=str, normalize=False):
|
||||
"""
|
||||
Parses a line containing a simple parameter and its value, and sets it in the data dictionary.
|
||||
|
||||
Args:
|
||||
- line (str): The line containing the parameter and its value.
|
||||
- data (dict): The dictionary to store the parsed data.
|
||||
- cast_to (type): The type to which the value should be cast.
|
||||
- normalize (bool): Whether to normalize the parameter name.
|
||||
|
||||
Returns:
|
||||
The parsed value.
|
||||
"""
|
||||
param, value = _parse_simple_parameter_raw_value(line, cast_to, normalize)
|
||||
data[param] = value
|
||||
return data[param]
|
||||
|
||||
def _parse_simple_parameter(line, data, cast_to=str):
|
||||
"""
|
||||
Parses a line containing a simple parameter and its value, and sets it in the data dictionary.
|
||||
|
||||
Args:
|
||||
line (str): The line containing the parameter and its value.
|
||||
data (dict): The dictionary to store the parsed data.
|
||||
cast_to (type): The type to which the value should be cast.
|
||||
|
||||
Returns:
|
||||
The parsed value.
|
||||
"""
|
||||
return _parse_and_set_simple_parameter_raw_value(line, data, cast_to, True)
|
@ -1,17 +0,0 @@
|
||||
# 15.04.24
|
||||
|
||||
ext_x_targetduration = '#EXT-X-TARGETDURATION'
|
||||
ext_x_media_sequence = '#EXT-X-MEDIA-SEQUENCE'
|
||||
ext_x_program_date_time = '#EXT-X-PROGRAM-DATE-TIME'
|
||||
ext_x_media = '#EXT-X-MEDIA'
|
||||
ext_x_playlist_type = '#EXT-X-PLAYLIST-TYPE'
|
||||
ext_x_key = '#EXT-X-KEY'
|
||||
ext_x_stream_inf = '#EXT-X-STREAM-INF'
|
||||
ext_x_version = '#EXT-X-VERSION'
|
||||
ext_x_allow_cache = '#EXT-X-ALLOW-CACHE'
|
||||
ext_x_endlist = '#EXT-X-ENDLIST'
|
||||
extinf = '#EXTINF'
|
||||
ext_i_frames_only = '#EXT-X-I-FRAMES-ONLY'
|
||||
ext_x_byterange = '#EXT-X-BYTERANGE'
|
||||
ext_x_i_frame_stream_inf = '#EXT-X-I-FRAME-STREAM-INF'
|
||||
ext_x_discontinuity = '#EXT-X-DISCONTINUITY'
|
@ -1,41 +0,0 @@
|
||||
# 20.02.24
|
||||
|
||||
# Internal utilities
|
||||
from Src.Util.os import format_size
|
||||
|
||||
|
||||
class M3U8_Ts_Files:
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the TSFileSizeCalculator object.
|
||||
|
||||
Args:
|
||||
- num_segments (int): The number of segments.
|
||||
"""
|
||||
self.ts_file_sizes = []
|
||||
|
||||
def add_ts_file_size(self, size: int):
|
||||
"""
|
||||
Add a file size to the list of file sizes.
|
||||
|
||||
Args:
|
||||
- size (float): The size of the ts file to be added.
|
||||
"""
|
||||
self.ts_file_sizes.append(size)
|
||||
|
||||
def calculate_total_size(self):
|
||||
"""
|
||||
Calculate the total size of the files.
|
||||
|
||||
Returns:
|
||||
float: The mean size of the files in a human-readable format.
|
||||
"""
|
||||
|
||||
if len(self.ts_file_sizes) == 0:
|
||||
return 0
|
||||
|
||||
total_size = sum(self.ts_file_sizes)
|
||||
mean_size = total_size / len(self.ts_file_sizes)
|
||||
|
||||
# Return format mean
|
||||
return format_size(mean_size)
|
@ -1,547 +0,0 @@
|
||||
# 20.04.25
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
# Internal utilities
|
||||
from .lib_parser import load
|
||||
|
||||
|
||||
# External libraries
|
||||
from Src.Lib.Request.my_requests import requests
|
||||
|
||||
|
||||
# Costant
|
||||
CODEC_MAPPINGS = {
|
||||
"video": {
|
||||
"avc1": "libx264",
|
||||
"avc2": "libx264",
|
||||
"avc3": "libx264",
|
||||
"avc4": "libx264",
|
||||
"hev1": "libx265",
|
||||
"hev2": "libx265",
|
||||
"hvc1": "libx265",
|
||||
"hvc2": "libx265",
|
||||
"vp8": "libvpx",
|
||||
"vp9": "libvpx-vp9",
|
||||
"vp10": "libvpx-vp9"
|
||||
},
|
||||
"audio": {
|
||||
"mp4a": "aac",
|
||||
"mp3": "libmp3lame",
|
||||
"ac-3": "ac3",
|
||||
"ec-3": "eac3",
|
||||
"opus": "libopus",
|
||||
"vorbis": "libvorbis"
|
||||
}
|
||||
}
|
||||
|
||||
RESOLUTIONS = [
|
||||
(7680, 4320),
|
||||
(3840, 2160),
|
||||
(2560, 1440),
|
||||
(1920, 1080),
|
||||
(1280, 720),
|
||||
(640, 480)
|
||||
]
|
||||
|
||||
|
||||
|
||||
class M3U8_Codec:
|
||||
"""
|
||||
Represents codec information for an M3U8 playlist.
|
||||
"""
|
||||
|
||||
def __init__(self, bandwidth, resolution, codecs):
|
||||
"""
|
||||
Initializes the M3U8Codec object with the provided parameters.
|
||||
|
||||
Args:
|
||||
- bandwidth (int): Bandwidth of the codec.
|
||||
- resolution (str): Resolution of the codec.
|
||||
- codecs (str): Codecs information in the format "avc1.xxxxxx,mp4a.xx".
|
||||
"""
|
||||
self.bandwidth = bandwidth
|
||||
self.resolution = resolution
|
||||
self.codecs = codecs
|
||||
self.audio_codec = None
|
||||
self.video_codec = None
|
||||
self.extract_codecs()
|
||||
self.parse_codecs()
|
||||
|
||||
def extract_codecs(self):
|
||||
"""
|
||||
Parses the codecs information to extract audio and video codecs.
|
||||
Extracted codecs are set as attributes: audio_codec and video_codec.
|
||||
"""
|
||||
|
||||
# Split the codecs string by comma
|
||||
codecs_list = self.codecs.split(',')
|
||||
|
||||
# Separate audio and video codecs
|
||||
for codec in codecs_list:
|
||||
if codec.startswith('avc'):
|
||||
self.video_codec = codec
|
||||
elif codec.startswith('mp4a'):
|
||||
self.audio_codec = codec
|
||||
|
||||
def convert_video_codec(self, video_codec_identifier) -> str:
|
||||
|
||||
"""
|
||||
Convert video codec identifier to codec name.
|
||||
|
||||
Args:
|
||||
- video_codec_identifier (str): Identifier of the video codec.
|
||||
|
||||
Returns:
|
||||
str: Codec name corresponding to the identifier.
|
||||
"""
|
||||
|
||||
# Extract codec type from the identifier
|
||||
codec_type = video_codec_identifier.split('.')[0]
|
||||
|
||||
# Retrieve codec mapping from the provided mappings or fallback to static mappings
|
||||
video_codec_mapping = CODEC_MAPPINGS.get('video', {})
|
||||
codec_name = video_codec_mapping.get(codec_type)
|
||||
|
||||
if codec_name:
|
||||
return codec_name
|
||||
|
||||
else:
|
||||
logging.warning(f"No corresponding video codec found for {video_codec_identifier}. Using default codec libx264.")
|
||||
return "libx264" # Default
|
||||
|
||||
def convert_audio_codec(self, audio_codec_identifier) -> str:
|
||||
|
||||
"""
|
||||
Convert audio codec identifier to codec name.
|
||||
|
||||
Args:
|
||||
- audio_codec_identifier (str): Identifier of the audio codec.
|
||||
|
||||
Returns:
|
||||
str: Codec name corresponding to the identifier.
|
||||
"""
|
||||
|
||||
# Extract codec type from the identifier
|
||||
codec_type = audio_codec_identifier.split('.')[0]
|
||||
|
||||
# Retrieve codec mapping from the provided mappings or fallback to static mappings
|
||||
audio_codec_mapping = CODEC_MAPPINGS.get('audio', {})
|
||||
codec_name = audio_codec_mapping.get(codec_type)
|
||||
|
||||
if codec_name:
|
||||
return codec_name
|
||||
|
||||
else:
|
||||
logging.warning(f"No corresponding audio codec found for {audio_codec_identifier}. Using default codec aac.")
|
||||
return "aac" # Default
|
||||
|
||||
def parse_codecs(self):
|
||||
"""
|
||||
Parse video and audio codecs.
|
||||
This method updates `video_codec_name` and `audio_codec_name` attributes.
|
||||
"""
|
||||
|
||||
self.video_codec_name = self.convert_video_codec(self.video_codec)
|
||||
self.audio_codec_name = self.convert_audio_codec(self.audio_codec)
|
||||
|
||||
def __str__(self):
|
||||
"""
|
||||
Returns a string representation of the M3U8Codec object.
|
||||
"""
|
||||
return f"BANDWIDTH={self.bandwidth},RESOLUTION={self.resolution},CODECS=\"{self.codecs}\""
|
||||
|
||||
|
||||
class M3U8_Video:
|
||||
def __init__(self, video_playlist) -> None:
|
||||
"""
|
||||
Initializes an M3U8_Video object with the provided video playlist.
|
||||
|
||||
Args:
|
||||
- video_playlist (M3U8): An M3U8 object representing the video playlist.
|
||||
"""
|
||||
self.video_playlist = video_playlist
|
||||
|
||||
def get_best_uri(self):
|
||||
"""
|
||||
Returns the URI with the highest resolution from the video playlist.
|
||||
|
||||
Returns:
|
||||
tuple or None: A tuple containing the URI with the highest resolution and its resolution value, or None if the video list is empty.
|
||||
"""
|
||||
if not self.video_playlist:
|
||||
return None
|
||||
|
||||
best_uri = max(self.video_playlist, key=lambda x: x['resolution'])
|
||||
return best_uri['uri'], best_uri['resolution']
|
||||
|
||||
def get_worst_uri(self):
|
||||
"""
|
||||
Returns the URI with the lowest resolution from the video playlist.
|
||||
|
||||
Returns:
|
||||
- tuple or None: A tuple containing the URI with the lowest resolution and its resolution value, or None if the video list is empty.
|
||||
"""
|
||||
if not self.video_playlist:
|
||||
return None
|
||||
|
||||
worst_uri = min(self.video_playlist, key=lambda x: x['resolution'])
|
||||
return worst_uri['uri'], worst_uri['resolution']
|
||||
|
||||
def get_custom_uri(self, y_resolution):
|
||||
"""
|
||||
Returns the URI corresponding to a custom resolution from the video list.
|
||||
|
||||
Args:
|
||||
- video_list (list): A list of dictionaries containing video URIs and resolutions.
|
||||
- custom_resolution (tuple): A tuple representing the custom resolution.
|
||||
|
||||
Returns:
|
||||
str or None: The URI corresponding to the custom resolution, or None if not found.
|
||||
"""
|
||||
for video in self.video_playlist:
|
||||
logging.info(f"Check resolution from playlist: {int(video['resolution'][1])}, with input: {int(y_resolution)}")
|
||||
|
||||
if int(video['resolution'][1]) == int(y_resolution):
|
||||
return video['uri'], video['resolution']
|
||||
|
||||
return None, None
|
||||
|
||||
def get_list_resolution(self):
|
||||
"""
|
||||
Retrieve a list of resolutions from the video playlist.
|
||||
|
||||
Returns:
|
||||
list: A list of resolutions extracted from the video playlist.
|
||||
"""
|
||||
return [video['resolution'] for video in self.video_playlist]
|
||||
|
||||
|
||||
class M3U8_Audio:
|
||||
def __init__(self, audio_playlist) -> None:
|
||||
"""
|
||||
Initializes an M3U8_Audio object with the provided audio playlist.
|
||||
|
||||
Args:
|
||||
- audio_playlist (M3U8): An M3U8 object representing the audio playlist.
|
||||
"""
|
||||
self.audio_playlist = audio_playlist
|
||||
|
||||
def get_uri_by_language(self, language):
|
||||
"""
|
||||
Returns a dictionary with 'name' and 'uri' given a specific language.
|
||||
|
||||
Args:
|
||||
- audio_list (list): List of dictionaries containing audio information.
|
||||
- language (str): The desired language.
|
||||
|
||||
Returns:
|
||||
dict or None: Dictionary with 'name', 'language', and 'uri' for the specified language, or None if not found.
|
||||
"""
|
||||
for audio in self.audio_playlist:
|
||||
if audio['language'] == language:
|
||||
return {'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']}
|
||||
return None
|
||||
|
||||
def get_all_uris_and_names(self):
|
||||
"""
|
||||
Returns a list of dictionaries containing all URIs and names.
|
||||
|
||||
Args:
|
||||
- audio_list (list): List of dictionaries containing audio information.
|
||||
|
||||
Returns:
|
||||
list: List of dictionaries containing 'name', 'language', and 'uri' for all audio in the list.
|
||||
"""
|
||||
return [{'name': audio['name'], 'language': audio['language'], 'uri': audio['uri']} for audio in self.audio_playlist]
|
||||
|
||||
def get_default_uri(self):
|
||||
"""
|
||||
Returns the dictionary with 'default' equal to 'YES'.
|
||||
|
||||
Args:
|
||||
- audio_list (list): List of dictionaries containing audio information.
|
||||
|
||||
Returns:
|
||||
dict or None: Dictionary with 'default' equal to 'YES', or None if not found.
|
||||
"""
|
||||
for audio in self.audio_playlist:
|
||||
if audio['default'] == 'YES':
|
||||
return audio.get('uri')
|
||||
return None
|
||||
|
||||
|
||||
class M3U8_Subtitle:
|
||||
def __init__(self, subtitle_playlist) -> None:
|
||||
"""
|
||||
Initializes an M3U8_Subtitle object with the provided subtitle playlist.
|
||||
|
||||
Args:
|
||||
- subtitle_playlist (M3U8): An M3U8 object representing the subtitle playlist.
|
||||
"""
|
||||
self.subtitle_playlist = subtitle_playlist
|
||||
|
||||
def get_uri_by_language(self, language):
|
||||
"""
|
||||
Returns a dictionary with 'name' and 'uri' given a specific language for subtitles.
|
||||
|
||||
Args:
|
||||
- subtitle_list (list): List of dictionaries containing subtitle information.
|
||||
- language (str): The desired language.
|
||||
|
||||
Returns:
|
||||
dict or None: Dictionary with 'name' and 'uri' for the specified language for subtitles, or None if not found.
|
||||
"""
|
||||
for subtitle in self.subtitle_playlist:
|
||||
if subtitle['language'] == language:
|
||||
return {'name': subtitle['name'], 'uri': subtitle['uri']}
|
||||
return None
|
||||
|
||||
def get_all_uris_and_names(self):
|
||||
"""
|
||||
Returns a list of dictionaries containing all URIs and names of subtitles.
|
||||
|
||||
Args:
|
||||
- subtitle_list (list): List of dictionaries containing subtitle information.
|
||||
|
||||
Returns:
|
||||
list: List of dictionaries containing 'name' and 'uri' for all subtitles in the list.
|
||||
"""
|
||||
return [{'name': subtitle['name'], 'language': subtitle['language'], 'uri': subtitle['uri']} for subtitle in self.subtitle_playlist]
|
||||
|
||||
def get_default_uri(self):
|
||||
"""
|
||||
Returns the dictionary with 'default' equal to 'YES' for subtitles.
|
||||
|
||||
Args:
|
||||
- subtitle_list (list): List of dictionaries containing subtitle information.
|
||||
|
||||
Returns:
|
||||
dict or None: Dictionary with 'default' equal to 'YES' for subtitles, or None if not found.
|
||||
"""
|
||||
for subtitle in self.subtitle_playlist:
|
||||
if subtitle['default'] == 'YES':
|
||||
return subtitle
|
||||
return None
|
||||
|
||||
def download_all(self, custom_subtitle):
|
||||
"""
|
||||
Download all subtitles listed in the object's attributes, filtering based on a provided list of custom subtitles.
|
||||
|
||||
Args:
|
||||
- custom_subtitle (list): A list of custom subtitles to download.
|
||||
|
||||
Returns:
|
||||
list: A list containing dictionaries with subtitle information including name, language, and URI.
|
||||
"""
|
||||
|
||||
output = [] # Initialize an empty list to store subtitle information
|
||||
|
||||
# Iterate through all available subtitles
|
||||
for obj_subtitle in self.subtitle_get_all_uris_and_names():
|
||||
|
||||
# Check if the subtitle name is not in the list of custom subtitles, and skip if not found
|
||||
if obj_subtitle.get('name') not in custom_subtitle:
|
||||
continue
|
||||
|
||||
# Send a request to retrieve the subtitle content
|
||||
logging.info(f"Download subtitle: {obj_subtitle.get('name')}")
|
||||
response_subitle = requests.get(obj_subtitle.get('uri'))
|
||||
|
||||
try:
|
||||
# Try to extract the VTT URL from the subtitle content
|
||||
sub_parse = M3U8_Parser()
|
||||
sub_parse.parse_data(obj_subtitle.get('uri'), response_subitle.text)
|
||||
url_subititle = sub_parse.subtitle[0]
|
||||
|
||||
output.append({
|
||||
'name': obj_subtitle.get('name'),
|
||||
'language': obj_subtitle.get('language'),
|
||||
'uri': url_subititle
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Cant donwload: {obj_subtitle.get('name')}, error: {e}")
|
||||
|
||||
return output
|
||||
|
||||
|
||||
class M3U8_Parser:
|
||||
def __init__(self):
|
||||
self.segments = []
|
||||
self.video_playlist = []
|
||||
self.keys = None
|
||||
self.subtitle_playlist = []
|
||||
self.subtitle = []
|
||||
self.audio_playlist = []
|
||||
self.codec: M3U8_Codec = None
|
||||
self._video: M3U8_Video = None
|
||||
self._audio: M3U8_Audio = None
|
||||
self._subtitle: M3U8_Subtitle = None
|
||||
|
||||
self.__create_variable__()
|
||||
|
||||
def parse_data(self, uri, raw_content) -> None:
|
||||
"""
|
||||
Extracts all information present in the provided M3U8 content.
|
||||
|
||||
Args:
|
||||
- m3u8_content (str): The content of the M3U8 file.
|
||||
"""
|
||||
|
||||
|
||||
# Get obj of the m3u8 text content download, dictionary with video, audio, segments, subtitles
|
||||
m3u8_obj = load(raw_content, uri)
|
||||
|
||||
self.__parse_video_info__(m3u8_obj)
|
||||
self.__parse_encryption_keys__(m3u8_obj)
|
||||
self.__parse_subtitles_and_audio__(m3u8_obj)
|
||||
self.__parse_segments__(m3u8_obj)
|
||||
|
||||
@staticmethod
|
||||
def extract_resolution(uri: str) -> int:
|
||||
"""
|
||||
Extracts the video resolution from the given URI.
|
||||
|
||||
Args:
|
||||
- uri (str): The URI containing video information.
|
||||
|
||||
Returns:
|
||||
int: The video resolution if found, otherwise 0.
|
||||
"""
|
||||
|
||||
# Log
|
||||
logging.info(f"Try extract resolution from: {uri}")
|
||||
|
||||
for resolution in RESOLUTIONS:
|
||||
if "http" in str(uri):
|
||||
if str(resolution[1]) in uri:
|
||||
return resolution
|
||||
|
||||
# Default resolution return (not best)
|
||||
logging.error("No resolution found with custom parsing.")
|
||||
logging.warning("Try set remove duplicate line to TRUE.")
|
||||
return (0, 0)
|
||||
|
||||
def __parse_video_info__(self, m3u8_obj) -> None:
|
||||
"""
|
||||
Extracts video information from the M3U8 object.
|
||||
|
||||
Args:
|
||||
- m3u8_obj: The M3U8 object containing video playlists.
|
||||
"""
|
||||
|
||||
try:
|
||||
for playlist in m3u8_obj.playlists:
|
||||
|
||||
# Direct access resolutions in m3u8 obj
|
||||
if playlist.stream_info.resolution is not None:
|
||||
|
||||
self.video_playlist.append({
|
||||
"uri": playlist.uri,
|
||||
"resolution": playlist.stream_info.resolution
|
||||
})
|
||||
|
||||
# Find resolutions in uri
|
||||
else:
|
||||
|
||||
self.video_playlist.append({
|
||||
"uri": playlist.uri,
|
||||
"resolution": M3U8_Parser.extract_resolution(playlist.uri)
|
||||
})
|
||||
|
||||
# Dont stop
|
||||
continue
|
||||
|
||||
# Check if all key is present to create codec
|
||||
try:
|
||||
self.codec = M3U8_Codec(
|
||||
playlist.stream_info.bandwidth,
|
||||
playlist.stream_info.resolution,
|
||||
playlist.stream_info.codecs
|
||||
)
|
||||
except:
|
||||
logging.error(f"Error parsing codec: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing video info: {e}")
|
||||
|
||||
def __parse_encryption_keys__(self, m3u8_obj) -> None:
|
||||
"""
|
||||
Extracts encryption keys from the M3U8 object.
|
||||
|
||||
Args:
|
||||
- m3u8_obj: The M3U8 object containing encryption keys.
|
||||
"""
|
||||
try:
|
||||
|
||||
if m3u8_obj.key is not None:
|
||||
if self.keys is None:
|
||||
self.keys = {
|
||||
'method': m3u8_obj.key.method,
|
||||
'iv': m3u8_obj.key.iv,
|
||||
'uri': m3u8_obj.key.uri
|
||||
}
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing encryption keys: {e}")
|
||||
pass
|
||||
|
||||
def __parse_subtitles_and_audio__(self, m3u8_obj) -> None:
|
||||
"""
|
||||
Extracts subtitles and audio information from the M3U8 object.
|
||||
|
||||
Args:
|
||||
- m3u8_obj: The M3U8 object containing subtitles and audio data.
|
||||
"""
|
||||
try:
|
||||
for media in m3u8_obj.media:
|
||||
if media.type == "SUBTITLES":
|
||||
self.subtitle_playlist.append({
|
||||
"type": media.type,
|
||||
"name": media.name,
|
||||
"default": media.default,
|
||||
"language": media.language,
|
||||
"uri": media.uri
|
||||
})
|
||||
|
||||
if media.type == "AUDIO":
|
||||
self.audio_playlist.append({
|
||||
"type": media.type,
|
||||
"name": media.name,
|
||||
"default": media.default,
|
||||
"language": media.language,
|
||||
"uri": media.uri
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing subtitles and audio: {e}")
|
||||
|
||||
def __parse_segments__(self, m3u8_obj) -> None:
|
||||
"""
|
||||
Extracts segment information from the M3U8 object.
|
||||
|
||||
Args:
|
||||
- m3u8_obj: The M3U8 object containing segment data.
|
||||
"""
|
||||
|
||||
try:
|
||||
for segment in m3u8_obj.segments:
|
||||
if "vtt" not in segment.uri:
|
||||
self.segments.append(segment.uri)
|
||||
else:
|
||||
self.subtitle.append(segment.uri)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing segments: {e}")
|
||||
|
||||
def __create_variable__(self):
|
||||
"""
|
||||
Initialize variables for video, audio, and subtitle playlists.
|
||||
"""
|
||||
|
||||
self._video = M3U8_Video(self.video_playlist)
|
||||
self._audio = M3U8_Audio(self.audio_playlist)
|
||||
self._subtitle = M3U8_Subtitle(self.subtitle_playlist)
|
@ -1,54 +0,0 @@
|
||||
# 20.03.24
|
||||
|
||||
import logging
|
||||
from urllib.parse import urlparse, urljoin
|
||||
|
||||
|
||||
class M3U8_UrlFix:
|
||||
def __init__(self, url: str = None) -> None:
|
||||
"""
|
||||
Initializes an M3U8_UrlFix object with the provided playlist URL.
|
||||
|
||||
Args:
|
||||
- url (str, optional): The URL of the playlist. Defaults to None.
|
||||
"""
|
||||
self.url_playlist: str = url
|
||||
|
||||
def set_playlist(self, url: str) -> None:
|
||||
"""
|
||||
Set the M3U8 playlist URL.
|
||||
|
||||
Args:
|
||||
- url (str): The M3U8 playlist URL.
|
||||
"""
|
||||
self.url_playlist = url
|
||||
|
||||
def generate_full_url(self, url_resource: str) -> str:
|
||||
"""
|
||||
Generate a full URL for a given resource using the base URL from the playlist.
|
||||
|
||||
Args:
|
||||
- url_resource (str): The relative URL of the resource within the playlist.
|
||||
|
||||
Returns:
|
||||
str: The full URL for the specified resource.
|
||||
"""
|
||||
|
||||
# Check if m3u8 url playlist is present
|
||||
if self.url_playlist == None:
|
||||
logging.error("[M3U8_UrlFix] Cant generate full url, playlist not present")
|
||||
raise
|
||||
|
||||
# Parse the playlist URL to extract the base URL components
|
||||
parsed_playlist_url = urlparse(self.url_playlist)
|
||||
|
||||
# Construct the base URL using the scheme, netloc, and path from the playlist URL
|
||||
base_url = f"{parsed_playlist_url.scheme}://{parsed_playlist_url.netloc}{parsed_playlist_url.path}"
|
||||
|
||||
# Join the base URL with the relative resource URL to get the full URL
|
||||
full_url = urljoin(base_url, url_resource)
|
||||
|
||||
return full_url
|
||||
|
||||
# Output
|
||||
m3u8_url_fix = M3U8_UrlFix()
|
Loading…
x
Reference in New Issue
Block a user