""" | |
Web search service for retrieving and processing web content. | |
This module provides functionality to search the web using Serper API | |
and extract content from web pages using crawl4ai. | |
""" | |
import asyncio
import os
from typing import List, Optional

from bio_requests.rag_request import RagRequest
from dto.bio_document import BaseBioDocument, create_bio_document
from search_service.base_search import BaseSearchService
from service.web_search import SerperClient, scrape_urls, url_to_fit_contents
from utils.bio_logger import bio_logger as logger

class WebSearchService(BaseSearchService):
    """
    Web search service that retrieves content from web pages.

    This service uses the Serper API for web search and crawl4ai for content extraction.
    """

    def __init__(self):
        """Initialize the web search service."""
        self.data_source = "web"
        self._serper_client: Optional[SerperClient] = None
        self._max_results = 5
        self._content_length_limit = 40000  # ~10k tokens

    @property
    def serper_client(self) -> SerperClient:
        """Lazily initialize and return the SerperClient."""
        if self._serper_client is None:
            # Read the API key from the environment
            api_key = os.getenv("SERPER_API_KEY")
            if not api_key:
                logger.warning(
                    "SERPER_API_KEY environment variable not set, using default key"
                )
            self._serper_client = SerperClient(api_key=api_key)
        return self._serper_client

    async def search(self, rag_request: RagRequest) -> List[BaseBioDocument]:
        """
        Perform a web search and extract content from the search results.

        Args:
            rag_request: The RAG request containing the search query

        Returns:
            List of BaseBioDocument objects with extracted web content
        """
        try:
            query = rag_request.query
            logger.info(f"Starting web search for query: {query}")

            # Search for URLs using Serper
            url_results = await self.search_serper(query, rag_request.top_k)
            if not url_results:
                logger.info(f"No search results found for query: {query}")
                return []

            # Extract content from the URLs
            search_results = await self.enrich_url_results_with_contents(url_results)

            logger.info(f"Web search completed. Found {len(search_results)} documents")
            return search_results
        except Exception as e:
            logger.error(f"Error during web search: {str(e)}", exc_info=e)
            return []

    async def enrich_url_results_with_contents(
        self, results: List
    ) -> List[BaseBioDocument]:
        """
        Extract content from URLs and create BaseBioDocument objects.

        Args:
            results: List of search results with URLs

        Returns:
            List of BaseBioDocument objects with extracted content
        """
        try:
            # Create tasks for concurrent content extraction
            tasks = [self._extract_content_from_url(res) for res in results]
            contents = await asyncio.gather(*tasks, return_exceptions=True)

            enriched_results = []
            for res, content in zip(results, contents):
                # Skip results whose content extraction raised an exception
                if isinstance(content, Exception):
                    logger.error(f"Failed to extract content from {res.url}: {content}")
                    continue

                bio_doc = create_bio_document(
                    title=res.title,
                    url=res.url,
                    text=str(content)[: self._content_length_limit],
                    source=self.data_source,
                )
                enriched_results.append(bio_doc)

            return enriched_results
        except Exception as e:
            logger.error(f"Error enriching URL results: {str(e)}", exc_info=e)
            return []

    async def _extract_content_from_url(self, res) -> str:
        """
        Extract content from a single URL with error handling.

        Args:
            res: Search result object containing URL information

        Returns:
            Extracted content as a string
        """
        try:
            return await url_to_fit_contents(res)
        except Exception as e:
            logger.error(f"Error extracting content from {res.url}: {str(e)}")
            return f"Error extracting content: {str(e)}"

    async def search_serper(
        self, query: str, max_results: Optional[int] = None
    ) -> List:
        """
        Perform a web search using the Serper API.

        Args:
            query: Search query string
            max_results: Maximum number of results to return

        Returns:
            List of search results with URLs
        """
        try:
            max_results = max_results or self._max_results
            logger.info(f"Searching Serper for: {query} (max_results: {max_results})")

            search_results = await self.serper_client.search(
                query, filter_for_relevance=True, max_results=max_results
            )
            if not search_results:
                logger.info(f"No search results from Serper for query: {query}")
                return []

            # Scrape content from the result URLs
            results = await scrape_urls(search_results)

            logger.info(f"Serper search completed. Found {len(results)} results")
            return results
        except Exception as e:
            logger.error(f"Error in Serper search: {str(e)}", exc_info=e)
            return []
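

# Minimal usage sketch, not part of the service itself. It assumes RagRequest
# can be constructed with `query` and `top_k` fields and that SERPER_API_KEY
# is exported in the environment; adjust to the actual request schema if it
# differs.
if __name__ == "__main__":

    async def _demo() -> None:
        service = WebSearchService()
        # Hypothetical query and request construction for illustration only.
        request = RagRequest(query="CRISPR base editing off-target effects", top_k=5)
        documents = await service.search(request)
        for doc in documents:
            # Each document carries the page title, URL, and truncated text.
            print(doc.title, doc.url)

    asyncio.run(_demo())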