From b32d7b70bebfabe90e5c1348ae9795ba83dc5f04 Mon Sep 17 00:00:00 2001 From: ngosang Date: Sat, 24 Sep 2022 18:35:01 +0200 Subject: [PATCH] Fork undetected-chromedriver 3.1.5.post4 --- requirements.txt | 1 - src/undetected_chromedriver/__init__.py | 699 ++++++++++++++++++++++ src/undetected_chromedriver/_compat.py | 259 ++++++++ src/undetected_chromedriver/cdp.py | 112 ++++ src/undetected_chromedriver/devtool.py | 191 ++++++ src/undetected_chromedriver/dprocess.py | 75 +++ src/undetected_chromedriver/options.py | 70 +++ src/undetected_chromedriver/patcher.py | 276 +++++++++ src/undetected_chromedriver/reactor.py | 102 ++++ src/undetected_chromedriver/v2.py | 4 + src/undetected_chromedriver/webelement.py | 37 ++ 11 files changed, 1825 insertions(+), 1 deletion(-) create mode 100644 src/undetected_chromedriver/__init__.py create mode 100644 src/undetected_chromedriver/_compat.py create mode 100644 src/undetected_chromedriver/cdp.py create mode 100644 src/undetected_chromedriver/devtool.py create mode 100644 src/undetected_chromedriver/dprocess.py create mode 100644 src/undetected_chromedriver/options.py create mode 100644 src/undetected_chromedriver/patcher.py create mode 100644 src/undetected_chromedriver/reactor.py create mode 100644 src/undetected_chromedriver/v2.py create mode 100644 src/undetected_chromedriver/webelement.py diff --git a/requirements.txt b/requirements.txt index bb15839..1191618 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ bottle==0.12.23 waitress==2.1.2 selenium==4.4.3 -undetected-chromedriver==3.1.5.post4 func-timeout==4.3.5 xvfbwrapper==0.2.9 diff --git a/src/undetected_chromedriver/__init__.py b/src/undetected_chromedriver/__init__.py new file mode 100644 index 0000000..7926a9e --- /dev/null +++ b/src/undetected_chromedriver/__init__.py @@ -0,0 +1,699 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import subprocess + +""" + + 888 888 d8b + 888 888 Y8P + 888 888 + .d8888b 88888b. 888d888 .d88b. 
88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888 +d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P" +888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888 +Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888 + "Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888 + +by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam) + +""" + + +__version__ = "3.1.5r4" + + +import json +import logging +import os +import re +import shutil +import sys +import tempfile +import time +import inspect +import threading + +import selenium.webdriver.chrome.service +import selenium.webdriver.chrome.webdriver +import selenium.webdriver.common.service +import selenium.webdriver.remote.webdriver + +from .cdp import CDP +from .options import ChromeOptions +from .patcher import IS_POSIX +from .patcher import Patcher +from .reactor import Reactor +from .dprocess import start_detached + +__all__ = ( + "Chrome", + "ChromeOptions", + "Patcher", + "Reactor", + "CDP", + "find_chrome_executable", +) + +logger = logging.getLogger("uc") +logger.setLevel(logging.getLogger().getEffectiveLevel()) + + +class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): + """ + + Controls the ChromeDriver and allows you to drive the browser. + + The webdriver file will be downloaded by this module automatically, + you do not need to specify this. however, you may if you wish. + + Attributes + ---------- + + Methods + ------- + + reconnect() + + this can be useful in case of heavy detection methods + -stops the chromedriver service which runs in the background + -starts the chromedriver service which runs in the background + -recreate session + + + start_session(capabilities=None, browser_profile=None) + + differentiates from the regular method in that it does not + require a capabilities argument. 
The capabilities are automatically + recreated from the options at creation time. + + -------------------------------------------------------------------------- + NOTE: + Chrome has everything included to work out of the box. + it does not `need` customizations. + any customizations MAY lead to trigger bot migitation systems. + + -------------------------------------------------------------------------- + """ + + _instances = set() + session_id = None + debug = False + + def __init__( + self, + options=None, + user_data_dir=None, + driver_executable_path=None, + browser_executable_path=None, + port=0, + enable_cdp_events=False, + service_args=None, + desired_capabilities=None, + advanced_elements=False, + service_log_path=None, + keep_alive=True, + log_level=0, + headless=False, + version_main=None, + patcher_force_close=False, + suppress_welcome=True, + use_subprocess=False, + debug=False, + **kw + ): + """ + Creates a new instance of the chrome driver. + + Starts the service and then creates new instance of chrome driver. + + Parameters + ---------- + + options: ChromeOptions, optional, default: None - automatic useful defaults + this takes an instance of ChromeOptions, mainly to customize browser behavior. + anything other dan the default, for example extensions or startup options + are not supported in case of failure, and can probably lowers your undetectability. + + + user_data_dir: str , optional, default: None (creates temp profile) + if user_data_dir is a path to a valid chrome profile directory, use it, + and turn off automatic removal mechanism at exit. + + driver_executable_path: str, optional, default: None(=downloads and patches new binary) + + browser_executable_path: str, optional, default: None - use find_chrome_executable + Path to the browser executable. + If not specified, make sure the executable's folder is in $PATH + + port: int, optional, default: 0 + port you would like the service to run, if left as 0, a free port will be found. 
+ + enable_cdp_events: bool, default: False + :: currently for chrome only + this enables the handling of wire messages + when enabled, you can subscribe to CDP events by using: + + driver.add_cdp_listener("Network.dataReceived", yourcallback) + # yourcallback is an callable which accepts exactly 1 dict as parameter + + + service_args: list of str, optional, default: None + arguments to pass to the driver service + + desired_capabilities: dict, optional, default: None - auto from config + Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref". + + advanced_elements: bool, optional, default: False + makes it easier to recognize elements like you know them from html/browser inspection, especially when working + in an interactive environment + + default webelement repr: + + + advanced webelement repr + )> + + note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time. + + + service_log_path: str, optional, default: None + path to log information from the driver. + + keep_alive: bool, optional, default: True + Whether to configure ChromeRemoteConnection to use HTTP keep-alive. + + log_level: int, optional, default: adapts to python global log level + + headless: bool, optional, default: False + can also be specified in the options instance. + Specify whether you want to use the browser in headless mode. + warning: this lowers undetectability and not fully supported. + + version_main: int, optional, default: None (=auto) + if you, for god knows whatever reason, use + an older version of Chrome. You can specify it's full rounded version number + here. Example: 87 for all versions of 87 + + patcher_force_close: bool, optional, default: False + instructs the patcher to do whatever it can to access the chromedriver binary + if the file is locked, it will force shutdown all instances. 
+ setting it is not recommended, unless you know the implications and think + you might need it. + + suppress_welcome: bool, optional , default: True + a "welcome" alert might show up on *nix-like systems asking whether you want to set + chrome as your default browser, and if you want to send even more data to google. + now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False. + Note: if you don't handle the nag screen in time, the browser loses it's connection and throws an Exception. + + use_subprocess: bool, optional , default: False, + + False (the default) makes sure Chrome will get it's own process (so no subprocess of chromedriver.exe or python + This fixes a LOT of issues, like multithreaded run, but mst importantly. shutting corectly after + program exits or using .quit() + + unfortunately, there is always an edge case in which one would like to write an single script with the only contents being: + --start script-- + import undetected_chromedriver as uc + d = uc.Chrome() + d.get('https://somesite/') + ---end script -- + + and will be greeted with an error, since the program exists before chrome has a change to launch. + in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times. + ! setting it to True comes with NO support when being detected. ! 
+ + """ + self.debug = debug + patcher = Patcher( + executable_path=driver_executable_path, + force=patcher_force_close, + version_main=version_main, + ) + patcher.auto() + self.patcher = patcher + if not options: + options = ChromeOptions() + + + try: + if hasattr(options, "_session") and options._session is not None: + # prevent reuse of options, + # as it just appends arguments, not replace them + # you'll get conflicts starting chrome + raise RuntimeError("you cannot reuse the ChromeOptions object") + except AttributeError: + pass + + options._session = self + + debug_port = selenium.webdriver.common.service.utils.free_port() + debug_host = "127.0.0.1" + + if not options.debugger_address: + options.debugger_address = "%s:%d" % (debug_host, debug_port) + + if enable_cdp_events: + options.set_capability( + "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"} + ) + + options.add_argument("--remote-debugging-host=%s" % debug_host) + options.add_argument("--remote-debugging-port=%s" % debug_port) + + if user_data_dir: + options.add_argument('--user-data-dir=%s' % user_data_dir) + + language, keep_user_data_dir = None, bool(user_data_dir) + + # see if a custom user profile is specified in options + for arg in options.arguments: + + if "lang" in arg: + m = re.search("(?:--)?lang(?:[ =])?(.*)", arg) + try: + language = m[1] + except IndexError: + logger.debug("will set the language to en-US,en;q=0.9") + language = "en-US,en;q=0.9" + + if "user-data-dir" in arg: + m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg) + try: + user_data_dir = m[1] + logger.debug( + "user-data-dir found in user argument %s => %s" % (arg, m[1]) + ) + keep_user_data_dir = True + + except IndexError: + logger.debug( + "no user data dir could be extracted from supplied argument %s " + % arg + ) + + if not user_data_dir: + + # backward compatiblity + # check if an old uc.ChromeOptions is used, and extract the user data dir + + if hasattr(options, "user_data_dir") and getattr( + 
options, "user_data_dir", None + ): + import warnings + + warnings.warn( + "using ChromeOptions.user_data_dir might stop working in future versions." + "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder" + ) + options.add_argument("--user-data-dir=%s" % options.user_data_dir) + keep_user_data_dir = True + logger.debug( + "user_data_dir property found in options object: %s" % user_data_dir + ) + + else: + user_data_dir = os.path.normpath(tempfile.mkdtemp()) + keep_user_data_dir = False + arg = "--user-data-dir=%s" % user_data_dir + options.add_argument(arg) + logger.debug( + "created a temporary folder in which the user-data (profile) will be stored during this\n" + "session, and added it to chrome startup arguments: %s" % arg + ) + + if not language: + try: + import locale + + language = locale.getdefaultlocale()[0].replace("_", "-") + except Exception: + pass + if not language: + language = "en-US" + + options.add_argument("--lang=%s" % language) + + if not options.binary_location: + options.binary_location = ( + browser_executable_path or find_chrome_executable() + ) + + self._delay = 3 + + self.user_data_dir = user_data_dir + self.keep_user_data_dir = keep_user_data_dir + + if suppress_welcome: + options.arguments.extend(["--no-default-browser-check", "--no-first-run"]) + if headless or options.headless: + options.headless = True + options.add_argument("--window-size=1920,1080") + options.add_argument("--start-maximized") + options.add_argument("--no-sandbox") + # fixes "could not connect to chrome" error when running + # on linux using privileged user like root (which i don't recommend) + + options.add_argument( + "--log-level=%d" % log_level + or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] + ) + + if hasattr(options, 'handle_prefs'): + options.handle_prefs(user_data_dir) + + # fix exit_type flag to prevent tab-restore nag + try: + with open( + os.path.join(user_data_dir, "Default/Preferences"), + 
encoding="latin1", + mode="r+", + ) as fs: + config = json.load(fs) + if config["profile"]["exit_type"] is not None: + # fixing the restore-tabs-nag + config["profile"]["exit_type"] = None + fs.seek(0, 0) + json.dump(config, fs) + logger.debug("fixed exit_type flag") + except Exception as e: + logger.debug("did not find a bad exit_type flag ") + + self.options = options + + if not desired_capabilities: + desired_capabilities = options.to_capabilities() + + if not use_subprocess: + self.browser_pid = start_detached( + options.binary_location, *options.arguments + ) + else: + browser = subprocess.Popen( + [options.binary_location, *options.arguments], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=IS_POSIX, + ) + self.browser_pid = browser.pid + + super(Chrome, self).__init__( + executable_path=patcher.executable_path, + port=port, + options=options, + service_args=service_args, + desired_capabilities=desired_capabilities, + service_log_path=service_log_path, + keep_alive=keep_alive, + ) + + self.reactor = None + + if enable_cdp_events: + if logging.getLogger().getEffectiveLevel() == logging.DEBUG: + logging.getLogger( + "selenium.webdriver.remote.remote_connection" + ).setLevel(20) + reactor = Reactor(self) + reactor.start() + self.reactor = reactor + + if advanced_elements: + from .webelement import WebElement + + self._web_element_cls = WebElement + + if options.headless: + self._configure_headless() + + def __getattribute__(self, item): + + if not super().__getattribute__("debug"): + return super().__getattribute__(item) + else: + import inspect + + original = super().__getattribute__(item) + if inspect.ismethod(original) and not inspect.isclass(original): + + def newfunc(*args, **kwargs): + logger.debug( + "calling %s with args %s and kwargs %s\n" + % (original.__qualname__, args, kwargs) + ) + return original(*args, **kwargs) + + return newfunc + return original + + def _configure_headless(self): + + orig_get = self.get 
+ logger.info("setting properties for headless") + + def get_wrapped(*args, **kwargs): + if self.execute_script("return navigator.webdriver"): + logger.info("patch navigator.webdriver") + self.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + + Object.defineProperty(window, 'navigator', { + value: new Proxy(navigator, { + has: (target, key) => (key === 'webdriver' ? false : key in target), + get: (target, key) => + key === 'webdriver' ? + false : + typeof target[key] === 'function' ? + target[key].bind(target) : + target[key] + }) + }); + + """ + }, + ) + + logger.info("patch user-agent string") + self.execute_cdp_cmd( + "Network.setUserAgentOverride", + { + "userAgent": self.execute_script( + "return navigator.userAgent" + ).replace("Headless", "") + }, + ) + self.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + Object.defineProperty(navigator, 'maxTouchPoints', { + get: () => 1 + })""" + }, + ) + return orig_get(*args, **kwargs) + + self.get = get_wrapped + + def __dir__(self): + return object.__dir__(self) + + def _get_cdc_props(self): + return self.execute_script( + """ + let objectToInspect = window, + result = []; + while(objectToInspect !== null) + { result = result.concat(Object.getOwnPropertyNames(objectToInspect)); + objectToInspect = Object.getPrototypeOf(objectToInspect); } + return result.filter(i => i.match(/.+_.+_(Array|Promise|Symbol)/ig)) + """ + ) + + def _hook_remove_cdc_props(self): + self.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + let objectToInspect = window, + result = []; + while(objectToInspect !== null) + { result = result.concat(Object.getOwnPropertyNames(objectToInspect)); + objectToInspect = Object.getPrototypeOf(objectToInspect); } + result.forEach(p => p.match(/.+_.+_(Array|Promise|Symbol)/ig) + &&delete window[p]&&console.log('removed',p)) + """ + }, + ) + + def get(self, url): + if self._get_cdc_props(): + 
self._hook_remove_cdc_props() + return super().get(url) + + def add_cdp_listener(self, event_name, callback): + if ( + self.reactor + and self.reactor is not None + and isinstance(self.reactor, Reactor) + ): + self.reactor.add_event_handler(event_name, callback) + return self.reactor.handlers + return False + + def clear_cdp_listeners(self): + if self.reactor and isinstance(self.reactor, Reactor): + self.reactor.handlers.clear() + + def tab_new(self, url: str): + """ + this opens a url in a new tab. + apparently, that passes all tests directly! + + Parameters + ---------- + url + + Returns + ------- + + """ + if not hasattr(self, "cdp"): + from .cdp import CDP + + cdp = CDP(self.options) + cdp.tab_new(url) + + def reconnect(self, timeout=0.1): + try: + self.service.stop() + except Exception as e: + logger.debug(e) + time.sleep(timeout) + try: + self.service.start() + except Exception as e: + logger.debug(e) + + try: + self.start_session() + except Exception as e: + logger.debug(e) + + def start_session(self, capabilities=None, browser_profile=None): + if not capabilities: + capabilities = self.options.to_capabilities() + super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session( + capabilities, browser_profile + ) + # super(Chrome, self).start_session(capabilities, browser_profile) + + def quit(self): + logger.debug("closing webdriver") + if hasattr(self, "service") and getattr(self.service, "process", None): + self.service.process.kill() + try: + if self.reactor and isinstance(self.reactor, Reactor): + logger.debug("shutting down reactor") + self.reactor.event.set() + except Exception: # noqa + pass + try: + logger.debug("killing browser") + os.kill(self.browser_pid, 15) + + except TimeoutError as e: + logger.debug(e, exc_info=True) + except Exception: # noqa + pass + + if ( + hasattr(self, "keep_user_data_dir") + and hasattr(self, "user_data_dir") + and not self.keep_user_data_dir + ): + for _ in range(5): + try: + + 
shutil.rmtree(self.user_data_dir, ignore_errors=False) + except FileNotFoundError: + pass + except (RuntimeError, OSError, PermissionError) as e: + logger.debug( + "When removing the temp profile, a %s occured: %s\nretrying..." + % (e.__class__.__name__, e) + ) + else: + logger.debug("successfully removed %s" % self.user_data_dir) + break + time.sleep(0.1) + + # dereference patcher, so patcher can start cleaning up as well. + # this must come last, otherwise it will throw 'in use' errors + self.patcher = None + + def __del__(self): + try: + super().quit() + # self.service.process.kill() + except: # noqa + pass + self.quit() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.service.stop() + time.sleep(self._delay) + self.service.start() + self.start_session() + + def __hash__(self): + return hash(self.options.debugger_address) + + +def find_chrome_executable(): + """ + Finds the chrome, chrome beta, chrome canary, chromium executable + + Returns + ------- + executable_path : str + the full file path to found executable + + """ + candidates = set() + if IS_POSIX: + for item in os.environ.get("PATH").split(os.pathsep): + for subitem in ( + "google-chrome", + "chromium", + "chromium-browser", + "chrome", + "google-chrome-stable", + ): + candidates.add(os.sep.join((item, subitem))) + if "darwin" in sys.platform: + candidates.update( + [ + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", + "/Applications/Chromium.app/Contents/MacOS/Chromium", + ] + ) + else: + for item in map( + os.environ.get, ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA") + ): + for subitem in ( + "Google/Chrome/Application", + "Google/Chrome Beta/Application", + "Google/Chrome Canary/Application", + ): + candidates.add(os.sep.join((item, subitem, "chrome.exe"))) + for candidate in candidates: + if os.path.exists(candidate) and os.access(candidate, os.X_OK): + return os.path.normpath(candidate) diff --git 
# ---- src/undetected_chromedriver/_compat.py ----
#!/usr/bin/env python3
# this module is part of undetected_chromedriver

"""
undetected_chromedriver - legacy (v1 style) compatibility layer

