feat: frontend message streaming

This commit is contained in:
martin legrand 2025-04-18 15:42:53 +02:00
parent 3a9514629a
commit 83c595144b
13 changed files with 191 additions and 108 deletions

46
api.py
View File

@ -20,8 +20,6 @@ from sources.utility import pretty_print
from sources.logger import Logger
from sources.schemas import QueryRequest, QueryResponse
from concurrent.futures import ThreadPoolExecutor
from celery import Celery
api = FastAPI(title="AgenticSeek API", version="0.1.0")
@ -43,8 +41,6 @@ if not os.path.exists(".screenshots"):
os.makedirs(".screenshots")
api.mount("/screenshots", StaticFiles(directory=".screenshots"), name="screenshots")
executor = ThreadPoolExecutor(max_workers=1)
def initialize_system():
stealth_mode = config.getboolean('BROWSER', 'stealth_mode')
personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base"
@ -105,6 +101,7 @@ def initialize_system():
interaction = initialize_system()
is_generating = False
query_resp_history = []
@api.get("/screenshot")
async def get_screenshot():
@ -128,12 +125,31 @@ async def is_active():
logger.info("Is active endpoint called")
return {"is_active": interaction.is_active}
def think_wrapper(interaction, query, tts_enabled):
@api.get("/latest_answer")
async def get_latest_answer():
    """Return the newest agent answer for the streaming frontend.

    Appends the current agent's last answer to ``query_resp_history`` the
    first time it is seen, then keeps returning the most recent entry on
    subsequent polls. 404 when no agent or no answer is available.
    """
    global query_resp_history
    agent = interaction.current_agent
    if agent is None:
        return JSONResponse(status_code=404, content={"error": "No agent available"})
    # Lazily scan history instead of materializing a list of all answers.
    # NOTE: the guard above makes the original per-field
    # `if interaction.current_agent else ...` ternaries dead code, so they
    # are removed here.
    if agent.last_answer not in (entry["answer"] for entry in query_resp_history):
        query_resp = {
            "done": "false",
            "answer": agent.last_answer,
            "agent_name": agent.agent_name,
            "success": "false",
            "blocks": {f'{i}': block.jsonify() for i, block in enumerate(agent.get_blocks_result())},
        }
        query_resp_history.append(query_resp)
        return JSONResponse(status_code=200, content=query_resp)
    if query_resp_history:
        return JSONResponse(status_code=200, content=query_resp_history[-1])
    return JSONResponse(status_code=404, content={"error": "No answer available"})
