feat: convert HTML to actual Markdown for the browser + better logging system

This commit is contained in:
martin legrand 2025-04-05 14:14:23 +02:00
parent a09b6bf8aa
commit 688e94d97c
8 changed files with 88 additions and 54 deletions

View File

@ -192,10 +192,10 @@ class BrowserAgent(Agent):
return f"""
Following a human request:
{user_query}
A web AI made the following finding across different pages:
A web browsing AI made the following finding across different pages:
{search_note}
Summarize the finding or step that lead to success, and provide a conclusion that answer the request.
Expand on the finding or step that lead to success, and provide a conclusion that answer the request. Include link when possible.
"""
def search_prompt(self, user_prompt: str) -> str:
@ -306,7 +306,7 @@ class BrowserAgent(Agent):
prompt = self.make_navigation_prompt(user_prompt, page_text)
prompt = self.conclude_prompt(user_prompt)
mem_last_idx = self.memory.push('assistant', prompt)
mem_last_idx = self.memory.push('user', prompt)
answer, reasoning = self.llm_request()
pretty_print(answer, color="output")
self.memory.clear_section(mem_begin_idx, mem_last_idx)

View File

@ -21,12 +21,8 @@ import markdownify
import sys
import re
if __name__ == "__main__":
from utility import pretty_print, animate_thinking
from logger import Logger
else:
from sources.utility import pretty_print, animate_thinking
from sources.logger import Logger
from sources.utility import pretty_print, animate_thinking
from sources.logger import Logger
def get_chrome_path() -> str:
"""Get the path to the Chrome executable."""
@ -163,19 +159,29 @@ class Browser:
return (word_count >= 5 and (has_punctuation or is_long_enough))
def get_text(self) -> str | None:
    """Get the current page content as cleaned-up Markdown.

    The page source is parsed, non-content tags (script, style, noscript,
    meta, link) are removed, the remaining HTML is converted to Markdown and
    only the lines accepted by ``self.is_sentence`` are kept. Image syntax is
    rewritten to ``[IMAGE: alt]`` and the text is wrapped between
    ``[Start of page]`` and ``[End of page]`` markers.

    Returns:
        The Markdown text (at most 8192 characters, markers included), or
        None when extraction fails (the error is logged).
    """
    max_chars = 8192
    header = "[Start of page]\n\n"
    footer = "\n\n[End of page]"
    try:
        soup = BeautifulSoup(self.driver.page_source, 'html.parser')
        for element in soup(['script', 'style', 'noscript', 'meta', 'link']):
            element.decompose()
        markdown_converter = markdownify.MarkdownConverter(
            heading_style="ATX",
            strip=['a'],
            autolinks=False,
            # markdownify selects the bullet with bullets[depth % len(bullets)];
            # an empty string would raise ZeroDivisionError on the first <ul> item.
            bullets='-',
            strong_em_symbol='*',
            default_title=False,
        )
        # Without a <body>, str(soup.body) would be the literal text "None".
        root = soup.body if soup.body is not None else soup
        markdown_text = markdown_converter.convert(str(root))
        lines = []
        for line in markdown_text.splitlines():
            stripped = line.strip()
            if stripped and self.is_sentence(stripped):
                # Collapse runs of whitespace inside the kept line.
                lines.append(' '.join(stripped.split()))
        content = "\n\n".join(lines)
        content = re.sub(r'!\[(.*?)\]\(.*?\)', r'[IMAGE: \1]', content)
        # Truncate the content rather than the wrapped result so the
        # end-of-page marker is never cut off.
        content = content[:max_chars - len(header) - len(footer)]
        return header + content + footer
    except Exception as e:
        self.logger.error(f"Error getting text: {str(e)}")
        return None
