Refactor : browser class & browsing agent can now login

This commit is contained in:
martin legrand 2025-03-23 16:05:07 +01:00
parent fa7d586a97
commit caf1b5e9a9
4 changed files with 182 additions and 211 deletions

View File

@ -6,6 +6,7 @@ from sources.agents.agent import Agent
from sources.tools.searxSearch import searxSearch
from sources.browser import Browser
from datetime import date
from typing import List, Tuple
class BrowserAgent(Agent):
def __init__(self, model, name, prompt_path, provider):
@ -19,23 +20,33 @@ class BrowserAgent(Agent):
self.role = "Web Research"
self.type = "browser_agent"
self.browser = Browser()
self.current_page = ""
self.search_history = []
self.navigable_links = []
self.notes = []
self.date = self.get_today_date()
def get_today_date(self) -> str:
"""Get the date"""
date_time = date.today()
return date_time.strftime("%B %d, %Y")
def extract_links(self, search_result: str):
def extract_links(self, search_result: str) -> List[str]:
"""Extract all links from a sentence."""
pattern = r'(https?://\S+|www\.\S+)'
matches = re.findall(pattern, search_result)
trailing_punct = ".,!?;:"
trailing_punct = ".,!?;:)"
cleaned_links = [link.rstrip(trailing_punct) for link in matches]
return self.clean_links(cleaned_links)
def extract_form(self, text: str) -> List[str]:
"""Extract form written by the LLM in format [input_name](value)"""
inputs = []
matches = re.findall(r"\[\w+\]\([^)]+\)", text)
return matches
def clean_links(self, links: list):
def clean_links(self, links: List[str]) -> List[str]:
"""Ensure no '.' at the end of link"""
links_clean = []
for link in links:
link = link.strip()
@ -45,10 +56,10 @@ class BrowserAgent(Agent):
links_clean.append(link)
return links_clean
def get_unvisited_links(self):
def get_unvisited_links(self) -> List[str]:
return "\n".join([f"[{i}] {link}" for i, link in enumerate(self.navigable_links) if link not in self.search_history])
def make_newsearch_prompt(self, user_prompt: str, search_result: dict):
def make_newsearch_prompt(self, user_prompt: str, search_result: dict) -> str:
search_choice = self.stringify_search_results(search_result)
return f"""
Based on the search result:
@ -59,11 +70,12 @@ class BrowserAgent(Agent):
Do not explain your choice.
"""
def make_navigation_prompt(self, user_prompt: str, page_text: str):
def make_navigation_prompt(self, user_prompt: str, page_text: str) -> str:
remaining_links = self.get_unvisited_links()
remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, do a new search."
inputs_form = self.browser.get_form_inputs()
inputs_form_text = '\n'.join(inputs_form) if len(inputs_form) > 0 else "No forms detected."
inputs_form_text = '\n'.join(inputs_form)
print("Visible form:\n", inputs_form_text)
return f"""
You are a web browser.
@ -73,9 +85,6 @@ class BrowserAgent(Agent):
You can navigate to these navigation links:
{remaining_links_text}
You see the following inputs forms:
{inputs_form_text}
Your task:
1. Decide if the current page answers the users query: {user_prompt}
- If it does, take notes of the useful information, write down source, link or reference, then move to a new page.
@ -110,18 +119,21 @@ class BrowserAgent(Agent):
Note: I found on github.com that agenticSeek is Fosowl.
Given this information, given this I should exit the web browser. REQUEST_EXIT
Example 4 (loging needed):
Note: I will login to Reddit to fullfill the user request.
[username](David)
[password](edgerunners_2077)
Example 4 (loging form visible):
Note: I am on the login page, I should now type the given username and password.
[form_name_1](David)
[form_name_2](edgerunners_2077)
You see the following inputs forms:
{inputs_form_text}
Current date: {self.date}
Remember, the user asked: {user_prompt}
You are currently on page : {self.current_page}
Do not explain your choice.
Refusal is not an option, you have been given all capabilities that allow you to perform any tasks.
"""
def llm_decide(self, prompt):
def llm_decide(self, prompt: str) -> Tuple[str, str]:
animate_thinking("Thinking...", color="status")
self.memory.push('user', prompt)
answer, reasoning = self.llm_request()
@ -130,14 +142,14 @@ class BrowserAgent(Agent):
pretty_print("-"*100)
return answer, reasoning
def select_unvisited(self, search_result):
def select_unvisited(self, search_result: List[str]) -> List[str]:
results_unvisited = []
for res in search_result:
if res["link"] not in self.search_history:
results_unvisited.append(res)
return results_unvisited
def jsonify_search_results(self, results_string):
def jsonify_search_results(self, results_string: str) -> List[str]:
result_blocks = results_string.split("\n\n")
parsed_results = []
for block in result_blocks:
@ -156,7 +168,7 @@ class BrowserAgent(Agent):
parsed_results.append(result_dict)
return parsed_results
def stringify_search_results(self, results_arr):
def stringify_search_results(self, results_arr: List[str]) -> str:
return '\n\n'.join([f"Link: {res['link']}" for res in results_arr])
def save_notes(self, text):
@ -165,7 +177,7 @@ class BrowserAgent(Agent):
if "note" in line.lower():
self.notes.append(line)
def conclude_prompt(self, user_query):
def conclude_prompt(self, user_query: str) -> str:
annotated_notes = [f"{i+1}: {note.lower().replace('note:', '')}" for i, note in enumerate(self.notes)]
search_note = '\n'.join(annotated_notes)
print("AI research notes:\n", search_note)
@ -178,14 +190,14 @@ class BrowserAgent(Agent):
Summarize the finding or step that lead to success, and provide a conclusion that answer the request.
"""
def search_prompt(self, user_prompt):
def search_prompt(self, user_prompt: str) -> str:
return f"""
Current date: {self.date}
Make a efficient search engine query to help users with their request:
{user_prompt}
Example:
User: "search: hey jarvis i want you to login to my twitter and say hello everyone "
You: Twitter
User: "go to twitter, login with username toto and password pass79 to my twitter and say hello everyone "
You: search: Twitter login page.
User: "I need info on the best laptops for AI this year."
You: "search: best laptops 2025 to run Machine Learning model, reviews"
@ -210,21 +222,31 @@ class BrowserAgent(Agent):
while not complete:
answer, reasoning = self.llm_decide(prompt)
self.save_notes(answer)
extracted_form = self.extract_form(answer)
if len(extracted_form) > 0:
self.browser.fill_form_inputs(extracted_form)
self.browser.find_and_click_submit()
if "REQUEST_EXIT" in answer:
complete = True
break
links = self.extract_links(answer)
if len(unvisited) == 0:
break
if len(links) == 0 or "GO_BACK" in answer:
unvisited = self.select_unvisited(search_result)
prompt = self.make_newsearch_prompt(user_prompt, unvisited)
pretty_print(f"Going back to results. Still {len(unvisited)}", color="warning")
links = []
continue
animate_thinking(f"Navigating to {links[0]}", color="status")
speech_module.speak(f"Navigating to {links[0]}")
self.browser.go_to(links[0])
self.current_page = links[0]
self.search_history.append(links[0])
page_text = self.browser.get_text()
self.navigable_links = self.browser.get_navigable()