by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""

import io
import logging
import os
import random
import re
import string
import sys
import zipfile
from distutils.version import LooseVersion
from urllib.request import urlopen, urlretrieve

from selenium.webdriver import Chrome as _Chrome, ChromeOptions as _ChromeOptions

TARGET_VERSION = 0
logger = logging.getLogger("uc")


class Chrome:
    """Factory: `Chrome(...)` returns a patched selenium Chrome instance, not an instance of this class."""

    def __new__(cls, *args, emulate_touch=False, **kwargs):
        """
        Install/patch chromedriver if needed and build a selenium Chrome.

        :param emulate_touch: when True, report navigator.maxTouchPoints = 1
        :return: selenium.webdriver.Chrome instance whose `get` hides navigator.webdriver
        """
        # one manager is enough; each construction may query the release
        # server, so build it lazily and reuse it
        manager = None

        def get_manager():
            nonlocal manager
            if manager is None:
                manager = ChromeDriverManager(*args, **kwargs)
            return manager

        if not ChromeDriverManager.installed:
            get_manager().install()
        if not ChromeDriverManager.selenium_patched:
            get_manager().patch_selenium_webdriver()
        if not kwargs.get("executable_path"):
            kwargs["executable_path"] = "./{}".format(get_manager().executable_path)
        if not kwargs.get("options"):
            kwargs["options"] = ChromeOptions()
        instance = object.__new__(_Chrome)
        instance.__init__(*args, **kwargs)

        instance._orig_get = instance.get

        def _get_wrapped(*args, **kwargs):
            if instance.execute_script("return navigator.webdriver"):
                instance.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """

                            Object.defineProperty(window, 'navigator', {
                                value: new Proxy(navigator, {
                                    has: (target, key) => (key === 'webdriver' ? false : key in target),
                                    get: (target, key) =>
                                        key === 'webdriver'
                                            ? undefined
                                            : typeof target[key] === 'function'
                                            ? target[key].bind(target)
                                            : target[key]
                                })
                            });


                        """
                    },
                )
            return instance._orig_get(*args, **kwargs)

        instance.get = _get_wrapped

        original_user_agent_string = instance.execute_script(
            "return navigator.userAgent"
        )
        instance.execute_cdp_cmd(
            "Network.setUserAgentOverride",
            {
                "userAgent": original_user_agent_string.replace("Headless", ""),
            },
        )
        if emulate_touch:
            instance.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {
                    "source": """
                        Object.defineProperty(navigator, 'maxTouchPoints', {
                            get: () => 1
                        })"""
                },
            )
        logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
        return instance


