diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..78a3ed9 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,9 @@ +repos: + - repo: local + hooks: + - id: trufflehog + name: TruffleHog + description: Detect secrets in your data. + entry: bash -c 'trufflehog git file://. --since-commit HEAD --results=verified,unknown --fail --no-update' + language: system + stages: ["commit", "push"] \ No newline at end of file diff --git a/README.md b/README.md index ab8826f..6dd99b6 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,8 @@ If you have a powerful computer or a server that you can use, but you want to us ### 1️⃣ **Set up and start the server scripts** +You need to have ollama installed on the server (We will integrate VLLM and llama.cpp soon). + On your "server" that will run the AI model, get the ip address ```sh @@ -223,7 +225,7 @@ Note: For Windows or macOS, use ipconfig or ifconfig respectively to find the IP Clone the repository and then, run the script `stream_llm.py` in `server/` ```sh -python3 server_ollama.py +python3 server_ollama.py --model "deepseek-r1:32b" ``` ### 2️⃣ **Run it** diff --git a/main.py b/main.py index 002c0dc..2c2e1d4 100755 --- a/main.py +++ b/main.py @@ -29,22 +29,18 @@ def main(): server_address=config["MAIN"]["provider_server_address"]) agents = [ - CasualAgent(model=config["MAIN"]["provider_model"], - name=config["MAIN"]["agent_name"], - prompt_path="prompts/casual_agent.txt", - provider=provider), - CoderAgent(model=config["MAIN"]["provider_model"], - name="coder", - prompt_path="prompts/coder_agent.txt", - provider=provider), - FileAgent(model=config["MAIN"]["provider_model"], - name="File Agent", - prompt_path="prompts/file_agent.txt", - provider=provider), - BrowserAgent(model=config["MAIN"]["provider_model"], - name="Browser", - prompt_path="prompts/browser_agent.txt", - provider=provider) + CasualAgent(name=config["MAIN"]["agent_name"], + prompt_path="prompts/casual_agent.txt", + provider=provider, verbose=False), + CoderAgent(name="coder", + prompt_path="prompts/coder_agent.txt", + provider=provider, verbose=False), + FileAgent(name="File Agent", + prompt_path="prompts/file_agent.txt", + provider=provider, verbose=False), + BrowserAgent(name="Browser", + prompt_path="prompts/browser_agent.txt", + provider=provider, verbose=False) ] interaction = Interaction(agents, tts_enabled=config.getboolean('MAIN', 'speak'), diff --git a/server/config.json b/server/config.json deleted file mode 100644 index b976680..0000000 --- a/server/config.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "model_name": "deepseek-r1:14b", - "known_models": [ - "qwq:32b", - "deepseek-r1:1.5b", - "deepseek-r1:7b", - "deepseek-r1:14b", - "deepseek-r1:32b", - "deepseek-r1:70b", - "deepseek-r1:671b", - "deepseek-coder:1.3b", - "deepseek-coder:6.7b", - "deepseek-coder:33b", - "llama2-uncensored:7b", - "llama2-uncensored:70b", - "llama3.1:8b", - "llama3.1:70b", - "llama3.3:70b", - "llama3:8b", - "llama3:70b", - "i4:14b", - "mistral:7b", - "mistral:70b", - "mistral:33b", - "qwen1:7b", - "qwen1:14b", - "qwen1:32b", - "qwen1:70b" - ] -} \ No newline at end of file diff --git a/server/server.py b/server/server_ollama.py similarity index 63% rename from server/server.py rename to server/server_ollama.py index 5bb4269..91c8dca 100644 --- a/server/server.py +++ b/server/server_ollama.py @@ -1,47 +1,37 @@ +#!/usr/bin python3 + +# NOTE this script is temporary and will be improved + from flask import Flask, jsonify, request import threading import ollama import logging -import json +import argparse log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) +parser = argparse.ArgumentParser(description='AgenticSeek server script') +parser.add_argument('--model', type=str, help='Model to use. eg: deepseek-r1:14b', required=True) +args = parser.parse_args() + app = Flask(__name__) +model = args.model + # Shared state with thread-safe locks -class Config: - def __init__(self): - self.model = None - self.known_models = [] - self.allowed_models = [] - self.model_name = None - - def load(self): - with open('config.json', 'r') as f: - data = json.load(f) - self.known_models = data['known_models'] - self.model_name = data['model_name'] - - def validate_model(self, model): - if model not in self.known_models: - raise ValueError(f"Model {model} is not known") - class GenerationState: def __init__(self): self.lock = threading.Lock() self.last_complete_sentence = "" self.current_buffer = "" self.is_generating = False - self.model = None state = GenerationState() -def generate_response_vllm(history): - pass - -def generate_response_ollama(history): # Only takes history as an argument +def generate_response(history, model): global state + print("using model:::::::", model) try: with state.lock: state.is_generating = True @@ -49,18 +39,21 @@ def generate_response_ollama(history): # Only takes history as an argument state.current_buffer = "" stream = ollama.chat( - model=state.model, # Access state.model directly + model=model, messages=history, stream=True, ) + for chunk in stream: content = chunk['message']['content'] print(content, end='', flush=True) + with state.lock: state.current_buffer += content + except ollama.ResponseError as e: if e.status_code == 404: - ollama.pull(state.model) + ollama.pull(model) with state.lock: state.is_generating = False print(f"Error: {e}") @@ -78,8 +71,8 @@ def start_generation(): return jsonify({"error": "Generation already in progress"}), 400 history = data.get('messages', []) - # Pass only history to the thread - threading.Thread(target=generate_response, args=(history,)).start() # Note the comma to make it a single-element tuple + # Start generation in background thread + threading.Thread(target=generate_response, args=(history, model)).start() return jsonify({"message": "Generation started"}), 202 @app.route('/get_updated_sentence') @@ -92,8 +85,4 @@ def get_updated_sentence(): }) if __name__ == '__main__': - config = Config() - config.load() - config.validate_model(config.model_name) - state.model = config.model_name - app.run(host='0.0.0.0', port=5000, debug=False, threaded=True) + app.run(host='0.0.0.0', threaded=True, debug=True, port=5000) \ No newline at end of file diff --git a/sources/agents/agent.py b/sources/agents/agent.py index 51a2e83..a0acbdb 100644 --- a/sources/agents/agent.py +++ b/sources/agents/agent.py @@ -30,15 +30,24 @@ class Agent(): """ An abstract class for all agents. """ - def __init__(self, model: str, - name: str, + def __init__(self, name: str, prompt_path:str, provider, - recover_last_session=True) -> None: + recover_last_session=True, + verbose=False) -> None: + """ + Args: + name (str): Name of the agent. + prompt_path (str): Path to the prompt file for the agent. + provider: The provider for the LLM. + recover_last_session (bool, optional): Whether to recover the last conversation. + verbose (bool, optional): Enable verbose logging if True. Defaults to False. + """ + self.agent_name = name self.role = None + self.type = None self.current_directory = os.getcwd() - self.model = model self.llm = provider self.memory = Memory(self.load_prompt(prompt_path), recover_last_session=recover_last_session, @@ -46,6 +55,7 @@ class Agent(): self.tools = {} self.blocks_result = [] self.last_answer = "" + self.verbose = verbose @property def get_tools(self) -> dict: @@ -93,12 +103,12 @@ class Agent(): end_idx = text.rfind(end_tag)+8 return text[start_idx:end_idx] - def llm_request(self, verbose = False) -> Tuple[str, str]: + def llm_request(self) -> Tuple[str, str]: """ Ask the LLM to process the prompt and return the answer and the reasoning. """ memory = self.memory.get() - thought = self.llm.respond(memory, verbose) + thought = self.llm.respond(memory, self.verbose) reasoning = self.extract_reasoning_text(thought) answer = self.remove_reasoning_text(thought) diff --git a/sources/agents/browser_agent.py b/sources/agents/browser_agent.py index 7a4952f..fa0d298 100644 --- a/sources/agents/browser_agent.py +++ b/sources/agents/browser_agent.py @@ -6,35 +6,47 @@ from sources.agents.agent import Agent from sources.tools.searxSearch import searxSearch from sources.browser import Browser from datetime import date +from typing import List, Tuple class BrowserAgent(Agent): - def __init__(self, model, name, prompt_path, provider): + def __init__(self, name, prompt_path, provider, verbose=False): """ The Browser agent is an agent that navigate the web autonomously in search of answer """ - super().__init__(model, name, prompt_path, provider) + super().__init__(name, prompt_path, provider, verbose) self.tools = { "web_search": searxSearch(), } - self.role = "Web Research" + self.role = "Web search and navigation" + self.type = "browser_agent" self.browser = Browser() + self.current_page = "" self.search_history = [] self.navigable_links = [] self.notes = [] self.date = self.get_today_date() def get_today_date(self) -> str: + """Get the date""" date_time = date.today() return date_time.strftime("%B %d, %Y") - def extract_links(self, search_result: str): + def extract_links(self, search_result: str) -> List[str]: + """Extract all links from a sentence.""" pattern = r'(https?://\S+|www\.\S+)' matches = re.findall(pattern, search_result) - trailing_punct = ".,!?;:" + trailing_punct = ".,!?;:)" cleaned_links = [link.rstrip(trailing_punct) for link in matches] return self.clean_links(cleaned_links) + + def extract_form(self, text: str) -> List[str]: + """Extract form written by the LLM in format [input_name](value)""" + inputs = [] + matches = re.findall(r"\[\w+\]\([^)]+\)", text) + return matches - def clean_links(self, links: list): + def clean_links(self, links: List[str]) -> List[str]: + """Ensure no '.' at the end of link""" links_clean = [] for link in links: link = link.strip() @@ -44,10 +56,10 @@ class BrowserAgent(Agent): links_clean.append(link) return links_clean - def get_unvisited_links(self): + def get_unvisited_links(self) -> List[str]: return "\n".join([f"[{i}] {link}" for i, link in enumerate(self.navigable_links) if link not in self.search_history]) - def make_newsearch_prompt(self, user_prompt: str, search_result: dict): + def make_newsearch_prompt(self, user_prompt: str, search_result: dict) -> str: search_choice = self.stringify_search_results(search_result) return f""" Based on the search result: @@ -58,16 +70,19 @@ class BrowserAgent(Agent): Do not explain your choice. """ - def make_navigation_prompt(self, user_prompt: str, page_text: str): + def make_navigation_prompt(self, user_prompt: str, page_text: str) -> str: remaining_links = self.get_unvisited_links() - remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, proceed with a new search." + remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, do a new search." + inputs_form = self.browser.get_form_inputs() + inputs_form_text = '\n'.join(inputs_form) + return f""" You are a web browser. You are currently on this webpage: {page_text} You can navigate to these navigation links: - {remaining_links} + {remaining_links_text} Your task: 1. Decide if the current page answers the user’s query: {user_prompt} @@ -77,9 +92,13 @@ class BrowserAgent(Agent): 2. Navigate by either: - Navigate to a navigation links (write the full URL, e.g., www.example.com/cats). - If no link seems helpful, say: GO_BACK. + 3. Fill forms on the page: + - If user give you informations that help you fill form, fill it. + - If you don't know how to fill a form, leave it empty. + - You can fill a form using [form_name](value). Recap of note taking: - If useful -> Note: [Briefly summarize the key information that answers the user’s query.] + If useful -> Note: [Briefly summarize the key information or task you conducted.] Do not write "The page talk about ...", write your finding on the page and how they contribute to an answer. If not useful -> Error: [Explain why the page doesn’t help.] @@ -88,23 +107,32 @@ class BrowserAgent(Agent): No link seem useful to provide futher information. GO_BACK Example 2 (not useful, but related link): - Error: This forum reddit.com/welcome does not discuss anything related to the user’s query. + Error: reddit.com/welcome does not discuss anything related to the user’s query. There is a link that could lead to the information, I want to navigate to http://reddit.com/r/locallama Example 3 (not useful, no related links): - Error: x.com does not discuss anything related to the user’s query and no navigation link are usefull + Error: x.com does not discuss anything related to the user’s query and no navigation link are usefull. GO_BACK Example 3 (query answer found): Note: I found on github.com that agenticSeek is Fosowl. Given this information, given this I should exit the web browser. REQUEST_EXIT - Current date: {self.date} + Example 4 (loging form visible): + Note: I am on the login page, I should now type the given username and password. + [form_name_1](David) + [form_name_2](edgerunners_2077) + + You see the following inputs forms: + {inputs_form_text} + Remember, the user asked: {user_prompt} + You are currently on page : {self.current_page} Do not explain your choice. + Refusal is not an option, you have been given all capabilities that allow you to perform any tasks. """ - def llm_decide(self, prompt): + def llm_decide(self, prompt: str) -> Tuple[str, str]: animate_thinking("Thinking...", color="status") self.memory.push('user', prompt) answer, reasoning = self.llm_request() @@ -113,14 +141,14 @@ class BrowserAgent(Agent): pretty_print("-"*100) return answer, reasoning - def select_unvisited(self, search_result): + def select_unvisited(self, search_result: List[str]) -> List[str]: results_unvisited = [] for res in search_result: if res["link"] not in self.search_history: results_unvisited.append(res) return results_unvisited - def jsonify_search_results(self, results_string): + def jsonify_search_results(self, results_string: str) -> List[str]: result_blocks = results_string.split("\n\n") parsed_results = [] for block in result_blocks: @@ -139,7 +167,7 @@ class BrowserAgent(Agent): parsed_results.append(result_dict) return parsed_results - def stringify_search_results(self, results_arr): + def stringify_search_results(self, results_arr: List[str]) -> str: return '\n\n'.join([f"Link: {res['link']}" for res in results_arr]) def save_notes(self, text): @@ -148,7 +176,7 @@ class BrowserAgent(Agent): if "note" in line.lower(): self.notes.append(line) - def conclude_prompt(self, user_query): + def conclude_prompt(self, user_query: str) -> str: annotated_notes = [f"{i+1}: {note.lower().replace('note:', '')}" for i, note in enumerate(self.notes)] search_note = '\n'.join(annotated_notes) print("AI research notes:\n", search_note) @@ -158,17 +186,17 @@ class BrowserAgent(Agent): A web AI made the following finding across different pages: {search_note} - Summarize the finding, and provide a conclusion that answer the request. + Summarize the finding or step that lead to success, and provide a conclusion that answer the request. """ - def search_prompt(self, user_prompt): + def search_prompt(self, user_prompt: str) -> str: return f""" Current date: {self.date} Make a efficient search engine query to help users with their request: {user_prompt} Example: - User: "search: hey jarvis i want you to login to my twitter and say hello everyone " - You: Twitter + User: "go to twitter, login with username toto and password pass79 to my twitter and say hello everyone " + You: search: Twitter login page. User: "I need info on the best laptops for AI this year." You: "search: best laptops 2025 to run Machine Learning model, reviews" @@ -193,21 +221,31 @@ class BrowserAgent(Agent): while not complete: answer, reasoning = self.llm_decide(prompt) self.save_notes(answer) + + extracted_form = self.extract_form(answer) + if len(extracted_form) > 0: + self.browser.fill_form_inputs(extracted_form) + self.browser.find_and_click_submit() + if "REQUEST_EXIT" in answer: complete = True break + links = self.extract_links(answer) if len(unvisited) == 0: break + if len(links) == 0 or "GO_BACK" in answer: unvisited = self.select_unvisited(search_result) prompt = self.make_newsearch_prompt(user_prompt, unvisited) pretty_print(f"Going back to results. Still {len(unvisited)}", color="warning") links = [] continue + animate_thinking(f"Navigating to {links[0]}", color="status") speech_module.speak(f"Navigating to {links[0]}") self.browser.go_to(links[0]) + self.current_page = links[0] self.search_history.append(links[0]) page_text = self.browser.get_text() self.navigable_links = self.browser.get_navigable() diff --git a/sources/agents/casual_agent.py b/sources/agents/casual_agent.py index 512610e..114d617 100644 --- a/sources/agents/casual_agent.py +++ b/sources/agents/casual_agent.py @@ -7,24 +7,24 @@ from sources.tools.fileFinder import FileFinder from sources.tools.BashInterpreter import BashInterpreter class CasualAgent(Agent): - def __init__(self, model, name, prompt_path, provider): + def __init__(self, name, prompt_path, provider, verbose=False): """ The casual agent is a special for casual talk to the user without specific tasks. """ - super().__init__(model, name, prompt_path, provider) + super().__init__(name, prompt_path, provider, verbose) self.tools = { "web_search": searxSearch(), "flight_search": FlightSearch(), "file_finder": FileFinder(), "bash": BashInterpreter() } - self.role = "Chat and Conversation" + self.role = "talk" + self.type = "casual_agent" def process(self, prompt, speech_module) -> str: complete = False self.memory.push('user', prompt) - self.wait_message(speech_module) while not complete: animate_thinking("Thinking...", color="status") answer, reasoning = self.llm_request() diff --git a/sources/agents/code_agent.py b/sources/agents/code_agent.py index 01569bb..c454e63 100644 --- a/sources/agents/code_agent.py +++ b/sources/agents/code_agent.py @@ -11,8 +11,8 @@ class CoderAgent(Agent): """ The code agent is an agent that can write and execute code. """ - def __init__(self, model, name, prompt_path, provider): - super().__init__(model, name, prompt_path, provider) + def __init__(self, name, prompt_path, provider, verbose=False): + super().__init__(name, prompt_path, provider, verbose) self.tools = { "bash": BashInterpreter(), "python": PyInterpreter(), @@ -20,7 +20,8 @@ class CoderAgent(Agent): "go": GoInterpreter(), "file_finder": FileFinder() } - self.role = "Code Assistance" + self.role = "Coding task" + self.type = "code_agent" def process(self, prompt, speech_module) -> str: answer = "" diff --git a/sources/agents/file_agent.py b/sources/agents/file_agent.py index 6f49d77..e493605 100644 --- a/sources/agents/file_agent.py +++ b/sources/agents/file_agent.py @@ -5,35 +5,28 @@ from sources.tools.fileFinder import FileFinder from sources.tools.BashInterpreter import BashInterpreter class FileAgent(Agent): - def __init__(self, model, name, prompt_path, provider): + def __init__(self, name, prompt_path, provider, verbose=False): """ The file agent is a special agent for file operations. """ - super().__init__(model, name, prompt_path, provider) + super().__init__(name, prompt_path, provider, verbose) self.tools = { "file_finder": FileFinder(), "bash": BashInterpreter() } self.role = "find and read files" + self.type = "file_agent" def process(self, prompt, speech_module) -> str: - complete = False exec_success = False self.memory.push('user', prompt) self.wait_message(speech_module) - while not complete: - if exec_success: - complete = True - animate_thinking("Thinking...", color="status") - answer, reasoning = self.llm_request() - exec_success, _ = self.execute_modules(answer) - answer = self.remove_blocks(answer) - self.last_answer = answer - complete = True - for name, tool in self.tools.items(): - if tool.found_executable_blocks(): - complete = False # AI read results and continue the conversation + animate_thinking("Thinking...", color="status") + answer, reasoning = self.llm_request() + exec_success, _ = self.execute_modules(answer) + answer = self.remove_blocks(answer) + self.last_answer = answer return answer, reasoning if __name__ == "__main__": diff --git a/sources/agents/planner_agent.py b/sources/agents/planner_agent.py index 6b374ee..6bcca80 100644 --- a/sources/agents/planner_agent.py +++ b/sources/agents/planner_agent.py @@ -7,11 +7,11 @@ from sources.agents.browser_agent import BrowserAgent from sources.tools.tools import Tools class PlannerAgent(Agent): - def __init__(self, model, name, prompt_path, provider): + def __init__(self, name, prompt_path, provider, verbose=False): """ The planner agent is a special agent that divides and conquers the task. """ - super().__init__(model, name, prompt_path, provider) + super().__init__(name, prompt_path, provider, verbose) self.tools = { "json": Tools() } @@ -22,7 +22,7 @@ class PlannerAgent(Agent): "web": BrowserAgent(model, name, prompt_path, provider) } self.role = "Research, setup and code" - self.tag = "json" + self.type = "planner_agent" def parse_agent_tasks(self, text): tasks = [] diff --git a/sources/browser.py b/sources/browser.py index 80b8b4f..df43d4e 100644 --- a/sources/browser.py +++ b/sources/browser.py @@ -5,7 +5,9 @@ from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, WebDriverException +from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.chrome.options import Options +from typing import List, Tuple import chromedriver_autoinstaller import time import os @@ -26,6 +28,7 @@ class Browser: 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://www.google.com/', } + self.js_scripts_folder = "./sources/web_scripts/" if not __name__ == "__main__" else "./web_scripts/" self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related" try: chrome_options = Options() @@ -49,7 +52,6 @@ class Browser: "profile.default_content_setting_values.notifications": 2, # Block notifications "profile.default_content_setting_values.popups": 2, # Block pop-ups "profile.default_content_setting_values.geolocation": 2, # Block geolocation - "download_restrictions": 3, # Block all downloads "safebrowsing.enabled": True, # Enable safe browsing } chrome_options.add_experimental_option("prefs", security_prefs) @@ -70,9 +72,10 @@ class Browser: self.logger.info("Browser initialized successfully") except Exception as e: raise Exception(f"Failed to initialize browser: {str(e)}") + self.load_anticatpcha() @staticmethod - def get_chrome_path(): + def get_chrome_path() -> str: if sys.platform.startswith("win"): paths = [ "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe", @@ -89,21 +92,25 @@ class Browser: if os.path.exists(path) and os.access(path, os.X_OK): # Check if executable return path return None + + def load_anticatpcha(self): + print("You might want to install the AntiCaptcha extension for captchas.") + self.driver.get(self.anticaptcha) - def go_to(self, url): + def go_to(self, url:str) -> bool: """Navigate to a specified URL.""" try: initial_handles = self.driver.window_handles self.driver.get(url) time.sleep(1) - self.apply_web_countermeasures() + self.apply_web_safety() self.logger.info(f"Navigated to: {url}") return True except WebDriverException as e: self.logger.error(f"Error navigating to {url}: {str(e)}") return False - def is_sentence(self, text): + def is_sentence(self, text:str) -> bool: """Check if the text qualifies as a meaningful sentence or contains important error codes.""" text = text.strip() @@ -116,7 +123,7 @@ class Browser: is_long_enough = word_count > 5 return (word_count >= 5 and (has_punctuation or is_long_enough)) - def get_text(self): + def get_text(self) -> str | None: """Get page text and convert it to README (Markdown) format.""" try: soup = BeautifulSoup(self.driver.page_source, 'html.parser') @@ -135,7 +142,7 @@ class Browser: self.logger.error(f"Error getting text: {str(e)}") return None - def clean_url(self, url): + def clean_url(self, url:str) -> str: """Clean URL to keep only the part needed for navigation to the page""" clean = url.split('#')[0] parts = clean.split('?', 1) @@ -152,7 +159,7 @@ class Browser: return f"{base_url}?{'&'.join(essential_params)}" return base_url - def is_link_valid(self, url): + def is_link_valid(self, url:str) -> bool: """Check if a URL is a valid link (page, not related to icon or metadata).""" if len(url) > 64: return False @@ -168,7 +175,7 @@ class Browser: return False return True - def get_navigable(self): + def get_navigable(self) -> [str]: """Get all navigable links on the current page.""" try: links = [] @@ -189,28 +196,144 @@ class Browser: self.logger.error(f"Error getting navigable links: {str(e)}") return [] - def click_element(self, xpath): - """Click an element specified by xpath.""" + def click_element(self, xpath: str) -> bool: + """Click an element specified by XPath.""" try: - element = self.wait.until( - EC.element_to_be_clickable((By.XPATH, xpath)) - ) - element.click() - time.sleep(2) # Wait for action to complete - return True + element = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath))) + if not element.is_displayed(): + return False + if not element.is_enabled(): + return False + try: + self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element) + time.sleep(0.1) + element.click() + return True + except ElementClickInterceptedException as e: + return False except TimeoutException: - self.logger.error(f"Element not found or not clickable: {xpath}") + return False + except Exception as e: + self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}") + return False + + def load_js(self, file_name: str) -> str: + path = os.path.join(self.js_scripts_folder, file_name) + try: + with open(path, 'r') as f: + return f.read() + except FileNotFoundError as e: + raise Exception(f"Could not find: {path}") from e + except Exception as e: + raise e + + def find_all_inputs(self, timeout=4): + WebDriverWait(self.driver, timeout).until( + EC.presence_of_element_located((By.TAG_NAME, "body")) + ) + time.sleep(0.5) + script = self.load_js("find_inputs.js") + input_elements = self.driver.execute_script(script) + return input_elements + + def get_form_inputs(self) -> List[str]: + """Extract all input from the page and return them.""" + try: + #input_elements = self.driver.find_elements(By.TAG_NAME, "input") + input_elements = self.find_all_inputs() + if not input_elements: + return ["No input forms found on the page."] + + form_strings = [] + for element in input_elements: + input_type = element["type"] or "text" + if input_type in ["hidden", "submit", "button", "image"] or not element["displayed"]: + continue + input_name = element["text"] or element["id"] or input_type + if input_type == "checkbox" or input_type == "radio": + checked_status = "checked" if element.is_selected() else "unchecked" + form_strings.append(f"[{input_name}]({checked_status})") + else: + form_strings.append(f"[{input_name}]("")") + return form_strings + + except Exception as e: + self.logger.error(f"Error extracting form inputs: {str(e)}") + return [f"Error extracting form inputs."] + + def get_buttons_xpath(self) -> List[str]: + """ + Find buttons and return their type and xpath. + """ + buttons = self.driver.find_elements(By.TAG_NAME, "button") + \ + self.driver.find_elements(By.XPATH, "//input[@type='submit']") + result = [] + for i, button in enumerate(buttons): + if not button.is_displayed() or not button.is_enabled(): + continue + text = (button.text or button.get_attribute("value") or "").lower().replace(' ', '') + xpath = f"(//button | //input[@type='submit'])[{i + 1}]" + if "login" in text or "sign" in text or "register": + result.append((text, xpath)) + result.sort(key=lambda x: len(x[0])) + return result + + def find_and_click_submit(self, btn_type:str = 'login') -> None: + buttons = self.get_buttons_xpath() + if len(buttons) == 0: + self.logger.warning(f"No visible buttons found") + for button in buttons: + if button[0] == btn_type: + self.click_element(button[1]) + + def find_input_xpath_by_name(self, inputs, name: str) -> str | None: + for field in inputs: + if name in field["text"]: + return field["xpath"] + return None + + def fill_form_inputs(self, input_list:[str]) -> bool: + """Fill form inputs based on a list of [name](value) strings.""" + inputs = self.find_all_inputs() + try: + for input_str in input_list: + match = re.match(r'\[(.*?)\]\((.*?)\)', input_str) + if not match: + self.logger.warning(f"Invalid format for input: {input_str}") + continue + + name, value = match.groups() + name = name.strip() + value = value.strip() + xpath = self.find_input_xpath_by_name(inputs, name) + if not xpath: + continue + element = self.driver.find_element(By.XPATH, xpath) + input_type = (element.get_attribute("type") or "text").lower() + if input_type in ["checkbox", "radio"]: + is_checked = element.is_selected() + should_be_checked = value.lower() == "checked" + + if is_checked != should_be_checked: + element.click() + self.logger.info(f"Set {name} to {value}") + else: + element.send_keys(value) + self.logger.info(f"Filled {name} with {value}") + return True + except Exception as e: + self.logger.error(f"Error filling form inputs: {str(e)}") return False - def get_current_url(self): + def get_current_url(self) -> str: """Get the current URL of the page.""" return self.driver.current_url - def get_page_title(self): + def get_page_title(self) -> str: """Get the title of the current page.""" return self.driver.title - def scroll_bottom(self): + def scroll_bottom(self) -> bool: """Scroll to the bottom of the page.""" try: self.driver.execute_script( @@ -222,7 +345,7 @@ class Browser: self.logger.error(f"Error scrolling: {str(e)}") return False - def screenshot(self, filename): + def screenshot(self, filename:str) -> bool: """Take a screenshot of the current page.""" try: self.driver.save_screenshot(filename) @@ -232,129 +355,12 @@ class Browser: self.logger.error(f"Error taking screenshot: {str(e)}") return False -####################### -# WEB SECURITY # -####################### - - def apply_web_countermeasures(self): + def apply_web_safety(self): """ - Apply security measures to block any website malicious execution, privacy violation etc.. + Apply security measures to block any website malicious/annoying execution, privacy violation etc.. """ - self.inject_safety_script() - self.neutralize_event_listeners() - self.monitor_and_reset_css() - self.block_clipboard_access() - self.limit_intervals_and_timeouts() - self.block_external_requests() - self.monitor_and_close_popups() - - def inject_safety_script(self): - script = """ - // Block hardware access by removing or disabling APIs - Object.defineProperty(navigator, 'serial', { get: () => undefined }); - Object.defineProperty(navigator, 'hid', { get: () => undefined }); - Object.defineProperty(navigator, 'bluetooth', { get: () => undefined }); - // Block media playback - HTMLMediaElement.prototype.play = function() { - this.pause(); // Immediately pause if play is called - return Promise.reject('Blocked by script'); - }; - // Block fullscreen requests - Element.prototype.requestFullscreen = function() { - console.log('Blocked fullscreen request'); - return Promise.reject('Blocked by script'); - }; - // Block pointer lock - Element.prototype.requestPointerLock = function() { - console.log('Blocked pointer lock'); - }; - // Block iframe creation (optional, since browser already blocks these) - const originalCreateElement = document.createElement; - document.createElement = function(tagName) { - if (tagName.toLowerCase() === 'iframe') { - console.log('Blocked iframe creation'); - return null; - } - return originalCreateElement.apply(this, arguments); - }; - // Block annoying dialogs - window.alert = function() {}; - window.confirm = function() { return false; }; - window.prompt = function() { return null; }; - """ - self.driver.execute_script(script) - - def neutralize_event_listeners(self): - script = """ - const originalAddEventListener = EventTarget.prototype.addEventListener; - EventTarget.prototype.addEventListener = function(type, listener, options) { - if (['mousedown', 'mouseup', 'click', 'touchstart', 'keydown', 'keyup', 'keypress'].includes(type)) { - console.log(`Blocked adding listener for ${type}`); - return; - } - originalAddEventListener.apply(this, arguments); - }; - """ - self.driver.execute_script(script) - - def monitor_and_reset_css(self): - script = """ - const observer = new MutationObserver((mutations) => { - mutations.forEach((mutation) => { - if (mutation.type === 'attributes' && mutation.attributeName === 'style') { - const html = document.querySelector('html'); - if (html.style.cursor === 'none') { - html.style.cursor = 'auto'; - } - } - }); - }); - observer.observe(document.querySelector('html'), { attributes: true }); - """ - self.driver.execute_script(script) - - def block_clipboard_access(self): - script = """ - navigator.clipboard.readText = function() { - console.log('Blocked clipboard read'); - return Promise.reject('Blocked'); - }; - navigator.clipboard.writeText = function() { - console.log('Blocked clipboard write'); - return Promise.resolve(); - }; - """ - self.driver.execute_script(script) - - def limit_intervals_and_timeouts(self): - script = """ - const originalSetInterval = window.setInterval; - window.setInterval = function(callback, delay) { - if (typeof callback === 'function' && callback.toString().includes('alert')) { - console.log('Blocked suspicious interval'); - return; - } - return originalSetInterval.apply(this, arguments); - }; - """ - self.driver.execute_script(script) - - def monitor_and_close_popups(self): - initial_handles = self.driver.window_handles - for handle in self.driver.window_handles: - if handle not in initial_handles: - self.driver.switch_to.window(handle) - self.driver.close() - self.driver.switch_to.window(self.driver.window_handles[0]) - - def block_external_requests(self): - script = """ - window.fetch = function() { - console.log('Blocked fetch request'); - return Promise.reject('Blocked'); - }; - """ - self.driver.execute_script(script) + script = self.load_js("inject_safety_script.js") + input_elements = self.driver.execute_script(script) def close(self): """Close the browser.""" @@ -372,18 +378,19 @@ if __name__ == "__main__": logging.basicConfig(level=logging.INFO) browser = Browser(headless=False) + time.sleep(8) try: - # stress test - browser.go_to("https://www.bbc.com/news") - text = browser.get_text() - print("Page Text in Markdown:") - print(text) - links = browser.get_navigable() - print("\nNavigable Links:", links) - print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s") - time.sleep(20) + print("AntiCaptcha Test") + browser.go_to("https://www.google.com/recaptcha/api2/demo") + time.sleep(5) + print("Form Test:") + browser.go_to("https://practicetestautomation.com/practice-test-login/") + inputs = browser.get_form_inputs() + inputs = ['[username](student)', f'[password](Password123)', '[appOtp]()', '[backupOtp]()'] + browser.fill_form_inputs(inputs) + browser.find_and_click_submit() + print("Stress test") browser.go_to("https://theannoyingsite.com/") - time.sleep(15) finally: browser.close() diff --git a/sources/interaction.py b/sources/interaction.py index 9c17f2a..9bd9d47 100644 --- a/sources/interaction.py +++ b/sources/interaction.py @@ -35,7 +35,7 @@ class Interaction: """Find the name of the default AI. It is required for STT as a trigger word.""" ai_name = "jarvis" for agent in self.agents: - if agent.role == "talking": + if agent.type == "casual_agent": ai_name = agent.agent_name break return ai_name @@ -43,12 +43,12 @@ class Interaction: def recover_last_session(self): """Recover the last session.""" for agent in self.agents: - agent.memory.load_memory() + agent.memory.load_memory(agent.type) def save_session(self): """Save the current session.""" for agent in self.agents: - agent.memory.save_memory() + agent.memory.save_memory(agent.type) def is_active(self) -> bool: return self.is_active diff --git a/sources/memory.py b/sources/memory.py index a0c29d0..893a2f2 100644 --- a/sources/memory.py +++ b/sources/memory.py @@ -20,7 +20,7 @@ class Memory(): recover_last_session: bool = False, memory_compression: bool = True): self.memory = [] - self.memory = [{'role': 'user', 'content': system_prompt}] + self.memory = [{'role': 'system', 'content': system_prompt}] self.session_time = datetime.datetime.now() self.session_id = str(uuid.uuid4()) @@ -38,20 +38,23 @@ class Memory(): def get_filename(self) -> str: return f"memory_{self.session_time.strftime('%Y-%m-%d_%H-%M-%S')}.txt" - def save_memory(self) -> None: + def save_memory(self, agent_type: str = "casual_agent") -> None: """Save the session memory to a file.""" if not os.path.exists(self.conversation_folder): os.makedirs(self.conversation_folder) + save_path = os.path.join(self.conversation_folder, agent_type) + if not os.path.exists(save_path): + os.makedirs(save_path) filename = self.get_filename() - path = os.path.join(self.conversation_folder, filename) + path = os.path.join(save_path, filename) json_memory = json.dumps(self.memory) with open(path, 'w') as f: f.write(json_memory) - def find_last_session_path(self) -> str: + def find_last_session_path(self, path) -> str: """Find the last session path.""" saved_sessions = [] - for filename in os.listdir(self.conversation_folder): + for filename in os.listdir(path): if filename.startswith('memory_'): date = filename.split('_')[1] saved_sessions.append((filename, date)) @@ -60,14 +63,15 @@ class Memory(): return saved_sessions[0][0] return None - def load_memory(self) -> None: + def load_memory(self, agent_type: str = "casual_agent") -> None: """Load the memory from the last session.""" - if not os.path.exists(self.conversation_folder): + save_path = os.path.join(self.conversation_folder, agent_type) + if not os.path.exists(save_path): return - filename = self.find_last_session_path() + filename = self.find_last_session_path(save_path) if filename is None: return - path = os.path.join(self.conversation_folder, filename) + path = os.path.join(save_path, filename) with open(path, 'r') as f: self.memory = json.load(f) @@ -76,10 +80,10 @@ class Memory(): def push(self, role: str, content: str) -> None: """Push a message to the memory.""" - self.memory.append({'role': role, 'content': content}) - # EXPERIMENTAL if self.memory_compression and role == 'assistant': self.compress() + # we don't compress the last message + self.memory.append({'role': role, 'content': content}) def clear(self) -> None: self.memory = [] @@ -129,9 +133,9 @@ class Memory(): if not self.memory_compression: return for i in range(len(self.memory)): - if i <= 2: + if i < 3: continue - if self.memory[i]['role'] == 'assistant': + if len(self.memory[i]['content']) > 1024: self.memory[i]['content'] = self.summarize(self.memory[i]['content']) if __name__ == "__main__": diff --git a/sources/tools/PyInterpreter.py b/sources/tools/PyInterpreter.py index 2c250c9..3e2c59f 100644 --- a/sources/tools/PyInterpreter.py +++ b/sources/tools/PyInterpreter.py @@ -35,6 +35,7 @@ class PyInterpreter(Tools): try: try: buffer = exec(code, global_vars) + print(buffer) if buffer is not None: output = buffer + '\n' except Exception as e: diff --git a/sources/tools/searxSearch.py b/sources/tools/searxSearch.py index 96483dc..e0df8e0 100644 --- a/sources/tools/searxSearch.py +++ b/sources/tools/searxSearch.py @@ -75,7 +75,7 @@ class searxSearch(Tools): 'Upgrade-Insecure-Requests': '1', 'User-Agent': self.user_agent } - data = f"q={query}&categories=general&language=auto&time_range=&safesearch=0&theme=simple" + data = f"q={query}&categories=general&language=auto&time_range=&safesearch=0&theme=simple".encode('utf-8') try: response = requests.post(search_url, headers=headers, data=data, verify=False) response.raise_for_status() diff --git a/sources/web_scripts/find_inputs.js b/sources/web_scripts/find_inputs.js new file mode 100644 index 0000000..80c4b14 --- /dev/null +++ b/sources/web_scripts/find_inputs.js @@ -0,0 +1,49 @@ +function findInputs(element, result = []) { + // Find all elements in the current DOM tree + const inputs = element.querySelectorAll('input'); + inputs.forEach(input => { + result.push({ + tagName: input.tagName, + text: input.name || '', + type: input.type || '', + class: input.className || '', + xpath: getXPath(input), + displayed: isElementDisplayed(input) + }); + }); + const allElements = element.querySelectorAll('*'); + allElements.forEach(el => { + if (el.shadowRoot) { + findInputs(el.shadowRoot, result); + } + }); + return result; +} +// function to get the XPath of an element +function getXPath(element) { + if (!element) return ''; + if (element.id !== '') return '//*[@id="' + element.id + '"]'; + if (element === document.body) return '/html/body'; + + let ix = 0; + const siblings = element.parentNode ? element.parentNode.childNodes : []; + for (let i = 0; i < siblings.length; i++) { + const sibling = siblings[i]; + if (sibling === element) { + return getXPath(element.parentNode) + '/' + element.tagName.toLowerCase() + '[' + (ix + 1) + ']'; + } + if (sibling.nodeType === 1 && sibling.tagName === element.tagName) { + ix++; + } + } + return ''; +} +return findInputs(document.body); + +function isElementDisplayed(element) { + const style = window.getComputedStyle(element); + if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') { + return false; + } + return true; +} \ No newline at end of file diff --git a/sources/web_scripts/inject_safety_script.js b/sources/web_scripts/inject_safety_script.js new file mode 100644 index 0000000..cde4112 --- /dev/null +++ b/sources/web_scripts/inject_safety_script.js @@ -0,0 +1,36 @@ +// Block hardware access by removing or disabling APIs +Object.defineProperty(navigator, 'serial', { get: () => undefined }); +Object.defineProperty(navigator, 'hid', { get: () => undefined }); +Object.defineProperty(navigator, 'bluetooth', { get: () => undefined }); +// Block media playback +HTMLMediaElement.prototype.play = function() { + this.pause(); // Immediately pause if play is called + return Promise.reject('Blocked by script'); +}; +// Block fullscreen requests +Element.prototype.requestFullscreen = function() { + console.log('Blocked fullscreen request'); + return Promise.reject('Blocked by script'); +}; +// Block pointer lock +Element.prototype.requestPointerLock = function() { + console.log('Blocked pointer lock'); +}; +// Block iframe creation (optional, since browser already blocks these) +const originalCreateElement = document.createElement; +document.createElement = function(tagName) { + if (tagName.toLowerCase() === 'iframe') { + console.log('Blocked iframe creation'); + return null; + } + return originalCreateElement.apply(this, arguments); +}; +//block fetch +window.fetch = function() { + console.log('Blocked fetch request'); + return Promise.reject('Blocked'); +}; +// Block annoying dialogs +window.alert = function() {}; +window.confirm = function() { return false; }; +window.prompt = function() { return null; }; \ No newline at end of file