@ -243,20 +249,25 @@ class Browser:
if not element.is_enabled():
return False
try:
self.logger.error(f"Scrolling to element for click_element.")
self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element)
time.sleep(0.1)
element.click()
return True
except ElementClickInterceptedException as e:
self.logger.error(f"Error click_element: {str(e)}")
return False
except TimeoutException:
self.logger.warning(f"Timeout clicking element.")
return False
except Exception as e:
self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}")
return False
def load_js(self, file_name: str) -> str:
"""Load javascript from script folder to inject to page."""
path = os.path.join(self.js_scripts_folder, file_name)
self.logger.info(f"Loading js at {path}")
try:
with open(path, 'r') as f:
return f.read()
@ -266,6 +277,7 @@ class Browser:
raise e
def find_all_inputs(self, timeout=3):
"""Find all inputs elements on the page."""
try:
WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
@ -283,6 +295,7 @@ class Browser:
try:
input_elements = self.find_all_inputs()
if not input_elements:
self.logger.info("No input element on page.")
return ["No input forms found on the page."]
form_strings = []
@ -331,14 +344,7 @@ class Browser:
return False
def find_and_click_btn(self, btn_type: str = 'login', timeout: int = 10) -> bool:
"""
Find and click a submit button matching the specified type.
Args:
btn_type: The type of button to find.
timeout: time to wait for button to appear.
Returns:
bool: True if the button was found and clicked, False otherwise.
"""
"""Find and click a submit button matching the specified type."""
buttons = self.get_buttons_xpath()
if not buttons:
self.logger.warning("No visible buttons found")
@ -446,19 +452,19 @@ class Browser:
input_elements = self.driver.execute_script(script)
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
driver = create_driver()
browser = Browser(driver, anticaptcha_manual_install=True)
time.sleep(10)
#browser.go_to("https://coinmarketcap.com/")
#browser.go_to("https://github.com/Fosowl/agenticSeek")
#txt = browser.get_text()
#print(txt)
#time.sleep(10)
#browser.go_to("https://practicetestautomation.com/practice-test-login/")
print("AntiCaptcha / Form Test")
browser.go_to("https://www.google.com/recaptcha/api2/demo")
#browser.go_to("https://practicetestautomation.com/practice-test-login/")
time.sleep(10)
inputs = browser.get_form_inputs()
inputs = ['[input1](Martin)', f'[input2](Test)', '[input3](test@gmail.com)']
#inputs = ['[input1](Martin)', f'[input2](Test)', '[input3](test@gmail.com)']
browser.fill_form_inputs(inputs)
browser.find_and_click_submission()
time.sleep(10)

View File

@ -112,17 +112,20 @@ class Interaction:
def think(self) -> bool:
    """Request AI agents to process the user input.

    Selects an agent for ``self.last_query`` through the router, makes it the
    current agent and stores its answer in ``self.last_answer``.

    Returns:
        False when there is no query or the router selects no agent,
        True otherwise.
    """
    if not self.last_query:
        return False
    agent = self.router.select_agent(self.last_query)
    if agent is None:
        return False
    # Decide before switching agents: the exchange is pushed to the newly
    # selected agent's memory only when the agent changed and a previous
    # answer exists.
    push_last_agent_memory = (
        self.current_agent != agent and self.last_answer is not None
    )
    tmp = self.last_answer
    self.current_agent = agent
    self.last_answer, _ = agent.process(self.last_query, self.speech)
    if push_last_agent_memory:
        self.current_agent.memory.push('user', self.last_query)
        self.current_agent.memory.push('assistant', self.last_answer)
    # An answer identical to the previous one is discarded.
    if self.last_answer == tmp:
        self.last_answer = None
    return True

View File

@ -6,6 +6,7 @@ from nltk.sentiment.vader import SentimentIntensityAnalyzer
from transformers import MarianMTModel, MarianTokenizer
from sources.utility import pretty_print, animate_thinking
from sources.logger import Logger
class LanguageUtility:
"""LanguageUtility for language, or emotion identification"""
@ -14,6 +15,7 @@ class LanguageUtility:
self.translators_tokenizer = None
self.translators_model = None
self.load_model()
self.logger = Logger("language.log")
def load_model(self) -> None:
animate_thinking("Loading language utility...", color="status")
@ -40,6 +42,7 @@ class LanguageUtility:
"""
langid.set_languages(['fr', 'en', 'zh'])
lang, score = langid.classify(text)
self.logger.info(f"Identified: {text} as {lang} with conf {score}")
return lang
def translate(self, text: str, origin_lang: str) -> str:
@ -86,6 +89,7 @@ class LanguageUtility:
dominant_emotion = max(emotions, key=emotions.get)
if emotions[dominant_emotion] == 0:
return 'Neutral'
self.logger.info(f"Emotion: {dominant_emotion} for text: {text}")
return dominant_emotion
except Exception as e:
raise e

View File

@ -13,6 +13,7 @@ from openai import OpenAI
from huggingface_hub import InferenceClient
from typing import List, Tuple, Type, Dict
from sources.utility import pretty_print, animate_thinking
from sources.logger import Logger
class Provider:
def __init__(self, provider_name, model, server_address = "127.0.0.1:5000", is_local=False):
@ -42,6 +43,7 @@ class Provider:
self.check_address_format(self.server_ip)
if not self.is_ip_online(self.server_ip.split(':')[0]):
raise Exception(f"Server at {self.server_ip} is offline.")
self.logger = Logger("provider.log")
def get_api_key(self, provider):
load_dotenv()
@ -50,6 +52,7 @@ class Provider:
if not api_key:
api_key = input(f"Please enter your {provider} API key: ")
set_key(".env", api_key_var, api_key)
self.logger.info("Set API key in env.")
load_dotenv()
return api_key
@ -73,6 +76,7 @@ class Provider:
Use the choosen provider to generate text.
"""
llm = self.available_providers[self.provider_name]
self.logger.info(f"Using provider: {self.provider_name} at {self.server_ip}")
try:
thought = llm(history, verbose)
except ConnectionError as e:
@ -98,11 +102,14 @@ class Provider:
if output.returncode == 0:
return True
else:
self.logger.error(f"Ping command returned code: {output.returncode}")
return False
except subprocess.TimeoutExpired:
self.logger.error("Ping subprocess timeout.")
return False
except Exception as e:
pretty_print(f"Error with ping request {str(e)}", color="failure")
self.logger.error(f"Ping error: {str(e)}")
return False
def server_fn(self, history, verbose = False):

