From 688e94d97cbde6393a8291e5e9b5afe2ed181f94 Mon Sep 17 00:00:00 2001 From: martin legrand Date: Sat, 5 Apr 2025 14:14:23 +0200 Subject: [PATCH] feat : html to actual markdown for browser + better logging system --- sources/agents/browser_agent.py | 6 ++-- sources/browser.py | 64 ++++++++++++++++++--------------- sources/interaction.py | 9 +++-- sources/language.py | 4 +++ sources/llm_provider.py | 7 ++++ sources/memory.py | 16 +++++++-- sources/router.py | 10 ++++-- sources/text_to_speech.py | 26 +++++++------- 8 files changed, 88 insertions(+), 54 deletions(-) diff --git a/sources/agents/browser_agent.py b/sources/agents/browser_agent.py index 812b812..caf5dc6 100644 --- a/sources/agents/browser_agent.py +++ b/sources/agents/browser_agent.py @@ -192,10 +192,10 @@ class BrowserAgent(Agent): return f""" Following a human request: {user_query} - A web AI made the following finding across different pages: + A web browsing AI made the following finding across different pages: {search_note} - Summarize the finding or step that lead to success, and provide a conclusion that answer the request. + Expand on the finding or step that lead to success, and provide a conclusion that answer the request. Include link when possible. """ def search_prompt(self, user_prompt: str) -> str: @@ -306,7 +306,7 @@ class BrowserAgent(Agent): prompt = self.make_navigation_prompt(user_prompt, page_text) prompt = self.conclude_prompt(user_prompt) - mem_last_idx = self.memory.push('assistant', prompt) + mem_last_idx = self.memory.push('user', prompt) answer, reasoning = self.llm_request() pretty_print(answer, color="output") self.memory.clear_section(mem_begin_idx, mem_last_idx) diff --git a/sources/browser.py b/sources/browser.py index fbf73ad..6885eab 100644 --- a/sources/browser.py +++ b/sources/browser.py @@ -21,12 +21,8 @@ import markdownify import sys import re -if __name__ == "__main__": - from utility import pretty_print, animate_thinking - from logger import Logger -else: - from sources.utility import pretty_print, animate_thinking - from sources.logger import Logger +from sources.utility import pretty_print, animate_thinking +from sources.logger import Logger def get_chrome_path() -> str: """Get the path to the Chrome executable.""" @@ -163,19 +159,29 @@ class Browser: return (word_count >= 5 and (has_punctuation or is_long_enough)) def get_text(self) -> str | None: - """Get page text.""" + """Get page text as formatted Markdown""" try: soup = BeautifulSoup(self.driver.page_source, 'html.parser') - - for element in soup(['script', 'style']): + for element in soup(['script', 'style', 'noscript', 'meta', 'link']): element.decompose() - - text = soup.get_text() - lines = (f"{line.strip()}\n" for line in text.splitlines()) - text = "\n".join(chunk for chunk in lines if chunk and self.is_sentence(chunk)) - text = text[:4096] - #markdown_text = markdownify.markdownify(text, heading_style="ATX") - return "[Start of page]\n" + text + "\n[End of page]" + markdown_converter = markdownify.MarkdownConverter( + heading_style="ATX", + strip=['a'], + autolinks=False, + bullets='•', + strong_em_symbol='*', + default_title=False, + ) + markdown_text = markdown_converter.convert(str(soup.body)) + lines = [] + for line in markdown_text.splitlines(): + stripped = line.strip() + if stripped and self.is_sentence(stripped): + cleaned = ' '.join(stripped.split()) + lines.append(cleaned) + result = "[Start of page]\n\n" + "\n\n".join(lines) + "\n\n[End of page]" + result = re.sub(r'!\[(.*?)\]\(.*?\)', r'[IMAGE: \1]', result) + return result[:8192] except Exception as e: self.logger.error(f"Error getting text: {str(e)}") return None @@ -243,20 +249,25 @@ class Browser: if not element.is_enabled(): return False try: + self.logger.error(f"Scrolling to element for click_element.") self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element) time.sleep(0.1) element.click() return True except ElementClickInterceptedException as e: + self.logger.error(f"Error click_element: {str(e)}") return False except TimeoutException: + self.logger.warning(f"Timeout clicking element.") return False except Exception as e: self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}") return False def load_js(self, file_name: str) -> str: + """Load javascript from script folder to inject to page.""" path = os.path.join(self.js_scripts_folder, file_name) + self.logger.info(f"Loading js at {path}") try: with open(path, 'r') as f: return f.read() @@ -266,6 +277,7 @@ class Browser: raise e def find_all_inputs(self, timeout=3): + """Find all inputs elements on the page.""" try: WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) @@ -283,6 +295,7 @@ class Browser: try: input_elements = self.find_all_inputs() if not input_elements: + self.logger.info("No input element on page.") return ["No input forms found on the page."] form_strings = [] @@ -331,14 +344,7 @@ class Browser: return False def find_and_click_btn(self, btn_type: str = 'login', timeout: int = 10) -> bool: - """ - Find and click a submit button matching the specified type. - Args: - btn_type: The type of button to find. - timeout: time to wait for button to appear. - Returns: - bool: True if the button was found and clicked, False otherwise. - """ + """Find and click a submit button matching the specified type.""" buttons = self.get_buttons_xpath() if not buttons: self.logger.warning("No visible buttons found") @@ -446,19 +452,19 @@ class Browser: input_elements = self.driver.execute_script(script) if __name__ == "__main__": + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) driver = create_driver() browser = Browser(driver, anticaptcha_manual_install=True) - time.sleep(10) - #browser.go_to("https://coinmarketcap.com/") + #browser.go_to("https://github.com/Fosowl/agenticSeek") #txt = browser.get_text() #print(txt) + #time.sleep(10) + #browser.go_to("https://practicetestautomation.com/practice-test-login/") print("AntiCaptcha / Form Test") browser.go_to("https://www.google.com/recaptcha/api2/demo") - #browser.go_to("https://practicetestautomation.com/practice-test-login/") - time.sleep(10) inputs = browser.get_form_inputs() - inputs = ['[input1](Martin)', f'[input2](Test)', '[input3](test@gmail.com)'] + #inputs = ['[input1](Martin)', f'[input2](Test)', '[input3](test@gmail.com)'] browser.fill_form_inputs(inputs) browser.find_and_click_submission() time.sleep(10) diff --git a/sources/interaction.py b/sources/interaction.py index 2f07e81..4fef15f 100644 --- a/sources/interaction.py +++ b/sources/interaction.py @@ -112,17 +112,20 @@ class Interaction: def think(self) -> bool: """Request AI agents to process the user input.""" + push_last_agent_memory = False if self.last_query is None or len(self.last_query) == 0: return False agent = self.router.select_agent(self.last_query) if agent is None: return False if self.current_agent != agent and self.last_answer is not None: + push_last_agent_memory = True + tmp = self.last_answer + self.current_agent = agent + self.last_answer, _ = agent.process(self.last_query, self.speech) + if push_last_agent_memory: self.current_agent.memory.push('user', self.last_query) self.current_agent.memory.push('assistant', self.last_answer) - self.current_agent = agent - tmp = self.last_answer - self.last_answer, _ = agent.process(self.last_query, self.speech) if self.last_answer == tmp: self.last_answer = None return True diff --git a/sources/language.py b/sources/language.py index f2bde5d..a7ed91f 100644 --- a/sources/language.py +++ b/sources/language.py @@ -6,6 +6,7 @@ from nltk.sentiment.vader import SentimentIntensityAnalyzer from transformers import MarianMTModel, MarianTokenizer from sources.utility import pretty_print, animate_thinking +from sources.logger import Logger class LanguageUtility: """LanguageUtility for language, or emotion identification""" @@ -14,6 +15,7 @@ class LanguageUtility: self.translators_tokenizer = None self.translators_model = None self.load_model() + self.logger = Logger("language.log") def load_model(self) -> None: animate_thinking("Loading language utility...", color="status") @@ -40,6 +42,7 @@ class LanguageUtility: """ langid.set_languages(['fr', 'en', 'zh']) lang, score = langid.classify(text) + self.logger.info(f"Identified: {text} as {lang} with conf {score}") return lang def translate(self, text: str, origin_lang: str) -> str: @@ -86,6 +89,7 @@ class LanguageUtility: dominant_emotion = max(emotions, key=emotions.get) if emotions[dominant_emotion] == 0: return 'Neutral' + self.logger.info(f"Emotion: {dominant_emotion} for text: {text}") return dominant_emotion except Exception as e: raise e diff --git a/sources/llm_provider.py b/sources/llm_provider.py index 65b9228..ab52af3 100644 --- a/sources/llm_provider.py +++ b/sources/llm_provider.py @@ -13,6 +13,7 @@ from openai import OpenAI from huggingface_hub import InferenceClient from typing import List, Tuple, Type, Dict from sources.utility import pretty_print, animate_thinking +from sources.logger import Logger class Provider: def __init__(self, provider_name, model, server_address = "127.0.0.1:5000", is_local=False): @@ -42,6 +43,7 @@ class Provider: self.check_address_format(self.server_ip) if not self.is_ip_online(self.server_ip.split(':')[0]): raise Exception(f"Server at {self.server_ip} is offline.") + self.logger = Logger("provider.log") def get_api_key(self, provider): load_dotenv() @@ -50,6 +52,7 @@ class Provider: if not api_key: api_key = input(f"Please enter your {provider} API key: ") set_key(".env", api_key_var, api_key) + self.logger.info("Set API key in env.") load_dotenv() return api_key @@ -73,6 +76,7 @@ class Provider: Use the choosen provider to generate text. """ llm = self.available_providers[self.provider_name] + self.logger.info(f"Using provider: {self.provider_name} at {self.server_ip}") try: thought = llm(history, verbose) except ConnectionError as e: @@ -98,11 +102,14 @@ class Provider: if output.returncode == 0: return True else: + self.logger.error(f"Ping command returned code: {output.returncode}") return False except subprocess.TimeoutExpired: + self.logger.error("Ping subprocess timeout.") return False except Exception as e: pretty_print(f"Error with ping request {str(e)}", color="failure") + self.logger.error(f"Ping error: {str(e)}") return False def server_fn(self, history, verbose = False): diff --git a/sources/memory.py b/sources/memory.py index 9929203..e48ff57 100644 --- a/sources/memory.py +++ b/sources/memory.py @@ -8,9 +8,8 @@ from typing import List, Tuple, Type, Dict, Tuple import torch from transformers import AutoTokenizer, AutoModelForSeq2SeqLM -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - from sources.utility import timer_decorator, pretty_print +from sources.logger import Logger class Memory(): """ @@ -36,6 +35,7 @@ class Memory(): self.memory_compression = memory_compression self.tokenizer = AutoTokenizer.from_pretrained(self.model) self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model) + self.logger = Logger("memory.log") def get_filename(self) -> str: """Get the filename for the save file.""" @@ -44,6 +44,7 @@ class Memory(): def save_memory(self, agent_type: str = "casual_agent") -> None: """Save the session memory to a file.""" if not os.path.exists(self.conversation_folder): + self.logger.info(f"Created folder {self.conversation_folder}.") os.makedirs(self.conversation_folder) save_path = os.path.join(self.conversation_folder, agent_type) if not os.path.exists(save_path): @@ -52,6 +53,7 @@ class Memory(): path = os.path.join(save_path, filename) json_memory = json.dumps(self.memory) with open(path, 'w') as f: + self.logger.info(f"Saved memory json at {path}") f.write(json_memory) def find_last_session_path(self, path) -> str: @@ -63,6 +65,7 @@ class Memory(): saved_sessions.append((filename, date)) saved_sessions.sort(key=lambda x: x[1], reverse=True) if len(saved_sessions) > 0: + self.logger.info(f"Last session found at {saved_sessions[0][0]}") return saved_sessions[0][0] return None @@ -87,12 +90,14 @@ class Memory(): self.compress() pretty_print("Session recovered successfully", color="success") - def reset(self, memory: list) -> None: + def reset(self, memory: list = []) -> None: + self.logger.info("Memory reset performed.") self.memory = memory def push(self, role: str, content: str) -> int: """Push a message to the memory.""" if self.memory_compression and role == 'assistant': + self.logger.info("Compressing memories on message push.") self.compress() curr_idx = len(self.memory) if self.memory[curr_idx-1]['content'] == content: @@ -101,10 +106,12 @@ class Memory(): return curr_idx-1 def clear(self) -> None: + self.logger.info("Memory clear performed.") self.memory = [] def clear_section(self, start: int, end: int) -> None: """Clear a section of the memory.""" + self.logger.info(f"Memory section {start} to {end} cleared.") self.memory = self.memory[:start] + self.memory[end:] def get(self) -> list: @@ -128,6 +135,7 @@ class Memory(): str: The summarized text """ if self.tokenizer is None or self.model is None: + self.logger.warning("No tokenizer or model to perform summarization.") return text if len(text) < min_length*1.5: return text @@ -144,6 +152,7 @@ class Memory(): ) summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True) summary.replace('summary:', '') + self.logger.info(f"Memory summarization success from len {len(text)} to {len(summary)}.") return summary #@timer_decorator @@ -160,6 +169,7 @@ class Memory(): self.memory[i]['content'] = self.summarize(self.memory[i]['content']) if __name__ == "__main__": + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) memory = Memory("You are a helpful assistant.", recover_last_session=False, memory_compression=True) diff --git a/sources/router.py b/sources/router.py index 4aa5852..56c0afe 100644 --- a/sources/router.py +++ b/sources/router.py @@ -6,8 +6,6 @@ from typing import List, Tuple, Type, Dict, Tuple from transformers import pipeline from adaptive_classifier import AdaptiveClassifier -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - from sources.agents.agent import Agent from sources.agents.code_agent import CoderAgent from sources.agents.casual_agent import CasualAgent @@ -15,6 +13,7 @@ from sources.agents.planner_agent import FileAgent from sources.agents.browser_agent import BrowserAgent from sources.language import LanguageUtility from sources.utility import pretty_print, animate_thinking, timer_decorator +from sources.logger import Logger class AgentRouter: """ @@ -28,6 +27,7 @@ class AgentRouter: self.complexity_classifier = self.load_llm_router() self.learn_few_shots_tasks() self.learn_few_shots_complexity() + self.logger = Logger("router.log") def load_pipelines(self) -> Dict[str, Type[pipeline]]: """ @@ -307,6 +307,7 @@ class AgentRouter: llm_router, confidence_llm_router = result_llm_router[0], result_llm_router[1] final_score_bart = confidence_bart / (confidence_bart + confidence_llm_router) final_score_llm = confidence_llm_router / (confidence_bart + confidence_llm_router) + self.logger.info(f"Routing Vote: BART: {bart} ({final_score_bart}) LLM-router: {llm_router} ({final_score_llm})") if log_confidence: pretty_print(f"Agent choice -> BART: {bart} ({final_score_bart}) LLM-router: {llm_router} ({final_score_llm})") return bart if final_score_bart > final_score_llm else llm_router @@ -334,6 +335,7 @@ class AgentRouter: return "LOW" complexity, confidence = predictions[0][0], predictions[0][1] if confidence < 0.4: + self.logger.info(f"Low confidence in complexity estimation: {confidence}") return "LOW" if complexity == "HIGH" and len(text) < 64: return None # ask for more info @@ -354,6 +356,7 @@ class AgentRouter: if agent.type == "planner_agent": return agent pretty_print(f"Error finding planner agent. Please add a planner agent to the list of agents.", color="failure") + self.logger.error("Planner agent not found.") return None def select_agent(self, text: str) -> Agent: @@ -380,15 +383,18 @@ class AgentRouter: try: best_agent = self.router_vote(text, labels, log_confidence=False) except Exception as e: + self.logger.error(f"Router failure: {str(e)}") raise e for agent in self.agents: if best_agent == agent.role["en"]: pretty_print(f"Selected agent: {agent.agent_name} (roles: {agent.role[lang]})", color="warning") return agent pretty_print(f"Error choosing agent.", color="failure") + self.logger.error("No agent selected.") return None if __name__ == "__main__": + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) agents = [ CasualAgent("jarvis", "../prompts/base/casual_agent.txt", None), BrowserAgent("browser", "../prompts/base/planner_agent.txt", None), diff --git a/sources/text_to_speech.py b/sources/text_to_speech.py index 3d17a7c..32a96b9 100644 --- a/sources/text_to_speech.py +++ b/sources/text_to_speech.py @@ -1,4 +1,4 @@ -import os +import os, sys import re import platform import subprocess @@ -9,10 +9,7 @@ from kokoro import KPipeline from IPython.display import display, Audio import soundfile as sf -if __name__ == "__main__": - from utility import pretty_print, animate_thinking -else: - from sources.utility import pretty_print, animate_thinking +from sources.utility import pretty_print, animate_thinking class Speech(): """ @@ -47,22 +44,22 @@ class Speech(): if not os.path.exists(path): os.makedirs(path) - def speak(self, sentence: str, voice_number: int = 1): + def speak(self, sentence: str, voice_idx: int = 1): """ Convert text to speech using an AI model and play the audio. Args: sentence (str): The text to convert to speech. Will be pre-processed. - voice_number (int, optional): Index of the voice to use from the voice map. + voice_idx (int, optional): Index of the voice to use from the voice map. """ if not self.pipeline: return - if voice_number >= len(self.voice_map[self.language]) or voice_number < 0: + if voice_idx >= len(self.voice_map[self.language]): pretty_print("Invalid voice number, using default voice", color="error") - voice_number = 0 + voice_idx = 0 sentence = self.clean_sentence(sentence) - audio_file = f"{self.voice_folder}/sample_{self.voice_map[self.language][voice_number]}.wav" - self.voice = self.voice_map[self.language][voice_number] + audio_file = f"{self.voice_folder}/sample_{self.voice_map[self.language][voice_idx]}.wav" + self.voice = self.voice_map[self.language][voice_idx] generator = self.pipeline( sentence, voice=self.voice, speed=self.speed, split_pattern=r'\n+' @@ -143,6 +140,7 @@ class Speech(): return sentence if __name__ == "__main__": + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) speech = Speech() tosay_en = """ I looked up recent news using the website https://www.theguardian.com/world @@ -154,8 +152,8 @@ if __name__ == "__main__": J'ai consulté les dernières nouvelles sur le site https://www.theguardian.com/world """ spk = Speech(enable=True, language="en", voice_idx=0) - spk.speak(tosay_en) + spk.speak(tosay_en, voice_idx=0) spk = Speech(enable=True, language="fr", voice_idx=0) spk.speak(tosay_fr) - spk = Speech(enable=True, language="zh", voice_idx=0) - spk.speak(tosay_zh) \ No newline at end of file + #spk = Speech(enable=True, language="zh", voice_idx=0) + #spk.speak(tosay_zh) \ No newline at end of file