Update undetected_chromedriver version to 3.4.6 (#715)

Co-authored-by: ilike2burnthing <59480337+ilike2burnthing@users.noreply.github.com>
This commit is contained in:
Artemiy Ryabinkov 2023-03-06 13:57:38 +00:00 committed by GitHub
parent 3a6e8e0f92
commit 96fcd21174
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 206 additions and 469 deletions

View File

@ -17,7 +17,7 @@ by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
from __future__ import annotations
__version__ = "3.2.1"
__version__ = "3.4.6"
import json
import logging
@ -122,7 +122,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
suppress_welcome=True,
use_subprocess=False,
debug=False,
no_sandbox=True,
no_sandbox=True,
windows_headless=False,
**kw,
):
@ -239,13 +239,13 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
finalize(self, self._ensure_close, self)
self.debug = debug
patcher = Patcher(
self.patcher = Patcher(
executable_path=driver_executable_path,
force=patcher_force_close,
version_main=version_main,
)
patcher.auto()
self.patcher = patcher
self.patcher.auto()
# self.patcher = patcher
if not options:
options = ChromeOptions()
@ -287,6 +287,11 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
# see if a custom user profile is specified in options
for arg in options.arguments:
if any([_ in arg for _ in ("--headless", "headless")]):
options.arguments.remove(arg)
options.headless = True
if "lang" in arg:
m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try:
@ -365,13 +370,18 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
options.arguments.extend(["--no-default-browser-check", "--no-first-run"])
if no_sandbox:
options.arguments.extend(["--no-sandbox", "--test-type"])
if headless or options.headless:
options.headless = True
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
if self.patcher.version_main < 108:
options.add_argument("--headless=chrome")
elif self.patcher.version_main >= 108:
options.add_argument("--headless=new")
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
options.add_argument(
"--log-level=%d" % log_level
@ -408,23 +418,23 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
self.browser_pid = start_detached(
options.binary_location, *options.arguments
)
else:
startupinfo = subprocess.STARTUPINFO()
if os.name == 'nt' and windows_headless:
else:
startupinfo = subprocess.STARTUPINFO()
if os.name == 'nt' and windows_headless:
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
browser = subprocess.Popen(
[options.binary_location, *options.arguments],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=IS_POSIX,
close_fds=IS_POSIX,
startupinfo=startupinfo
)
self.browser_pid = browser.pid
if service_creationflags:
service = selenium.webdriver.common.service.Service(
patcher.executable_path, port, service_args, service_log_path
self.patcher.executable_path, port, service_args, service_log_path
)
for attr_name in ("creationflags", "creation_flags"):
if hasattr(service, attr_name):
@ -434,7 +444,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
service = None
super(Chrome, self).__init__(
executable_path=patcher.executable_path,
executable_path=self.patcher.executable_path,
port=port,
options=options,
service_args=service_args,
@ -475,18 +485,18 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
{
"source": """
Object.defineProperty(window, 'navigator', {
value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) =>
key === 'webdriver' ?
false :
typeof target[key] === 'function' ?
target[key].bind(target) :
target[key]
})
});
Object.defineProperty(window, "navigator", {
Object.defineProperty(window, "navigator", {
value: new Proxy(navigator, {
has: (target, key) => (key === "webdriver" ? false : key in target),
get: (target, key) =>
key === "webdriver"
? false
: typeof target[key] === "function"
? target[key].bind(target)
: target[key],
}),
});
"""
},
)
@ -605,37 +615,38 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
self.get = get_wrapped
def _get_cdc_props(self):
return self.execute_script(
"""
let objectToInspect = window,
result = [];
while(objectToInspect !== null)
{ result = result.concat(Object.getOwnPropertyNames(objectToInspect));
objectToInspect = Object.getPrototypeOf(objectToInspect); }
return result.filter(i => i.match(/.+_.+_(Array|Promise|Symbol)/ig))
"""
)
def _hook_remove_cdc_props(self):
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
let objectToInspect = window,
result = [];
while(objectToInspect !== null)
{ result = result.concat(Object.getOwnPropertyNames(objectToInspect));
objectToInspect = Object.getPrototypeOf(objectToInspect); }
result.forEach(p => p.match(/.+_.+_(Array|Promise|Symbol)/ig)
&&delete window[p]&&console.log('removed',p))
"""
},
)
# def _get_cdc_props(self):
# return self.execute_script(
# """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
#
# return result.filter(i => i.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig))
# """
# )
#
# def _hook_remove_cdc_props(self):
# self.execute_cdp_cmd(
# "Page.addScriptToEvaluateOnNewDocument",
# {
# "source": """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
# result.forEach(p => p.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig)
# &&delete window[p]&&console.log('removed',p))
# """
# },
# )
def get(self, url):
if self._get_cdc_props():
self._hook_remove_cdc_props()
# if self._get_cdc_props():
# self._hook_remove_cdc_props()
return super().get(url)
def add_cdp_listener(self, event_name, callback):
@ -702,7 +713,7 @@ class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
def quit(self):
try:
self.service.process.kill()
self.service.process.kill()
self.service.process.wait(5)
logger.debug("webdriver process ended")
except (AttributeError, RuntimeError, OSError):

View File

@ -1,262 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
from distutils.version import LooseVersion
import io
import logging
import os
import random
import re
import string
import sys
from urllib.request import urlopen
from urllib.request import urlretrieve
import zipfile
from selenium.webdriver import Chrome as _Chrome
from selenium.webdriver import ChromeOptions as _ChromeOptions
TARGET_VERSION = 0
logger = logging.getLogger("uc")
class Chrome:
def __new__(cls, *args, emulate_touch=False, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
if not kwargs.get("executable_path"):
kwargs["executable_path"] = "./{}".format(
ChromeDriverManager(*args, **kwargs).executable_path
)
if not kwargs.get("options"):
kwargs["options"] = ChromeOptions()
instance = object.__new__(_Chrome)
instance.__init__(*args, **kwargs)
instance._orig_get = instance.get
def _get_wrapped(*args, **kwargs):
if instance.execute_script("return navigator.webdriver"):
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, 'navigator', {
value: new Proxy(navigator, {
has: (target, key) => (key === 'webdriver' ? false : key in target),
get: (target, key) =>
key === 'webdriver'
? undefined
: typeof target[key] === 'function'
? target[key].bind(target)
: target[key]
})
});
"""
},
)
return instance._orig_get(*args, **kwargs)
instance.get = _get_wrapped
instance.get = _get_wrapped
instance.get = _get_wrapped
original_user_agent_string = instance.execute_script(
"return navigator.userAgent"
)
instance.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": original_user_agent_string.replace("Headless", ""),
},
)
if emulate_touch:
instance.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {
get: () => 1
})"""
},
)
logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
return instance
class ChromeOptions:
def __new__(cls, *args, **kwargs):
if not ChromeDriverManager.installed:
ChromeDriverManager(*args, **kwargs).install()
if not ChromeDriverManager.selenium_patched:
ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
instance = object.__new__(_ChromeOptions)
instance.__init__()
instance.add_argument("start-maximized")
instance.add_experimental_option("excludeSwitches", ["enable-automation"])
instance.add_argument("--disable-blink-features=AutomationControlled")
return instance
class ChromeDriverManager(object):
installed = False
selenium_patched = False
target_version = None
DL_BASE = "https://chromedriver.storage.googleapis.com/"
def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
_platform = sys.platform
if TARGET_VERSION:
# use global if set
self.target_version = TARGET_VERSION
if target_version:
# use explicitly passed target
self.target_version = target_version # user override
if not self.target_version:
# none of the above (default) and just get current version
self.target_version = self.get_release_version_number().version[
0
] # only major version int
self._base = base_ = "chromedriver{}"
exe_name = self._base
if _platform in ("win32",):
exe_name = base_.format(".exe")
if _platform in ("linux",):
_platform += "64"
exe_name = exe_name.format("")
if _platform in ("darwin",):
_platform = "mac64"
exe_name = exe_name.format("")
self.platform = _platform
self.executable_path = executable_path or exe_name
self._exe_name = exe_name
def patch_selenium_webdriver(self_):
"""
Patches selenium package Chrome, ChromeOptions classes for current session
:return:
"""
import selenium.webdriver.chrome.service
import selenium.webdriver
selenium.webdriver.Chrome = Chrome
selenium.webdriver.ChromeOptions = ChromeOptions
logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
self_.__class__.selenium_patched = True
def install(self, patch_selenium=True):
"""
Initialize the patch
This will:
download chromedriver if not present
patch the downloaded chromedriver
patch selenium package if <patch_selenium> is True (default)
:param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
:return:
"""
if not os.path.exists(self.executable_path):
self.fetch_chromedriver()
if not self.__class__.installed:
if self.patch_binary():
self.__class__.installed = True
if patch_selenium:
self.patch_selenium_webdriver()
def get_release_version_number(self):
"""
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string
"""
path = (
"LATEST_RELEASE"
if not self.target_version
else f"LATEST_RELEASE_{self.target_version}"
)
return LooseVersion(urlopen(self.__class__.DL_BASE + path).read().decode())
def fetch_chromedriver(self):
"""
Downloads ChromeDriver from source and unpacks the executable
:return: on success, name of the unpacked executable
"""
base_ = self._base
zip_name = base_.format(".zip")
ver = self.get_release_version_number().vstring
if os.path.exists(self.executable_path):
return self.executable_path
urlretrieve(
f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
filename=zip_name,
)
with zipfile.ZipFile(zip_name) as zf:
zf.extract(self._exe_name)
os.remove(zip_name)
if sys.platform != "win32":
os.chmod(self._exe_name, 0o755)
return self._exe_name
@staticmethod
def random_cdc():
cdc = random.choices(string.ascii_lowercase, k=26)
cdc[-6:-4] = map(str.upper, cdc[-6:-4])
cdc[2] = cdc[0]
cdc[3] = "_"
return "".join(cdc).encode()
def patch_binary(self):
"""
Patches the ChromeDriver binary
:return: False on failure, binary name on success
"""
linect = 0
replacement = self.random_cdc()
with io.open(self.executable_path, "r+b") as fh:
for line in iter(lambda: fh.readline(), b""):
if b"cdc_" in line:
fh.seek(-len(line), 1)
newline = re.sub(b"cdc_.{22}", replacement, line)
fh.write(newline)
linect += 1
return linect
def install(executable_path=None, target_version=None, *args, **kwargs):
ChromeDriverManager(executable_path, target_version, *args, **kwargs).install()

