Feat: better print of think process and answer

martin legrand 2025-02-28 16:29:07 +01:00
parent c5da8319bf
commit 5bc2c26757
10 changed files with 103 additions and 58 deletions

.gitignore

@@ -1,5 +1,6 @@
 *.wav
 config.ini
+experimental/
 # Byte-compiled / optimized / DLL files

config.ini

@@ -4,3 +4,5 @@ provider_name = ollama
 provider_model = deepseek-r1:7b
 provider_server_address = 127.0.0.1:5000
 agent_name = jarvis
+recover_last_session = False
+speak = True
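For reference, a minimal sketch of how these two new flags are consumed; the 'MAIN' section name is inferred from the getboolean calls in main.py below, and the rest of config.ini is assumed:

import configparser

# Assumed layout: the keys live under a [MAIN] section, matching the
# config.getboolean('MAIN', ...) calls added to main.py in this commit.
config = configparser.ConfigParser()
config.read('config.ini')
speak_enabled = config.getboolean('MAIN', 'speak')             # -> True
recover = config.getboolean('MAIN', 'recover_last_session')    # -> False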

main.py

@@ -36,7 +36,8 @@ def main():
                        prompt_path="prompts/coder_agent.txt",
                        provider=provider)
-    interaction = Interaction([agent], tts_enabled=args.speak)
+    interaction = Interaction([agent], tts_enabled=config.getboolean('MAIN', 'speak'),
+                              recover_last_session=config.getboolean('MAIN', 'recover_last_session'))
     while interaction.is_active:
         interaction.get_user()
         interaction.think()

requirements.txt

@@ -12,7 +12,7 @@ kokoro==0.7.12
 flask==3.1.0
 soundfile==0.13.1
 protobuf==3.20.3
-termcolor
+termcolor==2.3.0
 # if use chinese
 ordered_set
 pypinyin

sources/agent.py

@@ -5,31 +5,48 @@ import random
 from sources.memory import Memory
 from sources.utility import pretty_print

+class executorResult:
+    def __init__(self, blocks, feedback, success):
+        self.blocks = blocks
+        self.feedback = feedback
+        self.success = success
+
+    def show(self):
+        for block in self.blocks:
+            pretty_print("-"*100, color="output")
+            pretty_print(block, color="code" if self.success else "failure")
+            pretty_print("-"*100, color="output")
+        pretty_print(self.feedback, color="success" if self.success else "failure")
+
 class Agent():
     def __init__(self, model: str,
                  name: str,
                  prompt_path:str,
-                 provider) -> None:
-        self._name = name
-        self._current_directory = os.getcwd()
-        self._model = model
-        self._llm = provider
-        self._memory = Memory(self.load_prompt(prompt_path),
+                 provider,
+                 recover_last_session=False) -> None:
+        self.agent_name = name
+        self.current_directory = os.getcwd()
+        self.model = model
+        self.llm = provider
+        self.memory = Memory(self.load_prompt(prompt_path),
+                             recover_last_session=recover_last_session,
                              memory_compression=False)
-        self._tools = {}
+        self.tools = {}
+        self.blocks_result = []
+        self.last_answer = ""

     @property
     def name(self) -> str:
-        return self._name
+        return self.name

     @property
     def get_tools(self) -> dict:
-        return self._tools
+        return self.tools

     def add_tool(self, name: str, tool: Callable) -> None:
         if tool is not Callable:
             raise TypeError("Tool must be a callable object (a method)")
-        self._tools[name] = tool
+        self.tools[name] = tool

     def load_prompt(self, file_path: str) -> str:
         try:

@@ -62,12 +79,12 @@ class Agent():
         return text[start_idx:end_idx]

     def llm_request(self, verbose = True) -> Tuple[str, str]:
-        memory = self._memory.get()
-        thought = self._llm.respond(memory, verbose)
+        memory = self.memory.get()
+        thought = self.llm.respond(memory, verbose)
         reasoning = self.extract_reasoning_text(thought)
         answer = self.remove_reasoning_text(thought)
-        self._memory.push('assistant', answer)
+        self.memory.push('assistant', answer)
         return answer, reasoning

     def wait_message(self, speech_module):

@@ -84,23 +101,27 @@ class Agent():
         pretty_print("-"*100, color="output")
         pretty_print(block, color="code")
         pretty_print("-"*100, color="output")

+    def get_blocks_result(self) -> list:
+        return self.blocks_result
+
     def execute_modules(self, answer: str) -> Tuple[bool, str]:
         feedback = ""
+        success = False
         blocks = None
-        for name, tool in self._tools.items():
+        for name, tool in self.tools.items():
             feedback = ""
             blocks, save_path = tool.load_exec_block(answer)
             if blocks != None:
-                self.print_code_blocks(blocks, name)
                 output = tool.execute(blocks)
-                feedback = tool.interpreter_feedback(output)
-                self._memory.push('user', feedback)
-                if "failure" in feedback.lower():
-                    return False, feedback
-                if save_path != None:
-                    tool.save_block(blocks, save_path)
+                feedback = tool.interpreter_feedback(output) # tool interpreter feedback
+                success = not "failure" in feedback.lower()
+                self.memory.push('user', feedback)
+                self.blocks_result.append(executorResult(blocks, feedback, success))
+                if not success:
+                    return False, feedback
+                if save_path != None:
+                    tool.save_block(blocks, save_path)
         return True, feedback
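Each executed block is now stored with its interpreter feedback instead of being printed on the spot; a minimal sketch of executorResult in isolation (the feedback string below is an assumption, in practice it is whatever tool.interpreter_feedback returns):

# Illustration only: execute_modules appends one of these per tool run;
# show() replays the stored blocks and feedback with success-based colors.
result = executorResult(
    blocks=["print(5 + 5)"],     # code blocks extracted from the answer
    feedback="[success] 10",     # assumed feedback format, tool-defined in practice
    success=True,
)
result.show()   # prints the block in "code" color, then the feedback in "success" color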

sources/code_agent.py

@@ -1,12 +1,12 @@
 from sources.tools import PyInterpreter, BashInterpreter
 from sources.utility import pretty_print
-from sources.agent import Agent
+from sources.agent import Agent, executorResult

 class CoderAgent(Agent):
     def __init__(self, model, name, prompt_path, provider):
         super().__init__(model, name, prompt_path, provider)
-        self._tools = {
+        self.tools = {
             "bash": BashInterpreter(),
             "python": PyInterpreter()
         }

@@ -19,6 +19,7 @@ class CoderAgent(Agent):
         lines = text.split('\n')
         post_lines = []
         in_block = False
+        block_idx = 0
         for line in lines:
             if tag in line and not in_block:
                 in_block = True

@@ -27,26 +28,37 @@ class CoderAgent(Agent):
                 post_lines.append(line)
             if tag in line:
                 in_block = False
+                post_lines.append(f"block:{block_idx}")
+                block_idx += 1
         return "\n".join(post_lines)

-    def answer(self, prompt, speech_module) -> str:
+    def show_answer(self):
+        lines = self.last_answer.split("\n")
+        for line in lines:
+            if "block:" in line:
+                block_idx = int(line.split(":")[1])
+                if block_idx < len(self.blocks_result):
+                    self.blocks_result[block_idx].show()
+            else:
+                pretty_print(line, color="output")
+
+    def process(self, prompt, speech_module) -> str:
         answer = ""
         attempt = 0
         max_attempts = 3
-        self._memory.push('user', prompt)
+        self.memory.push('user', prompt)
         while attempt < max_attempts:
             pretty_print("Thinking...", color="status")
             self.wait_message(speech_module)
             answer, reasoning = self.llm_request()
-            exec_success, feedback = self.execute_modules(answer)
-            pretty_print(feedback, color="failure" if "failure" in feedback.lower() else "success")
+            exec_success, _ = self.execute_modules(answer)
             answer = self.remove_blocks(answer)
-            pretty_print(answer, color="output")
+            self.last_answer = answer
             if exec_success:
                 break
+            self.show_answer()
             attempt += 1
         return answer, reasoning

 if __name__ == "__main__":

@@ -55,5 +67,5 @@ if __name__ == "__main__":
     #local_provider = Provider("ollama", "deepseek-r1:14b", None)
     server_provider = Provider("server", "deepseek-r1:14b", "192.168.1.100:5000")
     agent = CoderAgent("deepseek-r1:14b", "jarvis", "prompts/coder_agent.txt", server_provider)
-    ans = agent.answer("What is the output of 5+5 in python ?")
+    ans = agent.process("What is the output of 5+5 in python ?")
     print(ans)
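The mechanism behind remove_blocks/show_answer: each fenced code block is swapped for a positional marker, then re-expanded at print time from blocks_result. A standalone mimic of that round trip (hypothetical helper, not repo code):

from typing import List

def expand_markers(answer: str, rendered_blocks: List[str]) -> None:
    # Plays the role of CoderAgent.show_answer(): plain lines are echoed,
    # "block:N" lines are replaced by the stored rendering of block N.
    for line in answer.split("\n"):
        if "block:" in line:
            idx = int(line.split(":")[1])
            if idx < len(rendered_blocks):
                print(rendered_blocks[idx])
        else:
            print(line)

# remove_blocks would turn a fenced "print(5 + 5)" block into "block:0":
expand_markers("Here is the code:\nblock:0\nDone.", ["print(5 + 5)  # -> 10"])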

sources/interaction.py

@@ -3,7 +3,7 @@ from sources.text_to_speech import Speech
 from sources.utility import pretty_print

 class Interaction:
-    def __init__(self, agents, tts_enabled: bool = False):
+    def __init__(self, agents, tts_enabled: bool = False, recover_last_session: bool = False):
         self.tts_enabled = tts_enabled
         self.agents = agents
         self.speech = Speech()

@@ -12,6 +12,12 @@ class Interaction:
         self.last_answer = None
         if tts_enabled:
             self.speech.speak("Hello Sir, we are online and ready. What can I do for you ?")
+        if recover_last_session:
+            self.recover_last_session()
+
+    def recover_last_session(self):
+        for agent in self.agents:
+            agent.memory.load_memory()

     def is_active(self):
         return self.is_active

@@ -32,15 +38,16 @@ class Interaction:
         query = self.read_stdin()
         if query is None:
             self.is_active = False
-            return
+            self.last_query = "Goodbye (exit requested by user, dont think, make answer very short)"
+            return None
         self.last_query = query
         return query

     def think(self):
-        self.last_answer, _ = self.agents[0].answer(self.last_query, self.speech)
+        self.last_answer, _ = self.agents[0].process(self.last_query, self.speech)

     def show_answer(self):
-        pretty_print(self.last_answer, color="output")
+        self.agents[0].show_answer()
         if self.tts_enabled:
             self.speech.speak(self.last_answer)
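A usage sketch of the new session-recovery path; the CoderAgent/Provider setup is borrowed from the __main__ block of sources/code_agent.py above:

# recover_last_session=True makes __init__ call self.recover_last_session(),
# which runs agent.memory.load_memory() for every registered agent.
agent = CoderAgent("deepseek-r1:14b", "jarvis", "prompts/coder_agent.txt", server_provider)
interaction = Interaction([agent],
                          tts_enabled=False,
                          recover_last_session=True)
while interaction.is_active:
    interaction.get_user()
    interaction.think()
    interaction.show_answer()   # now delegates printing to agents[0].show_answer()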

sources/memory.py

@@ -14,8 +14,8 @@ class Memory():
     def __init__(self, system_prompt: str,
                  recover_last_session: bool = False,
                  memory_compression: bool = True):
-        self._memory = []
-        self._memory = [{'role': 'user', 'content': system_prompt},
+        self.memory = []
+        self.memory = [{'role': 'user', 'content': system_prompt},
                        {'role': 'assistant', 'content': f'Hello, How can I help you today ?'}]
         self.session_time = datetime.datetime.now()

@@ -28,8 +28,8 @@ class Memory():
         self.device = self.get_cuda_device()
         self.memory_compression = memory_compression
         if memory_compression:
-            self._tokenizer = AutoTokenizer.from_pretrained(self.model)
-            self._model = AutoModelForSeq2SeqLM.from_pretrained(self.model)
+            self.tokenizer = AutoTokenizer.from_pretrained(self.model)
+            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model)

     def get_filename(self) -> str:
         return f"memory_{self.session_time.strftime('%Y-%m-%d_%H-%M-%S')}.txt"

@@ -39,7 +39,7 @@ class Memory():
             os.makedirs(self.conversation_folder)
         filename = self.get_filename()
         path = os.path.join(self.conversation_folder, filename)
-        json_memory = json.dumps(self._memory)
+        json_memory = json.dumps(self.memory)
         with open(path, 'w') as f:
             f.write(json_memory)

@@ -60,22 +60,22 @@ class Memory():
             return
         path = os.path.join(self.conversation_folder, filename)
         with open(path, 'r') as f:
-            self._memory = json.load(f)
+            self.memory = json.load(f)

     def reset(self, memory: list) -> None:
-        self._memory = memory
+        self.memory = memory

     def push(self, role: str, content: str) -> None:
-        self._memory.append({'role': role, 'content': content})
+        self.memory.append({'role': role, 'content': content})
         # EXPERIMENTAL
         if self.memory_compression and role == 'assistant':
             self.compress()

     def clear(self) -> None:
-        self._memory = []
+        self.memory = []

     def get(self) -> list:
-        return self._memory
+        return self.memory

     def get_cuda_device(self) -> str:
         if torch.backends.mps.is_available():

@@ -86,12 +86,12 @@ class Memory():
         return "cpu"

     def summarize(self, text: str, min_length: int = 64) -> str:
-        if self._tokenizer is None or self._model is None:
+        if self.tokenizer is None or self.model is None:
             return text
         max_length = len(text) // 2 if len(text) > min_length*2 else min_length*2
         input_text = "summarize: " + text
-        inputs = self._tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True)
-        summary_ids = self._model.generate(
+        inputs = self.tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True)
+        summary_ids = self.model.generate(
             inputs['input_ids'],
             max_length=max_length, # Maximum length of the summary
             min_length=min_length, # Minimum length of the summary

@@ -99,7 +99,7 @@ class Memory():
             num_beams=4,        # Beam search for better quality
             early_stopping=True # Stop when all beams finish
         )
-        summary = self._tokenizer.decode(summary_ids[0], skip_special_tokens=True)
+        summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
         return summary

     def timer_decorator(func):

@@ -116,11 +116,11 @@ class Memory():
     def compress(self) -> str:
         if not self.memory_compression:
             return
-        for i in range(len(self._memory)):
+        for i in range(len(self.memory)):
             if i <= 2:
                 continue
-            if self._memory[i]['role'] == 'assistant':
-                self._memory[i]['content'] = self.summarize(self._memory[i]['content'])
+            if self.memory[i]['role'] == 'assistant':
+                self.memory[i]['content'] = self.summarize(self.memory[i]['content'])

 if __name__ == "__main__":
     memory = Memory("You are a helpful assistant.",
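A sketch of the save/load cycle behind recover_last_session; load_memory appears in the interaction.py diff, but the name of the save method is not visible in these hunks, so save_memory is assumed:

# Assumptions: save_memory is the method whose body appears in the
# @@ -39,7 +39,7 @@ hunk above; filenames follow get_filename()'s pattern.
mem = Memory("You are a helpful assistant.",
             recover_last_session=False,
             memory_compression=False)   # avoid loading the summarization model
mem.push('user', 'What is the output of 5+5 in python ?')
mem.save_memory()   # writes memory_<session_time>.txt (JSON) under conversation_folder
# A later run with recover_last_session=True calls load_memory(), which
# reads the saved file back into mem.memory.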

sources/text_to_speech.py

@@ -6,6 +6,7 @@ import re
 import platform

 class Speech():
     def __init__(self, language = "english") -> None:
         self.lang_map = {

@@ -19,10 +20,10 @@ class Speech():
             "french": ['ff_siwis']
         }
         self.pipeline = KPipeline(lang_code=self.lang_map[language])
-        self.voice = self.voice_map[language][4]
+        self.voice = self.voice_map[language][2]
         self.speed = 1.2

-    def speak(self, sentence, voice_number = 2):
+    def speak(self, sentence, voice_number = 1):
         sentence = self.clean_sentence(sentence)
         self.voice = self.voice_map["english"][voice_number]
         generator = self.pipeline(
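A usage sketch of the retuned defaults; only the French voice list is visible in this hunk, so it is assumed voice_map["english"] has entries at indices 0 and 1:

speech = Speech(language="english")
speech.speak("Hello Sir, we are online and ready.")   # voice_number now defaults to 1
speech.speak("Alternate voice.", voice_number=0)      # any valid index into voice_map["english"]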

sources/utility.py

@@ -19,7 +19,7 @@ def pretty_print(text, color = "info"):
         }
         if color not in color_map:
             print(text)
-            pretty_print("Invalid color in pretty_print", "warning")
+            pretty_print(f"Invalid color {color} in pretty_print", "warning")
             return
         print(color_map[color], text, Fore.RESET)
     else:
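Usage sketch of the improved warning, assuming "magenta" is not a key of color_map:

pretty_print("Thinking...", color="status")    # known color: printed via colorama
pretty_print("oops", color="magenta")          # unknown color: prints "oops", then
                                               # "Invalid color magenta in pretty_print"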