Merge pull request #116 from Fosowl/dev

Better web form handling
This commit is contained in:
Martin 2025-04-13 17:30:00 +02:00 committed by GitHub
commit 424c5c4f7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 173 additions and 36 deletions

View File

@ -14,10 +14,11 @@ class executorResult:
"""
A class to store the result of a tool execution.
"""
def __init__(self, block, feedback, success):
def __init__(self, block, feedback, success, tool_type):
self.block = block
self.feedback = feedback
self.success = success
self.tool_type = tool_type
def show(self):
pretty_print(''*64, color="status")
@ -127,6 +128,9 @@ class Agent():
def get_blocks_result(self) -> list:
    """Return the list of block results accumulated in self.blocks_result."""
    return self.blocks_result
def get_last_tool_type(self) -> str:
    """
    Return the tool_type of the most recently recorded block result.
    Returns None when no block result has been recorded yet.
    """
    # Guard first so the indexing below never sees an empty list.
    if len(self.blocks_result) == 0:
        return None
    latest_result = self.blocks_result[-1]
    return latest_result.tool_type
def show_answer(self):
"""
@ -185,7 +189,7 @@ class Agent():
output = tool.execute([block])
feedback = tool.interpreter_feedback(output) # tool interpreter feedback
success = not tool.execution_failure_check(output)
self.blocks_result.append(executorResult(block, feedback, success))
self.blocks_result.append(executorResult(block, feedback, success, name))
if not success:
self.memory.push('user', feedback)
return False, feedback

View File

@ -163,7 +163,7 @@ class BrowserAgent(Agent):
You previously took these notes:
{notes}
Do not Step-by-Step explanation. Write comprehensive Notes or Error as a long paragraph followed by your action.
Do not go to tutorials or help pages.
You must always take notes.
"""
def llm_decide(self, prompt: str, show_reasoning: bool = False) -> Tuple[str, str]:
@ -262,20 +262,24 @@ class BrowserAgent(Agent):
Do not try to answer query. you can only formulate search term or exit.
"""
def handle_update_prompt(self, user_prompt: str, page_text: str) -> str:
return f"""
def handle_update_prompt(self, user_prompt: str, page_text: str, fill_success: bool) -> str:
prompt = f"""
You are a web browser.
You just filled a form on the page.
Now you should see the result of the form submission on the page:
Page text:
{page_text}
The user asked: {user_prompt}
Does the page answer the users query now?
Does the page answer the users query now? Are you still on a login page or did you get redirected?
If it does, take notes of the useful information, write down result and say {Action.FORM_FILLED.value}.
If you were previously on a login form, no need to explain.
If it does and you completed user request, say {Action.REQUEST_EXIT.value}
if it doesnt, say: Error: Attempt to fill form didn't work {Action.GO_BACK.value}.
If you were previously on a login form, no need to take notes.
"""
if not fill_success:
prompt += f"""
According to browser feedback, the form was not filled correctly. Is that so? you might consider other strategies.
"""
return prompt
def show_search_results(self, search_result: List[str]):
pretty_print("\nSearch results:", color="output")
@ -298,28 +302,28 @@ class BrowserAgent(Agent):
animate_thinking(f"Thinking...", color="status")
mem_begin_idx = self.memory.push('user', self.search_prompt(user_prompt))
ai_prompt, _ = self.llm_request()
ai_prompt, reasoning = self.llm_request()
if Action.REQUEST_EXIT.value in ai_prompt:
pretty_print(f"Web agent requested exit.\n{reasoning}\n\n{ai_prompt}", color="failure")
return ai_prompt, ""
animate_thinking(f"Searching...", color="status")
search_result_raw = self.tools["web_search"].execute([ai_prompt], False)
search_result = self.jsonify_search_results(search_result_raw)[:12]
search_result = self.jsonify_search_results(search_result_raw)[:16]
self.show_search_results(search_result)
prompt = self.make_newsearch_prompt(user_prompt, search_result)
unvisited = [None]
while not complete and len(unvisited) > 0:
self.memory.clear()
answer, reasoning = self.llm_decide(prompt, show_reasoning = False)
pretty_print(''*32, color="status")
extracted_form = self.extract_form(answer)
if len(extracted_form) > 0:
pretty_print(f"Filling inputs form...", color="status")
self.browser.fill_form_inputs(extracted_form)
self.browser.find_and_click_submission()
fill_success = self.browser.fill_form(extracted_form)
page_text = self.browser.get_text()
answer = self.handle_update_prompt(user_prompt, page_text)
answer = self.handle_update_prompt(user_prompt, page_text, fill_success)
answer, reasoning = self.llm_decide(prompt)
if Action.FORM_FILLED.value in answer:

View File

@ -57,6 +57,8 @@ class CoderAgent(Agent):
exec_success, _ = self.execute_modules(answer)
answer = self.remove_blocks(answer)
self.last_answer = answer
if self.get_last_tool_type() == "bash":
continue
if exec_success:
break
pretty_print("Execution failure", color="failure")

View File

@ -80,7 +80,7 @@ class PlannerAgent(Agent):
agents_tasks = self.parse_agent_tasks(answer)
if agents_tasks == (None, None):
pretty_print(answer, color="warning")
pretty_print("Failed to make a plan. This can happen with (too) small LLM. Clarify your request and insist on it making a plan.", color="failure")
pretty_print("Failed to make a plan. This can happen with (too) small LLM. Clarify your request and insist on it making a plan within ```json.", color="failure")
return
pretty_print("\n▂▘ P L A N ▝▂", color="status")
for task_name, task in agents_tasks:

View File

@ -22,6 +22,8 @@ import markdownify
import sys
import re
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sources.utility import pretty_print, animate_thinking
from sources.logger import Logger
@ -348,17 +350,23 @@ class Browser:
result.sort(key=lambda x: len(x[0]))
return result
def find_and_click_submission(self, timeout: int = 10) -> bool:
    """
    Try to click a submission-like button on the current page.

    Each candidate label is passed to find_and_click_btn in order; the first
    one that reports a successful click ends the search.

    Args:
        timeout: forwarded unchanged to find_and_click_btn for every candidate.
    Returns:
        True as soon as one candidate is clicked, False if none matched.
    """
    # Each label appears once: repeating a label only re-runs a lookup that
    # has already failed for this page.
    possible_submissions = ["login", "submit", "register", "calculate", "save", "send",
                            "continue", "apply", "ok", "confirm", "next", "proceed",
                            "accept", "agree", "yes", "no", "cancel",
                            "close", "done", "finish", "start"]
    for submission in possible_submissions:
        if self.find_and_click_btn(submission, timeout):
            return True
    self.logger.warning("No submission button found")
    return False
def wait_for_submission_outcome(self, timeout: int = 10) -> bool:
    """
    Wait for a submission outcome (e.g., URL change or new element).

    The wait succeeds when the browser URL differs from the URL observed at
    the moment this method was called, or when any element whose text
    contains 'success' is present on the page.

    Args:
        timeout: maximum number of seconds to poll before giving up.
    Returns:
        True if an outcome was detected, False on timeout.
    """
    # Snapshot the URL before polling. WebDriverWait hands the same driver
    # to the predicate, so comparing two live reads of current_url could
    # never detect a change.
    # NOTE(review): a navigation that completes before this call is not seen
    # as a change; only the 'success' text check can match in that case.
    initial_url = self.driver.current_url
    try:
        wait = WebDriverWait(self.driver, timeout)
        wait.until(
            lambda driver: driver.current_url != initial_url or
            driver.find_elements(By.XPATH, "//*[contains(text(), 'success')]")
        )
        self.logger.info("Detected submission outcome")
        return True
    except TimeoutException:
        self.logger.warning("No submission outcome detected")
        return False
