mirror of
https://github.com/tcsenpai/agenticSeek.git
synced 2025-06-06 11:05:26 +00:00
commit
c41c259cd6
46
README.md
46
README.md
@ -131,6 +131,8 @@ python3 main.py
|
||||
|
||||
*See the **Run with an API** section if your hardware can't run deepseek locally*
|
||||
|
||||
*See the **Config** section for detailled config file explanation.*
|
||||
|
||||
---
|
||||
|
||||
## Usage
|
||||
@ -206,8 +208,6 @@ If you have a powerful computer or a server that you can use, but you want to us
|
||||
|
||||
### 1️⃣ **Set up and start the server scripts**
|
||||
|
||||
You need to have ollama installed on the server (We will integrate VLLM and llama.cpp soon).
|
||||
|
||||
On your "server" that will run the AI model, get the ip address
|
||||
|
||||
```sh
|
||||
@ -289,8 +289,6 @@ python3 main.py
|
||||
|
||||
---
|
||||
|
||||
|
||||
|
||||
## Speech to Text
|
||||
|
||||
The speech-to-text functionality is disabled by default. To enable it, set the listen option to True in the config.ini file:
|
||||
@ -316,6 +314,43 @@ End your request with a confirmation phrase to signal the system to proceed. Exa
|
||||
"do it", "go ahead", "execute", "run", "start", "thanks", "would ya", "please", "okay?", "proceed", "continue", "go on", "do that", "go it", "do you understand?"
|
||||
```
|
||||
|
||||
## Config
|
||||
|
||||
Example config:
|
||||
```
|
||||
[MAIN]
|
||||
is_local = True
|
||||
provider_name = ollama
|
||||
provider_model = deepseek-r1:1.5b
|
||||
provider_server_address = 127.0.0.1:11434
|
||||
agent_name = Friday
|
||||
recover_last_session = False
|
||||
save_session = False
|
||||
speak = False
|
||||
listen = False
|
||||
work_dir = /Users/mlg/Documents/ai_folder
|
||||
jarvis_personality = False
|
||||
[BROWSER]
|
||||
headless_browser = False
|
||||
stealth_mode = False
|
||||
```
|
||||
|
||||
**Explanation**:
|
||||
|
||||
- is_local -> Runs the agent locally (True) or on a remote server (False).
|
||||
- provider_name -> The provider to use (one of: `ollama`, `server`, `lm-studio`, `deepseek-api`)
|
||||
- provider_model -> The model used, e.g., deepseek-r1:1.5b.
|
||||
- provider_server_address -> Server address, e.g., 127.0.0.1:11434 for local. Set to anything for non-local API.
|
||||
- agent_name -> Name of the agent, e.g., Friday. Used as a trigger word for TTS.
|
||||
- recover_last_session -> Restarts from last session (True) or not (False).
|
||||
- save_session -> Saves session data (True) or not (False).
|
||||
- speak -> Enables voice output (True) or not (False).
|
||||
- listen -> listen to voice input (True) or not (False).
|
||||
- work_dir -> Folder the AI will have access to. eg: /Users/user/Documents/.
|
||||
- jarvis_personality -> Uses a JARVIS-like personality (True) or not (False). This simply change the prompt file.
|
||||
- headless_browser -> Runs browser without a visible window (True) or not (False).
|
||||
- stealth_mode -> Make bot detector time harder. Only downside is you have to manually install the anticaptcha extension.
|
||||
|
||||
## Providers
|
||||
|
||||
The table below show the available providers:
|
||||
@ -329,7 +364,6 @@ The table below show the available providers:
|
||||
| deepseek-api | No | Deepseek API (non-private) |
|
||||
| huggingface| No | Hugging-Face API (non-private) |
|
||||
|
||||
|
||||
To select a provider change the config.ini:
|
||||
|
||||
```
|
||||
@ -369,6 +403,8 @@ And download the chromedriver version matching your OS.
|
||||
|
||||

|
||||
|
||||
If this section is incomplete please raise an issue.
|
||||
|
||||
## FAQ
|
||||
|
||||
**Q: What hardware do I need?**
|
||||
|
@ -9,5 +9,7 @@ save_session = False
|
||||
speak = False
|
||||
listen = False
|
||||
work_dir = /Users/mlg/Documents/ai_folder
|
||||
jarvis_personality = False
|
||||
[BROWSER]
|
||||
headless_browser = False
|
||||
jarvis_personality = True
|
||||
stealth_mode = False
|
BIN
crx/nopecha.crx
Normal file
BIN
crx/nopecha.crx
Normal file
Binary file not shown.
@ -7,6 +7,7 @@ echo "Detecting operating system..."
|
||||
|
||||
OS_TYPE=$(uname -s)
|
||||
|
||||
|
||||
case "$OS_TYPE" in
|
||||
"Linux"*)
|
||||
echo "Detected Linux OS"
|
||||
@ -37,4 +38,4 @@ case "$OS_TYPE" in
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "Installation process finished!"
|
||||
echo "Installation process finished!"
|
||||
|
8
main.py
8
main.py
@ -9,6 +9,7 @@ from sources.llm_provider import Provider
|
||||
from sources.interaction import Interaction
|
||||
from sources.agents import Agent, CoderAgent, CasualAgent, FileAgent, PlannerAgent, BrowserAgent
|
||||
from sources.browser import Browser, create_driver
|
||||
from sources.utility import pretty_print
|
||||
|
||||
import warnings
|
||||
warnings.filterwarnings("ignore")
|
||||
@ -22,12 +23,17 @@ def handleInterrupt(signum, frame):
|
||||
def main():
|
||||
signal.signal(signal.SIGINT, handler=handleInterrupt)
|
||||
|
||||
pretty_print("Initializing...", color="status")
|
||||
provider = Provider(provider_name=config["MAIN"]["provider_name"],
|
||||
model=config["MAIN"]["provider_model"],
|
||||
server_address=config["MAIN"]["provider_server_address"],
|
||||
is_local=config.getboolean('MAIN', 'is_local'))
|
||||
|
||||
browser = Browser(create_driver(headless=config.getboolean('MAIN', 'headless_browser')))
|
||||
stealth_mode = config.getboolean('BROWSER', 'stealth_mode')
|
||||
browser = Browser(
|
||||
create_driver(headless=config.getboolean('BROWSER', 'headless_browser'), stealth_mode=stealth_mode),
|
||||
anticaptcha_manual_install=stealth_mode
|
||||
)
|
||||
personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base"
|
||||
|
||||
agents = [
|
||||
|
@ -29,6 +29,9 @@ distro>=1.7.0,<2
|
||||
jiter>=0.4.0,<1
|
||||
sniffio
|
||||
tqdm>4
|
||||
fake_useragent>=2.1.0
|
||||
selenium_stealth>=1.0.6
|
||||
undetected-chromedriver>=3.5.5
|
||||
# for api provider
|
||||
openai
|
||||
# if use chinese
|
||||
|
34
scripts/linux_install.sh
Normal file → Executable file
34
scripts/linux_install.sh
Normal file → Executable file
@ -2,24 +2,34 @@
|
||||
|
||||
echo "Starting installation for Linux..."
|
||||
|
||||
set -e
|
||||
# Update package list
|
||||
sudo apt-get update
|
||||
|
||||
pip install --upgrade pip
|
||||
|
||||
sudo apt-get update || { echo "Failed to update package list"; exit 1; }
|
||||
# make sure essential tool are installed
|
||||
sudo apt install python3-dev python3-pip python3-wheel build-essential alsa-utils
|
||||
# install port audio
|
||||
sudo apt-get install portaudio19-dev python-pyaudio python3-pyaudio
|
||||
# install chromedriver misc
|
||||
sudo apt install libgtk-3-dev libnotify-dev libgconf-2-4 libnss3 libxss1 libasound2t64
|
||||
# Install essential tools
|
||||
sudo apt-get install -y \
|
||||
python3-dev \
|
||||
python3-pip \
|
||||
python3-wheel \
|
||||
build-essential \
|
||||
alsa-utils \
|
||||
portaudio19-dev \
|
||||
python3-pyaudio \
|
||||
libgtk-3-dev \
|
||||
libnotify-dev \
|
||||
libgconf-2-4 \
|
||||
libnss3 \
|
||||
libxss1 || { echo "Failed to install packages"; exit 1; }
|
||||
|
||||
# upgrade pip
|
||||
pip install --upgrade pip
|
||||
# install wheel
|
||||
pip install --upgrade pip setuptools wheel
|
||||
# install docker compose
|
||||
sudo apt install docker-compose
|
||||
# Install Python dependencies from requirements.txt
|
||||
pip3 install -r requirements.txt
|
||||
sudo apt install -y docker-compose
|
||||
# Install Selenium for chromedriver
|
||||
pip3 install selenium
|
||||
# Install Python dependencies from requirements.txt
|
||||
pip3 install -r requirements.txt
|
||||
|
||||
echo "Installation complete for Linux!"
|
29
scripts/macos_install.sh
Normal file → Executable file
29
scripts/macos_install.sh
Normal file → Executable file
@ -2,16 +2,27 @@
|
||||
|
||||
echo "Starting installation for macOS..."
|
||||
|
||||
set -e
|
||||
|
||||
# Check if homebrew is installed
|
||||
if ! command -v brew &> /dev/null; then
|
||||
echo "Homebrew not found. Installing Homebrew..."
|
||||
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
|
||||
fi
|
||||
|
||||
# update
|
||||
brew update
|
||||
# make sure wget installed
|
||||
brew install wget
|
||||
# Install chromedriver using Homebrew
|
||||
brew install --cask chromedriver
|
||||
# Install portaudio for pyAudio using Homebrew
|
||||
brew install portaudio
|
||||
# update pip
|
||||
python3 -m pip install --upgrade pip
|
||||
# Install Selenium
|
||||
pip3 install selenium
|
||||
# Install Python dependencies from requirements.txt
|
||||
pip3 install -r requirements.txt
|
||||
|
||||
# Install chromedriver using Homebrew
|
||||
brew install --cask chromedriver
|
||||
|
||||
# Install portaudio for pyAudio using Homebrew
|
||||
brew install portaudio
|
||||
|
||||
# Install Selenium
|
||||
pip3 install selenium
|
||||
|
||||
echo "Installation complete for macOS!"
|
0
scripts/windows_install.bat
Normal file → Executable file
0
scripts/windows_install.bat
Normal file → Executable file
17
server/sources/decorator.py
Normal file
17
server/sources/decorator.py
Normal file
@ -0,0 +1,17 @@
|
||||
|
||||
def timer_decorator(func):
|
||||
"""
|
||||
Decorator to measure the execution time of a function.
|
||||
Usage:
|
||||
@timer_decorator
|
||||
def my_function():
|
||||
# code to execute
|
||||
"""
|
||||
from time import time
|
||||
def wrapper(*args, **kwargs):
|
||||
start_time = time()
|
||||
result = func(*args, **kwargs)
|
||||
end_time = time()
|
||||
print(f"\n{func.__name__} took {end_time - start_time:.2f} seconds to execute\n")
|
||||
return result
|
||||
return wrapper
|
@ -1,6 +1,7 @@
|
||||
|
||||
from .generator import GeneratorLLM
|
||||
from llama_cpp import Llama
|
||||
from .decorator import timer_decorator
|
||||
|
||||
class LlamacppLLM(GeneratorLLM):
|
||||
|
||||
@ -11,6 +12,7 @@ class LlamacppLLM(GeneratorLLM):
|
||||
super().__init__()
|
||||
self.llm = None
|
||||
|
||||
@timer_decorator
|
||||
def generate(self, history):
|
||||
if self.llm is None:
|
||||
self.logger.info(f"Loading {self.model}...")
|
||||
|
@ -24,7 +24,6 @@ class OllamaLLM(GeneratorLLM):
|
||||
messages=history,
|
||||
stream=True,
|
||||
)
|
||||
|
||||
for chunk in stream:
|
||||
content = chunk['message']['content']
|
||||
if '\n' in content:
|
||||
@ -32,6 +31,7 @@ class OllamaLLM(GeneratorLLM):
|
||||
|
||||
with self.state.lock:
|
||||
self.state.current_buffer += content
|
||||
|
||||
except Exception as e:
|
||||
if "404" in str(e):
|
||||
self.logger.info(f"Downloading {self.model}...")
|
||||
|
3
setup.py
3
setup.py
@ -41,6 +41,9 @@ setup(
|
||||
"anyio>=3.5.0,<5",
|
||||
"distro>=1.7.0,<2",
|
||||
"jiter>=0.4.0,<1",
|
||||
"fake_useragent>=2.1.0",
|
||||
"selenium_stealth>=1.0.6",
|
||||
"undetected-chromedriver>=3.5.5",
|
||||
"sniffio",
|
||||
"tqdm>4"
|
||||
],
|
||||
|
@ -74,6 +74,8 @@ class PlannerAgent(Agent):
|
||||
|
||||
def show_plan(self, json_plan):
|
||||
agents_tasks = self.parse_agent_tasks(json_plan)
|
||||
if agents_tasks == (None, None):
|
||||
return
|
||||
pretty_print(f"--- Plan ---", color="output")
|
||||
for task_name, task in agents_tasks:
|
||||
pretty_print(f"{task}", color="output")
|
||||
|
@ -7,19 +7,23 @@ from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.common.exceptions import TimeoutException, WebDriverException
|
||||
from selenium.webdriver.common.action_chains import ActionChains
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from bs4 import BeautifulSoup
|
||||
from urllib.parse import urlparse
|
||||
from typing import List, Tuple
|
||||
from fake_useragent import UserAgent
|
||||
from selenium_stealth import stealth
|
||||
import undetected_chromedriver as uc
|
||||
import chromedriver_autoinstaller
|
||||
import time
|
||||
import random
|
||||
import os
|
||||
import shutil
|
||||
from bs4 import BeautifulSoup
|
||||
import markdownify
|
||||
import logging
|
||||
import sys
|
||||
import re
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from sources.utility import pretty_print
|
||||
from sources.utility import pretty_print, animate_thinking
|
||||
|
||||
def get_chrome_path() -> str:
|
||||
if sys.platform.startswith("win"):
|
||||
@ -39,7 +43,8 @@ def get_chrome_path() -> str:
|
||||
return path
|
||||
return None
|
||||
|
||||
def create_driver(headless=False):
|
||||
def create_driver(headless=False, stealth_mode=True) -> webdriver.Chrome:
|
||||
"""Create a Chrome WebDriver with specified options."""
|
||||
chrome_options = Options()
|
||||
chrome_path = get_chrome_path()
|
||||
|
||||
@ -51,20 +56,21 @@ def create_driver(headless=False):
|
||||
chrome_options.add_argument("--headless")
|
||||
chrome_options.add_argument("--disable-gpu")
|
||||
chrome_options.add_argument("--disable-webgl")
|
||||
#ua = UserAgent()
|
||||
#user_agent = ua.random # NOTE sometime return wrong user agent, investigate
|
||||
#chrome_options.add_argument(f'user-agent={user_agent}')
|
||||
chrome_options.add_argument("--no-sandbox")
|
||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||
chrome_options.add_argument("--autoplay-policy=user-gesture-required")
|
||||
chrome_options.add_argument("--mute-audio")
|
||||
chrome_options.add_argument("--disable-notifications")
|
||||
chrome_options.add_argument('--window-size=1080,560')
|
||||
security_prefs = {
|
||||
"profile.default_content_setting_values.media_stream": 2,
|
||||
"profile.default_content_setting_values.geolocation": 2,
|
||||
"safebrowsing.enabled": True,
|
||||
}
|
||||
chrome_options.add_experimental_option("prefs", security_prefs)
|
||||
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||
chrome_options.add_experimental_option('useAutomationExtension', False)
|
||||
if not stealth_mode:
|
||||
# crx file can't be installed in stealth mode
|
||||
crx_path = "./crx/nopecha.crx"
|
||||
if not os.path.exists(crx_path):
|
||||
raise FileNotFoundError(f"Extension file not found at: {crx_path}")
|
||||
chrome_options.add_extension(crx_path)
|
||||
|
||||
chromedriver_path = shutil.which("chromedriver")
|
||||
if not chromedriver_path:
|
||||
@ -74,10 +80,29 @@ def create_driver(headless=False):
|
||||
raise FileNotFoundError("ChromeDriver not found. Please install it or add it to your PATH.")
|
||||
|
||||
service = Service(chromedriver_path)
|
||||
if stealth_mode:
|
||||
driver = uc.Chrome(service=service, options=chrome_options)
|
||||
stealth(driver,
|
||||
languages=["en-US", "en"],
|
||||
vendor="Google Inc.",
|
||||
platform="Win32",
|
||||
webgl_vendor="Intel Inc.",
|
||||
renderer="Intel Iris OpenGL Engine",
|
||||
fix_hairline=True,
|
||||
)
|
||||
return driver
|
||||
security_prefs = {
|
||||
"profile.default_content_setting_values.media_stream": 2,
|
||||
"profile.default_content_setting_values.geolocation": 2,
|
||||
"safebrowsing.enabled": True,
|
||||
}
|
||||
chrome_options.add_experimental_option("prefs", security_prefs)
|
||||
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||
chrome_options.add_experimental_option('useAutomationExtension', False)
|
||||
return webdriver.Chrome(service=service, options=chrome_options)
|
||||
|
||||
class Browser:
|
||||
def __init__(self, driver, anticaptcha_install=True):
|
||||
def __init__(self, driver, anticaptcha_manual_install=False):
|
||||
"""Initialize the browser with optional AntiCaptcha installation."""
|
||||
self.js_scripts_folder = "./sources/web_scripts/" if not __name__ == "__main__" else "./web_scripts/"
|
||||
self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related"
|
||||
@ -88,10 +113,11 @@ class Browser:
|
||||
self.logger.info("Browser initialized successfully")
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to initialize browser: {str(e)}")
|
||||
if anticaptcha_install:
|
||||
self.load_anticatpcha()
|
||||
self.driver.get("https://www.google.com")
|
||||
if anticaptcha_manual_install:
|
||||
self.load_anticatpcha_manually()
|
||||
|
||||
def load_anticatpcha(self):
|
||||
def load_anticatpcha_manually(self):
|
||||
print("You might want to install the AntiCaptcha extension for captchas.")
|
||||
self.driver.get(self.anticaptcha)
|
||||
|
||||
@ -130,10 +156,10 @@ class Browser:
|
||||
element.decompose()
|
||||
|
||||
text = soup.get_text()
|
||||
|
||||
lines = (line.strip() for line in text.splitlines())
|
||||
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
|
||||
text = "\n".join(chunk for chunk in chunks if chunk and self.is_sentence(chunk))
|
||||
text = text[:4096]
|
||||
#markdown_text = markdownify.markdownify(text, heading_style="ATX")
|
||||
return "[Start of page]\n" + text + "\n[End of page]"
|
||||
except Exception as e:
|
||||
@ -362,20 +388,16 @@ class Browser:
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
browser = Browser(headless=False)
|
||||
time.sleep(8)
|
||||
driver = create_driver()
|
||||
browser = Browser(driver)
|
||||
time.sleep(10)
|
||||
|
||||
try:
|
||||
print("AntiCaptcha Test")
|
||||
browser.go_to("https://www.google.com/recaptcha/api2/demo")
|
||||
time.sleep(5)
|
||||
print("Form Test:")
|
||||
browser.go_to("https://practicetestautomation.com/practice-test-login/")
|
||||
inputs = browser.get_form_inputs()
|
||||
inputs = ['[username](student)', f'[password](Password123)', '[appOtp]()', '[backupOtp]()']
|
||||
browser.fill_form_inputs(inputs)
|
||||
browser.find_and_click_submit()
|
||||
print("Stress test")
|
||||
browser.go_to("https://theannoyingsite.com/")
|
||||
finally:
|
||||
browser.close()
|
||||
print("AntiCaptcha Test")
|
||||
browser.go_to("https://www.google.com/recaptcha/api2/demo")
|
||||
time.sleep(10)
|
||||
print("Form Test:")
|
||||
browser.go_to("https://practicetestautomation.com/practice-test-login/")
|
||||
inputs = browser.get_form_inputs()
|
||||
inputs = ['[username](student)', f'[password](Password123)', '[appOtp]()', '[backupOtp]()']
|
||||
browser.fill_form_inputs(inputs)
|
||||
browser.find_and_click_submit()
|
||||
|
@ -1,6 +1,6 @@
|
||||
|
||||
from sources.text_to_speech import Speech
|
||||
from sources.utility import pretty_print
|
||||
from sources.utility import pretty_print, animate_thinking
|
||||
from sources.router import AgentRouter
|
||||
from sources.speech_to_text import AudioTranscriber, AudioRecorder
|
||||
|
||||
@ -12,23 +12,37 @@ class Interaction:
|
||||
tts_enabled: bool = True,
|
||||
stt_enabled: bool = True,
|
||||
recover_last_session: bool = False):
|
||||
self.agents = agents
|
||||
self.current_agent = None
|
||||
self.router = AgentRouter(self.agents)
|
||||
self.speech = Speech(enable=tts_enabled)
|
||||
self.is_active = True
|
||||
self.current_agent = None
|
||||
self.last_query = None
|
||||
self.last_answer = None
|
||||
self.ai_name = self.find_ai_name()
|
||||
self.speech = None
|
||||
self.agents = agents
|
||||
self.tts_enabled = tts_enabled
|
||||
self.stt_enabled = stt_enabled
|
||||
self.recover_last_session = recover_last_session
|
||||
self.router = AgentRouter(self.agents)
|
||||
if tts_enabled:
|
||||
animate_thinking("Initializing text-to-speech...", color="status")
|
||||
self.speech = Speech(enable=tts_enabled)
|
||||
self.ai_name = self.find_ai_name()
|
||||
self.transcriber = None
|
||||
self.recorder = None
|
||||
if stt_enabled:
|
||||
animate_thinking("Initializing speech recognition...", color="status")
|
||||
self.transcriber = AudioTranscriber(self.ai_name, verbose=False)
|
||||
self.recorder = AudioRecorder()
|
||||
if recover_last_session:
|
||||
self.load_last_session()
|
||||
if tts_enabled:
|
||||
self.emit_status()
|
||||
|
||||
def emit_status(self):
|
||||
"""Print the current status of agenticSeek."""
|
||||
if self.stt_enabled:
|
||||
pretty_print(f"Text-to-speech trigger is {self.ai_name}", color="status")
|
||||
if self.tts_enabled:
|
||||
self.speech.speak("Hello, we are online and ready. What can I do for you ?")
|
||||
pretty_print("AgenticSeek is ready.", color="status")
|
||||
|
||||
def find_ai_name(self) -> str:
|
||||
"""Find the name of the default AI. It is required for STT as a trigger word."""
|
||||
|
@ -2,7 +2,6 @@ import os
|
||||
import sys
|
||||
import torch
|
||||
from transformers import pipeline
|
||||
# adaptive-classifier==0.0.10
|
||||
from adaptive_classifier import AdaptiveClassifier
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
@ -6,8 +6,8 @@ import threading
|
||||
import itertools
|
||||
import time
|
||||
|
||||
global thinking_toggle
|
||||
thinking_toggle = False
|
||||
thinking_event = threading.Event()
|
||||
current_animation_thread = None
|
||||
|
||||
def get_color_map():
|
||||
if platform.system().lower() != "windows":
|
||||
@ -48,8 +48,11 @@ def pretty_print(text, color="info"):
|
||||
- "output": Cyan
|
||||
- "default": Black (Windows only)
|
||||
"""
|
||||
global thinking_toggle
|
||||
thinking_toggle = False
|
||||
thinking_event.set()
|
||||
if current_animation_thread and current_animation_thread.is_alive():
|
||||
current_animation_thread.join()
|
||||
thinking_event.clear()
|
||||
|
||||
color_map = get_color_map()
|
||||
if color not in color_map:
|
||||
color = "info"
|
||||
@ -61,10 +64,14 @@ def animate_thinking(text, color="status", duration=120):
|
||||
It use a daemon thread to run the animation. This will not block the main thread.
|
||||
Color are the same as pretty_print.
|
||||
"""
|
||||
global thinking_toggle
|
||||
thinking_toggle = True
|
||||
global current_animation_thread
|
||||
|
||||
thinking_event.set()
|
||||
if current_animation_thread and current_animation_thread.is_alive():
|
||||
current_animation_thread.join()
|
||||
thinking_event.clear()
|
||||
|
||||
def _animate():
|
||||
global thinking_toggle
|
||||
color_map = {
|
||||
"success": (Fore.GREEN, "green"),
|
||||
"failure": (Fore.RED, "red"),
|
||||
@ -84,10 +91,7 @@ def animate_thinking(text, color="status", duration=120):
|
||||
])
|
||||
end_time = time.time() + duration
|
||||
|
||||
while time.time() < end_time:
|
||||
if not thinking_toggle:
|
||||
# stop if another text is printed
|
||||
break
|
||||
while not thinking_event.is_set() and time.time() < end_time:
|
||||
symbol = next(spinner)
|
||||
if platform.system().lower() != "windows":
|
||||
print(f"\r{fore_color}{symbol} {text}{Fore.RESET}", end="", flush=True)
|
||||
@ -95,9 +99,8 @@ def animate_thinking(text, color="status", duration=120):
|
||||
print(f"\r{colored(f'{symbol} {text}', term_color)}", end="", flush=True)
|
||||
time.sleep(0.2)
|
||||
print("\r" + " " * (len(text) + 7) + "\r", end="", flush=True)
|
||||
print()
|
||||
animation_thread = threading.Thread(target=_animate, daemon=True)
|
||||
animation_thread.start()
|
||||
current_animation_thread = threading.Thread(target=_animate, daemon=True)
|
||||
current_animation_thread.start()
|
||||
|
||||
def timer_decorator(func):
|
||||
"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user