selenium replaced with DrissionPage

This commit is contained in:
Alexander 2024-07-29 15:51:30 +02:00
parent eb680efc90
commit d82feffeba
12 changed files with 150 additions and 2217 deletions

View File

@ -1,6 +1,6 @@
bottle==0.12.25 bottle==0.12.25
waitress==2.1.2 waitress==2.1.2
selenium==4.15.2 DrissionPage==4.1.0.0b14
func-timeout==4.3.5 func-timeout==4.3.5
prometheus-client==0.17.1 prometheus-client==0.17.1
# required by undetected_chromedriver # required by undetected_chromedriver

View File

@ -7,13 +7,8 @@ from html import escape
from urllib.parse import unquote, quote from urllib.parse import unquote, quote
from func_timeout import FunctionTimedOut, func_timeout from func_timeout import FunctionTimedOut, func_timeout
from selenium.common import TimeoutException from DrissionPage import ChromiumPage
from selenium.webdriver.chrome.webdriver import WebDriver from DrissionPage._units.listener import DataPacket
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import (
presence_of_element_located, staleness_of, title_is)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
import utils import utils
from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT, from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT,
@ -251,178 +246,128 @@ def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT:
driver.quit() driver.quit()
logging.debug('A used instance of webdriver has been destroyed') logging.debug('A used instance of webdriver has been destroyed')
def click_verify(driver: ChromiumPage) -> DataPacket:
def click_verify(driver: WebDriver):
try: try:
logging.debug("Try to find the Cloudflare verify checkbox...") bde = (
iframe = driver.find_element(By.XPATH, "//iframe[starts-with(@id, 'cf-chl-widget-')]") driver
driver.switch_to.frame(iframe) .ele("@Style=border: 0px; margin: 0px; padding: 0px;", timeout=10)
checkbox = driver.find_element( .shadow_root
by=By.XPATH, .ele("tag:iframe", timeout=10)
value='//*[@id="content"]/div/div/label/input', .ele('tag:body', timeout=10)
.shadow_root
) )
if checkbox: ve = bde.ele("text:Verify you are human", timeout=10)
actions = ActionChains(driver)
actions.move_to_element_with_offset(checkbox, 5, 7)
actions.click(checkbox)
actions.perform()
logging.debug("Cloudflare verify checkbox found and clicked!")
except Exception:
logging.debug("Cloudflare verify checkbox not found on the page.")
finally:
driver.switch_to.default_content()
try: driver.listen.start(driver.url)
logging.debug("Try to find the Cloudflare 'Verify you are human' button...") ve.click()
button = driver.find_element( data = driver.listen.wait(count=1)
by=By.XPATH,
value="//input[@type='button' and @value='Verify you are human']",
)
if button:
actions = ActionChains(driver)
actions.move_to_element_with_offset(button, 5, 7)
actions.click(button)
actions.perform()
logging.debug("The Cloudflare 'Verify you are human' button found and clicked!")
except Exception:
logging.debug("The Cloudflare 'Verify you are human' button not found on the page.")
time.sleep(2) if isinstance(data, DataPacket):
return data
return None
except Exception as e:
logging.debug("Cloudflare verify checkbox not found on the page. %s", repr(e))
def get_correct_window(driver: WebDriver) -> WebDriver: def search_challenge(driver: ChromiumPage) -> bool:
if len(driver.window_handles) > 1: page_title = driver.title.lower()
for window_handle in driver.window_handles:
driver.switch_to.window(window_handle) # find challenge by title
current_url = driver.current_url for title in CHALLENGE_TITLES:
if not current_url.startswith("devtools://devtools"): if title.lower() == page_title:
return driver logging.debug("Challenge detected. Title found: %s", page_title)
return driver return True
# find challenge by selectors
if driver.wait.eles_loaded(locators=CHALLENGE_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
logging.debug("Challenge detected. One of selectors found")
return True
return False
def access_page(driver: WebDriver, url: str) -> None: def _evil_logic(req: V1RequestBase, driver: ChromiumPage, method: str) -> ChallengeResolutionT:
driver.get(url)
driver.start_session()
driver.start_session() # required to bypass Cloudflare
def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> ChallengeResolutionT:
res = ChallengeResolutionT({}) res = ChallengeResolutionT({})
res.status = STATUS_OK res.status = STATUS_OK
res.message = "" res.message = ""
# navigate to the page # navigate to the page
logging.debug(f'Navigating to... {req.url}') logging.debug('Navigating to... %s', req.url)
if method == 'POST': if method == 'POST':
_post_request(req, driver) _post_request(req, driver)
else: else:
access_page(driver, req.url) driver.get(req.url)
driver = get_correct_window(driver)
# set cookies if required # set cookies if required
if req.cookies is not None and len(req.cookies) > 0: if req.cookies is not None and len(req.cookies) > 0:
logging.debug(f'Setting cookies...') logging.debug('Setting cookies...')
for cookie in req.cookies: for cookie in req.cookies:
driver.delete_cookie(cookie['name']) driver.set.cookies.remove(cookie['name'])
driver.add_cookie(cookie) driver.set.cookies(cookie)
# reload the page # reload the page
if method == 'POST': if method == 'POST':
_post_request(req, driver) _post_request(req, driver)
else: else:
access_page(driver, req.url) driver.get(req.url)
driver = get_correct_window(driver)
# wait for the page # wait for the page
if utils.get_config_log_html(): if utils.get_config_log_html():
logging.debug(f"Response HTML:\n{driver.page_source}") logging.debug("Response HTML:\n%s", driver.page_source)
html_element = driver.find_element(By.TAG_NAME, "html")
page_title = driver.title
page_title = driver.title
# find access denied titles # find access denied titles
for title in ACCESS_DENIED_TITLES: for title in ACCESS_DENIED_TITLES:
if title == page_title: if title == page_title:
raise Exception('Cloudflare has blocked this request. ' raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.') 'Probably your IP is banned for this site, check in your web browser.')
# find access denied selectors # find access denied selectors
for selector in ACCESS_DENIED_SELECTORS: if driver.wait.eles_loaded(locators=ACCESS_DENIED_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
found_elements = driver.find_elements(By.CSS_SELECTOR, selector) raise Exception('Cloudflare has blocked this request. '
if len(found_elements) > 0: 'Probably your IP is banned for this site, check in your web browser.')
raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.')
# find challenge by title
challenge_found = False
for title in CHALLENGE_TITLES:
if title.lower() == page_title.lower():
challenge_found = True
logging.info("Challenge detected. Title found: " + page_title)
break
if not challenge_found:
# find challenge by selectors
for selector in CHALLENGE_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
challenge_found = True
logging.info("Challenge detected. Selector found: " + selector)
break
attempt = 0 attempt = 0
if challenge_found: data = DataPacket
while True: challenge_found = True
try: while challenge_found:
attempt = attempt + 1 try:
# wait until the title changes attempt += 1
for title in CHALLENGE_TITLES:
logging.debug("Waiting for title (attempt " + str(attempt) + "): " + title)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title))
# then wait until all the selectors disappear if search_challenge(driver):
for selector in CHALLENGE_SELECTORS: if attempt == 1:
logging.debug("Waiting for selector (attempt " + str(attempt) + "): " + selector) logging.info("Challenge detected.")
WebDriverWait(driver, SHORT_TIMEOUT).until_not(
presence_of_element_located((By.CSS_SELECTOR, selector)))
# all elements not found data = click_verify(driver)
else:
if attempt == 1:
logging.info("Challenge not detected!")
res.message = "Challenge not detected!"
else:
logging.info("Challenge solved!")
res.message = "Challenge solved!"
break break
except TimeoutException: except Exception as e:
logging.debug("Timeout waiting for selector") logging.debug("Cloudflare check exception")
raise e
click_verify(driver)
# update the html (cloudflare reloads the page every 5 s)
html_element = driver.find_element(By.TAG_NAME, "html")
# waits until cloudflare redirection ends
logging.debug("Waiting for redirect")
# noinspection PyBroadException
try:
WebDriverWait(driver, SHORT_TIMEOUT).until(staleness_of(html_element))
except Exception:
logging.debug("Timeout waiting for redirect")
logging.info("Challenge solved!")
res.message = "Challenge solved!"
else:
logging.info("Challenge not detected!")
res.message = "Challenge not detected!"
challenge_res = ChallengeResolutionResultT({}) challenge_res = ChallengeResolutionResultT({})
challenge_res.url = driver.current_url challenge_res.url = driver.url
challenge_res.status = 200 # todo: fix, selenium not provides this info challenge_res.cookies = driver.cookies()
challenge_res.cookies = driver.get_cookies()
challenge_res.userAgent = utils.get_user_agent(driver) challenge_res.userAgent = utils.get_user_agent(driver)
if not req.returnOnlyCookies: if data is not None and data.response is not None:
challenge_res.headers = {} # todo: fix, selenium not provides this info challenge_res.status = data.response.status
challenge_res.response = driver.page_source if not req.returnOnlyCookies:
challenge_res.response = data.response.body
challenge_res.headers = data.response.headers.copy()
res.result = challenge_res res.result = challenge_res
return res return res
def _post_request(req: V1RequestBase, driver: WebDriver): def _post_request(req: V1RequestBase, driver: ChromiumPage):
post_form = f'<form id="hackForm" action="{req.url}" method="POST">' post_form = f'<form id="hackForm" action="{req.url}" method="POST">'
query_string = req.postData if req.postData[0] != '?' else req.postData[1:] query_string = req.postData if req.postData[0] != '?' else req.postData[1:]
pairs = query_string.split('&') pairs = query_string.split('&')
@ -451,5 +396,3 @@ def _post_request(req: V1RequestBase, driver: WebDriver):
</body> </body>
</html>""" </html>"""
driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content)) driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content))
driver.start_session()
driver.start_session() # required to bypass Cloudflare

