#!/usr/bin/env python3
"""HTTP backend for the agent system, served with FastAPI.

Importing this module reads ``config.ini``, builds the provider, browser,
agents and the shared ``Interaction`` object, and registers the endpoints
``/screenshot``, ``/health``, ``/is_active`` and ``/query``.
"""

import os
import sys
import uvicorn
import aiofiles
import configparser
import asyncio
from typing import List

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from sources.llm_provider import Provider
from sources.interaction import Interaction
from sources.agents import CasualAgent, CoderAgent, FileAgent, PlannerAgent, BrowserAgent
from sources.browser import Browser, create_driver
from sources.utility import pretty_print
from sources.logger import Logger
from sources.schemas import QueryRequest, QueryResponse

from concurrent.futures import ThreadPoolExecutor
from celery import Celery

api = FastAPI(title="AgenticSeek API", version="0.1.0")
celery_app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
celery_app.conf.update(task_track_started=True)
logger = Logger("backend.log")
config = configparser.ConfigParser()
config.read('config.ini')

api.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# StaticFiles needs the directory to exist; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(".screenshots", exist_ok=True)
api.mount("/screenshots", StaticFiles(directory=".screenshots"), name="screenshots")

# Single worker thread: at most one think_wrapper call runs at a time.
executor = ThreadPoolExecutor(max_workers=1)


def initialize_system():
    """Build the provider, browser and agents described by ``config.ini``.

    Reads the ``MAIN`` and ``BROWSER`` sections of the module-level
    ``config`` object.

    Returns:
        The ``Interaction`` instance wrapping the five configured agents.
    """
    stealth_mode = config.getboolean('BROWSER', 'stealth_mode')
    personality_folder = "jarvis" if config.getboolean('MAIN', 'jarvis_personality') else "base"
    languages = config["MAIN"]["languages"].split(' ')

    provider = Provider(
        provider_name=config["MAIN"]["provider_name"],
        model=config["MAIN"]["provider_model"],
        server_address=config["MAIN"]["provider_server_address"],
        is_local=config.getboolean('MAIN', 'is_local')
    )
    logger.info(f"Provider initialized: {provider.provider_name} ({provider.model})")

    browser = Browser(
        create_driver(headless=config.getboolean('BROWSER', 'headless_browser'), stealth_mode=stealth_mode),
        anticaptcha_manual_install=stealth_mode
    )
    logger.info("Browser initialized")

    agents = [
        CasualAgent(
            name=config["MAIN"]["agent_name"],
            prompt_path=f"prompts/{personality_folder}/casual_agent.txt",
            provider=provider, verbose=False
        ),
        CoderAgent(
            name="coder",
            prompt_path=f"prompts/{personality_folder}/coder_agent.txt",
            provider=provider, verbose=False
        ),
        FileAgent(
            name="File Agent",
            prompt_path=f"prompts/{personality_folder}/file_agent.txt",
            provider=provider, verbose=False
        ),
        BrowserAgent(
            name="Browser",
            prompt_path=f"prompts/{personality_folder}/browser_agent.txt",
            provider=provider, verbose=False, browser=browser
        ),
        PlannerAgent(
            name="Planner",
            prompt_path=f"prompts/{personality_folder}/planner_agent.txt",
            provider=provider, verbose=False, browser=browser
        )
    ]
    logger.info("Agents initialized")

    interaction = Interaction(
        agents,
        tts_enabled=config.getboolean('MAIN', 'speak'),
        stt_enabled=config.getboolean('MAIN', 'listen'),
        recover_last_session=config.getboolean('MAIN', 'recover_last_session'),
        langs=languages
    )
    logger.info("Interaction initialized")
    return interaction


interaction = initialize_system()
# True while a /query request is being processed; guards against overlap.
is_generating = False


@api.get("/screenshot")
async def get_screenshot():
    """Return ``.screenshots/updated_screen.png``, or a 404 JSON error if absent."""
    logger.info("Screenshot endpoint called")
    screenshot_path = ".screenshots/updated_screen.png"
    if os.path.exists(screenshot_path):
        return FileResponse(screenshot_path)
    logger.error("No screenshot available")
    return JSONResponse(
        status_code=404,
        content={"error": "No screenshot available"}
    )


