mirror of
https://github.com/FlareSolverr/FlareSolverr.git
synced 2025-06-08 04:25:25 +00:00
Fork undetected-chromedriver 3.1.5.post4
This commit is contained in:
parent
3dbb4e65d6
commit
93041779fb
@ -1,6 +1,5 @@
|
||||
bottle==0.12.23
|
||||
waitress==2.1.2
|
||||
selenium==4.4.3
|
||||
undetected-chromedriver==3.1.5.post4
|
||||
func-timeout==4.3.5
|
||||
xvfbwrapper==0.2.9
|
||||
|
699
src/undetected_chromedriver/__init__.py
Normal file
699
src/undetected_chromedriver/__init__.py
Normal file
@ -0,0 +1,699 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
|
||||
"""
|
||||
|
||||
888 888 d8b
|
||||
888 888 Y8P
|
||||
888 888
|
||||
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
|
||||
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
|
||||
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
|
||||
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
|
||||
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
|
||||
|
||||
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
|
||||
|
||||
"""
|
||||
|
||||
|
||||
__version__ = "3.1.5r4"
|
||||
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import inspect
|
||||
import threading
|
||||
|
||||
import selenium.webdriver.chrome.service
|
||||
import selenium.webdriver.chrome.webdriver
|
||||
import selenium.webdriver.common.service
|
||||
import selenium.webdriver.remote.webdriver
|
||||
|
||||
from .cdp import CDP
|
||||
from .options import ChromeOptions
|
||||
from .patcher import IS_POSIX
|
||||
from .patcher import Patcher
|
||||
from .reactor import Reactor
|
||||
from .dprocess import start_detached
|
||||
|
||||
__all__ = (
|
||||
"Chrome",
|
||||
"ChromeOptions",
|
||||
"Patcher",
|
||||
"Reactor",
|
||||
"CDP",
|
||||
"find_chrome_executable",
|
||||
)
|
||||
|
||||
logger = logging.getLogger("uc")
|
||||
logger.setLevel(logging.getLogger().getEffectiveLevel())
|
||||
|
||||
|
||||
class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
    """
    Controls the ChromeDriver and allows you to drive the browser.

    The webdriver file will be downloaded by this module automatically,
    you do not need to specify this. however, you may if you wish.

    Attributes
    ----------

    Methods
    -------

    reconnect()

        this can be useful in case of heavy detection methods
        -stops the chromedriver service which runs in the background
        -starts the chromedriver service which runs in the background
        -recreate session

    start_session(capabilities=None, browser_profile=None)

        differentiates from the regular method in that it does not
        require a capabilities argument. The capabilities are automatically
        recreated from the options at creation time.

    --------------------------------------------------------------------------
        NOTE:
            Chrome has everything included to work out of the box.
            it does not `need` customizations.
            any customizations MAY lead to trigger bot mitigation systems.

    --------------------------------------------------------------------------
    """

    _instances = set()
    session_id = None
    debug = False

    def __init__(
        self,
        options=None,
        user_data_dir=None,
        driver_executable_path=None,
        browser_executable_path=None,
        port=0,
        enable_cdp_events=False,
        service_args=None,
        desired_capabilities=None,
        advanced_elements=False,
        service_log_path=None,
        keep_alive=True,
        log_level=0,
        headless=False,
        version_main=None,
        patcher_force_close=False,
        suppress_welcome=True,
        use_subprocess=False,
        debug=False,
        **kw
    ):
        """
        Creates a new instance of the chrome driver.

        Starts the service and then creates new instance of chrome driver.

        Parameters
        ----------

        options: ChromeOptions, optional, default: None - automatic useful defaults
            this takes an instance of ChromeOptions, mainly to customize browser behavior.
            anything other than the default, for example extensions or startup options
            are not supported in case of failure, and can probably lower your undetectability.

        user_data_dir: str , optional, default: None (creates temp profile)
            if user_data_dir is a path to a valid chrome profile directory, use it,
            and turn off automatic removal mechanism at exit.

        driver_executable_path: str, optional, default: None(=downloads and patches new binary)

        browser_executable_path: str, optional, default: None - use find_chrome_executable
            Path to the browser executable.
            If not specified, make sure the executable's folder is in $PATH

        port: int, optional, default: 0
            port you would like the service to run, if left as 0, a free port will be found.

        enable_cdp_events: bool, default: False
            :: currently for chrome only
            this enables the handling of wire messages
            when enabled, you can subscribe to CDP events by using:

                driver.add_cdp_listener("Network.dataReceived", yourcallback)
                # yourcallback is a callable which accepts exactly 1 dict as parameter

        service_args: list of str, optional, default: None
            arguments to pass to the driver service

        desired_capabilities: dict, optional, default: None - auto from config
            Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref".

        advanced_elements: bool, optional, default: False
            makes it easier to recognize elements like you know them from html/browser inspection, especially when working
            in an interactive environment

            default webelement repr:
            <selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>

            advanced webelement repr
            <WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>

            note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time.

        service_log_path: str, optional, default: None
            path to log information from the driver.

        keep_alive: bool, optional, default: True
            Whether to configure ChromeRemoteConnection to use HTTP keep-alive.

        log_level: int, optional, default: adapts to python global log level

        headless: bool, optional, default: False
            can also be specified in the options instance.
            Specify whether you want to use the browser in headless mode.
            warning: this lowers undetectability and not fully supported.

        version_main: int, optional, default: None (=auto)
            if you, for god knows whatever reason, use
            an older version of Chrome. You can specify its full rounded version number
            here. Example: 87 for all versions of 87

        patcher_force_close: bool, optional, default: False
            instructs the patcher to do whatever it can to access the chromedriver binary
            if the file is locked, it will force shutdown all instances.
            setting it is not recommended, unless you know the implications and think
            you might need it.

        suppress_welcome: bool, optional , default: True
            a "welcome" alert might show up on *nix-like systems asking whether you want to set
            chrome as your default browser, and if you want to send even more data to google.
            now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False.
            Note: if you don't handle the nag screen in time, the browser loses its connection and throws an Exception.

        use_subprocess: bool, optional , default: False,

            False (the default) makes sure Chrome will get its own process (so no subprocess of chromedriver.exe or python).
            This fixes a LOT of issues, like multithreaded run, but most importantly, shutting down correctly after
            program exits or using .quit()

            unfortunately, there is always an edge case in which one would like to write a single script with the only contents being:
            --start script--
            import undetected_chromedriver as uc
            d = uc.Chrome()
            d.get('https://somesite/')
            ---end script --

            and will be greeted with an error, since the program exits before chrome has a chance to launch.
            in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times.
            ! setting it to True comes with NO support when being detected. !

        debug: bool, optional, default: False
            when True, every bound-method call made through this instance is
            logged at DEBUG level (see __getattribute__).
        """
        self.debug = debug
        patcher = Patcher(
            executable_path=driver_executable_path,
            force=patcher_force_close,
            version_main=version_main,
        )
        patcher.auto()
        self.patcher = patcher
        if not options:
            options = ChromeOptions()

        # prevent reuse of options,
        # as it just appends arguments, not replace them
        # you'll get conflicts starting chrome
        if getattr(options, "_session", None) is not None:
            raise RuntimeError("you cannot reuse the ChromeOptions object")

        options._session = self

        # If the caller already configured a debugger address, the browser has
        # to be launched listening on that same host/port; otherwise pick a
        # free local port and register it on the options.
        debug_host = "127.0.0.1"
        debug_port = None
        if options.debugger_address:
            host, sep, port_text = str(options.debugger_address).rpartition(":")
            if sep and host and port_text.isdigit():
                debug_host, debug_port = host, int(port_text)
        if debug_port is None:
            debug_port = selenium.webdriver.common.service.utils.free_port()
        if not options.debugger_address:
            options.debugger_address = "%s:%d" % (debug_host, debug_port)

        if enable_cdp_events:
            options.set_capability(
                "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"}
            )

        options.add_argument("--remote-debugging-host=%s" % debug_host)
        options.add_argument("--remote-debugging-port=%s" % debug_port)

        if user_data_dir:
            options.add_argument("--user-data-dir=%s" % user_data_dir)

        language, keep_user_data_dir = None, bool(user_data_dir)

        # see if a custom user profile is specified in options
        for arg in options.arguments:
            # Only inspect the switch name (text before "="), so that a value
            # such as a profile path containing "lang" is not mistaken for a
            # language switch.
            switch_name = arg.split("=", 1)[0]

            if "lang" in switch_name:
                m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
                if m:
                    language = m[1]

            if "user-data-dir" in switch_name:
                m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
                if m:
                    user_data_dir = m[1]
                    logger.debug(
                        "user-data-dir found in user argument %s => %s" % (arg, m[1])
                    )
                    keep_user_data_dir = True
                else:
                    logger.debug(
                        "no user data dir could be extracted from supplied argument %s "
                        % arg
                    )

        if not user_data_dir:

            # backward compatiblity
            # check if an old uc.ChromeOptions is used, and extract the user data dir

            if hasattr(options, "user_data_dir") and getattr(
                options, "user_data_dir", None
            ):
                import warnings

                warnings.warn(
                    "using ChromeOptions.user_data_dir might stop working in future versions."
                    "use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder"
                )
                # remember the directory so the preference handling below and
                # self.user_data_dir refer to the profile actually in use
                user_data_dir = options.user_data_dir
                options.add_argument("--user-data-dir=%s" % user_data_dir)
                keep_user_data_dir = True
                logger.debug(
                    "user_data_dir property found in options object: %s" % user_data_dir
                )

            else:
                user_data_dir = os.path.normpath(tempfile.mkdtemp())
                keep_user_data_dir = False
                arg = "--user-data-dir=%s" % user_data_dir
                options.add_argument(arg)
                logger.debug(
                    "created a temporary folder in which the user-data (profile) will be stored during this\n"
                    "session, and added it to chrome startup arguments: %s" % arg
                )

        if not language:
            try:
                import locale

                language = locale.getdefaultlocale()[0].replace("_", "-")
            except Exception:
                # no usable default locale; fall through to en-US below
                pass
            if not language:
                language = "en-US"

        options.add_argument("--lang=%s" % language)

        if not options.binary_location:
            options.binary_location = (
                browser_executable_path or find_chrome_executable()
            )

        self._delay = 3

        self.user_data_dir = user_data_dir
        self.keep_user_data_dir = keep_user_data_dir

        if suppress_welcome:
            options.arguments.extend(["--no-default-browser-check", "--no-first-run"])
        if headless or options.headless:
            options.headless = True
            options.add_argument("--window-size=1920,1080")
            options.add_argument("--start-maximized")
            options.add_argument("--no-sandbox")
            # fixes "could not connect to chrome" error when running
            # on linux using privileged user like root (which i don't recommend)

        # Parenthesised so that the python log level is really used as the
        # fallback when log_level is 0; without the parentheses the formatted
        # string is always truthy and the fallback can never apply.
        options.add_argument(
            "--log-level=%d"
            % (log_level or divmod(logging.getLogger().getEffectiveLevel(), 10)[0])
        )

        if hasattr(options, "handle_prefs"):
            options.handle_prefs(user_data_dir)

        # fix exit_type flag to prevent tab-restore nag
        try:
            with open(
                os.path.join(user_data_dir, "Default/Preferences"),
                encoding="latin1",
                mode="r+",
            ) as fs:
                config = json.load(fs)
                if config["profile"]["exit_type"] is not None:
                    # fixing the restore-tabs-nag
                    config["profile"]["exit_type"] = None
                fs.seek(0, 0)
                json.dump(config, fs)
                # the rewritten document may be shorter than the old one;
                # drop the leftover bytes so the file stays valid JSON
                fs.truncate()
                logger.debug("fixed exit_type flag")
        except Exception:
            # best effort: a fresh profile has no Preferences file yet
            logger.debug("did not find a bad exit_type flag ")

        self.options = options

        if not desired_capabilities:
            desired_capabilities = options.to_capabilities()

        if not use_subprocess:
            self.browser_pid = start_detached(
                options.binary_location, *options.arguments
            )
        else:
            browser = subprocess.Popen(
                [options.binary_location, *options.arguments],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                close_fds=IS_POSIX,
            )
            self.browser_pid = browser.pid

        super(Chrome, self).__init__(
            executable_path=patcher.executable_path,
            port=port,
            options=options,
            service_args=service_args,
            desired_capabilities=desired_capabilities,
            service_log_path=service_log_path,
            keep_alive=keep_alive,
        )

        self.reactor = None

        if enable_cdp_events:
            if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
                logging.getLogger(
                    "selenium.webdriver.remote.remote_connection"
                ).setLevel(20)
            reactor = Reactor(self)
            reactor.start()
            self.reactor = reactor

        if advanced_elements:
            from .webelement import WebElement

            self._web_element_cls = WebElement

        if options.headless:
            self._configure_headless()

    def __getattribute__(self, item):
        """Return the attribute; in debug mode wrap bound methods so calls are logged."""
        if not super().__getattribute__("debug"):
            return super().__getattribute__(item)

        original = super().__getattribute__(item)
        if inspect.ismethod(original) and not inspect.isclass(original):

            def newfunc(*args, **kwargs):
                logger.debug(
                    "calling %s with args %s and kwargs %s\n"
                    % (original.__qualname__, args, kwargs)
                )
                return original(*args, **kwargs)

            return newfunc
        return original

    def _configure_headless(self):
        """Replace ``self.get`` with a wrapper that patches headless giveaways before navigating."""
        orig_get = self.get
        logger.info("setting properties for headless")

        def get_wrapped(*args, **kwargs):
            if self.execute_script("return navigator.webdriver"):
                logger.info("patch navigator.webdriver")
                self.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """

                            Object.defineProperty(window, 'navigator', {
                                value: new Proxy(navigator, {
                                        has: (target, key) => (key === 'webdriver' ? false : key in target),
                                        get: (target, key) =>
                                                key === 'webdriver' ?
                                                false :
                                                typeof target[key] === 'function' ?
                                                target[key].bind(target) :
                                                target[key]
                                        })
                            });

                    """
                    },
                )

                logger.info("patch user-agent string")
                self.execute_cdp_cmd(
                    "Network.setUserAgentOverride",
                    {
                        "userAgent": self.execute_script(
                            "return navigator.userAgent"
                        ).replace("Headless", "")
                    },
                )
                self.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """
                            Object.defineProperty(navigator, 'maxTouchPoints', {
                                    get: () => 1
                            })"""
                    },
                )
            return orig_get(*args, **kwargs)

        self.get = get_wrapped

    def __dir__(self):
        return object.__dir__(self)

    def _get_cdc_props(self):
        """Return the names of window properties matching the chromedriver ``*_*_(Array|Promise|Symbol)`` pattern."""
        return self.execute_script(
            """
            let objectToInspect = window,
                result = [];
            while(objectToInspect !== null)
            { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
              objectToInspect = Object.getPrototypeOf(objectToInspect); }
            return result.filter(i => i.match(/.+_.+_(Array|Promise|Symbol)/ig))
            """
        )

    def _hook_remove_cdc_props(self):
        """Register a script that deletes those window properties on every new document."""
        self.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {
                "source": """
                    let objectToInspect = window,
                        result = [];
                    while(objectToInspect !== null)
                    { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
                      objectToInspect = Object.getPrototypeOf(objectToInspect); }
                    result.forEach(p => p.match(/.+_.+_(Array|Promise|Symbol)/ig)
                                        &&delete window[p]&&console.log('removed',p))
                    """
            },
        )

    def get(self, url):
        """Navigate to ``url``, first installing the cdc-property removal hook if such properties exist."""
        if self._get_cdc_props():
            self._hook_remove_cdc_props()
        return super().get(url)

    def add_cdp_listener(self, event_name, callback):
        """
        Subscribe ``callback`` to the CDP event ``event_name``.

        Returns the reactor's handler mapping, or False when no reactor is
        running (i.e. enable_cdp_events was not set).
        """
        if isinstance(self.reactor, Reactor):
            self.reactor.add_event_handler(event_name, callback)
            return self.reactor.handlers
        return False

    def clear_cdp_listeners(self):
        """Remove every registered CDP event handler, if a reactor is running."""
        if isinstance(self.reactor, Reactor):
            self.reactor.handlers.clear()

    def tab_new(self, url: str):
        """
        this opens a url in a new tab.
        apparently, that passes all tests directly!

        Parameters
        ----------
        url

        Returns
        -------

        """
        # create the CDP helper once and keep it on the instance; previously a
        # second call path with an existing ``cdp`` attribute left the local
        # name unbound
        if not hasattr(self, "cdp"):
            self.cdp = CDP(self.options)
        self.cdp.tab_new(url)

    def reconnect(self, timeout=0.1):
        """Stop the driver service, wait ``timeout`` seconds, restart it and open a new session."""
        try:
            self.service.stop()
        except Exception as e:
            logger.debug(e)
        time.sleep(timeout)
        try:
            self.service.start()
        except Exception as e:
            logger.debug(e)

        try:
            self.start_session()
        except Exception as e:
            logger.debug(e)

    def start_session(self, capabilities=None, browser_profile=None):
        """Start a session; capabilities default to the ones derived from ``self.options``."""
        if not capabilities:
            capabilities = self.options.to_capabilities()
        # deliberately skips selenium's chrome WebDriver in the MRO
        super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session(
            capabilities, browser_profile
        )

    def quit(self):
        """Kill the driver service and browser, stop the reactor and remove a temporary profile."""
        logger.debug("closing webdriver")
        if hasattr(self, "service") and getattr(self.service, "process", None):
            self.service.process.kill()
        try:
            if self.reactor and isinstance(self.reactor, Reactor):
                logger.debug("shutting down reactor")
                self.reactor.event.set()
        except Exception:  # noqa
            pass
        try:
            logger.debug("killing browser")
            os.kill(self.browser_pid, 15)

        except TimeoutError as e:
            logger.debug(e, exc_info=True)
        except Exception:  # noqa
            pass

        if (
            hasattr(self, "keep_user_data_dir")
            and hasattr(self, "user_data_dir")
            and not self.keep_user_data_dir
        ):
            # the browser may still hold files for a moment; retry a few times
            for _ in range(5):
                try:
                    shutil.rmtree(self.user_data_dir, ignore_errors=False)
                except FileNotFoundError:
                    pass
                except (RuntimeError, OSError, PermissionError) as e:
                    logger.debug(
                        "When removing the temp profile, a %s occured: %s\nretrying..."
                        % (e.__class__.__name__, e)
                    )
                else:
                    logger.debug("successfully removed %s" % self.user_data_dir)
                    break
                time.sleep(0.1)

        # dereference patcher, so patcher can start cleaning up as well.
        # this must come last, otherwise it will throw 'in use' errors
        self.patcher = None

    def __del__(self):
        try:
            super().quit()
        except Exception:  # noqa
            pass
        self.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # leaving the ``with`` block restarts the service and opens a fresh
        # session rather than shutting the driver down
        self.service.stop()
        time.sleep(self._delay)
        self.service.start()
        self.start_session()

    def __hash__(self):
        return hash(self.options.debugger_address)