def find_and_click_btn(self, btn_type: str = 'login', timeout: int = 10) -> bool:
def find_and_click_btn(self, btn_type: str = 'login', timeout: int = 5) -> bool:
"""Find and click a submit button matching the specified type."""
buttons = self.get_buttons_xpath()
if not buttons:
@ -366,7 +374,7 @@ class Browser:
return False
for button_text, xpath in buttons:
if btn_type.lower() in button_text.lower():
if btn_type.lower() in button_text.lower() or btn_type.lower() in xpath.lower():
try:
wait = WebDriverWait(self.driver, timeout)
element = wait.until(
@ -385,6 +393,56 @@ class Browser:
return False
self.logger.warning(f"No button matching '{btn_type}' found")
return False
def tick_all_checkboxes(self) -> bool:
    """
    Find and tick all checkboxes on the page.

    Failures on an individual checkbox (timeout or any other error) are
    logged and that checkbox is skipped; they do not change the return value.

    Returns:
        True when the page has no checkboxes or after every checkbox has
        been attempted; False only if an exception escapes the per-checkbox
        handling (e.g. while looking the checkboxes up).
    """
    try:
        checkboxes = self.driver.find_elements(By.XPATH, "//input[@type='checkbox']")
        if not checkboxes:
            self.logger.info("No checkboxes found on the page")
            return True
        # Index starts at 1 so log messages count checkboxes the way a human would.
        for index, checkbox in enumerate(checkboxes, 1):
            try:
                WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable(checkbox)
                )
                # Center the element in the viewport before interacting with it.
                self.driver.execute_script(
                    "arguments[0].scrollIntoView({block: 'center', inline: 'center'});", checkbox
                )
                if not checkbox.is_selected():
                    try:
                        checkbox.click()
                        self.logger.info(f"Ticked checkbox {index}")
                    except ElementClickInterceptedException:
                        # Native click was intercepted; click through JavaScript instead.
                        self.driver.execute_script("arguments[0].click();", checkbox)
                        self.logger.info(f"Ticked checkbox {index} using JavaScript")
                else:
                    self.logger.debug(f"Checkbox {index} already ticked")
            except TimeoutException:
                self.logger.warning(f"Timeout waiting for checkbox {index} to be clickable")
                continue
            except Exception as e:
                self.logger.error(f"Error ticking checkbox {index}: {str(e)}")
                continue
        return True
    except Exception as e:
        self.logger.error(f"Error finding checkboxes: {str(e)}")
        return False
def find_and_click_submission(self, timeout: int = 10) -> bool:
    """
    Click the first submission-like button found on the page.

    Candidate labels are tried in a fixed order through find_and_click_btn,
    stopping at the first one that reports a click.

    Args:
        timeout: forwarded to find_and_click_btn for every candidate.
    Returns:
        True if a candidate was clicked, False otherwise.
    """
    candidate_labels = (
        "login", "submit", "register", "continue", "apply",
        "ok", "confirm", "proceed", "accept",
        "done", "finish", "start", "calculate",
    )
    # The generator is lazy, so no label after the first successful click is tried.
    clicked_label = next(
        (label for label in candidate_labels if self.find_and_click_btn(label, timeout)),
        None,
    )
    if clicked_label is None:
        self.logger.warning("No submission button found")
        return False
    self.logger.info(f"Clicked on submission button: {clicked_label}")
    return True