View File

@ -46,7 +46,6 @@ def start_detached(executable, *args):
def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
# configure launch
kwargs = {}
if platform.system() == "Windows":

View File

@ -56,7 +56,6 @@ class ChromeOptions(_ChromiumOptions):
def handle_prefs(self, user_data_dir):
prefs = self.experimental_options.get("prefs")
if prefs:
user_data_dir = user_data_dir or self._user_data_dir
default_path = os.path.join(user_data_dir, "Default")
os.makedirs(default_path, exist_ok=True)

View File

@ -7,7 +7,6 @@ import logging
import os
import random
import re
import secrets
import string
import sys
import time
@ -41,7 +40,7 @@ class Patcher(object):
d = "~/appdata/roaming/undetected_chromedriver"
elif "LAMBDA_TASK_ROOT" in os.environ:
d = "/tmp/undetected_chromedriver"
elif platform.startswith(("linux","linux2")):
elif platform.startswith(("linux", "linux2")):
d = "~/.local/share/undetected_chromedriver"
elif platform.endswith("darwin"):
d = "~/Library/Application Support/undetected_chromedriver"
@ -51,7 +50,6 @@ class Patcher(object):
def __init__(self, executable_path=None, force=False, version_main: int = 0):
"""
Args:
executable_path: None = automatic
a full file path to the chromedriver executable
@ -60,10 +58,9 @@ class Patcher(object):
version_main: 0 = auto
specify main chrome version (rounded, ex: 82)
"""
self.force = force
self.executable_path = None
prefix = secrets.token_hex(8)
self._custom_exe_path = False
prefix = "undetected"
if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True)
@ -85,8 +82,6 @@ class Patcher(object):
os.path.join(".", self.executable_path)
)
self._custom_exe_path = False
if executable_path:
self._custom_exe_path = True
self.executable_path = executable_path
@ -94,7 +89,6 @@ class Patcher(object):
self.version_full = None
def auto(self, executable_path=None, force=False, version_main=None):
""""""
if executable_path:
self.executable_path = executable_path
self._custom_exe_path = True
@ -206,43 +200,46 @@ class Patcher(object):
@staticmethod
def gen_random_cdc():
cdc = random.choices(string.ascii_lowercase, k=26)
cdc[-6:-4] = map(str.upper, cdc[-6:-4])
cdc[2] = cdc[0]
cdc[3] = "_"
cdc = random.choices(string.ascii_letters, k=27)
return "".join(cdc).encode()
def is_binary_patched(self, executable_path=None):
"""simple check if executable is patched.
:return: False if not patched, else True
"""
executable_path = executable_path or self.executable_path
with io.open(executable_path, "rb") as fh:
for line in iter(lambda: fh.readline(), b""):
if b"cdc_" in line:
return False
else:
return True
try:
with io.open(executable_path, "rb") as fh:
return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError:
return False
def patch_exe(self):
"""
Patches the ChromeDriver binary
:return: False on failure, binary name on success
"""
start = time.perf_counter()
logger.info("patching driver executable %s" % self.executable_path)
linect = 0
replacement = self.gen_random_cdc()
with io.open(self.executable_path, "r+b") as fh:
for line in iter(lambda: fh.readline(), b""):
if b"cdc_" in line:
fh.seek(-len(line), 1)
newline = re.sub(b"cdc_.{22}", replacement, line)
fh.write(newline)
linect += 1
return linect
content = fh.read()
# match_injected_codeblock = re.search(rb"{window.*;}", content)
match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content)
if match_injected_codeblock:
target_bytes = match_injected_codeblock[0]
new_target_bytes = (
b'{console.log("undetected chromedriver 1337!")}'.ljust(
len(target_bytes), b" "
)
)
new_content = content.replace(target_bytes, new_target_bytes)
if new_content == content:
logger.warning(
"something went wrong patching the driver binary. could not find injection code block"
)
else:
logger.debug(
"found block:\n%s\nreplacing with:\n%s"
% (target_bytes, new_target_bytes)
)
fh.seek(0)
fh.write(new_content)
logger.debug(
"patching took us {:.2f} seconds".format(time.perf_counter() - start)
)
def __repr__(self):
return "{0:s}({1:s})".format(
@ -251,7 +248,6 @@ class Patcher(object):
)
def __del__(self):
if self._custom_exe_path:
# if the driver binary is specified by user
# we assume it is important enough to not delete it

