mirror of
https://github.com/tcsenpai/agenticSeek.git
synced 2025-06-06 19:15:28 +00:00
536 lines
22 KiB
Python
536 lines
22 KiB
Python
from selenium import webdriver
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.common.exceptions import TimeoutException, WebDriverException
|
||
from selenium.webdriver.common.action_chains import ActionChains
|
||
from selenium.webdriver.chrome.options import Options
|
||
from typing import List
|
||
import chromedriver_autoinstaller
|
||
import time
|
||
import os
|
||
import shutil
|
||
from bs4 import BeautifulSoup
|
||
import markdownify
|
||
import logging
|
||
import sys
|
||
import re
|
||
from urllib.parse import urlparse
|
||
|
||
class Browser:
|
||
def __init__(self, headless=False, anticaptcha_install=False):
|
||
"""Initialize the browser with optional headless mode."""
|
||
self.headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
|
||
'Accept-Language': 'en-US,en;q=0.9',
|
||
'Referer': 'https://www.google.com/',
|
||
}
|
||
try:
|
||
chrome_options = Options()
|
||
chrome_path = self.get_chrome_path()
|
||
|
||
if not chrome_path:
|
||
raise FileNotFoundError("Google Chrome not found. Please install it.")
|
||
chrome_options.binary_location = chrome_path
|
||
|
||
if headless:
|
||
chrome_options.add_argument("--headless")
|
||
chrome_options.add_argument("--disable-gpu")
|
||
chrome_options.add_argument("--no-sandbox")
|
||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||
chrome_options.add_argument("--autoplay-policy=user-gesture-required")
|
||
chrome_options.add_argument("--mute-audio")
|
||
chrome_options.add_argument("--disable-webgl")
|
||
chrome_options.add_argument("--disable-notifications")
|
||
security_prefs = {
|
||
"profile.default_content_setting_values.media_stream": 2, # Block webcam/mic
|
||
"profile.default_content_setting_values.notifications": 2, # Block notifications
|
||
"profile.default_content_setting_values.popups": 2, # Block pop-ups
|
||
"profile.default_content_setting_values.geolocation": 2, # Block geolocation
|
||
"download_restrictions": 3, # Block all downloads
|
||
"safebrowsing.enabled": True, # Enable safe browsing
|
||
}
|
||
chrome_options.add_experimental_option("prefs", security_prefs)
|
||
|
||
chromedriver_path = shutil.which("chromedriver") # system installed driver.
|
||
|
||
#If not found, try auto-installing the correct version
|
||
if not chromedriver_path:
|
||
chromedriver_path = chromedriver_autoinstaller.install()
|
||
|
||
if not chromedriver_path:
|
||
raise FileNotFoundError("ChromeDriver not found. Please install it or add it to your PATH.")
|
||
|
||
service = Service(chromedriver_path)
|
||
self.driver = webdriver.Chrome(service=service, options=chrome_options)
|
||
self.wait = WebDriverWait(self.driver, 10)
|
||
self.logger = logging.getLogger(__name__)
|
||
self.logger.info("Browser initialized successfully")
|
||
except Exception as e:
|
||
raise Exception(f"Failed to initialize browser: {str(e)}")
|
||
|
||
@staticmethod
|
||
def get_chrome_path() -> str:
|
||
if sys.platform.startswith("win"):
|
||
paths = [
|
||
"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
|
||
"C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe",
|
||
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Google\\Chrome\\Application\\chrome.exe") # User install
|
||
]
|
||
elif sys.platform.startswith("darwin"): # macOS
|
||
paths = ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||
"/Applications/Google Chrome Beta.app/Contents/MacOS/Google Chrome Beta"]
|
||
else: # Linux
|
||
paths = ["/usr/bin/google-chrome", "/usr/bin/chromium-browser", "/usr/bin/chromium"]
|
||
|
||
for path in paths:
|
||
if os.path.exists(path) and os.access(path, os.X_OK): # Check if executable
|
||
return path
|
||
return None
|
||
|
||
def load_anticatpcha(self):
|
||
# TODO load anticapcha extension from crx file
|
||
pass
|
||
|
||
def go_to(self, url:str) -> bool:
|
||
"""Navigate to a specified URL."""
|
||
try:
|
||
initial_handles = self.driver.window_handles
|
||
self.driver.get(url)
|
||
time.sleep(1)
|
||
self.apply_web_countermeasures()
|
||
self.logger.info(f"Navigated to: {url}")
|
||
return True
|
||
except WebDriverException as e:
|
||
self.logger.error(f"Error navigating to {url}: {str(e)}")
|
||
return False
|
||
|
||
def is_sentence(self, text:str) -> bool:
|
||
"""Check if the text qualifies as a meaningful sentence or contains important error codes."""
|
||
text = text.strip()
|
||
|
||
error_codes = ["404", "403", "500", "502", "503"]
|
||
if any(code in text for code in error_codes):
|
||
return True
|
||
words = re.findall(r'\w+', text, re.UNICODE)
|
||
word_count = len(words)
|
||
has_punctuation = any(text.endswith(p) for p in ['.', ',', ',', '!', '?', '。', '!', '?', '।', '۔'])
|
||
is_long_enough = word_count > 5
|
||
return (word_count >= 5 and (has_punctuation or is_long_enough))
|
||
|
||
def get_text(self) -> str | None:
|
||
"""Get page text and convert it to README (Markdown) format."""
|
||
try:
|
||
soup = BeautifulSoup(self.driver.page_source, 'html.parser')
|
||
|
||
for element in soup(['script', 'style']):
|
||
element.decompose()
|
||
|
||
text = soup.get_text()
|
||
|
||
lines = (line.strip() for line in text.splitlines())
|
||
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
|
||
text = "\n".join(chunk for chunk in chunks if chunk and self.is_sentence(chunk))
|
||
#markdown_text = markdownify.markdownify(text, heading_style="ATX")
|
||
return "[Start of page]\n" + text + "\n[End of page]"
|
||
except Exception as e:
|
||
self.logger.error(f"Error getting text: {str(e)}")
|
||
return None
|
||
|
||
def clean_url(self, url:str) -> str:
|
||
"""Clean URL to keep only the part needed for navigation to the page"""
|
||
clean = url.split('#')[0]
|
||
parts = clean.split('?', 1)
|
||
base_url = parts[0]
|
||
if len(parts) > 1:
|
||
query = parts[1]
|
||
essential_params = []
|
||
for param in query.split('&'):
|
||
if param.startswith('_skw=') or param.startswith('q=') or param.startswith('s='):
|
||
essential_params.append(param)
|
||
elif param.startswith('_') or param.startswith('hash=') or param.startswith('itmmeta='):
|
||
break
|
||
if essential_params:
|
||
return f"{base_url}?{'&'.join(essential_params)}"
|
||
return base_url
|
||
|
||
def is_link_valid(self, url:str) -> bool:
|
||
"""Check if a URL is a valid link (page, not related to icon or metadata)."""
|
||
if len(url) > 64:
|
||
return False
|
||
parsed_url = urlparse(url)
|
||
if not parsed_url.scheme or not parsed_url.netloc:
|
||
return False
|
||
if re.search(r'/\d+$', parsed_url.path):
|
||
return False
|
||
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
|
||
metadata_extensions = ['.ico', '.xml', '.json', '.rss', '.atom']
|
||
for ext in image_extensions + metadata_extensions:
|
||
if url.lower().endswith(ext):
|
||
return False
|
||
return True
|
||
|
||
def get_navigable(self) -> [str]:
|
||
"""Get all navigable links on the current page."""
|
||
try:
|
||
links = []
|
||
elements = self.driver.find_elements(By.TAG_NAME, "a")
|
||
|
||
for element in elements:
|
||
href = element.get_attribute("href")
|
||
if href and href.startswith(("http", "https")):
|
||
links.append({
|
||
"url": href,
|
||
"text": element.text.strip(),
|
||
"is_displayed": element.is_displayed()
|
||
})
|
||
|
||
self.logger.info(f"Found {len(links)} navigable links")
|
||
return [self.clean_url(link['url']) for link in links if (link['is_displayed'] == True and self.is_link_valid(link['url']))]
|
||
except Exception as e:
|
||
self.logger.error(f"Error getting navigable links: {str(e)}")
|
||
return []
|
||
|
||
def click_element(self, xpath: str) -> bool:
|
||
"""Click an element specified by XPath."""
|
||
try:
|
||
element = self.wait.until(
|
||
EC.element_to_be_clickable((By.XPATH, xpath))
|
||
)
|
||
if not element.is_displayed():
|
||
self.logger.error(f"Element at {xpath} is not visible")
|
||
return False
|
||
if not element.is_enabled():
|
||
self.logger.error(f"Element at {xpath} is disabled")
|
||
return False
|
||
|
||
try:
|
||
self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element)
|
||
time.sleep(0.1) # Wait for scroll to settle
|
||
element.click()
|
||
self.logger.info(f"Clicked element at {xpath} using standard click")
|
||
return True
|
||
except ElementClickInterceptedException as e:
|
||
self.logger.warning(f"Standard click intercepted for {xpath}: {str(e)}")
|
||
try:
|
||
self.driver.execute_script("arguments[0].click();", element)
|
||
self.logger.info(f"Clicked element at {xpath} using JavaScript click")
|
||
time.sleep(0.1)
|
||
return True
|
||
except Exception as js_e:
|
||
self.logger.error(f"JavaScript click failed for {xpath}: {str(js_e)}")
|
||
return False
|
||
except TimeoutException:
|
||
self.logger.error(f"Element not found or not clickable within timeout: {xpath}")
|
||
return False
|
||
except Exception as e:
|
||
self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}")
|
||
return False
|
||
|
||
def get_form_inputs(self) -> [str]:
|
||
"""Extract all input from the page and return them."""
|
||
try:
|
||
input_elements = self.driver.find_elements(By.TAG_NAME, "input")
|
||
if not input_elements:
|
||
return "No input forms found on the page."
|
||
|
||
form_strings = []
|
||
for element in input_elements:
|
||
input_type = element.get_attribute("type") or "text"
|
||
if input_type in ["hidden", "submit", "button", "image"] or not element.is_displayed():
|
||
continue
|
||
input_name = element.get_attribute("name") or element.get_attribute("id") or input_type
|
||
current_value = element.get_attribute("value") or ""
|
||
placeholder = element.get_attribute("placeholder") or ""
|
||
if input_type == "checkbox" or input_type == "radio":
|
||
checked_status = "checked" if element.is_selected() else "unchecked"
|
||
form_strings.append(f"[{input_name}]({checked_status})")
|
||
else:
|
||
display_value = f"{placeholder}" if placeholder and not current_value else f"{current_value}"
|
||
form_strings.append(f"[{input_name}]({display_value})")
|
||
return form_strings
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error extracting form inputs: {str(e)}")
|
||
return f"Error extracting form inputs: {str(e)}"
|
||
|
||
def find_input_xpath_by_name(self, name:str) -> str | None:
|
||
"""Find the XPath of an input element given its name or id."""
|
||
try:
|
||
xpaths = [
|
||
f"//input[@name='{name}']",
|
||
f"//input[@id='{name}']",
|
||
f"//input[@placeholder='{name}']",
|
||
f"//input[@aria-label='{name}']",
|
||
f"//label[contains(text(), '{name}')]//following::input[1]"
|
||
]
|
||
for xpath in xpaths:
|
||
try:
|
||
element = self.wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
|
||
if element.is_displayed() and element.is_enabled():
|
||
return xpath
|
||
except:
|
||
continue
|
||
self.logger.warning(f"No visible input found for name: {name}")
|
||
return None
|
||
except Exception as e:
|
||
self.logger.error(f"Error finding input XPath for {name}: {str(e)}")
|
||
return None
|
||
|
||
def get_buttons_xpath(self):
|
||
"""
|
||
Find buttons and return their type and xpath.
|
||
"""
|
||
buttons = self.driver.find_elements(By.TAG_NAME, "button") + \
|
||
self.driver.find_elements(By.XPATH, "//input[@type='submit']")
|
||
result = []
|
||
for i, button in enumerate(buttons):
|
||
if not button.is_displayed() or not button.is_enabled():
|
||
continue
|
||
text = (button.text or button.get_attribute("value") or "").lower().replace(' ', '')
|
||
xpath = f"(//button | //input[@type='submit'])[{i + 1}]"
|
||
if "login" in text or "sign" in text or "register":
|
||
result.append((text, xpath))
|
||
result.sort(key=lambda x: len(x[0]))
|
||
return result
|
||
|
||
def find_and_click_submit(self, btn_type:str = 'login') -> None:
|
||
buttons = self.get_buttons_xpath()
|
||
print(f"Found buttons:", buttons)
|
||
for button in buttons:
|
||
if button[0] == btn_type:
|
||
print("clicking button:", button[0])
|
||
self.click_element(button[1])
|
||
|
||
def fill_form_inputs(self, input_list:[str]) -> bool:
|
||
"""Fill form inputs based on a list of [name](value) strings."""
|
||
try:
|
||
for input_str in input_list:
|
||
match = re.match(r'\[(.*?)\]\((.*?)\)', input_str)
|
||
if not match:
|
||
self.logger.warning(f"Invalid format for input: {input_str}")
|
||
continue
|
||
|
||
name, value = match.groups()
|
||
name = name.strip()
|
||
value = value.strip()
|
||
xpath = self.find_input_xpath_by_name(name)
|
||
if not xpath:
|
||
self.logger.warning(f"Skipping {name} - element not found")
|
||
continue
|
||
element = self.driver.find_element(By.XPATH, xpath)
|
||
input_type = (element.get_attribute("type") or "text").lower()
|
||
if input_type in ["checkbox", "radio"]:
|
||
is_checked = element.is_selected()
|
||
should_be_checked = value.lower() == "checked"
|
||
|
||
if is_checked != should_be_checked:
|
||
element.click()
|
||
self.logger.info(f"Set {name} to {value}")
|
||
else:
|
||
element.clear()
|
||
element.send_keys(value)
|
||
self.logger.info(f"Filled {name} with {value}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Error filling form inputs: {str(e)}")
|
||
return False
|
||
|
||
|
||
def get_current_url(self) -> str:
|
||
"""Get the current URL of the page."""
|
||
return self.driver.current_url
|
||
|
||
def get_page_title(self) -> str:
|
||
"""Get the title of the current page."""
|
||
return self.driver.title
|
||
|
||
def scroll_bottom(self) -> bool:
|
||
"""Scroll to the bottom of the page."""
|
||
try:
|
||
self.driver.execute_script(
|
||
"window.scrollTo(0, document.body.scrollHeight);"
|
||
)
|
||
time.sleep(1) # Wait for scroll to complete
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Error scrolling: {str(e)}")
|
||
return False
|
||
|
||
def screenshot(self, filename:str) -> bool:
|
||
"""Take a screenshot of the current page."""
|
||
try:
|
||
self.driver.save_screenshot(filename)
|
||
self.logger.info(f"Screenshot saved as {filename}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Error taking screenshot: {str(e)}")
|
||
return False
|
||
|
||
#######################
|
||
# WEB SECURITY #
|
||
#######################
|
||
|
||
def apply_web_countermeasures(self):
|
||
"""
|
||
Apply security measures to block any website malicious execution, privacy violation etc..
|
||
"""
|
||
self.inject_safety_script()
|
||
self.neutralize_event_listeners()
|
||
self.monitor_and_reset_css()
|
||
self.block_clipboard_access()
|
||
self.limit_intervals_and_timeouts()
|
||
self.block_external_requests()
|
||
self.monitor_and_close_popups()
|
||
|
||
def inject_safety_script(self):
|
||
script = """
|
||
// Block hardware access by removing or disabling APIs
|
||
Object.defineProperty(navigator, 'serial', { get: () => undefined });
|
||
Object.defineProperty(navigator, 'hid', { get: () => undefined });
|
||
Object.defineProperty(navigator, 'bluetooth', { get: () => undefined });
|
||
// Block media playback
|
||
HTMLMediaElement.prototype.play = function() {
|
||
this.pause(); // Immediately pause if play is called
|
||
return Promise.reject('Blocked by script');
|
||
};
|
||
// Block fullscreen requests
|
||
Element.prototype.requestFullscreen = function() {
|
||
console.log('Blocked fullscreen request');
|
||
return Promise.reject('Blocked by script');
|
||
};
|
||
// Block pointer lock
|
||
Element.prototype.requestPointerLock = function() {
|
||
console.log('Blocked pointer lock');
|
||
};
|
||
// Block iframe creation (optional, since browser already blocks these)
|
||
const originalCreateElement = document.createElement;
|
||
document.createElement = function(tagName) {
|
||
if (tagName.toLowerCase() === 'iframe') {
|
||
console.log('Blocked iframe creation');
|
||
return null;
|
||
}
|
||
return originalCreateElement.apply(this, arguments);
|
||
};
|
||
// Block annoying dialogs
|
||
window.alert = function() {};
|
||
window.confirm = function() { return false; };
|
||
window.prompt = function() { return null; };
|
||
"""
|
||
self.driver.execute_script(script)
|
||
|
||
def neutralize_event_listeners(self):
|
||
script = """
|
||
const originalAddEventListener = EventTarget.prototype.addEventListener;
|
||
EventTarget.prototype.addEventListener = function(type, listener, options) {
|
||
if (['mousedown', 'mouseup', 'click', 'touchstart', 'keydown', 'keyup', 'keypress'].includes(type)) {
|
||
console.log(`Blocked adding listener for ${type}`);
|
||
return;
|
||
}
|
||
originalAddEventListener.apply(this, arguments);
|
||
};
|
||
"""
|
||
self.driver.execute_script(script)
|
||
|
||
def monitor_and_reset_css(self):
|
||
script = """
|
||
const observer = new MutationObserver((mutations) => {
|
||
mutations.forEach((mutation) => {
|
||
if (mutation.type === 'attributes' && mutation.attributeName === 'style') {
|
||
const html = document.querySelector('html');
|
||
if (html.style.cursor === 'none') {
|
||
html.style.cursor = 'auto';
|
||
}
|
||
}
|
||
});
|
||
});
|
||
observer.observe(document.querySelector('html'), { attributes: true });
|
||
"""
|
||
self.driver.execute_script(script)
|
||
|
||
def block_clipboard_access(self):
|
||
script = """
|
||
navigator.clipboard.readText = function() {
|
||
console.log('Blocked clipboard read');
|
||
return Promise.reject('Blocked');
|
||
};
|
||
navigator.clipboard.writeText = function() {
|
||
console.log('Blocked clipboard write');
|
||
return Promise.resolve();
|
||
};
|
||
"""
|
||
self.driver.execute_script(script)
|
||
|
||
def limit_intervals_and_timeouts(self):
|
||
script = """
|
||
const originalSetInterval = window.setInterval;
|
||
window.setInterval = function(callback, delay) {
|
||
if (typeof callback === 'function' && callback.toString().includes('alert')) {
|
||
console.log('Blocked suspicious interval');
|
||
return;
|
||
}
|
||
return originalSetInterval.apply(this, arguments);
|
||
};
|
||
"""
|
||
self.driver.execute_script(script)
|
||
|
||
def monitor_and_close_popups(self):
|
||
initial_handles = self.driver.window_handles
|
||
for handle in self.driver.window_handles:
|
||
if handle not in initial_handles:
|
||
self.driver.switch_to.window(handle)
|
||
self.driver.close()
|
||
self.driver.switch_to.window(self.driver.window_handles[0])
|
||
|
||
def block_external_requests(self):
|
||
script = """
|
||
window.fetch = function() {
|
||
console.log('Blocked fetch request');
|
||
return Promise.reject('Blocked');
|
||
};
|
||
"""
|
||
self.driver.execute_script(script)
|
||
|
||
def close(self):
|
||
"""Close the browser."""
|
||
try:
|
||
self.driver.quit()
|
||
self.logger.info("Browser closed")
|
||
except Exception as e:
|
||
raise e
|
||
|
||
def __del__(self):
|
||
"""Destructor to ensure browser is closed."""
|
||
self.close()
|
||
|
||
if __name__ == "__main__":
    # Manual smoke test: opens a visible Chrome window, scrapes the Stack
    # Overflow login page, fills the form with sample values and submits it.
    logging.basicConfig(level=logging.INFO)

    session = Browser(headless=False)

    try:
        # stress test
        session.load_anticatpcha()
        session.go_to("https://stackoverflow.com/users/login")

        page_text = session.get_text()
        print("Page Text in Markdown:")
        print(page_text)

        page_links = session.get_navigable()
        print("\nNavigable Links:", page_links)

        detected_inputs = session.get_form_inputs()
        print("\nInputs:")
        print(detected_inputs)

        # Sample values in the [name](value) format fill_form_inputs expects.
        sample_values = ['[q]()', '[email](mlg.fcu@gmail.com)', '[password](hello123)']
        session.fill_form_inputs(sample_values)
        session.find_and_click_submit()

        # Leave the window open long enough to inspect the result by eye.
        time.sleep(10)

        # Disabled security stress test (kept for reference):
        #   print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s")
        #   time.sleep(20)
        #   browser.go_to("https://theannoyingsite.com/")
        #   time.sleep(15)
    finally:
        session.close()
|