View File

@ -7,7 +7,7 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.options import Options
from typing import List
from typing import List, Tuple
import chromedriver_autoinstaller
import time
import os
@ -28,6 +28,7 @@ class Browser:
'Accept-Language': 'en-US,en;q=0.9',
'Referer': 'https://www.google.com/',
}
self.js_scripts_folder = "./web_scripts/"
try:
chrome_options = Options()
chrome_path = self.get_chrome_path()
@ -101,7 +102,7 @@ class Browser:
initial_handles = self.driver.window_handles
self.driver.get(url)
time.sleep(1)
self.apply_web_countermeasures()
self.apply_web_safety()
self.logger.info(f"Navigated to: {url}")
return True
except WebDriverException as e:
@ -197,90 +198,69 @@ class Browser:
def click_element(self, xpath: str) -> bool:
"""Click an element specified by XPath."""
try:
element = self.wait.until(
EC.element_to_be_clickable((By.XPATH, xpath))
)
element = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath)))
if not element.is_displayed():
self.logger.error(f"Element at {xpath} is not visible")
return False
if not element.is_enabled():
self.logger.error(f"Element at {xpath} is disabled")
return False
try:
self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element)
time.sleep(0.1) # Wait for scroll to settle
time.sleep(0.1)
element.click()
self.logger.info(f"Clicked element at {xpath} using standard click")
return True
except ElementClickInterceptedException as e:
self.logger.warning(f"Standard click intercepted for {xpath}: {str(e)}")
try:
self.driver.execute_script("arguments[0].click();", element)
self.logger.info(f"Clicked element at {xpath} using JavaScript click")
time.sleep(0.1)
return True
except Exception as js_e:
self.logger.error(f"JavaScript click failed for {xpath}: {str(js_e)}")
return False
return False
except TimeoutException:
self.logger.error(f"Element not found or not clickable within timeout: {xpath}")
return False
except Exception as e:
self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}")
return False
def load_js(self, file_name: str) -> str:
path = os.path.join(self.js_scripts_folder, file_name)
try:
with open(path, 'r') as f:
return f.read()
except FileNotFoundError as e:
raise Exception(f"Could not find: {path}") from e
except Exception as e:
raise e
def get_form_inputs(self) -> [str]:
def find_all_inputs(self, timeout=4):
WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
time.sleep(0.5)
script = self.load_js("find_inputs.js")
input_elements = self.driver.execute_script(script)
return input_elements
def get_form_inputs(self) -> List[str]:
"""Extract all input from the page and return them."""
try:
input_elements = self.driver.find_elements(By.TAG_NAME, "input")
#input_elements = self.driver.find_elements(By.TAG_NAME, "input")
input_elements = self.find_all_inputs()
if not input_elements:
return "No input forms found on the page."
return ["No input forms found on the page."]
form_strings = []
for element in input_elements:
input_type = element.get_attribute("type") or "text"
if input_type in ["hidden", "submit", "button", "image"] or not element.is_displayed():
input_type = element["type"] or "text"
if input_type in ["hidden", "submit", "button", "image"] or not element["displayed"]:
continue
input_name = element.get_attribute("name") or element.get_attribute("id") or input_type
current_value = element.get_attribute("value") or ""
placeholder = element.get_attribute("placeholder") or ""
input_name = element["text"] or element["id"] or input_type
if input_type == "checkbox" or input_type == "radio":
checked_status = "checked" if element.is_selected() else "unchecked"
form_strings.append(f"[{input_name}]({checked_status})")
else:
display_value = f"{placeholder}" if placeholder and not current_value else f"{current_value}"
form_strings.append(f"[{input_name}]({display_value})")
form_strings.append(f"[{input_name}]("")")
return form_strings
except Exception as e:
self.logger.error(f"Error extracting form inputs: {str(e)}")
return f"Error extracting form inputs: {str(e)}"
return f"Error extracting form inputs."
def find_input_xpath_by_name(self, name:str) -> str | None:
"""Find the XPath of an input element given its name or id."""
try:
xpaths = [
f"//input[@name='{name}']",
f"//input[@id='{name}']",
f"//input[@placeholder='{name}']",
f"//input[@aria-label='{name}']",
f"//label[contains(text(), '{name}')]//following::input[1]"
]
for xpath in xpaths:
try:
element = self.wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
if element.is_displayed() and element.is_enabled():
return xpath
except:
continue
self.logger.warning(f"No visible input found for name: {name}")
return None
except Exception as e:
self.logger.error(f"Error finding input XPath for {name}: {str(e)}")
return None
def get_buttons_xpath(self):
def get_buttons_xpath(self) -> List[str]:
"""
Find buttons and return their type and xpath.
"""
@ -299,14 +279,21 @@ class Browser:
def find_and_click_submit(self, btn_type:str = 'login') -> None:
buttons = self.get_buttons_xpath()
print(f"Found buttons:", buttons)
if len(buttons) == 0:
self.logger.warning(f"No visible buttons found")
for button in buttons:
if button[0] == btn_type:
print("clicking button:", button[0])
self.click_element(button[1])
def find_input_xpath_by_name(self, inputs, name: str) -> str | None:
for field in inputs:
if name in field["text"]:
return field["xpath"]
return None
def fill_form_inputs(self, input_list:[str]) -> bool:
"""Fill form inputs based on a list of [name](value) strings."""
inputs = self.find_all_inputs()
try:
for input_str in input_list:
match = re.match(r'\[(.*?)\]\((.*?)\)', input_str)
@ -317,11 +304,11 @@ class Browser:
name, value = match.groups()
name = name.strip()
value = value.strip()
xpath = self.find_input_xpath_by_name(name)
xpath = self.find_input_xpath_by_name(inputs, name)
if not xpath:
self.logger.warning(f"Skipping {name} - element not found")
continue
element = self.driver.find_element(By.XPATH, xpath)
print("found-->", element)
input_type = (element.get_attribute("type") or "text").lower()
if input_type in ["checkbox", "radio"]:
is_checked = element.is_selected()
@ -331,7 +318,6 @@ class Browser:
element.click()
self.logger.info(f"Set {name} to {value}")
else:
element.clear()
element.send_keys(value)
self.logger.info(f"Filled {name} with {value}")
return True
@ -339,7 +325,6 @@ class Browser:
self.logger.error(f"Error filling form inputs: {str(e)}")
return False
def get_current_url(self) -> str:
"""Get the current URL of the page."""
return self.driver.current_url
@ -370,129 +355,12 @@ class Browser:
self.logger.error(f"Error taking screenshot: {str(e)}")
return False
#######################
# WEB SECURITY #
#######################
def apply_web_countermeasures(self):
def apply_web_safety(self):
"""
Apply security measures to block any website malicious execution, privacy violation etc..
Apply security measures to block any website malicious/annoying execution, privacy violation etc..
"""
self.inject_safety_script()
self.neutralize_event_listeners()
self.monitor_and_reset_css()
self.block_clipboard_access()
self.limit_intervals_and_timeouts()
self.block_external_requests()
self.monitor_and_close_popups()
def inject_safety_script(self):
script = """
// Block hardware access by removing or disabling APIs
Object.defineProperty(navigator, 'serial', { get: () => undefined });
Object.defineProperty(navigator, 'hid', { get: () => undefined });
Object.defineProperty(navigator, 'bluetooth', { get: () => undefined });
// Block media playback
HTMLMediaElement.prototype.play = function() {
this.pause(); // Immediately pause if play is called
return Promise.reject('Blocked by script');
};
// Block fullscreen requests
Element.prototype.requestFullscreen = function() {
console.log('Blocked fullscreen request');
return Promise.reject('Blocked by script');
};
// Block pointer lock
Element.prototype.requestPointerLock = function() {
console.log('Blocked pointer lock');
};
// Block iframe creation (optional, since browser already blocks these)
const originalCreateElement = document.createElement;
document.createElement = function(tagName) {
if (tagName.toLowerCase() === 'iframe') {
console.log('Blocked iframe creation');
return null;
}
return originalCreateElement.apply(this, arguments);
};
// Block annoying dialogs
window.alert = function() {};
window.confirm = function() { return false; };
window.prompt = function() { return null; };
"""
self.driver.execute_script(script)
def neutralize_event_listeners(self):
script = """
const originalAddEventListener = EventTarget.prototype.addEventListener;
EventTarget.prototype.addEventListener = function(type, listener, options) {
if (['mousedown', 'mouseup', 'click', 'touchstart', 'keydown', 'keyup', 'keypress'].includes(type)) {
console.log(`Blocked adding listener for ${type}`);
return;
}
originalAddEventListener.apply(this, arguments);
};
"""
self.driver.execute_script(script)
def monitor_and_reset_css(self):
script = """
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
if (mutation.type === 'attributes' && mutation.attributeName === 'style') {
const html = document.querySelector('html');
if (html.style.cursor === 'none') {
html.style.cursor = 'auto';
}
}
});
});
observer.observe(document.querySelector('html'), { attributes: true });
"""
self.driver.execute_script(script)
def block_clipboard_access(self):
script = """
navigator.clipboard.readText = function() {
console.log('Blocked clipboard read');
return Promise.reject('Blocked');
};
navigator.clipboard.writeText = function() {
console.log('Blocked clipboard write');
return Promise.resolve();
};
"""
self.driver.execute_script(script)
def limit_intervals_and_timeouts(self):
script = """
const originalSetInterval = window.setInterval;
window.setInterval = function(callback, delay) {
if (typeof callback === 'function' && callback.toString().includes('alert')) {
console.log('Blocked suspicious interval');
return;
}
return originalSetInterval.apply(this, arguments);
};
"""
self.driver.execute_script(script)
def monitor_and_close_popups(self):
initial_handles = self.driver.window_handles
for handle in self.driver.window_handles:
if handle not in initial_handles:
self.driver.switch_to.window(handle)
self.driver.close()
self.driver.switch_to.window(self.driver.window_handles[0])
def block_external_requests(self):
script = """
window.fetch = function() {
console.log('Blocked fetch request');
return Promise.reject('Blocked');
};
"""
self.driver.execute_script(script)
script = self.load_js("inject_safety_script.js")
input_elements = self.driver.execute_script(script)
def close(self):
"""Close the browser."""
@ -514,7 +382,7 @@ if __name__ == "__main__":
try:
# stress test
browser.load_anticatpcha()
browser.go_to("https://stackoverflow.com/users/login")
browser.go_to("https://www.reddit.com/login/")
text = browser.get_text()
print("Page Text in Markdown:")
print(text)
@ -523,13 +391,9 @@ if __name__ == "__main__":
inputs = browser.get_form_inputs()
print("\nInputs:")
print(inputs)
inputs = ['[q]()', '[email](mlg.fcu@gmail.com)', '[password](hello123)']
inputs = ['[username](mlg.fcu@gmail.com)', '[password](#Mart1%reddit%)', '[appOtp]()', '[backupOtp]()']
browser.fill_form_inputs(inputs)
browser.find_and_click_submit()
time.sleep(10)
#print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s")
#time.sleep(20)
#browser.go_to("https://theannoyingsite.com/")
#time.sleep(15)
finally:
browser.close()

