Merge branch 'Fosowl:main' into main

This commit is contained in:
steveh8758 2025-03-25 05:17:31 +08:00 committed by GitHub
commit 85f8dcef98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 409 additions and 304 deletions

9
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,9 @@
repos:
- repo: local
hooks:
- id: trufflehog
name: TruffleHog
description: Detect secrets in your data.
entry: bash -c 'trufflehog git file://. --since-commit HEAD --results=verified,unknown --fail --no-update'
language: system
stages: ["commit", "push"]

View File

@ -212,6 +212,8 @@ If you have a powerful computer or a server that you can use, but you want to us
### 1**Set up and start the server scripts** ### 1**Set up and start the server scripts**
You need to have ollama installed on the server (We will integrate VLLM and llama.cpp soon).
On your "server" that will run the AI model, get the ip address On your "server" that will run the AI model, get the ip address
```sh ```sh
@ -223,7 +225,7 @@ Note: For Windows or macOS, use ipconfig or ifconfig respectively to find the IP
Clone the repository and then, run the script `stream_llm.py` in `server/` Clone the repository and then, run the script `stream_llm.py` in `server/`
```sh ```sh
python3 server_ollama.py python3 server_ollama.py --model "deepseek-r1:32b"
``` ```
### 2**Run it** ### 2**Run it**

28
main.py
View File

@ -29,22 +29,18 @@ def main():
server_address=config["MAIN"]["provider_server_address"]) server_address=config["MAIN"]["provider_server_address"])
agents = [ agents = [
CasualAgent(model=config["MAIN"]["provider_model"], CasualAgent(name=config["MAIN"]["agent_name"],
name=config["MAIN"]["agent_name"], prompt_path="prompts/casual_agent.txt",
prompt_path="prompts/casual_agent.txt", provider=provider, verbose=False),
provider=provider), CoderAgent(name="coder",
CoderAgent(model=config["MAIN"]["provider_model"], prompt_path="prompts/coder_agent.txt",
name="coder", provider=provider, verbose=False),
prompt_path="prompts/coder_agent.txt", FileAgent(name="File Agent",
provider=provider), prompt_path="prompts/file_agent.txt",
FileAgent(model=config["MAIN"]["provider_model"], provider=provider, verbose=False),
name="File Agent", BrowserAgent(name="Browser",
prompt_path="prompts/file_agent.txt", prompt_path="prompts/browser_agent.txt",
provider=provider), provider=provider, verbose=False)
BrowserAgent(model=config["MAIN"]["provider_model"],
name="Browser",
prompt_path="prompts/browser_agent.txt",
provider=provider)
] ]
interaction = Interaction(agents, tts_enabled=config.getboolean('MAIN', 'speak'), interaction = Interaction(agents, tts_enabled=config.getboolean('MAIN', 'speak'),

View File

@ -1,30 +0,0 @@
{
"model_name": "deepseek-r1:14b",
"known_models": [
"qwq:32b",
"deepseek-r1:1.5b",
"deepseek-r1:7b",
"deepseek-r1:14b",
"deepseek-r1:32b",
"deepseek-r1:70b",
"deepseek-r1:671b",
"deepseek-coder:1.3b",
"deepseek-coder:6.7b",
"deepseek-coder:33b",
"llama2-uncensored:7b",
"llama2-uncensored:70b",
"llama3.1:8b",
"llama3.1:70b",
"llama3.3:70b",
"llama3:8b",
"llama3:70b",
"i4:14b",
"mistral:7b",
"mistral:70b",
"mistral:33b",
"qwen1:7b",
"qwen1:14b",
"qwen1:32b",
"qwen1:70b"
]
}

View File

@ -1,47 +1,37 @@
#!/usr/bin python3
# NOTE this script is temporary and will be improved
from flask import Flask, jsonify, request from flask import Flask, jsonify, request
import threading import threading
import ollama import ollama
import logging import logging
import json import argparse
log = logging.getLogger('werkzeug') log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR) log.setLevel(logging.ERROR)
parser = argparse.ArgumentParser(description='AgenticSeek server script')
parser.add_argument('--model', type=str, help='Model to use. eg: deepseek-r1:14b', required=True)
args = parser.parse_args()
app = Flask(__name__) app = Flask(__name__)
model = args.model
# Shared state with thread-safe locks # Shared state with thread-safe locks
class Config:
def __init__(self):
self.model = None
self.known_models = []
self.allowed_models = []
self.model_name = None
def load(self):
with open('config.json', 'r') as f:
data = json.load(f)
self.known_models = data['known_models']
self.model_name = data['model_name']
def validate_model(self, model):
if model not in self.known_models:
raise ValueError(f"Model {model} is not known")
class GenerationState: class GenerationState:
def __init__(self): def __init__(self):
self.lock = threading.Lock() self.lock = threading.Lock()
self.last_complete_sentence = "" self.last_complete_sentence = ""
self.current_buffer = "" self.current_buffer = ""
self.is_generating = False self.is_generating = False
self.model = None
state = GenerationState() state = GenerationState()
def generate_response_vllm(history): def generate_response(history, model):
pass
def generate_response_ollama(history): # Only takes history as an argument
global state global state
print("using model:::::::", model)
try: try:
with state.lock: with state.lock:
state.is_generating = True state.is_generating = True
@ -49,18 +39,21 @@ def generate_response_ollama(history): # Only takes history as an argument
state.current_buffer = "" state.current_buffer = ""
stream = ollama.chat( stream = ollama.chat(
model=state.model, # Access state.model directly model=model,
messages=history, messages=history,
stream=True, stream=True,
) )
for chunk in stream: for chunk in stream:
content = chunk['message']['content'] content = chunk['message']['content']
print(content, end='', flush=True) print(content, end='', flush=True)
with state.lock: with state.lock:
state.current_buffer += content state.current_buffer += content
except ollama.ResponseError as e: except ollama.ResponseError as e:
if e.status_code == 404: if e.status_code == 404:
ollama.pull(state.model) ollama.pull(model)
with state.lock: with state.lock:
state.is_generating = False state.is_generating = False
print(f"Error: {e}") print(f"Error: {e}")
@ -78,8 +71,8 @@ def start_generation():
return jsonify({"error": "Generation already in progress"}), 400 return jsonify({"error": "Generation already in progress"}), 400
history = data.get('messages', []) history = data.get('messages', [])
# Pass only history to the thread # Start generation in background thread
threading.Thread(target=generate_response, args=(history,)).start() # Note the comma to make it a single-element tuple threading.Thread(target=generate_response, args=(history, model)).start()
return jsonify({"message": "Generation started"}), 202 return jsonify({"message": "Generation started"}), 202
@app.route('/get_updated_sentence') @app.route('/get_updated_sentence')
@ -92,8 +85,4 @@ def get_updated_sentence():
}) })
if __name__ == '__main__': if __name__ == '__main__':
config = Config() app.run(host='0.0.0.0', threaded=True, debug=True, port=5000)
config.load()
config.validate_model(config.model_name)
state.model = config.model_name
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

