Update undetected-chromedriver

This commit is contained in:
ngosang 2023-01-06 17:18:54 +01:00
parent 7d84f1b663
commit e163019f28
10 changed files with 1925 additions and 1729 deletions

View File

@ -4,6 +4,7 @@
* Kill Chromium processes properly to avoid defunct/zombie processes * Kill Chromium processes properly to avoid defunct/zombie processes
* Include procps (ps), curl and vim packages in the Docker image * Include procps (ps), curl and vim packages in the Docker image
* Update undetected-chromedriver
## v3.0.0 (2023/01/04) ## v3.0.0 (2023/01/04)

View File

@ -1,9 +1,9 @@
bottle==0.12.23 bottle==0.12.23
waitress==2.1.2 waitress==2.1.2
selenium==4.4.3 selenium==4.7.2
func-timeout==4.3.5 func-timeout==4.3.5
# required by undetected_chromedriver # required by undetected_chromedriver
requests==2.28.1 requests==2.28.1
websockets==10.3 websockets==10.4
# only required for linux # only required for linux
xvfbwrapper==0.2.9 xvfbwrapper==0.2.9

File diff suppressed because it is too large Load Diff

View File

@ -1,259 +1,262 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
""" """
888 888 d8b 888 888 d8b
888 888 Y8P 888 888 Y8P
888 888 888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888 .d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P" d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888 888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888 Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888 "Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam) by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
""" """
import io from distutils.version import LooseVersion
import logging import io
import os import logging
import random import os
import re import random
import string import re
import sys import string
import zipfile import sys
from distutils.version import LooseVersion from urllib.request import urlopen
from urllib.request import urlopen, urlretrieve from urllib.request import urlretrieve
import zipfile
from selenium.webdriver import Chrome as _Chrome, ChromeOptions as _ChromeOptions
from selenium.webdriver import Chrome as _Chrome
TARGET_VERSION = 0 from selenium.webdriver import ChromeOptions as _ChromeOptions
logger = logging.getLogger("uc")
TARGET_VERSION = 0
class Chrome: logger = logging.getLogger("uc")
def __new__(cls, *args, emulate_touch=False, **kwargs):
if not ChromeDriverManager.installed: class Chrome:
ChromeDriverManager(*args, **kwargs).install() def __new__(cls, *args, emulate_touch=False, **kwargs):
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver() if not ChromeDriverManager.installed:
if not kwargs.get("executable_path"): ChromeDriverManager(*args, **kwargs).install()
kwargs["executable_path"] = "./{}".format( if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).executable_path ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
) if not kwargs.get("executable_path"):
if not kwargs.get("options"): kwargs["executable_path"] = "./{}".format(
kwargs["options"] = ChromeOptions() ChromeDriverManager(*args, **kwargs).executable_path
instance = object.__new__(_Chrome) )
instance.__init__(*args, **kwargs) if not kwargs.get("options"):
kwargs["options"] = ChromeOptions()
instance._orig_get = instance.get instance = object.__new__(_Chrome)
instance.__init__(*args, **kwargs)
def _get_wrapped(*args, **kwargs):
if instance.execute_script("return navigator.webdriver"): instance._orig_get = instance.get
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument", def _get_wrapped(*args, **kwargs):
{ if instance.execute_script("return navigator.webdriver"):
"source": """ instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
Object.defineProperty(window, 'navigator', { {
value: new Proxy(navigator, { "source": """
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) => Object.defineProperty(window, 'navigator', {
key === 'webdriver' value: new Proxy(navigator, {
? undefined has: (target, key) => (key === 'webdriver' ? false : key in target),
: typeof target[key] === 'function' get: (target, key) =>
? target[key].bind(target) key === 'webdriver'
: target[key] ? undefined
}) : typeof target[key] === 'function'
}); ? target[key].bind(target)
: target[key]
})
""" });
},
)
return instance._orig_get(*args, **kwargs) """
},
instance.get = _get_wrapped )
instance.get = _get_wrapped return instance._orig_get(*args, **kwargs)
instance.get = _get_wrapped
instance.get = _get_wrapped
original_user_agent_string = instance.execute_script( instance.get = _get_wrapped
"return navigator.userAgent" instance.get = _get_wrapped
)
instance.execute_cdp_cmd( original_user_agent_string = instance.execute_script(
"Network.setUserAgentOverride", "return navigator.userAgent"
{ )
"userAgent": original_user_agent_string.replace("Headless", ""), instance.execute_cdp_cmd(
}, "Network.setUserAgentOverride",
) {
if emulate_touch: "userAgent": original_user_agent_string.replace("Headless", ""),
instance.execute_cdp_cmd( },
"Page.addScriptToEvaluateOnNewDocument", )
{ if emulate_touch:
"source": """ instance.execute_cdp_cmd(
Object.defineProperty(navigator, 'maxTouchPoints', { "Page.addScriptToEvaluateOnNewDocument",
get: () => 1 {
})""" "source": """
}, Object.defineProperty(navigator, 'maxTouchPoints', {
) get: () => 1
logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})") })"""
return instance },
)
logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
class ChromeOptions: return instance
def __new__(cls, *args, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install() class ChromeOptions:
if not ChromeDriverManager.selenium_patched: def __new__(cls, *args, **kwargs):
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver() if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
instance = object.__new__(_ChromeOptions) if not ChromeDriverManager.selenium_patched:
instance.__init__() ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
instance.add_argument("start-maximized")
instance.add_experimental_option("excludeSwitches", ["enable-automation"]) instance = object.__new__(_ChromeOptions)
instance.add_argument("--disable-blink-features=AutomationControlled") instance.__init__()
return instance instance.add_argument("start-maximized")
instance.add_experimental_option("excludeSwitches", ["enable-automation"])
instance.add_argument("--disable-blink-features=AutomationControlled")
class ChromeDriverManager(object): return instance
installed = False
selenium_patched = False
target_version = None class ChromeDriverManager(object):
installed = False
DL_BASE = "https://chromedriver.storage.googleapis.com/" selenium_patched = False
target_version = None
def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
DL_BASE = "https://chromedriver.storage.googleapis.com/"
_platform = sys.platform
def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
if TARGET_VERSION:
# use global if set _platform = sys.platform
self.target_version = TARGET_VERSION
if TARGET_VERSION:
if target_version: # use global if set
# use explicitly passed target self.target_version = TARGET_VERSION
self.target_version = target_version # user override
if target_version:
if not self.target_version: # use explicitly passed target
# none of the above (default) and just get current version self.target_version = target_version # user override
self.target_version = self.get_release_version_number().version[
0 if not self.target_version:
] # only major version int # none of the above (default) and just get current version
self.target_version = self.get_release_version_number().version[
self._base = base_ = "chromedriver{}" 0
] # only major version int
exe_name = self._base
if _platform in ("win32",): self._base = base_ = "chromedriver{}"
exe_name = base_.format(".exe")
if _platform in ("linux",): exe_name = self._base
_platform += "64" if _platform in ("win32",):
exe_name = exe_name.format("") exe_name = base_.format(".exe")
if _platform in ("darwin",): if _platform in ("linux",):
_platform = "mac64" _platform += "64"
exe_name = exe_name.format("") exe_name = exe_name.format("")
self.platform = _platform if _platform in ("darwin",):
self.executable_path = executable_path or exe_name _platform = "mac64"
self._exe_name = exe_name exe_name = exe_name.format("")
self.platform = _platform
def patch_selenium_webdriver(self_): self.executable_path = executable_path or exe_name
""" self._exe_name = exe_name
Patches selenium package Chrome, ChromeOptions classes for current session
def patch_selenium_webdriver(self_):
:return: """
""" Patches selenium package Chrome, ChromeOptions classes for current session
import selenium.webdriver.chrome.service
import selenium.webdriver :return:
"""
selenium.webdriver.Chrome = Chrome import selenium.webdriver.chrome.service
selenium.webdriver.ChromeOptions = ChromeOptions import selenium.webdriver
logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
self_.__class__.selenium_patched = True selenium.webdriver.Chrome = Chrome
selenium.webdriver.ChromeOptions = ChromeOptions
def install(self, patch_selenium=True): logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
""" self_.__class__.selenium_patched = True
Initialize the patch
def install(self, patch_selenium=True):
This will: """
download chromedriver if not present Initialize the patch
patch the downloaded chromedriver
patch selenium package if <patch_selenium> is True (default) This will:
download chromedriver if not present
:param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session) patch the downloaded chromedriver
:return: patch selenium package if <patch_selenium> is True (default)
"""
if not os.path.exists(self.executable_path): :param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
self.fetch_chromedriver() :return:
if not self.__class__.installed: """
if self.patch_binary(): if not os.path.exists(self.executable_path):
self.__class__.installed = True self.fetch_chromedriver()
if not self.__class__.installed:
if patch_selenium: if self.patch_binary():
self.patch_selenium_webdriver() self.__class__.installed = True
def get_release_version_number(self): if patch_selenium:
""" self.patch_selenium_webdriver()
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
def get_release_version_number(self):
:return: version string """
""" Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
path = (
"LATEST_RELEASE" :return: version string
if not self.target_version """
else f"LATEST_RELEASE_{self.target_version}" path = (
) "LATEST_RELEASE"
return LooseVersion(urlopen(self.__class__.DL_BASE + path).read().decode()) if not self.target_version
else f"LATEST_RELEASE_{self.target_version}"
def fetch_chromedriver(self): )
""" return LooseVersion(urlopen(self.__class__.DL_BASE + path).read().decode())
Downloads ChromeDriver from source and unpacks the executable
def fetch_chromedriver(self):
:return: on success, name of the unpacked executable """
""" Downloads ChromeDriver from source and unpacks the executable
base_ = self._base
zip_name = base_.format(".zip") :return: on success, name of the unpacked executable
ver = self.get_release_version_number().vstring """
if os.path.exists(self.executable_path): base_ = self._base
return self.executable_path zip_name = base_.format(".zip")
urlretrieve( ver = self.get_release_version_number().vstring
f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip", if os.path.exists(self.executable_path):
filename=zip_name, return self.executable_path
) urlretrieve(
with zipfile.ZipFile(zip_name) as zf: f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
zf.extract(self._exe_name) filename=zip_name,
os.remove(zip_name) )
if sys.platform != "win32": with zipfile.ZipFile(zip_name) as zf:
os.chmod(self._exe_name, 0o755) zf.extract(self._exe_name)
return self._exe_name os.remove(zip_name)
if sys.platform != "win32":
@staticmethod os.chmod(self._exe_name, 0o755)
def random_cdc(): return self._exe_name
cdc = random.choices(string.ascii_lowercase, k=26)
cdc[-6:-4] = map(str.upper, cdc[-6:-4]) @staticmethod
cdc[2] = cdc[0] def random_cdc():
cdc[3] = "_" cdc = random.choices(string.ascii_lowercase, k=26)
return "".join(cdc).encode() cdc[-6:-4] = map(str.upper, cdc[-6:-4])
cdc[2] = cdc[0]
def patch_binary(self): cdc[3] = "_"
""" return "".join(cdc).encode()
Patches the ChromeDriver binary
def patch_binary(self):
:return: False on failure, binary name on success """
""" Patches the ChromeDriver binary
linect = 0
replacement = self.random_cdc() :return: False on failure, binary name on success
with io.open(self.executable_path, "r+b") as fh: """
for line in iter(lambda: fh.readline(), b""): linect = 0
if b"cdc_" in line: replacement = self.random_cdc()
fh.seek(-len(line), 1) with io.open(self.executable_path, "r+b") as fh:
newline = re.sub(b"cdc_.{22}", replacement, line) for line in iter(lambda: fh.readline(), b""):
fh.write(newline) if b"cdc_" in line:
linect += 1 fh.seek(-len(line), 1)
return linect newline = re.sub(b"cdc_.{22}", replacement, line)
fh.write(newline)
linect += 1
def install(executable_path=None, target_version=None, *args, **kwargs): return linect
ChromeDriverManager(executable_path, target_version, *args, **kwargs).install()
def install(executable_path=None, target_version=None, *args, **kwargs):
ChromeDriverManager(executable_path, target_version, *args, **kwargs).install()

