From 4d9733316076b079d26be471b0537260073ceb04 Mon Sep 17 00:00:00 2001 From: ilike2burnthing <59480337+ilike2burnthing@users.noreply.github.com> Date: Mon, 29 Jul 2024 21:00:45 +0100 Subject: [PATCH] Delete src/undetected_chromedriver directory --- src/undetected_chromedriver/__init__.py | 914 ---------------------- src/undetected_chromedriver/cdp.py | 112 --- src/undetected_chromedriver/devtool.py | 193 ----- src/undetected_chromedriver/dprocess.py | 77 -- src/undetected_chromedriver/options.py | 85 -- src/undetected_chromedriver/patcher.py | 451 ----------- src/undetected_chromedriver/reactor.py | 99 --- src/undetected_chromedriver/webelement.py | 86 -- 8 files changed, 2017 deletions(-) delete mode 100644 src/undetected_chromedriver/__init__.py delete mode 100644 src/undetected_chromedriver/cdp.py delete mode 100644 src/undetected_chromedriver/devtool.py delete mode 100644 src/undetected_chromedriver/dprocess.py delete mode 100644 src/undetected_chromedriver/options.py delete mode 100644 src/undetected_chromedriver/patcher.py delete mode 100644 src/undetected_chromedriver/reactor.py delete mode 100644 src/undetected_chromedriver/webelement.py diff --git a/src/undetected_chromedriver/__init__.py b/src/undetected_chromedriver/__init__.py deleted file mode 100644 index 4e7fa1a..0000000 --- a/src/undetected_chromedriver/__init__.py +++ /dev/null @@ -1,914 +0,0 @@ -#!/usr/bin/env python3 - -""" - - 888 888 d8b - 888 888 Y8P - 888 888 - .d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888 -d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P" -888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888 -Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888 - "Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888 - -by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam) - -""" -from __future__ import annotations - - -__version__ = "3.5.5" - -import json -import logging -import os -import pathlib -import re -import shutil -import subprocess -import sys -import tempfile -import time -from weakref import finalize - -import selenium.webdriver.chrome.service -import selenium.webdriver.chrome.webdriver -from selenium.webdriver.common.by import By -import selenium.webdriver.chromium.service -import selenium.webdriver.remote.command -import selenium.webdriver.remote.webdriver - -from .cdp import CDP -from .dprocess import start_detached -from .options import ChromeOptions -from .patcher import IS_POSIX -from .patcher import Patcher -from .reactor import Reactor -from .webelement import UCWebElement -from .webelement import WebElement - - -__all__ = ( - "Chrome", - "ChromeOptions", - "Patcher", - "Reactor", - "CDP", - "find_chrome_executable", -) - -logger = logging.getLogger("uc") -logger.setLevel(logging.getLogger().getEffectiveLevel()) - - -class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): - """ - - Controls the ChromeDriver and allows you to drive the browser. - - The webdriver file will be downloaded by this module automatically, - you do not need to specify this. however, you may if you wish. - - Attributes - ---------- - - Methods - ------- - - reconnect() - - this can be useful in case of heavy detection methods - -stops the chromedriver service which runs in the background - -starts the chromedriver service which runs in the background - -recreate session - - - start_session(capabilities=None, browser_profile=None) - - differentiates from the regular method in that it does not - require a capabilities argument. The capabilities are automatically - recreated from the options at creation time. - - -------------------------------------------------------------------------- - NOTE: - Chrome has everything included to work out of the box. - it does not `need` customizations. - any customizations MAY lead to trigger bot migitation systems. - - -------------------------------------------------------------------------- - """ - - _instances = set() - session_id = None - debug = False - - def __init__( - self, - options=None, - user_data_dir=None, - driver_executable_path=None, - browser_executable_path=None, - port=0, - enable_cdp_events=False, - # service_args=None, - # service_creationflags=None, - desired_capabilities=None, - advanced_elements=False, - # service_log_path=None, - keep_alive=True, - log_level=0, - headless=False, - version_main=None, - patcher_force_close=False, - suppress_welcome=True, - use_subprocess=False, - debug=False, - no_sandbox=True, - windows_headless=False, - user_multi_procs: bool = False, - **kw, - ): - """ - Creates a new instance of the chrome driver. - - Starts the service and then creates new instance of chrome driver. - - Parameters - ---------- - - options: ChromeOptions, optional, default: None - automatic useful defaults - this takes an instance of ChromeOptions, mainly to customize browser behavior. - anything other dan the default, for example extensions or startup options - are not supported in case of failure, and can probably lowers your undetectability. - - - user_data_dir: str , optional, default: None (creates temp profile) - if user_data_dir is a path to a valid chrome profile directory, use it, - and turn off automatic removal mechanism at exit. - - driver_executable_path: str, optional, default: None(=downloads and patches new binary) - - browser_executable_path: str, optional, default: None - use find_chrome_executable - Path to the browser executable. - If not specified, make sure the executable's folder is in $PATH - - port: int, optional, default: 0 - port to be used by the chromedriver executable, this is NOT the debugger port. - leave it at 0 unless you know what you are doing. - the default value of 0 automatically picks an available port. - - enable_cdp_events: bool, default: False - :: currently for chrome only - this enables the handling of wire messages - when enabled, you can subscribe to CDP events by using: - - driver.add_cdp_listener("Network.dataReceived", yourcallback) - # yourcallback is an callable which accepts exactly 1 dict as parameter - - - service_args: list of str, optional, default: None - arguments to pass to the driver service - - desired_capabilities: dict, optional, default: None - auto from config - Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref". - - advanced_elements: bool, optional, default: False - makes it easier to recognize elements like you know them from html/browser inspection, especially when working - in an interactive environment - - default webelement repr: - - - advanced webelement repr - )> - - note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time. - - - service_log_path: str, optional, default: None - path to log information from the driver. - - keep_alive: bool, optional, default: True - Whether to configure ChromeRemoteConnection to use HTTP keep-alive. - - log_level: int, optional, default: adapts to python global log level - - headless: bool, optional, default: False - can also be specified in the options instance. - Specify whether you want to use the browser in headless mode. - warning: this lowers undetectability and not fully supported. - - version_main: int, optional, default: None (=auto) - if you, for god knows whatever reason, use - an older version of Chrome. You can specify it's full rounded version number - here. Example: 87 for all versions of 87 - - patcher_force_close: bool, optional, default: False - instructs the patcher to do whatever it can to access the chromedriver binary - if the file is locked, it will force shutdown all instances. - setting it is not recommended, unless you know the implications and think - you might need it. - - suppress_welcome: bool, optional , default: True - a "welcome" alert might show up on *nix-like systems asking whether you want to set - chrome as your default browser, and if you want to send even more data to google. - now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False. - Note: if you don't handle the nag screen in time, the browser loses it's connection and throws an Exception. - - use_subprocess: bool, optional , default: True, - - False (the default) makes sure Chrome will get it's own process (so no subprocess of chromedriver.exe or python - This fixes a LOT of issues, like multithreaded run, but mst importantly. shutting corectly after - program exits or using .quit() - you should be knowing what you're doing, and know how python works. - - unfortunately, there is always an edge case in which one would like to write an single script with the only contents being: - --start script-- - import undetected_chromedriver as uc - d = uc.Chrome() - d.get('https://somesite/') - ---end script -- - - and will be greeted with an error, since the program exists before chrome has a change to launch. - in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times. - ! setting it to True comes with NO support when being detected. ! - - no_sandbox: bool, optional, default=True - uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar - this option has a default of True since many people seem to run this as root (....) , and chrome does not start - when running as root without using --no-sandbox flag. - - user_multi_procs: - set to true when you are using multithreads/multiprocessing - ensures not all processes are trying to modify a binary which is in use by another. - for this to work. YOU MUST HAVE AT LEAST 1 UNDETECTED_CHROMEDRIVER BINARY IN YOUR ROAMING DATA FOLDER. - this requirement can be easily satisfied, by just running this program "normal" and close/kill it. - - - """ - - finalize(self, self._ensure_close, self) - self.debug = debug - self.patcher = Patcher( - executable_path=driver_executable_path, - force=patcher_force_close, - version_main=version_main, - user_multi_procs=user_multi_procs, - ) - # self.patcher.auto(user_multiprocess = user_multi_num_procs) - self.patcher.auto() - - # self.patcher = patcher - if not options: - options = ChromeOptions() - - try: - if hasattr(options, "_session") and options._session is not None: - # prevent reuse of options, - # as it just appends arguments, not replace them - # you'll get conflicts starting chrome - raise RuntimeError("you cannot reuse the ChromeOptions object") - except AttributeError: - pass - - options._session = self - - if not options.debugger_address: - debug_port = ( - port - if port != 0 - else selenium.webdriver.common.service.utils.free_port() - ) - debug_host = "127.0.0.1" - options.debugger_address = "%s:%d" % (debug_host, debug_port) - else: - debug_host, debug_port = options.debugger_address.split(":") - debug_port = int(debug_port) - - if enable_cdp_events: - options.set_capability( - "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"} - ) - - options.add_argument("--remote-debugging-host=%s" % debug_host) - options.add_argument("--remote-debugging-port=%s" % debug_port) - - if user_data_dir: - options.add_argument("--user-data-dir=%s" % user_data_dir) - - language, keep_user_data_dir = None, bool(user_data_dir) - - # see if a custom user profile is specified in options - for arg in options.arguments: - - if any([_ in arg for _ in ("--headless", "headless")]): - options.arguments.remove(arg) - options.headless = True - - if "lang" in arg: - m = re.search("(?:--)?lang(?:[ =])?(.*)", arg) - try: - language = m[1] - except IndexError: - logger.debug("will set the language to en-US,en;q=0.9") - language = "en-US,en;q=0.9" - - if "user-data-dir" in arg: - m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg) - try: - user_data_dir = m[1] - logger.debug( - "user-data-dir found in user argument %s => %s" % (arg, m[1]) - ) - keep_user_data_dir = True - - except IndexError: - logger.debug( - "no user data dir could be extracted from supplied argument %s " - % arg - ) - - if not user_data_dir: - # backward compatiblity - # check if an old uc.ChromeOptions is used, and extract the user data dir - - if hasattr(options, "user_data_dir") and getattr( - options, "user_data_dir", None - ): - import warnings - - warnings.warn( - "using ChromeOptions.user_data_dir might stop working in future versions." - "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder" - ) - options.add_argument("--user-data-dir=%s" % options.user_data_dir) - keep_user_data_dir = True - logger.debug( - "user_data_dir property found in options object: %s" % user_data_dir - ) - - else: - user_data_dir = os.path.normpath(tempfile.mkdtemp()) - keep_user_data_dir = False - arg = "--user-data-dir=%s" % user_data_dir - options.add_argument(arg) - logger.debug( - "created a temporary folder in which the user-data (profile) will be stored during this\n" - "session, and added it to chrome startup arguments: %s" % arg - ) - - if not language: - try: - import locale - - language = locale.getdefaultlocale()[0].replace("_", "-") - except Exception: - pass - if not language: - language = "en-US" - - options.add_argument("--lang=%s" % language) - - if not options.binary_location: - options.binary_location = ( - browser_executable_path or find_chrome_executable() - ) - - if not options.binary_location or not \ - pathlib.Path(options.binary_location).exists(): - raise FileNotFoundError( - "\n---------------------\n" - "Could not determine browser executable." - "\n---------------------\n" - "Make sure your browser is installed in the default location (path).\n" - "If you are sure about the browser executable, you can specify it using\n" - "the `browser_executable_path='{}` parameter.\n\n" - .format("/path/to/browser/executable" if IS_POSIX else "c:/path/to/your/browser.exe") - ) - - self._delay = 3 - - self.user_data_dir = user_data_dir - self.keep_user_data_dir = keep_user_data_dir - - if suppress_welcome: - options.arguments.extend(["--no-default-browser-check", "--no-first-run"]) - if no_sandbox: - options.arguments.extend(["--no-sandbox", "--test-type"]) - - if headless or getattr(options, 'headless', None): - #workaround until a better checking is found - try: - v_main = int(self.patcher.version_main) if self.patcher.version_main else 108 - if v_main < 108: - options.add_argument("--headless=chrome") - elif v_main >= 108: - options.add_argument("--headless=new") - except: - logger.warning("could not detect version_main." - "therefore, we are assuming it is chrome 108 or higher") - options.add_argument("--headless=new") - - options.add_argument("--window-size=1920,1080") - options.add_argument("--start-maximized") - options.add_argument("--no-sandbox") - # fixes "could not connect to chrome" error when running - # on linux using privileged user like root (which i don't recommend) - - options.add_argument( - "--log-level=%d" % log_level - or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] - ) - - if hasattr(options, "handle_prefs"): - options.handle_prefs(user_data_dir) - - # fix exit_type flag to prevent tab-restore nag - try: - with open( - os.path.join(user_data_dir, "Default/Preferences"), - encoding="latin1", - mode="r+", - ) as fs: - config = json.load(fs) - if config["profile"]["exit_type"] is not None: - # fixing the restore-tabs-nag - config["profile"]["exit_type"] = None - fs.seek(0, 0) - json.dump(config, fs) - fs.truncate() # the file might be shorter - logger.debug("fixed exit_type flag") - except Exception as e: - logger.debug("did not find a bad exit_type flag ") - - self.options = options - - if not desired_capabilities: - desired_capabilities = options.to_capabilities() - - if not use_subprocess and not windows_headless: - self.browser_pid = start_detached( - options.binary_location, *options.arguments - ) - else: - startupinfo = None - if os.name == 'nt' and windows_headless: - # STARTUPINFO() is Windows only - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - browser = subprocess.Popen( - [options.binary_location, *options.arguments], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=IS_POSIX, - startupinfo=startupinfo - ) - self.browser_pid = browser.pid - - - service = selenium.webdriver.chromium.service.ChromiumService( - self.patcher.executable_path - ) - - super(Chrome, self).__init__( - service=service, - options=options, - keep_alive=keep_alive, - ) - - self.reactor = None - - if enable_cdp_events: - if logging.getLogger().getEffectiveLevel() == logging.DEBUG: - logging.getLogger( - "selenium.webdriver.remote.remote_connection" - ).setLevel(20) - reactor = Reactor(self) - reactor.start() - self.reactor = reactor - - if advanced_elements: - self._web_element_cls = UCWebElement - else: - self._web_element_cls = WebElement - - if headless or getattr(options, 'headless', None): - self._configure_headless() - - def _configure_headless(self): - orig_get = self.get - logger.info("setting properties for headless") - - def get_wrapped(*args, **kwargs): - if self.execute_script("return navigator.webdriver"): - logger.info("patch navigator.webdriver") - self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument", - { - "source": """ - - Object.defineProperty(window, "navigator", { - Object.defineProperty(window, "navigator", { - value: new Proxy(navigator, { - has: (target, key) => (key === "webdriver" ? false : key in target), - get: (target, key) => - key === "webdriver" - ? false - : typeof target[key] === "function" - ? target[key].bind(target) - : target[key], - }), - }); - """ - }, - ) - - logger.info("patch user-agent string") - self.execute_cdp_cmd( - "Network.setUserAgentOverride", - { - "userAgent": self.execute_script( - "return navigator.userAgent" - ).replace("Headless", "") - }, - ) - self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument", - { - "source": """ - Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1}); - Object.defineProperty(navigator.connection, 'rtt', {get: () => 100}); - - // https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/chrome-runtime.js - window.chrome = { - app: { - isInstalled: false, - InstallState: { - DISABLED: 'disabled', - INSTALLED: 'installed', - NOT_INSTALLED: 'not_installed' - }, - RunningState: { - CANNOT_RUN: 'cannot_run', - READY_TO_RUN: 'ready_to_run', - RUNNING: 'running' - } - }, - runtime: { - OnInstalledReason: { - CHROME_UPDATE: 'chrome_update', - INSTALL: 'install', - SHARED_MODULE_UPDATE: 'shared_module_update', - UPDATE: 'update' - }, - OnRestartRequiredReason: { - APP_UPDATE: 'app_update', - OS_UPDATE: 'os_update', - PERIODIC: 'periodic' - }, - PlatformArch: { - ARM: 'arm', - ARM64: 'arm64', - MIPS: 'mips', - MIPS64: 'mips64', - X86_32: 'x86-32', - X86_64: 'x86-64' - }, - PlatformNaclArch: { - ARM: 'arm', - MIPS: 'mips', - MIPS64: 'mips64', - X86_32: 'x86-32', - X86_64: 'x86-64' - }, - PlatformOs: { - ANDROID: 'android', - CROS: 'cros', - LINUX: 'linux', - MAC: 'mac', - OPENBSD: 'openbsd', - WIN: 'win' - }, - RequestUpdateCheckStatus: { - NO_UPDATE: 'no_update', - THROTTLED: 'throttled', - UPDATE_AVAILABLE: 'update_available' - } - } - } - - // https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/navigator-permissions.js - if (!window.Notification) { - window.Notification = { - permission: 'denied' - } - } - - const originalQuery = window.navigator.permissions.query - window.navigator.permissions.__proto__.query = parameters => - parameters.name === 'notifications' - ? Promise.resolve({ state: window.Notification.permission }) - : originalQuery(parameters) - - const oldCall = Function.prototype.call - function call() { - return oldCall.apply(this, arguments) - } - Function.prototype.call = call - - const nativeToStringFunctionString = Error.toString().replace(/Error/g, 'toString') - const oldToString = Function.prototype.toString - - function functionToString() { - if (this === window.navigator.permissions.query) { - return 'function query() { [native code] }' - } - if (this === functionToString) { - return nativeToStringFunctionString - } - return oldCall.call(oldToString, this) - } - // eslint-disable-next-line - Function.prototype.toString = functionToString - """ - }, - ) - return orig_get(*args, **kwargs) - - self.get = get_wrapped - - # def _get_cdc_props(self): - # return self.execute_script( - # """ - # let objectToInspect = window, - # result = []; - # while(objectToInspect !== null) - # { result = result.concat(Object.getOwnPropertyNames(objectToInspect)); - # objectToInspect = Object.getPrototypeOf(objectToInspect); } - # - # return result.filter(i => i.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig)) - # """ - # ) - # - # def _hook_remove_cdc_props(self): - # self.execute_cdp_cmd( - # "Page.addScriptToEvaluateOnNewDocument", - # { - # "source": """ - # let objectToInspect = window, - # result = []; - # while(objectToInspect !== null) - # { result = result.concat(Object.getOwnPropertyNames(objectToInspect)); - # objectToInspect = Object.getPrototypeOf(objectToInspect); } - # result.forEach(p => p.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig) - # &&delete window[p]&&console.log('removed',p)) - # """ - # }, - # ) - - def get(self, url): - # if self._get_cdc_props(): - # self._hook_remove_cdc_props() - return super().get(url) - - def add_cdp_listener(self, event_name, callback): - if ( - self.reactor - and self.reactor is not None - and isinstance(self.reactor, Reactor) - ): - self.reactor.add_event_handler(event_name, callback) - return self.reactor.handlers - return False - - def clear_cdp_listeners(self): - if self.reactor and isinstance(self.reactor, Reactor): - self.reactor.handlers.clear() - - def window_new(self): - self.execute( - selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"} - ) - - def tab_new(self, url: str): - """ - this opens a url in a new tab. - apparently, that passes all tests directly! - - Parameters - ---------- - url - - Returns - ------- - - """ - if not hasattr(self, "cdp"): - from .cdp import CDP - - cdp = CDP(self.options) - cdp.tab_new(url) - - def reconnect(self, timeout=0.1): - try: - self.service.stop() - except Exception as e: - logger.debug(e) - time.sleep(timeout) - try: - self.service.start() - except Exception as e: - logger.debug(e) - - try: - self.start_session() - except Exception as e: - logger.debug(e) - - def start_session(self, capabilities=None, browser_profile=None): - if not capabilities: - capabilities = self.options.to_capabilities() - super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session( - capabilities - ) - # super(Chrome, self).start_session(capabilities, browser_profile) - - def find_elements_recursive(self, by, value): - """ - find elements in all frames - this is a generator function, which is needed - since if it would return a list of elements, they - will be stale on arrival. - using generator, when the element is returned we are in the correct frame - to use it directly - Args: - by: By - value: str - Returns: Generator[webelement.WebElement] - """ - def search_frame(f=None): - if not f: - # ensure we are on main content frame - self.switch_to.default_content() - else: - self.switch_to.frame(f) - for elem in self.find_elements(by, value): - yield elem - # switch back to main content, otherwise we will get StaleElementReferenceException - self.switch_to.default_content() - - # search root frame - for elem in search_frame(): - yield elem - # get iframes - frames = self.find_elements('css selector', 'iframe') - - # search per frame - for f in frames: - for elem in search_frame(f): - yield elem - - def quit(self): - try: - self.service.stop() - self.service.process.kill() - self.command_executor.close() - self.service.process.wait(5) - logger.debug("webdriver process ended") - except (AttributeError, RuntimeError, OSError): - pass - try: - self.reactor.event.set() - logger.debug("shutting down reactor") - except AttributeError: - pass - try: - os.kill(self.browser_pid, 15) - logger.debug("gracefully closed browser") - except Exception as e: # noqa - pass - if ( - hasattr(self, "keep_user_data_dir") - and hasattr(self, "user_data_dir") - and not self.keep_user_data_dir - ): - for _ in range(5): - try: - shutil.rmtree(self.user_data_dir, ignore_errors=False) - except FileNotFoundError: - pass - except (RuntimeError, OSError, PermissionError) as e: - logger.debug( - "When removing the temp profile, a %s occured: %s\nretrying..." - % (e.__class__.__name__, e) - ) - else: - logger.debug("successfully removed %s" % self.user_data_dir) - break - - try: - time.sleep(0.1) - except OSError: - pass - - # dereference patcher, so patcher can start cleaning up as well. - # this must come last, otherwise it will throw 'in use' errors - self.patcher = None - - def __getattribute__(self, item): - if not super().__getattribute__("debug"): - return super().__getattribute__(item) - else: - import inspect - - original = super().__getattribute__(item) - if inspect.ismethod(original) and not inspect.isclass(original): - - def newfunc(*args, **kwargs): - logger.debug( - "calling %s with args %s and kwargs %s\n" - % (original.__qualname__, args, kwargs) - ) - return original(*args, **kwargs) - - return newfunc - return original - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.service.stop() - time.sleep(self._delay) - self.service.start() - self.start_session() - - def __hash__(self): - return hash(self.options.debugger_address) - - def __dir__(self): - return object.__dir__(self) - - def __del__(self): - try: - self.service.process.kill() - except: # noqa - pass - self.quit() - - @classmethod - def _ensure_close(cls, self): - # needs to be a classmethod so finalize can find the reference - logger.info("ensuring close") - if ( - hasattr(self, "service") - and hasattr(self.service, "process") - and hasattr(self.service.process, "kill") - ): - self.service.process.kill() - - -def find_chrome_executable(): - """ - Finds the chrome, chrome beta, chrome canary, chromium executable - - Returns - ------- - executable_path : str - the full file path to found executable - - """ - candidates = set() - if IS_POSIX: - for item in os.environ.get("PATH").split(os.pathsep): - for subitem in ( - "google-chrome", - "chromium", - "chromium-browser", - "chrome", - "google-chrome-stable", - ): - candidates.add(os.sep.join((item, subitem))) - if "darwin" in sys.platform: - candidates.update( - [ - "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", - "/Applications/Chromium.app/Contents/MacOS/Chromium", - ] - ) - else: - for item in map( - os.environ.get, - ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"), - ): - if item is not None: - for subitem in ( - "Google/Chrome/Application", - ): - candidates.add(os.sep.join((item, subitem, "chrome.exe"))) - for candidate in candidates: - logger.debug('checking if %s exists and is executable' % candidate) - if os.path.exists(candidate) and os.access(candidate, os.X_OK): - logger.debug('found! using %s' % candidate) - return os.path.normpath(candidate) diff --git a/src/undetected_chromedriver/cdp.py b/src/undetected_chromedriver/cdp.py deleted file mode 100644 index 32a503c..0000000 --- a/src/undetected_chromedriver/cdp.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -import json -import logging - -import requests -import websockets - - -log = logging.getLogger(__name__) - - -class CDPObject(dict): - def __init__(self, *a, **k): - super().__init__(*a, **k) - self.__dict__ = self - for k in self.__dict__: - if isinstance(self.__dict__[k], dict): - self.__dict__[k] = CDPObject(self.__dict__[k]) - elif isinstance(self.__dict__[k], list): - for i in range(len(self.__dict__[k])): - if isinstance(self.__dict__[k][i], dict): - self.__dict__[k][i] = CDPObject(self) - - def __repr__(self): - tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)" - return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items())) - - -class PageElement(CDPObject): - pass - - -class CDP: - log = logging.getLogger("CDP") - - endpoints = CDPObject( - { - "json": "/json", - "protocol": "/json/protocol", - "list": "/json/list", - "new": "/json/new?{url}", - "activate": "/json/activate/{id}", - "close": "/json/close/{id}", - } - ) - - def __init__(self, options: "ChromeOptions"): # noqa - self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":")) - - self._reqid = 0 - self._session = requests.Session() - self._last_resp = None - self._last_json = None - - resp = self.get(self.endpoints.json) # noqa - self.sessionId = resp[0]["id"] - self.wsurl = resp[0]["webSocketDebuggerUrl"] - - def tab_activate(self, id=None): - if not id: - active_tab = self.tab_list()[0] - id = active_tab.id # noqa - self.wsurl = active_tab.webSocketDebuggerUrl # noqa - return self.post(self.endpoints["activate"].format(id=id)) - - def tab_list(self): - retval = self.get(self.endpoints["list"]) - return [PageElement(o) for o in retval] - - def tab_new(self, url): - return self.post(self.endpoints["new"].format(url=url)) - - def tab_close_last_opened(self): - sessions = self.tab_list() - opentabs = [s for s in sessions if s["type"] == "page"] - return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"])) - - async def send(self, method: str, params: dict): - self._reqid += 1 - async with websockets.connect(self.wsurl) as ws: - await ws.send( - json.dumps({"method": method, "params": params, "id": self._reqid}) - ) - self._last_resp = await ws.recv() - self._last_json = json.loads(self._last_resp) - self.log.info(self._last_json) - - def get(self, uri): - resp = self._session.get(self.server_addr + uri) - try: - self._last_resp = resp - self._last_json = resp.json() - except Exception: - return - else: - return self._last_json - - def post(self, uri, data: dict = None): - if not data: - data = {} - resp = self._session.post(self.server_addr + uri, json=data) - try: - self._last_resp = resp - self._last_json = resp.json() - except Exception: - return self._last_resp - - @property - def last_json(self): - return self._last_json diff --git a/src/undetected_chromedriver/devtool.py b/src/undetected_chromedriver/devtool.py deleted file mode 100644 index 915d417..0000000 --- a/src/undetected_chromedriver/devtool.py +++ /dev/null @@ -1,193 +0,0 @@ -import asyncio -from collections.abc import Mapping -from collections.abc import Sequence -from functools import wraps -import os -import logging -import threading -import time -import traceback -from typing import Any -from typing import Awaitable -from typing import Callable -from typing import List -from typing import Optional - - -class Structure(dict): - """ - This is a dict-like object structure, which you should subclass - Only properties defined in the class context are used on initialization. - - See example - """ - - _store = {} - - def __init__(self, *a, **kw): - """ - Instantiate a new instance. - - :param a: - :param kw: - """ - - super().__init__() - - # auxiliar dict - d = dict(*a, **kw) - for k, v in d.items(): - if isinstance(v, Mapping): - self[k] = self.__class__(v) - elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)): - self[k] = [self.__class__(i) for i in v] - else: - self[k] = v - super().__setattr__("__dict__", self) - - def __getattr__(self, item): - return getattr(super(), item) - - def __getitem__(self, item): - return super().__getitem__(item) - - def __setattr__(self, key, value): - self.__setitem__(key, value) - - def __setitem__(self, key, value): - super().__setitem__(key, value) - - def update(self, *a, **kw): - super().update(*a, **kw) - - def __eq__(self, other): - return frozenset(other.items()) == frozenset(self.items()) - - def __hash__(self): - return hash(frozenset(self.items())) - - @classmethod - def __init_subclass__(cls, **kwargs): - cls._store = {} - - def _normalize_strings(self): - for k, v in self.copy().items(): - if isinstance(v, (str)): - self[k] = v.strip() - - -def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None): - def wrapper(func): - @wraps(func) - def wrapped(*args, **kwargs): - def function_reached_timeout(): - if on_timeout: - on_timeout(func) - else: - raise TimeoutError("function call timed out") - - t = threading.Timer(interval=seconds, function=function_reached_timeout) - t.start() - try: - return func(*args, **kwargs) - except: - t.cancel() - raise - finally: - t.cancel() - - return wrapped - - return wrapper - - -def test(): - import sys, os - - sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) - import undetected_chromedriver as uc - import threading - - def collector( - driver: uc.Chrome, - stop_event: threading.Event, - on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None, - listen_events: Sequence = ("browser", "network", "performance"), - ): - def threaded(driver, stop_event, on_event_coro): - async def _ensure_service_started(): - while ( - getattr(driver, "service", False) - and getattr(driver.service, "process", False) - and driver.service.process.poll() - ): - print("waiting for driver service to come back on") - await asyncio.sleep(0.05) - # await asyncio.sleep(driver._delay or .25) - - async def get_log_lines(typ): - await _ensure_service_started() - return driver.get_log(typ) - - async def looper(): - while not stop_event.is_set(): - log_lines = [] - try: - for _ in listen_events: - try: - log_lines += await get_log_lines(_) - except: - if logging.getLogger().getEffectiveLevel() <= 10: - traceback.print_exc() - continue - if log_lines and on_event_coro: - await on_event_coro(log_lines) - except Exception as e: - if logging.getLogger().getEffectiveLevel() <= 10: - traceback.print_exc() - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(looper()) - - t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro)) - t.start() - - async def on_event(data): - print("on_event") - print("data:", data) - - def func_called(fn): - def wrapped(*args, **kwargs): - print( - "func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs) - ) - while driver.service.process and driver.service.process.poll() is not None: - time.sleep(0.1) - res = fn(*args, **kwargs) - print("func completed! (result: %s)" % res) - return res - - return wrapped - - logging.basicConfig(level=10) - - options = uc.ChromeOptions() - options.set_capability( - "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"} - ) - - driver = uc.Chrome(version_main=96, options=options) - - # driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request) - driver.command_executor._request = func_called(driver.command_executor._request) - collector_stop = threading.Event() - collector(driver, collector_stop, on_event) - - driver.get("https://nowsecure.nl") - - time.sleep(10) - - if os.name == "nt": - driver.close() - driver.quit() diff --git a/src/undetected_chromedriver/dprocess.py b/src/undetected_chromedriver/dprocess.py deleted file mode 100644 index 6d053fa..0000000 --- a/src/undetected_chromedriver/dprocess.py +++ /dev/null @@ -1,77 +0,0 @@ -import atexit -import logging -import multiprocessing -import os -import platform -import signal -from subprocess import PIPE -from subprocess import Popen -import sys - - -CREATE_NEW_PROCESS_GROUP = 0x00000200 -DETACHED_PROCESS = 0x00000008 - -REGISTERED = [] - - -def start_detached(executable, *args): - """ - Starts a fully independent subprocess (with no parent) - :param executable: executable - :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...] - :return: pid of the grandchild process - """ - - # create pipe - reader, writer = multiprocessing.Pipe(False) - - # do not keep reference - process = multiprocessing.Process( - target=_start_detached, - args=(executable, *args), - kwargs={"writer": writer}, - daemon=True, - ) - process.start() - process.join() - # receive pid from pipe - pid = reader.recv() - REGISTERED.append(pid) - # close pipes - writer.close() - reader.close() - process.close() - - return pid - - -def _start_detached(executable, *args, writer: multiprocessing.Pipe = None): - # configure launch - kwargs = {} - if platform.system() == "Windows": - kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP) - elif sys.version_info < (3, 2): - # assume posix - kwargs.update(preexec_fn=os.setsid) - else: # Python 3.2+ and Unix - kwargs.update(start_new_session=True) - - # run - p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs) - - # send pid to pipe - writer.send(p.pid) - sys.exit() - - -def _cleanup(): - for pid in REGISTERED: - try: - logging.getLogger(__name__).debug("cleaning up pid %d " % pid) - os.kill(pid, signal.SIGTERM) - except: # noqa - pass - - -atexit.register(_cleanup) diff --git a/src/undetected_chromedriver/options.py b/src/undetected_chromedriver/options.py deleted file mode 100644 index 8078ae9..0000000 --- a/src/undetected_chromedriver/options.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - - -import json -import os - -from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions - - -class ChromeOptions(_ChromiumOptions): - _session = None - _user_data_dir = None - - @property - def user_data_dir(self): - return self._user_data_dir - - @user_data_dir.setter - def user_data_dir(self, path: str): - """ - Sets the browser profile folder to use, or creates a new profile - at given . - - Parameters - ---------- - path: str - the path to a chrome profile folder - if it does not exist, a new profile will be created at given location - """ - apath = os.path.abspath(path) - self._user_data_dir = os.path.normpath(apath) - - @staticmethod - def _undot_key(key, value): - """turn a (dotted key, value) into a proper nested dict""" - if "." in key: - key, rest = key.split(".", 1) - value = ChromeOptions._undot_key(rest, value) - return {key: value} - - @staticmethod - def _merge_nested(a, b): - """ - merges b into a - leaf values in a are overwritten with values from b - """ - for key in b: - if key in a: - if isinstance(a[key], dict) and isinstance(b[key], dict): - ChromeOptions._merge_nested(a[key], b[key]) - continue - a[key] = b[key] - return a - - def handle_prefs(self, user_data_dir): - prefs = self.experimental_options.get("prefs") - if prefs: - user_data_dir = user_data_dir or self._user_data_dir - default_path = os.path.join(user_data_dir, "Default") - os.makedirs(default_path, exist_ok=True) - - # undot prefs dict keys - undot_prefs = {} - for key, value in prefs.items(): - undot_prefs = self._merge_nested( - undot_prefs, self._undot_key(key, value) - ) - - prefs_file = os.path.join(default_path, "Preferences") - if os.path.exists(prefs_file): - with open(prefs_file, encoding="latin1", mode="r") as f: - undot_prefs = self._merge_nested(json.load(f), undot_prefs) - - with open(prefs_file, encoding="latin1", mode="w") as f: - json.dump(undot_prefs, f) - - # remove the experimental_options to avoid an error - del self._experimental_options["prefs"] - - @classmethod - def from_options(cls, options): - o = cls() - o.__dict__.update(options.__dict__) - return o diff --git a/src/undetected_chromedriver/patcher.py b/src/undetected_chromedriver/patcher.py deleted file mode 100644 index e8a0e96..0000000 --- a/src/undetected_chromedriver/patcher.py +++ /dev/null @@ -1,451 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -from distutils.version import LooseVersion -import io -import json -import logging -import os -import pathlib -import platform -import random -import re -import shutil -import string -import sys -import time -from urllib.request import urlopen -from urllib.request import urlretrieve -import zipfile -from multiprocessing import Lock - -logger = logging.getLogger(__name__) - -IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd")) - - -class Patcher(object): - lock = Lock() - exe_name = "chromedriver%s" - - platform = sys.platform - if platform.endswith("win32"): - d = "~/appdata/roaming/undetected_chromedriver" - elif "LAMBDA_TASK_ROOT" in os.environ: - d = "/tmp/undetected_chromedriver" - elif platform.startswith(("linux", "linux2")): - d = "~/.local/share/undetected_chromedriver" - elif platform.endswith("darwin"): - d = "~/Library/Application Support/undetected_chromedriver" - else: - d = "~/.undetected_chromedriver" - data_path = os.path.abspath(os.path.expanduser(d)) - - def __init__( - self, - executable_path=None, - force=False, - version_main: int = 0, - user_multi_procs=False, - ): - """ - Args: - executable_path: None = automatic - a full file path to the chromedriver executable - force: False - terminate processes which are holding lock - version_main: 0 = auto - specify main chrome version (rounded, ex: 82) - """ - self.force = force - self._custom_exe_path = False - prefix = "undetected" - self.user_multi_procs = user_multi_procs - - try: - # Try to convert version_main into an integer - version_main_int = int(version_main) - # check if version_main_int is less than or equal to e.g 114 - self.is_old_chromedriver = version_main and version_main_int <= 114 - except (ValueError,TypeError): - # Check not running inside Docker - if not os.path.exists("/app/chromedriver"): - # If the conversion fails, log an error message - logging.info("version_main cannot be converted to an integer") - # Set self.is_old_chromedriver to False if the conversion fails - self.is_old_chromedriver = False - - # Needs to be called before self.exe_name is accessed - self._set_platform_name() - - if not os.path.exists(self.data_path): - os.makedirs(self.data_path, exist_ok=True) - - if not executable_path: - if sys.platform.startswith("freebsd"): - self.executable_path = os.path.join( - self.data_path, self.exe_name - ) - else: - self.executable_path = os.path.join( - self.data_path, "_".join([prefix, self.exe_name]) - ) - - if not IS_POSIX: - if executable_path: - if not executable_path[-4:] == ".exe": - executable_path += ".exe" - - self.zip_path = os.path.join(self.data_path, prefix) - - if not executable_path: - if not self.user_multi_procs: - self.executable_path = os.path.abspath( - os.path.join(".", self.executable_path) - ) - - if executable_path: - self._custom_exe_path = True - self.executable_path = executable_path - - # Set the correct repository to download the Chromedriver from - if self.is_old_chromedriver: - self.url_repo = "https://chromedriver.storage.googleapis.com" - else: - self.url_repo = "https://googlechromelabs.github.io/chrome-for-testing" - - self.version_main = version_main - self.version_full = None - - def _set_platform_name(self): - """ - Set the platform and exe name based on the platform undetected_chromedriver is running on - in order to download the correct chromedriver. - """ - if self.platform.endswith("win32"): - self.platform_name = "win32" - self.exe_name %= ".exe" - if self.platform.endswith(("linux", "linux2")): - self.platform_name = "linux64" - self.exe_name %= "" - if self.platform.endswith("darwin"): - if self.is_old_chromedriver: - self.platform_name = "mac64" - else: - self.platform_name = "mac-x64" - self.exe_name %= "" - if self.platform.startswith("freebsd"): - self.platform_name = "freebsd" - self.exe_name %= "" - - def auto(self, executable_path=None, force=False, version_main=None, _=None): - """ - - Args: - executable_path: - force: - version_main: - - Returns: - - """ - p = pathlib.Path(self.data_path) - if self.user_multi_procs: - with Lock(): - files = list(p.rglob("*chromedriver*")) - most_recent = max(files, key=lambda f: f.stat().st_mtime) - files.remove(most_recent) - list(map(lambda f: f.unlink(), files)) - if self.is_binary_patched(most_recent): - self.executable_path = str(most_recent) - return True - - if executable_path: - self.executable_path = executable_path - self._custom_exe_path = True - - if self._custom_exe_path: - ispatched = self.is_binary_patched(self.executable_path) - if not ispatched: - return self.patch_exe() - else: - return - - if version_main: - self.version_main = version_main - if force is True: - self.force = force - - - if self.platform_name == "freebsd": - chromedriver_path = shutil.which("chromedriver") - - if not os.path.isfile(chromedriver_path) or not os.access(chromedriver_path, os.X_OK): - logging.error("Chromedriver not installed!") - return - - version_path = os.path.join(os.path.dirname(self.executable_path), "version.txt") - - process = os.popen(f'"{chromedriver_path}" --version') - chromedriver_version = process.read().split(' ')[1].split(' ')[0] - process.close() - - current_version = None - if os.path.isfile(version_path) or os.access(version_path, os.X_OK): - with open(version_path, 'r') as f: - current_version = f.read() - - if current_version != chromedriver_version: - logging.info("Copying chromedriver executable...") - shutil.copy(chromedriver_path, self.executable_path) - os.chmod(self.executable_path, 0o755) - - with open(version_path, 'w') as f: - f.write(chromedriver_version) - - logging.info("Chromedriver executable copied!") - else: - try: - os.unlink(self.executable_path) - except PermissionError: - if self.force: - self.force_kill_instances(self.executable_path) - return self.auto(force=not self.force) - try: - if self.is_binary_patched(): - # assumes already running AND patched - return True - except PermissionError: - pass - # return False - except FileNotFoundError: - pass - - release = self.fetch_release_number() - self.version_main = release.version[0] - self.version_full = release - self.unzip_package(self.fetch_package()) - - return self.patch() - - def driver_binary_in_use(self, path: str = None) -> bool: - """ - naive test to check if a found chromedriver binary is - currently in use - - Args: - path: a string or PathLike object to the binary to check. - if not specified, we check use this object's executable_path - """ - if not path: - path = self.executable_path - p = pathlib.Path(path) - - if not p.exists(): - raise OSError("file does not exist: %s" % p) - try: - with open(p, mode="a+b") as fs: - exc = [] - try: - - fs.seek(0, 0) - except PermissionError as e: - exc.append(e) # since some systems apprently allow seeking - # we conduct another test - try: - fs.readline() - except PermissionError as e: - exc.append(e) - - if exc: - - return True - return False - # ok safe to assume this is in use - except Exception as e: - # logger.exception("whoops ", e) - pass - - def cleanup_unused_files(self): - p = pathlib.Path(self.data_path) - items = list(p.glob("*undetected*")) - for item in items: - try: - item.unlink() - except: - pass - - def patch(self): - self.patch_exe() - return self.is_binary_patched() - - def fetch_release_number(self): - """ - Gets the latest major version available, or the latest major version of self.target_version if set explicitly. - :return: version string - :rtype: LooseVersion - """ - # Endpoint for old versions of Chromedriver (114 and below) - if self.is_old_chromedriver: - path = f"/latest_release_{self.version_main}" - path = path.upper() - logger.debug("getting release number from %s" % path) - return LooseVersion(urlopen(self.url_repo + path).read().decode()) - - # Endpoint for new versions of Chromedriver (115+) - if not self.version_main: - # Fetch the latest version - path = "/last-known-good-versions-with-downloads.json" - logger.debug("getting release number from %s" % path) - with urlopen(self.url_repo + path) as conn: - response = conn.read().decode() - - last_versions = json.loads(response) - return LooseVersion(last_versions["channels"]["Stable"]["version"]) - - # Fetch the latest minor version of the major version provided - path = "/latest-versions-per-milestone-with-downloads.json" - logger.debug("getting release number from %s" % path) - with urlopen(self.url_repo + path) as conn: - response = conn.read().decode() - - major_versions = json.loads(response) - return LooseVersion(major_versions["milestones"][str(self.version_main)]["version"]) - - def parse_exe_version(self): - with io.open(self.executable_path, "rb") as f: - for line in iter(lambda: f.readline(), b""): - match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) - if match: - return LooseVersion(match[1].decode()) - - def fetch_package(self): - """ - Downloads ChromeDriver from source - - :return: path to downloaded file - """ - zip_name = f"chromedriver_{self.platform_name}.zip" - if self.is_old_chromedriver: - download_url = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, zip_name) - else: - zip_name = zip_name.replace("_", "-", 1) - download_url = "https://storage.googleapis.com/chrome-for-testing-public/%s/%s/%s" - download_url %= (self.version_full.vstring, self.platform_name, zip_name) - - logger.debug("downloading from %s" % download_url) - return urlretrieve(download_url)[0] - - def unzip_package(self, fp): - """ - Does what it says - - :return: path to unpacked executable - """ - exe_path = self.exe_name - if not self.is_old_chromedriver: - # The new chromedriver unzips into its own folder - zip_name = f"chromedriver-{self.platform_name}" - exe_path = os.path.join(zip_name, self.exe_name) - - logger.debug("unzipping %s" % fp) - try: - os.unlink(self.zip_path) - except (FileNotFoundError, OSError): - pass - - os.makedirs(self.zip_path, mode=0o755, exist_ok=True) - with zipfile.ZipFile(fp, mode="r") as zf: - zf.extractall(self.zip_path) - os.rename(os.path.join(self.zip_path, exe_path), self.executable_path) - os.remove(fp) - shutil.rmtree - os.chmod(self.executable_path, 0o755) - return self.executable_path - - @staticmethod - def force_kill_instances(exe_name): - """ - kills running instances. - :param: executable name to kill, may be a path as well - - :return: True on success else False - """ - exe_name = os.path.basename(exe_name) - if IS_POSIX: - r = os.system("kill -f -9 $(pidof %s)" % exe_name) - else: - r = os.system("taskkill /f /im %s" % exe_name) - return not r - - @staticmethod - def gen_random_cdc(): - cdc = random.choices(string.ascii_letters, k=27) - return "".join(cdc).encode() - - def is_binary_patched(self, executable_path=None): - executable_path = executable_path or self.executable_path - try: - with io.open(executable_path, "rb") as fh: - return fh.read().find(b"undetected chromedriver") != -1 - except FileNotFoundError: - return False - - def patch_exe(self): - start = time.perf_counter() - logger.info("patching driver executable %s" % self.executable_path) - with io.open(self.executable_path, "r+b") as fh: - content = fh.read() - # match_injected_codeblock = re.search(rb"{window.*;}", content) - match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content) - if match_injected_codeblock: - target_bytes = match_injected_codeblock[0] - new_target_bytes = ( - b'{console.log("undetected chromedriver 1337!")}'.ljust( - len(target_bytes), b" " - ) - ) - new_content = content.replace(target_bytes, new_target_bytes) - if new_content == content: - logger.warning( - "something went wrong patching the driver binary. could not find injection code block" - ) - else: - logger.debug( - "found block:\n%s\nreplacing with:\n%s" - % (target_bytes, new_target_bytes) - ) - fh.seek(0) - fh.write(new_content) - logger.debug( - "patching took us {:.2f} seconds".format(time.perf_counter() - start) - ) - - def __repr__(self): - return "{0:s}({1:s})".format( - self.__class__.__name__, - self.executable_path, - ) - - def __del__(self): - if self._custom_exe_path: - # if the driver binary is specified by user - # we assume it is important enough to not delete it - return - else: - timeout = 3 # stop trying after this many seconds - t = time.monotonic() - now = lambda: time.monotonic() - while now() - t > timeout: - # we don't want to wait until the end of time - try: - if self.user_multi_procs: - break - os.unlink(self.executable_path) - logger.debug("successfully unlinked %s" % self.executable_path) - break - except (OSError, RuntimeError, PermissionError): - time.sleep(0.01) - continue - except FileNotFoundError: - break diff --git a/src/undetected_chromedriver/reactor.py b/src/undetected_chromedriver/reactor.py deleted file mode 100644 index d52e312..0000000 --- a/src/undetected_chromedriver/reactor.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -import asyncio -import json -import logging -import threading - - -logger = logging.getLogger(__name__) - - -class Reactor(threading.Thread): - def __init__(self, driver: "Chrome"): - super().__init__() - - self.driver = driver - self.loop = asyncio.new_event_loop() - - self.lock = threading.Lock() - self.event = threading.Event() - self.daemon = True - self.handlers = {} - - def add_event_handler(self, method_name, callback: callable): - """ - - Parameters - ---------- - event_name: str - example "Network.responseReceived" - - callback: callable - callable which accepts 1 parameter: the message object dictionary - - Returns - ------- - - """ - with self.lock: - self.handlers[method_name.lower()] = callback - - @property - def running(self): - return not self.event.is_set() - - def run(self): - try: - asyncio.set_event_loop(self.loop) - self.loop.run_until_complete(self.listen()) - except Exception as e: - logger.warning("Reactor.run() => %s", e) - - async def _wait_service_started(self): - while True: - with self.lock: - if ( - getattr(self.driver, "service", None) - and getattr(self.driver.service, "process", None) - and self.driver.service.process.poll() - ): - await asyncio.sleep(self.driver._delay or 0.25) - else: - break - - async def listen(self): - while self.running: - await self._wait_service_started() - await asyncio.sleep(1) - - try: - with self.lock: - log_entries = self.driver.get_log("performance") - - for entry in log_entries: - try: - obj_serialized: str = entry.get("message") - obj = json.loads(obj_serialized) - message = obj.get("message") - method = message.get("method") - - if "*" in self.handlers: - await self.loop.run_in_executor( - None, self.handlers["*"], message - ) - elif method.lower() in self.handlers: - await self.loop.run_in_executor( - None, self.handlers[method.lower()], message - ) - - # print(type(message), message) - except Exception as e: - raise e from None - - except Exception as e: - if "invalid session id" in str(e): - pass - else: - logging.debug("exception ignored :", e) diff --git a/src/undetected_chromedriver/webelement.py b/src/undetected_chromedriver/webelement.py deleted file mode 100644 index 03d6878..0000000 --- a/src/undetected_chromedriver/webelement.py +++ /dev/null @@ -1,86 +0,0 @@ -from typing import List - -from selenium.webdriver.common.by import By -import selenium.webdriver.remote.webelement - - -class WebElement(selenium.webdriver.remote.webelement.WebElement): - def click_safe(self): - super().click() - self._parent.reconnect(0.1) - - def children( - self, tag=None, recursive=False - ) -> List[selenium.webdriver.remote.webelement.WebElement]: - """ - returns direct child elements of current element - :param tag: str, if supplied, returns nodes only - """ - script = "return [... arguments[0].children]" - if tag: - script += ".filter( node => node.tagName === '%s')" % tag.upper() - if recursive: - return list(_recursive_children(self, tag)) - return list(self._parent.execute_script(script, self)) - - -class UCWebElement(WebElement): - """ - Custom WebElement class which makes it easier to view elements when - working in an interactive environment. - - standard webelement repr: - - - using this WebElement class: - )> - - """ - - def __init__(self, parent, id_): - super().__init__(parent, id_) - self._attrs = None - - @property - def attrs(self): - if not self._attrs: - self._attrs = self._parent.execute_script( - """ - var items = {}; - for (index = 0; index < arguments[0].attributes.length; ++index) - { - items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value - }; - return items; - """, - self, - ) - return self._attrs - - def __repr__(self): - strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()]) - if strattrs: - strattrs = " " + strattrs - return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>" - - -def _recursive_children(element, tag: str = None, _results=None): - """ - returns all children of recursively - - :param element: `WebElement` object. - find children below this - - :param tag: str = None. - if provided, return only elements. example: 'a', or 'img' - :param _results: do not use! - """ - results = _results or set() - for element in element.children(): - if tag: - if element.tag_name == tag: - results.add(element) - else: - results.add(element) - results |= _recursive_children(element, tag, results) - return results