Feat : browser v0

This commit is contained in:
martin legrand 2025-03-10 21:05:10 +01:00
parent a4008c14ef
commit 68bab4ecac
7 changed files with 190 additions and 3 deletions

View File

@ -7,7 +7,7 @@ import configparser
from sources.llm_provider import Provider from sources.llm_provider import Provider
from sources.interaction import Interaction from sources.interaction import Interaction
from sources.agents import Agent, CoderAgent, CasualAgent, FileAgent, PlannerAgent from sources.agents import Agent, CoderAgent, CasualAgent, FileAgent, PlannerAgent, BrowserAgent
import warnings import warnings
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
@ -44,6 +44,10 @@ def main():
PlannerAgent(model=config["MAIN"]["provider_model"], PlannerAgent(model=config["MAIN"]["provider_model"],
name="Planner", name="Planner",
prompt_path="prompts/planner_agent.txt", prompt_path="prompts/planner_agent.txt",
provider=provider),
BrowserAgent(model=config["MAIN"]["provider_model"],
name="Browser",
prompt_path="prompts/browser_agent.txt",
provider=provider) provider=provider)
] ]

View File

@ -14,6 +14,7 @@ soundfile==0.13.1
protobuf==3.20.3 protobuf==3.20.3
termcolor==2.5.0 termcolor==2.5.0
gliclass==0.1.8 gliclass==0.1.8
huggingface-hub==0.26.3
# if use chinese # if use chinese
ordered_set ordered_set
pypinyin pypinyin

View File