class ChromeOptions:
    """Factory: returns a selenium ChromeOptions preloaded with the anti-automation switches."""

    def __new__(cls, *args, **kwargs):
        if not ChromeDriverManager.installed:
            ChromeDriverManager(*args, **kwargs).install()
        if not ChromeDriverManager.selenium_patched:
            ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()

        instance = object.__new__(_ChromeOptions)
        instance.__init__()
        instance.add_argument("start-maximized")
        instance.add_experimental_option("excludeSwitches", ["enable-automation"])
        instance.add_argument("--disable-blink-features=AutomationControlled")
        return instance


class ChromeDriverManager(object):
    """Downloads the chromedriver binary, patches it and patches selenium's Chrome classes."""

    # process-wide state shared by every manager instance
    installed = False
    selenium_patched = False
    target_version = None

    DL_BASE = "https://chromedriver.storage.googleapis.com/"

    def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
        """
        :param executable_path: chromedriver path; defaults to "chromedriver[.exe]" in the cwd
        :param target_version: major Chrome version; defaults to TARGET_VERSION, else the latest release
        """
        _platform = sys.platform

        if TARGET_VERSION:
            # use global if set
            self.target_version = TARGET_VERSION

        if target_version:
            # explicitly passed target overrides the global
            self.target_version = target_version

        if not self.target_version:
            # none of the above (default): look up the current release
            self.target_version = self.get_release_version_number().version[
                0
            ]  # only major version int

        self._base = base_ = "chromedriver{}"

        # always format the template; the original left the literal
        # "chromedriver{}" on platforms other than win32/linux/darwin
        exe_name = base_.format(".exe" if _platform == "win32" else "")
        if _platform == "linux":
            _platform += "64"
        elif _platform == "darwin":
            _platform = "mac64"
        self.platform = _platform
        self.executable_path = executable_path or exe_name
        self._exe_name = exe_name

    def patch_selenium_webdriver(self_):
        """
        Patches selenium package Chrome, ChromeOptions classes for current session

        :return:
        """
        import selenium.webdriver.chrome.service
        import selenium.webdriver

        selenium.webdriver.Chrome = Chrome
        selenium.webdriver.ChromeOptions = ChromeOptions
        logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
        self_.__class__.selenium_patched = True

    def install(self, patch_selenium=True):
        """
        Initialize the patch

        This will:
         download chromedriver if not present
         patch the downloaded chromedriver
         patch selenium package if <patch_selenium> is True (default)

        :param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
        :return:
        """
        if not os.path.exists(self.executable_path):
            self.fetch_chromedriver()
        if not self.__class__.installed:
            # a zero replacement count leaves `installed` False
            if self.patch_binary():
                self.__class__.installed = True

        if patch_selenium:
            self.patch_selenium_webdriver()

    def get_release_version_number(self):
        """
        Gets the latest major version available, or the latest major version of self.target_version if set explicitly.

        :return: LooseVersion parsed from the release server's answer
        """
        path = (
            "LATEST_RELEASE"
            if not self.target_version
            else f"LATEST_RELEASE_{self.target_version}"
        )
        # close the HTTP response instead of leaking the connection
        with urlopen(self.__class__.DL_BASE + path) as response:
            return LooseVersion(response.read().decode().strip())

    def fetch_chromedriver(self):
        """
        Downloads ChromeDriver from source and unpacks the executable

        :return: on success, name of the unpacked executable
        """
        # check first: no need to ask the release server when nothing is downloaded
        if os.path.exists(self.executable_path):
            return self.executable_path
        base_ = self._base
        zip_name = base_.format(".zip")
        ver = self.get_release_version_number().vstring
        urlretrieve(
            f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
            filename=zip_name,
        )
        with zipfile.ZipFile(zip_name) as zf:
            zf.extract(self._exe_name)
        os.remove(zip_name)
        if sys.platform != "win32":
            os.chmod(self._exe_name, 0o755)
        return self._exe_name

    @staticmethod
    def random_cdc():
        """Return a random 26-byte replacement for the `cdc_...` marker (4th character is '_')."""
        cdc = random.choices(string.ascii_lowercase, k=26)
        cdc[-6:-4] = map(str.upper, cdc[-6:-4])
        cdc[2] = cdc[0]
        cdc[3] = "_"
        return "".join(cdc).encode()

    def patch_binary(self):
        """
        Patches the ChromeDriver binary in place

        :return: number of lines in which the marker was replaced (0 when nothing was patched)
        """
        linect = 0
        replacement = self.random_cdc()
        with io.open(self.executable_path, "r+b") as fh:
            for line in iter(fh.readline, b""):
                if b"cdc_" in line:
                    # rewind over the line just read and overwrite it; the
                    # replacement has the same length as the matched marker
                    fh.seek(-len(line), 1)
                    newline = re.sub(b"cdc_.{22}", replacement, line)
                    fh.write(newline)
                    linect += 1
            return linect