|
||||
|
||||
|
||||
def find_chrome_executable():
    """
    Finds the chrome, chrome beta, chrome canary, chromium executable

    Candidates are checked in a fixed order (PATH entries first on POSIX,
    then the macOS application bundles; on other platforms the Program Files
    and local app-data locations), so the result is deterministic.

    Returns
    -------
    executable_path : str or None
        the full file path to the first existing, executable candidate,
        or None when nothing was found

    """
    candidates = []
    if IS_POSIX:
        # PATH may be unset or contain empty entries; both are skipped
        for item in os.environ.get("PATH", "").split(os.pathsep):
            if not item:
                continue
            for subitem in (
                "google-chrome",
                "chromium",
                "chromium-browser",
                "chrome",
                "google-chrome-stable",
            ):
                candidates.append(os.sep.join((item, subitem)))
        if "darwin" in sys.platform:
            candidates.extend(
                [
                    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                    "/Applications/Chromium.app/Contents/MacOS/Chromium",
                ]
            )
    else:
        for item in map(
            os.environ.get, ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA")
        ):
            if item is None:
                # variable not defined on this system
                continue
            for subitem in (
                "Google/Chrome/Application",
                "Google/Chrome Beta/Application",
                "Google/Chrome Canary/Application",
            ):
                candidates.append(os.sep.join((item, subitem, "chrome.exe")))

    # dict.fromkeys drops duplicates while keeping the search order
    for candidate in dict.fromkeys(candidates):
        if os.path.exists(candidate) and os.access(candidate, os.X_OK):
            return os.path.normpath(candidate)
    return None