def find_input_xpath_by_name(self, inputs, name: str) -> str | None:
for field in inputs:
@ -393,7 +451,7 @@ class Browser:
return None
def fill_form_inputs(self, input_list: List[str]) -> bool:
"""Fill form inputs based on a list of [name](value) strings."""
"""Fill inputs based on a list of [name](value) strings."""
if not isinstance(input_list, list):
self.logger.error("input_list must be a list")
return False
@ -410,8 +468,19 @@ class Browser:
value = value.strip()
xpath = self.find_input_xpath_by_name(inputs, name)
if not xpath:
self.logger.warning(f"Input field '{name}' not found")
continue
try:
element = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, xpath))
)
except TimeoutException:
self.logger.error(f"Timeout waiting for element '{name}' to be clickable")
continue
self.driver.execute_script("arguments[0].scrollIntoView(true);", element)
if not element.is_displayed() or not element.is_enabled():
self.logger.warning(f"Element '{name}' is not interactable (not displayed or disabled)")
continue
element = self.driver.find_element(By.XPATH, xpath)
input_type = (element.get_attribute("type") or "text").lower()
if input_type in ["checkbox", "radio"]:
is_checked = element.is_selected()
@ -428,6 +497,25 @@ class Browser:
except Exception as e:
self.logger.error(f"Error filling form inputs: {str(e)}")
return False
def fill_form(self, input_list: List[str]) -> bool:
    """
    Fill form inputs based on a list of [name](value) and submit.

    Steps, in order: fill the inputs, tick every checkbox, click a
    submission button, then wait for a submission outcome. Each failing step
    logs its own warning and stops the sequence.

    Args:
        input_list: list of "[name](value)" strings, passed to fill_form_inputs.
    Returns:
        True only if the inputs were filled, a submission button was clicked
        and a submission outcome was detected; False otherwise.
    """
    if not isinstance(input_list, list):
        self.logger.error("input_list must be a list")
        return False
    if not self.fill_form_inputs(input_list):
        # Only report a fill failure when filling is what actually failed.
        self.logger.warning("Failed to fill form inputs")
        return False
    self.logger.info("Form filled successfully")
    # Return value deliberately ignored: ticking checkboxes is best-effort.
    self.tick_all_checkboxes()
    if not self.find_and_click_submission():
        self.logger.warning("Failed to submit form")
        return False
    if not self.wait_for_submission_outcome():
        self.logger.warning("No submission outcome detected")
        return False
    self.logger.info("Submission outcome detected")
    return True
def get_current_url(self) -> str:
"""Get the current URL of the page."""
@ -467,7 +555,6 @@ class Browser:
input_elements = self.driver.execute_script(script)
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
driver = create_driver()
browser = Browser(driver, anticaptcha_manual_install=True)
@ -475,12 +562,12 @@ if __name__ == "__main__":
#txt = browser.get_text()
#print(txt)
#browser.go_to("https://practicetestautomation.com/practice-test-login/")
time.sleep(10)
input("press enter to continue")
print("AntiCaptcha / Form Test")
#browser.go_to("https://www.google.com/recaptcha/api2/demo")
browser.go_to("https://auth.leboncoin.fr/login")
inputs = browser.get_form_inputs()
inputs = ['[input1](Martin)', f'[input2](Test)', '[input3](test@gmail.com)']
browser.fill_form_inputs(inputs)
browser.find_and_click_submission()
time.sleep(10)
browser.go_to("https://home.openweathermap.org/users/sign_up")
inputs_visible = browser.get_form_inputs()
print("inputs:", inputs_visible)
inputs_fill = ['[q](checked)', '[q](checked)', '[user[username]](mlg)', '[user[email]](mlg.fcu@gmail.com)', '[user[password]](placeholder_P@ssw0rd123)', '[user[password_confirmation]](placeholder_P@ssw0rd123)']
browser.fill_form(inputs_fill)
input("press enter to exit")

View File