View File

@ -1,102 +1,99 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import asyncio
import json
import logging
import threading
logger = logging.getLogger(__name__)
class Reactor(threading.Thread):
def __init__(self, driver: "Chrome"):
super().__init__()
self.driver = driver
self.loop = asyncio.new_event_loop()
self.lock = threading.Lock()
self.event = threading.Event()
self.daemon = True
self.handlers = {}
def add_event_handler(self, method_name, callback: callable):
"""
Parameters
----------
event_name: str
example "Network.responseReceived"
callback: callable
callable which accepts 1 parameter: the message object dictionary
Returns
-------
"""
with self.lock:
self.handlers[method_name.lower()] = callback
@property
def running(self):
return not self.event.is_set()
def run(self):
try:
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.listen())
except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True:
with self.lock:
if (
getattr(self.driver, "service", None)
and getattr(self.driver.service, "process", None)
and self.driver.service.process.poll()
):
await asyncio.sleep(self.driver._delay or 0.25)
else:
break
async def listen(self):
while self.running:
await self._wait_service_started()
await asyncio.sleep(1)
try:
with self.lock:
log_entries = self.driver.get_log("performance")
for entry in log_entries:
try:
obj_serialized: str = entry.get("message")
obj = json.loads(obj_serialized)
message = obj.get("message")
method = message.get("method")
if "*" in self.handlers:
await self.loop.run_in_executor(
None, self.handlers["*"], message
)
elif method.lower() in self.handlers:
await self.loop.run_in_executor(
None, self.handlers[method.lower()], message
)
# print(type(message), message)
except Exception as e:
raise e from None
except Exception as e:
if "invalid session id" in str(e):
pass
else:
logging.debug("exception ignored :", e)
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import asyncio
import json
import logging
import threading
logger = logging.getLogger(__name__)
class Reactor(threading.Thread):
def __init__(self, driver: "Chrome"):
super().__init__()
self.driver = driver
self.loop = asyncio.new_event_loop()
self.lock = threading.Lock()
self.event = threading.Event()
self.daemon = True
self.handlers = {}
def add_event_handler(self, method_name, callback: callable):
"""
Parameters
----------
event_name: str
example "Network.responseReceived"
callback: callable
callable which accepts 1 parameter: the message object dictionary
Returns
-------
"""
with self.lock:
self.handlers[method_name.lower()] = callback
@property
def running(self):
return not self.event.is_set()
def run(self):
try:
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.listen())
except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True:
with self.lock:
if (
getattr(self.driver, "service", None)
and getattr(self.driver.service, "process", None)
and self.driver.service.process.poll()
):
await asyncio.sleep(self.driver._delay or 0.25)
else:
break
async def listen(self):
while self.running:
await self._wait_service_started()
await asyncio.sleep(1)
try:
with self.lock:
log_entries = self.driver.get_log("performance")
for entry in log_entries:
try:
obj_serialized: str = entry.get("message")
obj = json.loads(obj_serialized)
message = obj.get("message")
method = message.get("method")
if "*" in self.handlers:
await self.loop.run_in_executor(
None, self.handlers["*"], message
)
elif method.lower() in self.handlers:
await self.loop.run_in_executor(
None, self.handlers[method.lower()], message
)
# print(type(message), message)
except Exception as e:
raise e from None
except Exception as e:
if "invalid session id" in str(e):
pass
else:
logging.debug("exception ignored :", e)

View File

@ -1,4 +0,0 @@
# for backward compatibility
import sys
sys.modules[__name__] = sys.modules[__package__]

View File

@ -1,6 +1,7 @@
from typing import List
from selenium.webdriver.common.by import By
import selenium.webdriver.remote.webelement
from typing import List
class WebElement(selenium.webdriver.remote.webelement.WebElement):