diff --git a/requirements.txt b/requirements.txt index 26a6f98..204c344 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ bottle==0.12.25 waitress==2.1.2 -selenium==4.15.2 +DrissionPage==4.1.0.0b14 func-timeout==4.3.5 prometheus-client==0.17.1 # required by undetected_chromedriver diff --git a/src/flaresolverr_service.py b/src/flaresolverr_service.py index cfc2088..fe05be5 100644 --- a/src/flaresolverr_service.py +++ b/src/flaresolverr_service.py @@ -7,13 +7,8 @@ from html import escape from urllib.parse import unquote, quote from func_timeout import FunctionTimedOut, func_timeout -from selenium.common import TimeoutException -from selenium.webdriver.chrome.webdriver import WebDriver -from selenium.webdriver.common.by import By -from selenium.webdriver.support.expected_conditions import ( - presence_of_element_located, staleness_of, title_is) -from selenium.webdriver.common.action_chains import ActionChains -from selenium.webdriver.support.wait import WebDriverWait +from DrissionPage import ChromiumPage +from DrissionPage._units.listener import DataPacket import utils from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT, @@ -251,178 +246,128 @@ def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT: driver.quit() logging.debug('A used instance of webdriver has been destroyed') - -def click_verify(driver: WebDriver): +def click_verify(driver: ChromiumPage) -> DataPacket: try: - logging.debug("Try to find the Cloudflare verify checkbox...") - iframe = driver.find_element(By.XPATH, "//iframe[starts-with(@id, 'cf-chl-widget-')]") - driver.switch_to.frame(iframe) - checkbox = driver.find_element( - by=By.XPATH, - value='//*[@id="content"]/div/div/label/input', + bde = ( + driver + .ele("@Style=border: 0px; margin: 0px; padding: 0px;", timeout=10) + .shadow_root + .ele("tag:iframe", timeout=10) + .ele('tag:body', timeout=10) + .shadow_root ) - if checkbox: - actions = ActionChains(driver) - actions.move_to_element_with_offset(checkbox, 5, 7) - actions.click(checkbox) - actions.perform() - logging.debug("Cloudflare verify checkbox found and clicked!") - except Exception: - logging.debug("Cloudflare verify checkbox not found on the page.") - finally: - driver.switch_to.default_content() + ve = bde.ele("text:Verify you are human", timeout=10) - try: - logging.debug("Try to find the Cloudflare 'Verify you are human' button...") - button = driver.find_element( - by=By.XPATH, - value="//input[@type='button' and @value='Verify you are human']", - ) - if button: - actions = ActionChains(driver) - actions.move_to_element_with_offset(button, 5, 7) - actions.click(button) - actions.perform() - logging.debug("The Cloudflare 'Verify you are human' button found and clicked!") - except Exception: - logging.debug("The Cloudflare 'Verify you are human' button not found on the page.") + driver.listen.start(driver.url) + ve.click() + data = driver.listen.wait(count=1) - time.sleep(2) + if isinstance(data, DataPacket): + return data + + return None + + except Exception as e: + logging.debug("Cloudflare verify checkbox not found on the page. %s", repr(e)) -def get_correct_window(driver: WebDriver) -> WebDriver: - if len(driver.window_handles) > 1: - for window_handle in driver.window_handles: - driver.switch_to.window(window_handle) - current_url = driver.current_url - if not current_url.startswith("devtools://devtools"): - return driver - return driver +def search_challenge(driver: ChromiumPage) -> bool: + page_title = driver.title.lower() + + # find challenge by title + for title in CHALLENGE_TITLES: + if title.lower() == page_title: + logging.debug("Challenge detected. Title found: %s", page_title) + return True + # find challenge by selectors + if driver.wait.eles_loaded(locators=CHALLENGE_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True): + logging.debug("Challenge detected. One of selectors found") + return True + return False -def access_page(driver: WebDriver, url: str) -> None: - driver.get(url) - driver.start_session() - driver.start_session() # required to bypass Cloudflare - - -def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> ChallengeResolutionT: +def _evil_logic(req: V1RequestBase, driver: ChromiumPage, method: str) -> ChallengeResolutionT: res = ChallengeResolutionT({}) res.status = STATUS_OK res.message = "" # navigate to the page - logging.debug(f'Navigating to... {req.url}') + logging.debug('Navigating to... %s', req.url) if method == 'POST': _post_request(req, driver) else: - access_page(driver, req.url) - driver = get_correct_window(driver) + driver.get(req.url) # set cookies if required if req.cookies is not None and len(req.cookies) > 0: - logging.debug(f'Setting cookies...') + logging.debug('Setting cookies...') for cookie in req.cookies: - driver.delete_cookie(cookie['name']) - driver.add_cookie(cookie) + driver.set.cookies.remove(cookie['name']) + driver.set.cookies(cookie) # reload the page if method == 'POST': _post_request(req, driver) else: - access_page(driver, req.url) - driver = get_correct_window(driver) + driver.get(req.url) # wait for the page if utils.get_config_log_html(): - logging.debug(f"Response HTML:\n{driver.page_source}") - html_element = driver.find_element(By.TAG_NAME, "html") - page_title = driver.title + logging.debug("Response HTML:\n%s", driver.page_source) + page_title = driver.title # find access denied titles for title in ACCESS_DENIED_TITLES: if title == page_title: raise Exception('Cloudflare has blocked this request. ' 'Probably your IP is banned for this site, check in your web browser.') # find access denied selectors - for selector in ACCESS_DENIED_SELECTORS: - found_elements = driver.find_elements(By.CSS_SELECTOR, selector) - if len(found_elements) > 0: - raise Exception('Cloudflare has blocked this request. ' - 'Probably your IP is banned for this site, check in your web browser.') - - # find challenge by title - challenge_found = False - for title in CHALLENGE_TITLES: - if title.lower() == page_title.lower(): - challenge_found = True - logging.info("Challenge detected. Title found: " + page_title) - break - if not challenge_found: - # find challenge by selectors - for selector in CHALLENGE_SELECTORS: - found_elements = driver.find_elements(By.CSS_SELECTOR, selector) - if len(found_elements) > 0: - challenge_found = True - logging.info("Challenge detected. Selector found: " + selector) - break + if driver.wait.eles_loaded(locators=ACCESS_DENIED_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True): + raise Exception('Cloudflare has blocked this request. ' + 'Probably your IP is banned for this site, check in your web browser.') attempt = 0 - if challenge_found: - while True: - try: - attempt = attempt + 1 - # wait until the title changes - for title in CHALLENGE_TITLES: - logging.debug("Waiting for title (attempt " + str(attempt) + "): " + title) - WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title)) + data = DataPacket + challenge_found = True + while challenge_found: + try: + attempt += 1 - # then wait until all the selectors disappear - for selector in CHALLENGE_SELECTORS: - logging.debug("Waiting for selector (attempt " + str(attempt) + "): " + selector) - WebDriverWait(driver, SHORT_TIMEOUT).until_not( - presence_of_element_located((By.CSS_SELECTOR, selector))) + if search_challenge(driver): + if attempt == 1: + logging.info("Challenge detected.") - # all elements not found + data = click_verify(driver) + else: + if attempt == 1: + logging.info("Challenge not detected!") + res.message = "Challenge not detected!" + else: + logging.info("Challenge solved!") + res.message = "Challenge solved!" break - except TimeoutException: - logging.debug("Timeout waiting for selector") + except Exception as e: + logging.debug("Cloudflare check exception") + raise e - click_verify(driver) - - # update the html (cloudflare reloads the page every 5 s) - html_element = driver.find_element(By.TAG_NAME, "html") - - # waits until cloudflare redirection ends - logging.debug("Waiting for redirect") - # noinspection PyBroadException - try: - WebDriverWait(driver, SHORT_TIMEOUT).until(staleness_of(html_element)) - except Exception: - logging.debug("Timeout waiting for redirect") - - logging.info("Challenge solved!") - res.message = "Challenge solved!" - else: - logging.info("Challenge not detected!") - res.message = "Challenge not detected!" challenge_res = ChallengeResolutionResultT({}) - challenge_res.url = driver.current_url - challenge_res.status = 200 # todo: fix, selenium not provides this info - challenge_res.cookies = driver.get_cookies() + challenge_res.url = driver.url + challenge_res.cookies = driver.cookies() challenge_res.userAgent = utils.get_user_agent(driver) - if not req.returnOnlyCookies: - challenge_res.headers = {} # todo: fix, selenium not provides this info - challenge_res.response = driver.page_source + if data is not None and data.response is not None: + challenge_res.status = data.response.status + if not req.returnOnlyCookies: + challenge_res.response = data.response.body + challenge_res.headers = data.response.headers.copy() res.result = challenge_res return res -def _post_request(req: V1RequestBase, driver: WebDriver): +def _post_request(req: V1RequestBase, driver: ChromiumPage): post_form = f'
' query_string = req.postData if req.postData[0] != '?' else req.postData[1:] pairs = query_string.split('&') @@ -451,5 +396,3 @@ def _post_request(req: V1RequestBase, driver: WebDriver): """ driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content)) - driver.start_session() - driver.start_session() # required to bypass Cloudflare diff --git a/src/sessions.py b/src/sessions.py index 30bb3c1..7fb39d3 100644 --- a/src/sessions.py +++ b/src/sessions.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta from typing import Optional, Tuple from uuid import uuid1 -from selenium.webdriver.chrome.webdriver import WebDriver +from DrissionPage import ChromiumPage import utils @@ -12,7 +12,7 @@ import utils @dataclass class Session: session_id: str - driver: WebDriver + driver: ChromiumPage created_at: datetime def lifetime(self) -> timedelta: @@ -27,13 +27,13 @@ class SessionsStorage: def create(self, session_id: Optional[str] = None, proxy: Optional[dict] = None, force_new: Optional[bool] = False) -> Tuple[Session, bool]: - """create creates new instance of WebDriver if necessary, + """create creates new instance of ChromiumPage if necessary, assign defined (or newly generated) session_id to the instance and returns the session object. If a new session has been created second argument is set to True. Note: The function is idempotent, so in case if session_id - already exists in the storage a new instance of WebDriver won't be created + already exists in the storage a new instance of ChromiumPage won't be created and existing session will be returned. Second argument defines if new session has been created (True) or an existing one was used (False). """ diff --git a/src/undetected_chromedriver/__init__.py b/src/undetected_chromedriver/__init__.py deleted file mode 100644 index 4e7fa1a..0000000 --- a/src/undetected_chromedriver/__init__.py +++ /dev/null @@ -1,914 +0,0 @@ -#!/usr/bin/env python3 - -""" - - 888 888 d8b - 888 888 Y8P - 888 888 - .d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888 -d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P" -888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888 -Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888 - "Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888 - -by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam) - -""" -from __future__ import annotations - - -__version__ = "3.5.5" - -import json -import logging -import os -import pathlib -import re -import shutil -import subprocess -import sys -import tempfile -import time -from weakref import finalize - -import selenium.webdriver.chrome.service -import selenium.webdriver.chrome.webdriver -from selenium.webdriver.common.by import By -import selenium.webdriver.chromium.service -import selenium.webdriver.remote.command -import selenium.webdriver.remote.webdriver - -from .cdp import CDP -from .dprocess import start_detached -from .options import ChromeOptions -from .patcher import IS_POSIX -from .patcher import Patcher -from .reactor import Reactor -from .webelement import UCWebElement -from .webelement import WebElement - - -__all__ = ( - "Chrome", - "ChromeOptions", - "Patcher", - "Reactor", - "CDP", - "find_chrome_executable", -) - -logger = logging.getLogger("uc") -logger.setLevel(logging.getLogger().getEffectiveLevel()) - - -class Chrome(selenium.webdriver.chrome.webdriver.WebDriver): - """ - - Controls the ChromeDriver and allows you to drive the browser. - - The webdriver file will be downloaded by this module automatically, - you do not need to specify this. however, you may if you wish. - - Attributes - ---------- - - Methods - ------- - - reconnect() - - this can be useful in case of heavy detection methods - -stops the chromedriver service which runs in the background - -starts the chromedriver service which runs in the background - -recreate session - - - start_session(capabilities=None, browser_profile=None) - - differentiates from the regular method in that it does not - require a capabilities argument. The capabilities are automatically - recreated from the options at creation time. - - -------------------------------------------------------------------------- - NOTE: - Chrome has everything included to work out of the box. - it does not `need` customizations. - any customizations MAY lead to trigger bot migitation systems. - - -------------------------------------------------------------------------- - """ - - _instances = set() - session_id = None - debug = False - - def __init__( - self, - options=None, - user_data_dir=None, - driver_executable_path=None, - browser_executable_path=None, - port=0, - enable_cdp_events=False, - # service_args=None, - # service_creationflags=None, - desired_capabilities=None, - advanced_elements=False, - # service_log_path=None, - keep_alive=True, - log_level=0, - headless=False, - version_main=None, - patcher_force_close=False, - suppress_welcome=True, - use_subprocess=False, - debug=False, - no_sandbox=True, - windows_headless=False, - user_multi_procs: bool = False, - **kw, - ): - """ - Creates a new instance of the chrome driver. - - Starts the service and then creates new instance of chrome driver. - - Parameters - ---------- - - options: ChromeOptions, optional, default: None - automatic useful defaults - this takes an instance of ChromeOptions, mainly to customize browser behavior. - anything other dan the default, for example extensions or startup options - are not supported in case of failure, and can probably lowers your undetectability. - - - user_data_dir: str , optional, default: None (creates temp profile) - if user_data_dir is a path to a valid chrome profile directory, use it, - and turn off automatic removal mechanism at exit. - - driver_executable_path: str, optional, default: None(=downloads and patches new binary) - - browser_executable_path: str, optional, default: None - use find_chrome_executable - Path to the browser executable. - If not specified, make sure the executable's folder is in $PATH - - port: int, optional, default: 0 - port to be used by the chromedriver executable, this is NOT the debugger port. - leave it at 0 unless you know what you are doing. - the default value of 0 automatically picks an available port. - - enable_cdp_events: bool, default: False - :: currently for chrome only - this enables the handling of wire messages - when enabled, you can subscribe to CDP events by using: - - driver.add_cdp_listener("Network.dataReceived", yourcallback) - # yourcallback is an callable which accepts exactly 1 dict as parameter - - - service_args: list of str, optional, default: None - arguments to pass to the driver service - - desired_capabilities: dict, optional, default: None - auto from config - Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref". - - advanced_elements: bool, optional, default: False - makes it easier to recognize elements like you know them from html/browser inspection, especially when working - in an interactive environment - - default webelement repr: - - - advanced webelement repr - )> - - note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time. - - - service_log_path: str, optional, default: None - path to log information from the driver. - - keep_alive: bool, optional, default: True - Whether to configure ChromeRemoteConnection to use HTTP keep-alive. - - log_level: int, optional, default: adapts to python global log level - - headless: bool, optional, default: False - can also be specified in the options instance. - Specify whether you want to use the browser in headless mode. - warning: this lowers undetectability and not fully supported. - - version_main: int, optional, default: None (=auto) - if you, for god knows whatever reason, use - an older version of Chrome. You can specify it's full rounded version number - here. Example: 87 for all versions of 87 - - patcher_force_close: bool, optional, default: False - instructs the patcher to do whatever it can to access the chromedriver binary - if the file is locked, it will force shutdown all instances. - setting it is not recommended, unless you know the implications and think - you might need it. - - suppress_welcome: bool, optional , default: True - a "welcome" alert might show up on *nix-like systems asking whether you want to set - chrome as your default browser, and if you want to send even more data to google. - now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False. - Note: if you don't handle the nag screen in time, the browser loses it's connection and throws an Exception. - - use_subprocess: bool, optional , default: True, - - False (the default) makes sure Chrome will get it's own process (so no subprocess of chromedriver.exe or python - This fixes a LOT of issues, like multithreaded run, but mst importantly. shutting corectly after - program exits or using .quit() - you should be knowing what you're doing, and know how python works. - - unfortunately, there is always an edge case in which one would like to write an single script with the only contents being: - --start script-- - import undetected_chromedriver as uc - d = uc.Chrome() - d.get('https://somesite/') - ---end script -- - - and will be greeted with an error, since the program exists before chrome has a change to launch. - in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times. - ! setting it to True comes with NO support when being detected. ! - - no_sandbox: bool, optional, default=True - uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar - this option has a default of True since many people seem to run this as root (....) , and chrome does not start - when running as root without using --no-sandbox flag. - - user_multi_procs: - set to true when you are using multithreads/multiprocessing - ensures not all processes are trying to modify a binary which is in use by another. - for this to work. YOU MUST HAVE AT LEAST 1 UNDETECTED_CHROMEDRIVER BINARY IN YOUR ROAMING DATA FOLDER. - this requirement can be easily satisfied, by just running this program "normal" and close/kill it. - - - """ - - finalize(self, self._ensure_close, self) - self.debug = debug - self.patcher = Patcher( - executable_path=driver_executable_path, - force=patcher_force_close, - version_main=version_main, - user_multi_procs=user_multi_procs, - ) - # self.patcher.auto(user_multiprocess = user_multi_num_procs) - self.patcher.auto() - - # self.patcher = patcher - if not options: - options = ChromeOptions() - - try: - if hasattr(options, "_session") and options._session is not None: - # prevent reuse of options, - # as it just appends arguments, not replace them - # you'll get conflicts starting chrome - raise RuntimeError("you cannot reuse the ChromeOptions object") - except AttributeError: - pass - - options._session = self - - if not options.debugger_address: - debug_port = ( - port - if port != 0 - else selenium.webdriver.common.service.utils.free_port() - ) - debug_host = "127.0.0.1" - options.debugger_address = "%s:%d" % (debug_host, debug_port) - else: - debug_host, debug_port = options.debugger_address.split(":") - debug_port = int(debug_port) - - if enable_cdp_events: - options.set_capability( - "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"} - ) - - options.add_argument("--remote-debugging-host=%s" % debug_host) - options.add_argument("--remote-debugging-port=%s" % debug_port) - - if user_data_dir: - options.add_argument("--user-data-dir=%s" % user_data_dir) - - language, keep_user_data_dir = None, bool(user_data_dir) - - # see if a custom user profile is specified in options - for arg in options.arguments: - - if any([_ in arg for _ in ("--headless", "headless")]): - options.arguments.remove(arg) - options.headless = True - - if "lang" in arg: - m = re.search("(?:--)?lang(?:[ =])?(.*)", arg) - try: - language = m[1] - except IndexError: - logger.debug("will set the language to en-US,en;q=0.9") - language = "en-US,en;q=0.9" - - if "user-data-dir" in arg: - m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg) - try: - user_data_dir = m[1] - logger.debug( - "user-data-dir found in user argument %s => %s" % (arg, m[1]) - ) - keep_user_data_dir = True - - except IndexError: - logger.debug( - "no user data dir could be extracted from supplied argument %s " - % arg - ) - - if not user_data_dir: - # backward compatiblity - # check if an old uc.ChromeOptions is used, and extract the user data dir - - if hasattr(options, "user_data_dir") and getattr( - options, "user_data_dir", None - ): - import warnings - - warnings.warn( - "using ChromeOptions.user_data_dir might stop working in future versions." - "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder" - ) - options.add_argument("--user-data-dir=%s" % options.user_data_dir) - keep_user_data_dir = True - logger.debug( - "user_data_dir property found in options object: %s" % user_data_dir - ) - - else: - user_data_dir = os.path.normpath(tempfile.mkdtemp()) - keep_user_data_dir = False - arg = "--user-data-dir=%s" % user_data_dir - options.add_argument(arg) - logger.debug( - "created a temporary folder in which the user-data (profile) will be stored during this\n" - "session, and added it to chrome startup arguments: %s" % arg - ) - - if not language: - try: - import locale - - language = locale.getdefaultlocale()[0].replace("_", "-") - except Exception: - pass - if not language: - language = "en-US" - - options.add_argument("--lang=%s" % language) - - if not options.binary_location: - options.binary_location = ( - browser_executable_path or find_chrome_executable() - ) - - if not options.binary_location or not \ - pathlib.Path(options.binary_location).exists(): - raise FileNotFoundError( - "\n---------------------\n" - "Could not determine browser executable." - "\n---------------------\n" - "Make sure your browser is installed in the default location (path).\n" - "If you are sure about the browser executable, you can specify it using\n" - "the `browser_executable_path='{}` parameter.\n\n" - .format("/path/to/browser/executable" if IS_POSIX else "c:/path/to/your/browser.exe") - ) - - self._delay = 3 - - self.user_data_dir = user_data_dir - self.keep_user_data_dir = keep_user_data_dir - - if suppress_welcome: - options.arguments.extend(["--no-default-browser-check", "--no-first-run"]) - if no_sandbox: - options.arguments.extend(["--no-sandbox", "--test-type"]) - - if headless or getattr(options, 'headless', None): - #workaround until a better checking is found - try: - v_main = int(self.patcher.version_main) if self.patcher.version_main else 108 - if v_main < 108: - options.add_argument("--headless=chrome") - elif v_main >= 108: - options.add_argument("--headless=new") - except: - logger.warning("could not detect version_main." - "therefore, we are assuming it is chrome 108 or higher") - options.add_argument("--headless=new") - - options.add_argument("--window-size=1920,1080") - options.add_argument("--start-maximized") - options.add_argument("--no-sandbox") - # fixes "could not connect to chrome" error when running - # on linux using privileged user like root (which i don't recommend) - - options.add_argument( - "--log-level=%d" % log_level - or divmod(logging.getLogger().getEffectiveLevel(), 10)[0] - ) - - if hasattr(options, "handle_prefs"): - options.handle_prefs(user_data_dir) - - # fix exit_type flag to prevent tab-restore nag - try: - with open( - os.path.join(user_data_dir, "Default/Preferences"), - encoding="latin1", - mode="r+", - ) as fs: - config = json.load(fs) - if config["profile"]["exit_type"] is not None: - # fixing the restore-tabs-nag - config["profile"]["exit_type"] = None - fs.seek(0, 0) - json.dump(config, fs) - fs.truncate() # the file might be shorter - logger.debug("fixed exit_type flag") - except Exception as e: - logger.debug("did not find a bad exit_type flag ") - - self.options = options - - if not desired_capabilities: - desired_capabilities = options.to_capabilities() - - if not use_subprocess and not windows_headless: - self.browser_pid = start_detached( - options.binary_location, *options.arguments - ) - else: - startupinfo = None - if os.name == 'nt' and windows_headless: - # STARTUPINFO() is Windows only - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - browser = subprocess.Popen( - [options.binary_location, *options.arguments], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=IS_POSIX, - startupinfo=startupinfo - ) - self.browser_pid = browser.pid - - - service = selenium.webdriver.chromium.service.ChromiumService( - self.patcher.executable_path - ) - - super(Chrome, self).__init__( - service=service, - options=options, - keep_alive=keep_alive, - ) - - self.reactor = None - - if enable_cdp_events: - if logging.getLogger().getEffectiveLevel() == logging.DEBUG: - logging.getLogger( - "selenium.webdriver.remote.remote_connection" - ).setLevel(20) - reactor = Reactor(self) - reactor.start() - self.reactor = reactor - - if advanced_elements: - self._web_element_cls = UCWebElement - else: - self._web_element_cls = WebElement - - if headless or getattr(options, 'headless', None): - self._configure_headless() - - def _configure_headless(self): - orig_get = self.get - logger.info("setting properties for headless") - - def get_wrapped(*args, **kwargs): - if self.execute_script("return navigator.webdriver"): - logger.info("patch navigator.webdriver") - self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument", - { - "source": """ - - Object.defineProperty(window, "navigator", { - Object.defineProperty(window, "navigator", { - value: new Proxy(navigator, { - has: (target, key) => (key === "webdriver" ? false : key in target), - get: (target, key) => - key === "webdriver" - ? false - : typeof target[key] === "function" - ? target[key].bind(target) - : target[key], - }), - }); - """ - }, - ) - - logger.info("patch user-agent string") - self.execute_cdp_cmd( - "Network.setUserAgentOverride", - { - "userAgent": self.execute_script( - "return navigator.userAgent" - ).replace("Headless", "") - }, - ) - self.execute_cdp_cmd( - "Page.addScriptToEvaluateOnNewDocument", - { - "source": """ - Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1}); - Object.defineProperty(navigator.connection, 'rtt', {get: () => 100}); - - // https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/chrome-runtime.js - window.chrome = { - app: { - isInstalled: false, - InstallState: { - DISABLED: 'disabled', - INSTALLED: 'installed', - NOT_INSTALLED: 'not_installed' - }, - RunningState: { - CANNOT_RUN: 'cannot_run', - READY_TO_RUN: 'ready_to_run', - RUNNING: 'running' - } - }, - runtime: { - OnInstalledReason: { - CHROME_UPDATE: 'chrome_update', - INSTALL: 'install', - SHARED_MODULE_UPDATE: 'shared_module_update', - UPDATE: 'update' - }, - OnRestartRequiredReason: { - APP_UPDATE: 'app_update', - OS_UPDATE: 'os_update', - PERIODIC: 'periodic' - }, - PlatformArch: { - ARM: 'arm', - ARM64: 'arm64', - MIPS: 'mips', - MIPS64: 'mips64', - X86_32: 'x86-32', - X86_64: 'x86-64' - }, - PlatformNaclArch: { - ARM: 'arm', - MIPS: 'mips', - MIPS64: 'mips64', - X86_32: 'x86-32', - X86_64: 'x86-64' - }, - PlatformOs: { - ANDROID: 'android', - CROS: 'cros', - LINUX: 'linux', - MAC: 'mac', - OPENBSD: 'openbsd', - WIN: 'win' - }, - RequestUpdateCheckStatus: { - NO_UPDATE: 'no_update', - THROTTLED: 'throttled', - UPDATE_AVAILABLE: 'update_available' - } - } - } - - // https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/navigator-permissions.js - if (!window.Notification) { - window.Notification = { - permission: 'denied' - } - } - - const originalQuery = window.navigator.permissions.query - window.navigator.permissions.__proto__.query = parameters => - parameters.name === 'notifications' - ? Promise.resolve({ state: window.Notification.permission }) - : originalQuery(parameters) - - const oldCall = Function.prototype.call - function call() { - return oldCall.apply(this, arguments) - } - Function.prototype.call = call - - const nativeToStringFunctionString = Error.toString().replace(/Error/g, 'toString') - const oldToString = Function.prototype.toString - - function functionToString() { - if (this === window.navigator.permissions.query) { - return 'function query() { [native code] }' - } - if (this === functionToString) { - return nativeToStringFunctionString - } - return oldCall.call(oldToString, this) - } - // eslint-disable-next-line - Function.prototype.toString = functionToString - """ - }, - ) - return orig_get(*args, **kwargs) - - self.get = get_wrapped - - # def _get_cdc_props(self): - # return self.execute_script( - # """ - # let objectToInspect = window, - # result = []; - # while(objectToInspect !== null) - # { result = result.concat(Object.getOwnPropertyNames(objectToInspect)); - # objectToInspect = Object.getPrototypeOf(objectToInspect); } - # - # return result.filter(i => i.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig)) - # """ - # ) - # - # def _hook_remove_cdc_props(self): - # self.execute_cdp_cmd( - # "Page.addScriptToEvaluateOnNewDocument", - # { - # "source": """ - # let objectToInspect = window, - # result = []; - # while(objectToInspect !== null) - # { result = result.concat(Object.getOwnPropertyNames(objectToInspect)); - # objectToInspect = Object.getPrototypeOf(objectToInspect); } - # result.forEach(p => p.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig) - # &&delete window[p]&&console.log('removed',p)) - # """ - # }, - # ) - - def get(self, url): - # if self._get_cdc_props(): - # self._hook_remove_cdc_props() - return super().get(url) - - def add_cdp_listener(self, event_name, callback): - if ( - self.reactor - and self.reactor is not None - and isinstance(self.reactor, Reactor) - ): - self.reactor.add_event_handler(event_name, callback) - return self.reactor.handlers - return False - - def clear_cdp_listeners(self): - if self.reactor and isinstance(self.reactor, Reactor): - self.reactor.handlers.clear() - - def window_new(self): - self.execute( - selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"} - ) - - def tab_new(self, url: str): - """ - this opens a url in a new tab. - apparently, that passes all tests directly! - - Parameters - ---------- - url - - Returns - ------- - - """ - if not hasattr(self, "cdp"): - from .cdp import CDP - - cdp = CDP(self.options) - cdp.tab_new(url) - - def reconnect(self, timeout=0.1): - try: - self.service.stop() - except Exception as e: - logger.debug(e) - time.sleep(timeout) - try: - self.service.start() - except Exception as e: - logger.debug(e) - - try: - self.start_session() - except Exception as e: - logger.debug(e) - - def start_session(self, capabilities=None, browser_profile=None): - if not capabilities: - capabilities = self.options.to_capabilities() - super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session( - capabilities - ) - # super(Chrome, self).start_session(capabilities, browser_profile) - - def find_elements_recursive(self, by, value): - """ - find elements in all frames - this is a generator function, which is needed - since if it would return a list of elements, they - will be stale on arrival. - using generator, when the element is returned we are in the correct frame - to use it directly - Args: - by: By - value: str - Returns: Generator[webelement.WebElement] - """ - def search_frame(f=None): - if not f: - # ensure we are on main content frame - self.switch_to.default_content() - else: - self.switch_to.frame(f) - for elem in self.find_elements(by, value): - yield elem - # switch back to main content, otherwise we will get StaleElementReferenceException - self.switch_to.default_content() - - # search root frame - for elem in search_frame(): - yield elem - # get iframes - frames = self.find_elements('css selector', 'iframe') - - # search per frame - for f in frames: - for elem in search_frame(f): - yield elem - - def quit(self): - try: - self.service.stop() - self.service.process.kill() - self.command_executor.close() - self.service.process.wait(5) - logger.debug("webdriver process ended") - except (AttributeError, RuntimeError, OSError): - pass - try: - self.reactor.event.set() - logger.debug("shutting down reactor") - except AttributeError: - pass - try: - os.kill(self.browser_pid, 15) - logger.debug("gracefully closed browser") - except Exception as e: # noqa - pass - if ( - hasattr(self, "keep_user_data_dir") - and hasattr(self, "user_data_dir") - and not self.keep_user_data_dir - ): - for _ in range(5): - try: - shutil.rmtree(self.user_data_dir, ignore_errors=False) - except FileNotFoundError: - pass - except (RuntimeError, OSError, PermissionError) as e: - logger.debug( - "When removing the temp profile, a %s occured: %s\nretrying..." - % (e.__class__.__name__, e) - ) - else: - logger.debug("successfully removed %s" % self.user_data_dir) - break - - try: - time.sleep(0.1) - except OSError: - pass - - # dereference patcher, so patcher can start cleaning up as well. - # this must come last, otherwise it will throw 'in use' errors - self.patcher = None - - def __getattribute__(self, item): - if not super().__getattribute__("debug"): - return super().__getattribute__(item) - else: - import inspect - - original = super().__getattribute__(item) - if inspect.ismethod(original) and not inspect.isclass(original): - - def newfunc(*args, **kwargs): - logger.debug( - "calling %s with args %s and kwargs %s\n" - % (original.__qualname__, args, kwargs) - ) - return original(*args, **kwargs) - - return newfunc - return original - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.service.stop() - time.sleep(self._delay) - self.service.start() - self.start_session() - - def __hash__(self): - return hash(self.options.debugger_address) - - def __dir__(self): - return object.__dir__(self) - - def __del__(self): - try: - self.service.process.kill() - except: # noqa - pass - self.quit() - - @classmethod - def _ensure_close(cls, self): - # needs to be a classmethod so finalize can find the reference - logger.info("ensuring close") - if ( - hasattr(self, "service") - and hasattr(self.service, "process") - and hasattr(self.service.process, "kill") - ): - self.service.process.kill() - - -def find_chrome_executable(): - """ - Finds the chrome, chrome beta, chrome canary, chromium executable - - Returns - ------- - executable_path : str - the full file path to found executable - - """ - candidates = set() - if IS_POSIX: - for item in os.environ.get("PATH").split(os.pathsep): - for subitem in ( - "google-chrome", - "chromium", - "chromium-browser", - "chrome", - "google-chrome-stable", - ): - candidates.add(os.sep.join((item, subitem))) - if "darwin" in sys.platform: - candidates.update( - [ - "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", - "/Applications/Chromium.app/Contents/MacOS/Chromium", - ] - ) - else: - for item in map( - os.environ.get, - ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"), - ): - if item is not None: - for subitem in ( - "Google/Chrome/Application", - ): - candidates.add(os.sep.join((item, subitem, "chrome.exe"))) - for candidate in candidates: - logger.debug('checking if %s exists and is executable' % candidate) - if os.path.exists(candidate) and os.access(candidate, os.X_OK): - logger.debug('found! using %s' % candidate) - return os.path.normpath(candidate) diff --git a/src/undetected_chromedriver/cdp.py b/src/undetected_chromedriver/cdp.py deleted file mode 100644 index 32a503c..0000000 --- a/src/undetected_chromedriver/cdp.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -import json -import logging - -import requests -import websockets - - -log = logging.getLogger(__name__) - - -class CDPObject(dict): - def __init__(self, *a, **k): - super().__init__(*a, **k) - self.__dict__ = self - for k in self.__dict__: - if isinstance(self.__dict__[k], dict): - self.__dict__[k] = CDPObject(self.__dict__[k]) - elif isinstance(self.__dict__[k], list): - for i in range(len(self.__dict__[k])): - if isinstance(self.__dict__[k][i], dict): - self.__dict__[k][i] = CDPObject(self) - - def __repr__(self): - tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)" - return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items())) - - -class PageElement(CDPObject): - pass - - -class CDP: - log = logging.getLogger("CDP") - - endpoints = CDPObject( - { - "json": "/json", - "protocol": "/json/protocol", - "list": "/json/list", - "new": "/json/new?{url}", - "activate": "/json/activate/{id}", - "close": "/json/close/{id}", - } - ) - - def __init__(self, options: "ChromeOptions"): # noqa - self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":")) - - self._reqid = 0 - self._session = requests.Session() - self._last_resp = None - self._last_json = None - - resp = self.get(self.endpoints.json) # noqa - self.sessionId = resp[0]["id"] - self.wsurl = resp[0]["webSocketDebuggerUrl"] - - def tab_activate(self, id=None): - if not id: - active_tab = self.tab_list()[0] - id = active_tab.id # noqa - self.wsurl = active_tab.webSocketDebuggerUrl # noqa - return self.post(self.endpoints["activate"].format(id=id)) - - def tab_list(self): - retval = self.get(self.endpoints["list"]) - return [PageElement(o) for o in retval] - - def tab_new(self, url): - return self.post(self.endpoints["new"].format(url=url)) - - def tab_close_last_opened(self): - sessions = self.tab_list() - opentabs = [s for s in sessions if s["type"] == "page"] - return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"])) - - async def send(self, method: str, params: dict): - self._reqid += 1 - async with websockets.connect(self.wsurl) as ws: - await ws.send( - json.dumps({"method": method, "params": params, "id": self._reqid}) - ) - self._last_resp = await ws.recv() - self._last_json = json.loads(self._last_resp) - self.log.info(self._last_json) - - def get(self, uri): - resp = self._session.get(self.server_addr + uri) - try: - self._last_resp = resp - self._last_json = resp.json() - except Exception: - return - else: - return self._last_json - - def post(self, uri, data: dict = None): - if not data: - data = {} - resp = self._session.post(self.server_addr + uri, json=data) - try: - self._last_resp = resp - self._last_json = resp.json() - except Exception: - return self._last_resp - - @property - def last_json(self): - return self._last_json diff --git a/src/undetected_chromedriver/devtool.py b/src/undetected_chromedriver/devtool.py deleted file mode 100644 index 915d417..0000000 --- a/src/undetected_chromedriver/devtool.py +++ /dev/null @@ -1,193 +0,0 @@ -import asyncio -from collections.abc import Mapping -from collections.abc import Sequence -from functools import wraps -import os -import logging -import threading -import time -import traceback -from typing import Any -from typing import Awaitable -from typing import Callable -from typing import List -from typing import Optional - - -class Structure(dict): - """ - This is a dict-like object structure, which you should subclass - Only properties defined in the class context are used on initialization. - - See example - """ - - _store = {} - - def __init__(self, *a, **kw): - """ - Instantiate a new instance. - - :param a: - :param kw: - """ - - super().__init__() - - # auxiliar dict - d = dict(*a, **kw) - for k, v in d.items(): - if isinstance(v, Mapping): - self[k] = self.__class__(v) - elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)): - self[k] = [self.__class__(i) for i in v] - else: - self[k] = v - super().__setattr__("__dict__", self) - - def __getattr__(self, item): - return getattr(super(), item) - - def __getitem__(self, item): - return super().__getitem__(item) - - def __setattr__(self, key, value): - self.__setitem__(key, value) - - def __setitem__(self, key, value): - super().__setitem__(key, value) - - def update(self, *a, **kw): - super().update(*a, **kw) - - def __eq__(self, other): - return frozenset(other.items()) == frozenset(self.items()) - - def __hash__(self): - return hash(frozenset(self.items())) - - @classmethod - def __init_subclass__(cls, **kwargs): - cls._store = {} - - def _normalize_strings(self): - for k, v in self.copy().items(): - if isinstance(v, (str)): - self[k] = v.strip() - - -def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None): - def wrapper(func): - @wraps(func) - def wrapped(*args, **kwargs): - def function_reached_timeout(): - if on_timeout: - on_timeout(func) - else: - raise TimeoutError("function call timed out") - - t = threading.Timer(interval=seconds, function=function_reached_timeout) - t.start() - try: - return func(*args, **kwargs) - except: - t.cancel() - raise - finally: - t.cancel() - - return wrapped - - return wrapper - - -def test(): - import sys, os - - sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) - import undetected_chromedriver as uc - import threading - - def collector( - driver: uc.Chrome, - stop_event: threading.Event, - on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None, - listen_events: Sequence = ("browser", "network", "performance"), - ): - def threaded(driver, stop_event, on_event_coro): - async def _ensure_service_started(): - while ( - getattr(driver, "service", False) - and getattr(driver.service, "process", False) - and driver.service.process.poll() - ): - print("waiting for driver service to come back on") - await asyncio.sleep(0.05) - # await asyncio.sleep(driver._delay or .25) - - async def get_log_lines(typ): - await _ensure_service_started() - return driver.get_log(typ) - - async def looper(): - while not stop_event.is_set(): - log_lines = [] - try: - for _ in listen_events: - try: - log_lines += await get_log_lines(_) - except: - if logging.getLogger().getEffectiveLevel() <= 10: - traceback.print_exc() - continue - if log_lines and on_event_coro: - await on_event_coro(log_lines) - except Exception as e: - if logging.getLogger().getEffectiveLevel() <= 10: - traceback.print_exc() - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(looper()) - - t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro)) - t.start() - - async def on_event(data): - print("on_event") - print("data:", data) - - def func_called(fn): - def wrapped(*args, **kwargs): - print( - "func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs) - ) - while driver.service.process and driver.service.process.poll() is not None: - time.sleep(0.1) - res = fn(*args, **kwargs) - print("func completed! (result: %s)" % res) - return res - - return wrapped - - logging.basicConfig(level=10) - - options = uc.ChromeOptions() - options.set_capability( - "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"} - ) - - driver = uc.Chrome(version_main=96, options=options) - - # driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request) - driver.command_executor._request = func_called(driver.command_executor._request) - collector_stop = threading.Event() - collector(driver, collector_stop, on_event) - - driver.get("https://nowsecure.nl") - - time.sleep(10) - - if os.name == "nt": - driver.close() - driver.quit() diff --git a/src/undetected_chromedriver/dprocess.py b/src/undetected_chromedriver/dprocess.py deleted file mode 100644 index 6d053fa..0000000 --- a/src/undetected_chromedriver/dprocess.py +++ /dev/null @@ -1,77 +0,0 @@ -import atexit -import logging -import multiprocessing -import os -import platform -import signal -from subprocess import PIPE -from subprocess import Popen -import sys - - -CREATE_NEW_PROCESS_GROUP = 0x00000200 -DETACHED_PROCESS = 0x00000008 - -REGISTERED = [] - - -def start_detached(executable, *args): - """ - Starts a fully independent subprocess (with no parent) - :param executable: executable - :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...] - :return: pid of the grandchild process - """ - - # create pipe - reader, writer = multiprocessing.Pipe(False) - - # do not keep reference - process = multiprocessing.Process( - target=_start_detached, - args=(executable, *args), - kwargs={"writer": writer}, - daemon=True, - ) - process.start() - process.join() - # receive pid from pipe - pid = reader.recv() - REGISTERED.append(pid) - # close pipes - writer.close() - reader.close() - process.close() - - return pid - - -def _start_detached(executable, *args, writer: multiprocessing.Pipe = None): - # configure launch - kwargs = {} - if platform.system() == "Windows": - kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP) - elif sys.version_info < (3, 2): - # assume posix - kwargs.update(preexec_fn=os.setsid) - else: # Python 3.2+ and Unix - kwargs.update(start_new_session=True) - - # run - p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs) - - # send pid to pipe - writer.send(p.pid) - sys.exit() - - -def _cleanup(): - for pid in REGISTERED: - try: - logging.getLogger(__name__).debug("cleaning up pid %d " % pid) - os.kill(pid, signal.SIGTERM) - except: # noqa - pass - - -atexit.register(_cleanup) diff --git a/src/undetected_chromedriver/options.py b/src/undetected_chromedriver/options.py deleted file mode 100644 index 8078ae9..0000000 --- a/src/undetected_chromedriver/options.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - - -import json -import os - -from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions - - -class ChromeOptions(_ChromiumOptions): - _session = None - _user_data_dir = None - - @property - def user_data_dir(self): - return self._user_data_dir - - @user_data_dir.setter - def user_data_dir(self, path: str): - """ - Sets the browser profile folder to use, or creates a new profile - at given . - - Parameters - ---------- - path: str - the path to a chrome profile folder - if it does not exist, a new profile will be created at given location - """ - apath = os.path.abspath(path) - self._user_data_dir = os.path.normpath(apath) - - @staticmethod - def _undot_key(key, value): - """turn a (dotted key, value) into a proper nested dict""" - if "." in key: - key, rest = key.split(".", 1) - value = ChromeOptions._undot_key(rest, value) - return {key: value} - - @staticmethod - def _merge_nested(a, b): - """ - merges b into a - leaf values in a are overwritten with values from b - """ - for key in b: - if key in a: - if isinstance(a[key], dict) and isinstance(b[key], dict): - ChromeOptions._merge_nested(a[key], b[key]) - continue - a[key] = b[key] - return a - - def handle_prefs(self, user_data_dir): - prefs = self.experimental_options.get("prefs") - if prefs: - user_data_dir = user_data_dir or self._user_data_dir - default_path = os.path.join(user_data_dir, "Default") - os.makedirs(default_path, exist_ok=True) - - # undot prefs dict keys - undot_prefs = {} - for key, value in prefs.items(): - undot_prefs = self._merge_nested( - undot_prefs, self._undot_key(key, value) - ) - - prefs_file = os.path.join(default_path, "Preferences") - if os.path.exists(prefs_file): - with open(prefs_file, encoding="latin1", mode="r") as f: - undot_prefs = self._merge_nested(json.load(f), undot_prefs) - - with open(prefs_file, encoding="latin1", mode="w") as f: - json.dump(undot_prefs, f) - - # remove the experimental_options to avoid an error - del self._experimental_options["prefs"] - - @classmethod - def from_options(cls, options): - o = cls() - o.__dict__.update(options.__dict__) - return o diff --git a/src/undetected_chromedriver/patcher.py b/src/undetected_chromedriver/patcher.py deleted file mode 100644 index e8a0e96..0000000 --- a/src/undetected_chromedriver/patcher.py +++ /dev/null @@ -1,451 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -from distutils.version import LooseVersion -import io -import json -import logging -import os -import pathlib -import platform -import random -import re -import shutil -import string -import sys -import time -from urllib.request import urlopen -from urllib.request import urlretrieve -import zipfile -from multiprocessing import Lock - -logger = logging.getLogger(__name__) - -IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd")) - - -class Patcher(object): - lock = Lock() - exe_name = "chromedriver%s" - - platform = sys.platform - if platform.endswith("win32"): - d = "~/appdata/roaming/undetected_chromedriver" - elif "LAMBDA_TASK_ROOT" in os.environ: - d = "/tmp/undetected_chromedriver" - elif platform.startswith(("linux", "linux2")): - d = "~/.local/share/undetected_chromedriver" - elif platform.endswith("darwin"): - d = "~/Library/Application Support/undetected_chromedriver" - else: - d = "~/.undetected_chromedriver" - data_path = os.path.abspath(os.path.expanduser(d)) - - def __init__( - self, - executable_path=None, - force=False, - version_main: int = 0, - user_multi_procs=False, - ): - """ - Args: - executable_path: None = automatic - a full file path to the chromedriver executable - force: False - terminate processes which are holding lock - version_main: 0 = auto - specify main chrome version (rounded, ex: 82) - """ - self.force = force - self._custom_exe_path = False - prefix = "undetected" - self.user_multi_procs = user_multi_procs - - try: - # Try to convert version_main into an integer - version_main_int = int(version_main) - # check if version_main_int is less than or equal to e.g 114 - self.is_old_chromedriver = version_main and version_main_int <= 114 - except (ValueError,TypeError): - # Check not running inside Docker - if not os.path.exists("/app/chromedriver"): - # If the conversion fails, log an error message - logging.info("version_main cannot be converted to an integer") - # Set self.is_old_chromedriver to False if the conversion fails - self.is_old_chromedriver = False - - # Needs to be called before self.exe_name is accessed - self._set_platform_name() - - if not os.path.exists(self.data_path): - os.makedirs(self.data_path, exist_ok=True) - - if not executable_path: - if sys.platform.startswith("freebsd"): - self.executable_path = os.path.join( - self.data_path, self.exe_name - ) - else: - self.executable_path = os.path.join( - self.data_path, "_".join([prefix, self.exe_name]) - ) - - if not IS_POSIX: - if executable_path: - if not executable_path[-4:] == ".exe": - executable_path += ".exe" - - self.zip_path = os.path.join(self.data_path, prefix) - - if not executable_path: - if not self.user_multi_procs: - self.executable_path = os.path.abspath( - os.path.join(".", self.executable_path) - ) - - if executable_path: - self._custom_exe_path = True - self.executable_path = executable_path - - # Set the correct repository to download the Chromedriver from - if self.is_old_chromedriver: - self.url_repo = "https://chromedriver.storage.googleapis.com" - else: - self.url_repo = "https://googlechromelabs.github.io/chrome-for-testing" - - self.version_main = version_main - self.version_full = None - - def _set_platform_name(self): - """ - Set the platform and exe name based on the platform undetected_chromedriver is running on - in order to download the correct chromedriver. - """ - if self.platform.endswith("win32"): - self.platform_name = "win32" - self.exe_name %= ".exe" - if self.platform.endswith(("linux", "linux2")): - self.platform_name = "linux64" - self.exe_name %= "" - if self.platform.endswith("darwin"): - if self.is_old_chromedriver: - self.platform_name = "mac64" - else: - self.platform_name = "mac-x64" - self.exe_name %= "" - if self.platform.startswith("freebsd"): - self.platform_name = "freebsd" - self.exe_name %= "" - - def auto(self, executable_path=None, force=False, version_main=None, _=None): - """ - - Args: - executable_path: - force: - version_main: - - Returns: - - """ - p = pathlib.Path(self.data_path) - if self.user_multi_procs: - with Lock(): - files = list(p.rglob("*chromedriver*")) - most_recent = max(files, key=lambda f: f.stat().st_mtime) - files.remove(most_recent) - list(map(lambda f: f.unlink(), files)) - if self.is_binary_patched(most_recent): - self.executable_path = str(most_recent) - return True - - if executable_path: - self.executable_path = executable_path - self._custom_exe_path = True - - if self._custom_exe_path: - ispatched = self.is_binary_patched(self.executable_path) - if not ispatched: - return self.patch_exe() - else: - return - - if version_main: - self.version_main = version_main - if force is True: - self.force = force - - - if self.platform_name == "freebsd": - chromedriver_path = shutil.which("chromedriver") - - if not os.path.isfile(chromedriver_path) or not os.access(chromedriver_path, os.X_OK): - logging.error("Chromedriver not installed!") - return - - version_path = os.path.join(os.path.dirname(self.executable_path), "version.txt") - - process = os.popen(f'"{chromedriver_path}" --version') - chromedriver_version = process.read().split(' ')[1].split(' ')[0] - process.close() - - current_version = None - if os.path.isfile(version_path) or os.access(version_path, os.X_OK): - with open(version_path, 'r') as f: - current_version = f.read() - - if current_version != chromedriver_version: - logging.info("Copying chromedriver executable...") - shutil.copy(chromedriver_path, self.executable_path) - os.chmod(self.executable_path, 0o755) - - with open(version_path, 'w') as f: - f.write(chromedriver_version) - - logging.info("Chromedriver executable copied!") - else: - try: - os.unlink(self.executable_path) - except PermissionError: - if self.force: - self.force_kill_instances(self.executable_path) - return self.auto(force=not self.force) - try: - if self.is_binary_patched(): - # assumes already running AND patched - return True - except PermissionError: - pass - # return False - except FileNotFoundError: - pass - - release = self.fetch_release_number() - self.version_main = release.version[0] - self.version_full = release - self.unzip_package(self.fetch_package()) - - return self.patch() - - def driver_binary_in_use(self, path: str = None) -> bool: - """ - naive test to check if a found chromedriver binary is - currently in use - - Args: - path: a string or PathLike object to the binary to check. - if not specified, we check use this object's executable_path - """ - if not path: - path = self.executable_path - p = pathlib.Path(path) - - if not p.exists(): - raise OSError("file does not exist: %s" % p) - try: - with open(p, mode="a+b") as fs: - exc = [] - try: - - fs.seek(0, 0) - except PermissionError as e: - exc.append(e) # since some systems apprently allow seeking - # we conduct another test - try: - fs.readline() - except PermissionError as e: - exc.append(e) - - if exc: - - return True - return False - # ok safe to assume this is in use - except Exception as e: - # logger.exception("whoops ", e) - pass - - def cleanup_unused_files(self): - p = pathlib.Path(self.data_path) - items = list(p.glob("*undetected*")) - for item in items: - try: - item.unlink() - except: - pass - - def patch(self): - self.patch_exe() - return self.is_binary_patched() - - def fetch_release_number(self): - """ - Gets the latest major version available, or the latest major version of self.target_version if set explicitly. - :return: version string - :rtype: LooseVersion - """ - # Endpoint for old versions of Chromedriver (114 and below) - if self.is_old_chromedriver: - path = f"/latest_release_{self.version_main}" - path = path.upper() - logger.debug("getting release number from %s" % path) - return LooseVersion(urlopen(self.url_repo + path).read().decode()) - - # Endpoint for new versions of Chromedriver (115+) - if not self.version_main: - # Fetch the latest version - path = "/last-known-good-versions-with-downloads.json" - logger.debug("getting release number from %s" % path) - with urlopen(self.url_repo + path) as conn: - response = conn.read().decode() - - last_versions = json.loads(response) - return LooseVersion(last_versions["channels"]["Stable"]["version"]) - - # Fetch the latest minor version of the major version provided - path = "/latest-versions-per-milestone-with-downloads.json" - logger.debug("getting release number from %s" % path) - with urlopen(self.url_repo + path) as conn: - response = conn.read().decode() - - major_versions = json.loads(response) - return LooseVersion(major_versions["milestones"][str(self.version_main)]["version"]) - - def parse_exe_version(self): - with io.open(self.executable_path, "rb") as f: - for line in iter(lambda: f.readline(), b""): - match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) - if match: - return LooseVersion(match[1].decode()) - - def fetch_package(self): - """ - Downloads ChromeDriver from source - - :return: path to downloaded file - """ - zip_name = f"chromedriver_{self.platform_name}.zip" - if self.is_old_chromedriver: - download_url = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, zip_name) - else: - zip_name = zip_name.replace("_", "-", 1) - download_url = "https://storage.googleapis.com/chrome-for-testing-public/%s/%s/%s" - download_url %= (self.version_full.vstring, self.platform_name, zip_name) - - logger.debug("downloading from %s" % download_url) - return urlretrieve(download_url)[0] - - def unzip_package(self, fp): - """ - Does what it says - - :return: path to unpacked executable - """ - exe_path = self.exe_name - if not self.is_old_chromedriver: - # The new chromedriver unzips into its own folder - zip_name = f"chromedriver-{self.platform_name}" - exe_path = os.path.join(zip_name, self.exe_name) - - logger.debug("unzipping %s" % fp) - try: - os.unlink(self.zip_path) - except (FileNotFoundError, OSError): - pass - - os.makedirs(self.zip_path, mode=0o755, exist_ok=True) - with zipfile.ZipFile(fp, mode="r") as zf: - zf.extractall(self.zip_path) - os.rename(os.path.join(self.zip_path, exe_path), self.executable_path) - os.remove(fp) - shutil.rmtree - os.chmod(self.executable_path, 0o755) - return self.executable_path - - @staticmethod - def force_kill_instances(exe_name): - """ - kills running instances. - :param: executable name to kill, may be a path as well - - :return: True on success else False - """ - exe_name = os.path.basename(exe_name) - if IS_POSIX: - r = os.system("kill -f -9 $(pidof %s)" % exe_name) - else: - r = os.system("taskkill /f /im %s" % exe_name) - return not r - - @staticmethod - def gen_random_cdc(): - cdc = random.choices(string.ascii_letters, k=27) - return "".join(cdc).encode() - - def is_binary_patched(self, executable_path=None): - executable_path = executable_path or self.executable_path - try: - with io.open(executable_path, "rb") as fh: - return fh.read().find(b"undetected chromedriver") != -1 - except FileNotFoundError: - return False - - def patch_exe(self): - start = time.perf_counter() - logger.info("patching driver executable %s" % self.executable_path) - with io.open(self.executable_path, "r+b") as fh: - content = fh.read() - # match_injected_codeblock = re.search(rb"{window.*;}", content) - match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content) - if match_injected_codeblock: - target_bytes = match_injected_codeblock[0] - new_target_bytes = ( - b'{console.log("undetected chromedriver 1337!")}'.ljust( - len(target_bytes), b" " - ) - ) - new_content = content.replace(target_bytes, new_target_bytes) - if new_content == content: - logger.warning( - "something went wrong patching the driver binary. could not find injection code block" - ) - else: - logger.debug( - "found block:\n%s\nreplacing with:\n%s" - % (target_bytes, new_target_bytes) - ) - fh.seek(0) - fh.write(new_content) - logger.debug( - "patching took us {:.2f} seconds".format(time.perf_counter() - start) - ) - - def __repr__(self): - return "{0:s}({1:s})".format( - self.__class__.__name__, - self.executable_path, - ) - - def __del__(self): - if self._custom_exe_path: - # if the driver binary is specified by user - # we assume it is important enough to not delete it - return - else: - timeout = 3 # stop trying after this many seconds - t = time.monotonic() - now = lambda: time.monotonic() - while now() - t > timeout: - # we don't want to wait until the end of time - try: - if self.user_multi_procs: - break - os.unlink(self.executable_path) - logger.debug("successfully unlinked %s" % self.executable_path) - break - except (OSError, RuntimeError, PermissionError): - time.sleep(0.01) - continue - except FileNotFoundError: - break diff --git a/src/undetected_chromedriver/reactor.py b/src/undetected_chromedriver/reactor.py deleted file mode 100644 index d52e312..0000000 --- a/src/undetected_chromedriver/reactor.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -import asyncio -import json -import logging -import threading - - -logger = logging.getLogger(__name__) - - -class Reactor(threading.Thread): - def __init__(self, driver: "Chrome"): - super().__init__() - - self.driver = driver - self.loop = asyncio.new_event_loop() - - self.lock = threading.Lock() - self.event = threading.Event() - self.daemon = True - self.handlers = {} - - def add_event_handler(self, method_name, callback: callable): - """ - - Parameters - ---------- - event_name: str - example "Network.responseReceived" - - callback: callable - callable which accepts 1 parameter: the message object dictionary - - Returns - ------- - - """ - with self.lock: - self.handlers[method_name.lower()] = callback - - @property - def running(self): - return not self.event.is_set() - - def run(self): - try: - asyncio.set_event_loop(self.loop) - self.loop.run_until_complete(self.listen()) - except Exception as e: - logger.warning("Reactor.run() => %s", e) - - async def _wait_service_started(self): - while True: - with self.lock: - if ( - getattr(self.driver, "service", None) - and getattr(self.driver.service, "process", None) - and self.driver.service.process.poll() - ): - await asyncio.sleep(self.driver._delay or 0.25) - else: - break - - async def listen(self): - while self.running: - await self._wait_service_started() - await asyncio.sleep(1) - - try: - with self.lock: - log_entries = self.driver.get_log("performance") - - for entry in log_entries: - try: - obj_serialized: str = entry.get("message") - obj = json.loads(obj_serialized) - message = obj.get("message") - method = message.get("method") - - if "*" in self.handlers: - await self.loop.run_in_executor( - None, self.handlers["*"], message - ) - elif method.lower() in self.handlers: - await self.loop.run_in_executor( - None, self.handlers[method.lower()], message - ) - - # print(type(message), message) - except Exception as e: - raise e from None - - except Exception as e: - if "invalid session id" in str(e): - pass - else: - logging.debug("exception ignored :", e) diff --git a/src/undetected_chromedriver/webelement.py b/src/undetected_chromedriver/webelement.py deleted file mode 100644 index 03d6878..0000000 --- a/src/undetected_chromedriver/webelement.py +++ /dev/null @@ -1,86 +0,0 @@ -from typing import List - -from selenium.webdriver.common.by import By -import selenium.webdriver.remote.webelement - - -class WebElement(selenium.webdriver.remote.webelement.WebElement): - def click_safe(self): - super().click() - self._parent.reconnect(0.1) - - def children( - self, tag=None, recursive=False - ) -> List[selenium.webdriver.remote.webelement.WebElement]: - """ - returns direct child elements of current element - :param tag: str, if supplied, returns nodes only - """ - script = "return [... arguments[0].children]" - if tag: - script += ".filter( node => node.tagName === '%s')" % tag.upper() - if recursive: - return list(_recursive_children(self, tag)) - return list(self._parent.execute_script(script, self)) - - -class UCWebElement(WebElement): - """ - Custom WebElement class which makes it easier to view elements when - working in an interactive environment. - - standard webelement repr: - - - using this WebElement class: - )> - - """ - - def __init__(self, parent, id_): - super().__init__(parent, id_) - self._attrs = None - - @property - def attrs(self): - if not self._attrs: - self._attrs = self._parent.execute_script( - """ - var items = {}; - for (index = 0; index < arguments[0].attributes.length; ++index) - { - items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value - }; - return items; - """, - self, - ) - return self._attrs - - def __repr__(self): - strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()]) - if strattrs: - strattrs = " " + strattrs - return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>" - - -def _recursive_children(element, tag: str = None, _results=None): - """ - returns all children of recursively - - :param element: `WebElement` object. - find children below this - - :param tag: str = None. - if provided, return only elements. example: 'a', or 'img' - :param _results: do not use! - """ - results = _results or set() - for element in element.children(): - if tag: - if element.tag_name == tag: - results.add(element) - else: - results.add(element) - results |= _recursive_children(element, tag, results) - return results diff --git a/src/utils.py b/src/utils.py index a5bd1ef..4b4e879 100644 --- a/src/utils.py +++ b/src/utils.py @@ -7,16 +7,16 @@ import urllib.parse import tempfile import sys -from selenium.webdriver.chrome.webdriver import WebDriver -import undetected_chromedriver as uc +from DrissionPage import ChromiumPage, ChromiumOptions +# from DrissionPage.common import Settings FLARESOLVERR_VERSION = None PLATFORM_VERSION = None -CHROME_EXE_PATH = None CHROME_MAJOR_VERSION = None -USER_AGENT = None XVFB_DISPLAY = None -PATCHED_DRIVER_PATH = None + +CHROME_EXE_PATH = os.environ.get('CHROME_EXE_PATH', None) +USER_AGENT = os.environ.get('USER_AGENT', None) def get_config_log_html() -> bool: @@ -121,100 +121,64 @@ def create_proxy_extension(proxy: dict) -> str: return proxy_extension_dir -def get_webdriver(proxy: dict = None) -> WebDriver: - global PATCHED_DRIVER_PATH, USER_AGENT +def get_webdriver(proxy: dict = None) -> ChromiumPage: + global CHROME_EXE_PATH, USER_AGENT logging.debug('Launching web browser...') # undetected_chromedriver - options = uc.ChromeOptions() - options.add_argument('--no-sandbox') - options.add_argument('--window-size=1920,1080') + options = ChromiumOptions() + options.set_argument('--no-sandbox') + options.set_argument('--window-size=1920,1080') # todo: this param shows a warning in chrome head-full - options.add_argument('--disable-setuid-sandbox') - options.add_argument('--disable-dev-shm-usage') + options.set_argument('--disable-setuid-sandbox') + options.set_argument('--disable-dev-shm-usage') # this option removes the zygote sandbox (it seems that the resolution is a bit faster) - options.add_argument('--no-zygote') + options.set_argument('--no-zygote') # attempt to fix Docker ARM32 build - options.add_argument('--disable-gpu-sandbox') - options.add_argument('--disable-software-rasterizer') - options.add_argument('--ignore-certificate-errors') - options.add_argument('--ignore-ssl-errors') + options.set_argument('--disable-gpu-sandbox') + options.set_argument('--disable-software-rasterizer') + options.set_argument('--ignore-certificate-errors') + options.set_argument('--ignore-ssl-errors') # fix GL errors in ASUSTOR NAS # https://github.com/FlareSolverr/FlareSolverr/issues/782 # https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069 # https://peter.sh/experiments/chromium-command-line-switches/#use-gl - options.add_argument('--use-gl=swiftshader') + options.set_argument('--use-gl=swiftshader') language = os.environ.get('LANG', None) if language is not None: - options.add_argument('--accept-lang=%s' % language) + options.set_argument('--accept-lang=%s' % language) # Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910 if USER_AGENT is not None: - options.add_argument('--user-agent=%s' % USER_AGENT) + options.set_argument('--user-agent=%s' % USER_AGENT) proxy_extension_dir = None if proxy and all(key in proxy for key in ['url', 'username', 'password']): proxy_extension_dir = create_proxy_extension(proxy) - options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir)) + options.set_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir)) elif proxy and 'url' in proxy: proxy_url = proxy['url'] logging.debug("Using webdriver proxy: %s", proxy_url) - options.add_argument('--proxy-server=%s' % proxy_url) + options.set_argument('--proxy-server=%s' % proxy_url) - # note: headless mode is detected (headless = True) - # we launch the browser in head-full mode with the window hidden windows_headless = False if get_config_headless(): if os.name == 'nt': windows_headless = True else: start_xvfb_display() - # For normal headless mode: - # options.add_argument('--headless') + options.headless(windows_headless) + options.set_argument("--auto-open-devtools-for-tabs") - options.add_argument("--auto-open-devtools-for-tabs") - - # if we are inside the Docker container, we avoid downloading the driver - driver_exe_path = None - version_main = None - if os.path.exists("/app/chromedriver"): - # running inside Docker - driver_exe_path = "/app/chromedriver" - else: - version_main = get_chrome_major_version() - if PATCHED_DRIVER_PATH is not None: - driver_exe_path = PATCHED_DRIVER_PATH - - # detect chrome path - browser_executable_path = get_chrome_exe_path() - - # downloads and patches the chromedriver - # if we don't set driver_executable_path it downloads, patches, and deletes the driver each time - try: - driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path, - driver_executable_path=driver_exe_path, version_main=version_main, - windows_headless=windows_headless, headless=get_config_headless()) - except Exception as e: - logging.error("Error starting Chrome: %s" % e) - - # save the patched driver to avoid re-downloads - if driver_exe_path is None: - PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name) - if PATCHED_DRIVER_PATH != driver.patcher.executable_path: - shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH) + if CHROME_EXE_PATH is not None: + options.set_paths(browser_path=CHROME_EXE_PATH) # clean up proxy extension directory if proxy_extension_dir is not None: shutil.rmtree(proxy_extension_dir) - # selenium vanilla - # options = webdriver.ChromeOptions() - # options.add_argument('--no-sandbox') - # options.add_argument('--window-size=1920,1080') - # options.add_argument('--disable-setuid-sandbox') - # options.add_argument('--disable-dev-shm-usage') - # driver = webdriver.Chrome(options=options) + driver = ChromiumPage(addr_or_opts=options) return driver @@ -237,10 +201,53 @@ def get_chrome_exe_path() -> str: CHROME_EXE_PATH = chrome_path return CHROME_EXE_PATH # system - CHROME_EXE_PATH = uc.find_chrome_executable() + CHROME_EXE_PATH = find_chrome_executable() return CHROME_EXE_PATH +def find_chrome_executable(): + """ + Finds the chrome, chrome beta, chrome canary, chromium executable + + Returns + ------- + executable_path : str + the full file path to found executable + + """ + candidates = set() + if sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd")): + for item in os.environ.get("PATH").split(os.pathsep): + for subitem in ( + "google-chrome", + "chromium", + "chromium-browser", + "chrome", + "google-chrome-stable", + ): + candidates.add(os.sep.join((item, subitem))) + if "darwin" in sys.platform: + candidates.update( + [ + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", + "/Applications/Chromium.app/Contents/MacOS/Chromium", + ] + ) + else: + for item in map( + os.environ.get, + ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"), + ): + if item is not None: + for subitem in ( + "Google/Chrome/Application", + ): + candidates.add(os.sep.join((item, subitem, "chrome.exe"))) + for candidate in candidates: + if os.path.exists(candidate) and os.access(candidate, os.X_OK): + return os.path.normpath(candidate) + + def get_chrome_major_version() -> str: global CHROME_MAJOR_VERSION if CHROME_MAJOR_VERSION is not None: @@ -314,7 +321,7 @@ def get_user_agent(driver=None) -> str: try: if driver is None: driver = get_webdriver() - USER_AGENT = driver.execute_script("return navigator.userAgent") + USER_AGENT = driver.user_agent # Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910 USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE) return USER_AGENT