Feat : better web search agent

This commit is contained in:
martin legrand 2025-03-14 21:09:27 +01:00
parent c5fc156fc6
commit 0ee77e8ad9
2 changed files with 110 additions and 76 deletions

View File

@ -8,7 +8,7 @@ from sources.browser import Browser
class BrowserAgent(Agent): class BrowserAgent(Agent):
def __init__(self, model, name, prompt_path, provider): def __init__(self, model, name, prompt_path, provider):
""" """
The casual agent is a special for casual talk to the user without specific tasks. The Browser agent is an agent that navigate the web autonomously in search of answer
""" """
super().__init__(model, name, prompt_path, provider) super().__init__(model, name, prompt_path, provider)
self.tools = { self.tools = {
@ -16,45 +16,14 @@ class BrowserAgent(Agent):
} }
self.role = "deep research and web search" self.role = "deep research and web search"
self.browser = Browser() self.browser = Browser()
self.browser.goTo("https://github.com/") self.browser.go_to("https://github.com/")
self.search_history = [] self.search_history = []
self.navigable_links = []
def make_init_prompt(self, user_prompt: str, search_result: str):
return f"""
Based on the search result:
{search_result}
Start browsing and find the information the user want.
User: {user_prompt}
You must choose a link to navigate to. Say i want to navigate to a <link>.
"""
def extract_links(self, search_result: str): def extract_links(self, search_result: str):
return re.findall(r'https?://[^\s]+', search_result) links = re.findall(r'https?://[^\s]+', search_result)
return self.clean_links(links)
def make_navigation_prompt(self, user_prompt: str, page_text: str, navigable_links: list):
remaining_links = "\n".join([f"[{i}] {link}" for i, link in enumerate(navigable_links) if link not in self.search_history])
return f"""
\nYou are browsing the web. Not the user, you are the browser.
Page content:
{page_text}
Navigable links:
{remaining_links}
You must choose a link to navigate to or do a new search.
Remember, you seek the information the user want.
The user query was : {user_prompt}
If you want to do a new search, use the "web_search" tool.
Exemple:
```web_search
weather in tokyo
```
If you have an answer and want to exit the browser, please say "REQUEST_EXIT".
If you don't choose a link or do a new search I will cut my fucking arm off.
"""
def clean_links(self, links: list): def clean_links(self, links: list):
links_clean = [] links_clean = []
for link in links: for link in links:
@ -63,38 +32,85 @@ class BrowserAgent(Agent):
else: else:
links_clean.append(link) links_clean.append(link)
return links_clean return links_clean
def get_unvisited_links(self):
return "\n".join([f"[{i}] {link}" for i, link in enumerate(self.navigable_links) if link not in self.search_history])
def make_newsearch_prompt(self, user_prompt: str, search_result: str):
return f"""
Based on the search result: {search_result}
Your goal is to find accurate and complete information to satisfy the users request.
User request: {user_prompt}
To proceed, choose a relevant link from the search results. Announce your choice by saying: "I want to navigate to <link>."
For example: "I want to navigate to geohot.github.io."
Do not explain your choice.
"""
def process(self, prompt, speech_module) -> str: def make_navigation_prompt(self, user_prompt: str, page_text: str):
remaining_links = self.get_unvisited_links()
remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, proceed with a new search."
return f"""
\nYou are currently browsing the web. Not the user, you are the browser.
Page content:
{page_text}
You can navigate to these links:
{remaining_links}
If no link seem appropriate, please say "GO_BACK".
Remember, you seek the information the user want.
The user query was : {user_prompt}
If you found a clear answer, please say "REQUEST_EXIT".
You must choose a link to navigate to, go back or exit.
Do not explain your choice.
"""
def llm_decide(self, prompt):
animate_thinking("Thinking...", color="status")
self.memory.push('user', prompt)
answer, reasoning = self.llm_request(prompt)
pretty_print("-"*100)
pretty_print(answer, color="output")
pretty_print("-"*100)
return answer, reasoning
def select_unvisited(self, search_result):
results_arr = search_result.split('\n\n')
results_unvisited = []
for res in results_arr:
for his in self.search_history:
if his not in res:
results_unvisited.append(res)
return results_unvisited
def process(self, user_prompt, speech_module) -> str:
complete = False complete = False
animate_thinking(f"Searching...", color="status") animate_thinking(f"Searching...", color="status")
search_result = self.tools["web_search"].execute([prompt], False) search_result = self.tools["web_search"].execute([user_prompt], False)
user_prompt = self.make_init_prompt(prompt, search_result) prompt = self.make_newsearch_prompt(user_prompt, search_result)
prompt = user_prompt
while not complete: while not complete:
animate_thinking("Thinking...", color="status") answer, reasoning = self.llm_decide(prompt)
self.memory.push('user', user_prompt)
answer, reasoning = self.llm_request(prompt)
pretty_print("-"*100)
pretty_print(answer, color="output")
pretty_print("-"*100)
if "REQUEST_EXIT" in answer: if "REQUEST_EXIT" in answer:
complete = True complete = True
break break
links = self.extract_links(answer) links = self.extract_links(answer)
links_clean = self.clean_links(links) if len(links) == 0 or "GO_BACK" in answer:
if len(links_clean) == 0: search_result_unvisited = self.select_unvisited(search_result)
prompt = f"Please choose a link to navigate to or do a new search. Links found:\n{links_clean}" prompt = self.make_newsearch_prompt(user_prompt, search_result)
pretty_print("No links found, doing a new search.", color="warning") pretty_print("No links found, doing a new search.", color="warning")
continue continue
animate_thinking(f"Navigating to {links[0]}", color="status") animate_thinking(f"Navigating to {links[0]}", color="status")
speech_module.speak(f"Navigating to {links[0]}") speech_module.speak(f"Navigating to {links[0]}")
self.browser.goTo(links[0]) self.browser.go_to(links[0])
page_text = self.browser.get_text()
self.search_history.append(links[0]) self.search_history.append(links[0])
page_text = self.browser.getText()[:2048] self.navigable_links = self.browser.get_navigable()
navigable_links = self.browser.getNavigable()[:15] prompt = self.make_navigation_prompt(user_prompt, page_text)
prompt = self.make_navigation_prompt(user_prompt, page_text, navigable_links)
speech_module.speak(answer)
self.browser.close() self.browser.close()
return answer, reasoning return answer, reasoning