View File

@ -1,112 +1,112 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import json import json
import logging import logging
from collections.abc import Mapping, Sequence
import requests
import requests import websockets
import websockets
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class CDPObject(dict):
    """Dict whose keys can also be read and written as attributes.

    Nested ``dict`` values, and ``dict`` elements found directly inside
    ``list`` values, are converted to ``CDPObject`` so attribute access works
    at those levels too. Lists are converted in place.
    """

    def __init__(self, *a, **k):
        super().__init__(*a, **k)
        # Alias the instance namespace to the mapping itself: every key
        # becomes an attribute and every attribute assignment becomes a key.
        self.__dict__ = self
        # Only values of existing keys are replaced below, so the dict size
        # never changes while it is being iterated.
        for key, value in self.items():
            if isinstance(value, dict):
                self[key] = CDPObject(value)
            elif isinstance(value, list):
                for index, element in enumerate(value):
                    if isinstance(element, dict):
                        # Wrap the element itself. Wrapping ``self`` here
                        # (as the previous code did) recursed without end.
                        value[index] = CDPObject(element)

    def __repr__(self):
        tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)"
        return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items()))


class PageElement(CDPObject):
    """``CDPObject`` used for the entries returned by ``CDP.tab_list``."""

    pass
class CDP:
    """Small client for a Chrome remote-debugging endpoint.

    HTTP requests go through a shared ``requests.Session`` against
    ``server_addr``; ``send`` talks to the WebSocket URL of the current tab.
    The most recent response and its decoded JSON are kept in ``_last_resp``
    and ``_last_json``.
    """

    log = logging.getLogger("CDP")

    # URI templates, appended to ``server_addr``.
    endpoints = CDPObject(
        {
            "json": "/json",
            "protocol": "/json/protocol",
            "list": "/json/list",
            "new": "/json/new?{url}",
            "activate": "/json/activate/{id}",
            "close": "/json/close/{id}",
        }
    )

    def __init__(self, options: "ChromeOptions"):  # noqa
        """Connect to the debugger at ``options.debugger_address`` ("host:port").

        The first entry returned by the ``/json`` endpoint supplies the
        initial ``sessionId`` and ``wsurl``.
        """
        self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))
        self._reqid = 0
        self._session = requests.Session()
        self._last_resp = None
        self._last_json = None

        resp = self.get(self.endpoints.json)  # noqa
        self.sessionId = resp[0]["id"]
        self.wsurl = resp[0]["webSocketDebuggerUrl"]

    def tab_activate(self, id=None):
        """Activate tab ``id``; without an id, use the first listed tab.

        In the no-id case ``wsurl`` is also switched to that tab.
        """
        if not id:
            active_tab = self.tab_list()[0]
            id = active_tab.id  # noqa
            self.wsurl = active_tab.webSocketDebuggerUrl  # noqa
        return self.post(self.endpoints["activate"].format(id=id))

    def tab_list(self):
        """Return the entries of the ``list`` endpoint as ``PageElement`` objects."""
        retval = self.get(self.endpoints["list"])
        return [PageElement(o) for o in retval]

    def tab_new(self, url):
        """Request a new tab for ``url``."""
        return self.post(self.endpoints["new"].format(url=url))

    def tab_close_last_opened(self):
        """Close the last entry of type ``"page"`` in the tab list."""
        sessions = self.tab_list()
        opentabs = [s for s in sessions if s["type"] == "page"]
        return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"]))

    async def send(self, method: str, params: dict):
        """Send one command over a fresh WebSocket connection and await one reply.

        :return: the decoded JSON reply (also stored in ``last_json``)
        """
        self._reqid += 1
        async with websockets.connect(self.wsurl) as ws:
            await ws.send(
                json.dumps({"method": method, "params": params, "id": self._reqid})
            )
            self._last_resp = await ws.recv()
            self._last_json = json.loads(self._last_resp)
            self.log.info(self._last_json)
        return self._last_json

    def get(self, uri):
        """GET ``uri``; return the decoded JSON body, or None if it cannot be decoded."""
        resp = self._session.get(self.server_addr + uri)
        try:
            self._last_resp = resp
            self._last_json = resp.json()
        except Exception:
            return
        else:
            return self._last_json

    def post(self, uri, data: dict = None):
        """POST ``data`` as JSON to ``uri``.

        :return: the decoded JSON body, or the raw response object when the
                 body cannot be decoded
        """
        if not data:
            data = {}
        resp = self._session.post(self.server_addr + uri, json=data)
        try:
            self._last_resp = resp
            self._last_json = resp.json()
        except Exception:
            return self._last_resp
        else:
            # Previously the success path fell through and returned None.
            return self._last_json

    @property
    def last_json(self):
        """Decoded JSON of the most recent response."""
        return self._last_json

