File size: 3,880 Bytes
f5776d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Retriever model for deeplake
"""

from typing import Optional, List, Union, Type
import openai
import dspy
from collections import defaultdict
from dsp.utils import dotdict

# Retryable OpenAI exception classes, resolved against whichever SDK layout
# is installed. The legacy SDK (<1.0) nests its exceptions under the
# `openai.error` submodule; openai>=1.0 removed that submodule (and
# ServiceUnavailableError) and exposes the classes at the top level.
try:
    import openai.error

    ERRORS = (
        openai.error.RateLimitError,
        openai.error.ServiceUnavailableError,
        openai.error.APIError,
    )
except Exception:
    # openai>=1.0: `openai.error` does not exist, so the fallback must
    # reference the top-level exception classes. (The original fallback
    # read `openai.error.*` here, which is guaranteed to raise
    # AttributeError in exactly the situation this branch handles.)
    ERRORS = (openai.RateLimitError, openai.APIError)


class DeeplakeRM(dspy.Retrieve):
    """A retriever module that uses Deep Lake to return the top passages for a given query.

    Assumes that a Deep Lake Vector Store has been created and populated with the following payload:
        - text: The text of the passage

    Args:
        deeplake_vectorstore_name (str): The name or path of the Deep Lake Vector Store.
        deeplake_client (VectorStore): An instance of the Deep Lake client.
        k (int, optional): The default number of top passages to retrieve. Defaults to 3.

    Examples:
        Below is a code snippet that shows how to use Deep Lake as the default retriever:
        ```python
        from deeplake import VectorStore
        llm = dspy.OpenAI(model="gpt-3.5-turbo")
        deeplake_client = VectorStore
        retriever_model = DeeplakeRM("my_vectorstore_path", deeplake_client=deeplake_client)
        dspy.settings.configure(lm=llm, rm=retriever_model)
        ```

        Below is a code snippet that shows how to use Deep Lake in the forward() function of a module
        ```python
        self.retrieve = DeeplakeRM("my_vectorstore_path", deeplake_client=deeplake_client, k=num_passages)
        ```
    """

    def __init__(
        self,
        deeplake_vectorstore_name: str,
        deeplake_client,
        k: int = 3,
    ):
        # Import is a presence check only: fail fast with an actionable
        # message if the optional 'deeplake' dependency is not installed.
        try:
            from deeplake import VectorStore  # noqa: F401
        except ImportError:
            raise ImportError(
                "The 'deeplake' extra is required to use DeepLakeRM. Install it with `pip install dspy-ai[deeplake]`"
            )
        self._deeplake_vectorstore_name = deeplake_vectorstore_name
        self._deeplake_client = deeplake_client

        super().__init__(k=k)

    def embedding_function(self, texts, model="text-embedding-ada-002"):
        """Embed one string or a list of strings with the OpenAI embeddings API.

        Args:
            texts (Union[str, List[str]]): Text(s) to embed.
            model (str): OpenAI embedding model name.

        Returns:
            List[List[float]]: One embedding vector per input text.
        """
        if isinstance(texts, str):
            texts = [texts]

        # Newlines can degrade embedding quality, so flatten to single lines.
        texts = [t.replace("\n", " ") for t in texts]
        return [
            data["embedding"]
            for data in openai.Embedding.create(input=texts, model=model)["data"]
        ]

    def forward(
        self, query_or_queries: Union[str, List[str]], k: Optional[int] = None
    ) -> dspy.Prediction:
        """Search with Deep Lake for the top k passages for the given query/queries.

        Args:
            query_or_queries (Union[str, List[str]]): The query or queries to search for.
            k (Optional[int]): The number of top passages to retrieve. Defaults to self.k.
                (The default value makes the docstring true: previously `k` was a
                required positional argument despite "Defaults to self.k".)

        Returns:
            A list of dotdicts with a "long_text" key, ordered by descending
            accumulated score across all queries.
        """
        queries = (
            [query_or_queries]
            if isinstance(query_or_queries, str)
            else query_or_queries
        )
        queries = [q for q in queries if q]  # Filter empty queries

        k = k if k is not None else self.k

        # Accumulate scores per passage text; a passage hit by several
        # queries sums its scores, boosting it in the final ranking.
        passages = defaultdict(float)
        # Deep Lake doesn't support batch querying, so query one at a time.
        for query in queries:
            results = self._deeplake_client(
                path=self._deeplake_vectorstore_name,
                embedding_function=self.embedding_function,
            ).search(query, k=k)

            # Default missing keys to empty lists: the original defaults of
            # 0.0 / "" are not zip-able sequences and would raise TypeError.
            for score, text in zip(results.get("score", []), results.get("text", [])):
                passages[text] += score

        sorted_passages = sorted(
            passages.items(), key=lambda x: x[1], reverse=True,
        )[:k]

        return [dotdict({"long_text": p}) for p, _ in sorted_passages]