View File

@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from typing import Optional, Tuple from typing import Optional, Tuple
from uuid import uuid1 from uuid import uuid1
from selenium.webdriver.chrome.webdriver import WebDriver from DrissionPage import ChromiumPage
import utils import utils
@ -12,7 +12,7 @@ import utils
@dataclass @dataclass
class Session: class Session:
session_id: str session_id: str
driver: WebDriver driver: ChromiumPage
created_at: datetime created_at: datetime
def lifetime(self) -> timedelta: def lifetime(self) -> timedelta:
@ -27,13 +27,13 @@ class SessionsStorage:
def create(self, session_id: Optional[str] = None, proxy: Optional[dict] = None, def create(self, session_id: Optional[str] = None, proxy: Optional[dict] = None,
force_new: Optional[bool] = False) -> Tuple[Session, bool]: force_new: Optional[bool] = False) -> Tuple[Session, bool]:
"""create creates new instance of WebDriver if necessary, """create creates new instance of ChromiumPage if necessary,
assign defined (or newly generated) session_id to the instance assign defined (or newly generated) session_id to the instance
and returns the session object. If a new session has been created and returns the session object. If a new session has been created
second argument is set to True. second argument is set to True.
Note: The function is idempotent, so in case if session_id Note: The function is idempotent, so in case if session_id
already exists in the storage a new instance of WebDriver won't be created already exists in the storage a new instance of ChromiumPage won't be created
and existing session will be returned. Second argument defines if and existing session will be returned. Second argument defines if
new session has been created (True) or an existing one was used (False). new session has been created (True) or an existing one was used (False).
""" """

View File

