From 83c595144b7cb469abbf757e7b633573c1837a6a Mon Sep 17 00:00:00 2001 From: martin legrand Date: Fri, 18 Apr 2025 15:42:53 +0200 Subject: [PATCH] feat : frontend message streaming --- api.py | 46 +++++++--- cli.py | 8 +- frontend/agentic-seek-front/src/App.js | 118 ++++++++++++++----------- sources/agents/agent.py | 38 +++++++- sources/agents/browser_agent.py | 17 ++-- sources/agents/casual_agent.py | 5 +- sources/agents/code_agent.py | 16 ++-- sources/agents/file_agent.py | 7 +- sources/agents/planner_agent.py | 14 +-- sources/browser.py | 8 +- sources/interaction.py | 16 +++- sources/router.py | 2 + start_services.cmd | 4 - 13 files changed, 191 insertions(+), 108 deletions(-) diff --git a/api.py b/api.py index 1bebd88..f39c900 100755 --- a/api.py +++ b/api.py @@ -20,8 +20,6 @@ from sources.utility import pretty_print from sources.logger import Logger from sources.schemas import QueryRequest, QueryResponse -from concurrent.futures import ThreadPoolExecutor - from celery import Celery api = FastAPI(title="AgenticSeek API", version="0.1.0") @@ -43,8 +41,6 @@ if not os.path.exists(".screenshots"): os.makedirs(".screenshots") api.mount("/screenshots", StaticFiles(directory=".screenshots"), name="screenshots") -executor = ThreadPoolExecutor(max_workers=1) - def initialize_system(): stealth_mode = config.getboolean('BROWSER', 'stealth_mode') personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base" @@ -105,6 +101,7 @@ def initialize_system(): interaction = initialize_system() is_generating = False +query_resp_history = [] @api.get("/screenshot") async def get_screenshot(): @@ -128,12 +125,31 @@ async def is_active(): logger.info("Is active endpoint called") return {"is_active": interaction.is_active} -def think_wrapper(interaction, query, tts_enabled): +@api.get("/latest_answer") +async def get_latest_answer(): + global query_resp_history + if interaction.current_agent is None: + return JSONResponse(status_code=404, content={"error": "No agent available"}) + if interaction.current_agent.last_answer not in [q["answer"] for q in query_resp_history]: + query_resp = { + "done": "false", + "answer": interaction.current_agent.last_answer, + "agent_name": interaction.current_agent.agent_name if interaction.current_agent else "None", + "success": "false", + "blocks": {f'{i}': block.jsonify() for i, block in enumerate(interaction.current_agent.get_blocks_result())} if interaction.current_agent else {} + } + query_resp_history.append(query_resp) + return JSONResponse(status_code=200, content=query_resp) + if query_resp_history: + return JSONResponse(status_code=200, content=query_resp_history[-1]) + return JSONResponse(status_code=404, content={"error": "No answer available"}) + +async def think_wrapper(interaction, query, tts_enabled): try: interaction.tts_enabled = tts_enabled interaction.last_query = query logger.info("Agents request is being processed") - success = interaction.think() + success = await interaction.think() if not success: interaction.last_answer = "Error: No answer from agent" interaction.last_success = False @@ -148,7 +164,7 @@ def think_wrapper(interaction, query, tts_enabled): @api.post("/query", response_model=QueryResponse) async def process_query(request: QueryRequest): - global is_generating + global is_generating, query_resp_history logger.info(f"Processing query: {request.query}") query_resp = QueryResponse( done="false", @@ -163,10 +179,7 @@ async def process_query(request: QueryRequest): try: is_generating = True - loop = asyncio.get_running_loop() - success = await loop.run_in_executor( - executor, think_wrapper, interaction, request.query, request.tts_enabled - ) + success = await think_wrapper(interaction, request.query, request.tts_enabled) is_generating = False if not success: @@ -188,6 +201,17 @@ async def process_query(request: QueryRequest): query_resp.agent_name = interaction.current_agent.agent_name query_resp.success = str(interaction.last_success) query_resp.blocks = blocks_json + + # Store the raw dictionary representation + query_resp_dict = { + "done": query_resp.done, + "answer": query_resp.answer, + "agent_name": query_resp.agent_name, + "success": query_resp.success, + "blocks": query_resp.blocks + } + query_resp_history.append(query_resp_dict) + logger.info("Query processed successfully") return JSONResponse(status_code=200, content=query_resp.jsonify()) except Exception as e: diff --git a/cli.py b/cli.py index 281de84..7d52fda 100755 --- a/cli.py +++ b/cli.py @@ -3,6 +3,7 @@ import sys import argparse import configparser +import asyncio from sources.llm_provider import Provider from sources.interaction import Interaction @@ -16,7 +17,7 @@ warnings.filterwarnings("ignore") config = configparser.ConfigParser() config.read('config.ini') -def main(): +async def main(): pretty_print("Initializing...", color="status") stealth_mode = config.getboolean('BROWSER', 'stealth_mode') personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base" @@ -59,7 +60,7 @@ def main(): try: while interaction.is_active: interaction.get_user() - if interaction.think(): + if await interaction.think(): interaction.show_answer() except Exception as e: if config.getboolean('MAIN', 'save_session'): @@ -69,6 +70,5 @@ def main(): if config.getboolean('MAIN', 'save_session'): interaction.save_session() - if __name__ == "__main__": - main() + asyncio.run(main()) \ No newline at end of file diff --git a/frontend/agentic-seek-front/src/App.js b/frontend/agentic-seek-front/src/App.js index ea210d4..5153f58 100644 --- a/frontend/agentic-seek-front/src/App.js +++ b/frontend/agentic-seek-front/src/App.js @@ -10,11 +10,24 @@ function App() { const [currentView, setCurrentView] = useState('blocks'); const [responseData, setResponseData] = useState(null); const [isOnline, setIsOnline] = useState(false); + const [isMounted, setIsMounted] = useState(true); const messagesEndRef = useRef(null); useEffect(() => { - scrollToBottom(); - checkHealth(); + const intervalId = setInterval(() => { + checkHealth(); + fetchLatestAnswer(); + fetchScreenshot(); + }, 1500); + return () => clearInterval(intervalId); + }, [messages]); + + useEffect(() => { + const intervalId = setInterval(() => { + scrollToBottom(); + }, 7000); + + return () => clearInterval(intervalId); }, [messages]); const checkHealth = async () => { @@ -32,54 +45,58 @@ function App() { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); }; - useEffect(() => { - if (currentView === 'screenshot') { - let isMounted = true; - - const fetchScreenshot = async () => { - try { - const timestamp = new Date().getTime(); - const res = await axios.get(`http://0.0.0.0:8000/screenshots/updated_screen.png?timestamp=${timestamp}`, { - responseType: 'blob' - }); - if (isMounted) { - console.log('Screenshot fetched successfully'); - const imageUrl = URL.createObjectURL(res.data); - setResponseData((prev) => { - if (prev?.screenshot && prev.screenshot !== 'placeholder.png') { - URL.revokeObjectURL(prev.screenshot); - } - return { - ...prev, - screenshot: imageUrl, - screenshotTimestamp: new Date().getTime() - }; - }); + const fetchScreenshot = async () => { + try { + const timestamp = new Date().getTime(); + const res = await axios.get(`http://0.0.0.0:8000/screenshots/updated_screen.png?timestamp=${timestamp}`, { + responseType: 'blob' + }); + if (isMounted) { + console.log('Screenshot fetched successfully'); + const imageUrl = URL.createObjectURL(res.data); + setResponseData((prev) => { + if (prev?.screenshot && prev.screenshot !== 'placeholder.png') { + URL.revokeObjectURL(prev.screenshot); } - } catch (err) { - console.error('Error fetching screenshot:', err); - if (isMounted) { - setResponseData((prev) => ({ - ...prev, - screenshot: 'placeholder.png', - screenshotTimestamp: new Date().getTime() - })); - } - } - }; - - fetchScreenshot(); - const interval = setInterval(fetchScreenshot, 1000); - - return () => { - isMounted = false; - clearInterval(interval); - if (responseData?.screenshot && responseData.screenshot !== 'placeholder.png') { - URL.revokeObjectURL(responseData.screenshot); - } - }; + return { + ...prev, + screenshot: imageUrl, + screenshotTimestamp: new Date().getTime() + }; + }); + } + } catch (err) { + console.error('Error fetching screenshot:', err); + if (isMounted) { + setResponseData((prev) => ({ + ...prev, + screenshot: 'placeholder.png', + screenshotTimestamp: new Date().getTime() + })); + } } - }, [currentView]); + }; + + const normalizeAnswer = (answer) => answer.trim().toLowerCase(); + + const fetchLatestAnswer = async () => { + try { + const res = await axios.get('http://0.0.0.0:8000/latest_answer'); + const data = res.data; + const normalizedAnswer = normalizeAnswer(data.answer); + const answerExists = messages.some( + (msg) => normalizeAnswer(msg.content) === normalizedAnswer && data.answer != undefined + ); + if (!answerExists) { + setMessages((prev) => [ + ...prev, + { type: 'agent', content: data.answer, agentName: data.agent_name }, + ]); + } + } catch (error) { + console.error("Error fetching latest answer:", error); + } + }; const handleSubmit = async (e) => { e.preventDefault(); @@ -101,10 +118,7 @@ function App() { console.log('Response:', res.data); const data = res.data; setResponseData(data); - setMessages((prev) => [ - ...prev, - { type: 'agent', content: data.answer, agentName: data.agent_name }, - ]); + fetchLatestAnswer(); } catch (err) { console.error('Error:', err); setError('Failed to process query.'); diff --git a/sources/agents/agent.py b/sources/agents/agent.py index 04a212f..79cab02 100644 --- a/sources/agents/agent.py +++ b/sources/agents/agent.py @@ -5,6 +5,9 @@ import os import random import time +import asyncio +from concurrent.futures import ThreadPoolExecutor + from sources.memory import Memory from sources.utility import pretty_print from sources.schemas import executorResult @@ -43,7 +46,28 @@ class Agent(): self.blocks_result = [] self.last_answer = "" self.verbose = verbose + self.executor = ThreadPoolExecutor(max_workers=1) + @property + def get_agent_name(self) -> str: + return self.agent_name + + @property + def get_agent_type(self) -> str: + return self.type + + @property + def get_agent_role(self) -> str: + return self.role + + @property + def get_last_answer(self) -> str: + return self.last_answer + + @property + def get_blocks(self) -> list: + return self.blocks_result + @property def get_tools(self) -> dict: return self.tools @@ -90,7 +114,14 @@ class Agent(): end_idx = text.rfind(end_tag)+8 return text[start_idx:end_idx] - def llm_request(self) -> Tuple[str, str]: + async def llm_request(self) -> Tuple[str, str]: + """ + Asynchronously ask the LLM to process the prompt. + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor(self.executor, self.sync_llm_request) + + def sync_llm_request(self) -> Tuple[str, str]: """ Ask the LLM to process the prompt and return the answer and the reasoning. """ @@ -102,14 +133,15 @@ class Agent(): self.memory.push('assistant', answer) return answer, reasoning - def wait_message(self, speech_module): + async def wait_message(self, speech_module): if speech_module is None: return messages = ["Please be patient, I am working on it.", "Computing... I recommand you have a coffee while I work.", "Hold on, I’m crunching numbers.", "Working on it, please let me think."] - if speech_module: speech_module.speak(messages[random.randint(0, len(messages)-1)]) + loop = asyncio.get_event_loop() + return await loop.run_in_executor(self.executor, lambda: speech_module.speak(messages[random.randint(0, len(messages)-1)])) def get_blocks_result(self) -> list: return self.blocks_result diff --git a/sources/agents/browser_agent.py b/sources/agents/browser_agent.py index 25fdcfe..221c9f9 100644 --- a/sources/agents/browser_agent.py +++ b/sources/agents/browser_agent.py @@ -3,6 +3,7 @@ import time from datetime import date from typing import List, Tuple, Type, Dict from enum import Enum +import asyncio from sources.utility import pretty_print, animate_thinking from sources.agents.agent import Agent @@ -166,10 +167,10 @@ class BrowserAgent(Agent): You must always take notes. """ - def llm_decide(self, prompt: str, show_reasoning: bool = False) -> Tuple[str, str]: + async def llm_decide(self, prompt: str, show_reasoning: bool = False) -> Tuple[str, str]: animate_thinking("Thinking...", color="status") self.memory.push('user', prompt) - answer, reasoning = self.llm_request() + answer, reasoning = await self.llm_request() if show_reasoning: pretty_print(reasoning, color="failure") pretty_print(answer, color="output") @@ -287,7 +288,7 @@ class BrowserAgent(Agent): pretty_print(f"Title: {res['title']} - ", color="info", no_newline=True) pretty_print(f"Link: {res['link']}", color="status") - def process(self, user_prompt: str, speech_module: type) -> Tuple[str, str]: + async def process(self, user_prompt: str, speech_module: type) -> Tuple[str, str]: """ Process the user prompt to conduct an autonomous web search. Start with a google search with searxng using web_search tool. @@ -302,7 +303,7 @@ class BrowserAgent(Agent): animate_thinking(f"Thinking...", color="status") mem_begin_idx = self.memory.push('user', self.search_prompt(user_prompt)) - ai_prompt, reasoning = self.llm_request() + ai_prompt, reasoning = await self.llm_request() if Action.REQUEST_EXIT.value in ai_prompt: pretty_print(f"Web agent requested exit.\n{reasoning}\n\n{ai_prompt}", color="failure") return ai_prompt, "" @@ -315,7 +316,8 @@ class BrowserAgent(Agent): while not complete and len(unvisited) > 0: self.memory.clear() - answer, reasoning = self.llm_decide(prompt, show_reasoning = False) + answer, reasoning = await self.llm_decide(prompt, show_reasoning = False) + self.last_answer = answer pretty_print('▂'*32, color="status") extracted_form = self.extract_form(answer) @@ -324,7 +326,7 @@ class BrowserAgent(Agent): fill_success = self.browser.fill_form(extracted_form) page_text = self.browser.get_text() answer = self.handle_update_prompt(user_prompt, page_text, fill_success) - answer, reasoning = self.llm_decide(prompt) + answer, reasoning = await self.llm_decide(prompt) if Action.FORM_FILLED.value in answer: pretty_print(f"Filled form. Handling page update.", color="status") @@ -355,11 +357,12 @@ class BrowserAgent(Agent): page_text = self.browser.get_text() self.navigable_links = self.browser.get_navigable() prompt = self.make_navigation_prompt(user_prompt, page_text) + self.browser.screenshot() pretty_print("Exited navigation, starting to summarize finding...", color="status") prompt = self.conclude_prompt(user_prompt) mem_last_idx = self.memory.push('user', prompt) - answer, reasoning = self.llm_request() + answer, reasoning = await self.llm_request() pretty_print(answer, color="output") return answer, reasoning diff --git a/sources/agents/casual_agent.py b/sources/agents/casual_agent.py index f4e3689..e5d1a13 100644 --- a/sources/agents/casual_agent.py +++ b/sources/agents/casual_agent.py @@ -1,3 +1,4 @@ +import asyncio from sources.utility import pretty_print, animate_thinking from sources.agents.agent import Agent @@ -17,10 +18,10 @@ class CasualAgent(Agent): self.role = "talk" self.type = "casual_agent" - def process(self, prompt, speech_module) -> str: + async def process(self, prompt, speech_module) -> str: self.memory.push('user', prompt) animate_thinking("Thinking...", color="status") - answer, reasoning = self.llm_request() + answer, reasoning = await self.llm_request() self.last_answer = answer return answer, reasoning diff --git a/sources/agents/code_agent.py b/sources/agents/code_agent.py index 237eedb..cd617d1 100644 --- a/sources/agents/code_agent.py +++ b/sources/agents/code_agent.py @@ -1,4 +1,5 @@ import platform, os +import asyncio from sources.utility import pretty_print, animate_thinking from sources.agents.agent import Agent, executorResult @@ -26,7 +27,6 @@ class CoderAgent(Agent): self.work_dir = self.tools["file_finder"].get_work_dir() self.role = "code" self.type = "code_agent" - def add_sys_info_prompt(self, prompt): """Add system information to the prompt.""" @@ -36,7 +36,7 @@ class CoderAgent(Agent): f"\nYou must save file in work directory: {self.work_dir}" return f"{prompt}\n\n{info}" - def process(self, prompt, speech_module) -> str: + async def process(self, prompt, speech_module) -> str: answer = "" attempt = 0 max_attempts = 4 @@ -46,20 +46,22 @@ class CoderAgent(Agent): while attempt < max_attempts: animate_thinking("Thinking...", color="status") - self.wait_message(speech_module) - answer, reasoning = self.llm_request() + await self.wait_message(speech_module) + answer, reasoning = await self.llm_request() if clarify_trigger in answer: + self.last_answer = answer + await asyncio.sleep(0) return answer, reasoning if not "```" in answer: self.last_answer = answer + await asyncio.sleep(0) break animate_thinking("Executing code...", color="status") exec_success, _ = self.execute_modules(answer) answer = self.remove_blocks(answer) self.last_answer = answer - if self.get_last_tool_type() == "bash": - continue - if exec_success: + await asyncio.sleep(0) + if exec_success and self.get_last_tool_type() != "bash": break pretty_print("Execution failure", color="failure") pretty_print("Correcting code...", color="status") diff --git a/sources/agents/file_agent.py b/sources/agents/file_agent.py index 159b616..c9338e5 100644 --- a/sources/agents/file_agent.py +++ b/sources/agents/file_agent.py @@ -1,3 +1,4 @@ +import asyncio from sources.utility import pretty_print, animate_thinking from sources.agents.agent import Agent @@ -18,14 +19,14 @@ class FileAgent(Agent): self.role = "files" self.type = "file_agent" - def process(self, prompt, speech_module) -> str: + async def process(self, prompt, speech_module) -> str: exec_success = False prompt += f"\nYou must work in directory: {self.work_dir}" self.memory.push('user', prompt) while exec_success is False: - self.wait_message(speech_module) + await self.wait_message(speech_module) animate_thinking("Thinking...", color="status") - answer, reasoning = self.llm_request() + answer, reasoning = await self.llm_request() exec_success, _ = self.execute_modules(answer) answer = self.remove_blocks(answer) self.last_answer = answer diff --git a/sources/agents/planner_agent.py b/sources/agents/planner_agent.py index c82aafa..b835345 100644 --- a/sources/agents/planner_agent.py +++ b/sources/agents/planner_agent.py @@ -86,13 +86,13 @@ class PlannerAgent(Agent): pretty_print(f"{task['agent']} -> {task['task']}", color="info") pretty_print("▔▗ E N D ▖▔", color="status") - def make_plan(self, prompt: str) -> str: + async def make_plan(self, prompt: str) -> str: ok = False answer = None while not ok: animate_thinking("Thinking...", color="status") self.memory.push('user', prompt) - answer, _ = self.llm_request() + answer, _ = await self.llm_request() agents_tasks = self.parse_agent_tasks(answer) if agents_tasks == (None, None): prompt = f"Failed to parse the tasks. Please make a plan within ```json.\n" @@ -102,10 +102,10 @@ class PlannerAgent(Agent): ok = True return answer - def start_agent_process(self, task: str, required_infos: dict | None) -> str: + async def start_agent_process(self, task: str, required_infos: dict | None) -> str: agent_prompt = self.make_prompt(task['task'], required_infos) pretty_print(f"Agent {task['agent']} started working...", color="status") - agent_answer, _ = self.agents[task['agent'].lower()].process(agent_prompt, None) + agent_answer, _ = await self.agents[task['agent'].lower()].process(agent_prompt, None) self.agents[task['agent'].lower()].show_answer() pretty_print(f"Agent {task['agent']} completed task.", color="status") return agent_answer @@ -113,11 +113,11 @@ class PlannerAgent(Agent): def get_work_result_agent(self, task_needs, agents_work_result): return {k: agents_work_result[k] for k in task_needs if k in agents_work_result} - def process(self, prompt: str, speech_module: Speech) -> Tuple[str, str]: + async def process(self, prompt: str, speech_module: Speech) -> Tuple[str, str]: agents_tasks = (None, None) agents_work_result = dict() - answer = self.make_plan(prompt) + answer = await self.make_plan(prompt) agents_tasks = self.parse_agent_tasks(answer) if agents_tasks == (None, None): @@ -130,7 +130,7 @@ class PlannerAgent(Agent): if agents_work_result is not None: required_infos = self.get_work_result_agent(task['need'], agents_work_result) try: - self.last_answer = self.start_agent_process(task, required_infos) + self.last_answer = await self.start_agent_process(task, required_infos) except Exception as e: raise e agents_work_result[task['id']] = self.last_answer diff --git a/sources/browser.py b/sources/browser.py index 0c3bfe0..69e9022 100644 --- a/sources/browser.py +++ b/sources/browser.py @@ -172,8 +172,6 @@ class Browser: ) self.apply_web_safety() self.logger.log(f"Navigated to: {url}") - self.logger.info(f"Navigated to: {self.get_page_title()}") - self.screenshot() return True except TimeoutException as e: self.logger.error(f"Timeout waiting for {url} to load: {str(e)}") @@ -297,7 +295,6 @@ class Browser: time.sleep(0.1) element.click() self.logger.info(f"Clicked element at {xpath}") - self.screenshot() return True except ElementClickInterceptedException as e: self.logger.error(f"Error click_element: {str(e)}") @@ -540,7 +537,6 @@ class Browser: if self.find_and_click_submission(): if self.wait_for_submission_outcome(): self.logger.info("Submission outcome detected") - self.screenshot() return True else: self.logger.warning("No submission outcome detected") @@ -564,8 +560,7 @@ class Browser: self.driver.execute_script( "window.scrollTo(0, document.body.scrollHeight);" ) - time.sleep(1) - self.screenshot() + time.sleep(0.5) return True except Exception as e: self.logger.error(f"Error scrolling: {str(e)}") @@ -577,6 +572,7 @@ class Browser: def screenshot(self, filename:str = 'updated_screen.png') -> bool: """Take a screenshot of the current page.""" self.logger.info("Taking screenshot...") + time.sleep(0.1) try: path = os.path.join(self.screenshot_folder, filename) if not os.path.exists(self.screenshot_folder): diff --git a/sources/interaction.py b/sources/interaction.py index 8ce2de8..fe52bf4 100644 --- a/sources/interaction.py +++ b/sources/interaction.py @@ -124,7 +124,7 @@ class Interaction: self.last_query = query return query - def think(self) -> bool: + async def think(self) -> bool: """Request AI agents to process the user input.""" push_last_agent_memory = False if self.last_query is None or len(self.last_query) == 0: @@ -137,7 +137,7 @@ class Interaction: tmp = self.last_answer self.current_agent = agent self.is_generating = True - self.last_answer, _ = agent.process(self.last_query, self.speech) + self.last_answer, _ = await agent.process(self.last_query, self.speech) self.is_generating = False if push_last_agent_memory: self.current_agent.memory.push('user', self.last_query) @@ -146,6 +146,18 @@ class Interaction: self.last_answer = None return True + def get_updated_process_answer(self) -> str: + """Get the answer from the last agent.""" + if self.current_agent is None: + return None + return self.current_agent.get_last_answer() + + def get_updated_block_answer(self) -> str: + """Get the answer from the last agent.""" + if self.current_agent is None: + return None + return self.current_agent.get_last_block_answer() + def show_answer(self) -> None: """Show the answer to the user.""" if self.last_query is None: diff --git a/sources/router.py b/sources/router.py index b45b4c3..e3743f7 100644 --- a/sources/router.py +++ b/sources/router.py @@ -362,6 +362,8 @@ class AgentRouter: Returns: str: The selected label """ + if len(text) <= 8: + return "talk" result_bart = self.pipelines['bart'](text, labels) result_llm_router = self.llm_router(text) bart, confidence_bart = result_bart['labels'][0], result_bart['scores'][0] diff --git a/start_services.cmd b/start_services.cmd index cee3b09..0c9c195 100644 --- a/start_services.cmd +++ b/start_services.cmd @@ -3,10 +3,6 @@ REM Up the provider in windows start ollama serve -timeout /t 4 /nobreak >nul -for /f "tokens=*" %%i in ('docker ps -a -q') do docker stop %%i -echo All containers stopped - docker-compose up if %ERRORLEVEL% neq 0 ( echo Error: Failed to start containers. Check Docker logs with 'docker compose logs'.