View File

@ -1,191 +1,190 @@
import asyncio import asyncio
import logging from collections.abc import Mapping
import time from collections.abc import Sequence
import traceback from functools import wraps
from collections.abc import Mapping import logging
from collections.abc import Sequence import threading
from typing import Any import time
from typing import Awaitable import traceback
from typing import Callable from typing import Any
from typing import List from typing import Awaitable
from typing import Optional from typing import Callable
from contextlib import ExitStack from typing import List
import threading from typing import Optional
from functools import wraps, partial
class Structure(dict):
    """
    This is a dict-like object structure, which you should subclass.

    Keys are also reachable as attributes. On construction, nested mappings
    (directly, or as elements of a non-string sequence) are converted to the
    same class; such sequences are stored as lists.
    """

    _store = {}

    def __init__(self, *a, **kw):
        """
        Instantiate a new instance.

        :param a: positional arguments, as accepted by ``dict``
        :param kw: keyword arguments, as accepted by ``dict``
        """

        super().__init__()

        cls = self.__class__
        # auxiliary dict: lets dict() normalise every supported input form
        d = dict(*a, **kw)
        for k, v in d.items():
            if isinstance(v, Mapping):
                self[k] = cls(v)
            elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
                # Wrap only mapping elements; wrapping scalars such as ints
                # raised TypeError before.
                self[k] = [cls(i) if isinstance(i, Mapping) else i for i in v]
            else:
                self[k] = v
        # Bypass our own __setattr__ (which writes keys) to alias the
        # instance namespace to the mapping itself.
        super().__setattr__("__dict__", self)

    def __getattr__(self, item):
        # Only reached after normal attribute lookup has failed.
        return getattr(super(), item)

    def __getitem__(self, item):
        return super().__getitem__(item)

    def __setattr__(self, key, value):
        self.__setitem__(key, value)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)

    def update(self, *a, **kw):
        super().update(*a, **kw)

    def __eq__(self, other):
        # Compare as plain dicts so unhashable values (e.g. lists) do not
        # raise, and non-mappings simply compare unequal.
        if not isinstance(other, Mapping):
            return NotImplemented
        return dict(self.items()) == dict(other.items())

    def __hash__(self):
        # Raises TypeError if any value is unhashable.
        return hash(frozenset(self.items()))

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Give every subclass its own store instead of sharing the base one.
        cls._store = {}

    def _normalize_strings(self):
        """Strip surrounding whitespace from every top-level string value."""
        for k, v in self.copy().items():
            if isinstance(v, str):
                self[k] = v.strip()