@ -1,914 +0,0 @@
#!/usr/bin/env python3
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
from __future__ import annotations
__version__ = "3.5.5"
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import time
from weakref import finalize
import selenium.webdriver.chrome.service
import selenium.webdriver.chrome.webdriver
from selenium.webdriver.common.by import By
import selenium.webdriver.chromium.service
import selenium.webdriver.remote.command
import selenium.webdriver.remote.webdriver
from .cdp import CDP
from .dprocess import start_detached
from .options import ChromeOptions
from .patcher import IS_POSIX
from .patcher import Patcher
from .reactor import Reactor
from .webelement import UCWebElement
from .webelement import WebElement
__all__ = (
"Chrome",
"ChromeOptions",
"Patcher",
"Reactor",
"CDP",
"find_chrome_executable",
)
logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel())
class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
"""
Controls the ChromeDriver and allows you to drive the browser.
The webdriver file will be downloaded by this module automatically,
you do not need to specify this. however, you may if you wish.
Attributes
----------
Methods
-------
reconnect()
this can be useful in case of heavy detection methods
-stops the chromedriver service which runs in the background
-starts the chromedriver service which runs in the background
-recreate session
start_session(capabilities=None, browser_profile=None)
differentiates from the regular method in that it does not
require a capabilities argument. The capabilities are automatically
recreated from the options at creation time.
--------------------------------------------------------------------------
NOTE:
Chrome has everything included to work out of the box.
it does not `need` customizations.
any customizations MAY lead to trigger bot migitation systems.
--------------------------------------------------------------------------
"""
_instances = set()
session_id = None
debug = False
def __init__(
self,
options=None,
user_data_dir=None,
driver_executable_path=None,
browser_executable_path=None,
port=0,
enable_cdp_events=False,
# service_args=None,
# service_creationflags=None,
desired_capabilities=None,
advanced_elements=False,
# service_log_path=None,
keep_alive=True,
log_level=0,
headless=False,
version_main=None,
patcher_force_close=False,
suppress_welcome=True,
use_subprocess=False,
debug=False,
no_sandbox=True,
windows_headless=False,
user_multi_procs: bool = False,
**kw,
):
"""
Creates a new instance of the chrome driver.
Starts the service and then creates new instance of chrome driver.
Parameters
----------
options: ChromeOptions, optional, default: None - automatic useful defaults
this takes an instance of ChromeOptions, mainly to customize browser behavior.
anything other dan the default, for example extensions or startup options
are not supported in case of failure, and can probably lowers your undetectability.
user_data_dir: str , optional, default: None (creates temp profile)
if user_data_dir is a path to a valid chrome profile directory, use it,
and turn off automatic removal mechanism at exit.
driver_executable_path: str, optional, default: None(=downloads and patches new binary)
browser_executable_path: str, optional, default: None - use find_chrome_executable
Path to the browser executable.
If not specified, make sure the executable's folder is in $PATH
port: int, optional, default: 0
port to be used by the chromedriver executable, this is NOT the debugger port.
leave it at 0 unless you know what you are doing.
the default value of 0 automatically picks an available port.
enable_cdp_events: bool, default: False
:: currently for chrome only
this enables the handling of wire messages
when enabled, you can subscribe to CDP events by using:
driver.add_cdp_listener("Network.dataReceived", yourcallback)
# yourcallback is an callable which accepts exactly 1 dict as parameter
service_args: list of str, optional, default: None
arguments to pass to the driver service
desired_capabilities: dict, optional, default: None - auto from config
Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref".
advanced_elements: bool, optional, default: False
makes it easier to recognize elements like you know them from html/browser inspection, especially when working
in an interactive environment
default webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
advanced webelement repr
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time.
service_log_path: str, optional, default: None
path to log information from the driver.
keep_alive: bool, optional, default: True
Whether to configure ChromeRemoteConnection to use HTTP keep-alive.
log_level: int, optional, default: adapts to python global log level
headless: bool, optional, default: False
can also be specified in the options instance.
Specify whether you want to use the browser in headless mode.
warning: this lowers undetectability and not fully supported.
version_main: int, optional, default: None (=auto)
if you, for god knows whatever reason, use
an older version of Chrome. You can specify it's full rounded version number
here. Example: 87 for all versions of 87
patcher_force_close: bool, optional, default: False
instructs the patcher to do whatever it can to access the chromedriver binary
if the file is locked, it will force shutdown all instances.
setting it is not recommended, unless you know the implications and think
you might need it.
suppress_welcome: bool, optional , default: True
a "welcome" alert might show up on *nix-like systems asking whether you want to set
chrome as your default browser, and if you want to send even more data to google.
now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False.
Note: if you don't handle the nag screen in time, the browser loses it's connection and throws an Exception.
use_subprocess: bool, optional , default: True,
False (the default) makes sure Chrome will get it's own process (so no subprocess of chromedriver.exe or python
This fixes a LOT of issues, like multithreaded run, but mst importantly. shutting corectly after
program exits or using .quit()
you should be knowing what you're doing, and know how python works.
unfortunately, there is always an edge case in which one would like to write an single script with the only contents being:
--start script--
import undetected_chromedriver as uc
d = uc.Chrome()
d.get('https://somesite/')
---end script --
and will be greeted with an error, since the program exists before chrome has a change to launch.
in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times.
! setting it to True comes with NO support when being detected. !
no_sandbox: bool, optional, default=True
uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar
this option has a default of True since many people seem to run this as root (....) , and chrome does not start
when running as root without using --no-sandbox flag.
user_multi_procs:
set to true when you are using multithreads/multiprocessing
ensures not all processes are trying to modify a binary which is in use by another.
for this to work. YOU MUST HAVE AT LEAST 1 UNDETECTED_CHROMEDRIVER BINARY IN YOUR ROAMING DATA FOLDER.
this requirement can be easily satisfied, by just running this program "normal" and close/kill it.
"""
finalize(self, self._ensure_close, self)
self.debug = debug
self.patcher = Patcher(
executable_path=driver_executable_path,
force=patcher_force_close,
version_main=version_main,
user_multi_procs=user_multi_procs,
)
# self.patcher.auto(user_multiprocess = user_multi_num_procs)
self.patcher.auto()
# self.patcher = patcher
if not options:
options = ChromeOptions()
try:
if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options,
# as it just appends arguments, not replace them
# you'll get conflicts starting chrome
raise RuntimeError("you cannot reuse the ChromeOptions object")
except AttributeError:
pass
options._session = self
if not options.debugger_address:
debug_port = (
port
if port != 0
else selenium.webdriver.common.service.utils.free_port()
)
debug_host = "127.0.0.1"
options.debugger_address = "%s:%d" % (debug_host, debug_port)
else:
debug_host, debug_port = options.debugger_address.split(":")
debug_port = int(debug_port)
if enable_cdp_events:
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"}
)
options.add_argument("--remote-debugging-host=%s" % debug_host)
options.add_argument("--remote-debugging-port=%s" % debug_port)
if user_data_dir:
options.add_argument("--user-data-dir=%s" % user_data_dir)
language, keep_user_data_dir = None, bool(user_data_dir)
# see if a custom user profile is specified in options
for arg in options.arguments:
if any([_ in arg for _ in ("--headless", "headless")]):
options.arguments.remove(arg)
options.headless = True
if "lang" in arg:
m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try:
language = m[1]
except IndexError:
logger.debug("will set the language to en-US,en;q=0.9")
language = "en-US,en;q=0.9"
if "user-data-dir" in arg:
m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
try:
user_data_dir = m[1]
logger.debug(
"user-data-dir found in user argument %s => %s" % (arg, m[1])
)
keep_user_data_dir = True
except IndexError:
logger.debug(
"no user data dir could be extracted from supplied argument %s "
% arg
)
if not user_data_dir:
# backward compatiblity
# check if an old uc.ChromeOptions is used, and extract the user data dir
if hasattr(options, "user_data_dir") and getattr(
options, "user_data_dir", None
):
import warnings
warnings.warn(
"using ChromeOptions.user_data_dir might stop working in future versions."
"use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder"
)
options.add_argument("--user-data-dir=%s" % options.user_data_dir)
keep_user_data_dir = True
logger.debug(
"user_data_dir property found in options object: %s" % user_data_dir
)
else:
user_data_dir = os.path.normpath(tempfile.mkdtemp())
keep_user_data_dir = False
arg = "--user-data-dir=%s" % user_data_dir
options.add_argument(arg)
logger.debug(
"created a temporary folder in which the user-data (profile) will be stored during this\n"
"session, and added it to chrome startup arguments: %s" % arg
)
if not language:
try:
import locale
language = locale.getdefaultlocale()[0].replace("_", "-")
except Exception:
pass
if not language:
language = "en-US"
options.add_argument("--lang=%s" % language)
if not options.binary_location:
options.binary_location = (
browser_executable_path or find_chrome_executable()
)
if not options.binary_location or not \
pathlib.Path(options.binary_location).exists():
raise FileNotFoundError(
"\n---------------------\n"
"Could not determine browser executable."
"\n---------------------\n"
"Make sure your browser is installed in the default location (path).\n"
"If you are sure about the browser executable, you can specify it using\n"
"the `browser_executable_path='{}` parameter.\n\n"
.format("/path/to/browser/executable" if IS_POSIX else "c:/path/to/your/browser.exe")
)
self._delay = 3
self.user_data_dir = user_data_dir
self.keep_user_data_dir = keep_user_data_dir
if suppress_welcome:
options.arguments.extend(["--no-default-browser-check", "--no-first-run"])
if no_sandbox:
options.arguments.extend(["--no-sandbox", "--test-type"])
if headless or getattr(options, 'headless', None):
#workaround until a better checking is found
try:
v_main = int(self.patcher.version_main) if self.patcher.version_main else 108
if v_main < 108:
options.add_argument("--headless=chrome")
elif v_main >= 108:
options.add_argument("--headless=new")
except:
logger.warning("could not detect version_main."
"therefore, we are assuming it is chrome 108 or higher")
options.add_argument("--headless=new")
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
options.add_argument(
"--log-level=%d" % log_level
or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
)
if hasattr(options, "handle_prefs"):
options.handle_prefs(user_data_dir)
# fix exit_type flag to prevent tab-restore nag
try:
with open(
os.path.join(user_data_dir, "Default/Preferences"),
encoding="latin1",
mode="r+",
) as fs:
config = json.load(fs)
if config["profile"]["exit_type"] is not None:
# fixing the restore-tabs-nag
config["profile"]["exit_type"] = None
fs.seek(0, 0)
json.dump(config, fs)
fs.truncate() # the file might be shorter
logger.debug("fixed exit_type flag")
except Exception as e:
logger.debug("did not find a bad exit_type flag ")
self.options = options
if not desired_capabilities:
desired_capabilities = options.to_capabilities()
if not use_subprocess and not windows_headless:
self.browser_pid = start_detached(
options.binary_location, *options.arguments
)
else:
startupinfo = None
if os.name == 'nt' and windows_headless:
# STARTUPINFO() is Windows only
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
browser = subprocess.Popen(
[options.binary_location, *options.arguments],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=IS_POSIX,
startupinfo=startupinfo
)
self.browser_pid = browser.pid
service = selenium.webdriver.chromium.service.ChromiumService(
self.patcher.executable_path
)
super(Chrome, self).__init__(
service=service,
options=options,
keep_alive=keep_alive,
)
self.reactor = None
if enable_cdp_events:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.getLogger(
"selenium.webdriver.remote.remote_connection"
).setLevel(20)
reactor = Reactor(self)
reactor.start()
self.reactor = reactor
if advanced_elements:
self._web_element_cls = UCWebElement
else:
self._web_element_cls = WebElement
if headless or getattr(options, 'headless', None):
self._configure_headless()
def _configure_headless(self):
orig_get = self.get
logger.info("setting properties for headless")
def get_wrapped(*args, **kwargs):
if self.execute_script("return navigator.webdriver"):
logger.info("patch navigator.webdriver")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, "navigator", {
Object.defineProperty(window, "navigator", {
value: new Proxy(navigator, {
has: (target, key) => (key === "webdriver" ? false : key in target),
get: (target, key) =>
key === "webdriver"
? false
: typeof target[key] === "function"
? target[key].bind(target)
: target[key],
}),
});
"""
},
)
logger.info("patch user-agent string")
self.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": self.execute_script(
"return navigator.userAgent"
).replace("Headless", "")
},
)
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1});
Object.defineProperty(navigator.connection, 'rtt', {get: () => 100});
// https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/chrome-runtime.js
window.chrome = {
app: {
isInstalled: false,
InstallState: {
DISABLED: 'disabled',
INSTALLED: 'installed',
NOT_INSTALLED: 'not_installed'
},
RunningState: {
CANNOT_RUN: 'cannot_run',
READY_TO_RUN: 'ready_to_run',
RUNNING: 'running'
}
},
runtime: {
OnInstalledReason: {
CHROME_UPDATE: 'chrome_update',
INSTALL: 'install',
SHARED_MODULE_UPDATE: 'shared_module_update',
UPDATE: 'update'
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic'
},
PlatformArch: {
ARM: 'arm',
ARM64: 'arm64',
MIPS: 'mips',
MIPS64: 'mips64',
X86_32: 'x86-32',
X86_64: 'x86-64'
},
PlatformNaclArch: {
ARM: 'arm',
MIPS: 'mips',
MIPS64: 'mips64',
X86_32: 'x86-32',
X86_64: 'x86-64'
},
PlatformOs: {
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
MAC: 'mac',
OPENBSD: 'openbsd',
WIN: 'win'
},
RequestUpdateCheckStatus: {
NO_UPDATE: 'no_update',
THROTTLED: 'throttled',
UPDATE_AVAILABLE: 'update_available'
}
}
}
// https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/navigator-permissions.js
if (!window.Notification) {
window.Notification = {
permission: 'denied'
}
}
const originalQuery = window.navigator.permissions.query
window.navigator.permissions.__proto__.query = parameters =>
parameters.name === 'notifications'
? Promise.resolve({ state: window.Notification.permission })
: originalQuery(parameters)
const oldCall = Function.prototype.call
function call() {
return oldCall.apply(this, arguments)
}
Function.prototype.call = call
const nativeToStringFunctionString = Error.toString().replace(/Error/g, 'toString')
const oldToString = Function.prototype.toString
function functionToString() {
if (this === window.navigator.permissions.query) {
return 'function query() { [native code] }'
}
if (this === functionToString) {
return nativeToStringFunctionString
}
return oldCall.call(oldToString, this)
}
// eslint-disable-next-line
Function.prototype.toString = functionToString
"""
},
)
return orig_get(*args, **kwargs)
self.get = get_wrapped
# def _get_cdc_props(self):
# return self.execute_script(
# """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
#
# return result.filter(i => i.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig))
# """
# )
#
# def _hook_remove_cdc_props(self):
# self.execute_cdp_cmd(
# "Page.addScriptToEvaluateOnNewDocument",
# {
# "source": """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
# result.forEach(p => p.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig)
# &&delete window[p]&&console.log('removed',p))
# """
# },
# )
def get(self, url):
# if self._get_cdc_props():
# self._hook_remove_cdc_props()
return super().get(url)
def add_cdp_listener(self, event_name, callback):
if (
self.reactor
and self.reactor is not None
and isinstance(self.reactor, Reactor)
):
self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers
return False
def clear_cdp_listeners(self):
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.handlers.clear()
def window_new(self):
self.execute(
selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"}
)
def tab_new(self, url: str):
"""
this opens a url in a new tab.
apparently, that passes all tests directly!
Parameters
----------
url
Returns
-------
"""
if not hasattr(self, "cdp"):
from .cdp import CDP
cdp = CDP(self.options)
cdp.tab_new(url)
def reconnect(self, timeout=0.1):
try:
self.service.stop()
except Exception as e:
logger.debug(e)
time.sleep(timeout)
try:
self.service.start()
except Exception as e:
logger.debug(e)
try:
self.start_session()
except Exception as e:
logger.debug(e)
def start_session(self, capabilities=None, browser_profile=None):
if not capabilities:
capabilities = self.options.to_capabilities()
super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session(
capabilities
)
# super(Chrome, self).start_session(capabilities, browser_profile)
def find_elements_recursive(self, by, value):
"""
find elements in all frames
this is a generator function, which is needed
since if it would return a list of elements, they
will be stale on arrival.
using generator, when the element is returned we are in the correct frame
to use it directly
Args:
by: By
value: str
Returns: Generator[webelement.WebElement]
"""
def search_frame(f=None):
if not f:
# ensure we are on main content frame
self.switch_to.default_content()
else:
self.switch_to.frame(f)
for elem in self.find_elements(by, value):
yield elem
# switch back to main content, otherwise we will get StaleElementReferenceException
self.switch_to.default_content()
# search root frame
for elem in search_frame():
yield elem
# get iframes
frames = self.find_elements('css selector', 'iframe')
# search per frame
for f in frames:
for elem in search_frame(f):
yield elem
def quit(self):
try:
self.service.stop()
self.service.process.kill()
self.command_executor.close()
self.service.process.wait(5)
logger.debug("webdriver process ended")
except (AttributeError, RuntimeError, OSError):
pass
try:
self.reactor.event.set()
logger.debug("shutting down reactor")
except AttributeError:
pass
try:
os.kill(self.browser_pid, 15)
logger.debug("gracefully closed browser")
except Exception as e: # noqa
pass
if (
hasattr(self, "keep_user_data_dir")
and hasattr(self, "user_data_dir")
and not self.keep_user_data_dir
):
for _ in range(5):
try:
shutil.rmtree(self.user_data_dir, ignore_errors=False)
except FileNotFoundError:
pass
except (RuntimeError, OSError, PermissionError) as e:
logger.debug(
"When removing the temp profile, a %s occured: %s\nretrying..."
% (e.__class__.__name__, e)
)
else:
logger.debug("successfully removed %s" % self.user_data_dir)
break
try:
time.sleep(0.1)
except OSError:
pass
# dereference patcher, so patcher can start cleaning up as well.
# this must come last, otherwise it will throw 'in use' errors
self.patcher = None
def __getattribute__(self, item):
if not super().__getattribute__("debug"):
return super().__getattribute__(item)
else:
import inspect
original = super().__getattribute__(item)
if inspect.ismethod(original) and not inspect.isclass(original):
def newfunc(*args, **kwargs):
logger.debug(
"calling %s with args %s and kwargs %s\n"
% (original.__qualname__, args, kwargs)
)
return original(*args, **kwargs)
return newfunc
return original
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.service.stop()
time.sleep(self._delay)
self.service.start()
self.start_session()
def __hash__(self):
return hash(self.options.debugger_address)
def __dir__(self):
return object.__dir__(self)
def __del__(self):
try:
self.service.process.kill()
except: # noqa
pass
self.quit()
@classmethod
def _ensure_close(cls, self):
# needs to be a classmethod so finalize can find the reference
logger.info("ensuring close")
if (
hasattr(self, "service")
and hasattr(self.service, "process")
and hasattr(self.service.process, "kill")
):
self.service.process.kill()
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if IS_POSIX:
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in (
"google-chrome",
"chromium",
"chromium-browser",
"chrome",
"google-chrome-stable",
):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
[
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
)
else:
for item in map(
os.environ.get,
("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
):
if item is not None:
for subitem in (
"Google/Chrome/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
logger.debug('checking if %s exists and is executable' % candidate)
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
logger.debug('found! using %s' % candidate)
return os.path.normpath(candidate)

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import json
import logging
import requests
import websockets
log = logging.getLogger(__name__)
class CDPObject(dict):
def __init__(self, *a, **k):
super().__init__(*a, **k)
self.__dict__ = self
for k in self.__dict__:
if isinstance(self.__dict__[k], dict):
self.__dict__[k] = CDPObject(self.__dict__[k])
elif isinstance(self.__dict__[k], list):
for i in range(len(self.__dict__[k])):
if isinstance(self.__dict__[k][i], dict):
self.__dict__[k][i] = CDPObject(self)
def __repr__(self):
tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)"
return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items()))
class PageElement(CDPObject):
pass
class CDP:
log = logging.getLogger("CDP")
endpoints = CDPObject(
{
"json": "/json",
"protocol": "/json/protocol",
"list": "/json/list",
"new": "/json/new?{url}",
"activate": "/json/activate/{id}",
"close": "/json/close/{id}",
}
)
def __init__(self, options: "ChromeOptions"): # noqa
self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))
self._reqid = 0
self._session = requests.Session()
self._last_resp = None
self._last_json = None
resp = self.get(self.endpoints.json) # noqa
self.sessionId = resp[0]["id"]
self.wsurl = resp[0]["webSocketDebuggerUrl"]
def tab_activate(self, id=None):
if not id:
active_tab = self.tab_list()[0]
id = active_tab.id # noqa
self.wsurl = active_tab.webSocketDebuggerUrl # noqa
return self.post(self.endpoints["activate"].format(id=id))
def tab_list(self):
retval = self.get(self.endpoints["list"])
return [PageElement(o) for o in retval]
def tab_new(self, url):
return self.post(self.endpoints["new"].format(url=url))
def tab_close_last_opened(self):
sessions = self.tab_list()
opentabs = [s for s in sessions if s["type"] == "page"]
return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"]))
async def send(self, method: str, params: dict):
self._reqid += 1
async with websockets.connect(self.wsurl) as ws:
await ws.send(
json.dumps({"method": method, "params": params, "id": self._reqid})
)
self._last_resp = await ws.recv()
self._last_json = json.loads(self._last_resp)
self.log.info(self._last_json)
def get(self, uri):
resp = self._session.get(self.server_addr + uri)
try:
self._last_resp = resp
self._last_json = resp.json()
except Exception:
return
else:
return self._last_json
def post(self, uri, data: dict = None):
if not data:
data = {}
resp = self._session.post(self.server_addr + uri, json=data)
try:
self._last_resp = resp
self._last_json = resp.json()
except Exception:
return self._last_resp
@property
def last_json(self):
return self._last_json

