Spaces:
Sleeping
Sleeping
File size: 2,834 Bytes
7c12196 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# src/chimera/api_clients/serp_client.py
import httpx # Using httpx for async requests
from ..config import SERPAPI_API_KEY # Assuming SerpApi service
from ..utils.logging_config import logger
import json
# If using the official serpapi library:
# from serpapi import GoogleSearch
# Note: The official library might be synchronous. You might need
# asyncio.to_thread like in the Gemini example if using it in async code.
# Example using httpx for a generic SERP API endpoint (adjust URL/params)
# This is a simplified example assuming a REST API like SerpApi.
# You'd replace this with the actual client/method for your chosen service.
async def search_google(query: str, num_results=5, location=None) -> dict:
"""
Performs a search using a SERP API (example uses SerpApi parameters).
"""
if not SERPAPI_API_KEY:
logger.error("SERP API Key not configured.")
return {"error": "SERP API Key not configured."}
params = {
"q": query,
"api_key": SERPAPI_API_KEY,
"num": str(num_results), # API might expect string
# Add other parameters like location, gl (country), hl (language) as needed
# "location": location,
"engine": "google", # Specify search engine for APIs like SerpApi
}
if location:
params["location"] = location
# Adjust URL for your specific SERP provider
SEARCH_URL = "https://serpapi.com/search"
try:
async with httpx.AsyncClient() as client:
logger.info(f"Querying SERP API for: {query}")
response = await client.get(SEARCH_URL, params=params, timeout=20.0) # Add timeout
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
results = response.json()
logger.info(f"Received {len(results.get('organic_results', []))} results from SERP API.")
# You might want to process/filter results here before returning
return results # Return the raw JSON or processed data
except httpx.HTTPStatusError as e:
logger.error(f"SERP API request failed: {e.response.status_code} - {e.response.text}")
return {"error": f"SERP API request failed: {e.response.status_code}"}
except httpx.RequestError as e:
logger.error(f"SERP API request error: {e}")
return {"error": f"SERP API connection error: {e}"}
except json.JSONDecodeError as e:
logger.error(f"Failed to decode SERP API JSON response: {e}")
return {"error": "Failed to parse SERP API response."}
except Exception as e:
logger.exception("Unexpected error during SERP API call.")
return {"error": f"An unexpected error occurred: {e}"}
# Add similar functions or a class for other external APIs (Weather, Finance etc.)
# in external_apis.py, using httpx for async calls. |