def timeout(
    seconds=3, on_timeout: Optional[Callable[[Callable[..., Any]], Any]] = None
):
    """
    Decorator factory which starts a timer alongside every call of the
    decorated function.

    If the call is still running after ``seconds``, the timer thread invokes
    ``on_timeout(func)``; without a callback it raises ``TimeoutError`` inside
    the timer thread. The decorated call itself is never interrupted, so its
    result or exception always reaches the caller unchanged.

    :param seconds: delay before the timer fires
    :param on_timeout: optional callback which receives the decorated function
    :return: the decorator
    """

    def wrapper(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            def function_reached_timeout():
                if on_timeout:
                    on_timeout(func)
                else:
                    raise TimeoutError("function call timed out")

            t = threading.Timer(interval=seconds, function=function_reached_timeout)
            t.start()
            try:
                return func(*args, **kwargs)
            finally:
                # Covers both the normal and the exceptional exit.
                t.cancel()

        return wrapped

    return wrapper
def test():
    """
    Manual smoke test: start Chrome, stream its logs on a background thread
    for about ten seconds while loading a page, then shut everything down.
    """
    import os
    import sys

    sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
    import undetected_chromedriver as uc
    import threading

    def collector(
        driver: uc.Chrome,
        stop_event: threading.Event,
        on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
        listen_events: Sequence = ("browser", "network", "performance"),
    ):
        """Poll the driver logs on a worker thread until ``stop_event`` is set.

        :return: the started thread, so the caller can join it
        """

        def threaded(driver, stop_event, on_event_coro):
            async def _ensure_service_started():
                while (
                    getattr(driver, "service", False)
                    and getattr(driver.service, "process", False)
                    and driver.service.process.poll()
                ):
                    print("waiting for driver service to come back on")
                    await asyncio.sleep(0.05)
                    # await asyncio.sleep(driver._delay or .25)

            async def get_log_lines(typ):
                await _ensure_service_started()
                return driver.get_log(typ)

            async def looper():
                while not stop_event.is_set():
                    log_lines = []
                    try:
                        for _ in listen_events:
                            try:
                                log_lines += await get_log_lines(_)
                            except Exception:
                                if logging.getLogger().getEffectiveLevel() <= 10:
                                    traceback.print_exc()
                                continue
                        if log_lines and on_event_coro:
                            await on_event_coro(log_lines)
                    except Exception:
                        if logging.getLogger().getEffectiveLevel() <= 10:
                            traceback.print_exc()

            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(looper())
            finally:
                loop.close()

        t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
        t.start()
        return t

    async def on_event(data):
        print("on_event")
        print("data:", data)

    def func_called(fn):
        def wrapped(*args, **kwargs):
            print(
                "func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
            )
            while driver.service.process and driver.service.process.poll() is not None:
                time.sleep(0.1)
            res = fn(*args, **kwargs)
            print("func completed! (result: %s)" % res)
            return res

        return wrapped

    logging.basicConfig(level=10)

    options = uc.ChromeOptions()
    options.set_capability(
        "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
    )

    driver = uc.Chrome(version_main=96, options=options)

    # driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request)
    driver.command_executor._request = func_called(driver.command_executor._request)
    collector_stop = threading.Event()
    collector_thread = collector(driver, collector_stop, on_event)

    try:
        driver.get("https://nowsecure.nl")

        time.sleep(10)
    finally:
        # Stop the poller first: it is a non-daemon thread, so without this
        # it kept running (and kept the process alive) after quit().
        collector_stop.set()
        collector_thread.join(timeout=5)
        driver.quit()

View File

@ -1,77 +1,75 @@
import multiprocessing import atexit
import os import logging
import platform import multiprocessing
import sys import os
from subprocess import PIPE import platform
from subprocess import Popen import signal
import atexit from subprocess import PIPE
import traceback from subprocess import Popen
import logging import sys
import signal
CREATE_NEW_PROCESS_GROUP = 0x00000200 CREATE_NEW_PROCESS_GROUP = 0x00000200
DETACHED_PROCESS = 0x00000008 DETACHED_PROCESS = 0x00000008
REGISTERED = [] REGISTERED = []
def start_detached(executable, *args):
    """
    Starts a fully independent subprocess (with no parent)

    A short-lived helper process launches the executable and reports the pid
    of the launched process back through a pipe. The pid is also recorded in
    REGISTERED.

    :param executable: executable
    :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
    :return: pid of the grandchild process
    :raises EOFError: if the helper exits without reporting a pid
    """

    # create pipe
    reader, writer = multiprocessing.Pipe(False)
    try:
        # do not keep reference
        multiprocessing.Process(
            target=_start_detached,
            args=(executable, *args),
            kwargs={"writer": writer},
            daemon=True,
        ).start()
        # The helper owns its own copy of the write end now. Dropping ours
        # lets recv() raise EOFError instead of blocking forever when the
        # helper dies before sending.
        writer.close()
        # receive pid from pipe
        pid = reader.recv()
    finally:
        # close pipes (closing an already closed connection is harmless)
        writer.close()
        reader.close()

    REGISTERED.append(pid)
    return pid
def _start_detached(
    executable, *args, writer: "multiprocessing.connection.Connection" = None
):
    """
    Helper process body: launch the executable in its own session / process
    group, send its pid through ``writer`` and exit.

    :param executable: executable
    :param args: arguments to the executable
    :param writer: write end of the pipe on which the pid is reported (required)
    """
    if writer is None:
        # Fail before spawning anything whose pid could not be reported.
        raise ValueError("writer is required")

    # configure launch
    kwargs = {}
    if platform.system() == "Windows":
        kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
    else:
        # The unpacking syntax used in this module already requires a Python
        # version which supports start_new_session, so the former
        # preexec_fn=os.setsid fallback could never run.
        kwargs.update(start_new_session=True)

    # run
    p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)

    # send pid to pipe
    writer.send(p.pid)
    sys.exit()