|
259
src/undetected_chromedriver/_compat.py
Normal file
259
src/undetected_chromedriver/_compat.py
Normal file
@ -0,0 +1,259 @@
|
||||
#!/usr/bin/env python3
|
||||
# this module is part of undetected_chromedriver
|
||||
|
||||
|
||||
"""
|
||||
|
||||
888 888 d8b
|
||||
888 888 Y8P
|
||||
888 888
|
||||
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
|
||||
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
|
||||
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
|
||||
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
|
||||
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
|
||||
|
||||
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
|
||||
|
||||
"""
|
||||
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
import zipfile
|
||||
from distutils.version import LooseVersion
|
||||
from urllib.request import urlopen, urlretrieve
|
||||
|
||||
from selenium.webdriver import Chrome as _Chrome, ChromeOptions as _ChromeOptions
|
||||
|
||||
TARGET_VERSION = 0
|
||||
logger = logging.getLogger("uc")
|
||||
|
||||
|
||||
class Chrome:
    """
    Legacy factory: ``Chrome(...)`` returns a configured selenium ``Chrome``
    instance (not an instance of this class).

    Before creating the driver it makes sure the chromedriver binary is
    installed/patched and selenium is patched, then wraps ``get`` so that
    ``navigator.webdriver`` is hidden, strips "Headless" from the user agent
    and optionally emulates a touch device.
    """

    def __new__(cls, *args, emulate_touch=False, **kwargs):
        manager = None

        def get_manager():
            # Build the manager lazily and only once: constructing it may
            # perform a network request to resolve the release version.
            nonlocal manager
            if manager is None:
                manager = ChromeDriverManager(*args, **kwargs)
            return manager

        if not ChromeDriverManager.installed:
            get_manager().install()
        if not ChromeDriverManager.selenium_patched:
            get_manager().patch_selenium_webdriver()
        if not kwargs.get("executable_path"):
            kwargs["executable_path"] = "./{}".format(get_manager().executable_path)
        if not kwargs.get("options"):
            kwargs["options"] = ChromeOptions()

        # create the real selenium driver; __init__ must be called by hand
        # because the object is not an instance of ``cls``
        instance = object.__new__(_Chrome)
        instance.__init__(*args, **kwargs)

        instance._orig_get = instance.get

        def _get_wrapped(*args, **kwargs):
            if instance.execute_script("return navigator.webdriver"):
                instance.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """

                                   Object.defineProperty(window, 'navigator', {
                                       value: new Proxy(navigator, {
                                       has: (target, key) => (key === 'webdriver' ? false : key in target),
                                       get: (target, key) =>
                                           key === 'webdriver'
                                           ? undefined
                                           : typeof target[key] === 'function'
                                           ? target[key].bind(target)
                                           : target[key]
                                       })
                                   });


                    """
                    },
                )
            return instance._orig_get(*args, **kwargs)

        instance.get = _get_wrapped

        original_user_agent_string = instance.execute_script(
            "return navigator.userAgent"
        )
        instance.execute_cdp_cmd(
            "Network.setUserAgentOverride",
            {
                "userAgent": original_user_agent_string.replace("Headless", ""),
            },
        )
        if emulate_touch:
            instance.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {
                    "source": """
                                   Object.defineProperty(navigator, 'maxTouchPoints', {
                                       get: () => 1
                               })"""
                },
            )
        logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
        return instance