View File

@ -1,193 +0,0 @@
import asyncio
from collections.abc import Mapping
from collections.abc import Sequence
from functools import wraps
import os
import logging
import threading
import time
import traceback
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import List
from typing import Optional
class Structure(dict):
"""
This is a dict-like object structure, which you should subclass
Only properties defined in the class context are used on initialization.
See example
"""
_store = {}
def __init__(self, *a, **kw):
"""
Instantiate a new instance.
:param a:
:param kw:
"""
super().__init__()
# auxiliar dict
d = dict(*a, **kw)
for k, v in d.items():
if isinstance(v, Mapping):
self[k] = self.__class__(v)
elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
self[k] = [self.__class__(i) for i in v]
else:
self[k] = v
super().__setattr__("__dict__", self)
def __getattr__(self, item):
return getattr(super(), item)
def __getitem__(self, item):
return super().__getitem__(item)
def __setattr__(self, key, value):
self.__setitem__(key, value)
def __setitem__(self, key, value):
super().__setitem__(key, value)
def update(self, *a, **kw):
super().update(*a, **kw)
def __eq__(self, other):
return frozenset(other.items()) == frozenset(self.items())
def __hash__(self):
return hash(frozenset(self.items()))
@classmethod
def __init_subclass__(cls, **kwargs):
cls._store = {}
def _normalize_strings(self):
for k, v in self.copy().items():
if isinstance(v, (str)):
self[k] = v.strip()
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None):
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
def function_reached_timeout():
if on_timeout:
on_timeout(func)
else:
raise TimeoutError("function call timed out")
t = threading.Timer(interval=seconds, function=function_reached_timeout)
t.start()
try:
return func(*args, **kwargs)
except:
t.cancel()
raise
finally:
t.cancel()
return wrapped
return wrapper
def test():
import sys, os
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
import undetected_chromedriver as uc
import threading
def collector(
driver: uc.Chrome,
stop_event: threading.Event,
on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
listen_events: Sequence = ("browser", "network", "performance"),
):
def threaded(driver, stop_event, on_event_coro):
async def _ensure_service_started():
while (
getattr(driver, "service", False)
and getattr(driver.service, "process", False)
and driver.service.process.poll()
):
print("waiting for driver service to come back on")
await asyncio.sleep(0.05)
# await asyncio.sleep(driver._delay or .25)
async def get_log_lines(typ):
await _ensure_service_started()
return driver.get_log(typ)
async def looper():
while not stop_event.is_set():
log_lines = []
try:
for _ in listen_events:
try:
log_lines += await get_log_lines(_)
except:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
continue
if log_lines and on_event_coro:
await on_event_coro(log_lines)
except Exception as e:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(looper())
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
t.start()
async def on_event(data):
print("on_event")
print("data:", data)
def func_called(fn):
def wrapped(*args, **kwargs):
print(
"func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
)
while driver.service.process and driver.service.process.poll() is not None:
time.sleep(0.1)
res = fn(*args, **kwargs)
print("func completed! (result: %s)" % res)
return res
return wrapped
logging.basicConfig(level=10)
options = uc.ChromeOptions()
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
)
driver = uc.Chrome(version_main=96, options=options)
# driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request)
driver.command_executor._request = func_called(driver.command_executor._request)
collector_stop = threading.Event()
collector(driver, collector_stop, on_event)
driver.get("https://nowsecure.nl")
time.sleep(10)
if os.name == "nt":
driver.close()
driver.quit()