def _cleanup():
    """Best-effort SIGTERM for every pid recorded in REGISTERED."""
    log = logging.getLogger(__name__)
    for pid in REGISTERED:
        try:
            log.debug("cleaning up pid %d ", pid)
            os.kill(pid, signal.SIGTERM)
        except Exception:  # noqa
            # Still best effort (the process may already be gone), but leave
            # a trace and let KeyboardInterrupt / SystemExit through.
            log.debug("could not terminate pid %s", pid, exc_info=True)
atexit.register(_cleanup)
atexit.register(_cleanup)

View File

@ -1,70 +1,86 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import json import json
import os import os
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
class ChromeOptions(_ChromiumOptions): class ChromeOptions(_ChromiumOptions):
_session = None _session = None
_user_data_dir = None _user_data_dir = None
@property @property
def user_data_dir(self): def user_data_dir(self):
return self._user_data_dir return self._user_data_dir
@user_data_dir.setter @user_data_dir.setter
def user_data_dir(self, path: str): def user_data_dir(self, path: str):
""" """
Sets the browser profile folder to use, or creates a new profile Sets the browser profile folder to use, or creates a new profile
at given <path>. at given <path>.
Parameters Parameters
---------- ----------
path: str path: str
the path to a chrome profile folder the path to a chrome profile folder
if it does not exist, a new profile will be created at given location if it does not exist, a new profile will be created at given location
""" """
apath = os.path.abspath(path) apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath) self._user_data_dir = os.path.normpath(apath)
@staticmethod @staticmethod
def _undot_key(key, value): def _undot_key(key, value):
"""turn a (dotted key, value) into a proper nested dict""" """turn a (dotted key, value) into a proper nested dict"""
if "." in key: if "." in key:
key, rest = key.split(".", 1) key, rest = key.split(".", 1)
value = ChromeOptions._undot_key(rest, value) value = ChromeOptions._undot_key(rest, value)
return {key: value} return {key: value}
def handle_prefs(self, user_data_dir): @staticmethod
prefs = self.experimental_options.get("prefs") def _merge_nested(a, b):
if prefs: """
merges b into a
user_data_dir = user_data_dir or self._user_data_dir leaf values in a are overwritten with values from b
default_path = os.path.join(user_data_dir, "Default") """
os.makedirs(default_path, exist_ok=True) for key in b:
if key in a:
# undot prefs dict keys if isinstance(a[key], dict) and isinstance(b[key], dict):
undot_prefs = {} ChromeOptions._merge_nested(a[key], b[key])
for key, value in prefs.items(): continue
undot_prefs.update(self._undot_key(key, value)) a[key] = b[key]
return a
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file): def handle_prefs(self, user_data_dir):
with open(prefs_file, encoding="latin1", mode="r") as f: prefs = self.experimental_options.get("prefs")
undot_prefs.update(json.load(f)) if prefs:
with open(prefs_file, encoding="latin1", mode="w") as f: user_data_dir = user_data_dir or self._user_data_dir
json.dump(undot_prefs, f) default_path = os.path.join(user_data_dir, "Default")
os.makedirs(default_path, exist_ok=True)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"] # undot prefs dict keys
undot_prefs = {}
@classmethod for key, value in prefs.items():
def from_options(cls, options): undot_prefs = self._merge_nested(
o = cls() undot_prefs, self._undot_key(key, value)
o.__dict__.update(options.__dict__) )
return o
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file):
with open(prefs_file, encoding="latin1", mode="r") as f:
undot_prefs = self._merge_nested(json.load(f), undot_prefs)
with open(prefs_file, encoding="latin1", mode="w") as f:
json.dump(undot_prefs, f)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"]
@classmethod
def from_options(cls, options):
    """
    Alternate constructor: build a new instance of this class from an
    existing options object.

    The new instance is created with no arguments and then receives every
    instance attribute of ``options``. The copy is shallow: attribute
    values are shared with ``options``, not duplicated.

    :param options: object whose instance ``__dict__`` is copied over
    :return: a new instance of ``cls``
    """
    clone = cls()
    # shallow transfer of all instance attributes from the source object
    clone.__dict__.update(options.__dict__)
    return clone