|
||||
|
||||
|
||||
class ChromeOptions:
    """
    Legacy factory: ``ChromeOptions(...)`` returns a selenium ``ChromeOptions``
    object pre-loaded with the start-maximized argument, the
    ``enable-automation`` switch excluded and the AutomationControlled blink
    feature disabled.
    """

    def __new__(cls, *args, **kwargs):
        manager_cls = ChromeDriverManager

        # make sure the driver binary and selenium itself are prepared first
        if not manager_cls.installed:
            manager_cls(*args, **kwargs).install()
        if not manager_cls.selenium_patched:
            manager_cls(*args, **kwargs).patch_selenium_webdriver()

        # the returned object is a plain selenium options instance, so its
        # initializer has to be invoked explicitly
        opts = object.__new__(_ChromeOptions)
        opts.__init__()

        opts.add_argument("start-maximized")
        opts.add_experimental_option("excludeSwitches", ["enable-automation"])
        opts.add_argument("--disable-blink-features=AutomationControlled")
        return opts
|
||||
|
||||
|
||||
class ChromeDriverManager(object):
    """
    Downloads the chromedriver binary, patches its ``cdc_`` marker and swaps
    selenium's Chrome / ChromeOptions for the patched variants.

    ``installed`` and ``selenium_patched`` are class-level flags shared by all
    instances for the lifetime of the python session.
    """

    installed = False
    selenium_patched = False
    target_version = None

    DL_BASE = "https://chromedriver.storage.googleapis.com/"

    def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
        """
        :param executable_path: where the chromedriver binary lives (or will be
            stored); defaults to the platform's executable name in the
            current working directory
        :param target_version: major chrome version to target; falls back to
            the module-level TARGET_VERSION, then to the latest release
        """
        _platform = sys.platform

        if TARGET_VERSION:
            # use global if set
            self.target_version = TARGET_VERSION

        if target_version:
            # use explicitly passed target
            self.target_version = target_version  # user override

        if not self.target_version:
            # none of the above (default) and just get current version
            self.target_version = self.get_release_version_number().version[
                0
            ]  # only major version int

        self._base = base_ = "chromedriver{}"

        # Default to the plain name so that platforms not listed below do not
        # end up with a literal "{}" placeholder in the file name.
        exe_name = base_.format("")
        if _platform in ("win32",):
            exe_name = base_.format(".exe")
        if _platform.startswith("linux"):
            _platform = "linux64"
        if _platform in ("darwin",):
            _platform = "mac64"
        self.platform = _platform
        self.executable_path = executable_path or exe_name
        self._exe_name = exe_name

    def patch_selenium_webdriver(self_):
        """
        Patches selenium package Chrome, ChromeOptions classes for current session

        :return:
        """
        import selenium.webdriver.chrome.service
        import selenium.webdriver

        selenium.webdriver.Chrome = Chrome
        selenium.webdriver.ChromeOptions = ChromeOptions
        logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
        self_.__class__.selenium_patched = True

    def install(self, patch_selenium=True):
        """
        Initialize the patch

        This will:
         download chromedriver if not present
         patch the downloaded chromedriver
         patch selenium package if <patch_selenium> is True (default)

        :param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
        :return:
        """
        if not os.path.exists(self.executable_path):
            self.fetch_chromedriver()
        if not self.__class__.installed:
            # patch_binary() returns the number of patched lines; 0 leaves
            # the class flagged as not installed
            if self.patch_binary():
                self.__class__.installed = True

        if patch_selenium:
            self.patch_selenium_webdriver()

    def get_release_version_number(self):
        """
        Gets the latest major version available, or the latest major version of self.target_version if set explicitly.

        :return: LooseVersion built from the server's response
        """
        path = (
            "LATEST_RELEASE"
            if not self.target_version
            else f"LATEST_RELEASE_{self.target_version}"
        )
        # close the HTTP response deterministically instead of leaking it
        with urlopen(self.__class__.DL_BASE + path) as response:
            version_text = response.read().decode().strip()
        return LooseVersion(version_text)

    def fetch_chromedriver(self):
        """
        Downloads ChromeDriver from source and unpacks the executable

        The archive is extracted into the current working directory under the
        platform's default executable name.

        :return: on success, name of the unpacked executable
        """
        # nothing to do (and no network round-trip needed) if already present
        if os.path.exists(self.executable_path):
            return self.executable_path

        base_ = self._base
        zip_name = base_.format(".zip")
        ver = self.get_release_version_number().vstring
        urlretrieve(
            f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
            filename=zip_name,
        )
        with zipfile.ZipFile(zip_name) as zf:
            zf.extract(self._exe_name)
        os.remove(zip_name)
        if sys.platform != "win32":
            os.chmod(self._exe_name, 0o755)
        return self._exe_name

    @staticmethod
    def random_cdc():
        """
        Build a random 26-byte replacement for the ``cdc_`` + 22 characters marker.

        The result keeps the marker's shape: character 2 repeats character 0,
        character 3 is an underscore and two characters near the end are
        upper-cased.
        """
        cdc = random.choices(string.ascii_lowercase, k=26)
        cdc[-6:-4] = map(str.upper, cdc[-6:-4])
        cdc[2] = cdc[0]
        cdc[3] = "_"
        return "".join(cdc).encode()

    def patch_binary(self):
        """
        Patches the ChromeDriver binary

        Every line containing ``cdc_`` is rewritten in place with the marker
        replaced by a random string of identical length, so the file size
        does not change.

        :return: number of lines that were rewritten (0 when nothing matched)
        """
        linect = 0
        replacement = self.random_cdc()
        with io.open(self.executable_path, "r+b") as fh:
            for line in iter(lambda: fh.readline(), b""):
                if b"cdc_" in line:
                    # step back to the start of the line just read and
                    # overwrite it with the same-length patched version
                    fh.seek(-len(line), 1)
                    newline = re.sub(b"cdc_.{22}", replacement, line)
                    fh.write(newline)
                    linect += 1
            return linect
