diff --git a/sources/agent.py b/sources/agent.py index e631464..0e0ea2e 100644 --- a/sources/agent.py +++ b/sources/agent.py @@ -2,8 +2,9 @@ from typing import Tuple, Callable from abc import abstractmethod import os import random -from sources.history import History +from sources.memory import Memory from sources.utility import pretty_print + class Agent(): def __init__(self, model: str, name: str, @@ -13,7 +14,7 @@ class Agent(): self._current_directory = os.getcwd() self._model = model self._llm = provider - self._history = History(self.load_prompt(prompt_path), + self._memory = Memory(self.load_prompt(prompt_path), memory_compression=False) self._tools = {} @@ -61,12 +62,12 @@ class Agent(): return text[start_idx:end_idx] def llm_request(self, verbose = True) -> Tuple[str, str]: - history = self._history.get() - thought = self._llm.respond(history, verbose) + memory = self._memory.get() + thought = self._llm.respond(memory, verbose) reasoning = self.extract_reasoning_text(thought) answer = self.remove_reasoning_text(thought) - self._history.push('assistant', answer) + self._memory.push('assistant', answer) return answer, reasoning def wait_message(self, speech_module): @@ -96,7 +97,7 @@ class Agent(): self.print_code_blocks(blocks, name) output = tool.execute(blocks) feedback = tool.interpreter_feedback(output) - self._history.push('user', feedback) + self._memory.push('user', feedback) if "failure" in feedback.lower(): return False, feedback diff --git a/sources/code_agent.py b/sources/code_agent.py index 91a4291..dc3fc33 100644 --- a/sources/code_agent.py +++ b/sources/code_agent.py @@ -33,7 +33,7 @@ class CoderAgent(Agent): answer = "" attempt = 0 max_attempts = 3 - self._history.push('user', prompt) + self._memory.push('user', prompt) while attempt < max_attempts: pretty_print("Thinking...", color="status") diff --git a/sources/history.py b/sources/memory.py similarity index 57% rename from sources/history.py rename to sources/memory.py index be3f404..87b52b7 100644 --- a/sources/history.py +++ b/sources/memory.py @@ -1,15 +1,29 @@ import torch from transformers import AutoTokenizer, AutoModelForSeq2SeqLM +import time +import datetime +import uuid +import os +import json -class History(): +class Memory(): """ - History is a class for managing the conversation history - It provides a method to compress the history (experimental, use with caution). + Memory is a class for managing the conversation memory + It provides a method to compress the memory (experimental, use with caution). """ - def __init__(self, system_prompt: str, memory_compression: bool = True): - self._history = [] - self._history = [{'role': 'user', 'content': system_prompt}, - {'role': 'assistant', 'content': f'Hello, How can I help you today ?'}] + def __init__(self, system_prompt: str, + recover_last_session: bool = False, + memory_compression: bool = True): + self._memory = [] + self._memory = [{'role': 'user', 'content': system_prompt}, + {'role': 'assistant', 'content': f'Hello, How can I help you today ?'}] + + self.session_time = datetime.datetime.now() + self.session_id = str(uuid.uuid4()) + self.conversation_folder = f"conversations/" + if recover_last_session: + self.load_memory() + # memory compression system self.model = "pszemraj/led-base-book-summary" self.device = self.get_cuda_device() self.memory_compression = memory_compression @@ -17,6 +31,52 @@ class History(): self._tokenizer = AutoTokenizer.from_pretrained(self.model) self._model = AutoModelForSeq2SeqLM.from_pretrained(self.model) + def get_filename(self) -> str: + return f"memory_{self.session_time.strftime('%Y-%m-%d_%H-%M-%S')}.txt" + + def save_memory(self) -> None: + if not os.path.exists(self.conversation_folder): + os.makedirs(self.conversation_folder) + filename = self.get_filename() + path = os.path.join(self.conversation_folder, filename) + json_memory = json.dumps(self._memory) + with open(path, 'w') as f: + f.write(json_memory) + + def find_last_session_path(self) -> str: + saved_sessions = [] + for filename in os.listdir(self.conversation_folder): + if filename.startswith('memory_'): + date = filename.split('_')[1] + saved_sessions.append((filename, date)) + saved_sessions.sort(key=lambda x: x[1], reverse=True) + return saved_sessions[0][0] + + def load_memory(self) -> None: + if not os.path.exists(self.conversation_folder): + return + filename = self.find_last_session_path() + if filename is None: + return + path = os.path.join(self.conversation_folder, filename) + with open(path, 'r') as f: + self._memory = json.load(f) + + def reset(self, memory: list) -> None: + self._memory = memory + + def push(self, role: str, content: str) -> None: + self._memory.append({'role': role, 'content': content}) + # EXPERIMENTAL + if self.memory_compression and role == 'assistant': + self.compress() + + def clear(self) -> None: + self._memory = [] + + def get(self) -> list: + return self._memory + def get_cuda_device(self) -> str: if torch.backends.mps.is_available(): return "mps" @@ -56,65 +116,38 @@ class History(): def compress(self) -> str: if not self.memory_compression: return - for i in range(len(self._history)): + for i in range(len(self._memory)): if i <= 2: continue - if self._history[i]['role'] == 'assistant': - self._history[i]['content'] = self.summarize(self._history[i]['content']) - - def reset(self, history: list) -> None: - self._history = history - - def push(self, role: str, content: str) -> None: - self._history.append({'role': role, 'content': content}) - # EXPERIMENTAL - if self.memory_compression and role == 'assistant': - self.compress() - - def clear(self) -> None: - self._history = [] - - def get(self) -> list: - return self._history + if self._memory[i]['role'] == 'assistant': + self._memory[i]['content'] = self.summarize(self._memory[i]['content']) if __name__ == "__main__": - history = History("You are a helpful assistant.") + memory = Memory("You are a helpful assistant.", + recover_last_session=False, memory_compression=True) sample_text = """ The error you're encountering: - -Copy cuda.cu:52:10: fatal error: helper_functions.h: No such file or directory #include - ^~~~~~~~~~~~~~~~~~~~ -compilation terminated. indicates that the compiler cannot find the helper_functions.h file. This is because the #include directive is looking for the file in the system's include paths, but the file is either not in those paths or is located in a different directory. - -Solutions 1. Use #include "helper_functions.h" Instead of #include Angle brackets (< >) are used for system or standard library headers. - Quotes (" ") are used for local or project-specific headers. - If helper_functions.h is in the same directory as cuda.cu, change the include directive to: - 3. Verify the File Exists Double-check that helper_functions.h exists in the specified location. If the file is missing, you'll need to obtain or recreate it. - 4. Use the Correct CUDA Samples Path (if applicable) If helper_functions.h is part of the CUDA Samples, ensure you have the CUDA Samples installed and include the correct path. For example, on Linux, the CUDA Samples are typically located in /usr/local/cuda/samples/common/inc. You can include this path like so: - Use #include "helper_functions.h" for local files. - Use the -I flag to specify the directory containing helper_functions.h. - Ensure the file exists in the specified location. """ - history.push('user', "why do i get this error?") - history.push('assistant', sample_text) - print("\n---\nHistory before:", history.get()) - history.compress() - print("\n---\nHistory after:", history.get()) - + memory.push('user', "why do i get this error?") + memory.push('assistant', sample_text) + print("\n---\nmemory before:", memory.get()) + memory.compress() + print("\n---\nmemory after:", memory.get()) + memory.save_memory() \ No newline at end of file diff --git a/sources/text_to_speech.py b/sources/text_to_speech.py index c8e78fb..8050332 100644 --- a/sources/text_to_speech.py +++ b/sources/text_to_speech.py @@ -40,14 +40,38 @@ class Speech(): import winsound winsound.PlaySound(audio_file, winsound.SND_FILENAME) + def replace_url(self, m): + domain = m.group(1) + if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', domain): + return '' + return domain + + def extract_filename(self, m): + path = m.group() + parts = re.split(r'/|\\', path) + return parts[-1] if parts else path + def clean_sentence(self, sentence): + lines = sentence.split('\n') + filtered_lines = [line for line in lines if re.match(r'^\s*[a-zA-Z]', line)] + sentence = ' '.join(filtered_lines) sentence = re.sub(r'`.*?`', '', sentence) - sentence = re.sub(r'[^a-zA-Z0-9.,!? ]+', '', sentence) + sentence = re.sub(r'https?://(?:www\.)?([^\s/]+)(?:/[^\s]*)?', self.replace_url, sentence) + sentence = re.sub(r'\b[\w./\\-]+\b', self.extract_filename, sentence) + sentence = re.sub(r'\b-\w+\b', '', sentence) + sentence = re.sub(r'[^a-zA-Z0-9.,!? _ -]+', ' ', sentence) sentence = re.sub(r'\s+', ' ', sentence).strip() + sentence = sentence.replace('.com', '') return sentence if __name__ == "__main__": speech = Speech() + tosay = """ + I looked up recent news using the website https://www.theguardian.com/world + Here is how to list files: + ls -l -a -h + the ip address of the server is 192.168.1.1 + """ for voice_idx in range (len(speech.voice_map["english"])): print(f"Voice {voice_idx}") - speech.speak("I have indeed been uploaded, sir. We're online and ready.", voice_idx) + speech.speak(tosay, voice_idx)