View File

@ -1,276 +1,279 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import io from distutils.version import LooseVersion
import logging import io
import os import logging
import random import os
import re import random
import string import re
import sys import secrets
import time import string
import zipfile import sys
from distutils.version import LooseVersion import time
from urllib.request import urlopen, urlretrieve from urllib.request import urlopen
import secrets from urllib.request import urlretrieve
import zipfile
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux"))
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2"))
class Patcher(object):
url_repo = "https://chromedriver.storage.googleapis.com" class Patcher(object):
zip_name = "chromedriver_%s.zip" url_repo = "https://chromedriver.storage.googleapis.com"
exe_name = "chromedriver%s" zip_name = "chromedriver_%s.zip"
exe_name = "chromedriver%s"
platform = sys.platform
if platform.endswith("win32"): platform = sys.platform
zip_name %= "win32" if platform.endswith("win32"):
exe_name %= ".exe" zip_name %= "win32"
if platform.endswith("linux"): exe_name %= ".exe"
zip_name %= "linux64" if platform.endswith(("linux", "linux2")):
exe_name %= "" zip_name %= "linux64"
if platform.endswith("darwin"): exe_name %= ""
zip_name %= "mac64" if platform.endswith("darwin"):
exe_name %= "" zip_name %= "mac64"
exe_name %= ""
if platform.endswith("win32"):
d = "~/appdata/roaming/undetected_chromedriver" if platform.endswith("win32"):
elif platform.startswith("linux"): d = "~/appdata/roaming/undetected_chromedriver"
d = "~/.local/share/undetected_chromedriver" elif "LAMBDA_TASK_ROOT" in os.environ:
elif platform.endswith("darwin"): d = "/tmp/undetected_chromedriver"
d = "~/Library/Application Support/undetected_chromedriver" elif platform.startswith(("linux","linux2")):
else: d = "~/.local/share/undetected_chromedriver"
d = "~/.undetected_chromedriver" elif platform.endswith("darwin"):
data_path = os.path.abspath(os.path.expanduser(d)) d = "~/Library/Application Support/undetected_chromedriver"
else:
def __init__(self, executable_path=None, force=False, version_main: int = 0): d = "~/.undetected_chromedriver"
""" data_path = os.path.abspath(os.path.expanduser(d))
Args: def __init__(self, executable_path=None, force=False, version_main: int = 0):
executable_path: None = automatic """
a full file path to the chromedriver executable
force: False Args:
terminate processes which are holding lock executable_path: None = automatic
version_main: 0 = auto a full file path to the chromedriver executable
specify main chrome version (rounded, ex: 82) force: False
""" terminate processes which are holding lock
version_main: 0 = auto
self.force = force specify main chrome version (rounded, ex: 82)
self.executable_path = None """
prefix = secrets.token_hex(8)
self.force = force
if not os.path.exists(self.data_path): self.executable_path = None
os.makedirs(self.data_path, exist_ok=True) prefix = secrets.token_hex(8)
if not executable_path: if not os.path.exists(self.data_path):
self.executable_path = os.path.join( os.makedirs(self.data_path, exist_ok=True)
self.data_path, "_".join([prefix, self.exe_name])
) if not executable_path:
self.executable_path = os.path.join(
if not IS_POSIX: self.data_path, "_".join([prefix, self.exe_name])
if executable_path: )
if not executable_path[-4:] == ".exe":
executable_path += ".exe" if not IS_POSIX:
if executable_path:
self.zip_path = os.path.join(self.data_path, prefix) if not executable_path[-4:] == ".exe":
executable_path += ".exe"
if not executable_path:
self.executable_path = os.path.abspath( self.zip_path = os.path.join(self.data_path, prefix)
os.path.join(".", self.executable_path)
) if not executable_path:
self.executable_path = os.path.abspath(
self._custom_exe_path = False os.path.join(".", self.executable_path)
)
if executable_path:
self._custom_exe_path = True self._custom_exe_path = False
self.executable_path = executable_path
self.version_main = version_main if executable_path:
self.version_full = None self._custom_exe_path = True
self.executable_path = executable_path
def auto(self, executable_path=None, force=False, version_main=None): self.version_main = version_main
"""""" self.version_full = None
if executable_path:
self.executable_path = executable_path def auto(self, executable_path=None, force=False, version_main=None):
self._custom_exe_path = True """"""
if executable_path:
if self._custom_exe_path: self.executable_path = executable_path
ispatched = self.is_binary_patched(self.executable_path) self._custom_exe_path = True
if not ispatched:
return self.patch_exe() if self._custom_exe_path:
else: ispatched = self.is_binary_patched(self.executable_path)
return if not ispatched:
return self.patch_exe()
if version_main: else:
self.version_main = version_main return
if force is True:
self.force = force if version_main:
self.version_main = version_main
try: if force is True:
os.unlink(self.executable_path) self.force = force
except PermissionError:
if self.force: try:
self.force_kill_instances(self.executable_path) os.unlink(self.executable_path)
return self.auto(force=not self.force) except PermissionError:
try: if self.force:
if self.is_binary_patched(): self.force_kill_instances(self.executable_path)
# assumes already running AND patched return self.auto(force=not self.force)
return True try:
except PermissionError: if self.is_binary_patched():
pass # assumes already running AND patched
# return False return True
except FileNotFoundError: except PermissionError:
pass pass
# return False
release = self.fetch_release_number() except FileNotFoundError:
self.version_main = release.version[0] pass
self.version_full = release
self.unzip_package(self.fetch_package()) release = self.fetch_release_number()
return self.patch() self.version_main = release.version[0]
self.version_full = release
def patch(self): self.unzip_package(self.fetch_package())
self.patch_exe() return self.patch()
return self.is_binary_patched()
def patch(self):
def fetch_release_number(self): self.patch_exe()
""" return self.is_binary_patched()
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string def fetch_release_number(self):
:rtype: LooseVersion """
""" Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
path = "/latest_release" :return: version string
if self.version_main: :rtype: LooseVersion
path += f"_{self.version_main}" """
path = path.upper() path = "/latest_release"
logger.debug("getting release number from %s" % path) if self.version_main:
return LooseVersion(urlopen(self.url_repo + path).read().decode()) path += f"_{self.version_main}"
path = path.upper()
def parse_exe_version(self): logger.debug("getting release number from %s" % path)
with io.open(self.executable_path, "rb") as f: return LooseVersion(urlopen(self.url_repo + path).read().decode())
for line in iter(lambda: f.readline(), b""):
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) def parse_exe_version(self):
if match: with io.open(self.executable_path, "rb") as f:
return LooseVersion(match[1].decode()) for line in iter(lambda: f.readline(), b""):
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
def fetch_package(self): if match:
""" return LooseVersion(match[1].decode())
Downloads ChromeDriver from source
def fetch_package(self):
:return: path to downloaded file """
""" Downloads ChromeDriver from source
u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
logger.debug("downloading from %s" % u) :return: path to downloaded file
# return urlretrieve(u, filename=self.data_path)[0] """
return urlretrieve(u)[0] u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
logger.debug("downloading from %s" % u)
def unzip_package(self, fp): # return urlretrieve(u, filename=self.data_path)[0]
""" return urlretrieve(u)[0]
Does what it says
def unzip_package(self, fp):
:return: path to unpacked executable """
""" Does what it says
logger.debug("unzipping %s" % fp)
try: :return: path to unpacked executable
os.unlink(self.zip_path) """
except (FileNotFoundError, OSError): logger.debug("unzipping %s" % fp)
pass try:
os.unlink(self.zip_path)
os.makedirs(self.zip_path, mode=0o755, exist_ok=True) except (FileNotFoundError, OSError):
with zipfile.ZipFile(fp, mode="r") as zf: pass
zf.extract(self.exe_name, self.zip_path)
os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path) os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
os.remove(fp) with zipfile.ZipFile(fp, mode="r") as zf:
os.rmdir(self.zip_path) zf.extract(self.exe_name, self.zip_path)
os.chmod(self.executable_path, 0o755) os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path)
return self.executable_path os.remove(fp)
os.rmdir(self.zip_path)
@staticmethod os.chmod(self.executable_path, 0o755)
def force_kill_instances(exe_name): return self.executable_path
"""
kills running instances. @staticmethod
:param: executable name to kill, may be a path as well def force_kill_instances(exe_name):
"""
:return: True on success else False kills running instances.
""" :param: executable name to kill, may be a path as well
exe_name = os.path.basename(exe_name)
if IS_POSIX: :return: True on success else False
r = os.system("kill -f -9 $(pidof %s)" % exe_name) """
else: exe_name = os.path.basename(exe_name)
r = os.system("taskkill /f /im %s" % exe_name) if IS_POSIX:
return not r r = os.system("kill -f -9 $(pidof %s)" % exe_name)
else:
@staticmethod r = os.system("taskkill /f /im %s" % exe_name)
def gen_random_cdc(): return not r
cdc = random.choices(string.ascii_lowercase, k=26)
cdc[-6:-4] = map(str.upper, cdc[-6:-4]) @staticmethod
cdc[2] = cdc[0] def gen_random_cdc():
cdc[3] = "_" cdc = random.choices(string.ascii_lowercase, k=26)
return "".join(cdc).encode() cdc[-6:-4] = map(str.upper, cdc[-6:-4])
cdc[2] = cdc[0]
def is_binary_patched(self, executable_path=None): cdc[3] = "_"
"""simple check if executable is patched. return "".join(cdc).encode()
:return: False if not patched, else True def is_binary_patched(self, executable_path=None):
""" """simple check if executable is patched.
executable_path = executable_path or self.executable_path
with io.open(executable_path, "rb") as fh: :return: False if not patched, else True
for line in iter(lambda: fh.readline(), b""): """
if b"cdc_" in line: executable_path = executable_path or self.executable_path
return False with io.open(executable_path, "rb") as fh:
else: for line in iter(lambda: fh.readline(), b""):
return True if b"cdc_" in line:
return False
def patch_exe(self): else:
""" return True
Patches the ChromeDriver binary
def patch_exe(self):
:return: False on failure, binary name on success """
""" Patches the ChromeDriver binary
logger.info("patching driver executable %s" % self.executable_path)
:return: False on failure, binary name on success
linect = 0 """
replacement = self.gen_random_cdc() logger.info("patching driver executable %s" % self.executable_path)
with io.open(self.executable_path, "r+b") as fh:
for line in iter(lambda: fh.readline(), b""): linect = 0
if b"cdc_" in line: replacement = self.gen_random_cdc()
fh.seek(-len(line), 1) with io.open(self.executable_path, "r+b") as fh:
newline = re.sub(b"cdc_.{22}", replacement, line) for line in iter(lambda: fh.readline(), b""):
fh.write(newline) if b"cdc_" in line:
linect += 1 fh.seek(-len(line), 1)
return linect newline = re.sub(b"cdc_.{22}", replacement, line)
fh.write(newline)
def __repr__(self): linect += 1
return "{0:s}({1:s})".format( return linect
self.__class__.__name__,
self.executable_path, def __repr__(self):
) return "{0:s}({1:s})".format(
self.__class__.__name__,
def __del__(self): self.executable_path,
)
if self._custom_exe_path:
# if the driver binary is specified by user def __del__(self):
# we assume it is important enough to not delete it
return if self._custom_exe_path:
else: # if the driver binary is specified by user
timeout = 3 # stop trying after this many seconds # we assume it is important enough to not delete it
t = time.monotonic() return
while True: else:
now = time.monotonic() timeout = 3 # stop trying after this many seconds
if now - t > timeout: t = time.monotonic()
# we don't want to wait until the end of time while True:
logger.debug( now = time.monotonic()
"could not unlink %s in time (%d seconds)" if now - t > timeout:
% (self.executable_path, timeout) # we don't want to wait until the end of time
) logger.debug(
break "could not unlink %s in time (%d seconds)"
try: % (self.executable_path, timeout)
os.unlink(self.executable_path) )
logger.debug("successfully unlinked %s" % self.executable_path) break
break try:
except (OSError, RuntimeError, PermissionError): os.unlink(self.executable_path)
time.sleep(0.1) logger.debug("successfully unlinked %s" % self.executable_path)
continue break
except FileNotFoundError: except (OSError, RuntimeError, PermissionError):
break time.sleep(0.1)
continue
except FileNotFoundError:
break