|
||||
|
||||
|
||||
def install(executable_path=None, target_version=None, *args, **kwargs):
    """
    Create a ChromeDriverManager from the given arguments and run its
    ``install()`` method.  Nothing is returned.
    """
    manager = ChromeDriverManager(executable_path, target_version, *args, **kwargs)
    manager.install()
|
112
src/undetected_chromedriver/cdp.py
Normal file
112
src/undetected_chromedriver/cdp.py
Normal file
@ -0,0 +1,112 @@
|
||||
#!/usr/bin/env python3
|
||||
# this module is part of undetected_chromedriver
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Mapping, Sequence
|
||||
|
||||
import requests
|
||||
import websockets
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CDPObject(dict):
    """
    Dictionary whose keys can also be read and written as attributes.

    Dictionaries found as values, or as items of list values, are converted
    to CDPObject too, so ``obj.a[0].b`` works on nested data.
    """

    def __init__(self, *a, **k):
        super().__init__(*a, **k)
        # Alias the attribute namespace to the mapping itself: every key is
        # an attribute and every attribute assignment becomes a key.
        self.__dict__ = self
        for key, value in list(self.items()):
            if isinstance(value, dict):
                self[key] = CDPObject(value)
            elif isinstance(value, list):
                # Wrap each dict *item* (not ``self``, which recursed until
                # RecursionError) and build a new list so the caller's list
                # is left untouched.
                self[key] = [
                    CDPObject(item) if isinstance(item, dict) else item
                    for item in value
                ]

    def __repr__(self):
        tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)"
        return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items()))


class PageElement(CDPObject):
    """CDPObject subclass without additional behaviour."""
|
||||
|
||||
|
||||
class CDP:
    """
    Minimal client for the debugging endpoints served at
    ``options.debugger_address``: the ``/json`` HTTP endpoints (through a
    ``requests`` session) and the websocket URL of a tab.
    """

    log = logging.getLogger("CDP")

    # HTTP endpoint templates, relative to ``server_addr``
    endpoints = CDPObject(
        {
            "json": "/json",
            "protocol": "/json/protocol",
            "list": "/json/list",
            "new": "/json/new?{url}",
            "activate": "/json/activate/{id}",
            "close": "/json/close/{id}",
        }
    )

    def __init__(self, options: "ChromeOptions"):  # noqa
        """
        :param options: object whose ``debugger_address`` is ``"host:port"``
        """
        self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))

        self._reqid = 0
        self._session = requests.Session()
        self._last_resp = None
        self._last_json = None

        # the first entry of /json provides the initial session and websocket
        resp = self.get(self.endpoints.json)  # noqa
        self.sessionId = resp[0]["id"]
        self.wsurl = resp[0]["webSocketDebuggerUrl"]

    def tab_activate(self, id=None):
        """
        Activate the tab with the given id.  Without an id the first entry of
        ``tab_list()`` is used and ``self.wsurl`` is switched to that tab.

        :return: result of the POST, see ``post()``
        """
        if not id:
            active_tab = self.tab_list()[0]
            id = active_tab.id  # noqa
            self.wsurl = active_tab.webSocketDebuggerUrl  # noqa
        return self.post(self.endpoints["activate"].format(id=id))

    def tab_list(self):
        """Return the entries of ``/json/list`` as PageElement objects."""
        retval = self.get(self.endpoints["list"])
        return [PageElement(o) for o in retval]

    def tab_new(self, url):
        """Open a new tab for ``url``; returns the result of ``post()``."""
        return self.post(self.endpoints["new"].format(url=url))

    def tab_close_last_opened(self):
        """Close the last listed target of type ``page``."""
        sessions = self.tab_list()
        opentabs = [s for s in sessions if s["type"] == "page"]
        return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"]))

    async def send(self, method: str, params: dict):
        """
        Send one command over a fresh websocket connection to ``self.wsurl``
        and store the first message received in ``_last_resp`` / ``_last_json``.
        """
        self._reqid += 1
        async with websockets.connect(self.wsurl) as ws:
            await ws.send(
                json.dumps({"method": method, "params": params, "id": self._reqid})
            )
            self._last_resp = await ws.recv()
            self._last_json = json.loads(self._last_resp)
            self.log.info(self._last_json)

    def get(self, uri):
        """
        GET ``uri`` relative to the server address.

        :return: the decoded JSON body, or None when the body is not JSON
        """
        resp = self._session.get(self.server_addr + uri)
        self._last_resp = resp
        try:
            self._last_json = resp.json()
        except Exception:
            return
        return self._last_json

    def post(self, uri, data: dict = None):
        """
        POST ``data`` as JSON to ``uri`` relative to the server address.

        :return: the decoded JSON body; when the body is not JSON, the raw
                 response object
        """
        if not data:
            data = {}
        resp = self._session.post(self.server_addr + uri, json=data)
        self._last_resp = resp
        try:
            self._last_json = resp.json()
        except Exception:
            return self._last_resp
        # Previously the success path fell off the end and returned None,
        # unlike get(); hand the parsed body back to the caller.
        return self._last_json

    @property
    def last_json(self):
        """The most recently decoded JSON payload (HTTP or websocket)."""
        return self._last_json
|
191
src/undetected_chromedriver/devtool.py
Normal file
191
src/undetected_chromedriver/devtool.py
Normal file
@ -0,0 +1,191 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
import traceback
|
||||
from collections.abc import Mapping
|
||||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
from typing import Awaitable
|
||||
from typing import Callable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from contextlib import ExitStack
|
||||
import threading
|
||||
from functools import wraps, partial
|
||||
|
||||
|
||||
class Structure(dict):
    """
    Dict-like object whose keys are also available as attributes.

    On construction, mapping values are converted to instances of the same
    class, and so are mapping items found inside non-string sequences; such
    sequences are stored as lists.  Every other value is stored unchanged.
    """

    _store = {}

    def __init__(self, *a, **kw):
        """
        Instantiate a new instance.

        :param a: positional arguments as accepted by ``dict``
        :param kw: keyword arguments as accepted by ``dict``
        """
        super().__init__()

        # auxiliary dict holding the raw input
        d = dict(*a, **kw)
        for k, v in d.items():
            if isinstance(v, Mapping):
                self[k] = self.__class__(v)
            elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
                # Only mappings can be turned into a Structure; wrapping
                # scalars such as 1 or "x" raised inside dict().
                self[k] = [
                    self.__class__(i) if isinstance(i, Mapping) else i for i in v
                ]
            else:
                self[k] = v
        # alias the attribute namespace to the mapping itself
        super().__setattr__("__dict__", self)

    def __getattr__(self, item):
        # only reached when the regular lookup (which includes the aliased
        # __dict__) did not find ``item``
        return getattr(super(), item)

    def __getitem__(self, item):
        return super().__getitem__(item)

    def __setattr__(self, key, value):
        self.__setitem__(key, value)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)

    def update(self, *a, **kw):
        super().update(*a, **kw)

    def __eq__(self, other):
        # Comparing against a non-mapping is simply "not equal" rather than
        # an AttributeError, and plain dict comparison also works for the
        # (unhashable) list values the constructor itself creates.
        if not isinstance(other, Mapping):
            return NotImplemented
        return dict(self) == dict(other)

    def __hash__(self):
        # raises TypeError when any value is unhashable (e.g. a list)
        return hash(frozenset(self.items()))

    def __init_subclass__(cls, **kwargs):
        # implicitly a classmethod; gives every subclass its own store
        cls._store = {}

    def _normalize_strings(self):
        """Strip surrounding whitespace from every ``str`` value, in place."""
        for k, v in list(self.items()):
            if isinstance(v, str):
                self[k] = v.strip()