View File

@ -1,77 +0,0 @@
import atexit
import logging
import multiprocessing
import os
import platform
import signal
from subprocess import PIPE
from subprocess import Popen
import sys
CREATE_NEW_PROCESS_GROUP = 0x00000200
DETACHED_PROCESS = 0x00000008
REGISTERED = []
def start_detached(executable, *args):
"""
Starts a fully independent subprocess (with no parent)
:param executable: executable
:param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
:return: pid of the grandchild process
"""
# create pipe
reader, writer = multiprocessing.Pipe(False)
# do not keep reference
process = multiprocessing.Process(
target=_start_detached,
args=(executable, *args),
kwargs={"writer": writer},
daemon=True,
)
process.start()
process.join()
# receive pid from pipe
pid = reader.recv()
REGISTERED.append(pid)
# close pipes
writer.close()
reader.close()
process.close()
return pid
def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
# configure launch
kwargs = {}
if platform.system() == "Windows":
kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
elif sys.version_info < (3, 2):
# assume posix
kwargs.update(preexec_fn=os.setsid)
else: # Python 3.2+ and Unix
kwargs.update(start_new_session=True)
# run
p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)
# send pid to pipe
writer.send(p.pid)
sys.exit()
def _cleanup():
for pid in REGISTERED:
try:
logging.getLogger(__name__).debug("cleaning up pid %d " % pid)
os.kill(pid, signal.SIGTERM)
except: # noqa
pass
atexit.register(_cleanup)

View File