View File

@ -1,37 +1,85 @@
import selenium.webdriver.remote.webelement from selenium.webdriver.common.by import By
import selenium.webdriver.remote.webelement
from typing import List
class WebElement(selenium.webdriver.remote.webelement.WebElement):
"""
Custom WebElement class which makes it easier to view elements when class WebElement(selenium.webdriver.remote.webelement.WebElement):
working in an interactive environment. def click_safe(self):
super().click()
standard webelement repr: self._parent.reconnect(0.1)
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
def children(
using this WebElement class: self, tag=None, recursive=False
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)> ) -> List[selenium.webdriver.remote.webelement.WebElement]:
"""
""" returns direct child elements of current element
:param tag: str, if supplied, returns <tag> nodes only
@property """
def attrs(self): script = "return [... arguments[0].children]"
if not hasattr(self, "_attrs"): if tag:
self._attrs = self._parent.execute_script( script += ".filter( node => node.tagName === '%s')" % tag.upper()
""" if recursive:
var items = {}; return list(_recursive_children(self, tag))
for (index = 0; index < arguments[0].attributes.length; ++index) return list(self._parent.execute_script(script, self))
{
items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
}; class UCWebElement(WebElement):
return items; """
""", Custom WebElement class which makes it easier to view elements when
self, working in an interactive environment.
)
return self._attrs standard webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
def __repr__(self):
strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()]) using this WebElement class:
if strattrs: <WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
strattrs = " " + strattrs
return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>" """
def __init__(self, parent, id_):
    """
    Initialise the element through the base class and reset the
    attribute cache.

    :param parent: forwarded unchanged to the base-class initialiser
    :param id_: forwarded unchanged to the base-class initialiser
    """
    super().__init__(parent, id_)
    # cache slot for the element's attributes; populated on first use
    self._attrs = None
@property
def attrs(self):
    """
    Attributes of this element as a ``{name: value}`` mapping.

    The mapping is fetched once by running a script in the browser and is
    cached on the instance afterwards; later changes to the element are
    not reflected.

    :return: whatever the script returns, i.e. the attribute mapping
    """
    # Compare against None rather than truthiness: an element without any
    # attributes yields an empty mapping, which must count as "already
    # fetched" instead of triggering a new browser round-trip every time.
    if self._attrs is None:
        # ``index`` is declared with ``var`` so the loop counter stays local
        # to the script instead of being created as a global in the page.
        self._attrs = self._parent.execute_script(
            """
            var items = {};
            for (var index = 0; index < arguments[0].attributes.length; ++index)
            {
                items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
            };
            return items;
            """,
            self,
        )
    return self._attrs
def __repr__(self):
    """
    Return ``ClassName <tag attr="value" ...>`` for interactive inspection.

    Reads ``self.attrs`` and ``self.tag_name``; when there are no
    attributes the result is just ``ClassName <tag>``.
    """
    # each attribute carries its own leading space, so an empty mapping
    # adds nothing after the tag name
    rendered = "".join(f' {k}="{v}"' for k, v in self.attrs.items())
    return f"{self.__class__.__name__} <{self.tag_name}{rendered}>"
def _recursive_children(element, tag: str = None, _results=None):
    """
    returns all children of <element> recursively

    :param element: `WebElement` object.
                    find children below this <element>
    :param tag: str = None.
                if provided, return only <tag> elements. example: 'a', or 'img'
                the comparison ignores case, so 'A' and 'a' select the same
                nodes (the non-recursive lookup upper-cases the tag as well)
    :param _results: do not use!
                     accumulator set; when supplied, matches are added to it
                     and the same object is returned
    :return: set of every matching descendant of <element>
    """
    # `is None` instead of `or`: an empty accumulator handed in by a caller
    # is falsy and would otherwise be silently replaced by a new set
    results = set() if _results is None else _results
    wanted = tag.lower() if tag else None

    # explicit stack instead of recursion: avoids shadowing the `element`
    # parameter and does not depend on the interpreter recursion limit
    pending = [element]
    while pending:
        node = pending.pop()
        for child in node.children():
            if wanted is None or child.tag_name.lower() == wanted:
                results.add(child)
            # descend regardless of the filter; matches may sit deeper
            pending.append(child)
    return results