@api.get("/health")
async def health_check():
    """Return a static health payload with the API version."""
    logger.info("Health check endpoint called")
    return {"status": "healthy", "version": "0.1.0"}


@api.get("/is_active")
async def is_active():
    """Report the ``is_active`` attribute of the shared interaction."""
    logger.info("Is active endpoint called")
    return {"is_active": interaction.is_active}


def think_wrapper(interaction, query, tts_enabled):
    """Run one ``interaction.think()`` pass for ``query`` and record the outcome.

    Sets ``tts_enabled`` and ``last_query`` on the interaction before calling
    ``think()``, then stores the result in ``last_success`` (and an error text
    in ``last_answer`` on failure).

    Args:
        interaction: The interaction object to drive.
        query: The user query, stored as ``interaction.last_query``.
        tts_enabled: Value assigned to ``interaction.tts_enabled``.

    Returns:
        Whatever ``interaction.think()`` returned.

    Raises:
        Exception: Any exception from ``think()`` is logged, recorded on the
            interaction and re-raised.
    """
    try:
        interaction.tts_enabled = tts_enabled
        interaction.last_query = query
        logger.info("Agents request is being processed")
        success = interaction.think()
        if not success:
            interaction.last_answer = "Error: No answer from agent"
            interaction.last_success = False
        else:
            interaction.last_success = True
        return success
    except Exception as e:
        logger.error(f"Error in think_wrapper: {str(e)}")
        interaction.last_answer = f"Error: {str(e)}"
        interaction.last_success = False
        # Bare raise re-raises the active exception without adding a frame.
        raise


@api.post("/query", response_model=QueryResponse)
async def process_query(request: QueryRequest):
    """Process one user query through the agents and return the answer.

    Responses:
        200: the query was answered; body carries answer, agent name, blocks.
        400: ``think()`` reported failure, or no current agent is set.
        429: another query is already being processed.
        500: an unexpected exception occurred while processing.

    The session is saved after every processed query when ``save_session``
    is enabled in the ``MAIN`` config section.
    """
    global is_generating
    logger.info(f"Processing query: {request.query}")
    query_resp = QueryResponse(
        done="false",
        answer="Waiting for agent...",
        agent_name="Waiting for agent...",
        success="false",
        blocks={}
    )
    # This check stays outside the try block so the finally clause below
    # never clears a flag that belongs to the request already in flight.
    if is_generating:
        logger.warning("Another query is being processed, please wait.")
        return JSONResponse(status_code=429, content=query_resp.jsonify())

    try:
        is_generating = True
        loop = asyncio.get_running_loop()
        # think_wrapper blocks, so it runs on the executor thread rather
        # than on the event loop.
        success = await loop.run_in_executor(
            executor, think_wrapper, interaction, request.query, request.tts_enabled
        )

        if not success:
            query_resp.answer = interaction.last_answer
            return JSONResponse(status_code=400, content=query_resp.jsonify())

        if interaction.current_agent:
            blocks_json = {f'{i}': block.jsonify() for i, block in enumerate(interaction.current_agent.get_blocks_result())}
        else:
            logger.error("No current agent found")
            blocks_json = {}
            query_resp.answer = "Error: No current agent"
            return JSONResponse(status_code=400, content=query_resp.jsonify())

        logger.info(f"Answer: {interaction.last_answer}")
        logger.info(f"Blocks: {blocks_json}")
        query_resp.done = "true"
        query_resp.answer = interaction.last_answer
        query_resp.agent_name = interaction.current_agent.agent_name
        query_resp.success = str(interaction.last_success)
        query_resp.blocks = blocks_json
        logger.info("Query processed successfully")
        return JSONResponse(status_code=200, content=query_resp.jsonify())
    except Exception as e:
        # Report the failure to the client instead of terminating the whole
        # server process for one failed query.
        logger.error(f"An error occurred: {str(e)}")
        query_resp.answer = f"Error: {str(e)}"
        return JSONResponse(status_code=500, content=query_resp.jsonify())
    finally:
        # Always release the busy flag, including on failure; otherwise all
        # later requests would be rejected with 429.
        is_generating = False
        logger.info("Processing finished")
        if config.getboolean('MAIN', 'save_session'):
            interaction.save_session()


if __name__ == "__main__":
    uvicorn.run(api, host="0.0.0.0", port=8000)