View File

@ -8,9 +8,8 @@ from typing import List, Tuple, Type, Dict, Tuple
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sources.utility import timer_decorator, pretty_print
from sources.logger import Logger
class Memory():
"""
@ -36,6 +35,7 @@ class Memory():
self.memory_compression = memory_compression
self.tokenizer = AutoTokenizer.from_pretrained(self.model)
self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model)
self.logger = Logger("memory.log")
def get_filename(self) -> str:
"""Get the filename for the save file."""
@ -44,6 +44,7 @@ class Memory():
def save_memory(self, agent_type: str = "casual_agent") -> None:
"""Save the session memory to a file."""
if not os.path.exists(self.conversation_folder):
self.logger.info(f"Created folder {self.conversation_folder}.")
os.makedirs(self.conversation_folder)
save_path = os.path.join(self.conversation_folder, agent_type)
if not os.path.exists(save_path):
@ -52,6 +53,7 @@ class Memory():
path = os.path.join(save_path, filename)
json_memory = json.dumps(self.memory)
with open(path, 'w') as f:
self.logger.info(f"Saved memory json at {path}")
f.write(json_memory)
def find_last_session_path(self, path) -> str:
@ -63,6 +65,7 @@ class Memory():
saved_sessions.append((filename, date))
saved_sessions.sort(key=lambda x: x[1], reverse=True)
if len(saved_sessions) > 0:
self.logger.info(f"Last session found at {saved_sessions[0][0]}")
return saved_sessions[0][0]
return None
@ -87,12 +90,14 @@ class Memory():
self.compress()
pretty_print("Session recovered successfully", color="success")
def reset(self, memory: list) -> None:
def reset(self, memory: list = []) -> None:
self.logger.info("Memory reset performed.")
self.memory = memory
def push(self, role: str, content: str) -> int:
"""Push a message to the memory."""
if self.memory_compression and role == 'assistant':
self.logger.info("Compressing memories on message push.")
self.compress()
curr_idx = len(self.memory)
if self.memory[curr_idx-1]['content'] == content:
@ -101,10 +106,12 @@ class Memory():
return curr_idx-1
def clear(self) -> None:
    """Drop every stored message, leaving the memory as an empty list."""
    self.logger.info("Memory clear performed.")
    self.memory = list()
def clear_section(self, start: int, end: int) -> None:
    """Keep the messages before ``start`` and those from ``end`` onward."""
    self.logger.info(f"Memory section {start} to {end} cleared.")
    kept_head = self.memory[:start]
    kept_tail = self.memory[end:]
    self.memory = kept_head + kept_tail
def get(self) -> list:
@ -128,6 +135,7 @@ class Memory():
str: The summarized text
"""
if self.tokenizer is None or self.model is None:
self.logger.warning("No tokenizer or model to perform summarization.")
return text
if len(text) < min_length*1.5:
return text
@ -144,6 +152,7 @@ class Memory():
)
summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
summary.replace('summary:', '')
self.logger.info(f"Memory summarization success from len {len(text)} to {len(summary)}.")
return summary
#@timer_decorator
@ -160,6 +169,7 @@ class Memory():
self.memory[i]['content'] = self.summarize(self.memory[i]['content'])
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
memory = Memory("You are a helpful assistant.",
recover_last_session=False, memory_compression=True)

View File