@ -1,85 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import json
import os
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
class ChromeOptions(_ChromiumOptions):
_session = None
_user_data_dir = None
@property
def user_data_dir(self):
return self._user_data_dir
@user_data_dir.setter
def user_data_dir(self, path: str):
"""
Sets the browser profile folder to use, or creates a new profile
at given <path>.
Parameters
----------
path: str
the path to a chrome profile folder
if it does not exist, a new profile will be created at given location
"""
apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath)
@staticmethod
def _undot_key(key, value):
"""turn a (dotted key, value) into a proper nested dict"""
if "." in key:
key, rest = key.split(".", 1)
value = ChromeOptions._undot_key(rest, value)
return {key: value}
@staticmethod
def _merge_nested(a, b):
"""
merges b into a
leaf values in a are overwritten with values from b
"""
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
ChromeOptions._merge_nested(a[key], b[key])
continue
a[key] = b[key]
return a
def handle_prefs(self, user_data_dir):
prefs = self.experimental_options.get("prefs")
if prefs:
user_data_dir = user_data_dir or self._user_data_dir
default_path = os.path.join(user_data_dir, "Default")
os.makedirs(default_path, exist_ok=True)
# undot prefs dict keys
undot_prefs = {}
for key, value in prefs.items():
undot_prefs = self._merge_nested(
undot_prefs, self._undot_key(key, value)
)
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file):
with open(prefs_file, encoding="latin1", mode="r") as f:
undot_prefs = self._merge_nested(json.load(f), undot_prefs)
with open(prefs_file, encoding="latin1", mode="w") as f:
json.dump(undot_prefs, f)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"]
@classmethod
def from_options(cls, options):
o = cls()
o.__dict__.update(options.__dict__)
return o

View File

@ -1,451 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
from distutils.version import LooseVersion
import io
import json
import logging
import os
import pathlib
import platform
import random
import re
import shutil
import string
import sys
import time
from urllib.request import urlopen
from urllib.request import urlretrieve
import zipfile
from multiprocessing import Lock
logger = logging.getLogger(__name__)
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd"))
class Patcher(object):
lock = Lock()
exe_name = "chromedriver%s"
platform = sys.platform
if platform.endswith("win32"):
d = "~/appdata/roaming/undetected_chromedriver"
elif "LAMBDA_TASK_ROOT" in os.environ:
d = "/tmp/undetected_chromedriver"
elif platform.startswith(("linux", "linux2")):
d = "~/.local/share/undetected_chromedriver"
elif platform.endswith("darwin"):
d = "~/Library/Application Support/undetected_chromedriver"
else:
d = "~/.undetected_chromedriver"
data_path = os.path.abspath(os.path.expanduser(d))
def __init__(
self,
executable_path=None,
force=False,
version_main: int = 0,
user_multi_procs=False,
):
"""
Args:
executable_path: None = automatic
a full file path to the chromedriver executable
force: False
terminate processes which are holding lock
version_main: 0 = auto
specify main chrome version (rounded, ex: 82)
"""
self.force = force
self._custom_exe_path = False
prefix = "undetected"
self.user_multi_procs = user_multi_procs
try:
# Try to convert version_main into an integer
version_main_int = int(version_main)
# check if version_main_int is less than or equal to e.g 114
self.is_old_chromedriver = version_main and version_main_int <= 114
except (ValueError,TypeError):
# Check not running inside Docker
if not os.path.exists("/app/chromedriver"):
# If the conversion fails, log an error message
logging.info("version_main cannot be converted to an integer")
# Set self.is_old_chromedriver to False if the conversion fails
self.is_old_chromedriver = False
# Needs to be called before self.exe_name is accessed
self._set_platform_name()
if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True)
if not executable_path:
if sys.platform.startswith("freebsd"):
self.executable_path = os.path.join(
self.data_path, self.exe_name
)
else:
self.executable_path = os.path.join(
self.data_path, "_".join([prefix, self.exe_name])
)
if not IS_POSIX:
if executable_path:
if not executable_path[-4:] == ".exe":
executable_path += ".exe"
self.zip_path = os.path.join(self.data_path, prefix)
if not executable_path:
if not self.user_multi_procs:
self.executable_path = os.path.abspath(
os.path.join(".", self.executable_path)
)
if executable_path:
self._custom_exe_path = True
self.executable_path = executable_path
# Set the correct repository to download the Chromedriver from
if self.is_old_chromedriver:
self.url_repo = "https://chromedriver.storage.googleapis.com"
else:
self.url_repo = "https://googlechromelabs.github.io/chrome-for-testing"
self.version_main = version_main
self.version_full = None
def _set_platform_name(self):
"""
Set the platform and exe name based on the platform undetected_chromedriver is running on
in order to download the correct chromedriver.
"""
if self.platform.endswith("win32"):
self.platform_name = "win32"
self.exe_name %= ".exe"
if self.platform.endswith(("linux", "linux2")):
self.platform_name = "linux64"
self.exe_name %= ""
if self.platform.endswith("darwin"):
if self.is_old_chromedriver:
self.platform_name = "mac64"
else:
self.platform_name = "mac-x64"
self.exe_name %= ""
if self.platform.startswith("freebsd"):
self.platform_name = "freebsd"
self.exe_name %= ""
def auto(self, executable_path=None, force=False, version_main=None, _=None):
"""
Args:
executable_path:
force:
version_main:
Returns:
"""
p = pathlib.Path(self.data_path)
if self.user_multi_procs:
with Lock():
files = list(p.rglob("*chromedriver*"))
most_recent = max(files, key=lambda f: f.stat().st_mtime)
files.remove(most_recent)
list(map(lambda f: f.unlink(), files))
if self.is_binary_patched(most_recent):
self.executable_path = str(most_recent)
return True
if executable_path:
self.executable_path = executable_path
self._custom_exe_path = True
if self._custom_exe_path:
ispatched = self.is_binary_patched(self.executable_path)
if not ispatched:
return self.patch_exe()
else:
return
if version_main:
self.version_main = version_main
if force is True:
self.force = force
if self.platform_name == "freebsd":
chromedriver_path = shutil.which("chromedriver")
if not os.path.isfile(chromedriver_path) or not os.access(chromedriver_path, os.X_OK):
logging.error("Chromedriver not installed!")
return
version_path = os.path.join(os.path.dirname(self.executable_path), "version.txt")
process = os.popen(f'"{chromedriver_path}" --version')
chromedriver_version = process.read().split(' ')[1].split(' ')[0]
process.close()
current_version = None
if os.path.isfile(version_path) or os.access(version_path, os.X_OK):
with open(version_path, 'r') as f:
current_version = f.read()
if current_version != chromedriver_version:
logging.info("Copying chromedriver executable...")
shutil.copy(chromedriver_path, self.executable_path)
os.chmod(self.executable_path, 0o755)
with open(version_path, 'w') as f:
f.write(chromedriver_version)
logging.info("Chromedriver executable copied!")
else:
try:
os.unlink(self.executable_path)
except PermissionError:
if self.force:
self.force_kill_instances(self.executable_path)
return self.auto(force=not self.force)
try:
if self.is_binary_patched():
# assumes already running AND patched
return True
except PermissionError:
pass
# return False
except FileNotFoundError:
pass
release = self.fetch_release_number()
self.version_main = release.version[0]
self.version_full = release
self.unzip_package(self.fetch_package())
return self.patch()
def driver_binary_in_use(self, path: str = None) -> bool:
"""
naive test to check if a found chromedriver binary is
currently in use
Args:
path: a string or PathLike object to the binary to check.
if not specified, we check use this object's executable_path
"""
if not path:
path = self.executable_path
p = pathlib.Path(path)
if not p.exists():
raise OSError("file does not exist: %s" % p)
try:
with open(p, mode="a+b") as fs:
exc = []
try:
fs.seek(0, 0)
except PermissionError as e:
exc.append(e) # since some systems apprently allow seeking
# we conduct another test
try:
fs.readline()
except PermissionError as e:
exc.append(e)
if exc:
return True
return False
# ok safe to assume this is in use
except Exception as e:
# logger.exception("whoops ", e)
pass
def cleanup_unused_files(self):
p = pathlib.Path(self.data_path)
items = list(p.glob("*undetected*"))
for item in items:
try:
item.unlink()
except:
pass
def patch(self):
self.patch_exe()
return self.is_binary_patched()
def fetch_release_number(self):
"""
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string
:rtype: LooseVersion
"""
# Endpoint for old versions of Chromedriver (114 and below)
if self.is_old_chromedriver:
path = f"/latest_release_{self.version_main}"
path = path.upper()
logger.debug("getting release number from %s" % path)
return LooseVersion(urlopen(self.url_repo + path).read().decode())
# Endpoint for new versions of Chromedriver (115+)
if not self.version_main:
# Fetch the latest version
path = "/last-known-good-versions-with-downloads.json"
logger.debug("getting release number from %s" % path)
with urlopen(self.url_repo + path) as conn:
response = conn.read().decode()
last_versions = json.loads(response)
return LooseVersion(last_versions["channels"]["Stable"]["version"])
# Fetch the latest minor version of the major version provided
path = "/latest-versions-per-milestone-with-downloads.json"
logger.debug("getting release number from %s" % path)
with urlopen(self.url_repo + path) as conn:
response = conn.read().decode()
major_versions = json.loads(response)
return LooseVersion(major_versions["milestones"][str(self.version_main)]["version"])
def parse_exe_version(self):
with io.open(self.executable_path, "rb") as f:
for line in iter(lambda: f.readline(), b""):
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
if match:
return LooseVersion(match[1].decode())
def fetch_package(self):
"""
Downloads ChromeDriver from source
:return: path to downloaded file
"""
zip_name = f"chromedriver_{self.platform_name}.zip"
if self.is_old_chromedriver:
download_url = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, zip_name)
else:
zip_name = zip_name.replace("_", "-", 1)
download_url = "https://storage.googleapis.com/chrome-for-testing-public/%s/%s/%s"
download_url %= (self.version_full.vstring, self.platform_name, zip_name)
logger.debug("downloading from %s" % download_url)
return urlretrieve(download_url)[0]
def unzip_package(self, fp):
"""
Does what it says
:return: path to unpacked executable
"""
exe_path = self.exe_name
if not self.is_old_chromedriver:
# The new chromedriver unzips into its own folder
zip_name = f"chromedriver-{self.platform_name}"
exe_path = os.path.join(zip_name, self.exe_name)
logger.debug("unzipping %s" % fp)
try:
os.unlink(self.zip_path)
except (FileNotFoundError, OSError):
pass
os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
with zipfile.ZipFile(fp, mode="r") as zf:
zf.extractall(self.zip_path)
os.rename(os.path.join(self.zip_path, exe_path), self.executable_path)
os.remove(fp)
shutil.rmtree
os.chmod(self.executable_path, 0o755)
return self.executable_path
@staticmethod
def force_kill_instances(exe_name):
"""
kills running instances.
:param: executable name to kill, may be a path as well
:return: True on success else False
"""
exe_name = os.path.basename(exe_name)
if IS_POSIX:
r = os.system("kill -f -9 $(pidof %s)" % exe_name)
else:
r = os.system("taskkill /f /im %s" % exe_name)
return not r
@staticmethod
def gen_random_cdc():
cdc = random.choices(string.ascii_letters, k=27)
return "".join(cdc).encode()
def is_binary_patched(self, executable_path=None):
executable_path = executable_path or self.executable_path
try:
with io.open(executable_path, "rb") as fh:
return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError:
return False
def patch_exe(self):
start = time.perf_counter()
logger.info("patching driver executable %s" % self.executable_path)
with io.open(self.executable_path, "r+b") as fh:
content = fh.read()
# match_injected_codeblock = re.search(rb"{window.*;}", content)
match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content)
if match_injected_codeblock:
target_bytes = match_injected_codeblock[0]
new_target_bytes = (
b'{console.log("undetected chromedriver 1337!")}'.ljust(
len(target_bytes), b" "
)
)
new_content = content.replace(target_bytes, new_target_bytes)
if new_content == content:
logger.warning(
"something went wrong patching the driver binary. could not find injection code block"
)
else:
logger.debug(
"found block:\n%s\nreplacing with:\n%s"
% (target_bytes, new_target_bytes)
)
fh.seek(0)
fh.write(new_content)
logger.debug(
"patching took us {:.2f} seconds".format(time.perf_counter() - start)
)
def __repr__(self):
return "{0:s}({1:s})".format(
self.__class__.__name__,
self.executable_path,
)
def __del__(self):
if self._custom_exe_path:
# if the driver binary is specified by user
# we assume it is important enough to not delete it
return
else:
timeout = 3 # stop trying after this many seconds
t = time.monotonic()
now = lambda: time.monotonic()
while now() - t > timeout:
# we don't want to wait until the end of time
try:
if self.user_multi_procs:
break
os.unlink(self.executable_path)
logger.debug("successfully unlinked %s" % self.executable_path)
break
except (OSError, RuntimeError, PermissionError):
time.sleep(0.01)
continue
except FileNotFoundError:
break

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import asyncio
import json
import logging
import threading
logger = logging.getLogger(__name__)
class Reactor(threading.Thread):
def __init__(self, driver: "Chrome"):
super().__init__()
self.driver = driver
self.loop = asyncio.new_event_loop()
self.lock = threading.Lock()
self.event = threading.Event()
self.daemon = True
self.handlers = {}
def add_event_handler(self, method_name, callback: callable):
"""
Parameters
----------
event_name: str
example "Network.responseReceived"
callback: callable
callable which accepts 1 parameter: the message object dictionary
Returns
-------
"""
with self.lock:
self.handlers[method_name.lower()] = callback
@property
def running(self):
return not self.event.is_set()
def run(self):
try:
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.listen())
except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True:
with self.lock:
if (
getattr(self.driver, "service", None)
and getattr(self.driver.service, "process", None)
and self.driver.service.process.poll()
):
await asyncio.sleep(self.driver._delay or 0.25)
else:
break
async def listen(self):
while self.running:
await self._wait_service_started()
await asyncio.sleep(1)
try:
with self.lock:
log_entries = self.driver.get_log("performance")
for entry in log_entries:
try:
obj_serialized: str = entry.get("message")
obj = json.loads(obj_serialized)
message = obj.get("message")
method = message.get("method")
if "*" in self.handlers:
await self.loop.run_in_executor(
None, self.handlers["*"], message
)
elif method.lower() in self.handlers:
await self.loop.run_in_executor(
None, self.handlers[method.lower()], message
)
# print(type(message), message)
except Exception as e:
raise e from None
except Exception as e:
if "invalid session id" in str(e):
pass
else:
logging.debug("exception ignored :", e)

