Update flaresolverr_service.py

This commit is contained in:
ilike2burnthing 2024-07-29 21:00:35 +01:00 committed by GitHub
parent eb680efc90
commit af274a2485
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -7,13 +7,8 @@ from html import escape
from urllib.parse import unquote, quote from urllib.parse import unquote, quote
from func_timeout import FunctionTimedOut, func_timeout from func_timeout import FunctionTimedOut, func_timeout
from selenium.common import TimeoutException from DrissionPage import ChromiumPage
from selenium.webdriver.chrome.webdriver import WebDriver from DrissionPage._units.listener import DataPacket
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import (
presence_of_element_located, staleness_of, title_is)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
import utils import utils
from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT, from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT,
@ -251,178 +246,133 @@ def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT:
driver.quit() driver.quit()
logging.debug('A used instance of webdriver has been destroyed') logging.debug('A used instance of webdriver has been destroyed')
def click_verify(driver: ChromiumPage) -> DataPacket:
def click_verify(driver: WebDriver):
try: try:
logging.debug("Try to find the Cloudflare verify checkbox...") bde = (
iframe = driver.find_element(By.XPATH, "//iframe[starts-with(@id, 'cf-chl-widget-')]") driver
driver.switch_to.frame(iframe) .ele("@Style=border: 0px; margin: 0px; padding: 0px;", timeout=10)
checkbox = driver.find_element( .shadow_root
by=By.XPATH, .ele("tag:iframe", timeout=10)
value='//*[@id="content"]/div/div/label/input', .ele('tag:body', timeout=10)
.shadow_root
) )
if checkbox: ve = bde.ele("text:Verify you are human", timeout=10)
actions = ActionChains(driver)
actions.move_to_element_with_offset(checkbox, 5, 7)
actions.click(checkbox)
actions.perform()
logging.debug("Cloudflare verify checkbox found and clicked!")
except Exception:
logging.debug("Cloudflare verify checkbox not found on the page.")
finally:
driver.switch_to.default_content()
try: driver.listen.resume()
logging.debug("Try to find the Cloudflare 'Verify you are human' button...") ve.click()
button = driver.find_element( data = driver.listen.wait(count=1,timeout=5)
by=By.XPATH,
value="//input[@type='button' and @value='Verify you are human']",
)
if button:
actions = ActionChains(driver)
actions.move_to_element_with_offset(button, 5, 7)
actions.click(button)
actions.perform()
logging.debug("The Cloudflare 'Verify you are human' button found and clicked!")
except Exception:
logging.debug("The Cloudflare 'Verify you are human' button not found on the page.")
time.sleep(2) if isinstance(data, DataPacket):
return data
return None
except Exception as e:
logging.debug("Cloudflare verify checkbox not found on the page. %s", repr(e))
def get_correct_window(driver: WebDriver) -> WebDriver: def search_challenge(driver: ChromiumPage) -> bool:
if len(driver.window_handles) > 1: page_title = driver.title.lower()
for window_handle in driver.window_handles:
driver.switch_to.window(window_handle) # find challenge by title
current_url = driver.current_url for title in CHALLENGE_TITLES:
if not current_url.startswith("devtools://devtools"): if title.lower() == page_title:
return driver logging.debug("Challenge detected. Title found: %s", page_title)
return driver return True
# find challenge by selectors
if driver.wait.eles_loaded(locators=CHALLENGE_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
logging.debug("Challenge detected. One of selectors found")
return True
return False
def access_page(driver: WebDriver, url: str) -> None: def _evil_logic(req: V1RequestBase, driver: ChromiumPage, method: str) -> ChallengeResolutionT:
driver.get(url)
driver.start_session()
driver.start_session() # required to bypass Cloudflare
def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> ChallengeResolutionT:
res = ChallengeResolutionT({}) res = ChallengeResolutionT({})
res.status = STATUS_OK res.status = STATUS_OK
res.message = "" res.message = ""
# navigate to the page # navigate to the page
logging.debug(f'Navigating to... {req.url}') logging.debug('Navigating to... %s', req.url)
driver.listen.start(req.url)
if method == 'POST': if method == 'POST':
_post_request(req, driver) _post_request(req, driver)
else: else:
access_page(driver, req.url) driver.get(req.url)
driver = get_correct_window(driver) data = driver.listen.wait(count=1,timeout=5)
driver.listen.pause()
# set cookies if required # set cookies if required
if req.cookies is not None and len(req.cookies) > 0: if req.cookies is not None and len(req.cookies) > 0:
logging.debug(f'Setting cookies...') logging.debug('Setting cookies...')
for cookie in req.cookies: for cookie in req.cookies:
driver.delete_cookie(cookie['name']) driver.set.cookies.remove(cookie['name'])
driver.add_cookie(cookie) driver.set.cookies(cookie)
# reload the page # reload the page
driver.listen.resume()
if method == 'POST': if method == 'POST':
_post_request(req, driver) _post_request(req, driver)
else: else:
access_page(driver, req.url) driver.get(req.url)
driver = get_correct_window(driver) data = driver.listen.wait(count=1, timeout=5)
driver.listen.pause()
# wait for the page # wait for the page
if utils.get_config_log_html(): if utils.get_config_log_html():
logging.debug(f"Response HTML:\n{driver.page_source}") logging.debug("Response HTML:\n%s", driver.page_source)
html_element = driver.find_element(By.TAG_NAME, "html")
page_title = driver.title
page_title = driver.title
# find access denied titles # find access denied titles
for title in ACCESS_DENIED_TITLES: for title in ACCESS_DENIED_TITLES:
if title == page_title: if title == page_title:
raise Exception('Cloudflare has blocked this request. ' raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.') 'Probably your IP is banned for this site, check in your web browser.')
# find access denied selectors # find access denied selectors
for selector in ACCESS_DENIED_SELECTORS: if driver.wait.eles_loaded(locators=ACCESS_DENIED_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
raise Exception('Cloudflare has blocked this request. ' raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.') 'Probably your IP is banned for this site, check in your web browser.')
# find challenge by title
challenge_found = False
for title in CHALLENGE_TITLES:
if title.lower() == page_title.lower():
challenge_found = True
logging.info("Challenge detected. Title found: " + page_title)
break
if not challenge_found:
# find challenge by selectors
for selector in CHALLENGE_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
challenge_found = True
logging.info("Challenge detected. Selector found: " + selector)
break
attempt = 0 attempt = 0
if challenge_found: challenge_found = True
while True: while challenge_found:
try: try:
attempt = attempt + 1 attempt += 1
# wait until the title changes
for title in CHALLENGE_TITLES:
logging.debug("Waiting for title (attempt " + str(attempt) + "): " + title)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title))
# then wait until all the selectors disappear if search_challenge(driver):
for selector in CHALLENGE_SELECTORS: if attempt == 1:
logging.debug("Waiting for selector (attempt " + str(attempt) + "): " + selector) logging.info("Challenge detected.")
WebDriverWait(driver, SHORT_TIMEOUT).until_not(
presence_of_element_located((By.CSS_SELECTOR, selector)))
# all elements not found data = click_verify(driver)
break
except TimeoutException:
logging.debug("Timeout waiting for selector")
click_verify(driver)
# update the html (cloudflare reloads the page every 5 s)
html_element = driver.find_element(By.TAG_NAME, "html")
# waits until cloudflare redirection ends
logging.debug("Waiting for redirect")
# noinspection PyBroadException
try:
WebDriverWait(driver, SHORT_TIMEOUT).until(staleness_of(html_element))
except Exception:
logging.debug("Timeout waiting for redirect")
logging.info("Challenge solved!")
res.message = "Challenge solved!"
else: else:
if attempt == 1:
logging.info("Challenge not detected!") logging.info("Challenge not detected!")
res.message = "Challenge not detected!" res.message = "Challenge not detected!"
else:
logging.info("Challenge solved!")
res.message = "Challenge solved!"
break
except Exception as e:
logging.debug("Cloudflare check exception")
raise e
challenge_res = ChallengeResolutionResultT({}) challenge_res = ChallengeResolutionResultT({})
challenge_res.url = driver.current_url challenge_res.url = driver.url
challenge_res.status = 200 # todo: fix, selenium not provides this info if data is not None and data.response is not None:
challenge_res.cookies = driver.get_cookies() challenge_res.status = data.response.status
challenge_res.userAgent = utils.get_user_agent(driver)
if not req.returnOnlyCookies: if not req.returnOnlyCookies:
challenge_res.headers = {} # todo: fix, selenium not provides this info challenge_res.response = data.response.body
challenge_res.response = driver.page_source challenge_res.headers = data.response.headers.copy()
challenge_res.cookies = driver.cookies()
challenge_res.userAgent = utils.get_user_agent(driver)
res.result = challenge_res res.result = challenge_res
return res return res
def _post_request(req: V1RequestBase, driver: WebDriver): def _post_request(req: V1RequestBase, driver: ChromiumPage):
post_form = f'<form id="hackForm" action="{req.url}" method="POST">' post_form = f'<form id="hackForm" action="{req.url}" method="POST">'
query_string = req.postData if req.postData[0] != '?' else req.postData[1:] query_string = req.postData if req.postData[0] != '?' else req.postData[1:]
pairs = query_string.split('&') pairs = query_string.split('&')
@ -451,5 +401,3 @@ def _post_request(req: V1RequestBase, driver: WebDriver):
</body> </body>
</html>""" </html>"""
driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content)) driver.get("data:text/html;charset=utf-8,{html_content}".format(html_content=html_content))
driver.start_session()
driver.start_session() # required to bypass Cloudflare