def install(executable_path=None, target_version=None, *args, **kwargs):
    """Module-level shortcut for ChromeDriverManager(...).install()."""
    ChromeDriverManager(executable_path, target_version, *args, **kwargs).install()


# ---- src/undetected_chromedriver/cdp.py ----
#!/usr/bin/env python3
# this module is part of undetected_chromedriver

import json
import logging
from collections.abc import Mapping, Sequence
+import requests +import websockets + +log = logging.getLogger(__name__) + + +class CDPObject(dict): + def __init__(self, *a, **k): + super().__init__(*a, **k) + self.__dict__ = self + for k in self.__dict__: + if isinstance(self.__dict__[k], dict): + self.__dict__[k] = CDPObject(self.__dict__[k]) + elif isinstance(self.__dict__[k], list): + for i in range(len(self.__dict__[k])): + if isinstance(self.__dict__[k][i], dict): + self.__dict__[k][i] = CDPObject(self) + + def __repr__(self): + tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)" + return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items())) + + +class PageElement(CDPObject): + pass + + +class CDP: + log = logging.getLogger("CDP") + + endpoints = CDPObject( + { + "json": "/json", + "protocol": "/json/protocol", + "list": "/json/list", + "new": "/json/new?{url}", + "activate": "/json/activate/{id}", + "close": "/json/close/{id}", + } + ) + + def __init__(self, options: "ChromeOptions"): # noqa + self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":")) + + self._reqid = 0 + self._session = requests.Session() + self._last_resp = None + self._last_json = None + + resp = self.get(self.endpoints.json) # noqa + self.sessionId = resp[0]["id"] + self.wsurl = resp[0]["webSocketDebuggerUrl"] + + def tab_activate(self, id=None): + if not id: + active_tab = self.tab_list()[0] + id = active_tab.id # noqa + self.wsurl = active_tab.webSocketDebuggerUrl # noqa + return self.post(self.endpoints["activate"].format(id=id)) + + def tab_list(self): + retval = self.get(self.endpoints["list"]) + return [PageElement(o) for o in retval] + + def tab_new(self, url): + return self.post(self.endpoints["new"].format(url=url)) + + def tab_close_last_opened(self): + sessions = self.tab_list() + opentabs = [s for s in sessions if s["type"] == "page"] + return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"])) + + async def send(self, method: str, params: dict): + self._reqid += 1 + async with 
class Structure(dict):
    """
    Dict-like object whose keys can also be read and written as attributes.

    Intended to be subclassed. Mapping values, and mapping items found
    inside non-string sequence values, are converted to instances of the
    same class; sequence values are stored as lists.
    """

    _store = {}

    def __init__(self, *a, **kw):
        """
        Instantiate a new instance.

        :param a: positional arguments, interpreted like ``dict(*a)``
        :param kw: keyword arguments, interpreted like ``dict(**kw)``
        """

        super().__init__()

        cls = self.__class__
        # auxiliary dict, so every accepted dict() call shape is supported
        for k, v in dict(*a, **kw).items():
            if isinstance(v, Mapping):
                self[k] = cls(v)
            elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
                # Only mapping items are wrapped. Passing a scalar or a
                # string to the constructor raised, which made any list of
                # plain values unusable.
                self[k] = [cls(i) if isinstance(i, Mapping) else i for i in v]
            else:
                self[k] = v
        # Share storage with the instance namespace so keys resolve as
        # attributes; bypasses our own __setattr__, which writes items.
        super().__setattr__("__dict__", self)

    def __getattr__(self, item):
        # Reached only when normal lookup fails; raises AttributeError.
        return getattr(super(), item)

    def __getitem__(self, item):
        return super().__getitem__(item)

    def __setattr__(self, key, value):
        self.__setitem__(key, value)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)

    def update(self, *a, **kw):
        super().update(*a, **kw)

    def __eq__(self, other):
        """Compare by content with any mapping; other types are unequal."""
        if not isinstance(other, Mapping):
            return NotImplemented
        # Plain dict comparison also works when values are unhashable
        # (e.g. the lists created by __init__).
        return dict.__eq__(self, dict(other))

    def __hash__(self):
        """Hash of the item set; raises TypeError if a value is unhashable."""
        return hash(frozenset(dict.items(self)))

    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # every subclass gets its own store instead of sharing the base one
        cls._store = {}

    def _normalize_strings(self):
        """Strip leading/trailing whitespace from every top-level str value."""
        for k, v in list(dict.items(self)):
            if isinstance(v, str):
                self[k] = v.strip()