@ -30,6 +30,7 @@ setup(
"protobuf==3.20.3", "protobuf==3.20.3",
"termcolor==2.5.0", "termcolor==2.5.0",
"gliclass==0.1.8", "gliclass==0.1.8",
"huggingface-hub==0.26.3"
], ],
extras_require={ extras_require={
"chinese": [ "chinese": [

View File

@ -4,5 +4,6 @@ from .code_agent import CoderAgent
from .casual_agent import CasualAgent from .casual_agent import CasualAgent
from .file_agent import FileAgent from .file_agent import FileAgent
from .planner_agent import PlannerAgent from .planner_agent import PlannerAgent
from .browser_agent import BrowserAgent
__all__ = ["Agent", "CoderAgent", "CasualAgent", "FileAgent", "PlannerAgent"] __all__ = ["Agent", "CoderAgent", "CasualAgent", "FileAgent", "PlannerAgent", "BrowserAgent"]

View File

@ -0,0 +1,23 @@
from sources.utility import pretty_print, animate_thinking
from sources.agents.agent import Agent
from sources.tools.webSearch import webSearch
from sources.browser import Browser
class BrowserAgent(Agent):
    """Agent dedicated to web search and browser-driven research.

    Request handling is not implemented yet: ``process`` always raises.
    """

    def __init__(self, model, name, prompt_path, provider):
        """
        Build the agent, register its web-search tool and open a browser.

        All four arguments are forwarded unchanged to ``Agent.__init__``.
        Constructing the agent also creates a ``Browser`` instance and
        immediately navigates it to https://github.com/.
        """
        super().__init__(model, name, prompt_path, provider)
        self.tools = dict(web_search=webSearch())
        self.role = "deep research and web search"
        session = Browser()
        self.browser = session
        session.goTo("https://github.com/")

    def process(self, prompt, speech_module) -> str:
        """Placeholder entry point; always raises ``NotImplementedError``."""
        raise NotImplementedError("Browser agent is not implemented yet")
if __name__ == "__main__":
    # Manual smoke test: start a browser session, then shut it down explicitly
    # rather than leaving the Chrome process to be reaped by the destructor.
    browser = Browser()
    browser.close()

157
sources/browser.py Normal file
View File

@ -0,0 +1,157 @@
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
import time
from bs4 import BeautifulSoup
import markdownify
import logging
class Browser:
    """Small wrapper around a Selenium Chrome driver.

    Offers navigation, text/link extraction, clicking, scrolling and
    screenshots. Most methods log failures and return a sentinel
    (``False``, ``None`` or ``[]``) instead of raising. The instance can be
    used as a context manager so the driver is always shut down.
    """

    def __init__(self, headless=True):
        """Start a Chrome session.

        Args:
            headless: When true, Chrome is started with ``--headless``.

        Raises:
            Exception: If the driver cannot be created; the original error is
                chained as ``__cause__``.
        """
        # Assigned before anything that can fail, so close() and __del__ stay
        # safe on a partially constructed instance.
        self.logger = logging.getLogger(__name__)
        self.driver = None
        self.wait = None
        try:
            chrome_options = Options()
            # NOTE(review): the flattened source does not show which flags were
            # guarded by `headless`; confirm this grouping against the repo.
            if headless:
                chrome_options.add_argument("--headless")
                chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            self.driver = webdriver.Chrome(options=chrome_options)
            self.wait = WebDriverWait(self.driver, 10)
            self.logger.info("Browser initialized successfully")
        except Exception as e:
            raise Exception(f"Failed to initialize browser: {str(e)}") from e

    def __enter__(self):
        """Return the browser itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the browser on leaving the ``with`` block; never suppresses errors."""
        self.close()
        return False

    def goTo(self, url):
        """Navigate to ``url``.

        Returns:
            True on success, False if the driver raised a WebDriverException.
        """
        try:
            self.driver.get(url)
            time.sleep(2)  # Fixed pause after load before the page is used
            self.logger.info("Navigated to: %s", url)
            return True
        except WebDriverException as e:
            self.logger.error("Error navigating to %s: %s", url, e)
            return False

    def getText(self):
        """Return the visible text of the current page.

        ``<script>`` and ``<style>`` elements are removed, whitespace is
        normalized to one phrase per line, and the result is passed through
        ``markdownify``.

        Returns:
            The processed text, or None if extraction failed.
        """
        try:
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            for element in soup(['script', 'style']):
                element.decompose()
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            # Split on double spaces so that phrases, not individual words,
            # end up on their own line.
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = "\n".join(chunk for chunk in chunks if chunk)
            # NOTE(review): tags are already stripped at this point, so
            # markdownify only sees plain text — confirm this is intended.
            return markdownify.markdownify(text, heading_style="ATX")
        except Exception as e:
            self.logger.error("Error getting text: %s", e)
            return None

    def getNavigable(self):
        """Collect the http(s) links of the current page.

        Returns:
            A list of dicts with keys ``url``, ``text`` and ``is_displayed``;
            an empty list if anything goes wrong.
        """
        try:
            links = []
            for element in self.driver.find_elements(By.TAG_NAME, "a"):
                href = element.get_attribute("href")
                if href and href.startswith(("http://", "https://")):
                    links.append({
                        "url": href,
                        "text": element.text.strip(),
                        "is_displayed": element.is_displayed(),
                    })
            self.logger.info("Found %d navigable links", len(links))
            return links
        except Exception as e:
            self.logger.error("Error getting navigable links: %s", e)
            return []

    def clickElement(self, xpath):
        """Click the element located by ``xpath`` once it becomes clickable.

        Returns:
            True if the click was issued, False on timeout or any other
            WebDriver error.
        """
        try:
            element = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )
            element.click()
            time.sleep(2)  # Fixed pause so the click's effect can take place
            return True
        except TimeoutException:
            self.logger.error("Element not found or not clickable: %s", xpath)
            return False
        except WebDriverException as e:
            # Any other driver error is reported the same way as in the
            # sibling methods instead of propagating.
            self.logger.error("Error clicking element %s: %s", xpath, e)
            return False

    def getCurrentUrl(self):
        """Return the URL of the current page."""
        return self.driver.current_url

    def getPageTitle(self):
        """Return the title of the current page."""
        return self.driver.title

    def scrollToBottom(self):
        """Scroll to the bottom of the page.

        Returns:
            True on success, False if the script could not be executed.
        """
        try:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            time.sleep(1)  # Fixed pause after scrolling
            return True
        except Exception as e:
            self.logger.error("Error scrolling: %s", e)
            return False

    def takeScreenshot(self, filename):
        """Save a screenshot of the current page to ``filename``.

        Returns:
            True on success, False on any error.
        """
        try:
            self.driver.save_screenshot(filename)
            self.logger.info("Screenshot saved as %s", filename)
            return True
        except Exception as e:
            self.logger.error("Error taking screenshot: %s", e)
            return False

    def close(self):
        """Shut the driver down; safe to call repeatedly or after a failed init."""
        driver = getattr(self, "driver", None)
        if driver is None:
            return
        # Drop the reference first so a second call (e.g. from __del__ after an
        # explicit close) never quits the same driver twice.
        self.driver = None
        try:
            driver.quit()
            self.logger.info("Browser closed")
        except Exception as e:
            self.logger.error("Error closing browser: %s", e)

    def __del__(self):
        """Destructor to ensure the browser is closed."""
        self.close()
if __name__ == "__main__":
    # Manual demo: open a visible Chrome window, dump the page text and the
    # first 50 links of a GitHub profile, save a screenshot, then always close.
    logging.basicConfig(level=logging.INFO)
    demo = Browser(headless=False)
    try:
        demo.goTo("https://github.com/geohot")
        page_text = demo.getText()
        print("Page Text in Markdown:")
        print(page_text)
        found_links = demo.getNavigable()
        print("\nNavigable Links:")
        for entry in found_links[:50]:
            print(f"Text: {entry['text']}, URL: {entry['url']}")
        demo.takeScreenshot("example.png")
    finally:
        demo.close()

View File

@ -115,7 +115,7 @@ class Transcript:
audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000) audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)
result = self.pipe(audio_data) result = self.pipe(audio_data)
return self.remove_hallucinations(result["text"]) return self.remove_hallucinations(result["text"])
class AudioTranscriber: class AudioTranscriber:
""" """
AudioTranscriber is a class that transcribes audio from the audio queue and adds it to the transcript. AudioTranscriber is a class that transcribes audio from the audio queue and adds it to the transcript.