import os
import asyncio

import aiohttp
import requests
import dotenv

dotenv.load_dotenv()

if __name__ == "__main__":
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from utility import animate_thinking, pretty_print
    from tools import Tools
else:
    from sources.tools.tools import Tools
    from sources.utility import animate_thinking, pretty_print


class webSearch(Tools):
    def __init__(self, api_key: str = None):
        """
        A tool to perform a web search via SerpApi and return the titles,
        snippets, and links of the top organic results.
        """
        super().__init__()
        self.tag = "web_search"
        self.api_key = api_key or os.getenv("SERPAPI_KEY")  # Requires a SerpApi key
        self.paywall_keywords = [
            "subscribe", "paywall", "login to continue", "access denied", "restricted content"
        ]
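        # The paywall check is a plain keyword heuristic: for example, a page
        # body containing "Subscribe to continue reading" matches "subscribe"
        # after lower-casing and is flagged as "Status: Possible Paywall".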

    async def link_valid(self, session, link):
        """Asynchronously check whether a link is reachable and not paywalled."""
        if not link.startswith("http"):
            return "Status: Invalid URL"
        try:
            async with session.get(link, timeout=aiohttp.ClientTimeout(total=5)) as response:
                status = response.status
                if status == 200:
                    # Await the body first, then truncate to the first 1000 characters.
                    content = (await response.text(encoding='utf-8', errors='ignore'))[:1000]
                    if any(keyword in content.lower() for keyword in self.paywall_keywords):
                        return "Status: Possible Paywall"
                    return "Status: Accessible"
                elif status == 404:
                    return "Status: 404 Not Found"
                elif status == 403:
                    return "Status: 403 Forbidden"
                else:
                    return f"Status: {status} {response.reason}"
        except Exception as e:
            return f"Error: {str(e)}"

    async def check_all_links(self, links):
        """Check all links asynchronously using a single session."""
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        async with aiohttp.ClientSession(headers=headers) as session:
            tasks = [self.link_valid(session, link) for link in links]
            return await asyncio.gather(*tasks)
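
    # Note: check_all_links is not wired into execute() below; a synchronous
    # caller can drive it with asyncio.run(), as sketched (hypothetically)
    # in the __main__ block at the bottom of this file.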

    def execute(self, blocks: list, safety: bool = True) -> str:
        if self.api_key is None:
            return "Error: No SerpApi key provided."
        for block in blocks:  # every branch returns, so only the first block is searched
            query = block.strip()
            pretty_print(f"Searching for: {query}", color="status")
            if not query:
                return "Error: No search query provided."

            try:
                url = "https://serpapi.com/search"
                params = {
                    "q": query,
                    "api_key": self.api_key,
                    "num": 100,
                    "output": "json"
                }
                response = requests.get(url, params=params)
                response.raise_for_status()

                data = response.json()
                results = []
                if "organic_results" in data and len(data["organic_results"]) > 0:
                    for result in data["organic_results"][:50]:
                        title = result.get("title", "No title")
                        snippet = result.get("snippet", "No snippet available")
                        link = result.get("link", "No link available")
                        results.append(f"Title: {title}\nSnippet: {snippet}\nLink: {link}")
                    return "\n\n".join(results)
                else:
                    return "No results found for the query."
            except requests.RequestException as e:
                return f"Error during web search: {str(e)}"
            except Exception as e:
                return f"Unexpected error: {str(e)}"
        return "No search performed"

    def execution_failure_check(self, output: str) -> bool:
        return output.startswith("Error") or "No results found" in output

    def interpreter_feedback(self, output: str) -> str:
        if self.execution_failure_check(output):
            return f"Web search failed: {output}"
        return f"Web search result:\n{output}"


if __name__ == "__main__":
    search_tool = webSearch(api_key=os.getenv("SERPAPI_KEY"))
    query = "when did covid start"
    result = search_tool.execute([query], safety=True)
    feedback = search_tool.interpreter_feedback(result)
    print(feedback)
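
    # Hypothetical follow-up (not part of the original flow): validate the
    # returned links with the async checker. This assumes `result` is the
    # formatted string produced by execute(), with one "Link: ..." line per hit.
    links = [line.split("Link: ", 1)[1] for line in result.splitlines() if line.startswith("Link: ")]
    if links:
        statuses = asyncio.run(search_tool.check_all_links(links[:5]))
        for link, status in zip(links, statuses):
            print(f"{link} -> {status}")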