+ while ( + getattr(driver, "service", False) + and getattr(driver.service, "process", False) + and driver.service.process.poll() + ): + print("waiting for driver service to come back on") + await asyncio.sleep(0.05) + # await asyncio.sleep(driver._delay or .25) + + async def get_log_lines(typ): + await _ensure_service_started() + return driver.get_log(typ) + + async def looper(): + while not stop_event.is_set(): + log_lines = [] + try: + for _ in listen_events: + try: + log_lines += await get_log_lines(_) + except: + if logging.getLogger().getEffectiveLevel() <= 10: + traceback.print_exc() + continue + if log_lines and on_event_coro: + await on_event_coro(log_lines) + except Exception as e: + if logging.getLogger().getEffectiveLevel() <= 10: + traceback.print_exc() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(looper()) + + t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro)) + t.start() + + async def on_event(data): + print("on_event") + print("data:", data) + + def func_called(fn): + def wrapped(*args, **kwargs): + print( + "func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs) + ) + while driver.service.process and driver.service.process.poll() is not None: + time.sleep(0.1) + res = fn(*args, **kwargs) + print("func completed! 
def start_detached(executable, *args):
    """
    Starts a fully independent subprocess (with no parent)

    A short-lived helper process launches the executable and reports its
    pid back through a one-way pipe; the pid is also recorded in
    ``REGISTERED``.

    :param executable: executable
    :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
    :return: pid of the grandchild process
    :raises EOFError: if the helper process exits without reporting a pid
    """

    # one-way pipe: the helper writes the pid, this process reads it
    reader, writer = multiprocessing.Pipe(False)
    try:
        # do not keep a reference to the helper process
        multiprocessing.Process(
            target=_start_detached,
            args=(executable, *args),
            kwargs={"writer": writer},
            daemon=True,
        ).start()
        # Drop our copy of the write end: while it stays open, recv() can
        # never see end-of-file and would block forever if the helper died
        # before sending the pid.
        writer.close()
        # receive pid from pipe
        pid = reader.recv()
    finally:
        # close pipes on every path (closing twice is harmless)
        writer.close()
        reader.close()

    REGISTERED.append(pid)
    return pid
