diff --git a/sources/agents/agent.py b/sources/agents/agent.py index 51a2e83..bf4bdea 100644 --- a/sources/agents/agent.py +++ b/sources/agents/agent.py @@ -37,6 +37,7 @@ class Agent(): recover_last_session=True) -> None: self.agent_name = name self.role = None + self.type = None self.current_directory = os.getcwd() self.model = model self.llm = provider diff --git a/sources/agents/browser_agent.py b/sources/agents/browser_agent.py index 7a4952f..1093feb 100644 --- a/sources/agents/browser_agent.py +++ b/sources/agents/browser_agent.py @@ -17,6 +17,7 @@ class BrowserAgent(Agent): "web_search": searxSearch(), } self.role = "Web Research" + self.type = "browser_agent" self.browser = Browser() self.search_history = [] self.navigable_links = [] @@ -60,14 +61,20 @@ class BrowserAgent(Agent): def make_navigation_prompt(self, user_prompt: str, page_text: str): remaining_links = self.get_unvisited_links() - remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, proceed with a new search." + remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, do a new search." + inputs_form = self.browser.get_form_inputs() + inputs_form_text = '\n'.join(inputs_form) if len(inputs_form) > 0 else "No forms detected." + return f""" You are a web browser. You are currently on this webpage: {page_text} You can navigate to these navigation links: - {remaining_links} + {remaining_links_text} + + You see the following inputs forms: + {inputs_form_text} Your task: 1. Decide if the current page answers the user’s query: {user_prompt} @@ -77,9 +84,13 @@ class BrowserAgent(Agent): 2. Navigate by either: - Navigate to a navigation links (write the full URL, e.g., www.example.com/cats). - If no link seems helpful, say: GO_BACK. + 3. Fill forms on the page: + - If user give you informations that help you fill form, fill it. + - If you don't know how to fill a form, leave it empty. 
+ - You can fill a form using [form_name](value). Recap of note taking: - If useful -> Note: [Briefly summarize the key information that answers the user’s query.] + If useful -> Note: [Briefly summarize the key information or task you conducted.] Do not write "The page talk about ...", write your finding on the page and how they contribute to an answer. If not useful -> Error: [Explain why the page doesn’t help.] @@ -88,20 +99,26 @@ class BrowserAgent(Agent): No link seem useful to provide futher information. GO_BACK Example 2 (not useful, but related link): - Error: This forum reddit.com/welcome does not discuss anything related to the user’s query. + Error: reddit.com/welcome does not discuss anything related to the user’s query. There is a link that could lead to the information, I want to navigate to http://reddit.com/r/locallama Example 3 (not useful, no related links): - Error: x.com does not discuss anything related to the user’s query and no navigation link are usefull + Error: x.com does not discuss anything related to the user’s query and no navigation link are usefull. GO_BACK Example 3 (query answer found): Note: I found on github.com that agenticSeek is Fosowl. Given this information, given this I should exit the web browser. REQUEST_EXIT + Example 4 (loging needed): + Note: I will login to Reddit to fullfill the user request. + [username](David) + [password](edgerunners_2077) + Current date: {self.date} Remember, the user asked: {user_prompt} Do not explain your choice. + Refusal is not an option, you have been given all capabilities that allow you to perform any tasks. """ def llm_decide(self, prompt): @@ -158,7 +175,7 @@ class BrowserAgent(Agent): A web AI made the following finding across different pages: {search_note} - Summarize the finding, and provide a conclusion that answer the request. + Summarize the finding or step that lead to success, and provide a conclusion that answer the request. 
""" def search_prompt(self, user_prompt): diff --git a/sources/agents/casual_agent.py b/sources/agents/casual_agent.py index 512610e..44ccf42 100644 --- a/sources/agents/casual_agent.py +++ b/sources/agents/casual_agent.py @@ -19,6 +19,7 @@ class CasualAgent(Agent): "bash": BashInterpreter() } self.role = "Chat and Conversation" + self.type = "casual_agent" def process(self, prompt, speech_module) -> str: complete = False diff --git a/sources/agents/code_agent.py b/sources/agents/code_agent.py index 01569bb..797c02f 100644 --- a/sources/agents/code_agent.py +++ b/sources/agents/code_agent.py @@ -21,6 +21,7 @@ class CoderAgent(Agent): "file_finder": FileFinder() } self.role = "Code Assistance" + self.type = "code_agent" def process(self, prompt, speech_module) -> str: answer = "" diff --git a/sources/agents/file_agent.py b/sources/agents/file_agent.py index 6f49d77..d8aee5c 100644 --- a/sources/agents/file_agent.py +++ b/sources/agents/file_agent.py @@ -15,6 +15,7 @@ class FileAgent(Agent): "bash": BashInterpreter() } self.role = "find and read files" + self.type = "file_agent" def process(self, prompt, speech_module) -> str: complete = False diff --git a/sources/agents/planner_agent.py b/sources/agents/planner_agent.py index 6b374ee..93c269a 100644 --- a/sources/agents/planner_agent.py +++ b/sources/agents/planner_agent.py @@ -22,7 +22,7 @@ class PlannerAgent(Agent): "web": BrowserAgent(model, name, prompt_path, provider) } self.role = "Research, setup and code" - self.tag = "json" + self.type = "planner_agent" def parse_agent_tasks(self, text): tasks = [] diff --git a/sources/browser.py b/sources/browser.py index 80b8b4f..175fd7e 100644 --- a/sources/browser.py +++ b/sources/browser.py @@ -5,7 +5,9 @@ from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, WebDriverException +from 
selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.chrome.options import Options +from typing import List import chromedriver_autoinstaller import time import os @@ -26,7 +28,6 @@ class Browser: 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://www.google.com/', } - self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related" try: chrome_options = Options() chrome_path = self.get_chrome_path() @@ -72,7 +73,7 @@ class Browser: raise Exception(f"Failed to initialize browser: {str(e)}") @staticmethod - def get_chrome_path(): + def get_chrome_path() -> str: if sys.platform.startswith("win"): paths = [ "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe", @@ -89,8 +90,12 @@ class Browser: if os.path.exists(path) and os.access(path, os.X_OK): # Check if executable return path return None + + def load_anticatpcha(self): + # TODO load anticapcha extension from crx file + pass - def go_to(self, url): + def go_to(self, url:str) -> bool: """Navigate to a specified URL.""" try: initial_handles = self.driver.window_handles @@ -103,7 +108,7 @@ class Browser: self.logger.error(f"Error navigating to {url}: {str(e)}") return False - def is_sentence(self, text): + def is_sentence(self, text:str) -> bool: """Check if the text qualifies as a meaningful sentence or contains important error codes.""" text = text.strip() @@ -116,7 +121,7 @@ class Browser: is_long_enough = word_count > 5 return (word_count >= 5 and (has_punctuation or is_long_enough)) - def get_text(self): + def get_text(self) -> str | None: """Get page text and convert it to README (Markdown) format.""" try: soup = BeautifulSoup(self.driver.page_source, 'html.parser') @@ -135,7 +140,7 @@ class Browser: self.logger.error(f"Error getting text: {str(e)}") return None - def clean_url(self, url): + def clean_url(self, url:str) -> str: """Clean URL to keep only the part needed for navigation to the page""" clean = 
url.split('#')[0] parts = clean.split('?', 1) @@ -152,7 +157,7 @@ class Browser: return f"{base_url}?{'&'.join(essential_params)}" return base_url - def is_link_valid(self, url): + def is_link_valid(self, url:str) -> bool: """Check if a URL is a valid link (page, not related to icon or metadata).""" if len(url) > 64: return False @@ -168,7 +173,7 @@ class Browser: return False return True - def get_navigable(self): + def get_navigable(self) -> [str]: """Get all navigable links on the current page.""" try: links = [] @@ -189,28 +194,161 @@ class Browser: self.logger.error(f"Error getting navigable links: {str(e)}") return [] - def click_element(self, xpath): - """Click an element specified by xpath.""" + def click_element(self, xpath: str) -> bool: + """Click an element specified by XPath.""" try: element = self.wait.until( EC.element_to_be_clickable((By.XPATH, xpath)) ) - element.click() - time.sleep(2) # Wait for action to complete - return True + if not element.is_displayed(): + self.logger.error(f"Element at {xpath} is not visible") + return False + if not element.is_enabled(): + self.logger.error(f"Element at {xpath} is disabled") + return False + + try: + self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element) + time.sleep(0.1) # Wait for scroll to settle + element.click() + self.logger.info(f"Clicked element at {xpath} using standard click") + return True + except ElementClickInterceptedException as e: + self.logger.warning(f"Standard click intercepted for {xpath}: {str(e)}") + try: + self.driver.execute_script("arguments[0].click();", element) + self.logger.info(f"Clicked element at {xpath} using JavaScript click") + time.sleep(0.1) + return True + except Exception as js_e: + self.logger.error(f"JavaScript click failed for {xpath}: {str(js_e)}") + return False except TimeoutException: - self.logger.error(f"Element not found or not clickable: {xpath}") + self.logger.error(f"Element not found or not clickable 
def get_form_inputs(self) -> list[str]:
    """Describe the visible input fields of the current page.

    Each retained ``<input>`` element is rendered as ``[name](value)``:

    * ``name`` is the element's ``name`` attribute, falling back to its
      ``id`` and finally to its input type.
    * ``value`` is ``checked``/``unchecked`` for checkboxes and radio
      buttons; otherwise the current value, falling back to the
      placeholder, falling back to an empty string.

    Inputs of type ``hidden``, ``submit``, ``button`` or ``image`` and
    inputs that are not displayed are skipped.

    Returns:
        One string per retained input. The result is always a list: it is
        empty when the page has no usable input or when extraction fails
        (the failure is logged). Returning a plain message string here
        would make callers that ``'\\n'.join(...)`` the result emit one
        character per line.
    """
    skipped_types = ("hidden", "submit", "button", "image")
    form_strings = []
    try:
        for element in self.driver.find_elements(By.TAG_NAME, "input"):
            input_type = element.get_attribute("type") or "text"
            if input_type in skipped_types or not element.is_displayed():
                continue
            input_name = (
                element.get_attribute("name")
                or element.get_attribute("id")
                or input_type
            )
            if input_type in ("checkbox", "radio"):
                display_value = "checked" if element.is_selected() else "unchecked"
            else:
                # Prefer what is already typed in; show the placeholder only
                # as a hint when the field is still empty.
                display_value = (
                    element.get_attribute("value")
                    or element.get_attribute("placeholder")
                    or ""
                )
            form_strings.append(f"[{input_name}]({display_value})")
    except Exception as e:
        self.logger.error(f"Error extracting form inputs: {str(e)}")
        return []
    return form_strings
def get_buttons_xpath(self) -> list[tuple[str, str]]:
    """Find login/sign-in/register buttons and return their label and XPath.

    Candidates are ``<button>`` elements and ``<input type="submit">``
    elements. Only candidates that are displayed, enabled, and whose
    normalized label contains ``login``, ``sign`` or ``register`` are kept.

    Returns:
        A list of ``(label, xpath)`` tuples sorted by label length, shortest
        first. ``label`` is the button text (or its ``value`` attribute)
        lower-cased with spaces removed; ``xpath`` is a positional XPath of
        the form ``(//button | //input[@type='submit'])[n]``.
    """
    selector = "//button | //input[@type='submit']"
    keywords = ("login", "sign", "register")
    # Query the union in one call so that the list position of each element
    # is the same position the "(...)[n]" predicate selects. Concatenating
    # two separate queries (all buttons, then all submit inputs) breaks that
    # correspondence whenever the two kinds are interleaved in the page.
    # NOTE(review): relies on the driver returning XPath matches in document
    # order, as the XPath positional predicate does.
    candidates = self.driver.find_elements(By.XPATH, selector)
    result = []
    for position, button in enumerate(candidates, start=1):
        if not button.is_displayed() or not button.is_enabled():
            continue
        text = (button.text or button.get_attribute("value") or "").lower().replace(' ', '')
        # Each keyword needs its own membership test; a bare string literal
        # in an `or` chain is always truthy and would accept every button.
        if any(keyword in text for keyword in keywords):
            result.append((text, f"({selector})[{position}]"))
    result.sort(key=lambda item: len(item[0]))
    return result
+ self.logger.error(f"Error filling form inputs: {str(e)}") + return False + + + def get_current_url(self) -> str: """Get the current URL of the page.""" return self.driver.current_url - def get_page_title(self): + def get_page_title(self) -> str: """Get the title of the current page.""" return self.driver.title - def scroll_bottom(self): + def scroll_bottom(self) -> bool: """Scroll to the bottom of the page.""" try: self.driver.execute_script( @@ -222,7 +360,7 @@ class Browser: self.logger.error(f"Error scrolling: {str(e)}") return False - def screenshot(self, filename): + def screenshot(self, filename:str) -> bool: """Take a screenshot of the current page.""" try: self.driver.save_screenshot(filename) @@ -375,15 +513,23 @@ if __name__ == "__main__": try: # stress test - browser.go_to("https://www.bbc.com/news") + browser.load_anticatpcha() + browser.go_to("https://stackoverflow.com/users/login") text = browser.get_text() print("Page Text in Markdown:") print(text) links = browser.get_navigable() print("\nNavigable Links:", links) - print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s") - time.sleep(20) - browser.go_to("https://theannoyingsite.com/") - time.sleep(15) + inputs = browser.get_form_inputs() + print("\nInputs:") + print(inputs) + inputs = ['[q]()', '[email](mlg.fcu@gmail.com)', '[password](hello123)'] + browser.fill_form_inputs(inputs) + browser.find_and_click_submit() + time.sleep(10) + #print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s") + #time.sleep(20) + #browser.go_to("https://theannoyingsite.com/") + #time.sleep(15) finally: browser.close() diff --git a/sources/interaction.py b/sources/interaction.py index 9c17f2a..9bd9d47 100644 --- a/sources/interaction.py +++ b/sources/interaction.py @@ -35,7 +35,7 @@ class Interaction: """Find the name of the default AI. 
def save_memory(self, agent_type: str = "casual_agent") -> None:
    """Save the session memory to a JSON file in a per-agent sub-folder.

    The file is written to
    ``<conversation_folder>/<agent_type>/<get_filename()>`` and contains
    ``self.memory`` serialized as JSON. Missing folders are created.

    Args:
        agent_type: Name of the sub-folder that separates the sessions of
            the different agents.
    """
    save_path = os.path.join(self.conversation_folder, agent_type)
    # One call creates the conversation folder and the agent sub-folder, and
    # exist_ok avoids failing if the folder appears between a separate
    # existence check and the creation.
    os.makedirs(save_path, exist_ok=True)
    path = os.path.join(save_path, self.get_filename())
    with open(path, 'w', encoding="utf-8") as f:
        json.dump(self.memory, f)
session.""" - if not os.path.exists(self.conversation_folder): + save_path = os.path.join(self.conversation_folder, agent_type) + if not os.path.exists(save_path): return - filename = self.find_last_session_path() + filename = self.find_last_session_path(save_path) if filename is None: return - path = os.path.join(self.conversation_folder, filename) + path = os.path.join(save_path, filename) with open(path, 'r') as f: self.memory = json.load(f)