View File

@ -1,86 +0,0 @@
from typing import List
from selenium.webdriver.common.by import By
import selenium.webdriver.remote.webelement
class WebElement(selenium.webdriver.remote.webelement.WebElement):
def click_safe(self):
super().click()
self._parent.reconnect(0.1)
def children(
self, tag=None, recursive=False
) -> List[selenium.webdriver.remote.webelement.WebElement]:
"""
returns direct child elements of current element
:param tag: str, if supplied, returns <tag> nodes only
"""
script = "return [... arguments[0].children]"
if tag:
script += ".filter( node => node.tagName === '%s')" % tag.upper()
if recursive:
return list(_recursive_children(self, tag))
return list(self._parent.execute_script(script, self))
class UCWebElement(WebElement):
"""
Custom WebElement class which makes it easier to view elements when
working in an interactive environment.
standard webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
using this WebElement class:
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
"""
def __init__(self, parent, id_):
super().__init__(parent, id_)
self._attrs = None
@property
def attrs(self):
if not self._attrs:
self._attrs = self._parent.execute_script(
"""
var items = {};
for (index = 0; index < arguments[0].attributes.length; ++index)
{
items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
};
return items;
""",
self,
)
return self._attrs
def __repr__(self):
strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
if strattrs:
strattrs = " " + strattrs
return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"
def _recursive_children(element, tag: str = None, _results=None):
"""
returns all children of <element> recursively
:param element: `WebElement` object.
find children below this <element>
:param tag: str = None.
if provided, return only <tag> elements. example: 'a', or 'img'
:param _results: do not use!
"""
results = _results or set()
for element in element.children():
if tag:
if element.tag_name == tag:
results.add(element)
else:
results.add(element)
results |= _recursive_children(element, tag, results)
return results

