diff --git a/sources/agents/browser_agent.py b/sources/agents/browser_agent.py index 1093feb..8a14028 100644 --- a/sources/agents/browser_agent.py +++ b/sources/agents/browser_agent.py @@ -6,6 +6,7 @@ from sources.agents.agent import Agent from sources.tools.searxSearch import searxSearch from sources.browser import Browser from datetime import date +from typing import List, Tuple class BrowserAgent(Agent): def __init__(self, model, name, prompt_path, provider): @@ -19,23 +20,33 @@ class BrowserAgent(Agent): self.role = "Web Research" self.type = "browser_agent" self.browser = Browser() + self.current_page = "" self.search_history = [] self.navigable_links = [] self.notes = [] self.date = self.get_today_date() def get_today_date(self) -> str: + """Get the date""" date_time = date.today() return date_time.strftime("%B %d, %Y") - def extract_links(self, search_result: str): + def extract_links(self, search_result: str) -> List[str]: + """Extract all links from a sentence.""" pattern = r'(https?://\S+|www\.\S+)' matches = re.findall(pattern, search_result) - trailing_punct = ".,!?;:" + trailing_punct = ".,!?;:)" cleaned_links = [link.rstrip(trailing_punct) for link in matches] return self.clean_links(cleaned_links) + + def extract_form(self, text: str) -> List[str]: + """Extract form written by the LLM in format [input_name](value)""" + inputs = [] + matches = re.findall(r"\[\w+\]\([^)]+\)", text) + return matches - def clean_links(self, links: list): + def clean_links(self, links: List[str]) -> List[str]: + """Ensure no '.' at the end of link""" links_clean = [] for link in links: link = link.strip() @@ -45,10 +56,10 @@ class BrowserAgent(Agent): links_clean.append(link) return links_clean - def get_unvisited_links(self): + def get_unvisited_links(self) -> List[str]: return "\n".join([f"[{i}] {link}" for i, link in enumerate(self.navigable_links) if link not in self.search_history]) - def make_newsearch_prompt(self, user_prompt: str, search_result: dict): + def make_newsearch_prompt(self, user_prompt: str, search_result: dict) -> str: search_choice = self.stringify_search_results(search_result) return f""" Based on the search result: @@ -59,11 +70,12 @@ class BrowserAgent(Agent): Do not explain your choice. """ - def make_navigation_prompt(self, user_prompt: str, page_text: str): + def make_navigation_prompt(self, user_prompt: str, page_text: str) -> str: remaining_links = self.get_unvisited_links() remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, do a new search." inputs_form = self.browser.get_form_inputs() - inputs_form_text = '\n'.join(inputs_form) if len(inputs_form) > 0 else "No forms detected." + inputs_form_text = '\n'.join(inputs_form) + print("Visible form:\n", inputs_form_text) return f""" You are a web browser. @@ -73,9 +85,6 @@ class BrowserAgent(Agent): You can navigate to these navigation links: {remaining_links_text} - You see the following inputs forms: - {inputs_form_text} - Your task: 1. Decide if the current page answers the user’s query: {user_prompt} - If it does, take notes of the useful information, write down source, link or reference, then move to a new page. @@ -110,18 +119,21 @@ class BrowserAgent(Agent): Note: I found on github.com that agenticSeek is Fosowl. Given this information, given this I should exit the web browser. REQUEST_EXIT - Example 4 (loging needed): - Note: I will login to Reddit to fullfill the user request. - [username](David) - [password](edgerunners_2077) + Example 4 (loging form visible): + Note: I am on the login page, I should now type the given username and password. + [form_name_1](David) + [form_name_2](edgerunners_2077) + + You see the following inputs forms: + {inputs_form_text} - Current date: {self.date} Remember, the user asked: {user_prompt} + You are currently on page : {self.current_page} Do not explain your choice. Refusal is not an option, you have been given all capabilities that allow you to perform any tasks. """ - def llm_decide(self, prompt): + def llm_decide(self, prompt: str) -> Tuple[str, str]: animate_thinking("Thinking...", color="status") self.memory.push('user', prompt) answer, reasoning = self.llm_request() @@ -130,14 +142,14 @@ class BrowserAgent(Agent): pretty_print("-"*100) return answer, reasoning - def select_unvisited(self, search_result): + def select_unvisited(self, search_result: List[str]) -> List[str]: results_unvisited = [] for res in search_result: if res["link"] not in self.search_history: results_unvisited.append(res) return results_unvisited - def jsonify_search_results(self, results_string): + def jsonify_search_results(self, results_string: str) -> List[str]: result_blocks = results_string.split("\n\n") parsed_results = [] for block in result_blocks: @@ -156,7 +168,7 @@ class BrowserAgent(Agent): parsed_results.append(result_dict) return parsed_results - def stringify_search_results(self, results_arr): + def stringify_search_results(self, results_arr: List[str]) -> str: return '\n\n'.join([f"Link: {res['link']}" for res in results_arr]) def save_notes(self, text): @@ -165,7 +177,7 @@ class BrowserAgent(Agent): if "note" in line.lower(): self.notes.append(line) - def conclude_prompt(self, user_query): + def conclude_prompt(self, user_query: str) -> str: annotated_notes = [f"{i+1}: {note.lower().replace('note:', '')}" for i, note in enumerate(self.notes)] search_note = '\n'.join(annotated_notes) print("AI research notes:\n", search_note) @@ -178,14 +190,14 @@ class BrowserAgent(Agent): Summarize the finding or step that lead to success, and provide a conclusion that answer the request. """ - def search_prompt(self, user_prompt): + def search_prompt(self, user_prompt: str) -> str: return f""" Current date: {self.date} Make a efficient search engine query to help users with their request: {user_prompt} Example: - User: "search: hey jarvis i want you to login to my twitter and say hello everyone " - You: Twitter + User: "go to twitter, login with username toto and password pass79 to my twitter and say hello everyone " + You: search: Twitter login page. User: "I need info on the best laptops for AI this year." You: "search: best laptops 2025 to run Machine Learning model, reviews" @@ -210,21 +222,31 @@ class BrowserAgent(Agent): while not complete: answer, reasoning = self.llm_decide(prompt) self.save_notes(answer) + + extracted_form = self.extract_form(answer) + if len(extracted_form) > 0: + self.browser.fill_form_inputs(extracted_form) + self.browser.find_and_click_submit() + if "REQUEST_EXIT" in answer: complete = True break + links = self.extract_links(answer) if len(unvisited) == 0: break + if len(links) == 0 or "GO_BACK" in answer: unvisited = self.select_unvisited(search_result) prompt = self.make_newsearch_prompt(user_prompt, unvisited) pretty_print(f"Going back to results. Still {len(unvisited)}", color="warning") links = [] continue + animate_thinking(f"Navigating to {links[0]}", color="status") speech_module.speak(f"Navigating to {links[0]}") self.browser.go_to(links[0]) + self.current_page = links[0] self.search_history.append(links[0]) page_text = self.browser.get_text() self.navigable_links = self.browser.get_navigable() diff --git a/sources/browser.py b/sources/browser.py index 175fd7e..a229606 100644 --- a/sources/browser.py +++ b/sources/browser.py @@ -7,7 +7,7 @@ from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, WebDriverException from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.chrome.options import Options -from typing import List +from typing import List, Tuple import chromedriver_autoinstaller import time import os @@ -28,6 +28,7 @@ class Browser: 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://www.google.com/', } + self.js_scripts_folder = "./web_scripts/" try: chrome_options = Options() chrome_path = self.get_chrome_path() @@ -101,7 +102,7 @@ class Browser: initial_handles = self.driver.window_handles self.driver.get(url) time.sleep(1) - self.apply_web_countermeasures() + self.apply_web_safety() self.logger.info(f"Navigated to: {url}") return True except WebDriverException as e: @@ -197,90 +198,69 @@ class Browser: def click_element(self, xpath: str) -> bool: """Click an element specified by XPath.""" try: - element = self.wait.until( - EC.element_to_be_clickable((By.XPATH, xpath)) - ) + element = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath))) if not element.is_displayed(): - self.logger.error(f"Element at {xpath} is not visible") return False if not element.is_enabled(): - self.logger.error(f"Element at {xpath} is disabled") return False - try: self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element) - time.sleep(0.1) # Wait for scroll to settle + time.sleep(0.1) element.click() - self.logger.info(f"Clicked element at {xpath} using standard click") return True except ElementClickInterceptedException as e: - self.logger.warning(f"Standard click intercepted for {xpath}: {str(e)}") - try: - self.driver.execute_script("arguments[0].click();", element) - self.logger.info(f"Clicked element at {xpath} using JavaScript click") - time.sleep(0.1) - return True - except Exception as js_e: - self.logger.error(f"JavaScript click failed for {xpath}: {str(js_e)}") - return False + return False except TimeoutException: - self.logger.error(f"Element not found or not clickable within timeout: {xpath}") return False except Exception as e: self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}") return False + + def load_js(self, file_name: str) -> str: + path = os.path.join(self.js_scripts_folder, file_name) + try: + with open(path, 'r') as f: + return f.read() + except FileNotFoundError as e: + raise Exception(f"Could not find: {path}") from e + except Exception as e: + raise e - def get_form_inputs(self) -> [str]: + def find_all_inputs(self, timeout=4): + WebDriverWait(self.driver, timeout).until( + EC.presence_of_element_located((By.TAG_NAME, "body")) + ) + time.sleep(0.5) + script = self.load_js("find_inputs.js") + input_elements = self.driver.execute_script(script) + return input_elements + + def get_form_inputs(self) -> List[str]: """Extract all input from the page and return them.""" try: - input_elements = self.driver.find_elements(By.TAG_NAME, "input") + #input_elements = self.driver.find_elements(By.TAG_NAME, "input") + input_elements = self.find_all_inputs() if not input_elements: - return "No input forms found on the page." + return ["No input forms found on the page."] form_strings = [] for element in input_elements: - input_type = element.get_attribute("type") or "text" - if input_type in ["hidden", "submit", "button", "image"] or not element.is_displayed(): + input_type = element["type"] or "text" + if input_type in ["hidden", "submit", "button", "image"] or not element["displayed"]: continue - input_name = element.get_attribute("name") or element.get_attribute("id") or input_type - current_value = element.get_attribute("value") or "" - placeholder = element.get_attribute("placeholder") or "" + input_name = element["text"] or element["id"] or input_type if input_type == "checkbox" or input_type == "radio": checked_status = "checked" if element.is_selected() else "unchecked" form_strings.append(f"[{input_name}]({checked_status})") else: - display_value = f"{placeholder}" if placeholder and not current_value else f"{current_value}" - form_strings.append(f"[{input_name}]({display_value})") + form_strings.append(f"[{input_name}]("")") return form_strings except Exception as e: self.logger.error(f"Error extracting form inputs: {str(e)}") - return f"Error extracting form inputs: {str(e)}" + return f"Error extracting form inputs." - def find_input_xpath_by_name(self, name:str) -> str | None: - """Find the XPath of an input element given its name or id.""" - try: - xpaths = [ - f"//input[@name='{name}']", - f"//input[@id='{name}']", - f"//input[@placeholder='{name}']", - f"//input[@aria-label='{name}']", - f"//label[contains(text(), '{name}')]//following::input[1]" - ] - for xpath in xpaths: - try: - element = self.wait.until(EC.presence_of_element_located((By.XPATH, xpath))) - if element.is_displayed() and element.is_enabled(): - return xpath - except: - continue - self.logger.warning(f"No visible input found for name: {name}") - return None - except Exception as e: - self.logger.error(f"Error finding input XPath for {name}: {str(e)}") - return None - - def get_buttons_xpath(self): + def get_buttons_xpath(self) -> List[str]: """ Find buttons and return their type and xpath. """ @@ -299,14 +279,21 @@ class Browser: def find_and_click_submit(self, btn_type:str = 'login') -> None: buttons = self.get_buttons_xpath() - print(f"Found buttons:", buttons) + if len(buttons) == 0: + self.logger.warning(f"No visible buttons found") for button in buttons: if button[0] == btn_type: - print("clicking button:", button[0]) self.click_element(button[1]) + + def find_input_xpath_by_name(self, inputs, name: str) -> str | None: + for field in inputs: + if name in field["text"]: + return field["xpath"] + return None def fill_form_inputs(self, input_list:[str]) -> bool: """Fill form inputs based on a list of [name](value) strings.""" + inputs = self.find_all_inputs() try: for input_str in input_list: match = re.match(r'\[(.*?)\]\((.*?)\)', input_str) @@ -317,11 +304,11 @@ class Browser: name, value = match.groups() name = name.strip() value = value.strip() - xpath = self.find_input_xpath_by_name(name) + xpath = self.find_input_xpath_by_name(inputs, name) if not xpath: - self.logger.warning(f"Skipping {name} - element not found") continue element = self.driver.find_element(By.XPATH, xpath) + print("found-->", element) input_type = (element.get_attribute("type") or "text").lower() if input_type in ["checkbox", "radio"]: is_checked = element.is_selected() @@ -331,7 +318,6 @@ class Browser: element.click() self.logger.info(f"Set {name} to {value}") else: - element.clear() element.send_keys(value) self.logger.info(f"Filled {name} with {value}") return True @@ -339,7 +325,6 @@ class Browser: self.logger.error(f"Error filling form inputs: {str(e)}") return False - def get_current_url(self) -> str: """Get the current URL of the page.""" return self.driver.current_url @@ -370,129 +355,12 @@ class Browser: self.logger.error(f"Error taking screenshot: {str(e)}") return False -####################### -# WEB SECURITY # -####################### - - def apply_web_countermeasures(self): + def apply_web_safety(self): """ - Apply security measures to block any website malicious execution, privacy violation etc.. + Apply security measures to block any website malicious/annoying execution, privacy violation etc.. """ - self.inject_safety_script() - self.neutralize_event_listeners() - self.monitor_and_reset_css() - self.block_clipboard_access() - self.limit_intervals_and_timeouts() - self.block_external_requests() - self.monitor_and_close_popups() - - def inject_safety_script(self): - script = """ - // Block hardware access by removing or disabling APIs - Object.defineProperty(navigator, 'serial', { get: () => undefined }); - Object.defineProperty(navigator, 'hid', { get: () => undefined }); - Object.defineProperty(navigator, 'bluetooth', { get: () => undefined }); - // Block media playback - HTMLMediaElement.prototype.play = function() { - this.pause(); // Immediately pause if play is called - return Promise.reject('Blocked by script'); - }; - // Block fullscreen requests - Element.prototype.requestFullscreen = function() { - console.log('Blocked fullscreen request'); - return Promise.reject('Blocked by script'); - }; - // Block pointer lock - Element.prototype.requestPointerLock = function() { - console.log('Blocked pointer lock'); - }; - // Block iframe creation (optional, since browser already blocks these) - const originalCreateElement = document.createElement; - document.createElement = function(tagName) { - if (tagName.toLowerCase() === 'iframe') { - console.log('Blocked iframe creation'); - return null; - } - return originalCreateElement.apply(this, arguments); - }; - // Block annoying dialogs - window.alert = function() {}; - window.confirm = function() { return false; }; - window.prompt = function() { return null; }; - """ - self.driver.execute_script(script) - - def neutralize_event_listeners(self): - script = """ - const originalAddEventListener = EventTarget.prototype.addEventListener; - EventTarget.prototype.addEventListener = function(type, listener, options) { - if (['mousedown', 'mouseup', 'click', 'touchstart', 'keydown', 'keyup', 'keypress'].includes(type)) { - console.log(`Blocked adding listener for ${type}`); - return; - } - originalAddEventListener.apply(this, arguments); - }; - """ - self.driver.execute_script(script) - - def monitor_and_reset_css(self): - script = """ - const observer = new MutationObserver((mutations) => { - mutations.forEach((mutation) => { - if (mutation.type === 'attributes' && mutation.attributeName === 'style') { - const html = document.querySelector('html'); - if (html.style.cursor === 'none') { - html.style.cursor = 'auto'; - } - } - }); - }); - observer.observe(document.querySelector('html'), { attributes: true }); - """ - self.driver.execute_script(script) - - def block_clipboard_access(self): - script = """ - navigator.clipboard.readText = function() { - console.log('Blocked clipboard read'); - return Promise.reject('Blocked'); - }; - navigator.clipboard.writeText = function() { - console.log('Blocked clipboard write'); - return Promise.resolve(); - }; - """ - self.driver.execute_script(script) - - def limit_intervals_and_timeouts(self): - script = """ - const originalSetInterval = window.setInterval; - window.setInterval = function(callback, delay) { - if (typeof callback === 'function' && callback.toString().includes('alert')) { - console.log('Blocked suspicious interval'); - return; - } - return originalSetInterval.apply(this, arguments); - }; - """ - self.driver.execute_script(script) - - def monitor_and_close_popups(self): - initial_handles = self.driver.window_handles - for handle in self.driver.window_handles: - if handle not in initial_handles: - self.driver.switch_to.window(handle) - self.driver.close() - self.driver.switch_to.window(self.driver.window_handles[0]) - - def block_external_requests(self): - script = """ - window.fetch = function() { - console.log('Blocked fetch request'); - return Promise.reject('Blocked'); - }; - """ - self.driver.execute_script(script) + script = self.load_js("inject_safety_script.js") + input_elements = self.driver.execute_script(script) def close(self): """Close the browser.""" @@ -514,7 +382,7 @@ if __name__ == "__main__": try: # stress test browser.load_anticatpcha() - browser.go_to("https://stackoverflow.com/users/login") + browser.go_to("https://www.reddit.com/login/") text = browser.get_text() print("Page Text in Markdown:") print(text) @@ -523,13 +391,9 @@ if __name__ == "__main__": inputs = browser.get_form_inputs() print("\nInputs:") print(inputs) - inputs = ['[q]()', '[email](mlg.fcu@gmail.com)', '[password](hello123)'] + inputs = ['[username](mlg.fcu@gmail.com)', '[password](#Mart1%reddit%)', '[appOtp]()', '[backupOtp]()'] browser.fill_form_inputs(inputs) browser.find_and_click_submit() time.sleep(10) - #print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s") - #time.sleep(20) - #browser.go_to("https://theannoyingsite.com/") - #time.sleep(15) finally: browser.close() diff --git a/sources/web_scripts/find_inputs.js b/sources/web_scripts/find_inputs.js new file mode 100644 index 0000000..80c4b14 --- /dev/null +++ b/sources/web_scripts/find_inputs.js @@ -0,0 +1,49 @@ +function findInputs(element, result = []) { + // Find all elements in the current DOM tree + const inputs = element.querySelectorAll('input'); + inputs.forEach(input => { + result.push({ + tagName: input.tagName, + text: input.name || '', + type: input.type || '', + class: input.className || '', + xpath: getXPath(input), + displayed: isElementDisplayed(input) + }); + }); + const allElements = element.querySelectorAll('*'); + allElements.forEach(el => { + if (el.shadowRoot) { + findInputs(el.shadowRoot, result); + } + }); + return result; +} +// function to get the XPath of an element +function getXPath(element) { + if (!element) return ''; + if (element.id !== '') return '//*[@id="' + element.id + '"]'; + if (element === document.body) return '/html/body'; + + let ix = 0; + const siblings = element.parentNode ? element.parentNode.childNodes : []; + for (let i = 0; i < siblings.length; i++) { + const sibling = siblings[i]; + if (sibling === element) { + return getXPath(element.parentNode) + '/' + element.tagName.toLowerCase() + '[' + (ix + 1) + ']'; + } + if (sibling.nodeType === 1 && sibling.tagName === element.tagName) { + ix++; + } + } + return ''; +} +return findInputs(document.body); + +function isElementDisplayed(element) { + const style = window.getComputedStyle(element); + if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') { + return false; + } + return true; +} \ No newline at end of file diff --git a/sources/web_scripts/inject_safety_script.js b/sources/web_scripts/inject_safety_script.js new file mode 100644 index 0000000..cde4112 --- /dev/null +++ b/sources/web_scripts/inject_safety_script.js @@ -0,0 +1,36 @@ +// Block hardware access by removing or disabling APIs +Object.defineProperty(navigator, 'serial', { get: () => undefined }); +Object.defineProperty(navigator, 'hid', { get: () => undefined }); +Object.defineProperty(navigator, 'bluetooth', { get: () => undefined }); +// Block media playback +HTMLMediaElement.prototype.play = function() { + this.pause(); // Immediately pause if play is called + return Promise.reject('Blocked by script'); +}; +// Block fullscreen requests +Element.prototype.requestFullscreen = function() { + console.log('Blocked fullscreen request'); + return Promise.reject('Blocked by script'); +}; +// Block pointer lock +Element.prototype.requestPointerLock = function() { + console.log('Blocked pointer lock'); +}; +// Block iframe creation (optional, since browser already blocks these) +const originalCreateElement = document.createElement; +document.createElement = function(tagName) { + if (tagName.toLowerCase() === 'iframe') { + console.log('Blocked iframe creation'); + return null; + } + return originalCreateElement.apply(this, arguments); +}; +//block fetch +window.fetch = function() { + console.log('Blocked fetch request'); + return Promise.reject('Blocked'); +}; +// Block annoying dialogs +window.alert = function() {}; +window.confirm = function() { return false; }; +window.prompt = function() { return null; }; \ No newline at end of file