@ -28,6 +28,7 @@ class Provider:
"lm-studio": self.lm_studio_fn,
"huggingface": self.huggingface_fn,
"deepseek": self.deepseek_fn,
"together": self.together_fn,
"dsk_deepseek": self.dsk_deepseek,
"test": self.test_fn
}
@ -122,7 +123,7 @@ class Provider:
route_gen = f"http://{self.server_ip}/generate"
if not self.is_ip_online(self.server_ip.split(":")[0]):
raise Exception(f"Server is offline at {self.server_ip}")
pretty_print(f"Server is offline at {self.server_ip}", color="failure")
try:
requests.post(route_setup, json={"model": self.model})
@ -219,6 +220,27 @@ class Provider:
except Exception as e:
raise Exception(f"OpenAI API error: {str(e)}") from e
def together_fn(self, history, verbose=False):
    """
    Request a chat completion from Together AI.

    Args:
        history: message list passed as `messages` to the completion call.
        verbose: when True, print the returned text before returning it.
    Returns:
        The content of the first choice's message.
    Raises:
        Exception: any failure during the request or while reading the
            response is re-raised as "Together AI API error: ...".
    """
    # Imported lazily so the dependency is only needed when this provider is used.
    from together import Together

    together_client = Together(api_key=self.api_key)
    try:
        completion = together_client.chat.completions.create(
            model=self.model,
            messages=history,
        )
        if completion is None:
            raise Exception("Together AI response is empty.")
        reply = completion.choices[0].message.content
        if verbose:
            print(reply)
    except Exception as e:
        raise Exception(f"Together AI API error: {str(e)}") from e
    return reply
def deepseek_fn(self, history, verbose=False):
"""
Use deepseek api to generate text.

View File

@ -10,6 +10,7 @@ class Logger:
self.log_path = os.path.join(self.folder, log_filename)
self.enabled = True
self.logger = None
self.last_log_msg = ""
if self.enabled:
self.create_logging(log_filename)
@ -33,7 +34,10 @@ class Logger:
return False
def log(self, message, level=logging.INFO):
    """
    Emit `message` at `level` unless it repeats the previous message.

    Nothing is emitted or remembered when the message equals the last
    logged message or when logging is disabled.
    """
    is_repeat = message == self.last_log_msg
    if is_repeat or not self.enabled:
        return
    # Remember the message so an immediate repeat is dropped next time.
    self.last_log_msg = message
    self.logger.log(level, message)
def info(self, message):

View File

@ -34,8 +34,18 @@ class Memory():
self.model = "pszemraj/led-base-book-summary"
self.device = self.get_cuda_device()
self.memory_compression = memory_compression
self.tokenizer = None
self.model = None
if self.memory_compression:
self.download_model()
def download_model(self):
    """
    Download the memory-compression tokenizer and model.

    On return, self.tokenizer holds the tokenizer and self.model holds the
    loaded seq2seq model.
    """
    # The constructor resets self.model to None before calling this method,
    # so the checkpoint id may no longer be available there. Use self.model
    # only when it still holds an id string, otherwise use the default id.
    default_checkpoint = "pszemraj/led-base-book-summary"
    checkpoint = self.model if isinstance(self.model, str) else default_checkpoint
    pretty_print("Downloading memory compression model...", color="status")
    self.tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    self.model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    self.logger.info("Memory compression system initialized.")
def get_filename(self) -> str:
"""Get the filename for the save file."""
@ -170,6 +180,9 @@ class Memory():
"""
Compress the memory using the AI model.
"""
if self.tokenizer is None or self.model is None:
self.logger.warning("No tokenizer or model to perform memory compression.")
return
for i in range(len(self.memory)):
if i < 2:
continue

View File

@ -152,6 +152,7 @@ class AgentRouter:
("make a snake game please", "LOW"),
("Find gallery_list.pdf, then build a web app to show my pics", "HIGH"),
("Find budget_2025.xlsx, analyze it, and make a chart for my boss", "HIGH"),
("I want you to make me a plan to travel to Tainan", "HIGH"),
("Retrieve the latest publications on CRISPR and develop a web application to display them", "HIGH"),
("Bro dig up a music API and build me a tight app for the hottest tracks", "HIGH"),
("Find a public API for sports scores and build a web app to show live updates", "HIGH"),