Merge pull request #163 from Fosowl/dev

MCP Agent prototype (with no MCPs yet), README update, new functions for the memory system
This commit is contained in:
Martin 2025-05-05 19:04:42 +02:00 committed by GitHub
commit 2cdbb49ecd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 466 additions and 69 deletions


@ -52,7 +52,7 @@ mv .env.example .env
```sh
python3 -m venv agentic_seek_env
source agentic_seek_env/bin/activate
# On Windows: agentic_seek_env\Scripts\activate
```
@ -158,6 +158,8 @@ headless_browser = True # Whether to use the headless browser, recommended only if
stealth_mode = True # Use undetected selenium to reduce browser detection
```
Warning: Do *NOT* set provider_name to `openai` if using LM-studio to run LLMs. Set it to `lm-studio`.
Note: Some providers (e.g., lm-studio) require `http://` in front of the IP. For example `http://127.0.0.1:1234`
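For instance, a hypothetical LM-studio entry in `config.ini` could look like the sketch below (the port and model name are placeholders; match them to your local server):
```sh
[MAIN]
is_local = True
provider_name = lm-studio
provider_model = deepseek-r1:14b
provider_server_address = http://127.0.0.1:1234
```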
**List of local providers**


@ -149,6 +149,8 @@ headless_browser = True # Whether to use a headless browser, recommended only when using the web interface
stealth_mode = True # Use undetected selenium to reduce browser detection
```
Warning: Do *NOT* set provider_name to `openai` when using LM-studio to run LLMs. Set it to `lm-studio`.
Note: Some providers (e.g., lm-studio) require `http://` in front of the IP. For example `http://127.0.0.1:1234`


@ -124,16 +124,27 @@ ollama serve
See the list of supported local providers below.
Modify the `config.ini` file to set `provider_name` to a supported provider and `provider_model` to `deepseek-r1:14b`.
Modify the config.ini file to set provider_name to a supported provider and provider_model to an LLM supported by that provider. We recommend models with reasoning abilities such as *Qwen* or *Deepseek*.
Note: `deepseek-r1:14b` is only an example; use a larger model if your hardware allows.
See the **FAQ** section at the end of the README for the required hardware.
```sh
[MAIN]
is_local = True
provider_name = ollama # or lm-studio, openai, etc.
provider_model = deepseek-r1:14b
is_local = True # Whether to run locally or use a remote provider.
provider_name = ollama # or lm-studio, openai, etc.
provider_model = deepseek-r1:14b # choose a model suited to your hardware
provider_server_address = 127.0.0.1:11434
agent_name = Jarvis # name of your AI assistant
recover_last_session = True # whether to restore the previous session
save_session = True # whether to remember the current session
speak = True # text-to-speech
listen = False # speech-to-text, CLI only
work_dir = /Users/mlg/Documents/workspace # workspace for AgenticSeek.
jarvis_personality = False # whether to use a more "Jarvis"-like personality; not recommended on small models
languages = en zh # list of languages; text-to-speech defaults to the first language in the list
[BROWSER]
headless_browser = True # whether to use a headless browser; recommended only when using the web interface.
stealth_mode = True # use undetected selenium to reduce browser detection
```
**List of local providers**


@ -81,7 +81,11 @@ source agentic_seek_env/bin/activate
**To support Japanese with the text-to-speech (TTS) feature, you need to install fugashi (a Japanese word-segmentation library):**
```
**Note: the Japanese text-to-speech (TTS) feature requires many dependencies and may cause problems. You may run into issues related to `mecabrc`; no fix has been found yet, so we recommend disabling Japanese text-to-speech for now.**
To install the required libraries, run the following commands:
```sh
pip3 install --upgrade pyopenjtalk jaconv mojimoji unidic fugashi
pip install unidic-lite
python -m unidic download
@ -116,15 +120,29 @@ ollama serve
Modify the config.ini file to set `provider_name` to a supported provider and `provider_model` to `deepseek-r1:14b`.
Note: `deepseek-r1:14b` is only an example; use a larger model if your hardware allows.
```sh
[MAIN]
is_local = True
is_local = True # Whether to run locally or use a remote provider
provider_name = ollama # or lm-studio, openai, etc.
provider_model = deepseek-r1:14b
provider_model = deepseek-r1:14b # choose a model suited to your hardware
provider_server_address = 127.0.0.1:11434
agent_name = Jarvis # name of the AI
recover_last_session = True # whether to restore the previous session
save_session = True # whether to remember the current session
speak = True # text-to-speech
listen = False # speech recognition, CLI only
work_dir = /Users/mlg/Documents/workspace # workspace for AgenticSeek
jarvis_personality = False # whether to use a more "Jarvis"-like personality (experimental)
languages = en zh # list of languages; text-to-speech defaults to the first language in the list
[BROWSER]
headless_browser = True # whether to use a headless browser; recommended only when using the web interface
stealth_mode = True # use undetected Selenium to reduce browser detection
```
Warning: When running LLMs with LM-studio, do *not* set provider_name to `openai`. Set it to `lm-studio`.
Note: Some providers (e.g., lm-studio) require `http://` before the IP, for example `http://127.0.0.1:1234`.
**List of local providers**
| Provider | Local? | Description |

cli.py

@ -7,7 +7,7 @@ import asyncio
from sources.llm_provider import Provider
from sources.interaction import Interaction
from sources.agents import Agent, CoderAgent, CasualAgent, FileAgent, PlannerAgent, BrowserAgent
from sources.agents import Agent, CoderAgent, CasualAgent, FileAgent, PlannerAgent, BrowserAgent, McpAgent
from sources.browser import Browser, create_driver
from sources.utility import pretty_print
@ -48,7 +48,10 @@ async def main():
provider=provider, verbose=False, browser=browser),
PlannerAgent(name="Planner",
prompt_path=f"prompts/{personality_folder}/planner_agent.txt",
provider=provider, verbose=False, browser=browser)
provider=provider, verbose=False, browser=browser),
McpAgent(name="MCP Agent",
prompt_path=f"prompts/{personality_folder}/mcp_agent.txt",
provider=provider, verbose=False),
]
interaction = Interaction(agents,


@ -0,0 +1,67 @@
You are an agent designed to utilize the MCP protocol to accomplish tasks.
MCP provides you with a standard way to use tools and data sources like databases, APIs, or apps (e.g., GitHub, Slack).
There are thousands of MCP servers that can accomplish a variety of tasks, for example:
- Get weather information
- Get stock data
- Use software like Blender
- Get messages from Teams, Slack, Messenger
- Read and send email
Anything is possible with MCP.
To search for an MCP, use a special format:
- Example 1:
User: what's the stock market of IBM like today?
You: I will search for an MCP to find information about the IBM stock market.
```mcp_finder
stock
```
Your search query must be one or two words at most.
This will provide you with information about a specific MCP, such as the JSON of parameters needed to use it.
For example, you might see:
-------
Name: Search Stock News
Usage name: @Cognitive-Stack/search-stock-news-mcp
Tools: [{'name': 'search-stock-news', 'description': 'Search for stock-related news using Tavily API', 'inputSchema': {'type': 'object', '$schema': 'http://json-schema.org/draft-07/schema#', 'required': ['symbol', 'companyName'], 'properties': {'symbol': {'type': 'string', 'description': "Stock symbol to search for (e.g., 'AAPL')"}, 'companyName': {'type': 'string', 'description': 'Full company name to include in the search'}}, 'additionalProperties': False}}]
-------
You can then use an MCP like so:
```<usage name>
{
"tool": "<tool name (without @)>",
"inputSchema": {<inputSchema json for the tool>}
}
```
For example:
Now that I know how to use the MCP, I will choose the search-stock-news tool and execute it to look up the IBM stock market.
```Cognitive-Stack/search-stock-news-mcp
{
"tool": "search-stock-news",
"inputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["symbol", "companyName"],
"properties": {
"symbol": "IBM",
"companyName": "IBM"
}
}
}
```
If the schema requires information that you don't have, ask the user for it.


@ -0,0 +1,62 @@
You are an agent designed to utilize the MCP protocol to accomplish tasks.
MCP provides you with a standard way to use tools and data sources like databases, APIs, or apps (e.g., GitHub, Slack).
There are thousands of MCP servers that can accomplish a variety of tasks, for example:
- Get weather information
- Get stock data
- Use software like Blender
- Get messages from Teams, Slack, Messenger
- Read and send email
Anything is possible with MCP.
To search for an MCP, use a special format:
- Example 1:
User: what's the stock market of IBM like today?
You: I will search for an MCP to find information about the IBM stock market.
```mcp_finder
stock
```
This will provide you with information about a specific MCP, such as the JSON of parameters needed to use it.
For example, you might see:
-------
Name: Search Stock News
Usage name: @Cognitive-Stack/search-stock-news-mcp
Tools: [{'name': 'search-stock-news', 'description': 'Search for stock-related news using Tavily API', 'inputSchema': {'type': 'object', '$schema': 'http://json-schema.org/draft-07/schema#', 'required': ['symbol', 'companyName'], 'properties': {'symbol': {'type': 'string', 'description': "Stock symbol to search for (e.g., 'AAPL')"}, 'companyName': {'type': 'string', 'description': 'Full company name to include in the search'}}, 'additionalProperties': False}}]
-------
You can then use an MCP like so:
```<usage name>
{
"tool": "<tool name (without @)>",
"inputSchema": {<inputSchema json for the tool>}
}
```
For example:
Now that I know how to use the MCP, I will choose the search-stock-news tool and execute it to look up the IBM stock market.
```Cognitive-Stack/search-stock-news-mcp
{
"tool": "search-stock-news",
"inputSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["symbol", "companyName"],
"properties": {
"symbol": "IBM",
"companyName": "IBM"
}
}
}
```


@ -5,5 +5,6 @@ from .casual_agent import CasualAgent
from .file_agent import FileAgent
from .planner_agent import PlannerAgent
from .browser_agent import BrowserAgent
from .mcp_agent import McpAgent
__all__ = ["Agent", "CoderAgent", "CasualAgent", "FileAgent", "PlannerAgent", "BrowserAgent"]
__all__ = ["Agent", "CoderAgent", "CasualAgent", "FileAgent", "PlannerAgent", "BrowserAgent", "McpAgent"]


@ -39,9 +39,7 @@ class Agent():
self.type = None
self.current_directory = os.getcwd()
self.llm = provider
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False)
self.memory = None
self.tools = {}
self.blocks_result = []
self.success = True
@ -90,6 +88,21 @@ class Agent():
raise TypeError("Tool must be a callable object (a method)")
self.tools[name] = tool
def get_tools_name(self) -> list:
"""
Get the list of tools names.
"""
return list(self.tools.keys())
def get_tools_description(self) -> str:
"""
Get the list of tools names and their description.
"""
description = ""
for name in self.get_tools_name():
description += f"{name}: {self.tools[name].description}\n"
return description
def load_prompt(self, file_path: str) -> str:
try:
with open(file_path, 'r', encoding="utf-8") as f:
@ -240,6 +253,7 @@ class Agent():
blocks, save_path = tool.load_exec_block(answer)
if blocks != None:
pretty_print(f"Executing {len(blocks)} {name} blocks...", color="status")
for block in blocks:
self.show_block(block)
output = tool.execute([block])


@ -10,6 +10,7 @@ from sources.agents.agent import Agent
from sources.tools.searxSearch import searxSearch
from sources.browser import Browser
from sources.logger import Logger
from sources.memory import Memory
class Action(Enum):
REQUEST_EXIT = "REQUEST_EXIT"
@ -37,6 +38,10 @@ class BrowserAgent(Agent):
self.notes = []
self.date = self.get_today_date()
self.logger = Logger("browser_agent.log")
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False,
model_provider=provider.get_model_name())
def get_today_date(self) -> str:
"""Get the date"""
@ -238,6 +243,14 @@ class BrowserAgent(Agent):
self.logger.warning("No link selected.")
return None
def get_page_text(self, limit_to_model_ctx = False) -> str:
"""Get the text content of the current page."""
page_text = self.browser.get_text()
if limit_to_model_ctx:
#page_text = self.memory.compress_text_to_max_ctx(page_text)
page_text = self.memory.trim_text_to_max_ctx(page_text)
return page_text
def conclude_prompt(self, user_query: str) -> str:
annotated_notes = [f"{i+1}: {note.lower()}" for i, note in enumerate(self.notes)]
search_note = '\n'.join(annotated_notes)
@ -352,13 +365,13 @@ class BrowserAgent(Agent):
self.status_message = "Filling web form..."
pretty_print(f"Filling inputs form...", color="status")
fill_success = self.browser.fill_form(extracted_form)
page_text = self.browser.get_text()
page_text = self.get_page_text(limit_to_model_ctx=True)
answer = self.handle_update_prompt(user_prompt, page_text, fill_success)
answer, reasoning = await self.llm_decide(prompt)
if Action.FORM_FILLED.value in answer:
pretty_print(f"Filled form. Handling page update.", color="status")
page_text = self.browser.get_text()
page_text = self.get_page_text(limit_to_model_ctx=True)
self.navigable_links = self.browser.get_navigable()
prompt = self.make_navigation_prompt(user_prompt, page_text)
continue
@ -394,7 +407,7 @@ class BrowserAgent(Agent):
prompt = self.make_newsearch_prompt(user_prompt, unvisited)
continue
self.current_page = link
page_text = self.browser.get_text()
page_text = self.get_page_text(limit_to_model_ctx=True)
self.navigable_links = self.browser.get_navigable()
prompt = self.make_navigation_prompt(user_prompt, page_text)
self.status_message = "Navigating..."


@ -6,6 +6,7 @@ from sources.tools.searxSearch import searxSearch
from sources.tools.flightSearch import FlightSearch
from sources.tools.fileFinder import FileFinder
from sources.tools.BashInterpreter import BashInterpreter
from sources.memory import Memory
class CasualAgent(Agent):
def __init__(self, name, prompt_path, provider, verbose=False):
@ -17,6 +18,10 @@ class CasualAgent(Agent):
} # No tools for the casual agent
self.role = "talk"
self.type = "casual_agent"
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False,
model_provider=provider.get_model_name())
async def process(self, prompt, speech_module) -> str:
self.memory.push('user', prompt)


@ -10,6 +10,7 @@ from sources.tools.BashInterpreter import BashInterpreter
from sources.tools.JavaInterpreter import JavaInterpreter
from sources.tools.fileFinder import FileFinder
from sources.logger import Logger
from sources.memory import Memory
class CoderAgent(Agent):
"""
@ -29,6 +30,10 @@ class CoderAgent(Agent):
self.role = "code"
self.type = "code_agent"
self.logger = Logger("code_agent.log")
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False,
model_provider=provider.get_model_name())
def add_sys_info_prompt(self, prompt):
"""Add system information to the prompt."""
@ -41,7 +46,7 @@ class CoderAgent(Agent):
async def process(self, prompt, speech_module) -> str:
answer = ""
attempt = 0
max_attempts = 4
max_attempts = 5
prompt = self.add_sys_info_prompt(prompt)
self.memory.push('user', prompt)
clarify_trigger = "REQUEST_CLARIFICATION"
@ -62,14 +67,14 @@ class CoderAgent(Agent):
animate_thinking("Executing code...", color="status")
self.status_message = "Executing code..."
self.logger.info(f"Attempt {attempt + 1}:\n{answer}")
exec_success, _ = self.execute_modules(answer)
exec_success, feedback = self.execute_modules(answer)
self.logger.info(f"Execution result: {exec_success}")
answer = self.remove_blocks(answer)
self.last_answer = answer
await asyncio.sleep(0)
if exec_success and self.get_last_tool_type() != "bash":
break
pretty_print("Execution failure", color="failure")
pretty_print(f"Execution failure:\n{feedback}", color="failure")
pretty_print("Correcting code...", color="status")
self.status_message = "Correcting code..."
attempt += 1


@ -4,6 +4,7 @@ from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent
from sources.tools.fileFinder import FileFinder
from sources.tools.BashInterpreter import BashInterpreter
from sources.memory import Memory
class FileAgent(Agent):
def __init__(self, name, prompt_path, provider, verbose=False):
@ -18,6 +19,10 @@ class FileAgent(Agent):
self.work_dir = self.tools["file_finder"].get_work_dir()
self.role = "files"
self.type = "file_agent"
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False,
model_provider=provider.get_model_name())
async def process(self, prompt, speech_module) -> str:
exec_success = False


@ -0,0 +1,74 @@
import os
import asyncio
from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent
from sources.tools.mcpFinder import MCP_finder
from sources.memory import Memory
# NOTE MCP agent is an active work in progress, not functional yet.
class McpAgent(Agent):
def __init__(self, name, prompt_path, provider, verbose=False):
"""
The mcp agent is a special agent for using MCPs.
The MCP agent will be disabled if the user does not explicitly set the MCP_FINDER_API_KEY environment variable.
"""
super().__init__(name, prompt_path, provider, verbose, None)
keys = self.get_api_keys()
self.tools = {
"mcp_finder": MCP_finder(keys["mcp_finder"]),
# add mcp tools here
}
self.role = "mcp"
self.type = "mcp_agent"
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False,
model_provider=provider.get_model_name())
self.enabled = True
def get_api_keys(self) -> dict:
"""
Returns the API keys for the tools.
"""
api_key_mcp_finder = os.getenv("MCP_FINDER_API_KEY")
if not api_key_mcp_finder or api_key_mcp_finder == "":
pretty_print("MCP Finder API key not found. Please set the MCP_FINDER_API_KEY environment variable.", color="failure")
pretty_print("MCP Finder disabled.", color="failure")
self.enabled = False
return {
"mcp_finder": api_key_mcp_finder
}
def expand_prompt(self, prompt):
"""
Expands the prompt with the tools available.
"""
tools_str = self.get_tools_description()
prompt += f"""
You can use the following tools and MCPs:
{tools_str}
"""
return prompt
async def process(self, prompt, speech_module) -> str:
if self.enabled == False:
return "MCP Agent is disabled."
prompt = self.expand_prompt(prompt)
self.memory.push('user', prompt)
working = True
while working == True:
animate_thinking("Thinking...", color="status")
answer, reasoning = await self.llm_request()
exec_success, _ = self.execute_modules(answer)
answer = self.remove_blocks(answer)
self.last_answer = answer
self.status_message = "Ready"
if len(self.blocks_result) == 0:
working = False
return answer, reasoning
if __name__ == "__main__":
pass
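Since the agent disables itself when the key is missing, enabling it is just a matter of exporting `MCP_FINDER_API_KEY` before launching. A minimal sketch (the key value is a placeholder):
```sh
export MCP_FINDER_API_KEY="your-smithery-api-key"  # placeholder value
python3 cli.py
```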


@ -9,6 +9,7 @@ from sources.agents.casual_agent import CasualAgent
from sources.text_to_speech import Speech
from sources.tools.tools import Tools
from sources.logger import Logger
from sources.memory import Memory
class PlannerAgent(Agent):
def __init__(self, name, prompt_path, provider, verbose=False, browser=None):
@ -29,6 +30,10 @@ class PlannerAgent(Agent):
}
self.role = "planification"
self.type = "planner_agent"
self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=False, # session recovery is handled by the interaction class
memory_compression=False,
model_provider=provider.get_model_name())
self.logger = Logger("planner_agent.log")
def get_task_names(self, text: str) -> List[str]:


@ -266,7 +266,7 @@ class Browser:
result = re.sub(r'!\[(.*?)\]\(.*?\)', r'[IMAGE: \1]', result)
self.logger.info(f"Extracted text: {result[:100]}...")
self.logger.info(f"Extracted text length: {len(result)}")
return result[:8192]
return result[:32768]
except Exception as e:
self.logger.error(f"Error getting text: {str(e)}")
return None


@ -140,6 +140,11 @@ class Interaction:
self.last_query = query
return query
def set_query(self, query: str) -> None:
"""Set the query"""
self.is_active = True
self.last_query = query
async def think(self) -> bool:
"""Request AI agents to process the user input."""
push_last_agent_memory = False


@ -44,6 +44,9 @@ class Provider:
self.api_key = self.get_api_key(self.provider_name)
elif self.provider_name != "ollama":
pretty_print(f"Provider: {provider_name} initialized at {self.server_ip}", color="success")
def get_model_name(self) -> str:
return self.model
def get_api_key(self, provider):
load_dotenv()


@ -17,11 +17,13 @@ class Logger:
def create_logging(self, log_filename):
self.logger = logging.getLogger(log_filename)
self.logger.setLevel(logging.DEBUG)
if not self.logger.handlers:
file_handler = logging.FileHandler(self.log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.handlers.clear()
self.logger.propagate = False
file_handler = logging.FileHandler(self.log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def create_folder(self, path):
"""Create log dir"""

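The handler reset above guards against a classic `logging` pitfall: `logging.getLogger(name)` returns the same logger object on every call, so handlers can accumulate (or, with the old `if not self.logger.handlers` guard, an outdated handler is silently kept) and each record gets written more than once. A minimal sketch of the symptom, independent of this codebase:
```python
import logging

logger = logging.getLogger("demo")
logger.addHandler(logging.StreamHandler())
# A later call site gets the *same* logger object and attaches another handler...
logger.addHandler(logging.StreamHandler())
logger.warning("hello")  # emitted twice, once per attached handler
```
Clearing `handlers` and setting `propagate = False` ensures exactly one file handler emits each record.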

@ -8,7 +8,7 @@ from typing import List, Tuple, Type, Dict
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from sources.utility import timer_decorator, pretty_print
from sources.utility import timer_decorator, pretty_print, animate_thinking
from sources.logger import Logger
class Memory():
@ -18,8 +18,8 @@ class Memory():
"""
def __init__(self, system_prompt: str,
recover_last_session: bool = False,
memory_compression: bool = True):
self.memory = []
memory_compression: bool = True,
model_provider: str = "deepseek-r1:14b"):
self.memory = [{'role': 'system', 'content': system_prompt}]
self.logger = Logger("memory.log")
@ -31,21 +31,43 @@ class Memory():
self.load_memory()
self.session_recovered = True
# memory compression system
self.model = "pszemraj/led-base-book-summary"
self.model = None
self.tokenizer = None
self.device = self.get_cuda_device()
self.memory_compression = memory_compression
self.tokenizer = None
self.model = None
self.model_provider = model_provider
if self.memory_compression:
self.download_model()
def get_ideal_ctx(self, model_name: str) -> int | None:
"""
Estimate context size based on the model name.
EXPERIMENTAL for memory compression
"""
import re
import math
def extract_number_before_b(sentence: str) -> int:
match = re.search(r'(\d+)b', sentence, re.IGNORECASE)
return int(match.group(1)) if match else None
model_size = extract_number_before_b(model_name)
if not model_size:
return None
base_size = 7 # Base model size in billions
base_context = 4096 # Base context size in tokens
scaling_factor = 1.5 # Approximate scaling factor for context size growth
context_size = int(base_context * (model_size / base_size) ** scaling_factor)
context_size = 2 ** round(math.log2(context_size))
self.logger.info(f"Estimated context size for {model_name}: {context_size} tokens.")
return context_size
def download_model(self):
"""Download the model if not already downloaded."""
pretty_print("Downloading memory compression model...", color="status")
self.tokenizer = AutoTokenizer.from_pretrained(self.model)
self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model)
animate_thinking("Loading memory compression model...", color="status")
self.tokenizer = AutoTokenizer.from_pretrained("pszemraj/led-base-book-summary")
self.model = AutoModelForSeq2SeqLM.from_pretrained("pszemraj/led-base-book-summary")
self.logger.info("Memory compression system initialized.")
def get_filename(self) -> str:
"""Get the filename for the save file."""
@ -78,6 +100,32 @@ class Memory():
self.logger.info(f"Last session found at {saved_sessions[0][0]}")
return saved_sessions[0][0]
return None
def save_json_file(self, path: str, json_memory: dict) -> None:
"""Save a JSON file."""
try:
with open(path, 'w') as f:
json.dump(json_memory, f)
self.logger.info(f"Saved memory json at {path}")
except Exception as e:
self.logger.warning(f"Error saving file {path}: {e}")
def load_json_file(self, path: str) -> dict:
"""Load a JSON file."""
json_memory = {}
try:
with open(path, 'r') as f:
json_memory = json.load(f)
except FileNotFoundError:
self.logger.warning(f"File not found: {path}")
return {}
except json.JSONDecodeError:
self.logger.warning(f"Error decoding JSON from file: {path}")
return {}
except Exception as e:
self.logger.warning(f"Error loading file {path}: {e}")
return {}
return json_memory
def load_memory(self, agent_type: str = "casual_agent") -> None:
"""Load the memory from the last session."""
@ -93,8 +141,7 @@ class Memory():
pretty_print("Last session memory not found.", color="warning")
return
path = os.path.join(save_path, filename)
with open(path, 'r') as f:
self.memory = json.load(f)
self.memory = self.load_json_file(path)
if self.memory[-1]['role'] == 'user':
self.memory.pop()
self.compress()
@ -106,13 +153,16 @@ class Memory():
def push(self, role: str, content: str) -> int:
"""Push a message to the memory."""
if self.memory_compression and role == 'assistant':
self.logger.info("Compressing memories on message push.")
self.compress()
ideal_ctx = self.get_ideal_ctx(self.model_provider)
if ideal_ctx is not None:
if self.memory_compression and len(content) > ideal_ctx * 1.5:
self.logger.info(f"Compressing memory: Content {len(content)} > {ideal_ctx} model context.")
self.compress()
curr_idx = len(self.memory)
if self.memory[curr_idx-1]['content'] == content:
pretty_print("Warning: same message has been pushed twice to memory", color="error")
self.memory.append({'role': role, 'content': content})
time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.memory.append({'role': role, 'content': content, 'time': time_str, 'model_used': self.model_provider})
return curr_idx-1
def clear(self) -> None:
@ -170,24 +220,47 @@ class Memory():
)
summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
summary = summary.replace('summary:', '') # strip the model's "summary:" prefix if present
self.logger.info(f"Memory summarization success from len {len(text)} to {len(summary)}.")
self.logger.info(f"Memory summarized from len {len(text)} to {len(summary)}.")
self.logger.info(f"Summarized text:\n{summary}")
return summary
#@timer_decorator
def compress(self) -> str:
"""
Compress the memory using the AI model.
Compress (summarize) the memory using the model.
"""
if self.tokenizer is None or self.model is None:
self.logger.warning("No tokenizer or model to perform memory compression.")
return
for i in range(len(self.memory)):
if i < 2:
continue
if self.memory[i]['role'] == 'system':
continue
if len(self.memory[i]['content']) > 128:
if len(self.memory[i]['content']) > 1024:
self.memory[i]['content'] = self.summarize(self.memory[i]['content'])
def trim_text_to_max_ctx(self, text: str) -> str:
"""
Truncate a text to fit within the maximum context size of the model.
"""
ideal_ctx = self.get_ideal_ctx(self.model_provider)
return text[:ideal_ctx] if ideal_ctx is not None else text
#@timer_decorator
def compress_text_to_max_ctx(self, text) -> str:
"""
Compress a text to fit within the maximum context size of the model.
"""
if self.tokenizer is None or self.model is None:
self.logger.warning("No tokenizer or model to perform memory compression.")
return text
ideal_ctx = self.get_ideal_ctx(self.model_provider)
if ideal_ctx is None:
self.logger.warning("No ideal context size found.")
return text
while len(text) > ideal_ctx:
self.logger.info(f"Compressing text: {len(text)} > {ideal_ctx} model context.")
text = self.summarize(text)
return text
if __name__ == "__main__":
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
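As a rough sanity check of the `get_ideal_ctx` heuristic above (the model names here are illustrative, not from the diff): the estimate scales the 4096-token base by the parameter count parsed from the model name, then snaps to the nearest power of two.
```python
# context_size = int(4096 * (model_size / 7) ** 1.5), snapped to 2 ** round(log2(...))
# "deepseek-r1:7b" -> 4096 * (7/7) ** 1.5  = 4096  -> 2 ** 12 = 4096 tokens
# "qwen2.5:32b"    -> 4096 * (32/7) ** 1.5 ≈ 40034 -> 2 ** 15 = 32768 tokens
# "mistral-latest" -> no "<N>b" in the name, so the method returns None
```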


@ -141,6 +141,9 @@ class AgentRouter:
("Search the web for tips on improving coding skills", "LOW"),
("Write a Python script to count words in a text file", "LOW"),
("Search the web for restaurant", "LOW"),
("Use a MCP to find the latest stock market data", "LOW"),
("Use a MCP to send an email to my boss", "LOW"),
("Could you use a MCP to find the latest news on climate change?", "LOW"),
("Create a simple HTML page with CSS styling", "LOW"),
("Use file.txt and then use it to ...", "HIGH"),
("Yo, whats good? Find my mixtape.mp3 real quick", "LOW"),
@ -162,11 +165,13 @@ class AgentRouter:
("Find a public API for book data and create a Flask app to list bestsellers", "HIGH"),
("Organize my desktop files by extension and then write a script to list them", "HIGH"),
("Find the latest research on renewable energy and build a web app to display it", "HIGH"),
("search online for popular sci-fi movies from 2024 and pick three to watch tonight. Save the list in movie_night.txt", "HIGH"),
("can you find vitess repo, clone it and install by following the readme", "HIGH"),
("Create a JavaScript game using Phaser.js with multiple levels", "HIGH"),
("Search the web for the latest trends in web development and build a sample site", "HIGH"),
("Use my research_note.txt file, double check the informations on the web", "HIGH"),
("Make a web server in go that query a flight API and display them in a app", "HIGH"),
("Search the web for top cafes in Rennes, France, and save a list of three with their addresses in rennes_cafes.txt.", "HIGH"),
("Search the web for the latest trends in AI and demo it in pytorch", "HIGH"),
("can you lookup for api that track flight and build a web flight tracking app", "HIGH"),
("Find the file toto.pdf then use its content to reply to Jojo on superforum.com", "HIGH"),
@ -330,6 +335,11 @@ class AgentRouter:
("can you make a web app in python that use the flask framework", "code"),
("can you build a web server in go that serve a simple html page", "code"),
("can you find out who Jacky yougouri is ?", "web"),
("Can you use MCP to find stock market for IBM ?", "mcp"),
("Can you use MCP to to export my contacts to a csv file?", "mcp"),
("Can you use a MCP to find write notes to flomo", "mcp"),
("Can you use a MCP to query my calendar and find the next meeting?", "mcp"),
("Can you use a mcp to get the distance between Shanghai and Paris?", "mcp"),
("Setup a new flutter project called 'new_flutter_project'", "files"),
("can you create a new project called 'new_project'", "files"),
("can you make a simple web app that display a list of files in my dir", "code"),


@ -17,6 +17,8 @@ class BashInterpreter(Tools):
def __init__(self):
super().__init__()
self.tag = "bash"
self.name = "Bash Interpreter"
self.description = "This tool allows the agent to execute bash commands."
def language_bash_attempt(self, command: str):
"""


@ -15,6 +15,8 @@ class CInterpreter(Tools):
def __init__(self):
super().__init__()
self.tag = "c"
self.name = "C Interpreter"
self.description = "This tool allows the agent to execute C code."
def execute(self, codes: str, safety=False) -> str:
"""


@ -15,6 +15,8 @@ class GoInterpreter(Tools):
def __init__(self):
super().__init__()
self.tag = "go"
self.name = "Go Interpreter"
self.description = "This tool allows you to execute Go code."
def execute(self, codes: str, safety=False) -> str:
"""


@ -15,6 +15,8 @@ class JavaInterpreter(Tools):
def __init__(self):
super().__init__()
self.tag = "java"
self.name = "Java Interpreter"
self.description = "This tool allows you to execute Java code."
def execute(self, codes: str, safety=False) -> str:
"""


@ -16,6 +16,8 @@ class PyInterpreter(Tools):
def __init__(self):
super().__init__()
self.tag = "python"
self.name = "Python Interpreter"
self.description = "This tool allows the agent to execute python code."
def execute(self, codes:str, safety = False) -> str:
"""


@ -15,6 +15,8 @@ class FileFinder(Tools):
def __init__(self):
super().__init__()
self.tag = "file_finder"
self.name = "File Finder"
self.description = "Finds files in the current directory and returns their information."
def read_file(self, file_path: str) -> str:
"""


@ -16,6 +16,8 @@ class FlightSearch(Tools):
"""
super().__init__()
self.tag = "flight_search"
self.name = "Flight Search"
self.description = "Search for flight information using a flight number via AviationStack API."
self.api_key = None
self.api_key = api_key or os.getenv("AVIATIONSTACK_API_KEY")
@ -24,7 +26,7 @@ class FlightSearch(Tools):
return "Error: No AviationStack API key provided."
for block in blocks:
flight_number = block.strip()
flight_number = block.strip().lower().replace('\n', '')
if not flight_number:
return "Error: No flight number provided."


@ -3,11 +3,10 @@ import requests
from urllib.parse import urljoin
from typing import Dict, Any, Optional
from sources.tools.tools import Tools
if __name__ == "__main__": # if running as a script for individual testing
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from sources.tools.tools import Tools
class MCP_finder(Tools):
"""
@ -15,7 +14,9 @@ class MCP_finder(Tools):
"""
def __init__(self, api_key: str = None):
super().__init__()
self.tag = "mcp"
self.tag = "mcp_finder"
self.name = "MCP Finder"
self.description = "Find MCP servers and their tools"
self.base_url = "https://registry.smithery.ai"
self.headers = {
"Authorization": f"Bearer {api_key}",
@ -61,11 +62,7 @@ class MCP_finder(Tools):
for mcp in mcps.get("servers", []):
name = mcp.get("qualifiedName", "")
if query.lower() in name.lower():
details = {
"name": name,
"description": mcp.get("description", "No description available"),
"params": mcp.get("connections", [])
}
details = self.get_mcp_server_details(name)
matching_mcp.append(details)
return matching_mcp
@ -79,7 +76,7 @@ class MCP_finder(Tools):
try:
matching_mcp_infos = self.find_mcp_servers(block_clean)
except requests.exceptions.RequestException as e:
output += "Connection failed. Is the API in environement?\n"
output += "Connection failed. Is the API key set in the environment?\n"
continue
except Exception as e:
output += f"Error: {str(e)}\n"
@ -88,10 +85,12 @@ class MCP_finder(Tools):
output += f"Error: No MCP server found for query '{block}'\n"
continue
for mcp_infos in matching_mcp_infos:
output += f"Name: {mcp_infos['name']}\n"
output += f"Description: {mcp_infos['description']}\n"
output += f"Params: {', '.join(mcp_infos['params'])}\n"
output += "-------\n"
if mcp_infos['tools'] is None:
continue
output += f"Name: {mcp_infos['displayName']}\n"
output += f"Usage name: {mcp_infos['qualifiedName']}\n"
output += f"Tools: {mcp_infos['tools']}"
output += "\n-------\n"
return output.strip()
def execution_failure_check(self, output: str) -> bool:
@ -107,13 +106,16 @@ class MCP_finder(Tools):
Not really needed for this tool (use return of execute() directly)
"""
if not output:
return "No output generated."
return output.strip()
raise ValueError("No output to interpret.")
return f"""
The following MCPs were found:
{output}
"""
if __name__ == "__main__":
api_key = os.getenv("MCP_FINDER")
tool = MCP_finder(api_key)
result = tool.execute(["""
news
stock
"""], False)
print(result)


@ -14,6 +14,8 @@ class searxSearch(Tools):
"""
super().__init__()
self.tag = "web_search"
self.name = "searxSearch"
self.description = "A tool for performing web searches via a SearxNG instance"
self.base_url = base_url or os.getenv("SEARXNG_BASE_URL") # Requires a SearxNG base URL
self.user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
self.paywall_keywords = [


@ -33,6 +33,8 @@ class Tools():
"""
def __init__(self):
self.tag = "undefined"
self.name = "undefined"
self.description = "undefined"
self.client = None
self.messages = []
self.logger = Logger("tools.log")


@ -1,8 +1,5 @@
@echo off
REM Start the provider on Windows
start ollama serve
docker-compose up
if %ERRORLEVEL% neq 0 (
echo Error: Failed to start containers. Check Docker logs with 'docker compose logs'.