diff --git a/sources/agents/agent.py b/sources/agents/agent.py index 398a2de..6f6f7e5 100644 --- a/sources/agents/agent.py +++ b/sources/agents/agent.py @@ -14,10 +14,11 @@ class executorResult: """ A class to store the result of a tool execution. """ - def __init__(self, block, feedback, success): + def __init__(self, block, feedback, success, tool_type): self.block = block self.feedback = feedback self.success = success + self.tool_type = tool_type def show(self): pretty_print('▂'*64, color="status") @@ -127,6 +128,9 @@ class Agent(): def get_blocks_result(self) -> list: return self.blocks_result + + def get_last_tool_type(self) -> str: + return self.blocks_result[-1].tool_type if len(self.blocks_result) > 0 else None def show_answer(self): """ @@ -185,7 +189,7 @@ class Agent(): output = tool.execute([block]) feedback = tool.interpreter_feedback(output) # tool interpreter feedback success = not tool.execution_failure_check(output) - self.blocks_result.append(executorResult(block, feedback, success)) + self.blocks_result.append(executorResult(block, feedback, success, name)) if not success: self.memory.push('user', feedback) return False, feedback diff --git a/sources/agents/browser_agent.py b/sources/agents/browser_agent.py index c18be5d..3e85e13 100644 --- a/sources/agents/browser_agent.py +++ b/sources/agents/browser_agent.py @@ -163,7 +163,7 @@ class BrowserAgent(Agent): You previously took these notes: {notes} Do not Step-by-Step explanation. Write comprehensive Notes or Error as a long paragraph followed by your action. - Do not go to tutorials or help pages. + You must always take notes. """ def llm_decide(self, prompt: str, show_reasoning: bool = False) -> Tuple[str, str]: @@ -262,20 +262,24 @@ class BrowserAgent(Agent): Do not try to answer query. you can only formulate search term or exit. """ - def handle_update_prompt(self, user_prompt: str, page_text: str) -> str: - return f""" + def handle_update_prompt(self, user_prompt: str, page_text: str, fill_success: bool) -> str: + prompt = f""" You are a web browser. You just filled a form on the page. Now you should see the result of the form submission on the page: Page text: {page_text} The user asked: {user_prompt} - Does the page answer the user’s query now? + Does the page answer the user’s query now? Are you still on a login page or did you get redirected? If it does, take notes of the useful information, write down result and say {Action.FORM_FILLED.value}. - If you were previously on a login form, no need to explain. - If it does and you completed user request, say {Action.REQUEST_EXIT.value} if it doesn’t, say: Error: Attempt to fill form didn't work {Action.GO_BACK.value}. + If you were previously on a login form, no need to take notes. """ + if not fill_success: + prompt += f""" + According to browser feedback, the form was not filled correctly. Is that so? you might consider other strategies. + """ + return prompt def show_search_results(self, search_result: List[str]): pretty_print("\nSearch results:", color="output") @@ -298,28 +302,28 @@ class BrowserAgent(Agent): animate_thinking(f"Thinking...", color="status") mem_begin_idx = self.memory.push('user', self.search_prompt(user_prompt)) - ai_prompt, _ = self.llm_request() + ai_prompt, reasoning = self.llm_request() if Action.REQUEST_EXIT.value in ai_prompt: pretty_print(f"Web agent requested exit.\n{reasoning}\n\n{ai_prompt}", color="failure") return ai_prompt, "" animate_thinking(f"Searching...", color="status") search_result_raw = self.tools["web_search"].execute([ai_prompt], False) - search_result = self.jsonify_search_results(search_result_raw)[:12] + search_result = self.jsonify_search_results(search_result_raw)[:16] self.show_search_results(search_result) prompt = self.make_newsearch_prompt(user_prompt, search_result) unvisited = [None] while not complete and len(unvisited) > 0: + self.memory.clear() answer, reasoning = self.llm_decide(prompt, show_reasoning = False) pretty_print('▂'*32, color="status") extracted_form = self.extract_form(answer) if len(extracted_form) > 0: pretty_print(f"Filling inputs form...", color="status") - self.browser.fill_form_inputs(extracted_form) - self.browser.find_and_click_submission() + fill_success = self.browser.fill_form(extracted_form) page_text = self.browser.get_text() - answer = self.handle_update_prompt(user_prompt, page_text) + answer = self.handle_update_prompt(user_prompt, page_text, fill_success) answer, reasoning = self.llm_decide(prompt) if Action.FORM_FILLED.value in answer: diff --git a/sources/agents/code_agent.py b/sources/agents/code_agent.py index db2b727..237eedb 100644 --- a/sources/agents/code_agent.py +++ b/sources/agents/code_agent.py @@ -57,6 +57,8 @@ class CoderAgent(Agent): exec_success, _ = self.execute_modules(answer) answer = self.remove_blocks(answer) self.last_answer = answer + if self.get_last_tool_type() == "bash": + continue if exec_success: break pretty_print("Execution failure", color="failure") diff --git a/sources/agents/planner_agent.py b/sources/agents/planner_agent.py index 9206191..22d0f07 100644 --- a/sources/agents/planner_agent.py +++ b/sources/agents/planner_agent.py @@ -80,7 +80,7 @@ class PlannerAgent(Agent): agents_tasks = self.parse_agent_tasks(answer) if agents_tasks == (None, None): pretty_print(answer, color="warning") - pretty_print("Failed to make a plan. This can happen with (too) small LLM. Clarify your request and insist on it making a plan.", color="failure") + pretty_print("Failed to make a plan. This can happen with (too) small LLM. Clarify your request and insist on it making a plan within ```json.", color="failure") return pretty_print("\n▂▘ P L A N ▝▂", color="status") for task_name, task in agents_tasks: diff --git a/sources/browser.py b/sources/browser.py index 98faa80..9b88332 100644 --- a/sources/browser.py +++ b/sources/browser.py @@ -22,6 +22,8 @@ import markdownify import sys import re +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from sources.utility import pretty_print, animate_thinking from sources.logger import Logger @@ -348,17 +350,23 @@ class Browser: result.sort(key=lambda x: len(x[0])) return result - def find_and_click_submission(self, timeout: int = 10) -> bool: - possible_submissions = ["login", "submit", "register", "calculate", "login", "submit", "register", "calculate", "save", "send", - "continue", "apply", "ok", "confirm", "next", "proceed", "accept", "agree", "yes", "no", "cancel", - "close", "done", "finish", "start", "calculate"] - for submission in possible_submissions: - if self.find_and_click_btn(submission, timeout): - return True - self.logger.warning("No submission button found") - return False + def wait_for_submission_outcome(self, timeout: int = 10) -> bool: + """ + Wait for a submission outcome (e.g., URL change or new element). + """ + try: + wait = WebDriverWait(self.driver, timeout) + wait.until( + lambda driver: driver.current_url != self.driver.current_url or + driver.find_elements(By.XPATH, "//*[contains(text(), 'success')]") + ) + self.logger.info("Detected submission outcome") + return True + except TimeoutException: + self.logger.warning("No submission outcome detected") + return False - def find_and_click_btn(self, btn_type: str = 'login', timeout: int = 10) -> bool: + def find_and_click_btn(self, btn_type: str = 'login', timeout: int = 5) -> bool: """Find and click a submit button matching the specified type.""" buttons = self.get_buttons_xpath() if not buttons: @@ -366,7 +374,7 @@ class Browser: return False for button_text, xpath in buttons: - if btn_type.lower() in button_text.lower(): + if btn_type.lower() in button_text.lower() or btn_type.lower() in xpath.lower(): try: wait = WebDriverWait(self.driver, timeout) element = wait.until( @@ -385,6 +393,56 @@ class Browser: return False self.logger.warning(f"No button matching '{btn_type}' found") return False + + def tick_all_checkboxes(self) -> bool: + """ + Find and tick all checkboxes on the page. + Returns True if successful, False if any issues occur. + """ + try: + checkboxes = self.driver.find_elements(By.XPATH, "//input[@type='checkbox']") + if not checkboxes: + self.logger.info("No checkboxes found on the page") + return True + + for index, checkbox in enumerate(checkboxes, 1): + try: + WebDriverWait(self.driver, 10).until( + EC.element_to_be_clickable(checkbox) + ) + self.driver.execute_script( + "arguments[0].scrollIntoView({block: 'center', inline: 'center'});", checkbox + ) + if not checkbox.is_selected(): + try: + checkbox.click() + self.logger.info(f"Ticked checkbox {index}") + except ElementClickInterceptedException: + self.driver.execute_script("arguments[0].click();", checkbox) + self.logger.info(f"Ticked checkbox {index} using JavaScript") + else: + self.logger.debug(f"Checkbox {index} already ticked") + except TimeoutException: + self.logger.warning(f"Timeout waiting for checkbox {index} to be clickable") + continue + except Exception as e: + self.logger.error(f"Error ticking checkbox {index}: {str(e)}") + continue + return True + except Exception as e: + self.logger.error(f"Error finding checkboxes: {str(e)}") + return False + + def find_and_click_submission(self, timeout: int = 10) -> bool: + possible_submissions = ["login", "submit", "register", "continue", "apply", + "ok", "confirm", "proceed", "accept", + "done", "finish", "start", "calculate"] + for submission in possible_submissions: + if self.find_and_click_btn(submission, timeout): + self.logger.info(f"Clicked on submission button: {submission}") + return True + self.logger.warning("No submission button found") + return False def find_input_xpath_by_name(self, inputs, name: str) -> str | None: for field in inputs: @@ -393,7 +451,7 @@ class Browser: return None def fill_form_inputs(self, input_list: List[str]) -> bool: - """Fill form inputs based on a list of [name](value) strings.""" + """Fill inputs based on a list of [name](value) strings.""" if not isinstance(input_list, list): self.logger.error("input_list must be a list") return False @@ -410,8 +468,19 @@ class Browser: value = value.strip() xpath = self.find_input_xpath_by_name(inputs, name) if not xpath: + self.logger.warning(f"Input field '{name}' not found") + continue + try: + element = WebDriverWait(self.driver, 10).until( + EC.element_to_be_clickable((By.XPATH, xpath)) + ) + except TimeoutException: + self.logger.error(f"Timeout waiting for element '{name}' to be clickable") + continue + self.driver.execute_script("arguments[0].scrollIntoView(true);", element) + if not element.is_displayed() or not element.is_enabled(): + self.logger.warning(f"Element '{name}' is not interactable (not displayed or disabled)") continue - element = self.driver.find_element(By.XPATH, xpath) input_type = (element.get_attribute("type") or "text").lower() if input_type in ["checkbox", "radio"]: is_checked = element.is_selected() @@ -428,6 +497,25 @@ class Browser: except Exception as e: self.logger.error(f"Error filling form inputs: {str(e)}") return False + + def fill_form(self, input_list: List[str]) -> bool: + """Fill form inputs based on a list of [name](value) and submit.""" + if not isinstance(input_list, list): + self.logger.error("input_list must be a list") + return False + if self.fill_form_inputs(input_list): + self.logger.info("Form filled successfully") + self.tick_all_checkboxes() + if self.find_and_click_submission(): + if self.wait_for_submission_outcome(): + self.logger.info("Submission outcome detected") + return True + else: + self.logger.warning("No submission outcome detected") + else: + self.logger.warning("Failed to submit form") + self.logger.warning("Failed to fill form inputs") + return False def get_current_url(self) -> str: """Get the current URL of the page.""" @@ -467,7 +555,6 @@ class Browser: input_elements = self.driver.execute_script(script) if __name__ == "__main__": - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) driver = create_driver() browser = Browser(driver, anticaptcha_manual_install=True) @@ -475,12 +562,12 @@ if __name__ == "__main__": #txt = browser.get_text() #print(txt) #browser.go_to("https://practicetestautomation.com/practice-test-login/") - time.sleep(10) + input("press enter to continue") print("AntiCaptcha / Form Test") #browser.go_to("https://www.google.com/recaptcha/api2/demo") - browser.go_to("https://auth.leboncoin.fr/login") - inputs = browser.get_form_inputs() - inputs = ['[input1](Martin)', f'[input2](Test)', '[input3](test@gmail.com)'] - browser.fill_form_inputs(inputs) - browser.find_and_click_submission() - time.sleep(10) + browser.go_to("https://home.openweathermap.org/users/sign_up") + inputs_visible = browser.get_form_inputs() + print("inputs:", inputs_visible) + inputs_fill = ['[q](checked)', '[q](checked)', '[user[username]](mlg)', '[user[email]](mlg.fcu@gmail.com)', '[user[password]](placeholder_P@ssw0rd123)', '[user[password_confirmation]](placeholder_P@ssw0rd123)'] + browser.fill_form(inputs_fill) + input("press enter to exit") diff --git a/sources/llm_provider.py b/sources/llm_provider.py index 49be83f..19b4719 100644 --- a/sources/llm_provider.py +++ b/sources/llm_provider.py @@ -28,6 +28,7 @@ class Provider: "lm-studio": self.lm_studio_fn, "huggingface": self.huggingface_fn, "deepseek": self.deepseek_fn, + "together": self.together_fn, "dsk_deepseek": self.dsk_deepseek, "test": self.test_fn } @@ -122,7 +123,7 @@ class Provider: route_gen = f"http://{self.server_ip}/generate" if not self.is_ip_online(self.server_ip.split(":")[0]): - raise Exception(f"Server is offline at {self.server_ip}") + pretty_print(f"Server is offline at {self.server_ip}", color="failure") try: requests.post(route_setup, json={"model": self.model}) @@ -219,6 +220,27 @@ class Provider: except Exception as e: raise Exception(f"OpenAI API error: {str(e)}") from e + def together_fn(self, history, verbose=False): + """ + Use together AI for completion + """ + from together import Together + client = Together(api_key=self.api_key) + + try: + response = client.chat.completions.create( + model=self.model, + messages=history, + ) + if response is None: + raise Exception("Together AI response is empty.") + thought = response.choices[0].message.content + if verbose: + print(thought) + return thought + except Exception as e: + raise Exception(f"Together AI API error: {str(e)}") from e + def deepseek_fn(self, history, verbose=False): """ Use deepseek api to generate text. diff --git a/sources/logger.py b/sources/logger.py index e22ca16..97319c5 100644 --- a/sources/logger.py +++ b/sources/logger.py @@ -10,6 +10,7 @@ class Logger: self.log_path = os.path.join(self.folder, log_filename) self.enabled = True self.logger = None + self.last_log_msg = "" if self.enabled: self.create_logging(log_filename) @@ -33,7 +34,10 @@ class Logger: return False def log(self, message, level=logging.INFO): + if self.last_log_msg == message: + return if self.enabled: + self.last_log_msg = message self.logger.log(level, message) def info(self, message): diff --git a/sources/memory.py b/sources/memory.py index bc2c796..26e3a8b 100644 --- a/sources/memory.py +++ b/sources/memory.py @@ -34,8 +34,18 @@ class Memory(): self.model = "pszemraj/led-base-book-summary" self.device = self.get_cuda_device() self.memory_compression = memory_compression + self.tokenizer = None + self.model = None + if self.memory_compression: + self.download_model() + + def download_model(self): + """Download the model if not already downloaded.""" + pretty_print("Downloading memory compression model...", color="status") self.tokenizer = AutoTokenizer.from_pretrained(self.model) self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model) + self.logger.info("Memory compression system initialized.") + def get_filename(self) -> str: """Get the filename for the save file.""" @@ -170,6 +180,9 @@ class Memory(): """ Compress the memory using the AI model. """ + if self.tokenizer is None or self.model is None: + self.logger.warning("No tokenizer or model to perform memory compression.") + return for i in range(len(self.memory)): if i < 2: continue diff --git a/sources/router.py b/sources/router.py index db179fb..e95e2de 100644 --- a/sources/router.py +++ b/sources/router.py @@ -152,6 +152,7 @@ class AgentRouter: ("make a snake game please", "LOW"), ("Find ‘gallery_list.pdf’, then build a web app to show my pics", "HIGH"), ("Find ‘budget_2025.xlsx’, analyze it, and make a chart for my boss", "HIGH"), + ("I want you to make me a plan to travel to Tainan", "HIGH"), ("Retrieve the latest publications on CRISPR and develop a web application to display them", "HIGH"), ("Bro dig up a music API and build me a tight app for the hottest tracks", "HIGH"), ("Find a public API for sports scores and build a web app to show live updates", "HIGH"),