Merge bcd5945272ccfb097e0044f35b48a97a91afd87e into 54668a11e7b8db125101eb5760d9cb7e0c799ffc

This commit is contained in:
Alexander 2024-09-28 09:08:52 +02:00 committed by GitHub
commit fe02b42d8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 227 additions and 2246 deletions

View File

@ -1,4 +1,4 @@
FROM python:3.11-slim-bullseye as builder FROM debian:trixie-slim AS builder
# Build dummy packages to skip installing them and their dependencies # Build dummy packages to skip installing them and their dependencies
RUN apt-get update \ RUN apt-get update \
@ -10,41 +10,38 @@ RUN apt-get update \
&& equivs-control adwaita-icon-theme \ && equivs-control adwaita-icon-theme \
&& printf 'Section: misc\nPriority: optional\nStandards-Version: 3.9.2\nPackage: adwaita-icon-theme\nVersion: 99.0.0\nDescription: Dummy package for adwaita-icon-theme\n' >> adwaita-icon-theme \ && printf 'Section: misc\nPriority: optional\nStandards-Version: 3.9.2\nPackage: adwaita-icon-theme\nVersion: 99.0.0\nDescription: Dummy package for adwaita-icon-theme\n' >> adwaita-icon-theme \
&& equivs-build adwaita-icon-theme \ && equivs-build adwaita-icon-theme \
&& mv adwaita-icon-theme_*.deb /adwaita-icon-theme.deb && mv adwaita-icon-theme_*.deb /adwaita-icon-theme.deb \
&& apt-get purge -y --auto-remove equivs \
&& rm -rf /var/lib/apt/lists/*
FROM python:3.11-slim-bullseye FROM debian:trixie-slim
# Copy dummy packages # Copy dummy packages
COPY --from=builder /*.deb / COPY --from=builder /libgl1-mesa-dri.deb /adwaita-icon-theme.deb /
# Install dependencies and create flaresolverr user # Install dependencies and create flaresolverr user
# You can test Chromium running this command inside the container:
# xvfb-run -s "-screen 0 1600x1200x24" chromium --no-sandbox
# The error traces is like this: "*** stack smashing detected ***: terminated"
# To check the package versions available you can use this command:
# apt-cache madison chromium
WORKDIR /app WORKDIR /app
COPY requirements.txt .
RUN apt-get update \
# Install dummy packages # Install dummy packages
RUN dpkg -i /libgl1-mesa-dri.deb \ && dpkg -i /libgl1-mesa-dri.deb \
&& dpkg -i /adwaita-icon-theme.deb \ && dpkg -i /adwaita-icon-theme.deb \
&& apt-get install -f \
# Install dependencies # Install dependencies
&& apt-get update \ && apt-get install -y --no-install-recommends chromium xvfb dumb-init \
&& apt-get install -y --no-install-recommends chromium chromium-common chromium-driver xvfb dumb-init \ procps curl vim xauth python3 python3-pip \
procps curl vim xauth \
# Remove temporary files and hardware decoding libraries # Remove temporary files and hardware decoding libraries
&& rm -rf /var/lib/apt/lists/* \ && rm -rf /var/lib/apt/lists/* \
&& rm -f /usr/lib/x86_64-linux-gnu/libmfxhw* \ && rm -f /usr/lib/x86_64-linux-gnu/libmfxhw* \
&& rm -f /usr/lib/x86_64-linux-gnu/mfx/* \ && rm -f /usr/lib/x86_64-linux-gnu/mfx/* \
# Create flaresolverr user # Create flaresolverr user
&& useradd --home-dir /app --shell /bin/sh flaresolverr \ && useradd --home-dir /app --shell /bin/sh flaresolverr \
&& mv /usr/bin/chromedriver chromedriver \ && chown -R flaresolverr:flaresolverr . \
&& chown -R flaresolverr:flaresolverr . # Set up Python and install dependencies
&& ln -s /usr/bin/python3 /usr/local/bin/python \
# Install Python dependencies && pip install --break-system-packages -r requirements.txt \
COPY requirements.txt .
RUN pip install -r requirements.txt \
# Remove temporary files # Remove temporary files
&& rm -rf /root/.cache && rm -rf /root/.cache /tmp/*
USER flaresolverr USER flaresolverr

View File

@ -185,9 +185,10 @@ session. When you no longer need to use a session you should make sure to close
| session | Optional. Will send the request from and existing browser instance. If one is not sent it will create a temporary instance that will be destroyed immediately after the request is completed. | | session | Optional. Will send the request from and existing browser instance. If one is not sent it will create a temporary instance that will be destroyed immediately after the request is completed. |
| session_ttl_minutes | Optional. FlareSolverr will automatically rotate expired sessions based on the TTL provided in minutes. | | session_ttl_minutes | Optional. FlareSolverr will automatically rotate expired sessions based on the TTL provided in minutes. |
| maxTimeout | Optional, default value 60000. Max timeout to solve the challenge in milliseconds. | | maxTimeout | Optional, default value 60000. Max timeout to solve the challenge in milliseconds. |
| userAgent | Optional. Used for the current request and does not affect subsequent ones. |
| cookies | Optional. Will be used by the headless browser. Eg: `"cookies": [{"name": "cookie1", "value": "value1"}, {"name": "cookie2", "value": "value2"}]`. | | cookies | Optional. Will be used by the headless browser. Eg: `"cookies": [{"name": "cookie1", "value": "value1"}, {"name": "cookie2", "value": "value2"}]`. |
| returnOnlyCookies | Optional, default false. Only returns the cookies. Response data, headers and other parts of the response are removed. | | returnOnlyCookies | Optional, default false. Only returns the cookies. Response data, headers and other parts of the response are removed. |
| proxy | Optional, default disabled. Eg: `"proxy": {"url": "http://127.0.0.1:8888"}`. You must include the proxy schema in the URL: `http://`, `socks4://` or `socks5://`. Authorization (username/password) is not supported. (When the `session` parameter is set, the proxy is ignored; a session specific proxy can be set in `sessions.create`.) | | proxy | Optional, default disabled. Eg: `"proxy": {"url": "http://127.0.0.1:8888"}`. You must include the proxy schema in the URL: `http://`, `socks4://` or `socks5://`. Authorization (username/password) is supported. (When the `session` parameter is set, the proxy is ignored; a session specific proxy can be set in `sessions.create`.) |
> **Warning** > **Warning**
> If you want to use Cloudflare clearance cookie in your scripts, make sure you use the FlareSolverr User-Agent too. If they don't match you will see the challenge. > If you want to use Cloudflare clearance cookie in your scripts, make sure you use the FlareSolverr User-Agent too. If they don't match you will see the challenge.

View File

@ -1,6 +1,6 @@
bottle==0.12.25 bottle==0.12.25
waitress==2.1.2 waitress==2.1.2
selenium==4.15.2 DrissionPage==4.1.0.0b19
func-timeout==4.3.5 func-timeout==4.3.5
prometheus-client==0.17.1 prometheus-client==0.17.1
# required by undetected_chromedriver # required by undetected_chromedriver

View File

@ -35,7 +35,7 @@ class V1RequestBase(object):
session: str = None session: str = None
session_ttl_minutes: int = None session_ttl_minutes: int = None
headers: list = None # deprecated v2.0.0, not used headers: list = None # deprecated v2.0.0, not used
userAgent: str = None # deprecated v2.0.0, not used userAgent: str = None
# V1Request # V1Request
url: str = None url: str = None

View File

@ -114,6 +114,9 @@ if __name__ == "__main__":
prometheus_plugin.setup() prometheus_plugin.setup()
app.install(prometheus_plugin.prometheus_plugin) app.install(prometheus_plugin.prometheus_plugin)
webdriver_data = utils.get_webdriver_data_path()
utils.remove_all_subfolders(webdriver_data)
# start webserver # start webserver
# default server 'wsgiref' does not support concurrent requests # default server 'wsgiref' does not support concurrent requests
# https://github.com/FlareSolverr/FlareSolverr/issues/680 # https://github.com/FlareSolverr/FlareSolverr/issues/680

View File

@ -7,13 +7,8 @@ from html import escape
from urllib.parse import unquote, quote from urllib.parse import unquote, quote
from func_timeout import FunctionTimedOut, func_timeout from func_timeout import FunctionTimedOut, func_timeout
from selenium.common import TimeoutException from DrissionPage import ChromiumPage
from selenium.webdriver.chrome.webdriver import WebDriver from DrissionPage._units.listener import DataPacket
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import (
presence_of_element_located, staleness_of, title_is)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
import utils import utils
from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT, from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT,
@ -116,8 +111,6 @@ def _controller_v1_handler(req: V1RequestBase) -> V1ResponseBase:
raise Exception("Request parameter 'cmd' is mandatory.") raise Exception("Request parameter 'cmd' is mandatory.")
if req.headers is not None: if req.headers is not None:
logging.warning("Request parameter 'headers' was removed in FlareSolverr v2.") logging.warning("Request parameter 'headers' was removed in FlareSolverr v2.")
if req.userAgent is not None:
logging.warning("Request parameter 'userAgent' was removed in FlareSolverr v2.")
# set default values # set default values
if req.maxTimeout is None or int(req.maxTimeout) < 1: if req.maxTimeout is None or int(req.maxTimeout) < 1:
@ -223,6 +216,7 @@ def _cmd_sessions_destroy(req: V1RequestBase) -> V1ResponseBase:
def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT: def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT:
timeout = int(req.maxTimeout) / 1000 timeout = int(req.maxTimeout) / 1000
driver = None driver = None
user_data_path = None
try: try:
if req.session: if req.session:
session_id = req.session session_id = req.session
@ -237,7 +231,8 @@ def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT:
driver = session.driver driver = session.driver
else: else:
driver = utils.get_webdriver(req.proxy) user_data_path = utils.get_user_data_path()
driver = utils.get_webdriver(req.proxy, user_data_path)
logging.debug('New instance of webdriver has been created to perform the request') logging.debug('New instance of webdriver has been created to perform the request')
return func_timeout(timeout, _evil_logic, (req, driver, method)) return func_timeout(timeout, _evil_logic, (req, driver, method))
except FunctionTimedOut: except FunctionTimedOut:
@ -250,179 +245,142 @@ def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT:
driver.close() driver.close()
driver.quit() driver.quit()
logging.debug('A used instance of webdriver has been destroyed') logging.debug('A used instance of webdriver has been destroyed')
if user_data_path:
utils.remove_user_data(user_data_path)
def click_verify(driver: ChromiumPage) -> DataPacket:
def click_verify(driver: WebDriver):
try: try:
logging.debug("Try to find the Cloudflare verify checkbox...") bde = (
iframe = driver.find_element(By.XPATH, "//iframe[starts-with(@id, 'cf-chl-widget-')]") driver
driver.switch_to.frame(iframe) .ele("@Style=border: 0px; margin: 0px; padding: 0px;", timeout=10)
checkbox = driver.find_element( .shadow_root
by=By.XPATH, .ele("tag:iframe", timeout=10)
value='//*[@id="content"]/div/div/label/input', .ele('tag:body', timeout=10)
.shadow_root
) )
if checkbox: ve = bde.ele("text:Verify you are human", timeout=10)
actions = ActionChains(driver)
actions.move_to_element_with_offset(checkbox, 5, 7)
actions.click(checkbox)
actions.perform()
logging.debug("Cloudflare verify checkbox found and clicked!")
except Exception:
logging.debug("Cloudflare verify checkbox not found on the page.")
finally:
driver.switch_to.default_content()
try: driver.listen.resume()
logging.debug("Try to find the Cloudflare 'Verify you are human' button...") ve.click()
button = driver.find_element( data = driver.listen.wait(count=1,timeout=5)
by=By.XPATH,
value="//input[@type='button' and @value='Verify you are human']",
)
if button:
actions = ActionChains(driver)
actions.move_to_element_with_offset(button, 5, 7)
actions.click(button)
actions.perform()
logging.debug("The Cloudflare 'Verify you are human' button found and clicked!")
except Exception:
logging.debug("The Cloudflare 'Verify you are human' button not found on the page.")
time.sleep(2) if isinstance(data, DataPacket):
return data
return None
except Exception as e:
logging.debug("Cloudflare verify checkbox not found on the page. %s", repr(e))
def get_correct_window(driver: WebDriver) -> WebDriver: def search_challenge(driver: ChromiumPage) -> bool:
if len(driver.window_handles) > 1: page_title = driver.title.lower()
for window_handle in driver.window_handles:
driver.switch_to.window(window_handle) # find challenge by title
current_url = driver.current_url for title in CHALLENGE_TITLES:
if not current_url.startswith("devtools://devtools"): if title.lower() == page_title:
return driver logging.debug("Challenge detected. Title found: %s", page_title)
return driver return True
# find challenge by selectors
if driver.wait.eles_loaded(locators=CHALLENGE_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
logging.debug("Challenge detected. One of selectors found")
return True
return False
def access_page(driver: WebDriver, url: str) -> None: def _evil_logic(req: V1RequestBase, driver: ChromiumPage, method: str) -> ChallengeResolutionT:
driver.get(url)
driver.start_session()
driver.start_session() # required to bypass Cloudflare
def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> ChallengeResolutionT:
res = ChallengeResolutionT({}) res = ChallengeResolutionT({})
res.status = STATUS_OK res.status = STATUS_OK
res.message = "" res.message = ""
old_user_agent = utils.get_user_agent(driver)
if req.userAgent is not None and req.userAgent != "":
driver.set.user_agent(ua=req.userAgent)
# navigate to the page # navigate to the page
logging.debug(f'Navigating to... {req.url}') logging.debug('Navigating to... %s', req.url)
driver.listen.start(req.url)
if method == 'POST': if method == 'POST':
_post_request(req, driver) _post_request(req, driver)
else: else:
access_page(driver, req.url) driver.get(req.url)
driver = get_correct_window(driver) data = driver.listen.wait(count=1,timeout=5)
driver.listen.pause()
# set cookies if required # set cookies if required
if req.cookies is not None and len(req.cookies) > 0: if req.cookies is not None and len(req.cookies) > 0:
logging.debug(f'Setting cookies...') logging.debug('Setting cookies...')
for cookie in req.cookies: for cookie in req.cookies:
driver.delete_cookie(cookie['name']) driver.set.cookies.remove(cookie['name'])
driver.add_cookie(cookie) driver.set.cookies(cookie)
# reload the page # reload the page
driver.listen.resume()
if method == 'POST': if method == 'POST':
_post_request(req, driver) _post_request(req, driver)
else: else:
access_page(driver, req.url) driver.get(req.url)
driver = get_correct_window(driver) data = driver.listen.wait(count=1, timeout=5)
driver.listen.pause()
# wait for the page # wait for the page
if utils.get_config_log_html(): if utils.get_config_log_html():
logging.debug(f"Response HTML:\n{driver.page_source}") logging.debug("Response HTML:\n%s", driver.html)
html_element = driver.find_element(By.TAG_NAME, "html")
page_title = driver.title
page_title = driver.title
# find access denied titles # find access denied titles
for title in ACCESS_DENIED_TITLES: for title in ACCESS_DENIED_TITLES:
if title == page_title: if title == page_title:
raise Exception('Cloudflare has blocked this request. ' raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.') 'Probably your IP is banned for this site, check in your web browser.')
# find access denied selectors # find access denied selectors
for selector in ACCESS_DENIED_SELECTORS: if driver.wait.eles_loaded(locators=ACCESS_DENIED_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
found_elements = driver.find_elements(By.CSS_SELECTOR, selector) raise Exception('Cloudflare has blocked this request. '
if len(found_elements) > 0: 'Probably your IP is banned for this site, check in your web browser.')
raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.')
# find challenge by title
challenge_found = False
for title in CHALLENGE_TITLES:
if title.lower() == page_title.lower():
challenge_found = True
logging.info("Challenge detected. Title found: " + page_title)
break
if not challenge_found:
# find challenge by selectors
for selector in CHALLENGE_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
challenge_found = True
logging.info("Challenge detected. Selector found: " + selector)
break
attempt = 0 attempt = 0
if challenge_found: challenge_found = True
while True: while challenge_found:
try: try:
attempt = attempt + 1 attempt += 1
# wait until the title changes
for title in CHALLENGE_TITLES:
logging.debug("Waiting for title (attempt " + str(attempt) + "): " + title)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title))
# then wait until all the selectors disappear if search_challenge(driver):
for selector in CHALLENGE_SELECTORS: if attempt == 1:
logging.debug("Waiting for selector (attempt " + str(attempt) + "): " + selector) logging.info("Challenge detected.")
WebDriverWait(driver, SHORT_TIMEOUT).until_not(
presence_of_element_located((By.CSS_SELECTOR, selector)))
# all elements not found data = click_verify(driver)
else:
if attempt == 1:
logging.info("Challenge not detected!")
res.message = "Challenge not detected!"
else:
logging.info("Challenge solved!")
res.message = "Challenge solved!"
break break
except TimeoutException: except Exception as e:
logging.debug("Timeout waiting for selector") logging.debug("Cloudflare check exception")
raise e
click_verify(driver)
# update the html (cloudflare reloads the page every 5 s)
html_element = driver.find_element(By.TAG_NAME, "html")
# waits until cloudflare redirection ends
logging.debug("Waiting for redirect")
# noinspection PyBroadException
try:
WebDriverWait(driver, SHORT_TIMEOUT).until(staleness_of(html_element))
except Exception:
logging.debug("Timeout waiting for redirect")
logging.info("Challenge solved!")
res.message = "Challenge solved!"
else:
logging.info("Challenge not detected!")
res.message = "Challenge not detected!"
challenge_res = ChallengeResolutionResultT({}) challenge_res = ChallengeResolutionResultT({})
challenge_res.url = driver.current_url challenge_res.url = driver.url
challenge_res.status = 200 # todo: fix, selenium not provides this info if data is not None and data.response is not None: # Fixed #1162
challenge_res.cookies = driver.get_cookies() challenge_res.status = data.response.status
challenge_res.userAgent = utils.get_user_agent(driver) if not req.returnOnlyCookies:
challenge_res.response = data.response.body
challenge_res.headers = dict(data.response.headers)
if not req.returnOnlyCookies: challenge_res.cookies = driver.cookies(all_info=True)
challenge_res.headers = {} # todo: fix, selenium not provides this info challenge_res.userAgent = req.userAgent or utils.get_user_agent(driver)
challenge_res.response = driver.page_source
if old_user_agent:
driver.set.user_agent(ua=old_user_agent)
res.result = challenge_res res.result = challenge_res
return res return res
def _post_request(req: V1RequestBase, driver: WebDriver): def _post_request(req: V1RequestBase, driver: ChromiumPage):
post_form = f'<form id="hackForm" action="{req.url}" method="POST">' post_form = f'<form id="hackForm" action="{req.url}" method="POST">'
query_string = req.postData if req.postData[0] != '?' else req.postData[1:] query_string = req.postData if req.postData[0] != '?' else req.postData[1:]
pairs = query_string.split('&') pairs = query_string.split('&')
@ -451,5 +409,3 @@ def _post_request(req: V1RequestBase, driver: WebDriver):
</body> </body>
</html>""" </html>"""
driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content)) driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content))
driver.start_session()
driver.start_session() # required to bypass Cloudflare

View File

@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from typing import Optional, Tuple from typing import Optional, Tuple
from uuid import uuid1 from uuid import uuid1
from selenium.webdriver.chrome.webdriver import WebDriver from DrissionPage import ChromiumPage
import utils import utils
@ -12,7 +12,7 @@ import utils
@dataclass @dataclass
class Session: class Session:
session_id: str session_id: str
driver: WebDriver driver: ChromiumPage
created_at: datetime created_at: datetime
def lifetime(self) -> timedelta: def lifetime(self) -> timedelta:
@ -27,13 +27,13 @@ class SessionsStorage:
def create(self, session_id: Optional[str] = None, proxy: Optional[dict] = None, def create(self, session_id: Optional[str] = None, proxy: Optional[dict] = None,
force_new: Optional[bool] = False) -> Tuple[Session, bool]: force_new: Optional[bool] = False) -> Tuple[Session, bool]:
"""create creates new instance of WebDriver if necessary, """create creates new instance of ChromiumPage if necessary,
assign defined (or newly generated) session_id to the instance assign defined (or newly generated) session_id to the instance
and returns the session object. If a new session has been created and returns the session object. If a new session has been created
second argument is set to True. second argument is set to True.
Note: The function is idempotent, so in case if session_id Note: The function is idempotent, so in case if session_id
already exists in the storage a new instance of WebDriver won't be created already exists in the storage a new instance of ChromiumPage won't be created
and existing session will be returned. Second argument defines if and existing session will be returned. Second argument defines if
new session has been created (True) or an existing one was used (False). new session has been created (True) or an existing one was used (False).
""" """
@ -45,7 +45,7 @@ class SessionsStorage:
if self.exists(session_id): if self.exists(session_id):
return self.sessions[session_id], False return self.sessions[session_id], False
driver = utils.get_webdriver(proxy) driver = utils.get_webdriver(proxy, utils.get_user_data_path(session_id))
created_at = datetime.now() created_at = datetime.now()
session = Session(session_id, driver, created_at) session = Session(session_id, driver, created_at)
@ -69,6 +69,10 @@ class SessionsStorage:
if utils.PLATFORM_VERSION == "nt": if utils.PLATFORM_VERSION == "nt":
session.driver.close() session.driver.close()
session.driver.quit() session.driver.quit()
session_data = utils.get_user_data_path(session_id)
utils.remove_user_data(session_data)
return True return True
def get(self, session_id: str, ttl: Optional[timedelta] = None) -> Tuple[Session, bool]: def get(self, session_id: str, ttl: Optional[timedelta] = None) -> Tuple[Session, bool]:

View File

@ -1,914 +0,0 @@
#!/usr/bin/env python3
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
from __future__ import annotations
__version__ = "3.5.5"
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import time
from weakref import finalize
import selenium.webdriver.chrome.service
import selenium.webdriver.chrome.webdriver
from selenium.webdriver.common.by import By
import selenium.webdriver.chromium.service
import selenium.webdriver.remote.command
import selenium.webdriver.remote.webdriver
from .cdp import CDP
from .dprocess import start_detached
from .options import ChromeOptions
from .patcher import IS_POSIX
from .patcher import Patcher
from .reactor import Reactor
from .webelement import UCWebElement
from .webelement import WebElement
__all__ = (
"Chrome",
"ChromeOptions",
"Patcher",
"Reactor",
"CDP",
"find_chrome_executable",
)
logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel())
class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
"""
Controls the ChromeDriver and allows you to drive the browser.
The webdriver file will be downloaded by this module automatically,
you do not need to specify this. however, you may if you wish.
Attributes
----------
Methods
-------
reconnect()
this can be useful in case of heavy detection methods
-stops the chromedriver service which runs in the background
-starts the chromedriver service which runs in the background
-recreate session
start_session(capabilities=None, browser_profile=None)
differentiates from the regular method in that it does not
require a capabilities argument. The capabilities are automatically
recreated from the options at creation time.
--------------------------------------------------------------------------
NOTE:
Chrome has everything included to work out of the box.
it does not `need` customizations.
any customizations MAY lead to trigger bot migitation systems.
--------------------------------------------------------------------------
"""
_instances = set()
session_id = None
debug = False
def __init__(
self,
options=None,
user_data_dir=None,
driver_executable_path=None,
browser_executable_path=None,
port=0,
enable_cdp_events=False,
# service_args=None,
# service_creationflags=None,
desired_capabilities=None,
advanced_elements=False,
# service_log_path=None,
keep_alive=True,
log_level=0,
headless=False,
version_main=None,
patcher_force_close=False,
suppress_welcome=True,
use_subprocess=False,
debug=False,
no_sandbox=True,
windows_headless=False,
user_multi_procs: bool = False,
**kw,
):
"""
Creates a new instance of the chrome driver.
Starts the service and then creates new instance of chrome driver.
Parameters
----------
options: ChromeOptions, optional, default: None - automatic useful defaults
this takes an instance of ChromeOptions, mainly to customize browser behavior.
anything other dan the default, for example extensions or startup options
are not supported in case of failure, and can probably lowers your undetectability.
user_data_dir: str , optional, default: None (creates temp profile)
if user_data_dir is a path to a valid chrome profile directory, use it,
and turn off automatic removal mechanism at exit.
driver_executable_path: str, optional, default: None(=downloads and patches new binary)
browser_executable_path: str, optional, default: None - use find_chrome_executable
Path to the browser executable.
If not specified, make sure the executable's folder is in $PATH
port: int, optional, default: 0
port to be used by the chromedriver executable, this is NOT the debugger port.
leave it at 0 unless you know what you are doing.
the default value of 0 automatically picks an available port.
enable_cdp_events: bool, default: False
:: currently for chrome only
this enables the handling of wire messages
when enabled, you can subscribe to CDP events by using:
driver.add_cdp_listener("Network.dataReceived", yourcallback)
# yourcallback is an callable which accepts exactly 1 dict as parameter
service_args: list of str, optional, default: None
arguments to pass to the driver service
desired_capabilities: dict, optional, default: None - auto from config
Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref".
advanced_elements: bool, optional, default: False
makes it easier to recognize elements like you know them from html/browser inspection, especially when working
in an interactive environment
default webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
advanced webelement repr
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time.
service_log_path: str, optional, default: None
path to log information from the driver.
keep_alive: bool, optional, default: True
Whether to configure ChromeRemoteConnection to use HTTP keep-alive.
log_level: int, optional, default: adapts to python global log level
headless: bool, optional, default: False
can also be specified in the options instance.
Specify whether you want to use the browser in headless mode.
warning: this lowers undetectability and not fully supported.
version_main: int, optional, default: None (=auto)
if you, for god knows whatever reason, use
an older version of Chrome. You can specify it's full rounded version number
here. Example: 87 for all versions of 87
patcher_force_close: bool, optional, default: False
instructs the patcher to do whatever it can to access the chromedriver binary
if the file is locked, it will force shutdown all instances.
setting it is not recommended, unless you know the implications and think
you might need it.
suppress_welcome: bool, optional , default: True
a "welcome" alert might show up on *nix-like systems asking whether you want to set
chrome as your default browser, and if you want to send even more data to google.
now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False.
Note: if you don't handle the nag screen in time, the browser loses it's connection and throws an Exception.
use_subprocess: bool, optional , default: True,
False (the default) makes sure Chrome will get it's own process (so no subprocess of chromedriver.exe or python
This fixes a LOT of issues, like multithreaded run, but mst importantly. shutting corectly after
program exits or using .quit()
you should be knowing what you're doing, and know how python works.
unfortunately, there is always an edge case in which one would like to write an single script with the only contents being:
--start script--
import undetected_chromedriver as uc
d = uc.Chrome()
d.get('https://somesite/')
---end script --
and will be greeted with an error, since the program exists before chrome has a change to launch.
in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times.
! setting it to True comes with NO support when being detected. !
no_sandbox: bool, optional, default=True
uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar
this option has a default of True since many people seem to run this as root (....) , and chrome does not start
when running as root without using --no-sandbox flag.
user_multi_procs:
set to true when you are using multithreads/multiprocessing
ensures not all processes are trying to modify a binary which is in use by another.
for this to work. YOU MUST HAVE AT LEAST 1 UNDETECTED_CHROMEDRIVER BINARY IN YOUR ROAMING DATA FOLDER.
this requirement can be easily satisfied, by just running this program "normal" and close/kill it.
"""
finalize(self, self._ensure_close, self)
self.debug = debug
self.patcher = Patcher(
executable_path=driver_executable_path,
force=patcher_force_close,
version_main=version_main,
user_multi_procs=user_multi_procs,
)
# self.patcher.auto(user_multiprocess = user_multi_num_procs)
self.patcher.auto()
# self.patcher = patcher
if not options:
options = ChromeOptions()
try:
if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options,
# as it just appends arguments, not replace them
# you'll get conflicts starting chrome
raise RuntimeError("you cannot reuse the ChromeOptions object")
except AttributeError:
pass
options._session = self
if not options.debugger_address:
debug_port = (
port
if port != 0
else selenium.webdriver.common.service.utils.free_port()
)
debug_host = "127.0.0.1"
options.debugger_address = "%s:%d" % (debug_host, debug_port)
else:
debug_host, debug_port = options.debugger_address.split(":")
debug_port = int(debug_port)
if enable_cdp_events:
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"}
)
options.add_argument("--remote-debugging-host=%s" % debug_host)
options.add_argument("--remote-debugging-port=%s" % debug_port)
if user_data_dir:
options.add_argument("--user-data-dir=%s" % user_data_dir)
language, keep_user_data_dir = None, bool(user_data_dir)
# see if a custom user profile is specified in options
for arg in options.arguments:
if any([_ in arg for _ in ("--headless", "headless")]):
options.arguments.remove(arg)
options.headless = True
if "lang" in arg:
m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try:
language = m[1]
except IndexError:
logger.debug("will set the language to en-US,en;q=0.9")
language = "en-US,en;q=0.9"
if "user-data-dir" in arg:
m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
try:
user_data_dir = m[1]
logger.debug(
"user-data-dir found in user argument %s => %s" % (arg, m[1])
)
keep_user_data_dir = True
except IndexError:
logger.debug(
"no user data dir could be extracted from supplied argument %s "
% arg
)
if not user_data_dir:
# backward compatiblity
# check if an old uc.ChromeOptions is used, and extract the user data dir
if hasattr(options, "user_data_dir") and getattr(
options, "user_data_dir", None
):
import warnings
warnings.warn(
"using ChromeOptions.user_data_dir might stop working in future versions."
"use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder"
)
options.add_argument("--user-data-dir=%s" % options.user_data_dir)
keep_user_data_dir = True
logger.debug(
"user_data_dir property found in options object: %s" % user_data_dir
)
else:
user_data_dir = os.path.normpath(tempfile.mkdtemp())
keep_user_data_dir = False
arg = "--user-data-dir=%s" % user_data_dir
options.add_argument(arg)
logger.debug(
"created a temporary folder in which the user-data (profile) will be stored during this\n"
"session, and added it to chrome startup arguments: %s" % arg
)
if not language:
try:
import locale
language = locale.getdefaultlocale()[0].replace("_", "-")
except Exception:
pass
if not language:
language = "en-US"
options.add_argument("--lang=%s" % language)
if not options.binary_location:
options.binary_location = (
browser_executable_path or find_chrome_executable()
)
if not options.binary_location or not \
pathlib.Path(options.binary_location).exists():
raise FileNotFoundError(
"\n---------------------\n"
"Could not determine browser executable."
"\n---------------------\n"
"Make sure your browser is installed in the default location (path).\n"
"If you are sure about the browser executable, you can specify it using\n"
"the `browser_executable_path='{}` parameter.\n\n"
.format("/path/to/browser/executable" if IS_POSIX else "c:/path/to/your/browser.exe")
)
self._delay = 3
self.user_data_dir = user_data_dir
self.keep_user_data_dir = keep_user_data_dir
if suppress_welcome:
options.arguments.extend(["--no-default-browser-check", "--no-first-run"])
if no_sandbox:
options.arguments.extend(["--no-sandbox", "--test-type"])
if headless or getattr(options, 'headless', None):
#workaround until a better checking is found
try:
v_main = int(self.patcher.version_main) if self.patcher.version_main else 108
if v_main < 108:
options.add_argument("--headless=chrome")
elif v_main >= 108:
options.add_argument("--headless=new")
except:
logger.warning("could not detect version_main."
"therefore, we are assuming it is chrome 108 or higher")
options.add_argument("--headless=new")
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
options.add_argument(
"--log-level=%d" % log_level
or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
)
if hasattr(options, "handle_prefs"):
options.handle_prefs(user_data_dir)
# fix exit_type flag to prevent tab-restore nag
try:
with open(
os.path.join(user_data_dir, "Default/Preferences"),
encoding="latin1",
mode="r+",
) as fs:
config = json.load(fs)
if config["profile"]["exit_type"] is not None:
# fixing the restore-tabs-nag
config["profile"]["exit_type"] = None
fs.seek(0, 0)
json.dump(config, fs)
fs.truncate() # the file might be shorter
logger.debug("fixed exit_type flag")
except Exception as e:
logger.debug("did not find a bad exit_type flag ")
self.options = options
if not desired_capabilities:
desired_capabilities = options.to_capabilities()
if not use_subprocess and not windows_headless:
self.browser_pid = start_detached(
options.binary_location, *options.arguments
)
else:
startupinfo = None
if os.name == 'nt' and windows_headless:
# STARTUPINFO() is Windows only
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
browser = subprocess.Popen(
[options.binary_location, *options.arguments],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=IS_POSIX,
startupinfo=startupinfo
)
self.browser_pid = browser.pid
service = selenium.webdriver.chromium.service.ChromiumService(
self.patcher.executable_path
)
super(Chrome, self).__init__(
service=service,
options=options,
keep_alive=keep_alive,
)
self.reactor = None
if enable_cdp_events:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.getLogger(
"selenium.webdriver.remote.remote_connection"
).setLevel(20)
reactor = Reactor(self)
reactor.start()
self.reactor = reactor
if advanced_elements:
self._web_element_cls = UCWebElement
else:
self._web_element_cls = WebElement
if headless or getattr(options, 'headless', None):
self._configure_headless()
def _configure_headless(self):
orig_get = self.get
logger.info("setting properties for headless")
def get_wrapped(*args, **kwargs):
if self.execute_script("return navigator.webdriver"):
logger.info("patch navigator.webdriver")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, "navigator", {
Object.defineProperty(window, "navigator", {
value: new Proxy(navigator, {
has: (target, key) => (key === "webdriver" ? false : key in target),
get: (target, key) =>
key === "webdriver"
? false
: typeof target[key] === "function"
? target[key].bind(target)
: target[key],
}),
});
"""
},
)
logger.info("patch user-agent string")
self.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": self.execute_script(
"return navigator.userAgent"
).replace("Headless", "")
},
)
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1});
Object.defineProperty(navigator.connection, 'rtt', {get: () => 100});
// https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/chrome-runtime.js
window.chrome = {
app: {
isInstalled: false,
InstallState: {
DISABLED: 'disabled',
INSTALLED: 'installed',
NOT_INSTALLED: 'not_installed'
},
RunningState: {
CANNOT_RUN: 'cannot_run',
READY_TO_RUN: 'ready_to_run',
RUNNING: 'running'
}
},
runtime: {
OnInstalledReason: {
CHROME_UPDATE: 'chrome_update',
INSTALL: 'install',
SHARED_MODULE_UPDATE: 'shared_module_update',
UPDATE: 'update'
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic'
},
PlatformArch: {
ARM: 'arm',
ARM64: 'arm64',
MIPS: 'mips',
MIPS64: 'mips64',
X86_32: 'x86-32',
X86_64: 'x86-64'
},
PlatformNaclArch: {
ARM: 'arm',
MIPS: 'mips',
MIPS64: 'mips64',
X86_32: 'x86-32',
X86_64: 'x86-64'
},
PlatformOs: {
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
MAC: 'mac',
OPENBSD: 'openbsd',
WIN: 'win'
},
RequestUpdateCheckStatus: {
NO_UPDATE: 'no_update',
THROTTLED: 'throttled',
UPDATE_AVAILABLE: 'update_available'
}
}
}
// https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/navigator-permissions.js
if (!window.Notification) {
window.Notification = {
permission: 'denied'
}
}
const originalQuery = window.navigator.permissions.query
window.navigator.permissions.__proto__.query = parameters =>
parameters.name === 'notifications'
? Promise.resolve({ state: window.Notification.permission })
: originalQuery(parameters)
const oldCall = Function.prototype.call
function call() {
return oldCall.apply(this, arguments)
}
Function.prototype.call = call
const nativeToStringFunctionString = Error.toString().replace(/Error/g, 'toString')
const oldToString = Function.prototype.toString
function functionToString() {
if (this === window.navigator.permissions.query) {
return 'function query() { [native code] }'
}
if (this === functionToString) {
return nativeToStringFunctionString
}
return oldCall.call(oldToString, this)
}
// eslint-disable-next-line
Function.prototype.toString = functionToString
"""
},
)
return orig_get(*args, **kwargs)
self.get = get_wrapped
# def _get_cdc_props(self):
# return self.execute_script(
# """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
#
# return result.filter(i => i.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig))
# """
# )
#
# def _hook_remove_cdc_props(self):
# self.execute_cdp_cmd(
# "Page.addScriptToEvaluateOnNewDocument",
# {
# "source": """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
# result.forEach(p => p.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig)
# &&delete window[p]&&console.log('removed',p))
# """
# },
# )
def get(self, url):
# if self._get_cdc_props():
# self._hook_remove_cdc_props()
return super().get(url)
def add_cdp_listener(self, event_name, callback):
if (
self.reactor
and self.reactor is not None
and isinstance(self.reactor, Reactor)
):
self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers
return False
def clear_cdp_listeners(self):
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.handlers.clear()
def window_new(self):
self.execute(
selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"}
)
def tab_new(self, url: str):
"""
this opens a url in a new tab.
apparently, that passes all tests directly!
Parameters
----------
url
Returns
-------
"""
if not hasattr(self, "cdp"):
from .cdp import CDP
cdp = CDP(self.options)
cdp.tab_new(url)
def reconnect(self, timeout=0.1):
try:
self.service.stop()
except Exception as e:
logger.debug(e)
time.sleep(timeout)
try:
self.service.start()
except Exception as e:
logger.debug(e)
try:
self.start_session()
except Exception as e:
logger.debug(e)
def start_session(self, capabilities=None, browser_profile=None):
if not capabilities:
capabilities = self.options.to_capabilities()
super(selenium.webdriver.chrome.webdriver.WebDriver, self).start_session(
capabilities
)
# super(Chrome, self).start_session(capabilities, browser_profile)
def find_elements_recursive(self, by, value):
"""
find elements in all frames
this is a generator function, which is needed
since if it would return a list of elements, they
will be stale on arrival.
using generator, when the element is returned we are in the correct frame
to use it directly
Args:
by: By
value: str
Returns: Generator[webelement.WebElement]
"""
def search_frame(f=None):
if not f:
# ensure we are on main content frame
self.switch_to.default_content()
else:
self.switch_to.frame(f)
for elem in self.find_elements(by, value):
yield elem
# switch back to main content, otherwise we will get StaleElementReferenceException
self.switch_to.default_content()
# search root frame
for elem in search_frame():
yield elem
# get iframes
frames = self.find_elements('css selector', 'iframe')
# search per frame
for f in frames:
for elem in search_frame(f):
yield elem
def quit(self):
try:
self.service.stop()
self.service.process.kill()
self.command_executor.close()
self.service.process.wait(5)
logger.debug("webdriver process ended")
except (AttributeError, RuntimeError, OSError):
pass
try:
self.reactor.event.set()
logger.debug("shutting down reactor")
except AttributeError:
pass
try:
os.kill(self.browser_pid, 15)
logger.debug("gracefully closed browser")
except Exception as e: # noqa
pass
if (
hasattr(self, "keep_user_data_dir")
and hasattr(self, "user_data_dir")
and not self.keep_user_data_dir
):
for _ in range(5):
try:
shutil.rmtree(self.user_data_dir, ignore_errors=False)
except FileNotFoundError:
pass
except (RuntimeError, OSError, PermissionError) as e:
logger.debug(
"When removing the temp profile, a %s occured: %s\nretrying..."
% (e.__class__.__name__, e)
)
else:
logger.debug("successfully removed %s" % self.user_data_dir)
break
try:
time.sleep(0.1)
except OSError:
pass
# dereference patcher, so patcher can start cleaning up as well.
# this must come last, otherwise it will throw 'in use' errors
self.patcher = None
def __getattribute__(self, item):
if not super().__getattribute__("debug"):
return super().__getattribute__(item)
else:
import inspect
original = super().__getattribute__(item)
if inspect.ismethod(original) and not inspect.isclass(original):
def newfunc(*args, **kwargs):
logger.debug(
"calling %s with args %s and kwargs %s\n"
% (original.__qualname__, args, kwargs)
)
return original(*args, **kwargs)
return newfunc
return original
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.service.stop()
time.sleep(self._delay)
self.service.start()
self.start_session()
def __hash__(self):
return hash(self.options.debugger_address)
def __dir__(self):
return object.__dir__(self)
def __del__(self):
try:
self.service.process.kill()
except: # noqa
pass
self.quit()
@classmethod
def _ensure_close(cls, self):
# needs to be a classmethod so finalize can find the reference
logger.info("ensuring close")
if (
hasattr(self, "service")
and hasattr(self.service, "process")
and hasattr(self.service.process, "kill")
):
self.service.process.kill()
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if IS_POSIX:
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in (
"google-chrome",
"chromium",
"chromium-browser",
"chrome",
"google-chrome-stable",
):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
[
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
)
else:
for item in map(
os.environ.get,
("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
):
if item is not None:
for subitem in (
"Google/Chrome/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
logger.debug('checking if %s exists and is executable' % candidate)
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
logger.debug('found! using %s' % candidate)
return os.path.normpath(candidate)

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import json
import logging
import requests
import websockets
log = logging.getLogger(__name__)
class CDPObject(dict):
def __init__(self, *a, **k):
super().__init__(*a, **k)
self.__dict__ = self
for k in self.__dict__:
if isinstance(self.__dict__[k], dict):
self.__dict__[k] = CDPObject(self.__dict__[k])
elif isinstance(self.__dict__[k], list):
for i in range(len(self.__dict__[k])):
if isinstance(self.__dict__[k][i], dict):
self.__dict__[k][i] = CDPObject(self)
def __repr__(self):
tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)"
return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items()))
class PageElement(CDPObject):
pass
class CDP:
log = logging.getLogger("CDP")
endpoints = CDPObject(
{
"json": "/json",
"protocol": "/json/protocol",
"list": "/json/list",
"new": "/json/new?{url}",
"activate": "/json/activate/{id}",
"close": "/json/close/{id}",
}
)
def __init__(self, options: "ChromeOptions"): # noqa
self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))
self._reqid = 0
self._session = requests.Session()
self._last_resp = None
self._last_json = None
resp = self.get(self.endpoints.json) # noqa
self.sessionId = resp[0]["id"]
self.wsurl = resp[0]["webSocketDebuggerUrl"]
def tab_activate(self, id=None):
if not id:
active_tab = self.tab_list()[0]
id = active_tab.id # noqa
self.wsurl = active_tab.webSocketDebuggerUrl # noqa
return self.post(self.endpoints["activate"].format(id=id))
def tab_list(self):
retval = self.get(self.endpoints["list"])
return [PageElement(o) for o in retval]
def tab_new(self, url):
return self.post(self.endpoints["new"].format(url=url))
def tab_close_last_opened(self):
sessions = self.tab_list()
opentabs = [s for s in sessions if s["type"] == "page"]
return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"]))
async def send(self, method: str, params: dict):
self._reqid += 1
async with websockets.connect(self.wsurl) as ws:
await ws.send(
json.dumps({"method": method, "params": params, "id": self._reqid})
)
self._last_resp = await ws.recv()
self._last_json = json.loads(self._last_resp)
self.log.info(self._last_json)
def get(self, uri):
resp = self._session.get(self.server_addr + uri)
try:
self._last_resp = resp
self._last_json = resp.json()
except Exception:
return
else:
return self._last_json
def post(self, uri, data: dict = None):
if not data:
data = {}
resp = self._session.post(self.server_addr + uri, json=data)
try:
self._last_resp = resp
self._last_json = resp.json()
except Exception:
return self._last_resp
@property
def last_json(self):
return self._last_json

View File

@ -1,193 +0,0 @@
import asyncio
from collections.abc import Mapping
from collections.abc import Sequence
from functools import wraps
import os
import logging
import threading
import time
import traceback
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import List
from typing import Optional
class Structure(dict):
"""
This is a dict-like object structure, which you should subclass
Only properties defined in the class context are used on initialization.
See example
"""
_store = {}
def __init__(self, *a, **kw):
"""
Instantiate a new instance.
:param a:
:param kw:
"""
super().__init__()
# auxiliar dict
d = dict(*a, **kw)
for k, v in d.items():
if isinstance(v, Mapping):
self[k] = self.__class__(v)
elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
self[k] = [self.__class__(i) for i in v]
else:
self[k] = v
super().__setattr__("__dict__", self)
def __getattr__(self, item):
return getattr(super(), item)
def __getitem__(self, item):
return super().__getitem__(item)
def __setattr__(self, key, value):
self.__setitem__(key, value)
def __setitem__(self, key, value):
super().__setitem__(key, value)
def update(self, *a, **kw):
super().update(*a, **kw)
def __eq__(self, other):
return frozenset(other.items()) == frozenset(self.items())
def __hash__(self):
return hash(frozenset(self.items()))
@classmethod
def __init_subclass__(cls, **kwargs):
cls._store = {}
def _normalize_strings(self):
for k, v in self.copy().items():
if isinstance(v, (str)):
self[k] = v.strip()
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None):
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
def function_reached_timeout():
if on_timeout:
on_timeout(func)
else:
raise TimeoutError("function call timed out")
t = threading.Timer(interval=seconds, function=function_reached_timeout)
t.start()
try:
return func(*args, **kwargs)
except:
t.cancel()
raise
finally:
t.cancel()
return wrapped
return wrapper
def test():
import sys, os
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
import undetected_chromedriver as uc
import threading
def collector(
driver: uc.Chrome,
stop_event: threading.Event,
on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
listen_events: Sequence = ("browser", "network", "performance"),
):
def threaded(driver, stop_event, on_event_coro):
async def _ensure_service_started():
while (
getattr(driver, "service", False)
and getattr(driver.service, "process", False)
and driver.service.process.poll()
):
print("waiting for driver service to come back on")
await asyncio.sleep(0.05)
# await asyncio.sleep(driver._delay or .25)
async def get_log_lines(typ):
await _ensure_service_started()
return driver.get_log(typ)
async def looper():
while not stop_event.is_set():
log_lines = []
try:
for _ in listen_events:
try:
log_lines += await get_log_lines(_)
except:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
continue
if log_lines and on_event_coro:
await on_event_coro(log_lines)
except Exception as e:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(looper())
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
t.start()
async def on_event(data):
print("on_event")
print("data:", data)
def func_called(fn):
def wrapped(*args, **kwargs):
print(
"func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
)
while driver.service.process and driver.service.process.poll() is not None:
time.sleep(0.1)
res = fn(*args, **kwargs)
print("func completed! (result: %s)" % res)
return res
return wrapped
logging.basicConfig(level=10)
options = uc.ChromeOptions()
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
)
driver = uc.Chrome(version_main=96, options=options)
# driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request)
driver.command_executor._request = func_called(driver.command_executor._request)
collector_stop = threading.Event()
collector(driver, collector_stop, on_event)
driver.get("https://nowsecure.nl")
time.sleep(10)
if os.name == "nt":
driver.close()
driver.quit()

View File

@ -1,77 +0,0 @@
import atexit
import logging
import multiprocessing
import os
import platform
import signal
from subprocess import PIPE
from subprocess import Popen
import sys
CREATE_NEW_PROCESS_GROUP = 0x00000200
DETACHED_PROCESS = 0x00000008
REGISTERED = []
def start_detached(executable, *args):
"""
Starts a fully independent subprocess (with no parent)
:param executable: executable
:param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
:return: pid of the grandchild process
"""
# create pipe
reader, writer = multiprocessing.Pipe(False)
# do not keep reference
process = multiprocessing.Process(
target=_start_detached,
args=(executable, *args),
kwargs={"writer": writer},
daemon=True,
)
process.start()
process.join()
# receive pid from pipe
pid = reader.recv()
REGISTERED.append(pid)
# close pipes
writer.close()
reader.close()
process.close()
return pid
def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
# configure launch
kwargs = {}
if platform.system() == "Windows":
kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
elif sys.version_info < (3, 2):
# assume posix
kwargs.update(preexec_fn=os.setsid)
else: # Python 3.2+ and Unix
kwargs.update(start_new_session=True)
# run
p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)
# send pid to pipe
writer.send(p.pid)
sys.exit()
def _cleanup():
for pid in REGISTERED:
try:
logging.getLogger(__name__).debug("cleaning up pid %d " % pid)
os.kill(pid, signal.SIGTERM)
except: # noqa
pass
atexit.register(_cleanup)

View File

@ -1,85 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import json
import os
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
class ChromeOptions(_ChromiumOptions):
_session = None
_user_data_dir = None
@property
def user_data_dir(self):
return self._user_data_dir
@user_data_dir.setter
def user_data_dir(self, path: str):
"""
Sets the browser profile folder to use, or creates a new profile
at given <path>.
Parameters
----------
path: str
the path to a chrome profile folder
if it does not exist, a new profile will be created at given location
"""
apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath)
@staticmethod
def _undot_key(key, value):
"""turn a (dotted key, value) into a proper nested dict"""
if "." in key:
key, rest = key.split(".", 1)
value = ChromeOptions._undot_key(rest, value)
return {key: value}
@staticmethod
def _merge_nested(a, b):
"""
merges b into a
leaf values in a are overwritten with values from b
"""
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
ChromeOptions._merge_nested(a[key], b[key])
continue
a[key] = b[key]
return a
def handle_prefs(self, user_data_dir):
prefs = self.experimental_options.get("prefs")
if prefs:
user_data_dir = user_data_dir or self._user_data_dir
default_path = os.path.join(user_data_dir, "Default")
os.makedirs(default_path, exist_ok=True)
# undot prefs dict keys
undot_prefs = {}
for key, value in prefs.items():
undot_prefs = self._merge_nested(
undot_prefs, self._undot_key(key, value)
)
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file):
with open(prefs_file, encoding="latin1", mode="r") as f:
undot_prefs = self._merge_nested(json.load(f), undot_prefs)
with open(prefs_file, encoding="latin1", mode="w") as f:
json.dump(undot_prefs, f)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"]
@classmethod
def from_options(cls, options):
o = cls()
o.__dict__.update(options.__dict__)
return o

View File

@ -1,451 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
from distutils.version import LooseVersion
import io
import json
import logging
import os
import pathlib
import platform
import random
import re
import shutil
import string
import sys
import time
from urllib.request import urlopen
from urllib.request import urlretrieve
import zipfile
from multiprocessing import Lock
logger = logging.getLogger(__name__)
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd"))
class Patcher(object):
lock = Lock()
exe_name = "chromedriver%s"
platform = sys.platform
if platform.endswith("win32"):
d = "~/appdata/roaming/undetected_chromedriver"
elif "LAMBDA_TASK_ROOT" in os.environ:
d = "/tmp/undetected_chromedriver"
elif platform.startswith(("linux", "linux2")):
d = "~/.local/share/undetected_chromedriver"
elif platform.endswith("darwin"):
d = "~/Library/Application Support/undetected_chromedriver"
else:
d = "~/.undetected_chromedriver"
data_path = os.path.abspath(os.path.expanduser(d))
def __init__(
self,
executable_path=None,
force=False,
version_main: int = 0,
user_multi_procs=False,
):
"""
Args:
executable_path: None = automatic
a full file path to the chromedriver executable
force: False
terminate processes which are holding lock
version_main: 0 = auto
specify main chrome version (rounded, ex: 82)
"""
self.force = force
self._custom_exe_path = False
prefix = "undetected"
self.user_multi_procs = user_multi_procs
try:
# Try to convert version_main into an integer
version_main_int = int(version_main)
# check if version_main_int is less than or equal to e.g 114
self.is_old_chromedriver = version_main and version_main_int <= 114
except (ValueError,TypeError):
# Check not running inside Docker
if not os.path.exists("/app/chromedriver"):
# If the conversion fails, log an error message
logging.info("version_main cannot be converted to an integer")
# Set self.is_old_chromedriver to False if the conversion fails
self.is_old_chromedriver = False
# Needs to be called before self.exe_name is accessed
self._set_platform_name()
if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True)
if not executable_path:
if sys.platform.startswith("freebsd"):
self.executable_path = os.path.join(
self.data_path, self.exe_name
)
else:
self.executable_path = os.path.join(
self.data_path, "_".join([prefix, self.exe_name])
)
if not IS_POSIX:
if executable_path:
if not executable_path[-4:] == ".exe":
executable_path += ".exe"
self.zip_path = os.path.join(self.data_path, prefix)
if not executable_path:
if not self.user_multi_procs:
self.executable_path = os.path.abspath(
os.path.join(".", self.executable_path)
)
if executable_path:
self._custom_exe_path = True
self.executable_path = executable_path
# Set the correct repository to download the Chromedriver from
if self.is_old_chromedriver:
self.url_repo = "https://chromedriver.storage.googleapis.com"
else:
self.url_repo = "https://googlechromelabs.github.io/chrome-for-testing"
self.version_main = version_main
self.version_full = None
def _set_platform_name(self):
"""
Set the platform and exe name based on the platform undetected_chromedriver is running on
in order to download the correct chromedriver.
"""
if self.platform.endswith("win32"):
self.platform_name = "win32"
self.exe_name %= ".exe"
if self.platform.endswith(("linux", "linux2")):
self.platform_name = "linux64"
self.exe_name %= ""
if self.platform.endswith("darwin"):
if self.is_old_chromedriver:
self.platform_name = "mac64"
else:
self.platform_name = "mac-x64"
self.exe_name %= ""
if self.platform.startswith("freebsd"):
self.platform_name = "freebsd"
self.exe_name %= ""
def auto(self, executable_path=None, force=False, version_main=None, _=None):
"""
Args:
executable_path:
force:
version_main:
Returns:
"""
p = pathlib.Path(self.data_path)
if self.user_multi_procs:
with Lock():
files = list(p.rglob("*chromedriver*"))
most_recent = max(files, key=lambda f: f.stat().st_mtime)
files.remove(most_recent)
list(map(lambda f: f.unlink(), files))
if self.is_binary_patched(most_recent):
self.executable_path = str(most_recent)
return True
if executable_path:
self.executable_path = executable_path
self._custom_exe_path = True
if self._custom_exe_path:
ispatched = self.is_binary_patched(self.executable_path)
if not ispatched:
return self.patch_exe()
else:
return
if version_main:
self.version_main = version_main
if force is True:
self.force = force
if self.platform_name == "freebsd":
chromedriver_path = shutil.which("chromedriver")
if not os.path.isfile(chromedriver_path) or not os.access(chromedriver_path, os.X_OK):
logging.error("Chromedriver not installed!")
return
version_path = os.path.join(os.path.dirname(self.executable_path), "version.txt")
process = os.popen(f'"{chromedriver_path}" --version')
chromedriver_version = process.read().split(' ')[1].split(' ')[0]
process.close()
current_version = None
if os.path.isfile(version_path) or os.access(version_path, os.X_OK):
with open(version_path, 'r') as f:
current_version = f.read()
if current_version != chromedriver_version:
logging.info("Copying chromedriver executable...")
shutil.copy(chromedriver_path, self.executable_path)
os.chmod(self.executable_path, 0o755)
with open(version_path, 'w') as f:
f.write(chromedriver_version)
logging.info("Chromedriver executable copied!")
else:
try:
os.unlink(self.executable_path)
except PermissionError:
if self.force:
self.force_kill_instances(self.executable_path)
return self.auto(force=not self.force)
try:
if self.is_binary_patched():
# assumes already running AND patched
return True
except PermissionError:
pass
# return False
except FileNotFoundError:
pass
release = self.fetch_release_number()
self.version_main = release.version[0]
self.version_full = release
self.unzip_package(self.fetch_package())
return self.patch()
def driver_binary_in_use(self, path: str = None) -> bool:
"""
naive test to check if a found chromedriver binary is
currently in use
Args:
path: a string or PathLike object to the binary to check.
if not specified, we check use this object's executable_path
"""
if not path:
path = self.executable_path
p = pathlib.Path(path)
if not p.exists():
raise OSError("file does not exist: %s" % p)
try:
with open(p, mode="a+b") as fs:
exc = []
try:
fs.seek(0, 0)
except PermissionError as e:
exc.append(e) # since some systems apprently allow seeking
# we conduct another test
try:
fs.readline()
except PermissionError as e:
exc.append(e)
if exc:
return True
return False
# ok safe to assume this is in use
except Exception as e:
# logger.exception("whoops ", e)
pass
def cleanup_unused_files(self):
p = pathlib.Path(self.data_path)
items = list(p.glob("*undetected*"))
for item in items:
try:
item.unlink()
except:
pass
def patch(self):
self.patch_exe()
return self.is_binary_patched()
def fetch_release_number(self):
"""
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string
:rtype: LooseVersion
"""
# Endpoint for old versions of Chromedriver (114 and below)
if self.is_old_chromedriver:
path = f"/latest_release_{self.version_main}"
path = path.upper()
logger.debug("getting release number from %s" % path)
return LooseVersion(urlopen(self.url_repo + path).read().decode())
# Endpoint for new versions of Chromedriver (115+)
if not self.version_main:
# Fetch the latest version
path = "/last-known-good-versions-with-downloads.json"
logger.debug("getting release number from %s" % path)
with urlopen(self.url_repo + path) as conn:
response = conn.read().decode()
last_versions = json.loads(response)
return LooseVersion(last_versions["channels"]["Stable"]["version"])
# Fetch the latest minor version of the major version provided
path = "/latest-versions-per-milestone-with-downloads.json"
logger.debug("getting release number from %s" % path)
with urlopen(self.url_repo + path) as conn:
response = conn.read().decode()
major_versions = json.loads(response)
return LooseVersion(major_versions["milestones"][str(self.version_main)]["version"])
def parse_exe_version(self):
with io.open(self.executable_path, "rb") as f:
for line in iter(lambda: f.readline(), b""):
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
if match:
return LooseVersion(match[1].decode())
def fetch_package(self):
"""
Downloads ChromeDriver from source
:return: path to downloaded file
"""
zip_name = f"chromedriver_{self.platform_name}.zip"
if self.is_old_chromedriver:
download_url = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, zip_name)
else:
zip_name = zip_name.replace("_", "-", 1)
download_url = "https://storage.googleapis.com/chrome-for-testing-public/%s/%s/%s"
download_url %= (self.version_full.vstring, self.platform_name, zip_name)
logger.debug("downloading from %s" % download_url)
return urlretrieve(download_url)[0]
def unzip_package(self, fp):
"""
Does what it says
:return: path to unpacked executable
"""
exe_path = self.exe_name
if not self.is_old_chromedriver:
# The new chromedriver unzips into its own folder
zip_name = f"chromedriver-{self.platform_name}"
exe_path = os.path.join(zip_name, self.exe_name)
logger.debug("unzipping %s" % fp)
try:
os.unlink(self.zip_path)
except (FileNotFoundError, OSError):
pass
os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
with zipfile.ZipFile(fp, mode="r") as zf:
zf.extractall(self.zip_path)
os.rename(os.path.join(self.zip_path, exe_path), self.executable_path)
os.remove(fp)
shutil.rmtree
os.chmod(self.executable_path, 0o755)
return self.executable_path
@staticmethod
def force_kill_instances(exe_name):
"""
kills running instances.
:param: executable name to kill, may be a path as well
:return: True on success else False
"""
exe_name = os.path.basename(exe_name)
if IS_POSIX:
r = os.system("kill -f -9 $(pidof %s)" % exe_name)
else:
r = os.system("taskkill /f /im %s" % exe_name)
return not r
@staticmethod
def gen_random_cdc():
cdc = random.choices(string.ascii_letters, k=27)
return "".join(cdc).encode()
def is_binary_patched(self, executable_path=None):
executable_path = executable_path or self.executable_path
try:
with io.open(executable_path, "rb") as fh:
return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError:
return False
def patch_exe(self):
start = time.perf_counter()
logger.info("patching driver executable %s" % self.executable_path)
with io.open(self.executable_path, "r+b") as fh:
content = fh.read()
# match_injected_codeblock = re.search(rb"{window.*;}", content)
match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content)
if match_injected_codeblock:
target_bytes = match_injected_codeblock[0]
new_target_bytes = (
b'{console.log("undetected chromedriver 1337!")}'.ljust(
len(target_bytes), b" "
)
)
new_content = content.replace(target_bytes, new_target_bytes)
if new_content == content:
logger.warning(
"something went wrong patching the driver binary. could not find injection code block"
)
else:
logger.debug(
"found block:\n%s\nreplacing with:\n%s"
% (target_bytes, new_target_bytes)
)
fh.seek(0)
fh.write(new_content)
logger.debug(
"patching took us {:.2f} seconds".format(time.perf_counter() - start)
)
def __repr__(self):
return "{0:s}({1:s})".format(
self.__class__.__name__,
self.executable_path,
)
def __del__(self):
if self._custom_exe_path:
# if the driver binary is specified by user
# we assume it is important enough to not delete it
return
else:
timeout = 3 # stop trying after this many seconds
t = time.monotonic()
now = lambda: time.monotonic()
while now() - t > timeout:
# we don't want to wait until the end of time
try:
if self.user_multi_procs:
break
os.unlink(self.executable_path)
logger.debug("successfully unlinked %s" % self.executable_path)
break
except (OSError, RuntimeError, PermissionError):
time.sleep(0.01)
continue
except FileNotFoundError:
break

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import asyncio
import json
import logging
import threading
logger = logging.getLogger(__name__)
class Reactor(threading.Thread):
def __init__(self, driver: "Chrome"):
super().__init__()
self.driver = driver
self.loop = asyncio.new_event_loop()
self.lock = threading.Lock()
self.event = threading.Event()
self.daemon = True
self.handlers = {}
def add_event_handler(self, method_name, callback: callable):
"""
Parameters
----------
event_name: str
example "Network.responseReceived"
callback: callable
callable which accepts 1 parameter: the message object dictionary
Returns
-------
"""
with self.lock:
self.handlers[method_name.lower()] = callback
@property
def running(self):
return not self.event.is_set()
def run(self):
try:
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.listen())
except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True:
with self.lock:
if (
getattr(self.driver, "service", None)
and getattr(self.driver.service, "process", None)
and self.driver.service.process.poll()
):
await asyncio.sleep(self.driver._delay or 0.25)
else:
break
async def listen(self):
while self.running:
await self._wait_service_started()
await asyncio.sleep(1)
try:
with self.lock:
log_entries = self.driver.get_log("performance")
for entry in log_entries:
try:
obj_serialized: str = entry.get("message")
obj = json.loads(obj_serialized)
message = obj.get("message")
method = message.get("method")
if "*" in self.handlers:
await self.loop.run_in_executor(
None, self.handlers["*"], message
)
elif method.lower() in self.handlers:
await self.loop.run_in_executor(
None, self.handlers[method.lower()], message
)
# print(type(message), message)
except Exception as e:
raise e from None
except Exception as e:
if "invalid session id" in str(e):
pass
else:
logging.debug("exception ignored :", e)

View File

@ -1,86 +0,0 @@
from typing import List
from selenium.webdriver.common.by import By
import selenium.webdriver.remote.webelement
class WebElement(selenium.webdriver.remote.webelement.WebElement):
def click_safe(self):
super().click()
self._parent.reconnect(0.1)
def children(
self, tag=None, recursive=False
) -> List[selenium.webdriver.remote.webelement.WebElement]:
"""
returns direct child elements of current element
:param tag: str, if supplied, returns <tag> nodes only
"""
script = "return [... arguments[0].children]"
if tag:
script += ".filter( node => node.tagName === '%s')" % tag.upper()
if recursive:
return list(_recursive_children(self, tag))
return list(self._parent.execute_script(script, self))
class UCWebElement(WebElement):
"""
Custom WebElement class which makes it easier to view elements when
working in an interactive environment.
standard webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
using this WebElement class:
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
"""
def __init__(self, parent, id_):
super().__init__(parent, id_)
self._attrs = None
@property
def attrs(self):
if not self._attrs:
self._attrs = self._parent.execute_script(
"""
var items = {};
for (index = 0; index < arguments[0].attributes.length; ++index)
{
items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
};
return items;
""",
self,
)
return self._attrs
def __repr__(self):
strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
if strattrs:
strattrs = " " + strattrs
return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"
def _recursive_children(element, tag: str = None, _results=None):
"""
returns all children of <element> recursively
:param element: `WebElement` object.
find children below this <element>
:param tag: str = None.
if provided, return only <tag> elements. example: 'a', or 'img'
:param _results: do not use!
"""
results = _results or set()
for element in element.children():
if tag:
if element.tag_name == tag:
results.add(element)
else:
results.add(element)
results |= _recursive_children(element, tag, results)
return results

View File

@ -7,16 +7,20 @@ import urllib.parse
import tempfile import tempfile
import sys import sys
from selenium.webdriver.chrome.webdriver import WebDriver from uuid import uuid1
import undetected_chromedriver as uc from pathlib import Path
from tempfile import gettempdir
from DrissionPage import ChromiumPage, ChromiumOptions
# from DrissionPage.common import Settings
FLARESOLVERR_VERSION = None FLARESOLVERR_VERSION = None
PLATFORM_VERSION = None PLATFORM_VERSION = None
CHROME_EXE_PATH = None
CHROME_MAJOR_VERSION = None CHROME_MAJOR_VERSION = None
USER_AGENT = None
XVFB_DISPLAY = None XVFB_DISPLAY = None
PATCHED_DRIVER_PATH = None
CHROME_EXE_PATH = os.environ.get('CHROME_EXE_PATH', None)
USER_AGENT = os.environ.get('USER_AGENT', None)
def get_config_log_html() -> bool: def get_config_log_html() -> bool:
@ -120,102 +124,89 @@ def create_proxy_extension(proxy: dict) -> str:
return proxy_extension_dir return proxy_extension_dir
def get_webdriver_data_path() -> Path:
return Path(gettempdir()) / 'FlareSolverr'
def get_webdriver(proxy: dict = None) -> WebDriver: def get_user_data_path(user_name: str = None) -> Path:
global PATCHED_DRIVER_PATH, USER_AGENT user_name = user_name or str(uuid1())
return get_webdriver_data_path() / user_name
def remove_user_data(user_data_path: Path):
if user_data_path.exists():
shutil.rmtree(user_data_path)
def remove_all_subfolders(parent_folder: str):
if not os.path.exists(parent_folder):
return
for item in os.listdir(parent_folder):
item_path = os.path.join(parent_folder, item)
if os.path.isdir(item_path):
shutil.rmtree(item_path)
def get_webdriver(proxy: dict = None, user_data_path: str = None) -> ChromiumPage:
global CHROME_EXE_PATH, USER_AGENT
logging.debug('Launching web browser...') logging.debug('Launching web browser...')
# undetected_chromedriver # undetected_chromedriver
options = uc.ChromeOptions() options = ChromiumOptions()
options.add_argument('--no-sandbox') options.set_argument('--no-sandbox')
options.add_argument('--window-size=1920,1080') options.set_argument('--window-size=1920,1080')
# todo: this param shows a warning in chrome head-full # todo: this param shows a warning in chrome head-full
options.add_argument('--disable-setuid-sandbox') options.set_argument('--disable-setuid-sandbox')
options.add_argument('--disable-dev-shm-usage') options.set_argument('--disable-dev-shm-usage')
# this option removes the zygote sandbox (it seems that the resolution is a bit faster) # this option removes the zygote sandbox (it seems that the resolution is a bit faster)
options.add_argument('--no-zygote') options.set_argument('--no-zygote')
# attempt to fix Docker ARM32 build # attempt to fix Docker ARM32 build
options.add_argument('--disable-gpu-sandbox') options.set_argument('--disable-gpu-sandbox')
options.add_argument('--disable-software-rasterizer') options.set_argument('--disable-software-rasterizer')
options.add_argument('--ignore-certificate-errors') options.set_argument('--ignore-certificate-errors')
options.add_argument('--ignore-ssl-errors') options.set_argument('--ignore-ssl-errors')
# fix GL errors in ASUSTOR NAS # fix GL errors in ASUSTOR NAS
# https://github.com/FlareSolverr/FlareSolverr/issues/782 # https://github.com/FlareSolverr/FlareSolverr/issues/782
# https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069 # https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069
# https://peter.sh/experiments/chromium-command-line-switches/#use-gl # https://peter.sh/experiments/chromium-command-line-switches/#use-gl
options.add_argument('--use-gl=swiftshader') options.set_argument('--use-gl=swiftshader')
language = os.environ.get('LANG', None) language = os.environ.get('LANG', None)
if language is not None: if language is not None:
options.add_argument('--accept-lang=%s' % language) options.set_argument('--accept-lang=%s' % language)
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910 # Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
if USER_AGENT is not None: if USER_AGENT is not None:
options.add_argument('--user-agent=%s' % USER_AGENT) options.set_argument('--user-agent=%s' % USER_AGENT)
proxy_extension_dir = None proxy_extension_dir = None
if proxy and all(key in proxy for key in ['url', 'username', 'password']): if proxy and all(key in proxy for key in ['url', 'username', 'password']):
proxy_extension_dir = create_proxy_extension(proxy) proxy_extension_dir = create_proxy_extension(proxy)
options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir)) options.add_extension(proxy_extension_dir)
elif proxy and 'url' in proxy: elif proxy and 'url' in proxy:
proxy_url = proxy['url'] proxy_url = proxy['url']
logging.debug("Using webdriver proxy: %s", proxy_url) logging.debug("Using webdriver proxy: %s", proxy_url)
options.add_argument('--proxy-server=%s' % proxy_url) options.set_argument('--proxy-server=%s' % proxy_url)
# note: headless mode is detected (headless = True)
# we launch the browser in head-full mode with the window hidden
windows_headless = False windows_headless = False
if get_config_headless(): if get_config_headless():
if os.name == 'nt': if os.name == 'nt':
windows_headless = True windows_headless = True
else: else:
start_xvfb_display() start_xvfb_display()
# For normal headless mode: options.headless(windows_headless)
# options.add_argument('--headless') options.set_argument("--auto-open-devtools-for-tabs")
options.add_argument("--auto-open-devtools-for-tabs") if user_data_path:
options.set_user_data_path(user_data_path)
options.auto_port(True)
# if we are inside the Docker container, we avoid downloading the driver if CHROME_EXE_PATH is not None:
driver_exe_path = None options.set_paths(browser_path=CHROME_EXE_PATH)
version_main = None
if os.path.exists("/app/chromedriver"):
# running inside Docker
driver_exe_path = "/app/chromedriver"
else:
version_main = get_chrome_major_version()
if PATCHED_DRIVER_PATH is not None:
driver_exe_path = PATCHED_DRIVER_PATH
# detect chrome path
browser_executable_path = get_chrome_exe_path()
# downloads and patches the chromedriver
# if we don't set driver_executable_path it downloads, patches, and deletes the driver each time
try:
driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path,
driver_executable_path=driver_exe_path, version_main=version_main,
windows_headless=windows_headless, headless=get_config_headless())
except Exception as e:
logging.error("Error starting Chrome: %s" % e)
# save the patched driver to avoid re-downloads
if driver_exe_path is None:
PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name)
if PATCHED_DRIVER_PATH != driver.patcher.executable_path:
shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH)
driver = ChromiumPage(addr_or_opts=options)
# clean up proxy extension directory # clean up proxy extension directory
if proxy_extension_dir is not None: if proxy_extension_dir is not None:
shutil.rmtree(proxy_extension_dir) shutil.rmtree(proxy_extension_dir)
# selenium vanilla
# options = webdriver.ChromeOptions()
# options.add_argument('--no-sandbox')
# options.add_argument('--window-size=1920,1080')
# options.add_argument('--disable-setuid-sandbox')
# options.add_argument('--disable-dev-shm-usage')
# driver = webdriver.Chrome(options=options)
return driver return driver
@ -237,10 +228,53 @@ def get_chrome_exe_path() -> str:
CHROME_EXE_PATH = chrome_path CHROME_EXE_PATH = chrome_path
return CHROME_EXE_PATH return CHROME_EXE_PATH
# system # system
CHROME_EXE_PATH = uc.find_chrome_executable() CHROME_EXE_PATH = find_chrome_executable()
return CHROME_EXE_PATH return CHROME_EXE_PATH
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd")):
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in (
"google-chrome",
"chromium",
"chromium-browser",
"chrome",
"google-chrome-stable",
):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
[
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
)
else:
for item in map(
os.environ.get,
("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
):
if item is not None:
for subitem in (
"Google/Chrome/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
return os.path.normpath(candidate)
def get_chrome_major_version() -> str: def get_chrome_major_version() -> str:
global CHROME_MAJOR_VERSION global CHROME_MAJOR_VERSION
if CHROME_MAJOR_VERSION is not None: if CHROME_MAJOR_VERSION is not None:
@ -298,7 +332,7 @@ def extract_version_nt_folder() -> str:
paths = [f.path for f in os.scandir(path) if f.is_dir()] paths = [f.path for f in os.scandir(path) if f.is_dir()]
for path in paths: for path in paths:
filename = os.path.basename(path) filename = os.path.basename(path)
pattern = '\d+\.\d+\.\d+\.\d+' pattern = r'\d+\.\d+\.\d+\.\d+'
match = re.search(pattern, filename) match = re.search(pattern, filename)
if match and match.group(): if match and match.group():
# Found a Chrome version. # Found a Chrome version.
@ -312,9 +346,10 @@ def get_user_agent(driver=None) -> str:
return USER_AGENT return USER_AGENT
try: try:
user_path = get_user_data_path("DefaultUserAgent")
if driver is None: if driver is None:
driver = get_webdriver() driver = get_webdriver(user_data_path=user_path)
USER_AGENT = driver.execute_script("return navigator.userAgent") USER_AGENT = driver.user_agent
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910 # Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE) USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE)
return USER_AGENT return USER_AGENT
@ -325,6 +360,8 @@ def get_user_agent(driver=None) -> str:
if PLATFORM_VERSION == "nt": if PLATFORM_VERSION == "nt":
driver.close() driver.close()
driver.quit() driver.quit()
if user_path:
remove_user_data(user_path)
def start_xvfb_display(): def start_xvfb_display():