class Patcher(object):
    """
    Fetches a ChromeDriver release and patches the binary by replacing the
    ``cdc_`` marker strings with a random token of the same length.

    Platform-specific archive name, executable name and data directory are
    resolved once, at class creation, from ``sys.platform``.
    """

    url_repo = "https://chromedriver.storage.googleapis.com"
    zip_name = "chromedriver_%s.zip"
    exe_name = "chromedriver%s"

    platform = sys.platform
    if platform.endswith("win32"):
        zip_name %= "win32"
        exe_name %= ".exe"
    if platform.endswith("linux"):
        zip_name %= "linux64"
        exe_name %= ""
    if platform.endswith("darwin"):
        zip_name %= "mac64"
        exe_name %= ""

    if platform.endswith("win32"):
        d = "~/appdata/roaming/undetected_chromedriver"
    elif platform.startswith("linux"):
        d = "~/.local/share/undetected_chromedriver"
    elif platform.endswith("darwin"):
        d = "~/Library/Application Support/undetected_chromedriver"
    else:
        d = "~/.undetected_chromedriver"
    data_path = os.path.abspath(os.path.expanduser(d))

    def __init__(self, executable_path=None, force=False, version_main: int = 0):
        """

        Args:
            executable_path: None = automatic
                             a full file path to the chromedriver executable
            force: False
                    terminate processes which are holding lock
            version_main: 0 = auto
                specify main chrome version (rounded, ex: 82)
        """

        self.force = force
        self.executable_path = None
        # random prefix: each instance works on its own binary / unzip dir
        prefix = secrets.token_hex(8)

        if not os.path.exists(self.data_path):
            os.makedirs(self.data_path, exist_ok=True)

        if not executable_path:
            self.executable_path = os.path.join(
                self.data_path, "_".join([prefix, self.exe_name])
            )

        if not IS_POSIX:
            if executable_path:
                if not executable_path[-4:] == ".exe":
                    executable_path += ".exe"

        self.zip_path = os.path.join(self.data_path, prefix)

        if not executable_path:
            self.executable_path = os.path.abspath(
                os.path.join(".", self.executable_path)
            )

        self._custom_exe_path = False

        if executable_path:
            # a user-supplied binary is patched in place and never deleted
            self._custom_exe_path = True
            self.executable_path = executable_path
        self.version_main = version_main
        self.version_full = None

    def auto(self, executable_path=None, force=False, version_main=None):
        """
        Make a patched driver binary available at ``self.executable_path``.

        With a custom executable path the existing binary is patched in
        place when needed (returns the ``patch_exe`` result, or None when
        it is already patched). Otherwise any previous binary is removed,
        the matching release is downloaded, unpacked and patched.

        :return: result of ``patch()`` after a download; True when the
                 existing binary is locked but already patched
        """
        if executable_path:
            self.executable_path = executable_path
            self._custom_exe_path = True

        if self._custom_exe_path:
            ispatched = self.is_binary_patched(self.executable_path)
            if not ispatched:
                return self.patch_exe()
            else:
                return

        if version_main:
            self.version_main = version_main
        if force is True:
            self.force = force

        try:
            os.unlink(self.executable_path)
        except PermissionError:
            if self.force:
                self.force_kill_instances(self.executable_path)
                # retry once with force switched off to avoid looping
                return self.auto(force=not self.force)
            try:
                if self.is_binary_patched():
                    # assumes already running AND patched
                    return True
            except PermissionError:
                pass
            # return False
        except FileNotFoundError:
            pass

        release = self.fetch_release_number()
        self.version_main = release.version[0]
        self.version_full = release
        self.unzip_package(self.fetch_package())
        return self.patch()

    def patch(self):
        """Patch the binary and report whether it now reads as patched."""
        self.patch_exe()
        return self.is_binary_patched()

    def fetch_release_number(self):
        """
        Gets the latest release version available, or the latest release of
        major version ``self.version_main`` when that is set.

        :return: version string
        :rtype: LooseVersion
        """
        path = "/latest_release"
        if self.version_main:
            path += f"_{self.version_main}"
        path = path.upper()
        logger.debug("getting release number from %s" % path)
        return LooseVersion(urlopen(self.url_repo + path).read().decode())

    def parse_exe_version(self):
        """
        Read the version string embedded in the driver binary.

        :return: LooseVersion, or None when no version marker is found
        """
        with io.open(self.executable_path, "rb") as f:
            for line in iter(lambda: f.readline(), b""):
                match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
                if match:
                    return LooseVersion(match[1].decode())

    def fetch_package(self):
        """
        Downloads ChromeDriver from source

        :return: path to downloaded file
        """
        u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
        logger.debug("downloading from %s" % u)
        # return urlretrieve(u, filename=self.data_path)[0]
        return urlretrieve(u)[0]

    def unzip_package(self, fp):
        """
        Extract the driver executable from the archive at ``fp``, move it to
        ``self.executable_path`` and delete the archive.

        :return: path to unpacked executable
        """
        logger.debug("unzipping %s" % fp)
        try:
            os.unlink(self.zip_path)
        except (FileNotFoundError, OSError):
            pass

        os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
        with zipfile.ZipFile(fp, mode="r") as zf:
            zf.extract(self.exe_name, self.zip_path)
        os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path)
        os.remove(fp)
        os.rmdir(self.zip_path)
        os.chmod(self.executable_path, 0o755)
        return self.executable_path

    @staticmethod
    def force_kill_instances(exe_name):
        """
        kills running instances.
        :param: executable name to kill, may be a path as well

        :return: True on success else False
        """
        exe_name = os.path.basename(exe_name)
        if IS_POSIX:
            r = os.system("kill -f -9 $(pidof %s)" % exe_name)
        else:
            r = os.system("taskkill /f /im %s" % exe_name)
        return not r

    @staticmethod
    def gen_random_cdc():
        """
        Build a random 26-byte replacement for a ``cdc_`` marker: letters
        with ``_`` at index 3, index 2 repeating index 0, and two uppercase
        letters near the end.
        """
        cdc = random.choices(string.ascii_lowercase, k=26)
        cdc[-6:-4] = map(str.upper, cdc[-6:-4])
        cdc[2] = cdc[0]
        cdc[3] = "_"
        return "".join(cdc).encode()

    def is_binary_patched(self, executable_path=None):
        """simple check if executable is patched.

        :return: False if not patched, else True
        """
        executable_path = executable_path or self.executable_path
        with io.open(executable_path, "rb") as fh:
            for line in iter(lambda: fh.readline(), b""):
                if b"cdc_" in line:
                    return False
            else:
                # loop finished without finding a marker
                return True

    def patch_exe(self):
        """
        Patches the ChromeDriver binary

        :return: number of lines in which a marker was replaced
        """
        logger.info("patching driver executable %s" % self.executable_path)

        linect = 0
        replacement = self.gen_random_cdc()
        with io.open(self.executable_path, "r+b") as fh:
            for line in iter(lambda: fh.readline(), b""):
                if b"cdc_" in line:
                    # rewind to the start of this line and overwrite it;
                    # the replacement has the same length as the match
                    fh.seek(-len(line), 1)
                    newline = re.sub(b"cdc_.{22}", replacement, line)
                    fh.write(newline)
                    linect += 1
            return linect

    def __repr__(self):
        return "{0:s}({1:s})".format(
            self.__class__.__name__,
            self.executable_path,
        )

    def __del__(self):
        """Best-effort removal of the downloaded binary (never a custom one)."""
        # getattr: __del__ also runs when __init__ failed part-way through
        if getattr(self, "_custom_exe_path", True):
            # if the driver binary is specified by user
            # we assume it is important enough to not delete it
            return
        executable_path = getattr(self, "executable_path", None)
        if not executable_path:
            return

        timeout = 3  # stop trying after this many seconds
        t = time.monotonic()
        while True:
            now = time.monotonic()
            if now - t > timeout:
                # we don't want to wait until the end of time
                logger.debug(
                    "could not unlink %s in time (%d seconds)"
                    % (executable_path, timeout)
                )
                break
            try:
                os.unlink(executable_path)
                logger.debug("successfully unlinked %s" % executable_path)
                break
            except FileNotFoundError:
                # Must precede the OSError handler (it is a subclass):
                # nothing to delete, so retrying would only waste the
                # whole timeout.
                break
            except (OSError, RuntimeError):
                # e.g. the file is still in use; retry shortly
                time.sleep(0.1)
                continue
class WebElement(selenium.webdriver.remote.webelement.WebElement):
    """
    Custom WebElement class which makes it easier to view elements when
    working in an interactive environment.

    Its repr shows the class name followed by the element's tag name and
    its HTML attributes as ``name="value"`` pairs.
    """

    @property
    def attrs(self):
        """
        Mapping of the element's attribute names to values.

        Fetched from the browser on first access and cached on the
        instance, so later attribute changes in the page are not reflected.
        """
        if not hasattr(self, "_attrs"):
            # ``var index``: assigning an undeclared name would create a
            # global variable on the page as a side effect of this call.
            self._attrs = self._parent.execute_script(
                """
                var items = {};
                for (var index = 0; index < arguments[0].attributes.length; ++index)
                {
                 items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
                };
                return items;
                """,
                self,
            )
        return self._attrs

    def __repr__(self):
        strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
        if strattrs:
            strattrs = " " + strattrs
        return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"