|
||||
|
||||
|
||||
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None):
    """
    Decorator factory that arms a ``threading.Timer`` around every call.

    If the decorated function has not returned after ``seconds`` the timer
    fires ``on_timeout(func)``; without ``on_timeout`` a ``TimeoutError`` is
    raised.  The timer callback runs on the timer's own thread, so the
    decorated call itself is not interrupted.  The timer is cancelled as soon
    as the call returns or raises.

    :param seconds: delay before the timer fires
    :param on_timeout: optional callback receiving the decorated function
    """

    def wrapper(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            def function_reached_timeout():
                if not on_timeout:
                    raise TimeoutError("function call timed out")
                on_timeout(func)

            watchdog = threading.Timer(
                interval=seconds, function=function_reached_timeout
            )
            watchdog.start()
            try:
                return func(*args, **kwargs)
            finally:
                # runs on success and on error alike
                watchdog.cancel()

        return wrapped

    return wrapper
|
||||
|
||||
|
||||
def test():
    """
    Manual demo: start Chrome with logging enabled, print every driver
    request and every collected log line, open a page for ten seconds and
    quit.  Requires a local Chrome installation and network access.
    """
    import os
    import sys

    sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
    import undetected_chromedriver as uc
    import threading

    def collector(
        driver: uc.Chrome,
        stop_event: threading.Event,
        on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
        listen_events: Sequence = ("browser", "network", "performance"),
    ):
        """Poll the driver logs on a separate thread until stop_event is set."""

        def threaded(driver, stop_event, on_event_coro):
            async def _ensure_service_started():
                # poll() is truthy once the service process exited non-zero
                while (
                    getattr(driver, "service", False)
                    and getattr(driver.service, "process", False)
                    and driver.service.process.poll()
                ):
                    print("waiting for driver service to come back on")
                    await asyncio.sleep(0.05)

            async def get_log_lines(typ):
                await _ensure_service_started()
                return driver.get_log(typ)

            async def looper():
                while not stop_event.is_set():
                    log_lines = []
                    try:
                        for typ in listen_events:
                            try:
                                log_lines += await get_log_lines(typ)
                            except Exception:
                                if logging.getLogger().getEffectiveLevel() <= 10:
                                    traceback.print_exc()
                                continue
                        if log_lines and on_event_coro:
                            await on_event_coro(log_lines)
                    except Exception:
                        if logging.getLogger().getEffectiveLevel() <= 10:
                            traceback.print_exc()

            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(looper())

        t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
        t.start()

    async def on_event(data):
        print("on_event")
        print("data:", data)

    def func_called(fn):
        def wrapped(*args, **kwargs):
            print(
                "func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
            )
            # ``driver`` is bound further down, before the first call
            while driver.service.process and driver.service.process.poll() is not None:
                time.sleep(0.1)
            res = fn(*args, **kwargs)
            print("func completed! (result: %s)" % res)
            return res

        return wrapped

    logging.basicConfig(level=10)

    options = uc.ChromeOptions()
    options.set_capability(
        "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
    )

    driver = uc.Chrome(version_main=96, options=options)

    driver.command_executor._request = func_called(driver.command_executor._request)
    collector_stop = threading.Event()
    collector(driver, collector_stop, on_event)

    try:
        driver.get("https://nowsecure.nl")

        time.sleep(10)
    finally:
        # The collector thread is not a daemon: without this signal it keeps
        # polling the dead driver forever and the interpreter never exits.
        collector_stop.set()
        driver.quit()
|
75
src/undetected_chromedriver/dprocess.py
Normal file
75
src/undetected_chromedriver/dprocess.py
Normal file
@ -0,0 +1,75 @@
|
||||
import multiprocessing
|
||||
import os
|
||||
import platform
|
||||
import sys
|
||||
from subprocess import PIPE
|
||||
from subprocess import Popen
|
||||
import atexit
|
||||
import traceback
|
||||
import logging
|
||||
import signal
|
||||
|
||||
CREATE_NEW_PROCESS_GROUP = 0x00000200
|
||||
DETACHED_PROCESS = 0x00000008
|
||||
|
||||
REGISTERED = []
|
||||
|
||||
|
||||
def start_detached(executable, *args):
    """
    Start ``executable`` through a short-lived helper process and return
    the pid of the started program.

    :param executable: executable
    :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
    :return: pid of the grandchild process
    :raises EOFError: when the helper process ends without reporting a pid
    """
    # one-way pipe: the helper writes the pid, this process reads it
    reader, writer = multiprocessing.Pipe(False)
    try:
        # do not keep a reference to the helper process
        multiprocessing.Process(
            target=_start_detached,
            args=(executable, *args),
            kwargs={"writer": writer},
            daemon=True,
        ).start()
        # Drop this process' copy of the write end before waiting.  While it
        # stays open recv() can never see end-of-file, so a helper that dies
        # before sending (e.g. executable not found) blocked here forever.
        writer.close()
        # receive pid from pipe
        pid = reader.recv()
    finally:
        # close pipes, also on failure; closing twice is harmless
        writer.close()
        reader.close()

    REGISTERED.append(pid)
    return pid
|
||||
|
||||
|
||||
def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
|
||||
|
||||
# configure launch
|
||||
kwargs = {}
|
||||
if platform.system() == "Windows":
|
||||
kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
|
||||
elif sys.version_info < (3, 2):
|
||||
# assume posix
|
||||
kwargs.update(preexec_fn=os.setsid)
|
||||
else: # Python 3.2+ and Unix
|
||||
kwargs.update(start_new_session=True)
|
||||
|
||||
# run
|
||||
p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)
|
||||
|
||||
# send pid to pipe
|
||||
writer.send(p.pid)
|
||||
sys.exit()
|
||||
|
||||
|
||||
def _cleanup():
    """
    Send SIGTERM to every pid recorded in ``REGISTERED``.

    Best effort: a pid that cannot be signalled (already gone, not permitted)
    is skipped so the remaining ones are still handled.
    """
    log = logging.getLogger(__name__)
    for pid in REGISTERED:
        try:
            log.debug("cleaning up pid %d ", pid)
            os.kill(pid, signal.SIGTERM)
        except Exception as exc:  # noqa
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed at exit.
            log.debug("could not terminate pid %s: %s", pid, exc)