@ -6,8 +6,6 @@ from typing import List, Tuple, Type, Dict, Tuple
from transformers import pipeline
from adaptive_classifier import AdaptiveClassifier
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sources.agents.agent import Agent
from sources.agents.code_agent import CoderAgent
from sources.agents.casual_agent import CasualAgent
@ -15,6 +13,7 @@ from sources.agents.planner_agent import FileAgent
from sources.agents.browser_agent import BrowserAgent
from sources.language import LanguageUtility
from sources.utility import pretty_print, animate_thinking, timer_decorator
from sources.logger import Logger
class AgentRouter:
"""
@ -28,6 +27,7 @@ class AgentRouter:
self.complexity_classifier = self.load_llm_router()
self.learn_few_shots_tasks()
self.learn_few_shots_complexity()
self.logger = Logger("router.log")
def load_pipelines(self) -> Dict[str, Type[pipeline]]:
"""
@ -307,6 +307,7 @@ class AgentRouter:
llm_router, confidence_llm_router = result_llm_router[0], result_llm_router[1]
final_score_bart = confidence_bart / (confidence_bart + confidence_llm_router)
final_score_llm = confidence_llm_router / (confidence_bart + confidence_llm_router)
self.logger.info(f"Routing Vote: BART: {bart} ({final_score_bart}) LLM-router: {llm_router} ({final_score_llm})")
if log_confidence:
pretty_print(f"Agent choice -> BART: {bart} ({final_score_bart}) LLM-router: {llm_router} ({final_score_llm})")
return bart if final_score_bart > final_score_llm else llm_router
@ -334,6 +335,7 @@ class AgentRouter:
return "LOW"
complexity, confidence = predictions[0][0], predictions[0][1]
if confidence < 0.4:
self.logger.info(f"Low confidence in complexity estimation: {confidence}")
return "LOW"
if complexity == "HIGH" and len(text) < 64:
return None # ask for more info
@ -354,6 +356,7 @@ class AgentRouter:
if agent.type == "planner_agent":
return agent
pretty_print(f"Error finding planner agent. Please add a planner agent to the list of agents.", color="failure")
self.logger.error("Planner agent not found.")
return None
def select_agent(self, text: str) -> Agent:
@ -380,15 +383,18 @@ class AgentRouter:
try:
best_agent = self.router_vote(text, labels, log_confidence=False)
except Exception as e:
self.logger.error(f"Router failure: {str(e)}")
raise e
for agent in self.agents:
if best_agent == agent.role["en"]:
pretty_print(f"Selected agent: {agent.agent_name} (roles: {agent.role[lang]})", color="warning")
return agent
pretty_print(f"Error choosing agent.", color="failure")
self.logger.error("No agent selected.")
return None
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
agents = [
CasualAgent("jarvis", "../prompts/base/casual_agent.txt", None),
BrowserAgent("browser", "../prompts/base/planner_agent.txt", None),

View File

@ -1,4 +1,4 @@
import os
import os, sys
import re
import platform
import subprocess
@ -9,10 +9,7 @@ from kokoro import KPipeline
from IPython.display import display, Audio
import soundfile as sf
if __name__ == "__main__":
from utility import pretty_print, animate_thinking
else:
from sources.utility import pretty_print, animate_thinking
from sources.utility import pretty_print, animate_thinking
class Speech():
"""
@ -47,22 +44,22 @@ class Speech():
if not os.path.exists(path):
os.makedirs(path)
def speak(self, sentence: str, voice_number: int = 1):
def speak(self, sentence: str, voice_idx: int = 1):
"""
Convert text to speech using an AI model and play the audio.
Args:
sentence (str): The text to convert to speech. Will be pre-processed.
voice_number (int, optional): Index of the voice to use from the voice map.
voice_idx (int, optional): Index of the voice to use from the voice map.
"""
if not self.pipeline:
return
if voice_number >= len(self.voice_map[self.language]) or voice_number < 0:
if voice_idx >= len(self.voice_map[self.language]):
pretty_print("Invalid voice number, using default voice", color="error")
voice_number = 0
voice_idx = 0
sentence = self.clean_sentence(sentence)
audio_file = f"{self.voice_folder}/sample_{self.voice_map[self.language][voice_number]}.wav"
self.voice = self.voice_map[self.language][voice_number]
audio_file = f"{self.voice_folder}/sample_{self.voice_map[self.language][voice_idx]}.wav"
self.voice = self.voice_map[self.language][voice_idx]
generator = self.pipeline(
sentence, voice=self.voice,
speed=self.speed, split_pattern=r'\n+'
@ -143,6 +140,7 @@ class Speech():
return sentence
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
speech = Speech()
tosay_en = """
I looked up recent news using the website https://www.theguardian.com/world
@ -154,8 +152,8 @@ if __name__ == "__main__":
J'ai consulté les dernières nouvelles sur le site https://www.theguardian.com/world
"""
spk = Speech(enable=True, language="en", voice_idx=0)
spk.speak(tosay_en)
spk.speak(tosay_en, voice_idx=0)
spk = Speech(enable=True, language="fr", voice_idx=0)
spk.speak(tosay_fr)
spk = Speech(enable=True, language="zh", voice_idx=0)
spk.speak(tosay_zh)
#spk = Speech(enable=True, language="zh", voice_idx=0)
#spk.speak(tosay_zh)