async def think_wrapper(interaction, query, tts_enabled):
try:
interaction.tts_enabled = tts_enabled
interaction.last_query = query
logger.info("Agents request is being processed")
success = interaction.think()
success = await interaction.think()
if not success:
interaction.last_answer = "Error: No answer from agent"
interaction.last_success = False
@ -148,7 +164,7 @@ def think_wrapper(interaction, query, tts_enabled):
@api.post("/query", response_model=QueryResponse)
async def process_query(request: QueryRequest):
global is_generating
global is_generating, query_resp_history
logger.info(f"Processing query: {request.query}")
query_resp = QueryResponse(
done="false",
@ -163,10 +179,7 @@ async def process_query(request: QueryRequest):
try:
is_generating = True
loop = asyncio.get_running_loop()
success = await loop.run_in_executor(
executor, think_wrapper, interaction, request.query, request.tts_enabled
)
success = await think_wrapper(interaction, request.query, request.tts_enabled)
is_generating = False
if not success:
@ -188,6 +201,17 @@ async def process_query(request: QueryRequest):
query_resp.agent_name = interaction.current_agent.agent_name
query_resp.success = str(interaction.last_success)
query_resp.blocks = blocks_json
# Store the raw dictionary representation
query_resp_dict = {
"done": query_resp.done,
"answer": query_resp.answer,
"agent_name": query_resp.agent_name,
"success": query_resp.success,
"blocks": query_resp.blocks
}
query_resp_history.append(query_resp_dict)
logger.info("Query processed successfully")
return JSONResponse(status_code=200, content=query_resp.jsonify())
except Exception as e:

8
cli.py
View File

@ -3,6 +3,7 @@
import sys
import argparse
import configparser
import asyncio
from sources.llm_provider import Provider
from sources.interaction import Interaction
@ -16,7 +17,7 @@ warnings.filterwarnings("ignore")
config = configparser.ConfigParser()
config.read('config.ini')
def main():
async def main():
pretty_print("Initializing...", color="status")
stealth_mode = config.getboolean('BROWSER', 'stealth_mode')
personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base"
@ -59,7 +60,7 @@ def main():
try:
while interaction.is_active:
interaction.get_user()
if interaction.think():
if await interaction.think():
interaction.show_answer()
except Exception as e:
if config.getboolean('MAIN', 'save_session'):
@ -69,6 +70,5 @@ def main():
if config.getboolean('MAIN', 'save_session'):
interaction.save_session()
if __name__ == "__main__":
main()
asyncio.run(main())

View File

@ -10,11 +10,24 @@ function App() {
const [currentView, setCurrentView] = useState('blocks');
const [responseData, setResponseData] = useState(null);
const [isOnline, setIsOnline] = useState(false);
const [isMounted, setIsMounted] = useState(true);
const messagesEndRef = useRef(null);
useEffect(() => {
scrollToBottom();
checkHealth();
const intervalId = setInterval(() => {
checkHealth();
fetchLatestAnswer();
fetchScreenshot();
}, 1500);
return () => clearInterval(intervalId);
}, [messages]);
useEffect(() => {
const intervalId = setInterval(() => {
scrollToBottom();
}, 7000);
return () => clearInterval(intervalId);
}, [messages]);
const checkHealth = async () => {
@ -32,54 +45,58 @@ function App() {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
useEffect(() => {
if (currentView === 'screenshot') {
let isMounted = true;
const fetchScreenshot = async () => {
try {
const timestamp = new Date().getTime();
const res = await axios.get(`http://0.0.0.0:8000/screenshots/updated_screen.png?timestamp=${timestamp}`, {
responseType: 'blob'
});
if (isMounted) {
console.log('Screenshot fetched successfully');
const imageUrl = URL.createObjectURL(res.data);
setResponseData((prev) => {
if (prev?.screenshot && prev.screenshot !== 'placeholder.png') {
URL.revokeObjectURL(prev.screenshot);
}
return {
...prev,
screenshot: imageUrl,
screenshotTimestamp: new Date().getTime()
};
});
const fetchScreenshot = async () => {
try {
const timestamp = new Date().getTime();
const res = await axios.get(`http://0.0.0.0:8000/screenshots/updated_screen.png?timestamp=${timestamp}`, {
responseType: 'blob'
});
if (isMounted) {
console.log('Screenshot fetched successfully');
const imageUrl = URL.createObjectURL(res.data);
setResponseData((prev) => {
if (prev?.screenshot && prev.screenshot !== 'placeholder.png') {
URL.revokeObjectURL(prev.screenshot);
}
} catch (err) {
console.error('Error fetching screenshot:', err);
if (isMounted) {
setResponseData((prev) => ({
...prev,
screenshot: 'placeholder.png',
screenshotTimestamp: new Date().getTime()
}));
}
}
};
fetchScreenshot();
const interval = setInterval(fetchScreenshot, 1000);
return () => {
isMounted = false;
clearInterval(interval);
if (responseData?.screenshot && responseData.screenshot !== 'placeholder.png') {
URL.revokeObjectURL(responseData.screenshot);
}
};
return {
...prev,
screenshot: imageUrl,
screenshotTimestamp: new Date().getTime()
};
});
}
} catch (err) {
console.error('Error fetching screenshot:', err);
if (isMounted) {
setResponseData((prev) => ({
...prev,
screenshot: 'placeholder.png',
screenshotTimestamp: new Date().getTime()
}));
}
}
}, [currentView]);
};
const normalizeAnswer = (answer) => answer.trim().toLowerCase();
const fetchLatestAnswer = async () => {
    try {
        const res = await axios.get('http://0.0.0.0:8000/latest_answer');
        const data = res.data;
        // Bug fix: the original checked `data.answer != undefined` inside the
        // .some() predicate, but normalizeAnswer(data.answer) had already been
        // called above it — an undefined answer threw before the check, and an
        // empty message list let an undefined answer be appended. Guard first.
        if (data.answer === undefined || data.answer === null) {
            return;
        }
        const normalizedNew = normalizeAnswer(data.answer);
        // De-duplicate against messages already rendered.
        const answerExists = messages.some(
            (msg) => normalizeAnswer(msg.content) === normalizedNew
        );
        if (!answerExists) {
            setMessages((prev) => [
                ...prev,
                { type: 'agent', content: data.answer, agentName: data.agent_name },
            ]);
        }
    } catch (error) {
        console.error("Error fetching latest answer:", error);
    }
};
const handleSubmit = async (e) => {
e.preventDefault();
@ -101,10 +118,7 @@ function App() {
console.log('Response:', res.data);
const data = res.data;
setResponseData(data);
setMessages((prev) => [
...prev,
{ type: 'agent', content: data.answer, agentName: data.agent_name },
]);
fetchLatestAnswer();
} catch (err) {
console.error('Error:', err);
setError('Failed to process query.');

View File

@ -5,6 +5,9 @@ import os
import random
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from sources.memory import Memory
from sources.utility import pretty_print
from sources.schemas import executorResult
@ -43,6 +46,27 @@ class Agent():
self.blocks_result = []
self.last_answer = ""
self.verbose = verbose
self.executor = ThreadPoolExecutor(max_workers=1)
# NOTE(review): these are @property accessors with getter-style names, so
# callers must use attribute access (`agent.get_last_answer`), NOT a call
# (`agent.get_last_answer()`), which would try to call the returned value.
# interaction.py's get_updated_process_answer appears to call it — confirm.
@property
def get_agent_name(self) -> str:
    # Human-readable agent name (e.g. shown in the frontend).
    return self.agent_name
@property
def get_agent_type(self) -> str:
    # Agent category string (e.g. "casual_agent", "file_agent").
    return self.type
@property
def get_agent_role(self) -> str:
    # Routing role keyword (e.g. "talk", "files", "code").
    return self.role
@property
def get_last_answer(self) -> str:
    # Most recent answer produced by process(); polled for streaming.
    return self.last_answer
@property
def get_blocks(self) -> list:
    # Accumulated executorResult blocks from tool execution.
    return self.blocks_result
@property
def get_tools(self) -> dict:
@ -90,7 +114,14 @@ class Agent():
end_idx = text.rfind(end_tag)+8
return text[start_idx:end_idx]
def llm_request(self) -> Tuple[str, str]:
async def llm_request(self) -> Tuple[str, str]:
    """
    Asynchronously ask the LLM to process the prompt.

    Offloads the blocking sync_llm_request onto the agent's
    single-worker executor so the event loop stays responsive.

    Returns:
        Tuple[str, str]: the answer and the reasoning.
    """
    # get_running_loop() is the correct accessor inside a coroutine;
    # get_event_loop() is deprecated for this use since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self.executor, self.sync_llm_request)
def sync_llm_request(self) -> Tuple[str, str]:
"""
Ask the LLM to process the prompt and return the answer and the reasoning.
"""
@ -102,14 +133,15 @@ class Agent():
self.memory.push('assistant', answer)
return answer, reasoning
def wait_message(self, speech_module):
async def wait_message(self, speech_module):
    """Speak a random "please wait" phrase so the user knows work is ongoing."""
    if speech_module is None:
        return
    messages = ["Please be patient, I am working on it.",
                "Computing... I recommand you have a coffee while I work.",
                "Hold on, Im crunching numbers.",
                "Working on it, please let me think."]
    # NOTE(review): this synchronous speak line looks like a leftover from the
    # pre-async version; together with the executor call below it would speak
    # twice. This view is a diff with +/- markers stripped — confirm against
    # the repository before assuming both lines exist.
    if speech_module: speech_module.speak(messages[random.randint(0, len(messages)-1)])
    # Run the blocking TTS call off the event loop.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(self.executor, lambda: speech_module.speak(messages[random.randint(0, len(messages)-1)]))
def get_blocks_result(self) -> list:
return self.blocks_result

View File

@ -3,6 +3,7 @@ import time
from datetime import date
from typing import List, Tuple, Type, Dict
from enum import Enum
import asyncio
from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent
@ -166,10 +167,10 @@ class BrowserAgent(Agent):
You must always take notes.
"""
def llm_decide(self, prompt: str, show_reasoning: bool = False) -> Tuple[str, str]:
async def llm_decide(self, prompt: str, show_reasoning: bool = False) -> Tuple[str, str]:
animate_thinking("Thinking...", color="status")
self.memory.push('user', prompt)
answer, reasoning = self.llm_request()
answer, reasoning = await self.llm_request()
if show_reasoning:
pretty_print(reasoning, color="failure")
pretty_print(answer, color="output")
@ -287,7 +288,7 @@ class BrowserAgent(Agent):
pretty_print(f"Title: {res['title']} - ", color="info", no_newline=True)
pretty_print(f"Link: {res['link']}", color="status")
def process(self, user_prompt: str, speech_module: type) -> Tuple[str, str]:
async def process(self, user_prompt: str, speech_module: type) -> Tuple[str, str]:
"""
Process the user prompt to conduct an autonomous web search.
Start with a google search with searxng using web_search tool.
@ -302,7 +303,7 @@ class BrowserAgent(Agent):
animate_thinking(f"Thinking...", color="status")
mem_begin_idx = self.memory.push('user', self.search_prompt(user_prompt))
ai_prompt, reasoning = self.llm_request()
ai_prompt, reasoning = await self.llm_request()
if Action.REQUEST_EXIT.value in ai_prompt:
pretty_print(f"Web agent requested exit.\n{reasoning}\n\n{ai_prompt}", color="failure")
return ai_prompt, ""
@ -315,7 +316,8 @@ class BrowserAgent(Agent):
while not complete and len(unvisited) > 0:
self.memory.clear()
answer, reasoning = self.llm_decide(prompt, show_reasoning = False)
answer, reasoning = await self.llm_decide(prompt, show_reasoning = False)
self.last_answer = answer
pretty_print(''*32, color="status")
extracted_form = self.extract_form(answer)
@ -324,7 +326,7 @@ class BrowserAgent(Agent):
fill_success = self.browser.fill_form(extracted_form)
page_text = self.browser.get_text()
answer = self.handle_update_prompt(user_prompt, page_text, fill_success)
answer, reasoning = self.llm_decide(prompt)
answer, reasoning = await self.llm_decide(prompt)
if Action.FORM_FILLED.value in answer:
pretty_print(f"Filled form. Handling page update.", color="status")
@ -355,11 +357,12 @@ class BrowserAgent(Agent):
page_text = self.browser.get_text()
self.navigable_links = self.browser.get_navigable()
prompt = self.make_navigation_prompt(user_prompt, page_text)
self.browser.screenshot()
pretty_print("Exited navigation, starting to summarize finding...", color="status")
prompt = self.conclude_prompt(user_prompt)
mem_last_idx = self.memory.push('user', prompt)
answer, reasoning = self.llm_request()
answer, reasoning = await self.llm_request()
pretty_print(answer, color="output")
return answer, reasoning

View File

@ -1,3 +1,4 @@
import asyncio
from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent
@ -17,10 +18,10 @@ class CasualAgent(Agent):
self.role = "talk"
self.type = "casual_agent"
def process(self, prompt, speech_module) -> str:
async def process(self, prompt, speech_module) -> str:
    """Push the prompt to memory, query the LLM, and return (answer, reasoning)."""
    self.memory.push('user', prompt)
    animate_thinking("Thinking...", color="status")
    reply, thoughts = await self.llm_request()
    self.last_answer = reply
    return reply, thoughts

View File

@ -1,4 +1,5 @@
import platform, os
import asyncio
from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent, executorResult
@ -27,7 +28,6 @@ class CoderAgent(Agent):
self.role = "code"
self.type = "code_agent"
def add_sys_info_prompt(self, prompt):
"""Add system information to the prompt."""
info = f"System Info:\n" \
@ -36,7 +36,7 @@ class CoderAgent(Agent):
f"\nYou must save file in work directory: {self.work_dir}"
return f"{prompt}\n\n{info}"
def process(self, prompt, speech_module) -> str:
async def process(self, prompt, speech_module) -> str:
answer = ""
attempt = 0
max_attempts = 4
@ -46,20 +46,22 @@ class CoderAgent(Agent):
while attempt < max_attempts:
animate_thinking("Thinking...", color="status")
self.wait_message(speech_module)
answer, reasoning = self.llm_request()
await self.wait_message(speech_module)
answer, reasoning = await self.llm_request()
if clarify_trigger in answer:
self.last_answer = answer
await asyncio.sleep(0)
return answer, reasoning
if not "```" in answer:
self.last_answer = answer
await asyncio.sleep(0)
break
animate_thinking("Executing code...", color="status")
exec_success, _ = self.execute_modules(answer)
answer = self.remove_blocks(answer)
self.last_answer = answer
if self.get_last_tool_type() == "bash":
continue
if exec_success:
await asyncio.sleep(0)
if exec_success and self.get_last_tool_type() != "bash":
break
pretty_print("Execution failure", color="failure")
pretty_print("Correcting code...", color="status")

View File

@ -1,3 +1,4 @@
import asyncio
from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent
@ -18,14 +19,14 @@ class FileAgent(Agent):
self.role = "files"
self.type = "file_agent"
def process(self, prompt, speech_module) -> str:
async def process(self, prompt, speech_module) -> str:
    """
    Run the file-agent loop: ask the LLM, execute the produced tool
    blocks, and repeat until an execution succeeds.

    Args:
        prompt: user request; the working-directory constraint is appended.
        speech_module: optional TTS module used for "please wait" messages.
    """
    exec_success = False
    prompt += f"\nYou must work in directory: {self.work_dir}"
    self.memory.push('user', prompt)
    # `is False` is deliberate strictness: only an explicit False from
    # execute_modules keeps the loop going.
    while exec_success is False:
        await self.wait_message(speech_module)
        animate_thinking("Thinking...", color="status")
        answer, reasoning = await self.llm_request()
        exec_success, _ = self.execute_modules(answer)
        # Strip code blocks before storing the user-facing answer.
        answer = self.remove_blocks(answer)
        self.last_answer = answer

View File

@ -86,13 +86,13 @@ class PlannerAgent(Agent):
pretty_print(f"{task['agent']} -> {task['task']}", color="info")
pretty_print("▔▗ E N D ▖▔", color="status")
def make_plan(self, prompt: str) -> str:
async def make_plan(self, prompt: str) -> str:
ok = False
answer = None
while not ok:
animate_thinking("Thinking...", color="status")
self.memory.push('user', prompt)
answer, _ = self.llm_request()
answer, _ = await self.llm_request()
agents_tasks = self.parse_agent_tasks(answer)
if agents_tasks == (None, None):
prompt = f"Failed to parse the tasks. Please make a plan within ```json.\n"
@ -102,10 +102,10 @@ class PlannerAgent(Agent):
ok = True
return answer
def start_agent_process(self, task: str, required_infos: dict | None) -> str:
async def start_agent_process(self, task: str, required_infos: dict | None) -> str:
    """Run one sub-agent on its assigned task and return that agent's answer."""
    worker = self.agents[task['agent'].lower()]
    agent_prompt = self.make_prompt(task['task'], required_infos)
    pretty_print(f"Agent {task['agent']} started working...", color="status")
    agent_answer, _ = await worker.process(agent_prompt, None)
    worker.show_answer()
    pretty_print(f"Agent {task['agent']} completed task.", color="status")
    return agent_answer
@ -113,11 +113,11 @@ class PlannerAgent(Agent):
def get_work_result_agent(self, task_needs, agents_work_result):
return {k: agents_work_result[k] for k in task_needs if k in agents_work_result}
def process(self, prompt: str, speech_module: Speech) -> Tuple[str, str]:
async def process(self, prompt: str, speech_module: Speech) -> Tuple[str, str]:
agents_tasks = (None, None)
agents_work_result = dict()
answer = self.make_plan(prompt)
answer = await self.make_plan(prompt)
agents_tasks = self.parse_agent_tasks(answer)
if agents_tasks == (None, None):
@ -130,7 +130,7 @@ class PlannerAgent(Agent):
if agents_work_result is not None:
required_infos = self.get_work_result_agent(task['need'], agents_work_result)
try:
self.last_answer = self.start_agent_process(task, required_infos)
self.last_answer = await self.start_agent_process(task, required_infos)
except Exception as e:
raise e
agents_work_result[task['id']] = self.last_answer

View File

@ -172,8 +172,6 @@ class Browser:
)
self.apply_web_safety()
self.logger.log(f"Navigated to: {url}")
self.logger.info(f"Navigated to: {self.get_page_title()}")
self.screenshot()
return True
except TimeoutException as e:
self.logger.error(f"Timeout waiting for {url} to load: {str(e)}")
@ -297,7 +295,6 @@ class Browser:
time.sleep(0.1)
element.click()
self.logger.info(f"Clicked element at {xpath}")
self.screenshot()
return True
except ElementClickInterceptedException as e:
self.logger.error(f"Error click_element: {str(e)}")
@ -540,7 +537,6 @@ class Browser:
if self.find_and_click_submission():
if self.wait_for_submission_outcome():
self.logger.info("Submission outcome detected")
self.screenshot()
return True
else:
self.logger.warning("No submission outcome detected")
@ -564,8 +560,7 @@ class Browser:
self.driver.execute_script(
"window.scrollTo(0, document.body.scrollHeight);"
)
time.sleep(1)
self.screenshot()
time.sleep(0.5)
return True
except Exception as e:
self.logger.error(f"Error scrolling: {str(e)}")
@ -577,6 +572,7 @@ class Browser:
def screenshot(self, filename:str = 'updated_screen.png') -> bool:
"""Take a screenshot of the current page."""
self.logger.info("Taking screenshot...")
time.sleep(0.1)
try:
path = os.path.join(self.screenshot_folder, filename)
if not os.path.exists(self.screenshot_folder):

View File

@ -124,7 +124,7 @@ class Interaction:
self.last_query = query
return query
def think(self) -> bool:
async def think(self) -> bool:
"""Request AI agents to process the user input."""
push_last_agent_memory = False
if self.last_query is None or len(self.last_query) == 0:
@ -137,7 +137,7 @@ class Interaction:
tmp = self.last_answer
self.current_agent = agent
self.is_generating = True
self.last_answer, _ = agent.process(self.last_query, self.speech)
self.last_answer, _ = await agent.process(self.last_query, self.speech)
self.is_generating = False
if push_last_agent_memory:
self.current_agent.memory.push('user', self.last_query)
@ -146,6 +146,18 @@ class Interaction:
self.last_answer = None
return True
def get_updated_process_answer(self) -> str:
    """Get the answer from the last agent, or None if no agent is active."""
    if self.current_agent is None:
        return None
    # Bug fix: get_last_answer is declared as a @property on Agent, so the
    # original `get_last_answer()` called the returned string -> TypeError.
    return self.current_agent.get_last_answer
def get_updated_block_answer(self) -> str:
    """Get the last block answer from the current agent, or None if no agent."""
    if self.current_agent is None:
        return None
    # NOTE(review): get_last_block_answer is not visible in this view —
    # confirm it exists on Agent and whether it is a property or a method.
    return self.current_agent.get_last_block_answer()
def show_answer(self) -> None:
"""Show the answer to the user."""
if self.last_query is None:

View File

@ -362,6 +362,8 @@ class AgentRouter:
Returns:
str: The selected label
"""
if len(text) <= 8:
return "talk"
result_bart = self.pipelines['bart'](text, labels)
result_llm_router = self.llm_router(text)
bart, confidence_bart = result_bart['labels'][0], result_bart['scores'][0]

View File

@ -3,10 +3,6 @@
REM Start up the provider on Windows
start ollama serve
timeout /t 4 /nobreak >nul
for /f "tokens=*" %%i in ('docker ps -a -q') do docker stop %%i
echo All containers stopped
docker-compose up
if %ERRORLEVEL% neq 0 (
echo Error: Failed to start containers. Check Docker logs with 'docker compose logs'.