View File

@ -30,15 +30,24 @@ class Agent():
""" """
An abstract class for all agents. An abstract class for all agents.
""" """
def __init__(self, model: str, def __init__(self, name: str,
name: str,
prompt_path:str, prompt_path:str,
provider, provider,
recover_last_session=True) -> None: recover_last_session=True,
verbose=False) -> None:
"""
Args:
name (str): Name of the agent.
prompt_path (str): Path to the prompt file for the agent.
provider: The provider for the LLM.
recover_last_session (bool, optional): Whether to recover the last conversation.
verbose (bool, optional): Enable verbose logging if True. Defaults to False.
"""
self.agent_name = name self.agent_name = name
self.role = None self.role = None
self.type = None
self.current_directory = os.getcwd() self.current_directory = os.getcwd()
self.model = model
self.llm = provider self.llm = provider
self.memory = Memory(self.load_prompt(prompt_path), self.memory = Memory(self.load_prompt(prompt_path),
recover_last_session=recover_last_session, recover_last_session=recover_last_session,
@ -46,6 +55,7 @@ class Agent():
self.tools = {} self.tools = {}
self.blocks_result = [] self.blocks_result = []
self.last_answer = "" self.last_answer = ""
self.verbose = verbose
@property @property
def get_tools(self) -> dict: def get_tools(self) -> dict:
@ -93,12 +103,12 @@ class Agent():
end_idx = text.rfind(end_tag)+8 end_idx = text.rfind(end_tag)+8
return text[start_idx:end_idx] return text[start_idx:end_idx]
def llm_request(self, verbose = False) -> Tuple[str, str]: def llm_request(self) -> Tuple[str, str]:
""" """
Ask the LLM to process the prompt and return the answer and the reasoning. Ask the LLM to process the prompt and return the answer and the reasoning.
""" """
memory = self.memory.get() memory = self.memory.get()
thought = self.llm.respond(memory, verbose) thought = self.llm.respond(memory, self.verbose)
reasoning = self.extract_reasoning_text(thought) reasoning = self.extract_reasoning_text(thought)
answer = self.remove_reasoning_text(thought) answer = self.remove_reasoning_text(thought)

View File

@ -6,35 +6,47 @@ from sources.agents.agent import Agent
from sources.tools.searxSearch import searxSearch from sources.tools.searxSearch import searxSearch
from sources.browser import Browser from sources.browser import Browser
from datetime import date from datetime import date
from typing import List, Tuple
class BrowserAgent(Agent): class BrowserAgent(Agent):
def __init__(self, model, name, prompt_path, provider): def __init__(self, name, prompt_path, provider, verbose=False):
""" """
The Browser agent is an agent that navigate the web autonomously in search of answer The Browser agent is an agent that navigate the web autonomously in search of answer
""" """
super().__init__(model, name, prompt_path, provider) super().__init__(name, prompt_path, provider, verbose)
self.tools = { self.tools = {
"web_search": searxSearch(), "web_search": searxSearch(),
} }
self.role = "Web Research" self.role = "Web search and navigation"
self.type = "browser_agent"
self.browser = Browser() self.browser = Browser()
self.current_page = ""
self.search_history = [] self.search_history = []
self.navigable_links = [] self.navigable_links = []
self.notes = [] self.notes = []
self.date = self.get_today_date() self.date = self.get_today_date()
def get_today_date(self) -> str: def get_today_date(self) -> str:
"""Get the date"""
date_time = date.today() date_time = date.today()
return date_time.strftime("%B %d, %Y") return date_time.strftime("%B %d, %Y")
def extract_links(self, search_result: str): def extract_links(self, search_result: str) -> List[str]:
"""Extract all links from a sentence."""
pattern = r'(https?://\S+|www\.\S+)' pattern = r'(https?://\S+|www\.\S+)'
matches = re.findall(pattern, search_result) matches = re.findall(pattern, search_result)
trailing_punct = ".,!?;:" trailing_punct = ".,!?;:)"
cleaned_links = [link.rstrip(trailing_punct) for link in matches] cleaned_links = [link.rstrip(trailing_punct) for link in matches]
return self.clean_links(cleaned_links) return self.clean_links(cleaned_links)
def extract_form(self, text: str) -> List[str]:
"""Extract form written by the LLM in format [input_name](value)"""
inputs = []
matches = re.findall(r"\[\w+\]\([^)]+\)", text)
return matches
def clean_links(self, links: list): def clean_links(self, links: List[str]) -> List[str]:
"""Ensure no '.' at the end of link"""
links_clean = [] links_clean = []
for link in links: for link in links:
link = link.strip() link = link.strip()
@ -44,10 +56,10 @@ class BrowserAgent(Agent):
links_clean.append(link) links_clean.append(link)
return links_clean return links_clean
def get_unvisited_links(self): def get_unvisited_links(self) -> List[str]:
return "\n".join([f"[{i}] {link}" for i, link in enumerate(self.navigable_links) if link not in self.search_history]) return "\n".join([f"[{i}] {link}" for i, link in enumerate(self.navigable_links) if link not in self.search_history])
def make_newsearch_prompt(self, user_prompt: str, search_result: dict): def make_newsearch_prompt(self, user_prompt: str, search_result: dict) -> str:
search_choice = self.stringify_search_results(search_result) search_choice = self.stringify_search_results(search_result)
return f""" return f"""
Based on the search result: Based on the search result:
@ -58,16 +70,19 @@ class BrowserAgent(Agent):
Do not explain your choice. Do not explain your choice.
""" """
def make_navigation_prompt(self, user_prompt: str, page_text: str): def make_navigation_prompt(self, user_prompt: str, page_text: str) -> str:
remaining_links = self.get_unvisited_links() remaining_links = self.get_unvisited_links()
remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, proceed with a new search." remaining_links_text = remaining_links if remaining_links is not None else "No links remaining, do a new search."
inputs_form = self.browser.get_form_inputs()
inputs_form_text = '\n'.join(inputs_form)
return f""" return f"""
You are a web browser. You are a web browser.
You are currently on this webpage: You are currently on this webpage:
{page_text} {page_text}
You can navigate to these navigation links: You can navigate to these navigation links:
{remaining_links} {remaining_links_text}
Your task: Your task:
1. Decide if the current page answers the users query: {user_prompt} 1. Decide if the current page answers the users query: {user_prompt}
@ -77,9 +92,13 @@ class BrowserAgent(Agent):
2. Navigate by either: 2. Navigate by either:
- Navigate to a navigation links (write the full URL, e.g., www.example.com/cats). - Navigate to a navigation links (write the full URL, e.g., www.example.com/cats).
- If no link seems helpful, say: GO_BACK. - If no link seems helpful, say: GO_BACK.
3. Fill forms on the page:
- If user give you informations that help you fill form, fill it.
- If you don't know how to fill a form, leave it empty.
- You can fill a form using [form_name](value).
Recap of note taking: Recap of note taking:
If useful -> Note: [Briefly summarize the key information that answers the users query.] If useful -> Note: [Briefly summarize the key information or task you conducted.]
Do not write "The page talk about ...", write your finding on the page and how they contribute to an answer. Do not write "The page talk about ...", write your finding on the page and how they contribute to an answer.
If not useful -> Error: [Explain why the page doesnt help.] If not useful -> Error: [Explain why the page doesnt help.]
@ -88,23 +107,32 @@ class BrowserAgent(Agent):
No link seem useful to provide futher information. GO_BACK No link seem useful to provide futher information. GO_BACK
Example 2 (not useful, but related link): Example 2 (not useful, but related link):
Error: This forum reddit.com/welcome does not discuss anything related to the users query. Error: reddit.com/welcome does not discuss anything related to the users query.
There is a link that could lead to the information, I want to navigate to http://reddit.com/r/locallama There is a link that could lead to the information, I want to navigate to http://reddit.com/r/locallama
Example 3 (not useful, no related links): Example 3 (not useful, no related links):
Error: x.com does not discuss anything related to the users query and no navigation link are usefull Error: x.com does not discuss anything related to the users query and no navigation link are usefull.
GO_BACK GO_BACK
Example 3 (query answer found): Example 3 (query answer found):
Note: I found on github.com that agenticSeek is Fosowl. Note: I found on github.com that agenticSeek is Fosowl.
Given this information, given this I should exit the web browser. REQUEST_EXIT Given this information, given this I should exit the web browser. REQUEST_EXIT
Current date: {self.date} Example 4 (loging form visible):
Note: I am on the login page, I should now type the given username and password.
[form_name_1](David)
[form_name_2](edgerunners_2077)
You see the following inputs forms:
{inputs_form_text}
Remember, the user asked: {user_prompt} Remember, the user asked: {user_prompt}
You are currently on page : {self.current_page}
Do not explain your choice. Do not explain your choice.
Refusal is not an option, you have been given all capabilities that allow you to perform any tasks.
""" """
def llm_decide(self, prompt): def llm_decide(self, prompt: str) -> Tuple[str, str]:
animate_thinking("Thinking...", color="status") animate_thinking("Thinking...", color="status")
self.memory.push('user', prompt) self.memory.push('user', prompt)
answer, reasoning = self.llm_request() answer, reasoning = self.llm_request()
@ -113,14 +141,14 @@ class BrowserAgent(Agent):
pretty_print("-"*100) pretty_print("-"*100)
return answer, reasoning return answer, reasoning
def select_unvisited(self, search_result): def select_unvisited(self, search_result: List[str]) -> List[str]:
results_unvisited = [] results_unvisited = []
for res in search_result: for res in search_result:
if res["link"] not in self.search_history: if res["link"] not in self.search_history:
results_unvisited.append(res) results_unvisited.append(res)
return results_unvisited return results_unvisited
def jsonify_search_results(self, results_string): def jsonify_search_results(self, results_string: str) -> List[str]:
result_blocks = results_string.split("\n\n") result_blocks = results_string.split("\n\n")
parsed_results = [] parsed_results = []
for block in result_blocks: for block in result_blocks:
@ -139,7 +167,7 @@ class BrowserAgent(Agent):
parsed_results.append(result_dict) parsed_results.append(result_dict)
return parsed_results return parsed_results
def stringify_search_results(self, results_arr): def stringify_search_results(self, results_arr: List[str]) -> str:
return '\n\n'.join([f"Link: {res['link']}" for res in results_arr]) return '\n\n'.join([f"Link: {res['link']}" for res in results_arr])
def save_notes(self, text): def save_notes(self, text):
@ -148,7 +176,7 @@ class BrowserAgent(Agent):
if "note" in line.lower(): if "note" in line.lower():
self.notes.append(line) self.notes.append(line)
def conclude_prompt(self, user_query): def conclude_prompt(self, user_query: str) -> str:
annotated_notes = [f"{i+1}: {note.lower().replace('note:', '')}" for i, note in enumerate(self.notes)] annotated_notes = [f"{i+1}: {note.lower().replace('note:', '')}" for i, note in enumerate(self.notes)]
search_note = '\n'.join(annotated_notes) search_note = '\n'.join(annotated_notes)
print("AI research notes:\n", search_note) print("AI research notes:\n", search_note)
@ -158,17 +186,17 @@ class BrowserAgent(Agent):
A web AI made the following finding across different pages: A web AI made the following finding across different pages:
{search_note} {search_note}
Summarize the finding, and provide a conclusion that answer the request. Summarize the finding or step that lead to success, and provide a conclusion that answer the request.
""" """
def search_prompt(self, user_prompt): def search_prompt(self, user_prompt: str) -> str:
return f""" return f"""
Current date: {self.date} Current date: {self.date}
Make a efficient search engine query to help users with their request: Make a efficient search engine query to help users with their request:
{user_prompt} {user_prompt}
Example: Example:
User: "search: hey jarvis i want you to login to my twitter and say hello everyone " User: "go to twitter, login with username toto and password pass79 to my twitter and say hello everyone "
You: Twitter You: search: Twitter login page.
User: "I need info on the best laptops for AI this year." User: "I need info on the best laptops for AI this year."
You: "search: best laptops 2025 to run Machine Learning model, reviews" You: "search: best laptops 2025 to run Machine Learning model, reviews"
@ -193,21 +221,31 @@ class BrowserAgent(Agent):
while not complete: while not complete:
answer, reasoning = self.llm_decide(prompt) answer, reasoning = self.llm_decide(prompt)
self.save_notes(answer) self.save_notes(answer)
extracted_form = self.extract_form(answer)
if len(extracted_form) > 0:
self.browser.fill_form_inputs(extracted_form)
self.browser.find_and_click_submit()
if "REQUEST_EXIT" in answer: if "REQUEST_EXIT" in answer:
complete = True complete = True
break break
links = self.extract_links(answer) links = self.extract_links(answer)
if len(unvisited) == 0: if len(unvisited) == 0:
break break
if len(links) == 0 or "GO_BACK" in answer: if len(links) == 0 or "GO_BACK" in answer:
unvisited = self.select_unvisited(search_result) unvisited = self.select_unvisited(search_result)
prompt = self.make_newsearch_prompt(user_prompt, unvisited) prompt = self.make_newsearch_prompt(user_prompt, unvisited)
pretty_print(f"Going back to results. Still {len(unvisited)}", color="warning") pretty_print(f"Going back to results. Still {len(unvisited)}", color="warning")
links = [] links = []
continue continue
animate_thinking(f"Navigating to {links[0]}", color="status") animate_thinking(f"Navigating to {links[0]}", color="status")
speech_module.speak(f"Navigating to {links[0]}") speech_module.speak(f"Navigating to {links[0]}")
self.browser.go_to(links[0]) self.browser.go_to(links[0])
self.current_page = links[0]
self.search_history.append(links[0]) self.search_history.append(links[0])
page_text = self.browser.get_text() page_text = self.browser.get_text()
self.navigable_links = self.browser.get_navigable() self.navigable_links = self.browser.get_navigable()

View File

@ -7,24 +7,24 @@ from sources.tools.fileFinder import FileFinder
from sources.tools.BashInterpreter import BashInterpreter from sources.tools.BashInterpreter import BashInterpreter
class CasualAgent(Agent): class CasualAgent(Agent):
def __init__(self, model, name, prompt_path, provider): def __init__(self, name, prompt_path, provider, verbose=False):
""" """
The casual agent is a special for casual talk to the user without specific tasks. The casual agent is a special for casual talk to the user without specific tasks.
""" """
super().__init__(model, name, prompt_path, provider) super().__init__(name, prompt_path, provider, verbose)
self.tools = { self.tools = {
"web_search": searxSearch(), "web_search": searxSearch(),
"flight_search": FlightSearch(), "flight_search": FlightSearch(),
"file_finder": FileFinder(), "file_finder": FileFinder(),
"bash": BashInterpreter() "bash": BashInterpreter()
} }
self.role = "Chat and Conversation" self.role = "talk"
self.type = "casual_agent"
def process(self, prompt, speech_module) -> str: def process(self, prompt, speech_module) -> str:
complete = False complete = False
self.memory.push('user', prompt) self.memory.push('user', prompt)
self.wait_message(speech_module)
while not complete: while not complete:
animate_thinking("Thinking...", color="status") animate_thinking("Thinking...", color="status")
answer, reasoning = self.llm_request() answer, reasoning = self.llm_request()

View File

@ -11,8 +11,8 @@ class CoderAgent(Agent):
""" """
The code agent is an agent that can write and execute code. The code agent is an agent that can write and execute code.
""" """
def __init__(self, model, name, prompt_path, provider): def __init__(self, name, prompt_path, provider, verbose=False):
super().__init__(model, name, prompt_path, provider) super().__init__(name, prompt_path, provider, verbose)
self.tools = { self.tools = {
"bash": BashInterpreter(), "bash": BashInterpreter(),
"python": PyInterpreter(), "python": PyInterpreter(),
@ -20,7 +20,8 @@ class CoderAgent(Agent):
"go": GoInterpreter(), "go": GoInterpreter(),
"file_finder": FileFinder() "file_finder": FileFinder()
} }
self.role = "Code Assistance" self.role = "Coding task"
self.type = "code_agent"
def process(self, prompt, speech_module) -> str: def process(self, prompt, speech_module) -> str:
answer = "" answer = ""

View File

@ -5,35 +5,28 @@ from sources.tools.fileFinder import FileFinder
from sources.tools.BashInterpreter import BashInterpreter from sources.tools.BashInterpreter import BashInterpreter
class FileAgent(Agent): class FileAgent(Agent):
def __init__(self, model, name, prompt_path, provider): def __init__(self, name, prompt_path, provider, verbose=False):
""" """
The file agent is a special agent for file operations. The file agent is a special agent for file operations.
""" """
super().__init__(model, name, prompt_path, provider) super().__init__(name, prompt_path, provider, verbose)
self.tools = { self.tools = {
"file_finder": FileFinder(), "file_finder": FileFinder(),
"bash": BashInterpreter() "bash": BashInterpreter()
} }
self.role = "find and read files" self.role = "find and read files"
self.type = "file_agent"
def process(self, prompt, speech_module) -> str: def process(self, prompt, speech_module) -> str:
complete = False
exec_success = False exec_success = False
self.memory.push('user', prompt) self.memory.push('user', prompt)
self.wait_message(speech_module) self.wait_message(speech_module)
while not complete: animate_thinking("Thinking...", color="status")
if exec_success: answer, reasoning = self.llm_request()
complete = True exec_success, _ = self.execute_modules(answer)
animate_thinking("Thinking...", color="status") answer = self.remove_blocks(answer)
answer, reasoning = self.llm_request() self.last_answer = answer
exec_success, _ = self.execute_modules(answer)
answer = self.remove_blocks(answer)
self.last_answer = answer
complete = True
for name, tool in self.tools.items():
if tool.found_executable_blocks():
complete = False # AI read results and continue the conversation
return answer, reasoning return answer, reasoning
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -7,11 +7,11 @@ from sources.agents.browser_agent import BrowserAgent
from sources.tools.tools import Tools from sources.tools.tools import Tools
class PlannerAgent(Agent): class PlannerAgent(Agent):
def __init__(self, model, name, prompt_path, provider): def __init__(self, name, prompt_path, provider, verbose=False):
""" """
The planner agent is a special agent that divides and conquers the task. The planner agent is a special agent that divides and conquers the task.
""" """
super().__init__(model, name, prompt_path, provider) super().__init__(name, prompt_path, provider, verbose)
self.tools = { self.tools = {
"json": Tools() "json": Tools()
} }
@ -22,7 +22,7 @@ class PlannerAgent(Agent):
"web": BrowserAgent(model, name, prompt_path, provider) "web": BrowserAgent(model, name, prompt_path, provider)
} }
self.role = "Research, setup and code" self.role = "Research, setup and code"
self.tag = "json" self.type = "planner_agent"
def parse_agent_tasks(self, text): def parse_agent_tasks(self, text):
tasks = [] tasks = []