# run the cleanup when the interpreter exits
atexit.register(_cleanup)
|
70
src/undetected_chromedriver/options.py
Normal file
70
src/undetected_chromedriver/options.py
Normal file
@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
# this module is part of undetected_chromedriver
|
||||
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
|
||||
|
||||
|
||||
class ChromeOptions(_ChromiumOptions):
    """
    ChromiumOptions with a ``user_data_dir`` property and support for
    writing the ``prefs`` experimental option into the profile's
    ``Default/Preferences`` file.
    """

    _session = None
    _user_data_dir = None

    @property
    def user_data_dir(self):
        return self._user_data_dir

    @user_data_dir.setter
    def user_data_dir(self, path: str):
        """
        Sets the browser profile folder to use, or creates a new profile
        at given <path>.

        Parameters
        ----------
        path: str
            the path to a chrome profile folder
            if it does not exist, a new profile will be created at given location
        """
        apath = os.path.abspath(path)
        self._user_data_dir = os.path.normpath(apath)

    @staticmethod
    def _undot_key(key, value):
        """turn a (dotted key, value) into a proper nested dict"""
        if "." in key:
            key, rest = key.split(".", 1)
            value = ChromeOptions._undot_key(rest, value)
        return {key: value}

    @staticmethod
    def _merge_nested(base, extra):
        """
        Recursively merge ``extra`` into ``base`` (in place) and return it.
        Where both sides hold a dict the dicts are merged; in every other
        case the value from ``extra`` wins.
        """
        for key, value in extra.items():
            if isinstance(base.get(key), dict) and isinstance(value, dict):
                ChromeOptions._merge_nested(base[key], value)
            else:
                base[key] = value
        return base

    def handle_prefs(self, user_data_dir):
        """
        Write the ``prefs`` experimental option to
        ``<user_data_dir>/Default/Preferences`` and remove it from the
        experimental options.  Does nothing when no prefs are set.

        :param user_data_dir: profile folder; falls back to the value set
                              through the ``user_data_dir`` property
        """
        prefs = self.experimental_options.get("prefs")
        if prefs:
            user_data_dir = user_data_dir or self._user_data_dir
            default_path = os.path.join(user_data_dir, "Default")
            os.makedirs(default_path, exist_ok=True)

            # undot prefs dict keys; a deep merge is required because a
            # shallow update let "a.b" and "a.c" overwrite each other
            undot_prefs = {}
            for key, value in prefs.items():
                self._merge_nested(undot_prefs, self._undot_key(key, value))

            prefs_file = os.path.join(default_path, "Preferences")
            if os.path.exists(prefs_file):
                with open(prefs_file, encoding="latin1", mode="r") as f:
                    # Existing preferences are the base and the requested
                    # prefs are applied on top; the former order let stale
                    # values in the file override what the caller asked for.
                    undot_prefs = self._merge_nested(json.load(f), undot_prefs)

            with open(prefs_file, encoding="latin1", mode="w") as f:
                json.dump(undot_prefs, f)

            # remove the experimental_options to avoid an error
            del self._experimental_options["prefs"]

    @classmethod
    def from_options(cls, options):
        """Create an instance carrying a copy of ``options``' attributes."""
        o = cls()
        o.__dict__.update(options.__dict__)
        return o
|
276
src/undetected_chromedriver/patcher.py
Normal file
276
src/undetected_chromedriver/patcher.py
Normal file
@ -0,0 +1,276 @@
|
||||
#!/usr/bin/env python3
|
||||
# this module is part of undetected_chromedriver
|
||||
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
import time
|
||||
import zipfile
|
||||
from distutils.version import LooseVersion
|
||||
from urllib.request import urlopen, urlretrieve
|
||||
import secrets
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux"))
|
||||
|
||||
|
||||
class Patcher(object):
    """
    Downloads a ChromeDriver executable and replaces its ``cdc_`` marker
    with a random token of the same length.
    """

    url_repo = "https://chromedriver.storage.googleapis.com"
    zip_name = "chromedriver_%s.zip"
    exe_name = "chromedriver%s"

    # resolve the platform specific archive / executable names
    platform = sys.platform
    if platform.endswith("win32"):
        zip_name %= "win32"
        exe_name %= ".exe"
    if platform.endswith("linux"):
        zip_name %= "linux64"
        exe_name %= ""
    if platform.endswith("darwin"):
        zip_name %= "mac64"
        exe_name %= ""

    # per-user folder in which downloaded drivers are stored
    if platform.endswith("win32"):
        d = "~/appdata/roaming/undetected_chromedriver"
    elif platform.startswith("linux"):
        d = "~/.local/share/undetected_chromedriver"
    elif platform.endswith("darwin"):
        d = "~/Library/Application Support/undetected_chromedriver"
    else:
        d = "~/.undetected_chromedriver"
    data_path = os.path.abspath(os.path.expanduser(d))

    def __init__(self, executable_path=None, force=False, version_main: int = 0):
        """
        Args:
            executable_path: None = automatic
                             a full file path to the chromedriver executable
            force: False
                    terminate processes which are holding lock
            version_main: 0 = auto
                specify main chrome version (rounded, ex: 82)
        """
        # set first so that __del__ finds them even if a later step raises
        self._custom_exe_path = bool(executable_path)
        self.executable_path = None
        self.force = force
        prefix = secrets.token_hex(8)

        os.makedirs(self.data_path, exist_ok=True)
        self.zip_path = os.path.join(self.data_path, prefix)

        if executable_path:
            if not IS_POSIX and not executable_path[-4:] == ".exe":
                executable_path += ".exe"
            self.executable_path = executable_path
        else:
            # unique name per instance inside the data folder
            self.executable_path = os.path.abspath(
                os.path.join(self.data_path, "_".join([prefix, self.exe_name]))
            )

        self.version_main = version_main
        self.version_full = None

    def auto(self, executable_path=None, force=False, version_main=None):
        """
        Make sure a patched executable is available.

        A user supplied executable is patched in place when it still
        contains the marker.  Otherwise the matching release is downloaded,
        unpacked and patched.

        :return: for a user supplied executable the result of ``patch_exe()``
                 or None when it was already patched; otherwise the result
                 of ``patch()`` (True when the executable can no longer be
                 removed but is already patched)
        """
        if executable_path:
            self.executable_path = executable_path
            self._custom_exe_path = True

        if self._custom_exe_path:
            if not self.is_binary_patched(self.executable_path):
                return self.patch_exe()
            return

        if version_main:
            self.version_main = version_main
        if force is True:
            self.force = force

        try:
            os.unlink(self.executable_path)
        except PermissionError:
            if self.force:
                self.force_kill_instances(self.executable_path)
                return self.auto(force=not self.force)
            try:
                if self.is_binary_patched():
                    # assumes already running AND patched
                    return True
            except PermissionError:
                pass
        except FileNotFoundError:
            pass

        release = self.fetch_release_number()
        self.version_main = release.version[0]
        self.version_full = release
        self.unzip_package(self.fetch_package())
        return self.patch()

    def patch(self):
        """Patch the executable and report whether the marker is gone."""
        self.patch_exe()
        return self.is_binary_patched()

    def fetch_release_number(self):
        """
        Gets the latest release available, or the latest release of
        ``self.version_main`` if that is set.

        :return: version string
        :rtype: LooseVersion
        """
        path = "/latest_release"
        if self.version_main:
            path += f"_{self.version_main}"
        path = path.upper()
        logger.debug("getting release number from %s" % path)
        # context manager: close the HTTP response instead of leaking it
        with urlopen(self.url_repo + path) as response:
            return LooseVersion(response.read().decode())

    def parse_exe_version(self):
        """
        Search the executable for its embedded version string.

        :return: LooseVersion, or None when no version string is found
        """
        with io.open(self.executable_path, "rb") as f:
            for line in iter(f.readline, b""):
                match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
                if match:
                    return LooseVersion(match[1].decode())

    def fetch_package(self):
        """
        Downloads ChromeDriver from source

        :return: path to downloaded file
        """
        u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
        logger.debug("downloading from %s" % u)
        return urlretrieve(u)[0]

    def unzip_package(self, fp):
        """
        Extract the executable from the archive ``fp`` to
        ``self.executable_path`` and delete the archive.

        :return: path to unpacked executable
        """
        logger.debug("unzipping %s" % fp)
        try:
            os.unlink(self.zip_path)
        except OSError:  # includes FileNotFoundError
            pass

        os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
        with zipfile.ZipFile(fp, mode="r") as zf:
            zf.extract(self.exe_name, self.zip_path)
        os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path)
        os.remove(fp)
        os.rmdir(self.zip_path)
        os.chmod(self.executable_path, 0o755)
        return self.executable_path

    @staticmethod
    def force_kill_instances(exe_name):
        """
        kills running instances.
        :param: executable name to kill, may be a path as well

        :return: True on success else False
        """
        exe_name = os.path.basename(exe_name)
        if IS_POSIX:
            import shlex

            # ``-f`` is only understood by Cygwin's kill; elsewhere it is an
            # invalid option that made the whole command fail.
            force_flag = "-f " if sys.platform.startswith("cygwin") else ""
            r = os.system(
                "kill %s-9 $(pidof %s)" % (force_flag, shlex.quote(exe_name))
            )
        else:
            r = os.system("taskkill /f /im %s" % exe_name)
        return not r

    @staticmethod
    def gen_random_cdc():
        """
        Build a random 26-byte token shaped like the marker it replaces
        (``xyx_`` followed by 22 letters, positions 20 and 21 upper-cased).

        :return: the token as bytes; never prefixed with ``cdc_``
        """
        while True:
            cdc = random.choices(string.ascii_lowercase, k=26)
            cdc[-6:-4] = map(str.upper, cdc[-6:-4])
            cdc[2] = cdc[0]
            cdc[3] = "_"
            token = "".join(cdc).encode()
            # 1 draw in 676 is "cdc_...": is_binary_patched() would then
            # still report the binary as unpatched
            if not token.startswith(b"cdc_"):
                return token

    def is_binary_patched(self, executable_path=None):
        """simple check if executable is patched.

        :return: False if not patched, else True
        """
        executable_path = executable_path or self.executable_path
        with io.open(executable_path, "rb") as fh:
            for line in iter(fh.readline, b""):
                if b"cdc_" in line:
                    return False
        return True

    def patch_exe(self):
        """
        Patches the ChromeDriver binary

        :return: number of lines that contained the marker and were rewritten
        """
        logger.info("patching driver executable %s" % self.executable_path)

        linect = 0
        replacement = self.gen_random_cdc()
        with io.open(self.executable_path, "r+b") as fh:
            for line in iter(fh.readline, b""):
                if b"cdc_" in line:
                    # step back over the line just read and overwrite it
                    fh.seek(-len(line), 1)
                    newline = re.sub(b"cdc_.{22}", replacement, line)
                    fh.write(newline)
                    linect += 1
        return linect

    def __repr__(self):
        return "{0:s}({1:s})".format(
            self.__class__.__name__,
            self.executable_path,
        )

    def __del__(self):
        # getattr: __init__ may have raised before the attributes existed
        if getattr(self, "_custom_exe_path", True):
            # if the driver binary is specified by user
            # we assume it is important enough to not delete it
            return
        executable_path = getattr(self, "executable_path", None)
        if not executable_path:
            return

        timeout = 3  # stop trying after this many seconds
        t = time.monotonic()
        while True:
            now = time.monotonic()
            if now - t > timeout:
                # we don't want to wait until the end of time
                logger.debug(
                    "could not unlink %s in time (%d seconds)"
                    % (executable_path, timeout)
                )
                break
            try:
                os.unlink(executable_path)
                logger.debug("successfully unlinked %s" % executable_path)
                break
            except FileNotFoundError:
                # Must come before OSError (its base class): listed after
                # it, this handler was unreachable and a missing file was
                # retried for the full timeout.
                break
            except (OSError, RuntimeError):
                # presumably still in use by a running driver; retry shortly
                time.sleep(0.1)
                continue