View File

@ -7,16 +7,16 @@ import urllib.parse
import tempfile import tempfile
import sys import sys
from selenium.webdriver.chrome.webdriver import WebDriver from DrissionPage import ChromiumPage, ChromiumOptions
import undetected_chromedriver as uc # from DrissionPage.common import Settings
FLARESOLVERR_VERSION = None FLARESOLVERR_VERSION = None
PLATFORM_VERSION = None PLATFORM_VERSION = None
CHROME_EXE_PATH = None
CHROME_MAJOR_VERSION = None CHROME_MAJOR_VERSION = None
USER_AGENT = None
XVFB_DISPLAY = None XVFB_DISPLAY = None
PATCHED_DRIVER_PATH = None
CHROME_EXE_PATH = os.environ.get('CHROME_EXE_PATH', None)
USER_AGENT = os.environ.get('USER_AGENT', None)
def get_config_log_html() -> bool: def get_config_log_html() -> bool:
@ -121,100 +121,64 @@ def create_proxy_extension(proxy: dict) -> str:
return proxy_extension_dir return proxy_extension_dir
def get_webdriver(proxy: dict = None) -> WebDriver: def get_webdriver(proxy: dict = None) -> ChromiumPage:
global PATCHED_DRIVER_PATH, USER_AGENT global CHROME_EXE_PATH, USER_AGENT
logging.debug('Launching web browser...') logging.debug('Launching web browser...')
# undetected_chromedriver # undetected_chromedriver
options = uc.ChromeOptions() options = ChromiumOptions()
options.add_argument('--no-sandbox') options.set_argument('--no-sandbox')
options.add_argument('--window-size=1920,1080') options.set_argument('--window-size=1920,1080')
# todo: this param shows a warning in chrome head-full # todo: this param shows a warning in chrome head-full
options.add_argument('--disable-setuid-sandbox') options.set_argument('--disable-setuid-sandbox')
options.add_argument('--disable-dev-shm-usage') options.set_argument('--disable-dev-shm-usage')
# this option removes the zygote sandbox (it seems that the resolution is a bit faster) # this option removes the zygote sandbox (it seems that the resolution is a bit faster)
options.add_argument('--no-zygote') options.set_argument('--no-zygote')
# attempt to fix Docker ARM32 build # attempt to fix Docker ARM32 build
options.add_argument('--disable-gpu-sandbox') options.set_argument('--disable-gpu-sandbox')
options.add_argument('--disable-software-rasterizer') options.set_argument('--disable-software-rasterizer')
options.add_argument('--ignore-certificate-errors') options.set_argument('--ignore-certificate-errors')
options.add_argument('--ignore-ssl-errors') options.set_argument('--ignore-ssl-errors')
# fix GL errors in ASUSTOR NAS # fix GL errors in ASUSTOR NAS
# https://github.com/FlareSolverr/FlareSolverr/issues/782 # https://github.com/FlareSolverr/FlareSolverr/issues/782
# https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069 # https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069
# https://peter.sh/experiments/chromium-command-line-switches/#use-gl # https://peter.sh/experiments/chromium-command-line-switches/#use-gl
options.add_argument('--use-gl=swiftshader') options.set_argument('--use-gl=swiftshader')
language = os.environ.get('LANG', None) language = os.environ.get('LANG', None)
if language is not None: if language is not None:
options.add_argument('--accept-lang=%s' % language) options.set_argument('--accept-lang=%s' % language)
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910 # Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
if USER_AGENT is not None: if USER_AGENT is not None:
options.add_argument('--user-agent=%s' % USER_AGENT) options.set_argument('--user-agent=%s' % USER_AGENT)
proxy_extension_dir = None proxy_extension_dir = None
if proxy and all(key in proxy for key in ['url', 'username', 'password']): if proxy and all(key in proxy for key in ['url', 'username', 'password']):
proxy_extension_dir = create_proxy_extension(proxy) proxy_extension_dir = create_proxy_extension(proxy)
options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir)) options.set_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir))
elif proxy and 'url' in proxy: elif proxy and 'url' in proxy:
proxy_url = proxy['url'] proxy_url = proxy['url']
logging.debug("Using webdriver proxy: %s", proxy_url) logging.debug("Using webdriver proxy: %s", proxy_url)
options.add_argument('--proxy-server=%s' % proxy_url) options.set_argument('--proxy-server=%s' % proxy_url)
# note: headless mode is detected (headless = True)
# we launch the browser in head-full mode with the window hidden
windows_headless = False windows_headless = False
if get_config_headless(): if get_config_headless():
if os.name == 'nt': if os.name == 'nt':
windows_headless = True windows_headless = True
else: else:
start_xvfb_display() start_xvfb_display()
# For normal headless mode: options.headless(windows_headless)
# options.add_argument('--headless') options.set_argument("--auto-open-devtools-for-tabs")
options.add_argument("--auto-open-devtools-for-tabs") if CHROME_EXE_PATH is not None:
options.set_paths(browser_path=CHROME_EXE_PATH)
# if we are inside the Docker container, we avoid downloading the driver
driver_exe_path = None
version_main = None
if os.path.exists("/app/chromedriver"):
# running inside Docker
driver_exe_path = "/app/chromedriver"
else:
version_main = get_chrome_major_version()
if PATCHED_DRIVER_PATH is not None:
driver_exe_path = PATCHED_DRIVER_PATH
# detect chrome path
browser_executable_path = get_chrome_exe_path()
# downloads and patches the chromedriver
# if we don't set driver_executable_path it downloads, patches, and deletes the driver each time
try:
driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path,
driver_executable_path=driver_exe_path, version_main=version_main,
windows_headless=windows_headless, headless=get_config_headless())
except Exception as e:
logging.error("Error starting Chrome: %s" % e)
# save the patched driver to avoid re-downloads
if driver_exe_path is None:
PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name)
if PATCHED_DRIVER_PATH != driver.patcher.executable_path:
shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH)
# clean up proxy extension directory # clean up proxy extension directory
if proxy_extension_dir is not None: if proxy_extension_dir is not None:
shutil.rmtree(proxy_extension_dir) shutil.rmtree(proxy_extension_dir)
# selenium vanilla driver = ChromiumPage(addr_or_opts=options)
# options = webdriver.ChromeOptions()
# options.add_argument('--no-sandbox')
# options.add_argument('--window-size=1920,1080')
# options.add_argument('--disable-setuid-sandbox')
# options.add_argument('--disable-dev-shm-usage')
# driver = webdriver.Chrome(options=options)
return driver return driver
@ -237,10 +201,53 @@ def get_chrome_exe_path() -> str:
CHROME_EXE_PATH = chrome_path CHROME_EXE_PATH = chrome_path
return CHROME_EXE_PATH return CHROME_EXE_PATH
# system # system
CHROME_EXE_PATH = uc.find_chrome_executable() CHROME_EXE_PATH = find_chrome_executable()
return CHROME_EXE_PATH return CHROME_EXE_PATH
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd")):
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in (
"google-chrome",
"chromium",
"chromium-browser",
"chrome",
"google-chrome-stable",
):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
[
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
)
else:
for item in map(
os.environ.get,
("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
):
if item is not None:
for subitem in (
"Google/Chrome/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
return os.path.normpath(candidate)
def get_chrome_major_version() -> str: def get_chrome_major_version() -> str:
global CHROME_MAJOR_VERSION global CHROME_MAJOR_VERSION
if CHROME_MAJOR_VERSION is not None: if CHROME_MAJOR_VERSION is not None:
@ -314,7 +321,7 @@ def get_user_agent(driver=None) -> str:
try: try:
if driver is None: if driver is None:
driver = get_webdriver() driver = get_webdriver()
USER_AGENT = driver.execute_script("return navigator.userAgent") USER_AGENT = driver.user_agent
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910 # Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE) USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE)
return USER_AGENT return USER_AGENT