View File

@ -5,7 +5,9 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.options import Options
from typing import List, Tuple
import chromedriver_autoinstaller import chromedriver_autoinstaller
import time import time
import os import os
@ -26,6 +28,7 @@ class Browser:
'Accept-Language': 'en-US,en;q=0.9', 'Accept-Language': 'en-US,en;q=0.9',
'Referer': 'https://www.google.com/', 'Referer': 'https://www.google.com/',
} }
self.js_scripts_folder = "./sources/web_scripts/" if not __name__ == "__main__" else "./web_scripts/"
self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related" self.anticaptcha = "https://chrome.google.com/webstore/detail/nopecha-captcha-solver/dknlfmjaanfblgfdfebhijalfmhmjjjo/related"
try: try:
chrome_options = Options() chrome_options = Options()
@ -49,7 +52,6 @@ class Browser:
"profile.default_content_setting_values.notifications": 2, # Block notifications "profile.default_content_setting_values.notifications": 2, # Block notifications
"profile.default_content_setting_values.popups": 2, # Block pop-ups "profile.default_content_setting_values.popups": 2, # Block pop-ups
"profile.default_content_setting_values.geolocation": 2, # Block geolocation "profile.default_content_setting_values.geolocation": 2, # Block geolocation
"download_restrictions": 3, # Block all downloads
"safebrowsing.enabled": True, # Enable safe browsing "safebrowsing.enabled": True, # Enable safe browsing
} }
chrome_options.add_experimental_option("prefs", security_prefs) chrome_options.add_experimental_option("prefs", security_prefs)
@ -70,9 +72,10 @@ class Browser:
self.logger.info("Browser initialized successfully") self.logger.info("Browser initialized successfully")
except Exception as e: except Exception as e:
raise Exception(f"Failed to initialize browser: {str(e)}") raise Exception(f"Failed to initialize browser: {str(e)}")
self.load_anticatpcha()
@staticmethod @staticmethod
def get_chrome_path(): def get_chrome_path() -> str:
if sys.platform.startswith("win"): if sys.platform.startswith("win"):
paths = [ paths = [
"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe", "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
@ -89,21 +92,25 @@ class Browser:
if os.path.exists(path) and os.access(path, os.X_OK): # Check if executable if os.path.exists(path) and os.access(path, os.X_OK): # Check if executable
return path return path
return None return None
def load_anticatpcha(self):
print("You might want to install the AntiCaptcha extension for captchas.")
self.driver.get(self.anticaptcha)
def go_to(self, url): def go_to(self, url:str) -> bool:
"""Navigate to a specified URL.""" """Navigate to a specified URL."""
try: try:
initial_handles = self.driver.window_handles initial_handles = self.driver.window_handles
self.driver.get(url) self.driver.get(url)
time.sleep(1) time.sleep(1)
self.apply_web_countermeasures() self.apply_web_safety()
self.logger.info(f"Navigated to: {url}") self.logger.info(f"Navigated to: {url}")
return True return True
except WebDriverException as e: except WebDriverException as e:
self.logger.error(f"Error navigating to {url}: {str(e)}") self.logger.error(f"Error navigating to {url}: {str(e)}")
return False return False
def is_sentence(self, text): def is_sentence(self, text:str) -> bool:
"""Check if the text qualifies as a meaningful sentence or contains important error codes.""" """Check if the text qualifies as a meaningful sentence or contains important error codes."""
text = text.strip() text = text.strip()
@ -116,7 +123,7 @@ class Browser:
is_long_enough = word_count > 5 is_long_enough = word_count > 5
return (word_count >= 5 and (has_punctuation or is_long_enough)) return (word_count >= 5 and (has_punctuation or is_long_enough))
def get_text(self): def get_text(self) -> str | None:
"""Get page text and convert it to README (Markdown) format.""" """Get page text and convert it to README (Markdown) format."""
try: try:
soup = BeautifulSoup(self.driver.page_source, 'html.parser') soup = BeautifulSoup(self.driver.page_source, 'html.parser')
@ -135,7 +142,7 @@ class Browser:
self.logger.error(f"Error getting text: {str(e)}") self.logger.error(f"Error getting text: {str(e)}")
return None return None
def clean_url(self, url): def clean_url(self, url:str) -> str:
"""Clean URL to keep only the part needed for navigation to the page""" """Clean URL to keep only the part needed for navigation to the page"""
clean = url.split('#')[0] clean = url.split('#')[0]
parts = clean.split('?', 1) parts = clean.split('?', 1)
@ -152,7 +159,7 @@ class Browser:
return f"{base_url}?{'&'.join(essential_params)}" return f"{base_url}?{'&'.join(essential_params)}"
return base_url return base_url
def is_link_valid(self, url): def is_link_valid(self, url:str) -> bool:
"""Check if a URL is a valid link (page, not related to icon or metadata).""" """Check if a URL is a valid link (page, not related to icon or metadata)."""
if len(url) > 64: if len(url) > 64:
return False return False
@ -168,7 +175,7 @@ class Browser:
return False return False
return True return True
def get_navigable(self): def get_navigable(self) -> [str]:
"""Get all navigable links on the current page.""" """Get all navigable links on the current page."""
try: try:
links = [] links = []
@ -189,28 +196,144 @@ class Browser:
self.logger.error(f"Error getting navigable links: {str(e)}") self.logger.error(f"Error getting navigable links: {str(e)}")
return [] return []
def click_element(self, xpath): def click_element(self, xpath: str) -> bool:
"""Click an element specified by xpath.""" """Click an element specified by XPath."""
try: try:
element = self.wait.until( element = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath)))
EC.element_to_be_clickable((By.XPATH, xpath)) if not element.is_displayed():
) return False
element.click() if not element.is_enabled():
time.sleep(2) # Wait for action to complete return False
return True try:
self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", element)
time.sleep(0.1)
element.click()
return True
except ElementClickInterceptedException as e:
return False
except TimeoutException: except TimeoutException:
self.logger.error(f"Element not found or not clickable: {xpath}") return False
except Exception as e:
self.logger.error(f"Unexpected error clicking element at {xpath}: {str(e)}")
return False
def load_js(self, file_name: str) -> str:
path = os.path.join(self.js_scripts_folder, file_name)
try:
with open(path, 'r') as f:
return f.read()
except FileNotFoundError as e:
raise Exception(f"Could not find: {path}") from e
except Exception as e:
raise e
def find_all_inputs(self, timeout=4):
WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
time.sleep(0.5)
script = self.load_js("find_inputs.js")
input_elements = self.driver.execute_script(script)
return input_elements
def get_form_inputs(self) -> List[str]:
"""Extract all input from the page and return them."""
try:
#input_elements = self.driver.find_elements(By.TAG_NAME, "input")
input_elements = self.find_all_inputs()
if not input_elements:
return ["No input forms found on the page."]
form_strings = []
for element in input_elements:
input_type = element["type"] or "text"
if input_type in ["hidden", "submit", "button", "image"] or not element["displayed"]:
continue
input_name = element["text"] or element["id"] or input_type
if input_type == "checkbox" or input_type == "radio":
checked_status = "checked" if element.is_selected() else "unchecked"
form_strings.append(f"[{input_name}]({checked_status})")
else:
form_strings.append(f"[{input_name}]("")")
return form_strings
except Exception as e:
self.logger.error(f"Error extracting form inputs: {str(e)}")
return [f"Error extracting form inputs."]
def get_buttons_xpath(self) -> List[str]:
"""
Find buttons and return their type and xpath.
"""
buttons = self.driver.find_elements(By.TAG_NAME, "button") + \
self.driver.find_elements(By.XPATH, "//input[@type='submit']")
result = []
for i, button in enumerate(buttons):
if not button.is_displayed() or not button.is_enabled():
continue
text = (button.text or button.get_attribute("value") or "").lower().replace(' ', '')
xpath = f"(//button | //input[@type='submit'])[{i + 1}]"
if "login" in text or "sign" in text or "register":
result.append((text, xpath))
result.sort(key=lambda x: len(x[0]))
return result
def find_and_click_submit(self, btn_type:str = 'login') -> None:
buttons = self.get_buttons_xpath()
if len(buttons) == 0:
self.logger.warning(f"No visible buttons found")
for button in buttons:
if button[0] == btn_type:
self.click_element(button[1])
def find_input_xpath_by_name(self, inputs, name: str) -> str | None:
for field in inputs:
if name in field["text"]:
return field["xpath"]
return None
def fill_form_inputs(self, input_list:[str]) -> bool:
"""Fill form inputs based on a list of [name](value) strings."""
inputs = self.find_all_inputs()
try:
for input_str in input_list:
match = re.match(r'\[(.*?)\]\((.*?)\)', input_str)
if not match:
self.logger.warning(f"Invalid format for input: {input_str}")
continue
name, value = match.groups()
name = name.strip()
value = value.strip()
xpath = self.find_input_xpath_by_name(inputs, name)
if not xpath:
continue
element = self.driver.find_element(By.XPATH, xpath)
input_type = (element.get_attribute("type") or "text").lower()
if input_type in ["checkbox", "radio"]:
is_checked = element.is_selected()
should_be_checked = value.lower() == "checked"
if is_checked != should_be_checked:
element.click()
self.logger.info(f"Set {name} to {value}")
else:
element.send_keys(value)
self.logger.info(f"Filled {name} with {value}")
return True
except Exception as e:
self.logger.error(f"Error filling form inputs: {str(e)}")
return False return False
def get_current_url(self): def get_current_url(self) -> str:
"""Get the current URL of the page.""" """Get the current URL of the page."""
return self.driver.current_url return self.driver.current_url
def get_page_title(self): def get_page_title(self) -> str:
"""Get the title of the current page.""" """Get the title of the current page."""
return self.driver.title return self.driver.title
def scroll_bottom(self): def scroll_bottom(self) -> bool:
"""Scroll to the bottom of the page.""" """Scroll to the bottom of the page."""
try: try:
self.driver.execute_script( self.driver.execute_script(
@ -222,7 +345,7 @@ class Browser:
self.logger.error(f"Error scrolling: {str(e)}") self.logger.error(f"Error scrolling: {str(e)}")
return False return False
def screenshot(self, filename): def screenshot(self, filename:str) -> bool:
"""Take a screenshot of the current page.""" """Take a screenshot of the current page."""
try: try:
self.driver.save_screenshot(filename) self.driver.save_screenshot(filename)
@ -232,129 +355,12 @@ class Browser:
self.logger.error(f"Error taking screenshot: {str(e)}") self.logger.error(f"Error taking screenshot: {str(e)}")
return False return False
####################### def apply_web_safety(self):
# WEB SECURITY #
#######################
def apply_web_countermeasures(self):
""" """
Apply security measures to block any website malicious execution, privacy violation etc.. Apply security measures to block any website malicious/annoying execution, privacy violation etc..
""" """
self.inject_safety_script() script = self.load_js("inject_safety_script.js")
self.neutralize_event_listeners() input_elements = self.driver.execute_script(script)
self.monitor_and_reset_css()
self.block_clipboard_access()
self.limit_intervals_and_timeouts()
self.block_external_requests()
self.monitor_and_close_popups()
def inject_safety_script(self):
script = """
// Block hardware access by removing or disabling APIs
Object.defineProperty(navigator, 'serial', { get: () => undefined });
Object.defineProperty(navigator, 'hid', { get: () => undefined });
Object.defineProperty(navigator, 'bluetooth', { get: () => undefined });
// Block media playback
HTMLMediaElement.prototype.play = function() {
this.pause(); // Immediately pause if play is called
return Promise.reject('Blocked by script');
};
// Block fullscreen requests
Element.prototype.requestFullscreen = function() {
console.log('Blocked fullscreen request');
return Promise.reject('Blocked by script');
};
// Block pointer lock
Element.prototype.requestPointerLock = function() {
console.log('Blocked pointer lock');
};
// Block iframe creation (optional, since browser already blocks these)
const originalCreateElement = document.createElement;
document.createElement = function(tagName) {
if (tagName.toLowerCase() === 'iframe') {
console.log('Blocked iframe creation');
return null;
}
return originalCreateElement.apply(this, arguments);
};
// Block annoying dialogs
window.alert = function() {};
window.confirm = function() { return false; };
window.prompt = function() { return null; };
"""
self.driver.execute_script(script)
def neutralize_event_listeners(self):
script = """
const originalAddEventListener = EventTarget.prototype.addEventListener;
EventTarget.prototype.addEventListener = function(type, listener, options) {
if (['mousedown', 'mouseup', 'click', 'touchstart', 'keydown', 'keyup', 'keypress'].includes(type)) {
console.log(`Blocked adding listener for ${type}`);
return;
}
originalAddEventListener.apply(this, arguments);
};
"""
self.driver.execute_script(script)
def monitor_and_reset_css(self):
script = """
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
if (mutation.type === 'attributes' && mutation.attributeName === 'style') {
const html = document.querySelector('html');
if (html.style.cursor === 'none') {
html.style.cursor = 'auto';
}
}
});
});
observer.observe(document.querySelector('html'), { attributes: true });
"""
self.driver.execute_script(script)
def block_clipboard_access(self):
script = """
navigator.clipboard.readText = function() {
console.log('Blocked clipboard read');
return Promise.reject('Blocked');
};
navigator.clipboard.writeText = function() {
console.log('Blocked clipboard write');
return Promise.resolve();
};
"""
self.driver.execute_script(script)
def limit_intervals_and_timeouts(self):
script = """
const originalSetInterval = window.setInterval;
window.setInterval = function(callback, delay) {
if (typeof callback === 'function' && callback.toString().includes('alert')) {
console.log('Blocked suspicious interval');
return;
}
return originalSetInterval.apply(this, arguments);
};
"""
self.driver.execute_script(script)
def monitor_and_close_popups(self):
initial_handles = self.driver.window_handles
for handle in self.driver.window_handles:
if handle not in initial_handles:
self.driver.switch_to.window(handle)
self.driver.close()
self.driver.switch_to.window(self.driver.window_handles[0])
def block_external_requests(self):
script = """
window.fetch = function() {
console.log('Blocked fetch request');
return Promise.reject('Blocked');
};
"""
self.driver.execute_script(script)
def close(self): def close(self):
"""Close the browser.""" """Close the browser."""
@ -372,18 +378,19 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
browser = Browser(headless=False) browser = Browser(headless=False)
time.sleep(8)
try: try:
# stress test print("AntiCaptcha Test")
browser.go_to("https://www.bbc.com/news") browser.go_to("https://www.google.com/recaptcha/api2/demo")
text = browser.get_text() time.sleep(5)
print("Page Text in Markdown:") print("Form Test:")
print(text) browser.go_to("https://practicetestautomation.com/practice-test-login/")
links = browser.get_navigable() inputs = browser.get_form_inputs()
print("\nNavigable Links:", links) inputs = ['[username](student)', f'[password](Password123)', '[appOtp]()', '[backupOtp]()']
print("WARNING SECURITY STRESS TEST WILL BE RUN IN 20s") browser.fill_form_inputs(inputs)
time.sleep(20) browser.find_and_click_submit()
print("Stress test")
browser.go_to("https://theannoyingsite.com/") browser.go_to("https://theannoyingsite.com/")
time.sleep(15)
finally: finally:
browser.close() browser.close()

View File

@ -35,7 +35,7 @@ class Interaction:
"""Find the name of the default AI. It is required for STT as a trigger word.""" """Find the name of the default AI. It is required for STT as a trigger word."""
ai_name = "jarvis" ai_name = "jarvis"
for agent in self.agents: for agent in self.agents:
if agent.role == "talking": if agent.type == "casual_agent":
ai_name = agent.agent_name ai_name = agent.agent_name
break break
return ai_name return ai_name
@ -43,12 +43,12 @@ class Interaction:
def recover_last_session(self): def recover_last_session(self):
"""Recover the last session.""" """Recover the last session."""
for agent in self.agents: for agent in self.agents:
agent.memory.load_memory() agent.memory.load_memory(agent.type)
def save_session(self): def save_session(self):
"""Save the current session.""" """Save the current session."""
for agent in self.agents: for agent in self.agents:
agent.memory.save_memory() agent.memory.save_memory(agent.type)
def is_active(self) -> bool: def is_active(self) -> bool:
return self.is_active return self.is_active

View File

@ -20,7 +20,7 @@ class Memory():
recover_last_session: bool = False, recover_last_session: bool = False,
memory_compression: bool = True): memory_compression: bool = True):
self.memory = [] self.memory = []
self.memory = [{'role': 'user', 'content': system_prompt}] self.memory = [{'role': 'system', 'content': system_prompt}]
self.session_time = datetime.datetime.now() self.session_time = datetime.datetime.now()
self.session_id = str(uuid.uuid4()) self.session_id = str(uuid.uuid4())
@ -38,20 +38,23 @@ class Memory():
def get_filename(self) -> str: def get_filename(self) -> str:
return f"memory_{self.session_time.strftime('%Y-%m-%d_%H-%M-%S')}.txt" return f"memory_{self.session_time.strftime('%Y-%m-%d_%H-%M-%S')}.txt"
def save_memory(self) -> None: def save_memory(self, agent_type: str = "casual_agent") -> None:
"""Save the session memory to a file.""" """Save the session memory to a file."""
if not os.path.exists(self.conversation_folder): if not os.path.exists(self.conversation_folder):
os.makedirs(self.conversation_folder) os.makedirs(self.conversation_folder)
save_path = os.path.join(self.conversation_folder, agent_type)
if not os.path.exists(save_path):
os.makedirs(save_path)
filename = self.get_filename() filename = self.get_filename()
path = os.path.join(self.conversation_folder, filename) path = os.path.join(save_path, filename)
json_memory = json.dumps(self.memory) json_memory = json.dumps(self.memory)
with open(path, 'w') as f: with open(path, 'w') as f:
f.write(json_memory) f.write(json_memory)
def find_last_session_path(self) -> str: def find_last_session_path(self, path) -> str:
"""Find the last session path.""" """Find the last session path."""
saved_sessions = [] saved_sessions = []
for filename in os.listdir(self.conversation_folder): for filename in os.listdir(path):
if filename.startswith('memory_'): if filename.startswith('memory_'):
date = filename.split('_')[1] date = filename.split('_')[1]
saved_sessions.append((filename, date)) saved_sessions.append((filename, date))
@ -60,14 +63,15 @@ class Memory():
return saved_sessions[0][0] return saved_sessions[0][0]
return None return None
def load_memory(self) -> None: def load_memory(self, agent_type: str = "casual_agent") -> None:
"""Load the memory from the last session.""" """Load the memory from the last session."""
if not os.path.exists(self.conversation_folder): save_path = os.path.join(self.conversation_folder, agent_type)
if not os.path.exists(save_path):
return return
filename = self.find_last_session_path() filename = self.find_last_session_path(save_path)
if filename is None: if filename is None:
return return
path = os.path.join(self.conversation_folder, filename) path = os.path.join(save_path, filename)
with open(path, 'r') as f: with open(path, 'r') as f:
self.memory = json.load(f) self.memory = json.load(f)
@ -76,10 +80,10 @@ class Memory():
def push(self, role: str, content: str) -> None: def push(self, role: str, content: str) -> None:
"""Push a message to the memory.""" """Push a message to the memory."""
self.memory.append({'role': role, 'content': content})
# EXPERIMENTAL
if self.memory_compression and role == 'assistant': if self.memory_compression and role == 'assistant':
self.compress() self.compress()
# we don't compress the last message
self.memory.append({'role': role, 'content': content})
def clear(self) -> None: def clear(self) -> None:
self.memory = [] self.memory = []
@ -129,9 +133,9 @@ class Memory():
if not self.memory_compression: if not self.memory_compression:
return return
for i in range(len(self.memory)): for i in range(len(self.memory)):
if i <= 2: if i < 3:
continue continue
if self.memory[i]['role'] == 'assistant': if len(self.memory[i]['content']) > 1024:
self.memory[i]['content'] = self.summarize(self.memory[i]['content']) self.memory[i]['content'] = self.summarize(self.memory[i]['content'])
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -35,6 +35,7 @@ class PyInterpreter(Tools):
try: try:
try: try:
buffer = exec(code, global_vars) buffer = exec(code, global_vars)
print(buffer)
if buffer is not None: if buffer is not None:
output = buffer + '\n' output = buffer + '\n'
except Exception as e: except Exception as e:

View File

@ -75,7 +75,7 @@ class searxSearch(Tools):
'Upgrade-Insecure-Requests': '1', 'Upgrade-Insecure-Requests': '1',
'User-Agent': self.user_agent 'User-Agent': self.user_agent
} }
data = f"q={query}&categories=general&language=auto&time_range=&safesearch=0&theme=simple" data = f"q={query}&categories=general&language=auto&time_range=&safesearch=0&theme=simple".encode('utf-8')
try: try:
response = requests.post(search_url, headers=headers, data=data, verify=False) response = requests.post(search_url, headers=headers, data=data, verify=False)
response.raise_for_status() response.raise_for_status()

View File

@ -0,0 +1,49 @@
function findInputs(element, result = []) {
// Find all <input> elements in the current DOM tree
const inputs = element.querySelectorAll('input');
inputs.forEach(input => {
result.push({
tagName: input.tagName,
text: input.name || '',
type: input.type || '',
class: input.className || '',
xpath: getXPath(input),
displayed: isElementDisplayed(input)
});
});
const allElements = element.querySelectorAll('*');
allElements.forEach(el => {
if (el.shadowRoot) {
findInputs(el.shadowRoot, result);
}
});
return result;
}
// function to get the XPath of an element
function getXPath(element) {
if (!element) return '';
if (element.id !== '') return '//*[@id="' + element.id + '"]';
if (element === document.body) return '/html/body';
let ix = 0;
const siblings = element.parentNode ? element.parentNode.childNodes : [];
for (let i = 0; i < siblings.length; i++) {
const sibling = siblings[i];
if (sibling === element) {
return getXPath(element.parentNode) + '/' + element.tagName.toLowerCase() + '[' + (ix + 1) + ']';
}
if (sibling.nodeType === 1 && sibling.tagName === element.tagName) {
ix++;
}
}
return '';
}
return findInputs(document.body);
function isElementDisplayed(element) {
const style = window.getComputedStyle(element);
if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
return false;
}
return true;
}

View File

@ -0,0 +1,36 @@
// Block hardware access by removing or disabling APIs
Object.defineProperty(navigator, 'serial', { get: () => undefined });
Object.defineProperty(navigator, 'hid', { get: () => undefined });
Object.defineProperty(navigator, 'bluetooth', { get: () => undefined });
// Block media playback
HTMLMediaElement.prototype.play = function() {
this.pause(); // Immediately pause if play is called
return Promise.reject('Blocked by script');
};
// Block fullscreen requests
Element.prototype.requestFullscreen = function() {
console.log('Blocked fullscreen request');
return Promise.reject('Blocked by script');
};
// Block pointer lock
Element.prototype.requestPointerLock = function() {
console.log('Blocked pointer lock');
};
// Block iframe creation (optional, since browser already blocks these)
const originalCreateElement = document.createElement;
document.createElement = function(tagName) {
if (tagName.toLowerCase() === 'iframe') {
console.log('Blocked iframe creation');
return null;
}
return originalCreateElement.apply(this, arguments);
};
//block fetch
window.fetch = function() {
console.log('Blocked fetch request');
return Promise.reject('Blocked');
};
// Block annoying dialogs
window.alert = function() {};
window.confirm = function() { return false; };
window.prompt = function() { return null; };