|
102
src/undetected_chromedriver/reactor.py
Normal file
102
src/undetected_chromedriver/reactor.py
Normal file
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
# this module is part of undetected_chromedriver
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Reactor(threading.Thread):
    """
    Daemon thread that polls the driver's ``performance`` log and passes
    every logged message to the handler registered for its method.
    """

    def __init__(self, driver: "Chrome"):
        super().__init__()

        self.driver = driver
        self.loop = asyncio.new_event_loop()

        self.lock = threading.Lock()
        self.event = threading.Event()  # set to stop listen()
        self.daemon = True
        self.handlers = {}

    def add_event_handler(self, method_name, callback: callable):
        """
        Register ``callback`` for an event.

        Parameters
        ----------
        method_name: str
            example "Network.responseReceived"; matched case-insensitively.
            A handler registered as "*" receives every message.

        callback: callable
            callable which accepts 1 parameter: the message object dictionary
        """
        with self.lock:
            self.handlers[method_name.lower()] = callback

    @property
    def running(self):
        """True until ``self.event`` is set."""
        return not self.event.is_set()

    def run(self):
        try:
            asyncio.set_event_loop(self.loop)
            self.loop.run_until_complete(self.listen())
        except Exception as e:
            logger.warning("Reactor.run() => %s", e)

    async def _wait_service_started(self):
        """Wait while the driver service process reports a non-zero exit code."""
        while True:
            with self.lock:
                service = getattr(self.driver, "service", None)
                process = getattr(service, "process", None) if service else None
                exited = bool(process and process.poll())
            if not exited:
                break
            # sleep outside the lock so add_event_handler() is not blocked
            await asyncio.sleep(self.driver._delay or 0.25)

    async def listen(self):
        """Poll the performance log once per second and dispatch messages."""
        while self.running:
            await self._wait_service_started()
            await asyncio.sleep(1)

            try:
                with self.lock:
                    log_entries = self.driver.get_log("performance")

                for entry in log_entries:
                    obj_serialized: str = entry.get("message")
                    obj = json.loads(obj_serialized)
                    message = obj.get("message")
                    method = message.get("method")

                    # handlers are run in the loop's default executor
                    if "*" in self.handlers:
                        await self.loop.run_in_executor(
                            None, self.handlers["*"], message
                        )
                    elif method.lower() in self.handlers:
                        await self.loop.run_in_executor(
                            None, self.handlers[method.lower()], message
                        )

            except Exception as e:
                if "invalid session id" in str(e):
                    pass
                else:
                    # The message needs a placeholder for its argument;
                    # without one the logging module reports a formatting
                    # error instead of the exception.
                    logger.debug("exception ignored : %s", e)
|
4
src/undetected_chromedriver/v2.py
Normal file
4
src/undetected_chromedriver/v2.py
Normal file
@ -0,0 +1,4 @@
|
||||
# Kept for backward compatibility: importing "undetected_chromedriver.v2" yields the package itself.
|
||||
import sys
|
||||
|
||||
sys.modules[__name__] = sys.modules[__package__]
|
37
src/undetected_chromedriver/webelement.py
Normal file
37
src/undetected_chromedriver/webelement.py
Normal file
@ -0,0 +1,37 @@
|
||||
import selenium.webdriver.remote.webelement
|
||||
|
||||
|
||||
class WebElement(selenium.webdriver.remote.webelement.WebElement):
    """
    Custom WebElement class which makes it easier to view elements when
    working in an interactive environment.

    standard webelement repr:
    <selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>

    using this WebElement class:
    WebElement <a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">

    """

    @property
    def attrs(self):
        """
        Attributes of the element as a ``{name: value}`` mapping.

        The mapping is fetched with a script on first access and cached on
        the instance afterwards.
        """
        if not hasattr(self, "_attrs"):
            # ``var index``: an undeclared loop variable would be created as
            # a global on the page every time the script runs.
            self._attrs = self._parent.execute_script(
                """
                var items = {};
                for (var index = 0; index < arguments[0].attributes.length; ++index)
                {
                 items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
                };
                return items;
                """,
                self,
            )
        return self._attrs

    def __repr__(self):
        strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
        if strattrs:
            strattrs = " " + strattrs
        return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"
|
Loading…
x
Reference in New Issue
Block a user