mirror of
https://github.com/tcsenpai/agenticSeek.git
synced 2025-06-08 03:55:32 +00:00
Merge pull request #87 from Fosowl/dev
Integration of new custom server provider
This commit is contained in:
commit
7e18d78805
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,4 +1,5 @@
|
|||||||
*.wav
|
*.wav
|
||||||
|
*.DS_Store
|
||||||
*.safetensors
|
*.safetensors
|
||||||
config.ini
|
config.ini
|
||||||
*.egg-info
|
*.egg-info
|
||||||
|
24
README.md
24
README.md
@ -149,6 +149,8 @@ You will be prompted with `>>> `
|
|||||||
This indicate agenticSeek await you type for instructions.
|
This indicate agenticSeek await you type for instructions.
|
||||||
You can also use speech to text by setting `listen = True` in the config.
|
You can also use speech to text by setting `listen = True` in the config.
|
||||||
|
|
||||||
|
To exit, simply say `goodbye`.
|
||||||
|
|
||||||
Here are some example usage:
|
Here are some example usage:
|
||||||
|
|
||||||
### Coding/Bash
|
### Coding/Bash
|
||||||
@ -216,14 +218,28 @@ Note: For Windows or macOS, use ipconfig or ifconfig respectively to find the IP
|
|||||||
|
|
||||||
**If you wish to use openai based provider follow the *Run with an API* section.**
|
**If you wish to use openai based provider follow the *Run with an API* section.**
|
||||||
|
|
||||||
Make sure ollama is installed (Currently our script only support ollama)
|
Clone the repository and enter the `server/`folder.
|
||||||
|
|
||||||
Run our server script.
|
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
python3 server_ollama.py --model "deepseek-r1:32b"
|
git clone --depth 1 https://github.com/Fosowl/agenticSeek.git
|
||||||
|
cd agenticSeek/server/
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Install server specific requirements:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
pip3 install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the server script.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
python3 app.py --provider ollama --port 3333
|
||||||
|
```
|
||||||
|
|
||||||
|
You have the choice between using `ollama` and `llamacpp` as a LLM service.
|
||||||
|
|
||||||
### 2️⃣ **Run it**
|
### 2️⃣ **Run it**
|
||||||
|
|
||||||
Now on your personal computer:
|
Now on your personal computer:
|
||||||
@ -236,7 +252,7 @@ Set the `provider_server_address` to the ip address of the machine that will run
|
|||||||
is_local = False
|
is_local = False
|
||||||
provider_name = server
|
provider_name = server
|
||||||
provider_model = deepseek-r1:14b
|
provider_model = deepseek-r1:14b
|
||||||
provider_server_address = x.x.x.x:5000
|
provider_server_address = x.x.x.x:3333
|
||||||
```
|
```
|
||||||
|
|
||||||
Run the assistant:
|
Run the assistant:
|
||||||
|
2
main.py
2
main.py
@ -27,7 +27,7 @@ def main():
|
|||||||
server_address=config["MAIN"]["provider_server_address"],
|
server_address=config["MAIN"]["provider_server_address"],
|
||||||
is_local=config.getboolean('MAIN', 'is_local'))
|
is_local=config.getboolean('MAIN', 'is_local'))
|
||||||
|
|
||||||
browser = Browser(create_driver(), headless=config.getboolean('MAIN', 'headless_browser'))
|
browser = Browser(create_driver(headless=config.getboolean('MAIN', 'headless_browser')))
|
||||||
personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base"
|
personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base"
|
||||||
|
|
||||||
agents = [
|
agents = [
|
||||||
|
Binary file not shown.
Before Width: | Height: | Size: 147 KiB |
47
server/app.py
Normal file
47
server/app.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
#!/usr/bin python3
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import time
|
||||||
|
from flask import Flask, jsonify, request
|
||||||
|
|
||||||
|
from sources.llamacpp_handler import LlamacppLLM
|
||||||
|
from sources.ollama_handler import OllamaLLM
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='AgenticSeek server script')
|
||||||
|
parser.add_argument('--provider', type=str, help='LLM backend library to use. set to [ollama] or [llamacpp]', required=True)
|
||||||
|
parser.add_argument('--port', type=int, help='port to use', required=True)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
assert args.provider in ["ollama", "llamacpp"], f"Provider {args.provider} does not exists. see --help for more information"
|
||||||
|
|
||||||
|
generator = OllamaLLM() if args.provider == "ollama" else LlamacppLLM()
|
||||||
|
|
||||||
|
@app.route('/generate', methods=['POST'])
|
||||||
|
def start_generation():
|
||||||
|
if generator is None:
|
||||||
|
return jsonify({"error": "Generator not initialized"}), 401
|
||||||
|
data = request.get_json()
|
||||||
|
history = data.get('messages', [])
|
||||||
|
if generator.start(history):
|
||||||
|
return jsonify({"message": "Generation started"}), 202
|
||||||
|
return jsonify({"error": "Generation already in progress"}), 402
|
||||||
|
|
||||||
|
@app.route('/setup', methods=['POST'])
|
||||||
|
def setup():
|
||||||
|
data = request.get_json()
|
||||||
|
model = data.get('model', None)
|
||||||
|
if model is None:
|
||||||
|
return jsonify({"error": "Model not provided"}), 403
|
||||||
|
generator.set_model(model)
|
||||||
|
return jsonify({"message": "Model set"}), 200
|
||||||
|
|
||||||
|
@app.route('/get_updated_sentence')
|
||||||
|
def get_updated_sentence():
|
||||||
|
if not generator:
|
||||||
|
return jsonify({"error": "Generator not initialized"}), 405
|
||||||
|
return generator.get_status()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app.run(host='0.0.0.0', threaded=True, debug=True, port=args.port)
|
5
server/install.sh
Normal file
5
server/install.sh
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
pip3 install --upgrade packaging
|
||||||
|
pip3 install --upgrade pip setuptools
|
||||||
|
pip3 install -r requirements.txt
|
@ -1,2 +1,4 @@
|
|||||||
flask>=2.3.0
|
flask>=2.3.0
|
||||||
ollama>=0.4.7
|
ollama>=0.4.7
|
||||||
|
gunicorn==19.10.0
|
||||||
|
llama-cpp-python
|
@ -1,86 +0,0 @@
|
|||||||
#!/usr/bin python3
|
|
||||||
|
|
||||||
from flask import Flask, jsonify, request
|
|
||||||
import threading
|
|
||||||
import ollama
|
|
||||||
import logging
|
|
||||||
import argparse
|
|
||||||
|
|
||||||
log = logging.getLogger('werkzeug')
|
|
||||||
log.setLevel(logging.ERROR)
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='AgenticSeek server script')
|
|
||||||
parser.add_argument('--model', type=str, help='Model to use. eg: deepseek-r1:14b', required=True)
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
|
|
||||||
model = args.model
|
|
||||||
|
|
||||||
# Shared state with thread-safe locks
|
|
||||||
class GenerationState:
|
|
||||||
def __init__(self):
|
|
||||||
self.lock = threading.Lock()
|
|
||||||
self.last_complete_sentence = ""
|
|
||||||
self.current_buffer = ""
|
|
||||||
self.is_generating = False
|
|
||||||
|
|
||||||
state = GenerationState()
|
|
||||||
|
|
||||||
def generate_response(history, model):
|
|
||||||
global state
|
|
||||||
print("using model:::::::", model)
|
|
||||||
try:
|
|
||||||
with state.lock:
|
|
||||||
state.is_generating = True
|
|
||||||
state.last_complete_sentence = ""
|
|
||||||
state.current_buffer = ""
|
|
||||||
|
|
||||||
stream = ollama.chat(
|
|
||||||
model=model,
|
|
||||||
messages=history,
|
|
||||||
stream=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
for chunk in stream:
|
|
||||||
content = chunk['message']['content']
|
|
||||||
print(content, end='', flush=True)
|
|
||||||
|
|
||||||
with state.lock:
|
|
||||||
state.current_buffer += content
|
|
||||||
|
|
||||||
except ollama.ResponseError as e:
|
|
||||||
if e.status_code == 404:
|
|
||||||
ollama.pull(model)
|
|
||||||
with state.lock:
|
|
||||||
state.is_generating = False
|
|
||||||
print(f"Error: {e}")
|
|
||||||
finally:
|
|
||||||
with state.lock:
|
|
||||||
state.is_generating = False
|
|
||||||
|
|
||||||
@app.route('/generate', methods=['POST'])
|
|
||||||
def start_generation():
|
|
||||||
global state
|
|
||||||
data = request.get_json()
|
|
||||||
|
|
||||||
with state.lock:
|
|
||||||
if state.is_generating:
|
|
||||||
return jsonify({"error": "Generation already in progress"}), 400
|
|
||||||
|
|
||||||
history = data.get('messages', [])
|
|
||||||
# Start generation in background thread
|
|
||||||
threading.Thread(target=generate_response, args=(history, model)).start()
|
|
||||||
return jsonify({"message": "Generation started"}), 202
|
|
||||||
|
|
||||||
@app.route('/get_updated_sentence')
|
|
||||||
def get_updated_sentence():
|
|
||||||
global state
|
|
||||||
with state.lock:
|
|
||||||
return jsonify({
|
|
||||||
"sentence": state.current_buffer,
|
|
||||||
"is_complete": not state.is_generating
|
|
||||||
})
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
app.run(host='0.0.0.0', threaded=True, debug=True, port=5000)
|
|
65
server/sources/generator.py
Normal file
65
server/sources/generator.py
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
|
||||||
|
import threading
|
||||||
|
import logging
|
||||||
|
from abc import abstractmethod
|
||||||
|
|
||||||
|
class GenerationState:
|
||||||
|
def __init__(self):
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
self.last_complete_sentence = ""
|
||||||
|
self.current_buffer = ""
|
||||||
|
self.is_generating = False
|
||||||
|
|
||||||
|
def status(self) -> dict:
|
||||||
|
return {
|
||||||
|
"sentence": self.current_buffer,
|
||||||
|
"is_complete": not self.is_generating,
|
||||||
|
"last_complete_sentence": self.last_complete_sentence,
|
||||||
|
"is_generating": self.is_generating,
|
||||||
|
}
|
||||||
|
|
||||||
|
class GeneratorLLM():
|
||||||
|
def __init__(self):
|
||||||
|
self.model = None
|
||||||
|
self.state = GenerationState()
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setLevel(logging.INFO)
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
self.logger.addHandler(handler)
|
||||||
|
self.logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
def set_model(self, model: str) -> None:
|
||||||
|
self.logger.info(f"Model set to {model}")
|
||||||
|
self.model = model
|
||||||
|
|
||||||
|
def start(self, history: list) -> bool:
|
||||||
|
if self.model is None:
|
||||||
|
raise Exception("Model not set")
|
||||||
|
with self.state.lock:
|
||||||
|
if self.state.is_generating:
|
||||||
|
return False
|
||||||
|
self.state.is_generating = True
|
||||||
|
self.logger.info("Starting generation")
|
||||||
|
threading.Thread(target=self.generate, args=(history,)).start()
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_status(self) -> dict:
|
||||||
|
with self.state.lock:
|
||||||
|
return self.state.status()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def generate(self, history: list) -> None:
|
||||||
|
"""
|
||||||
|
Generate text using the model.
|
||||||
|
args:
|
||||||
|
history: list of strings
|
||||||
|
returns:
|
||||||
|
None
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
generator = GeneratorLLM()
|
||||||
|
generator.get_status()
|
38
server/sources/llamacpp_handler.py
Normal file
38
server/sources/llamacpp_handler.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
|
||||||
|
from .generator import GeneratorLLM
|
||||||
|
from llama_cpp import Llama
|
||||||
|
|
||||||
|
class LlamacppLLM(GeneratorLLM):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
Handle generation using llama.cpp
|
||||||
|
"""
|
||||||
|
super().__init__()
|
||||||
|
self.llm = None
|
||||||
|
|
||||||
|
def generate(self, history):
|
||||||
|
if self.llm is None:
|
||||||
|
self.logger.info(f"Loading {self.model}...")
|
||||||
|
self.llm = Llama.from_pretrained(
|
||||||
|
repo_id=self.model,
|
||||||
|
filename="*Q8_0.gguf",
|
||||||
|
n_ctx=4096,
|
||||||
|
verbose=True
|
||||||
|
)
|
||||||
|
self.logger.info(f"Using {self.model} for generation with Llama.cpp")
|
||||||
|
try:
|
||||||
|
with self.state.lock:
|
||||||
|
self.state.is_generating = True
|
||||||
|
self.state.last_complete_sentence = ""
|
||||||
|
self.state.current_buffer = ""
|
||||||
|
output = self.llm.create_chat_completion(
|
||||||
|
messages = history
|
||||||
|
)
|
||||||
|
with self.state.lock:
|
||||||
|
self.state.current_buffer = output['choices'][0]['message']['content']
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error: {e}")
|
||||||
|
finally:
|
||||||
|
with self.state.lock:
|
||||||
|
self.state.is_generating = False
|
59
server/sources/ollama_handler.py
Normal file
59
server/sources/ollama_handler.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
|
||||||
|
import time
|
||||||
|
from .generator import GeneratorLLM
|
||||||
|
import ollama
|
||||||
|
|
||||||
|
class OllamaLLM(GeneratorLLM):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
Handle generation using Ollama.
|
||||||
|
"""
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
def generate(self, history):
|
||||||
|
self.logger.info(f"Using {self.model} for generation with Ollama")
|
||||||
|
try:
|
||||||
|
with self.state.lock:
|
||||||
|
self.state.is_generating = True
|
||||||
|
self.state.last_complete_sentence = ""
|
||||||
|
self.state.current_buffer = ""
|
||||||
|
|
||||||
|
stream = ollama.chat(
|
||||||
|
model=self.model,
|
||||||
|
messages=history,
|
||||||
|
stream=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
for chunk in stream:
|
||||||
|
content = chunk['message']['content']
|
||||||
|
if '\n' in content:
|
||||||
|
self.logger.info(content)
|
||||||
|
|
||||||
|
with self.state.lock:
|
||||||
|
self.state.current_buffer += content
|
||||||
|
except Exception as e:
|
||||||
|
if "404" in str(e):
|
||||||
|
self.logger.info(f"Downloading {self.model}...")
|
||||||
|
ollama.pull(self.model)
|
||||||
|
if "refused" in str(e).lower():
|
||||||
|
raise Exception("Ollama connection failed. is the server running ?") from e
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
self.logger.info("Generation complete")
|
||||||
|
with self.state.lock:
|
||||||
|
self.state.is_generating = False
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
generator = OllamaLLM()
|
||||||
|
history = [
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": "Hello, how are you ?"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
generator.set_model("deepseek-r1:1.5b")
|
||||||
|
generator.start(history)
|
||||||
|
while True:
|
||||||
|
print(generator.get_status())
|
||||||
|
time.sleep(1)
|
@ -50,18 +50,21 @@ def create_driver(headless=False):
|
|||||||
if headless:
|
if headless:
|
||||||
chrome_options.add_argument("--headless")
|
chrome_options.add_argument("--headless")
|
||||||
chrome_options.add_argument("--disable-gpu")
|
chrome_options.add_argument("--disable-gpu")
|
||||||
|
chrome_options.add_argument("--disable-webgl")
|
||||||
chrome_options.add_argument("--no-sandbox")
|
chrome_options.add_argument("--no-sandbox")
|
||||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||||
chrome_options.add_argument("--autoplay-policy=user-gesture-required")
|
chrome_options.add_argument("--autoplay-policy=user-gesture-required")
|
||||||
chrome_options.add_argument("--mute-audio")
|
chrome_options.add_argument("--mute-audio")
|
||||||
chrome_options.add_argument("--disable-webgl")
|
|
||||||
chrome_options.add_argument("--disable-notifications")
|
chrome_options.add_argument("--disable-notifications")
|
||||||
|
chrome_options.add_argument('--window-size=1080,560')
|
||||||
security_prefs = {
|
security_prefs = {
|
||||||
"profile.default_content_setting_values.media_stream": 2,
|
"profile.default_content_setting_values.media_stream": 2,
|
||||||
"profile.default_content_setting_values.geolocation": 2,
|
"profile.default_content_setting_values.geolocation": 2,
|
||||||
"safebrowsing.enabled": True,
|
"safebrowsing.enabled": True,
|
||||||
}
|
}
|
||||||
chrome_options.add_experimental_option("prefs", security_prefs)
|
chrome_options.add_experimental_option("prefs", security_prefs)
|
||||||
|
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||||
|
chrome_options.add_experimental_option('useAutomationExtension', False)
|
||||||
|
|
||||||
chromedriver_path = shutil.which("chromedriver")
|
chromedriver_path = shutil.which("chromedriver")
|
||||||
if not chromedriver_path:
|
if not chromedriver_path:
|
||||||
@ -74,8 +77,8 @@ def create_driver(headless=False):
|
|||||||
return webdriver.Chrome(service=service, options=chrome_options)
|
return webdriver.Chrome(service=service, options=chrome_options)
|
||||||
|
|
||||||
class Browser:
|
class Browser:
|
||||||
def __init__(self, driver, headless=False, anticaptcha_install=True):
|
def __init__(self, driver, anticaptcha_install=True):
|
||||||
"""Initialize the browser with optional headless mode."""
|
"""Initialize the browser with optional AntiCaptcha installation."""
|
||||||
self.js_scripts_folder = "./sources/web_scripts/" if not __name__ == "__main__" else "./web_scripts/"
|
self.js_scripts_folder = "./sources/web_scripts/" if not __name__ == "__main__" else "./web_scripts/"
|
||||||
self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related"
|
self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related"
|
||||||
try:
|
try:
|
||||||
|
@ -25,10 +25,10 @@ class Interaction:
|
|||||||
if stt_enabled:
|
if stt_enabled:
|
||||||
self.transcriber = AudioTranscriber(self.ai_name, verbose=False)
|
self.transcriber = AudioTranscriber(self.ai_name, verbose=False)
|
||||||
self.recorder = AudioRecorder()
|
self.recorder = AudioRecorder()
|
||||||
if tts_enabled:
|
|
||||||
self.speech.speak("Hello, we are online and ready. What can I do for you ?")
|
|
||||||
if recover_last_session:
|
if recover_last_session:
|
||||||
self.load_last_session()
|
self.load_last_session()
|
||||||
|
if tts_enabled:
|
||||||
|
self.speech.speak("Hello, we are online and ready. What can I do for you ?")
|
||||||
|
|
||||||
def find_ai_name(self) -> str:
|
def find_ai_name(self) -> str:
|
||||||
"""Find the name of the default AI. It is required for STT as a trigger word."""
|
"""Find the name of the default AI. It is required for STT as a trigger word."""
|
||||||
|
@ -79,6 +79,8 @@ class Provider:
|
|||||||
except AttributeError as e:
|
except AttributeError as e:
|
||||||
raise NotImplementedError(f"{str(e)}\nIs {self.provider_name} implemented ?")
|
raise NotImplementedError(f"{str(e)}\nIs {self.provider_name} implemented ?")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
if "RemoteDisconnected" in str(e):
|
||||||
|
return f"{self.server_ip} seem offline. RemoteDisconnected error."
|
||||||
raise Exception(f"Provider {self.provider_name} failed: {str(e)}") from e
|
raise Exception(f"Provider {self.provider_name} failed: {str(e)}") from e
|
||||||
return thought
|
return thought
|
||||||
|
|
||||||
@ -107,21 +109,26 @@ class Provider:
|
|||||||
Use a remote server with LLM to generate text.
|
Use a remote server with LLM to generate text.
|
||||||
"""
|
"""
|
||||||
thought = ""
|
thought = ""
|
||||||
route_start = f"http://{self.server_ip}/generate"
|
route_setup = f"http://{self.server_ip}/setup"
|
||||||
|
route_gen = f"http://{self.server_ip}/generate"
|
||||||
|
|
||||||
if not self.is_ip_online(self.server_ip.split(":")[0]):
|
if not self.is_ip_online(self.server_ip.split(":")[0]):
|
||||||
raise Exception(f"Server is offline at {self.server_ip}")
|
raise Exception(f"Server is offline at {self.server_ip}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
requests.post(route_start, json={"messages": history})
|
requests.post(route_setup, json={"model": self.model})
|
||||||
|
requests.post(route_gen, json={"messages": history})
|
||||||
is_complete = False
|
is_complete = False
|
||||||
while not is_complete:
|
while not is_complete:
|
||||||
response = requests.get(f"http://{self.server_ip}/get_updated_sentence")
|
response = requests.get(f"http://{self.server_ip}/get_updated_sentence")
|
||||||
|
if "error" in response.json():
|
||||||
|
pretty_print(response.json()["error"], color="failure")
|
||||||
|
break
|
||||||
thought = response.json()["sentence"]
|
thought = response.json()["sentence"]
|
||||||
is_complete = bool(response.json()["is_complete"])
|
is_complete = bool(response.json()["is_complete"])
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
raise Exception(f"{str(e)}\n\nError occured with server route. Are you using the correct address for the config.ini provider?") from e
|
raise Exception(f"{str(e)}\nError occured with server route. Are you using the correct address for the config.ini provider?") from e
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e
|
raise e
|
||||||
return thought
|
return thought
|
||||||
@ -263,5 +270,6 @@ goodbye!
|
|||||||
return thought
|
return thought
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
provider = Provider("openai", "gpt-4o-mini")
|
provider = Provider("server", "deepseek-r1:1.5b", "192.168.1.20:3333")
|
||||||
print(provider.respond(["user", "Hello, how are you?"]))
|
res = provider.respond(["user", "Hello, how are you?"])
|
||||||
|
print("Response:", res)
|
||||||
|
@ -6,8 +6,33 @@ import threading
|
|||||||
import itertools
|
import itertools
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
global thinking_toggle
|
||||||
|
thinking_toggle = False
|
||||||
|
|
||||||
def pretty_print(text, color = "info"):
|
def get_color_map():
|
||||||
|
if platform.system().lower() != "windows":
|
||||||
|
color_map = {
|
||||||
|
"success": "green",
|
||||||
|
"failure": "red",
|
||||||
|
"status": "light_green",
|
||||||
|
"code": "light_blue",
|
||||||
|
"warning": "yellow",
|
||||||
|
"output": "cyan",
|
||||||
|
"info": "cyan"
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
color_map = {
|
||||||
|
"success": "green",
|
||||||
|
"failure": "red",
|
||||||
|
"status": "light_green",
|
||||||
|
"code": "light_blue",
|
||||||
|
"warning": "yellow",
|
||||||
|
"output": "cyan",
|
||||||
|
"info": "black"
|
||||||
|
}
|
||||||
|
return color_map
|
||||||
|
|
||||||
|
def pretty_print(text, color="info"):
|
||||||
"""
|
"""
|
||||||
Print text with color formatting.
|
Print text with color formatting.
|
||||||
|
|
||||||
@ -23,37 +48,23 @@ def pretty_print(text, color = "info"):
|
|||||||
- "output": Cyan
|
- "output": Cyan
|
||||||
- "default": Black (Windows only)
|
- "default": Black (Windows only)
|
||||||
"""
|
"""
|
||||||
if platform.system().lower() != "windows":
|
global thinking_toggle
|
||||||
color_map = {
|
thinking_toggle = False
|
||||||
"success": Fore.GREEN,
|
color_map = get_color_map()
|
||||||
"failure": Fore.RED,
|
|
||||||
"status": Fore.LIGHTGREEN_EX,
|
|
||||||
"code": Fore.LIGHTBLUE_EX,
|
|
||||||
"warning": Fore.YELLOW,
|
|
||||||
"output": Fore.LIGHTCYAN_EX,
|
|
||||||
"info": Fore.CYAN
|
|
||||||
}
|
|
||||||
if color not in color_map:
|
if color not in color_map:
|
||||||
print(text)
|
color = "info"
|
||||||
pretty_print(f"Invalid color {color} in pretty_print", "warning")
|
|
||||||
return
|
|
||||||
print(color_map[color], text, Fore.RESET)
|
|
||||||
else:
|
|
||||||
color_map = {
|
|
||||||
"success": "green",
|
|
||||||
"failure": "red",
|
|
||||||
"status": "light_green",
|
|
||||||
"code": "light_blue",
|
|
||||||
"warning": "yellow",
|
|
||||||
"output": "cyan",
|
|
||||||
"default": "black"
|
|
||||||
}
|
|
||||||
if color not in color_map:
|
|
||||||
color = "default"
|
|
||||||
print(colored(text, color_map[color]))
|
print(colored(text, color_map[color]))
|
||||||
|
|
||||||
def animate_thinking(text, color="status", duration=2):
|
def animate_thinking(text, color="status", duration=120):
|
||||||
|
"""
|
||||||
|
Animate a thinking spinner while a task is being executed.
|
||||||
|
It use a daemon thread to run the animation. This will not block the main thread.
|
||||||
|
Color are the same as pretty_print.
|
||||||
|
"""
|
||||||
|
global thinking_toggle
|
||||||
|
thinking_toggle = True
|
||||||
def _animate():
|
def _animate():
|
||||||
|
global thinking_toggle
|
||||||
color_map = {
|
color_map = {
|
||||||
"success": (Fore.GREEN, "green"),
|
"success": (Fore.GREEN, "green"),
|
||||||
"failure": (Fore.RED, "red"),
|
"failure": (Fore.RED, "red"),
|
||||||
@ -65,20 +76,28 @@ def animate_thinking(text, color="status", duration=2):
|
|||||||
"info": (Fore.CYAN, "cyan")
|
"info": (Fore.CYAN, "cyan")
|
||||||
}
|
}
|
||||||
fore_color, term_color = color_map.get(color, color_map["default"])
|
fore_color, term_color = color_map.get(color, color_map["default"])
|
||||||
spinner = itertools.cycle(['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'])
|
spinner = itertools.cycle([
|
||||||
|
'▉▁▁▁▁▁', '▉▉▂▁▁▁', '▉▉▉▃▁▁', '▉▉▉▉▅▁', '▉▉▉▉▉▇', '▉▉▉▉▉▉',
|
||||||
|
'▉▉▉▉▇▅', '▉▉▉▆▃▁', '▉▉▅▃▁▁', '▉▇▃▁▁▁', '▇▃▁▁▁▁', '▃▁▁▁▁▁',
|
||||||
|
'▁▃▅▃▁▁', '▁▅▉▅▁▁', '▃▉▉▉▃▁', '▅▉▁▉▅▃', '▇▃▁▃▇▅', '▉▁▁▁▉▇',
|
||||||
|
'▉▅▃▁▃▅', '▇▉▅▃▅▇', '▅▉▇▅▇▉', '▃▇▉▇▉▅', '▁▅▇▉▇▃', '▁▃▅▇▅▁'
|
||||||
|
])
|
||||||
end_time = time.time() + duration
|
end_time = time.time() + duration
|
||||||
|
|
||||||
while time.time() < end_time:
|
while time.time() < end_time:
|
||||||
|
if not thinking_toggle:
|
||||||
|
# stop if another text is printed
|
||||||
|
break
|
||||||
symbol = next(spinner)
|
symbol = next(spinner)
|
||||||
if platform.system().lower() != "windows":
|
if platform.system().lower() != "windows":
|
||||||
print(f"{fore_color}{symbol} {text}{Fore.RESET}", flush=True)
|
print(f"\r{fore_color}{symbol} {text}{Fore.RESET}", end="", flush=True)
|
||||||
else:
|
else:
|
||||||
print(colored(f"{symbol} {text}", term_color), flush=True)
|
print(f"\r{colored(f'{symbol} {text}', term_color)}", end="", flush=True)
|
||||||
time.sleep(0.1)
|
time.sleep(0.2)
|
||||||
print("\033[1A\033[K", end="", flush=True)
|
print("\r" + " " * (len(text) + 7) + "\r", end="", flush=True)
|
||||||
animation_thread = threading.Thread(target=_animate)
|
print()
|
||||||
|
animation_thread = threading.Thread(target=_animate, daemon=True)
|
||||||
animation_thread.start()
|
animation_thread.start()
|
||||||
animation_thread.join()
|
|
||||||
|
|
||||||
def timer_decorator(func):
|
def timer_decorator(func):
|
||||||
"""
|
"""
|
||||||
@ -96,3 +115,16 @@ def timer_decorator(func):
|
|||||||
pretty_print(f"{func.__name__} took {end_time - start_time:.2f} seconds to execute", "status")
|
pretty_print(f"{func.__name__} took {end_time - start_time:.2f} seconds to execute", "status")
|
||||||
return result
|
return result
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import time
|
||||||
|
pretty_print("starting imaginary task", "success")
|
||||||
|
animate_thinking("Thinking...", "status")
|
||||||
|
time.sleep(4)
|
||||||
|
pretty_print("starting another task", "failure")
|
||||||
|
animate_thinking("Thinking...", "status")
|
||||||
|
time.sleep(4)
|
||||||
|
pretty_print("yet another task", "info")
|
||||||
|
animate_thinking("Thinking...", "status")
|
||||||
|
time.sleep(4)
|
||||||
|
pretty_print("This is an info message", "info")
|
Loading…
x
Reference in New Issue
Block a user