Feat: memory recovery system, better TTS

Author: martin legrand
Date: 2025-02-28 11:28:45 +01:00
parent 14fcbd2343
commit c5da8319bf
4 changed files with 112 additions and 54 deletions

File 1 of 4: Agent base class (History replaced by Memory)

@@ -2,8 +2,9 @@ from typing import Tuple, Callable
 from abc import abstractmethod
 import os
 import random
-from sources.history import History
+from sources.memory import Memory
 from sources.utility import pretty_print
 class Agent():
     def __init__(self, model: str,
                  name: str,
@@ -13,7 +14,7 @@ class Agent():
         self._current_directory = os.getcwd()
         self._model = model
         self._llm = provider
-        self._history = History(self.load_prompt(prompt_path),
+        self._memory = Memory(self.load_prompt(prompt_path),
                                 memory_compression=False)
         self._tools = {}
@@ -61,12 +62,12 @@ class Agent():
         return text[start_idx:end_idx]

     def llm_request(self, verbose = True) -> Tuple[str, str]:
-        history = self._history.get()
-        thought = self._llm.respond(history, verbose)
+        memory = self._memory.get()
+        thought = self._llm.respond(memory, verbose)
         reasoning = self.extract_reasoning_text(thought)
         answer = self.remove_reasoning_text(thought)
-        self._history.push('assistant', answer)
+        self._memory.push('assistant', answer)
         return answer, reasoning

     def wait_message(self, speech_module):
@@ -96,7 +97,7 @@ class Agent():
             self.print_code_blocks(blocks, name)
             output = tool.execute(blocks)
             feedback = tool.interpreter_feedback(output)
-            self._history.push('user', feedback)
+            self._memory.push('user', feedback)
             if "failure" in feedback.lower():
                 return False, feedback
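These hunks only swap the conversation store from History to Memory; the agent still hard-codes memory_compression=False and does not yet pass the new recover_last_session flag. Below is a minimal sketch, not code from this commit, of how an agent-style caller could opt in, assuming the Memory constructor introduced later in this diff; the surrounding class and LLM provider are placeholders.

from sources.memory import Memory

class SketchAgent:
    """Hypothetical caller; only Memory itself comes from this commit."""
    def __init__(self, provider, system_prompt: str):
        self._llm = provider
        self._memory = Memory(system_prompt,
                              recover_last_session=True,   # assumption: reload the latest saved session
                              memory_compression=False)

    def ask(self, question: str) -> str:
        self._memory.push('user', question)
        answer = self._llm.respond(self._memory.get(), True)  # provider API assumed, as in the hunk above
        self._memory.push('assistant', answer)
        return answer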

File 2 of 4: CoderAgent (History replaced by Memory)

@@ -33,7 +33,7 @@ class CoderAgent(Agent):
         answer = ""
         attempt = 0
         max_attempts = 3
-        self._history.push('user', prompt)
+        self._memory.push('user', prompt)
         while attempt < max_attempts:
             pretty_print("Thinking...", color="status")

File 3 of 4: History renamed to Memory, with session save/recovery added

@@ -1,15 +1,29 @@
 import torch
 from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
+import time
+import datetime
+import uuid
+import os
+import json

-class History():
+class Memory():
     """
-    History is a class for managing the conversation history
-    It provides a method to compress the history (experimental, use with caution).
+    Memory is a class for managing the conversation memory
+    It provides a method to compress the memory (experimental, use with caution).
     """
-    def __init__(self, system_prompt: str, memory_compression: bool = True):
-        self._history = []
-        self._history = [{'role': 'user', 'content': system_prompt},
-                         {'role': 'assistant', 'content': f'Hello, How can I help you today ?'}]
+    def __init__(self, system_prompt: str,
+                 recover_last_session: bool = False,
+                 memory_compression: bool = True):
+        self._memory = []
+        self._memory = [{'role': 'user', 'content': system_prompt},
+                        {'role': 'assistant', 'content': f'Hello, How can I help you today ?'}]
+        self.session_time = datetime.datetime.now()
+        self.session_id = str(uuid.uuid4())
+        self.conversation_folder = f"conversations/"
+        if recover_last_session:
+            self.load_memory()
+        # memory compression system
         self.model = "pszemraj/led-base-book-summary"
         self.device = self.get_cuda_device()
         self.memory_compression = memory_compression
@@ -17,6 +31,52 @@ class History():
         self._tokenizer = AutoTokenizer.from_pretrained(self.model)
         self._model = AutoModelForSeq2SeqLM.from_pretrained(self.model)

+    def get_filename(self) -> str:
+        return f"memory_{self.session_time.strftime('%Y-%m-%d_%H-%M-%S')}.txt"
+
+    def save_memory(self) -> None:
+        if not os.path.exists(self.conversation_folder):
+            os.makedirs(self.conversation_folder)
+        filename = self.get_filename()
+        path = os.path.join(self.conversation_folder, filename)
+        json_memory = json.dumps(self._memory)
+        with open(path, 'w') as f:
+            f.write(json_memory)
+
+    def find_last_session_path(self) -> str:
+        saved_sessions = []
+        for filename in os.listdir(self.conversation_folder):
+            if filename.startswith('memory_'):
+                date = filename.split('_')[1]
+                saved_sessions.append((filename, date))
+        saved_sessions.sort(key=lambda x: x[1], reverse=True)
+        return saved_sessions[0][0]
+
+    def load_memory(self) -> None:
+        if not os.path.exists(self.conversation_folder):
+            return
+        filename = self.find_last_session_path()
+        if filename is None:
+            return
+        path = os.path.join(self.conversation_folder, filename)
+        with open(path, 'r') as f:
+            self._memory = json.load(f)
+
+    def reset(self, memory: list) -> None:
+        self._memory = memory
+
+    def push(self, role: str, content: str) -> None:
+        self._memory.append({'role': role, 'content': content})
+        # EXPERIMENTAL
+        if self.memory_compression and role == 'assistant':
+            self.compress()
+
+    def clear(self) -> None:
+        self._memory = []
+
+    def get(self) -> list:
+        return self._memory
+
     def get_cuda_device(self) -> str:
         if torch.backends.mps.is_available():
             return "mps"
@@ -56,65 +116,38 @@ class History():
     def compress(self) -> str:
         if not self.memory_compression:
             return
-        for i in range(len(self._history)):
+        for i in range(len(self._memory)):
             if i <= 2:
                 continue
-            if self._history[i]['role'] == 'assistant':
-                self._history[i]['content'] = self.summarize(self._history[i]['content'])
-
-    def reset(self, history: list) -> None:
-        self._history = history
-
-    def push(self, role: str, content: str) -> None:
-        self._history.append({'role': role, 'content': content})
-        # EXPERIMENTAL
-        if self.memory_compression and role == 'assistant':
-            self.compress()
-
-    def clear(self) -> None:
-        self._history = []
-
-    def get(self) -> list:
-        return self._history
+            if self._memory[i]['role'] == 'assistant':
+                self._memory[i]['content'] = self.summarize(self._memory[i]['content'])

 if __name__ == "__main__":
-    history = History("You are a helpful assistant.")
+    memory = Memory("You are a helpful assistant.",
+                    recover_last_session=False, memory_compression=True)
     sample_text = """
     The error you're encountering:
-    Copy
     cuda.cu:52:10: fatal error: helper_functions.h: No such file or directory
     #include <helper_functions.h>
-    ^~~~~~~~~~~~~~~~~~~~
-    compilation terminated.
     indicates that the compiler cannot find the helper_functions.h file. This is because the #include <helper_functions.h> directive is looking for the file in the system's include paths, but the file is either not in those paths or is located in a different directory.
-    Solutions
     1. Use #include "helper_functions.h" Instead of #include <helper_functions.h>
     Angle brackets (< >) are used for system or standard library headers.
     Quotes (" ") are used for local or project-specific headers.
     If helper_functions.h is in the same directory as cuda.cu, change the include directive to:
     3. Verify the File Exists
     Double-check that helper_functions.h exists in the specified location. If the file is missing, you'll need to obtain or recreate it.
     4. Use the Correct CUDA Samples Path (if applicable)
     If helper_functions.h is part of the CUDA Samples, ensure you have the CUDA Samples installed and include the correct path. For example, on Linux, the CUDA Samples are typically located in /usr/local/cuda/samples/common/inc. You can include this path like so:
     Use #include "helper_functions.h" for local files.
     Use the -I flag to specify the directory containing helper_functions.h.
     Ensure the file exists in the specified location.
     """
-    history.push('user', "why do i get this error?")
-    history.push('assistant', sample_text)
-    print("\n---\nHistory before:", history.get())
-    history.compress()
-    print("\n---\nHistory after:", history.get())
+    memory.push('user', "why do i get this error?")
+    memory.push('assistant', sample_text)
+    print("\n---\nmemory before:", memory.get())
+    memory.compress()
+    print("\n---\nmemory after:", memory.get())
+    memory.save_memory()
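For reference, a rough usage sketch of the recovery flow added above: save one session, then construct a second Memory with recover_last_session=True to pick it up. Folder and file naming follow the code in this file (conversations/, memory_<timestamp>.txt containing JSON); the prompts and messages are illustrative only.

from sources.memory import Memory

first = Memory("You are a helpful assistant.", memory_compression=False)
first.push('user', "Remember that my build targets CUDA 12.")
first.push('assistant', "Noted.")
first.save_memory()   # writes conversations/memory_YYYY-MM-DD_HH-MM-SS.txt as JSON

# A later run loads the most recently dated memory_ file from conversations/,
# replacing the fresh system prompt with the saved messages.
# Note: the summarizer model is loaded in __init__ even when compression is off.
restored = Memory("You are a helpful assistant.",
                  recover_last_session=True,
                  memory_compression=False)
print(restored.get())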

File 4 of 4: Speech (TTS), smarter sentence cleaning before synthesis

@@ -40,14 +40,38 @@ class Speech():
             import winsound
             winsound.PlaySound(audio_file, winsound.SND_FILENAME)

+    def replace_url(self, m):
+        domain = m.group(1)
+        if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', domain):
+            return ''
+        return domain
+
+    def extract_filename(self, m):
+        path = m.group()
+        parts = re.split(r'/|\\', path)
+        return parts[-1] if parts else path
+
     def clean_sentence(self, sentence):
+        lines = sentence.split('\n')
+        filtered_lines = [line for line in lines if re.match(r'^\s*[a-zA-Z]', line)]
+        sentence = ' '.join(filtered_lines)
         sentence = re.sub(r'`.*?`', '', sentence)
-        sentence = re.sub(r'[^a-zA-Z0-9.,!? ]+', '', sentence)
+        sentence = re.sub(r'https?://(?:www\.)?([^\s/]+)(?:/[^\s]*)?', self.replace_url, sentence)
+        sentence = re.sub(r'\b[\w./\\-]+\b', self.extract_filename, sentence)
+        sentence = re.sub(r'\b-\w+\b', '', sentence)
+        sentence = re.sub(r'[^a-zA-Z0-9.,!? _ -]+', ' ', sentence)
         sentence = re.sub(r'\s+', ' ', sentence).strip()
+        sentence = sentence.replace('.com', '')
         return sentence

 if __name__ == "__main__":
     speech = Speech()
+    tosay = """
+    I looked up recent news using the website https://www.theguardian.com/world
+    Here is how to list files:
+    ls -l -a -h
+    the ip address of the server is 192.168.1.1
+    """
     for voice_idx in range (len(speech.voice_map["english"])):
         print(f"Voice {voice_idx}")
-        speech.speak("I have indeed been uploaded, sir. We're online and ready.", voice_idx)
+        speech.speak(tosay, voice_idx)