mirror of
https://github.com/tcsenpai/agenticSeek.git
synced 2025-06-03 01:30:11 +00:00
199 lines
7.0 KiB
Python
199 lines
7.0 KiB
Python
import readline
|
|
from typing import List, Tuple, Type, Dict
|
|
|
|
from sources.text_to_speech import Speech
|
|
from sources.utility import pretty_print, animate_thinking
|
|
from sources.router import AgentRouter
|
|
from sources.speech_to_text import AudioTranscriber, AudioRecorder
|
|
import threading
|
|
|
|
|
|
class Interaction:
|
|
"""
|
|
Interaction is a class that handles the interaction between the user and the agents.
|
|
"""
|
|
def __init__(self, agents,
|
|
tts_enabled: bool = True,
|
|
stt_enabled: bool = True,
|
|
recover_last_session: bool = False,
|
|
langs: List[str] = ["en", "zh"]
|
|
):
|
|
self.is_active = True
|
|
self.current_agent = None
|
|
self.last_query = None
|
|
self.last_answer = None
|
|
self.agents = agents
|
|
self.tts_enabled = tts_enabled
|
|
self.stt_enabled = stt_enabled
|
|
self.recover_last_session = recover_last_session
|
|
self.router = AgentRouter(self.agents, supported_language=langs)
|
|
self.ai_name = self.find_ai_name()
|
|
self.speech = None
|
|
self.transcriber = None
|
|
self.recorder = None
|
|
self.is_generating = False
|
|
self.languages = langs
|
|
if tts_enabled:
|
|
self.initialize_tts()
|
|
if stt_enabled:
|
|
self.initialize_stt()
|
|
if recover_last_session:
|
|
self.load_last_session()
|
|
self.emit_status()
|
|
|
|
def get_spoken_language(self) -> str:
|
|
"""Get the primary TTS language."""
|
|
lang = self.languages[0]
|
|
return lang
|
|
|
|
def initialize_tts(self):
|
|
"""Initialize TTS."""
|
|
if not self.speech:
|
|
animate_thinking("Initializing text-to-speech...", color="status")
|
|
self.speech = Speech(enable=self.tts_enabled, language=self.get_spoken_language(), voice_idx=1)
|
|
|
|
def initialize_stt(self):
|
|
"""Initialize STT."""
|
|
if not self.transcriber or not self.recorder:
|
|
animate_thinking("Initializing speech recognition...", color="status")
|
|
self.transcriber = AudioTranscriber(self.ai_name, verbose=False)
|
|
self.recorder = AudioRecorder()
|
|
|
|
def emit_status(self):
|
|
"""Print the current status of agenticSeek."""
|
|
if self.stt_enabled:
|
|
pretty_print(f"Text-to-speech trigger is {self.ai_name}", color="status")
|
|
if self.tts_enabled:
|
|
self.speech.speak("Hello, we are online and ready. What can I do for you ?")
|
|
pretty_print("AgenticSeek is ready.", color="status")
|
|
|
|
def find_ai_name(self) -> str:
|
|
"""Find the name of the default AI. It is required for STT as a trigger word."""
|
|
ai_name = "jarvis"
|
|
for agent in self.agents:
|
|
if agent.type == "casual_agent":
|
|
ai_name = agent.agent_name
|
|
break
|
|
return ai_name
|
|
|
|
def get_last_blocks_result(self) -> List[Dict]:
|
|
"""Get the last blocks result."""
|
|
if self.current_agent is None:
|
|
return []
|
|
blks = []
|
|
for agent in self.agents:
|
|
blks.extend(agent.get_blocks_result())
|
|
return blks
|
|
|
|
def load_last_session(self):
|
|
"""Recover the last session."""
|
|
for agent in self.agents:
|
|
if agent.type == "planner_agent":
|
|
continue
|
|
agent.memory.load_memory(agent.type)
|
|
|
|
def save_session(self):
|
|
"""Save the current session."""
|
|
for agent in self.agents:
|
|
agent.memory.save_memory(agent.type)
|
|
|
|
def is_active(self) -> bool:
|
|
return self.is_active
|
|
|
|
def read_stdin(self) -> str:
|
|
"""Read the input from the user."""
|
|
buffer = ""
|
|
|
|
PROMPT = "\033[1;35m➤➤➤ \033[0m"
|
|
while not buffer:
|
|
try:
|
|
buffer = input(PROMPT)
|
|
except EOFError:
|
|
return None
|
|
if buffer == "exit" or buffer == "goodbye":
|
|
return None
|
|
return buffer
|
|
|
|
def transcription_job(self) -> str:
|
|
"""Transcribe the audio from the microphone."""
|
|
self.recorder = AudioRecorder(verbose=True)
|
|
self.transcriber = AudioTranscriber(self.ai_name, verbose=True)
|
|
self.transcriber.start()
|
|
self.recorder.start()
|
|
self.recorder.join()
|
|
self.transcriber.join()
|
|
query = self.transcriber.get_transcript()
|
|
if query == "exit" or query == "goodbye":
|
|
return None
|
|
return query
|
|
|
|
def get_user(self) -> str:
|
|
"""Get the user input from the microphone or the keyboard."""
|
|
if self.stt_enabled:
|
|
query = "TTS transcription of user: " + self.transcription_job()
|
|
else:
|
|
query = self.read_stdin()
|
|
if query is None:
|
|
self.is_active = False
|
|
self.last_query = None
|
|
return None
|
|
self.last_query = query
|
|
return query
|
|
|
|
def set_query(self, query: str) -> None:
|
|
"""Set the query"""
|
|
self.is_active = True
|
|
self.last_query = query
|
|
|
|
async def think(self) -> bool:
|
|
"""Request AI agents to process the user input."""
|
|
push_last_agent_memory = False
|
|
if self.last_query is None or len(self.last_query) == 0:
|
|
return False
|
|
agent = self.router.select_agent(self.last_query)
|
|
if agent is None:
|
|
return False
|
|
if self.current_agent != agent and self.last_answer is not None:
|
|
push_last_agent_memory = True
|
|
tmp = self.last_answer
|
|
self.current_agent = agent
|
|
self.is_generating = True
|
|
self.last_answer, _ = await agent.process(self.last_query, self.speech)
|
|
self.is_generating = False
|
|
if push_last_agent_memory:
|
|
self.current_agent.memory.push('user', self.last_query)
|
|
self.current_agent.memory.push('assistant', self.last_answer)
|
|
if self.last_answer == tmp:
|
|
self.last_answer = None
|
|
return True
|
|
|
|
def get_updated_process_answer(self) -> str:
|
|
"""Get the answer from the last agent."""
|
|
if self.current_agent is None:
|
|
return None
|
|
return self.current_agent.get_last_answer()
|
|
|
|
def get_updated_block_answer(self) -> str:
|
|
"""Get the answer from the last agent."""
|
|
if self.current_agent is None:
|
|
return None
|
|
return self.current_agent.get_last_block_answer()
|
|
|
|
def speak_answer(self) -> None:
|
|
"""Speak the answer to the user in a non-blocking thread."""
|
|
if self.last_query is None:
|
|
return
|
|
if self.tts_enabled and self.last_answer and self.speech:
|
|
def speak_in_thread(speech_instance, text):
|
|
speech_instance.speak(text)
|
|
thread = threading.Thread(target=speak_in_thread, args=(self.speech, self.last_answer))
|
|
thread.start()
|
|
|
|
def show_answer(self) -> None:
|
|
"""Show the answer to the user."""
|
|
if self.last_query is None:
|
|
return
|
|
if self.current_agent is not None:
|
|
self.current_agent.show_answer()
|
|
|