View File

@ -0,0 +1,49 @@
function findInputs(element, result = []) {
// Find all <input> elements in the current DOM tree
const inputs = element.querySelectorAll('input');
inputs.forEach(input => {
result.push({
tagName: input.tagName,
text: input.name || '',
type: input.type || '',
class: input.className || '',
xpath: getXPath(input),
displayed: isElementDisplayed(input)
});
});
const allElements = element.querySelectorAll('*');
allElements.forEach(el => {
if (el.shadowRoot) {
findInputs(el.shadowRoot, result);
}
});
return result;
}
// function to get the XPath of an element
function getXPath(element) {
if (!element) return '';
if (element.id !== '') return '//*[@id="' + element.id + '"]';
if (element === document.body) return '/html/body';
let ix = 0;
const siblings = element.parentNode ? element.parentNode.childNodes : [];
for (let i = 0; i < siblings.length; i++) {
const sibling = siblings[i];
if (sibling === element) {
return getXPath(element.parentNode) + '/' + element.tagName.toLowerCase() + '[' + (ix + 1) + ']';
}
if (sibling.nodeType === 1 && sibling.tagName === element.tagName) {
ix++;
}
}
return '';
}
return findInputs(document.body);
function isElementDisplayed(element) {
const style = window.getComputedStyle(element);
if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
return false;
}
return true;
}

View File

@ -0,0 +1,36 @@
// Block hardware access by removing or disabling APIs
Object.defineProperty(navigator, 'serial', { get: () => undefined });
Object.defineProperty(navigator, 'hid', { get: () => undefined });
Object.defineProperty(navigator, 'bluetooth', { get: () => undefined });
// Block media playback
HTMLMediaElement.prototype.play = function() {
this.pause(); // Immediately pause if play is called
return Promise.reject('Blocked by script');
};
// Block fullscreen requests
Element.prototype.requestFullscreen = function() {
console.log('Blocked fullscreen request');
return Promise.reject('Blocked by script');
};
// Block pointer lock
Element.prototype.requestPointerLock = function() {
console.log('Blocked pointer lock');
};
// Block iframe creation (optional, since browser already blocks these)
const originalCreateElement = document.createElement;
document.createElement = function(tagName) {
if (tagName.toLowerCase() === 'iframe') {
console.log('Blocked iframe creation');
return null;
}
return originalCreateElement.apply(this, arguments);
};
//block fetch
window.fetch = function() {
console.log('Blocked fetch request');
return Promise.reject('Blocked');
};
// Block annoying dialogs
window.alert = function() {};
window.confirm = function() { return false; };
window.prompt = function() { return null; };