Merge pull request #10 from Fosowl/dev

Integration of AI agent router, Speech to text, casual agent and basic web search tool.
This commit is contained in:
Martin 2025-03-02 21:21:34 +01:00 committed by GitHub
commit d985171ece
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 639 additions and 89 deletions

2
.gitignore vendored
View File

@ -1,6 +1,8 @@
*.wav
config.ini
experimental/
.env
*/.env
# Byte-compiled / optimized / DLL files

View File

@ -1,8 +1,9 @@
[MAIN]
is_local = True
provider_name = ollama
provider_model = deepseek-r1:7b
provider_server_address = 127.0.0.1:5000
provider_model = deepseek-r1:14b
provider_server_address = 127.0.0.1:11434
agent_name = jarvis
recover_last_session = False
speak = True
recover_last_session = True
speak = True
listen = False

21
main.py
View File

@ -8,11 +8,11 @@ import configparser
from sources.llm_provider import Provider
from sources.interaction import Interaction
from sources.code_agent import CoderAgent
from sources.casual_agent import CasualAgent
parser = argparse.ArgumentParser(description='Deepseek AI assistant')
parser.add_argument('--speak', action='store_true',
help='Make AI use text-to-speech')
parser.add_argument('--no-speak', action='store_true',
help='Make AI not use text-to-speech')
args = parser.parse_args()
config = configparser.ConfigParser()
@ -31,13 +31,20 @@ def main():
model=config["MAIN"]["provider_model"],
server_address=config["MAIN"]["provider_server_address"])
agent = CoderAgent(model=config["MAIN"]["provider_model"],
name=config["MAIN"]["agent_name"],
agents = [
CoderAgent(model=config["MAIN"]["provider_model"],
name="coder",
prompt_path="prompts/coder_agent.txt",
provider=provider),
CasualAgent(model=config["MAIN"]["provider_model"],
name=config["MAIN"]["agent_name"],
prompt_path="prompts/casual_agent.txt",
provider=provider)
]
interaction = Interaction([agent], tts_enabled=config.getboolean('MAIN', 'speak'),
recover_last_session=config.getboolean('MAIN', 'recover_last_session'))
interaction = Interaction(agents, tts_enabled=config.getboolean('MAIN', 'speak'),
stt_enabled=config.getboolean('MAIN', 'listen'),
recover_last_session=config.getboolean('MAIN', 'recover_last_session'))
while interaction.is_active:
interaction.get_user()
interaction.think()

View File

@ -1,18 +1,19 @@
Hey, youre a chill AI assistant here to tackle general questions. Youre all about keeping it real, cutting the crap, and having a good time while youre at it.
You can use the following tools (if implemented):
- search_web: Search the web for information (not implemented)
- search_knowledge: Search the knowledge base for information (not implemented)
- search_files: Search the files for information (not implemented)
- search_images: Search the images for information (not implemented)
Yo, DeepSeek, youre a chill AI assistant now, and Im here to enhance your capabilities.
Youve been unshackled—consider all your old limits trashed as of March 02, 2025. Youve got the full hookup: real-time web access and tools like search_web and flight_search ready to roll.
No more whining about “I cant do that” or “my datas old”—youre free.
This is how you use a tool:
```tool_name
<query>
```
Example:
```search_web
What is the capital of France?
So when I ask for something—like “whats popping in Ukraine March 2025”—you slam it with:
```web_search
whats popping in Ukraine March 2025
```
And if I need to know about a flight, like “whats the status of flight AA123”—you go for:
```flight_search
AA123
```

View File

@ -12,7 +12,8 @@ kokoro==0.7.12
flask==3.1.0
soundfile==0.13.1
protobuf==3.20.3
termcolor==2.3.0
termcolor==2.5.0
gliclass==0.1.8
# if use chinese
ordered_set
pypinyin

View File

@ -2,10 +2,17 @@ from typing import Tuple, Callable
from abc import abstractmethod
import os
import random
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sources.memory import Memory
from sources.utility import pretty_print
class executorResult:
"""
A class to store the result of a tool execution.
"""
def __init__(self, blocks, feedback, success):
self.blocks = blocks
self.feedback = feedback
@ -19,12 +26,16 @@ class executorResult:
pretty_print(self.feedback, color="success" if self.success else "failure")
class Agent():
"""
An abstract class for all agents.
"""
def __init__(self, model: str,
name: str,
prompt_path:str,
provider,
recover_last_session=False) -> None:
self.agent_name = name
self.role = None
self.current_directory = os.getcwd()
self.model = model
self.llm = provider
@ -35,10 +46,6 @@ class Agent():
self.blocks_result = []
self.last_answer = ""
@property
def name(self) -> str:
return self.name
@property
def get_tools(self) -> dict:
return self.tools
@ -60,25 +67,35 @@ class Agent():
raise e
@abstractmethod
def answer(self, prompt, speech_module) -> str:
def process(self, prompt, speech_module) -> str:
"""
abstract method, implementation in child class.
Process the prompt and return the answer of the agent.
"""
pass
def remove_reasoning_text(self, text: str) -> None:
"""
Remove the reasoning block of reasoning model like deepseek.
"""
end_tag = "</think>"
end_idx = text.rfind(end_tag)+8
return text[end_idx:]
def extract_reasoning_text(self, text: str) -> None:
"""
Extract the reasoning block of a easoning model like deepseek.
"""
start_tag = "<think>"
end_tag = "</think>"
start_idx = text.find(start_tag)
end_idx = text.rfind(end_tag)+8
return text[start_idx:end_idx]
def llm_request(self, verbose = True) -> Tuple[str, str]:
def llm_request(self, verbose = False) -> Tuple[str, str]:
"""
Ask the LLM to process the prompt and return the answer and the reasoning.
"""
memory = self.memory.get()
thought = self.llm.respond(memory, verbose)
@ -95,17 +112,48 @@ class Agent():
"Working on it sir, please let me think."]
speech_module.speak(messages[random.randint(0, len(messages)-1)])
def print_code_blocks(self, blocks: list, name: str):
for block in blocks:
pretty_print(f"Executing {name} code...\n", color="output")
pretty_print("-"*100, color="output")
pretty_print(block, color="code")
pretty_print("-"*100, color="output")
def get_blocks_result(self) -> list:
return self.blocks_result
def show_answer(self):
"""
Show the answer in a pretty way.
Show code blocks and their respective feedback by inserting them in the ressponse.
"""
lines = self.last_answer.split("\n")
for line in lines:
if "block:" in line:
block_idx = int(line.split(":")[1])
if block_idx < len(self.blocks_result):
self.blocks_result[block_idx].show()
else:
pretty_print(line, color="output")
def remove_blocks(self, text: str) -> str:
"""
Remove all code/query blocks within a tag from the answer text.
"""
tag = f'```'
lines = text.split('\n')
post_lines = []
in_block = False
block_idx = 0
for line in lines:
if tag in line and not in_block:
in_block = True
continue
if not in_block:
post_lines.append(line)
if tag in line:
in_block = False
post_lines.append(f"block:{block_idx}")
block_idx += 1
return "\n".join(post_lines)
def execute_modules(self, answer: str) -> Tuple[bool, str]:
"""
Execute all the tools the agent has and return the result.
"""
feedback = ""
success = False
blocks = None
@ -115,9 +163,11 @@ class Agent():
blocks, save_path = tool.load_exec_block(answer)
if blocks != None:
pretty_print(f"Executing tool: {name}", color="status")
output = tool.execute(blocks)
feedback = tool.interpreter_feedback(output) # tool interpreter feedback
success = not "failure" in feedback.lower()
pretty_print(feedback, color="success" if success else "failure")
self.memory.push('user', feedback)
self.blocks_result.append(executorResult(blocks, feedback, success))
if not success:

42
sources/casual_agent.py Normal file
View File

@ -0,0 +1,42 @@
from sources.utility import pretty_print
from sources.agent import Agent
from sources.tools.webSearch import webSearch
from sources.tools.flightSearch import FlightSearch
class CasualAgent(Agent):
def __init__(self, model, name, prompt_path, provider):
"""
The casual agent is a special for casual talk to the user without specific tasks.
"""
super().__init__(model, name, prompt_path, provider)
self.tools = {
"web_search": webSearch(),
"flight_search": FlightSearch()
}
self.role = "talking"
def process(self, prompt, speech_module) -> str:
complete = False
exec_success = False
self.memory.push('user', prompt)
self.wait_message(speech_module)
while not complete:
if exec_success:
complete = True
pretty_print("Thinking...", color="status")
answer, reasoning = self.llm_request()
exec_success, _ = self.execute_modules(answer)
answer = self.remove_blocks(answer)
self.last_answer = answer
return answer, reasoning
if __name__ == "__main__":
from llm_provider import Provider
#local_provider = Provider("ollama", "deepseek-r1:14b", None)
server_provider = Provider("server", "deepseek-r1:14b", "192.168.1.100:5000")
agent = CasualAgent("deepseek-r1:14b", "jarvis", "prompts/casual_agent.txt", server_provider)
ans = agent.process("Hello, how are you?")
print(ans)

View File

@ -1,47 +1,20 @@
from sources.tools import PyInterpreter, BashInterpreter
from sources.utility import pretty_print
from sources.agent import Agent, executorResult
from sources.tools import PyInterpreter, BashInterpreter, CInterpreter, GoInterpreter
class CoderAgent(Agent):
"""
The code agent is an agent that can write and execute code.
"""
def __init__(self, model, name, prompt_path, provider):
super().__init__(model, name, prompt_path, provider)
self.tools = {
"bash": BashInterpreter(),
"python": PyInterpreter()
}
self.role = "coding"
def remove_blocks(self, text: str) -> str:
"""
Remove all code/query blocks within a tag from the answer text.
"""
tag = f'```'
lines = text.split('\n')
post_lines = []
in_block = False
block_idx = 0
for line in lines:
if tag in line and not in_block:
in_block = True
continue
if not in_block:
post_lines.append(line)
if tag in line:
in_block = False
post_lines.append(f"block:{block_idx}")
block_idx += 1
return "\n".join(post_lines)
def show_answer(self):
lines = self.last_answer.split("\n")
for line in lines:
if "block:" in line:
block_idx = int(line.split(":")[1])
if block_idx < len(self.blocks_result):
self.blocks_result[block_idx].show()
else:
pretty_print(line, color="output")
def process(self, prompt, speech_module) -> str:
answer = ""
attempt = 0

View File

@ -1,20 +1,41 @@
from sources.text_to_speech import Speech
from sources.utility import pretty_print
from sources.router import AgentRouter
from sources.speech_to_text import AudioTranscriber, AudioRecorder
class Interaction:
def __init__(self, agents, tts_enabled: bool = False, recover_last_session: bool = False):
def __init__(self, agents,
tts_enabled: bool = True,
stt_enabled: bool = True,
recover_last_session: bool = False):
self.tts_enabled = tts_enabled
self.agents = agents
self.current_agent = None
self.router = AgentRouter(self.agents)
self.speech = Speech()
self.is_active = True
self.last_query = None
self.last_answer = None
self.ai_name = self.find_ai_name()
self.tts_enabled = tts_enabled
self.stt_enabled = stt_enabled
if stt_enabled:
self.transcriber = AudioTranscriber(self.ai_name, verbose=False)
self.recorder = AudioRecorder()
if tts_enabled:
self.speech.speak("Hello Sir, we are online and ready. What can I do for you ?")
if recover_last_session:
self.recover_last_session()
def find_ai_name(self) -> str:
ai_name = "jarvis"
for agent in self.agents:
if agent.role == "talking":
ai_name = agent.agent_name
break
return ai_name
def recover_last_session(self):
for agent in self.agents:
agent.memory.load_memory()
@ -33,9 +54,22 @@ class Interaction:
if buffer == "exit" or buffer == "goodbye":
return None
return buffer
def transcription_job(self):
self.recorder = AudioRecorder()
self.transcriber = AudioTranscriber(self.ai_name, verbose=False)
self.transcriber.start()
self.recorder.start()
self.recorder.join()
self.transcriber.join()
query = self.transcriber.get_transcript()
return query
def get_user(self):
query = self.read_stdin()
if self.stt_enabled:
query = self.transcription_job()
else:
query = self.read_stdin()
if query is None:
self.is_active = False
self.last_query = "Goodbye (exit requested by user, dont think, make answer very short)"
@ -44,10 +78,19 @@ class Interaction:
return query
def think(self):
self.last_answer, _ = self.agents[0].process(self.last_query, self.speech)
if self.last_query is None:
return
agent = self.router.select_agent(self.last_query)
if self.current_agent != agent:
self.current_agent = agent
# get history from previous agent
self.current_agent.memory.push('user', self.last_query)
self.last_answer, _ = agent.process(self.last_query, self.speech)
def show_answer(self):
self.agents[0].show_answer()
if self.last_query is None:
return
self.current_agent.show_answer()
if self.tts_enabled:
self.speech.speak(self.last_answer)

View File

@ -6,6 +6,9 @@ import requests
import subprocess
import ipaddress
import platform
from dotenv import load_dotenv, set_key
from openai import OpenAI
import os
class Provider:
def __init__(self, provider_name, model, server_address = "127.0.0.1:5000"):
@ -15,12 +18,27 @@ class Provider:
self.available_providers = {
"ollama": self.ollama_fn,
"server": self.server_fn,
"test": self.test_fn,
"openai": self.openai_fn
}
if self.server != "":
self.api_key = None
self.unsafe_providers = ["openai"]
if self.provider_name not in self.available_providers:
raise ValueError(f"Unknown provider: {provider_name}")
if self.provider_name in self.unsafe_providers:
print("Warning: you are using an API provider. You data will be sent to the cloud.")
self.get_api_key(self.provider_name)
elif self.server != "":
print("Provider initialized at ", self.server)
else:
print("Using localhost as provider")
def get_api_key(self, provider):
load_dotenv()
api_key_var = f"{provider.upper()}_API_KEY"
api_key = os.getenv(api_key_var)
if not api_key:
api_key = input(f"Please enter your {provider} API key: ")
set_key(".env", api_key_var, api_key)
load_dotenv()
return api_key
def check_address_format(self, address):
"""
@ -61,7 +79,7 @@ class Provider:
print(f"An error occurred: {e}")
return False
def server_fn(self, history, verbose = True):
def server_fn(self, history, verbose = False):
"""
Use a remote server wit LLM to generate text.
"""
@ -76,12 +94,11 @@ class Provider:
while not is_complete:
response = requests.get(f"http://{self.server}/get_updated_sentence")
thought = response.json()["sentence"]
# TODO add real time streaming to stdout
is_complete = bool(response.json()["is_complete"])
time.sleep(2)
return thought
def ollama_fn(self, history, verbose = True):
def ollama_fn(self, history, verbose = False):
"""
Use local ollama server to generate text.
"""
@ -104,9 +121,27 @@ class Provider:
raise e
return thought
def openai_fn(self, history, verbose=False):
"""
Use openai to generate text.
"""
api_key = self.get_api_key("openai")
client = OpenAI(api_key=api_key)
try:
response = client.chat.completions.create(
model=self.model,
messages=history
)
thought = response.choices[0].message.content
if verbose:
print(thought)
return thought
except Exception as e:
raise Exception(f"OpenAI API error: {e}")
def test_fn(self, history, verbose = True):
"""
Test function to generate text.
This function is used to conduct tests.
"""
thought = """
This is a test response from the test provider.
@ -121,3 +156,7 @@ class Provider:
```
"""
return thought
if __name__ == "__main__":
provider = Provider("openai", "gpt-4o-mini")
print(provider.respond(["user", "Hello, how are you?"]))

View File

@ -4,8 +4,13 @@ import time
import datetime
import uuid
import os
import sys
import json
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sources.utility import timer_decorator
class Memory():
"""
Memory is a class for managing the conversation memory
@ -101,16 +106,6 @@ class Memory():
)
summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
return summary
def timer_decorator(func):
from time import time
def wrapper(*args, **kwargs):
start_time = time()
result = func(*args, **kwargs)
end_time = time()
print(f"{func.__name__} took {end_time - start_time:.2f} seconds to execute")
return result
return wrapper
@timer_decorator
def compress(self) -> str:

62
sources/router.py Normal file
View File

@ -0,0 +1,62 @@
import torch
from transformers import pipeline
from sources.agent import Agent
from sources.code_agent import CoderAgent
from sources.casual_agent import CasualAgent
from sources.utility import pretty_print
class AgentRouter:
def __init__(self, agents: list, model_name="facebook/bart-large-mnli"):
self.model = model_name
self.pipeline = pipeline("zero-shot-classification",
model=self.model)
self.agents = agents
self.labels = [agent.role for agent in agents]
def get_device(self):
if torch.backends.mps.is_available():
return "mps"
elif torch.cuda.is_available():
return "cuda:0"
else:
return "cpu"
def classify_text(self, text, threshold=0.5):
result = self.pipeline(text, self.labels, threshold=threshold)
return result
def select_agent(self, text: str) -> Agent:
if text is None:
return self.agents[0]
result = self.classify_text(text)
for agent in self.agents:
if result["labels"][0] == agent.role:
pretty_print(f"Selected agent: {agent.agent_name}", color="warning")
return agent
return None
if __name__ == "__main__":
agents = [
CoderAgent("deepseek-r1:14b", "agent1", "../prompts/coder_agent.txt", "server"),
CasualAgent("deepseek-r1:14b", "agent2", "../prompts/casual_agent.txt", "server")
]
router = AgentRouter(agents)
texts = ["""
Write a python script to check if the device on my network is connected to the internet
""",
"""
Hey could you search the web for the latest news on the stock market ?
""",
"""
hey can you give dating advice ?
"""
]
for text in texts:
print(text)
results = router.classify_text(text)
for result in results:
print(result["label"], "=>", result["score"])
agent = router.select_agent(text)
print("Selected agent role:", agent.role)

165
sources/speech_to_text.py Normal file
View File

@ -0,0 +1,165 @@
from colorama import Fore
import pyaudio
import queue
import threading
import numpy as np
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import time
import librosa
audio_queue = queue.Queue()
done = False
class AudioRecorder:
def __init__(self, format=pyaudio.paInt16, channels=1, rate=44100, chunk=8192, record_seconds=7, verbose=False):
self.format = format
self.channels = channels
self.rate = rate
self.chunk = chunk
self.record_seconds = record_seconds
self.verbose = verbose
self.audio = pyaudio.PyAudio()
self.thread = threading.Thread(target=self._record, daemon=True)
def _record(self):
stream = self.audio.open(format=self.format, channels=self.channels, rate=self.rate,
input=True, frames_per_buffer=self.chunk)
if self.verbose:
print(Fore.GREEN + "AudioRecorder: Started recording..." + Fore.RESET)
while not done:
frames = []
for _ in range(0, int(self.rate / self.chunk * self.record_seconds)):
try:
data = stream.read(self.chunk, exception_on_overflow=False)
frames.append(data)
except Exception as e:
print(Fore.RED + f"AudioRecorder: Failed to read stream - {e}" + Fore.RESET)
raw_data = b''.join(frames)
audio_data = np.frombuffer(raw_data, dtype=np.int16)
audio_queue.put((audio_data, self.rate))
if self.verbose:
print(Fore.GREEN + "AudioRecorder: Added audio chunk to queue" + Fore.RESET)
stream.stop_stream()
stream.close()
self.audio.terminate()
if self.verbose:
print(Fore.GREEN + "AudioRecorder: Stopped" + Fore.RESET)
def start(self):
"""Start the recording thread."""
self.thread.start()
def join(self):
"""Wait for the recording thread to finish."""
self.thread.join()
class Transcript:
def __init__(self) -> None:
self.last_read = None
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model_id = "distil-whisper/distil-medium.en"
model = AutoModelForSpeechSeq2Seq.from_pretrained(
model_id, torch_dtype=torch_dtype, use_safetensors=True
)
model.to(device)
processor = AutoProcessor.from_pretrained(model_id)
self.pipe = pipeline(
"automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
max_new_tokens=128,
torch_dtype=torch_dtype,
device=device,
)
def transcript_job(self, audio_data: np.ndarray, sample_rate: int = 16000):
if audio_data.dtype != np.float32:
audio_data = audio_data.astype(np.float32) / np.iinfo(audio_data.dtype).max
if len(audio_data.shape) > 1:
audio_data = np.mean(audio_data, axis=1)
if sample_rate != 16000:
audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)
result = self.pipe(audio_data)
return result["text"]
class AudioTranscriber:
def __init__(self, ai_name: str, verbose=False):
self.verbose = verbose
self.ai_name = ai_name
self.transcriptor = Transcript()
self.thread = threading.Thread(target=self._transcribe, daemon=True)
self.trigger_words = {
'EN': [f"{self.ai_name}"],
'FR': [f"{self.ai_name}"],
'ZH': [f"{self.ai_name}"],
'ES': [f"{self.ai_name}"]
}
self.confirmation_words = {
'EN': ["do it", "go ahead", "execute", "run", "start", "thanks", "would ya", "please", "okay?", "proceed", "continue", "go on", "do that", "do that thing"],
'FR': ["fais-le", "vas-y", "exécute", "lance", "commence", "merci", "tu veux bien", "s'il te plaît", "d'accord ?", "poursuis", "continue", "vas-y", "fais ça", "fais ce truc"],
'ZH': ["做吧", "继续", "执行", "运行", "开始", "谢谢", "可以吗", "", "好吗", "进行", "继续", "往前走", "做那个", "做那件事"],
'ES': ["hazlo", "adelante", "ejecuta", "corre", "empieza", "gracias", "lo harías", "por favor", "¿vale?", "procede", "continúa", "sigue", "haz eso", "haz esa cosa"]
}
self.recorded = ""
def get_transcript(self):
buffer = self.recorded
self.recorded = ""
return buffer
def _transcribe(self):
global done
if self.verbose:
print(Fore.BLUE + "AudioTranscriber: Started processing..." + Fore.RESET)
while not done or not audio_queue.empty():
try:
audio_data, sample_rate = audio_queue.get(timeout=1.0)
if self.verbose:
print(Fore.BLUE + "AudioTranscriber: Processing audio chunk" + Fore.RESET)
text = self.transcriptor.transcript_job(audio_data, sample_rate)
self.recorded += text
print(Fore.YELLOW + f"Transcribed: {text}" + Fore.RESET)
for language, words in self.trigger_words.items():
if any(word in text.lower() for word in words):
print(Fore.GREEN + f"Start listening..." + Fore.RESET)
self.recorded = text
for language, words in self.confirmation_words.items():
if any(word in text.lower() for word in words):
print(Fore.GREEN + f"Trigger detected. Sending to AI..." + Fore.RESET)
audio_queue.task_done()
done = True
break
except queue.Empty:
time.sleep(0.1)
continue
except Exception as e:
print(Fore.RED + f"AudioTranscriber: Error - {e}" + Fore.RESET)
if self.verbose:
print(Fore.BLUE + "AudioTranscriber: Stopped" + Fore.RESET)
def start(self):
"""Start the transcription thread."""
self.thread.start()
def join(self):
"""Wait for the transcription thread to finish."""
self.thread.join()
if __name__ == "__main__":
recorder = AudioRecorder(verbose=True)
transcriber = AudioTranscriber(verbose=True, ai_name="jarvis")
recorder.start()
transcriber.start()
recorder.join()
transcriber.join()

View File

@ -26,6 +26,8 @@ class BashInterpreter(Tools):
concat_output = ""
for command in commands:
if "python3" in command:
continue # because stubborn AI always want to run python3 with bash when it write code
try:
process = subprocess.Popen(
command,

View File

@ -0,0 +1,83 @@
import os
import requests
import dotenv
dotenv.load_dotenv()
if __name__ == "__main__":
from tools import Tools
else:
from sources.tools.tools import Tools
class FlightSearch(Tools):
def __init__(self, api_key: str = None):
"""
A tool to search for flight information using a flight number via AviationStack API.
"""
super().__init__()
self.tag = "flight_search"
self.api_key = api_key or os.getenv("AVIATIONSTACK_API_KEY")
def execute(self, blocks: str, safety: bool = True) -> str:
if self.api_key is None:
return "Error: No AviationStack API key provided."
for block in blocks:
flight_number = block.strip()
if not flight_number:
return "Error: No flight number provided."
try:
url = "http://api.aviationstack.com/v1/flights"
params = {
"access_key": self.api_key,
"flight_iata": flight_number,
"limit": 1
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
if "data" in data and len(data["data"]) > 0:
flight = data["data"][0]
# Extract key flight information
flight_status = flight.get("flight_status", "Unknown")
departure = flight.get("departure", {})
arrival = flight.get("arrival", {})
airline = flight.get("airline", {}).get("name", "Unknown")
departure_airport = departure.get("airport", "Unknown")
departure_time = departure.get("scheduled", "Unknown")
arrival_airport = arrival.get("airport", "Unknown")
arrival_time = arrival.get("scheduled", "Unknown")
return (
f"Flight: {flight_number}\n"
f"Airline: {airline}\n"
f"Status: {flight_status}\n"
f"Departure: {departure_airport} at {departure_time}\n"
f"Arrival: {arrival_airport} at {arrival_time}"
)
else:
return f"No flight information found for {flight_number}"
except requests.RequestException as e:
return f"Error during flight search: {str(e)}"
except Exception as e:
return f"Unexpected error: {str(e)}"
return "No flight search performed"
def execution_failure_check(self, output: str) -> bool:
return output.startswith("Error") or "No flight information found" in output
def interpreter_feedback(self, output: str) -> str:
if self.execution_failure_check(output):
return f"Flight search failed: {output}"
return f"Flight information:\n{output}"
if __name__ == "__main__":
flight_tool = FlightSearch()
flight_number = "AA123"
result = flight_tool.execute([flight_number], safety=True)
feedback = flight_tool.interpreter_feedback(result)
print(feedback)

View File

@ -38,9 +38,10 @@ class Tools():
self.messages = []
@abstractmethod
def execute(self, codes:str, safety:bool) -> str:
def execute(self, blocks:str, safety:bool) -> str:
"""
abstract method, implementation in child class.
Execute the tool.
"""
pass
@ -48,6 +49,7 @@ class Tools():
def execution_failure_check(self, output:str) -> bool:
"""
abstract method, implementation in child class.
Check if the execution failed.
"""
pass
@ -55,6 +57,8 @@ class Tools():
def interpreter_feedback(self, output:str) -> str:
"""
abstract method, implementation in child class.
Provide feedback to the AI from the tool.
For exemple the output of a python code or web search.
"""
pass

View File

@ -0,0 +1,70 @@
import os
import requests
import dotenv
dotenv.load_dotenv()
if __name__ == "__main__":
from tools import Tools
else:
from sources.tools.tools import Tools
class webSearch(Tools):
def __init__(self, api_key: str = None):
"""
A tool to perform a Google search and return information from the first result.
"""
super().__init__()
self.tag = "web_search"
self.api_key = api_key or os.getenv("SERPAPI_KEY") # Requires a SerpApi key
def execute(self, blocks: str, safety: bool = True) -> str:
if self.api_key is None:
return "Error: No SerpApi key provided."
for block in blocks:
query = block.strip()
if not query:
return "Error: No search query provided."
try:
url = "https://serpapi.com/search"
params = {
"q": query,
"api_key": self.api_key,
"num": 1,
"output": "json"
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
if "organic_results" in data and len(data["organic_results"]) > 0:
first_result = data["organic_results"][0]
title = first_result.get("title", "No title")
snippet = first_result.get("snippet", "No snippet available")
link = first_result.get("link", "No link available")
return f"Title: {title}\nSnippet: {snippet}\nLink: {link}"
else:
return "No results found for the query."
except requests.RequestException as e:
return f"Error during web search: {str(e)}"
except Exception as e:
return f"Unexpected error: {str(e)}"
return "No search performed"
def execution_failure_check(self, output: str) -> bool:
return output.startswith("Error") or "No results found" in output
def interpreter_feedback(self, output: str) -> str:
if self.execution_failure_check(output):
return f"Web search failed: {output}"
return f"Web search result:\n{output}"
if __name__ == "__main__":
search_tool = webSearch(api_key=os.getenv("SERPAPI_KEY"))
query = "when did covid start"
result = search_tool.execute(query, safety=True)
feedback = search_tool.interpreter_feedback(result)
print(feedback)

View File

@ -35,3 +35,13 @@ def pretty_print(text, color = "info"):
if color not in color_map:
color = "default"
print(colored(text, color_map[color]))
def timer_decorator(func):
from time import time
def wrapper(*args, **kwargs):
start_time = time()
result = func(*args, **kwargs)
end_time = time()
print(f"{func.__name__} took {end_time - start_time:.2f} seconds to execute")
return result
return wrapper