View File

@ -35,7 +35,7 @@ class Browser:
except Exception as e: except Exception as e:
raise Exception(f"Failed to initialize browser: {str(e)}") raise Exception(f"Failed to initialize browser: {str(e)}")
def goTo(self, url): def go_to(self, url):
"""Navigate to a specified URL.""" """Navigate to a specified URL."""
try: try:
self.driver.get(url) self.driver.get(url)
@ -47,12 +47,19 @@ class Browser:
return False return False
def is_sentence(self, text): def is_sentence(self, text):
"""Check if the text is a sentence.""" """Check if the text qualifies as a meaningful sentence or contains important error codes."""
if "404" in text: text = text.strip()
return True # we want the ai to see the error error_codes = ["404", "403", "500", "502", "503"]
return len(text.split(" ")) > 5 and '.' in text if any(code in text for code in error_codes):
return True
words = text.split()
word_count = len(words)
has_punctuation = text.endswith(('.', '!', '?'))
is_long_enough = word_count > 5
has_letters = any(word.isalpha() for word in words)
return (word_count >= 5 and (has_punctuation or is_long_enough) and has_letters)
def getText(self): def get_text(self):
"""Get page text and convert it to README (Markdown) format.""" """Get page text and convert it to README (Markdown) format."""
try: try:
soup = BeautifulSoup(self.driver.page_source, 'html.parser') soup = BeautifulSoup(self.driver.page_source, 'html.parser')
@ -72,8 +79,24 @@ class Browser:
except Exception as e: except Exception as e:
self.logger.error(f"Error getting text: {str(e)}") self.logger.error(f"Error getting text: {str(e)}")
return None return None
def clean_url(self, url):
clean = url.split('#')[0]
parts = clean.split('?', 1)
base_url = parts[0]
if len(parts) > 1:
query = parts[1]
essential_params = []
for param in query.split('&'):
if param.startswith('_skw=') or param.startswith('q=') or param.startswith('s='):
essential_params.append(param)
elif param.startswith('_') or param.startswith('hash=') or param.startswith('itmmeta='):
break
if essential_params:
return f"{base_url}?{'&'.join(essential_params)}"
return base_url
def getNavigable(self): def get_navigable(self):
"""Get all navigable links on the current page.""" """Get all navigable links on the current page."""
try: try:
links = [] links = []
@ -89,12 +112,12 @@ class Browser:
}) })
self.logger.info(f"Found {len(links)} navigable links") self.logger.info(f"Found {len(links)} navigable links")
return links return [self.clean_url(link['url']) for link in links if link['is_displayed'] == True and len(link) < 256]
except Exception as e: except Exception as e:
self.logger.error(f"Error getting navigable links: {str(e)}") self.logger.error(f"Error getting navigable links: {str(e)}")
return [] return []
def clickElement(self, xpath): def click_element(self, xpath):
"""Click an element specified by xpath.""" """Click an element specified by xpath."""
try: try:
element = self.wait.until( element = self.wait.until(
@ -107,15 +130,15 @@ class Browser:
self.logger.error(f"Element not found or not clickable: {xpath}") self.logger.error(f"Element not found or not clickable: {xpath}")
return False return False
def getCurrentUrl(self): def get_current_url(self):
"""Get the current URL of the page.""" """Get the current URL of the page."""
return self.driver.current_url return self.driver.current_url
def getPageTitle(self): def get_page_title(self):
"""Get the title of the current page.""" """Get the title of the current page."""
return self.driver.title return self.driver.title
def scrollToBottom(self): def scroll_bottom(self):
"""Scroll to the bottom of the page.""" """Scroll to the bottom of the page."""
try: try:
self.driver.execute_script( self.driver.execute_script(
@ -127,7 +150,7 @@ class Browser:
self.logger.error(f"Error scrolling: {str(e)}") self.logger.error(f"Error scrolling: {str(e)}")
return False return False
def takeScreenshot(self, filename): def screenshot(self, filename):
"""Take a screenshot of the current page.""" """Take a screenshot of the current page."""
try: try:
self.driver.save_screenshot(filename) self.driver.save_screenshot(filename)
@ -155,16 +178,11 @@ if __name__ == "__main__":
browser = Browser(headless=False) browser = Browser(headless=False)
try: try:
browser.goTo("https://karpathy.github.io/") browser.go_to("https://karpathy.github.io/")
text = browser.getText() text = browser.get_text()
print("Page Text in Markdown:") print("Page Text in Markdown:")
print(text) print(text)
links = browser.getNavigable() links = browser.get_navigable()
print("\nNavigable Links:") print("\nNavigable Links:", links)
for link in links[:50]:
print(f"Text: {link['text']}, URL: {link['url']}")
browser.takeScreenshot("example.png")
finally: finally:
browser.close() browser.close()