File size: 5,316 Bytes
47755ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
import os
import json
import asyncio
import aiohttp
class GoogleSerperAPIWrapper:
    """Wrapper around the Serper.dev Google Search API.

    You can create a free API key at https://serper.dev.

    To use, you must have the environment variable ``SERPER_API_KEY``
    set with your API key (this constructor reads only the environment;
    it does not accept the key as a parameter).

    Example:
        .. code-block:: python

            google_serper = GoogleSerperAPIWrapper()
            snippets = google_serper.run([["query a", "query b"]])
    """

    def __init__(self, snippet_cnt=10, language="en") -> None:
        """Configure the wrapper.

        Args:
            snippet_cnt: maximum number of organic results inspected per
                query; only the first ``snippet_cnt // 2`` snippets are
                ultimately returned by ``_parse_results``.
            language: ``hl`` interface-language code sent to Serper.

        Raises:
            ValueError: if ``SERPER_API_KEY`` is unset or empty.
        """
        self.k = snippet_cnt
        self.gl = "us"  # geolocation is fixed to the US
        self.hl = language
        self.serper_api_key = os.environ.get("SERPER_API_KEY", None)
        # Raise explicitly instead of using `assert`: assertions are
        # stripped under `python -O`, which would silently let a missing
        # or empty key through to the first HTTP call.
        if not self.serper_api_key:
            raise ValueError("Please set the SERPER_API_KEY environment variable.")

    async def _google_serper_search_results(
        self, session, search_term: str, gl: str, hl: str
    ) -> dict:
        """POST one search to Serper and return the decoded JSON response.

        ``raise_for_status=True`` makes aiohttp raise on non-2xx replies.

        NOTE(review): the query is sent as URL parameters while the
        ``Content-Type`` header advertises a JSON body; Serper also
        accepts a JSON payload (``json=...``) — confirm which form the
        endpoint expects before changing this.
        """
        headers = {
            "X-API-KEY": self.serper_api_key or "",
            "Content-Type": "application/json",
        }
        params = {"q": search_term, "gl": gl, "hl": hl}
        async with session.post(
            "https://google.serper.dev/search",
            headers=headers,
            params=params,
            raise_for_status=True,
        ) as response:
            return await response.json()

    def _parse_results(self, results):
        """Convert one raw Serper response into snippet dicts.

        Returns a list of ``{"content": str, "source": str}`` dicts.
        An answer-box hit short-circuits and returns a single snippet;
        otherwise knowledge-graph and organic results are accumulated
        and truncated to ``self.k // 2`` entries.
        """
        snippets = []
        # Optionally append billed credits to a JSONL file for cost tracking.
        if os.environ.get("SAVE_SERPER_COST", "False") == "True":
            SERPER_COST_PATH = os.environ.get("SERPER_COST_PATH", "serper_cost.jsonl")
            if results.get("credits"):
                credits = results.get("credits")
                with open(SERPER_COST_PATH, "a") as f:
                    f.write(json.dumps({"google_serper_credits": credits}) + "\n")
        if results.get("answerBox"):
            answer_box = results.get("answerBox", {})
            if answer_box.get("answer"):
                element = {"content": answer_box.get("answer"), "source": "None"}
                return [element]
            elif answer_box.get("snippet"):
                element = {
                    "content": answer_box.get("snippet").replace("\n", " "),
                    "source": "None",
                }
                return [element]
            elif answer_box.get("snippetHighlighted"):
                element = {
                    "content": answer_box.get("snippetHighlighted"),
                    "source": "None",
                }
                return [element]
        if results.get("knowledgeGraph"):
            kg = results.get("knowledgeGraph", {})
            title = kg.get("title")
            entity_type = kg.get("type")
            if entity_type:
                element = {"content": f"{title}: {entity_type}", "source": "None"}
                snippets.append(element)
            description = kg.get("description")
            if description:
                element = {"content": description, "source": "None"}
                snippets.append(element)
            for attribute, value in kg.get("attributes", {}).items():
                element = {"content": f"{attribute}: {value}", "source": "None"}
                snippets.append(element)
        # Fix: guard against responses with no "organic" key (e.g. pure
        # knowledge-graph answers) — plain indexing raised KeyError here.
        for result in results.get("organic", [])[: self.k]:
            if "snippet" in result:
                element = {"content": result["snippet"], "source": result["link"]}
                snippets.append(element)
            for attribute, value in result.get("attributes", {}).items():
                element = {"content": f"{attribute}: {value}", "source": result["link"]}
                snippets.append(element)
        if not snippets:
            element = {
                "content": "No good Google Search Result was found",
                "source": "None",
            }
            return [element]
        # Keep only the first k // 2 snippets (the old comment claimed
        # "first k", but the code has always truncated to half of k).
        return snippets[: self.k // 2]

    async def parallel_searches(self, search_queries, gl, hl):
        """Issue all queries concurrently over one aiohttp session.

        Returns results in query order; with ``return_exceptions=True``
        a failed request yields its exception object instead of raising.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._google_serper_search_results(session, query, gl, hl)
                for query in search_queries
            ]
            search_results = await asyncio.gather(*tasks, return_exceptions=True)
            return search_results

    def run(self, queries):
        """Run queries through Google Search and parse the results.

        Args:
            queries: list of sub-lists of query strings (two per
                sub-list by convention); a ``None`` sub-list is replaced
                by the placeholder pair ``["None", "None"]``.

        Returns:
            One merged snippet list per consecutive pair of queries.
        """
        flattened_queries = []
        for sublist in queries:
            if sublist is None:
                sublist = ["None", "None"]
            flattened_queries.extend(sublist)
        # Fan out all searches concurrently, then parse sequentially.
        results = asyncio.run(
            self.parallel_searches(flattened_queries, gl=self.gl, hl=self.hl)
        )
        snippets_list = []
        for result in results:
            # Fix: gather(..., return_exceptions=True) can hand back an
            # exception object; treat it as "no result" instead of
            # crashing inside _parse_results on `.get()`.
            if isinstance(result, Exception):
                snippets_list.append(
                    [
                        {
                            "content": "No good Google Search Result was found",
                            "source": "None",
                        }
                    ]
                )
            else:
                snippets_list.append(self._parse_results(result))
        # Merge back into per-sublist pairs. Slice-based merging is
        # identical to snippets_list[i] + snippets_list[i + 1] for even
        # counts and avoids an IndexError when the count is odd.
        snippets_split = [
            sum(snippets_list[i : i + 2], [])
            for i in range(0, len(snippets_list), 2)
        ]
        return snippets_split
|