File size: 5,316 Bytes
47755ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import json
import asyncio
import aiohttp


class GoogleSerperAPIWrapper:
    """Wrapper around the Serper.dev Google Search API.
    You can create a free API key at https://serper.dev.
    To use, you should have the environment variable ``SERPER_API_KEY``
    set with your API key, or pass `serper_api_key` as a named parameter
    to the constructor.
    Example:
        .. code-block:: python
            from langchain import GoogleSerperAPIWrapper
            google_serper = GoogleSerperAPIWrapper()
    """

    def __init__(self, snippet_cnt=10, language="en") -> None:
        self.k = snippet_cnt
        self.gl = "us"
        self.hl = language
        self.serper_api_key = os.environ.get("SERPER_API_KEY", None)
        assert (
            self.serper_api_key is not None
        ), "Please set the SERPER_API_KEY environment variable."
        assert (
            self.serper_api_key != ""
        ), "Please set the SERPER_API_KEY environment variable."

    async def _google_serper_search_results(
        self, session, search_term: str, gl: str, hl: str
    ) -> dict:
        headers = {
            "X-API-KEY": self.serper_api_key or "",
            "Content-Type": "application/json",
        }
        params = {"q": search_term, "gl": gl, "hl": hl}
        async with session.post(
            "https://google.serper.dev/search",
            headers=headers,
            params=params,
            raise_for_status=True,
        ) as response:
            return await response.json()

    def _parse_results(self, results):
        snippets = []

        if os.environ.get("SAVE_SERPER_COST", "False") == "True":
            SERPER_COST_PATH = os.environ.get("SERPER_COST_PATH", "serper_cost.jsonl")
            if results.get("credits"):
                credits = results.get("credits")
                with open(SERPER_COST_PATH, "a") as f:
                    f.write(json.dumps({"google_serper_credits": credits}) + "\n")

        if results.get("answerBox"):
            answer_box = results.get("answerBox", {})
            if answer_box.get("answer"):
                element = {"content": answer_box.get("answer"), "source": "None"}
                return [element]
            elif answer_box.get("snippet"):
                element = {
                    "content": answer_box.get("snippet").replace("\n", " "),
                    "source": "None",
                }
                return [element]
            elif answer_box.get("snippetHighlighted"):
                element = {
                    "content": answer_box.get("snippetHighlighted"),
                    "source": "None",
                }
                return [element]

        if results.get("knowledgeGraph"):
            kg = results.get("knowledgeGraph", {})
            title = kg.get("title")
            entity_type = kg.get("type")
            if entity_type:
                element = {"content": f"{title}: {entity_type}", "source": "None"}
                snippets.append(element)
            description = kg.get("description")
            if description:
                element = {"content": description, "source": "None"}
                snippets.append(element)
            for attribute, value in kg.get("attributes", {}).items():
                element = {"content": f"{attribute}: {value}", "source": "None"}
                snippets.append(element)

        for result in results["organic"][: self.k]:
            if "snippet" in result:
                element = {"content": result["snippet"], "source": result["link"]}
                snippets.append(element)
            for attribute, value in result.get("attributes", {}).items():
                element = {"content": f"{attribute}: {value}", "source": result["link"]}
                snippets.append(element)

        if len(snippets) == 0:
            element = {
                "content": "No good Google Search Result was found",
                "source": "None",
            }
            return [element]

        # keep only the first k snippets
        snippets = snippets[: int(self.k / 2)]

        return snippets

    async def parallel_searches(self, search_queries, gl, hl):
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._google_serper_search_results(session, query, gl, hl)
                for query in search_queries
            ]
            search_results = await asyncio.gather(*tasks, return_exceptions=True)
            return search_results

    def run(self, queries):
        """Run query through GoogleSearch and parse result."""
        flattened_queries = []

        for sublist in queries:
            if sublist is None:
                sublist = ["None", "None"]
            for item in sublist:
                flattened_queries.append(item)

        # Get results
        results = asyncio.run(
            self.parallel_searches(flattened_queries, gl=self.gl, hl=self.hl)
        )
        snippets_list = []
        for i in range(len(results)):
            snippets_list.append(self._parse_results(results[i]))

        # Flatten the list of snippets
        snippets_split = [
            snippets_list[i] + snippets_list[i + 1]
            for i in range(0, len(snippets_list), 2)
        ]
        return snippets_split