import numpy as np
import tiktoken
from typing import List, Tuple
from sklearn.metrics.pairwise import cosine_similarity
from .utils.execute_code import extract_and_run_python_code
from .utils.extractor import extract_answer, extract_cheatsheet
from litellm import completion
from functools import partial
import litellm
import os  # used to read SambaNova credentials from the environment

litellm._turn_on_debug()
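# Note: _turn_on_debug() enables litellm's verbose request/response logging globally;
# consider gating it behind an environment flag outside of debugging sessions.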

class LanguageModel:
    def __init__(self,
        model_name: str,
    ) -> None:
        """
        LanguageModel class to interact with different language models.

        Arguments:
            model_name : str : The name of the language model to use.

        Raises:
            ValueError : If the model name is not found or supported.
        """

        self.model_name = model_name

        # Known, explicitly supported model names
        known_model_list = [
            "openai/gpt-4o-mini", "openai/gpt-4o-mini-2024-07-18",
            "openai/gpt-4o", "openai/gpt-4o-2024-08-06", "openai/gpt-4o-2024-11-20",
            "openai/gpt-3.5-turbo",
            "together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo",
            "meta-llama/Llama-3.3-70B-Instruct-Turbo",
            "openai/o3-mini", "openai/o3-mini-2025-01-31",
            "openai/o1", "openai/o1-2024-12-17",
            "anthropic/claude-3-5-sonnet-latest", "anthropic/claude-3-5-sonnet-20241022",
            "anthropic/claude-3-5-haiku-latest", "anthropic/claude-3-5-haiku-20241022",
            "anthropic/claude-3-7-sonnet-latest", "anthropic/claude-3-7-sonnet-20250219",
            "together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
            "together_ai/deepseek-ai/DeepSeek-R1",
            "together_ai/deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
            "together_ai/deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
            "together_ai/Qwen/Qwen2.5-Coder-32B-Instruct",
            "together_ai/Qwen/QwQ-32B",
            "together_ai/Qwen/Qwen2-72B-Instruct",
            "together_ai/Qwen/Qwen2.5-7B-Instruct-Turbo",
            "together_ai/Qwen/Qwen2.5-72B-Instruct-Turbo",
            "gemini/gemini-2.0-flash",
            "ollama/llama3:70b",
        ]

        # Load the client for the model based on the model name
        if self.model_name.startswith("sambanova/"):
            samba_api_key = os.environ.get("SAMBANOVA_API_KEY")
            samba_base_url = os.environ.get("SAMBANOVA_BASE_URL", "https://api.sambanova.ai/v1") # Default if not set
            if not samba_api_key:
                raise ValueError("SAMBANOVA_API_KEY environment variable not set for SambaNova model.")
            # SambaNova exposes an OpenAI-compatible endpoint, so we route it through
            # litellm's generic "openai" provider: strip the "sambanova/" prefix so the
            # request body carries the model ID the endpoint expects, and pass the API
            # key and base URL explicitly.
            actual_model_name = self.model_name.split("sambanova/", 1)[1] if "sambanova/" in self.model_name else self.model_name
            self.client = partial(completion,
                                  model=actual_model_name,
                                  api_key=samba_api_key,
                                  api_base=samba_base_url,
                                  custom_llm_provider="openai"
                                 )
            print(f"Initialized SambaNova model '{actual_model_name}' via custom OpenAI provider settings with api_base: {samba_base_url}")
        elif self.model_name in known_model_list:
            self.client = partial(completion, model=self.model_name)
        else:
            print(f"Warning: Model '{self.model_name}' is not in the known model list and does not start with a recognized prefix. Attempting to initialize it with litellm directly.")
            try:
                self.client = partial(completion, model=self.model_name)
                print(f"Successfully initialized model '{self.model_name}' via litellm fallback.")
            except Exception as e:
                raise ValueError(
                    f"Model '{self.model_name}' is not in the known list, does not start with a recognized prefix, "
                    f"and could not be initialized by litellm directly: {e}"
                )

        self.gpt4Tokenizer = tiktoken.encoding_for_model("gpt-4o")

    def count_tokens(self, text: str) -> int:
        """
        Count the number of tokens in the text.
        """
        tokens = self.gpt4Tokenizer.encode(text)
        return len(tokens)

    def generate(self,
        history: List[str],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        current_depth: int = 1,
        max_depth_num_rounds: int = 3,
        allow_code_execution: bool = True,
        code_execution_flag: str = "EXECUTE CODE!",
        final_output: str = ""
    ) -> str:
        """
        Generate a response from the language model, optionally executing model-emitted
        code and feeding the results back for additional rounds.
        """
        if len(history) == 0:
            raise ValueError("History must contain at least one message.")

        print('history\n', history)
        try:
            token_count = litellm.token_counter(model=self.model_name, messages=history)
            print(f"DEBUG: litellm token_counter for '{self.model_name}' estimates: {token_count} tokens")
        except Exception as e:
            print(f"DEBUG: Error using litellm.token_counter: {e}")

        
        # self.client is a partial over litellm.completion with the model (and, for
        # SambaNova, the api_key/api_base) already bound.
        response = self.client(
            messages=history,
            temperature=temperature,
            max_tokens=max_tokens,  # litellm accepts max_tokens (or max_completion_tokens)
        )
        output = response.choices[0].message.content

        print('output\n', output)

        pre_code_execution_flag = output.split(code_execution_flag)[0].strip()
        if allow_code_execution and code_execution_flag in output and pre_code_execution_flag.endswith("```"):
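            # The model requested code execution: run the emitted block locally, append the
            # execution result after the flag, and hand the combined transcript back for up
            # to `max_depth_num_rounds` follow-up rounds.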
            output_prefix = output.split(code_execution_flag)[0].strip()
            executed_code = extract_and_run_python_code(output_prefix)
            executed_code = executed_code.strip()
            current_output = f"{output_prefix}\n{code_execution_flag}\n\n{executed_code}"
            final_output = f"{final_output}\n\n{current_output}".strip()

            if current_depth <= max_depth_num_rounds:
                warning_txt = ""
                if current_depth == max_depth_num_rounds:
                    warning_txt = " (This is the last round. No more code execution will be allowed. Please present your final solution now.)"
                new_messages = [
                    {"role": "assistant", "content": current_output},
                    {"role": "user", "content": f"Proceed with any additional steps required and provide the completed solution. If everything is already complete, type FINAL ANSWER and submit it in the expected format. If you are stuck, please try alternative methods to solve the problem and provide the final solution.{warning_txt}"}
                ]
                history += new_messages
                return self.generate(
                    history=history,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    current_depth=current_depth+1,
                    max_depth_num_rounds=max_depth_num_rounds,
                    allow_code_execution=allow_code_execution,
                    code_execution_flag=code_execution_flag,
                    final_output=final_output,
                )
            else:
                return final_output
        else:
            final_output = f"{final_output}\n\n{output}".strip()
            return final_output

    def advanced_generate(self,
        approach_name: str,
        input_txt: str,
        cheatsheet: str = None,
        generator_template: str = None,
        cheatsheet_template: str = None,
        temperature: float = 0.0,
        max_tokens: int = 2048,
        max_num_rounds: int = 1,
        allow_code_execution: bool = True,
        code_execution_flag: str = "EXECUTE CODE!",
        add_previous_answers_to_cheatsheet: bool = True,
        original_input_corpus: List[str] = None,
        original_input_embeddings: np.ndarray = None,
        generator_outputs_so_far: List[str] = None,
        retrieve_top_k: int = 3,
    ) -> dict:
        """
        Generate a response using one of the supported prompting approaches:
        "default", "DynamicCheatsheet_Cumulative", or "FullHistoryAppending".

        Returns a dict containing the per-round steps, previous answers, the final
        answer, the final output, and the final cheatsheet (where applicable).
        """

        if approach_name == "default":
            generator_prompt = generator_template.replace("[[QUESTION]]", input_txt).replace("[[CHEATSHEET]]", "(empty)")
            generator_history = [
                {"role": "user", "content": generator_prompt},
            ]
            generator_output = self.generate(
                history=generator_history,
                temperature=temperature,
                max_tokens=max_tokens,
                allow_code_execution=allow_code_execution,
                code_execution_flag=code_execution_flag,
            )
            generator_answer = extract_answer(generator_output)
            return {
                "input_txt": input_txt,
                "steps": [
                    {
                        "round": 0,
                        "generator_prompt": generator_prompt,
                        "generator_output": generator_output,
                        "generator_answer": generator_answer,
                        "current_cheatsheet": None,
                        "new_cheatsheet": None,
                    }
                ],
                "previous_answers": None,
                "final_answer": generator_answer,
                "final_output": generator_output,
                "final_cheatsheet": None,
            }
        
        elif approach_name == "DynamicCheatsheet_Cumulative":
            if cheatsheet is None:
                raise ValueError("Cheatsheet must be provided for DynamicCheatsheet_Cumulative approach.")
            if generator_template is None or cheatsheet_template is None:
                raise ValueError("Generator and Cheatsheet templates must be provided for DynamicCheatsheet_Cumulative approach.")
            
            steps = []
            previous_answers = []
            current_cheatsheet_in_round = cheatsheet # Use a local var for the loop

            for round_num in range(max(1, max_num_rounds)):
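                # Each round: (1) generate a solution conditioned on the current cheatsheet,
                # then (2) ask the same model to curate an updated cheatsheet from that solution.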
                generator_cheatsheet_content = current_cheatsheet_in_round
                if round_num > 0 and add_previous_answers_to_cheatsheet and previous_answers:
                    previous_answers_txt = f"PREVIOUS ANSWERS:\n{'; '.join(previous_answers)}"
                    generator_cheatsheet_content = f"{generator_cheatsheet_content}\n\n{previous_answers_txt}"

                generator_prompt = generator_template.replace("[[QUESTION]]", input_txt).replace("[[CHEATSHEET]]", generator_cheatsheet_content)
                
                generator_history = [{"role": "user", "content": generator_prompt}]
                generator_output = self.generate(
                    history=generator_history,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    allow_code_execution=allow_code_execution,
                    code_execution_flag=code_execution_flag,
                )
                generator_answer = extract_answer(generator_output)

                cheatsheet_prompt = cheatsheet_template.replace("[[QUESTION]]", input_txt).replace("[[MODEL_ANSWER]]", generator_output).replace("[[PREVIOUS_CHEATSHEET]]", current_cheatsheet_in_round)
                cheatsheet_history = [{"role": "user", "content": cheatsheet_prompt}]
                # The cheatsheet curator reuses the same model instance (and therefore the
                # same provider settings) as the generator.
                cheatsheet_model_output = self.generate(
                    history=cheatsheet_history,
                    temperature=temperature,
                    max_tokens=2 * max_tokens,  # the curator gets twice the generator's token budget
                    allow_code_execution=False,
                )
                new_cheatsheet = extract_cheatsheet(response=cheatsheet_model_output, old_cheatsheet=current_cheatsheet_in_round)
                
                steps.append({
                    "round": round_num,
                    "generator_prompt": generator_prompt,
                    "generator_output": generator_output,
                    "generator_answer": generator_answer,
                    "current_cheatsheet": current_cheatsheet_in_round,
                    "new_cheatsheet": new_cheatsheet,
                })
                current_cheatsheet_in_round = new_cheatsheet # Update for next potential round
                if generator_answer:
                    previous_answers.append(f"Round {round_num+1}: {generator_answer}")

            print("input_txt", input_txt)
            print("steps", steps)
            print("previous_answers", previous_answers)
            print("final_answer", generator_answer)
            print("final_cheatsheet", current_cheatsheet_in_round)
            print("final_output", generator_output)
                
            return {
                "input_txt": input_txt,
                "steps": steps,
                "previous_answers": previous_answers,
                "final_answer": generator_answer, # Answer from the last round
                "final_cheatsheet": current_cheatsheet_in_round, # Cheatsheet from the last round
                "final_output": generator_output, # Full output from the last generator call
            }
        elif approach_name == "FullHistoryAppending":
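            # FullHistoryAppending: instead of a curated cheatsheet, splice every previous
            # (input, model solution) pair verbatim into the prompt.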
            length_of_history = len(generator_outputs_so_far) if generator_outputs_so_far else 0
            curated_cheatsheet = "(empty)"
            if length_of_history > 0 and original_input_corpus and generator_outputs_so_far:
                curated_cheatsheet = "### PREVIOUS SOLUTIONS (START)\n\n"
                for i, (prev_input, prev_output) in enumerate(zip(original_input_corpus[:length_of_history], generator_outputs_so_far[:length_of_history])):
                    curated_cheatsheet += f"#### Previous Input #{i+1}:\n\n{prev_input}\n\n#### Model Solution to Previous Input #{i+1}:\n\n{prev_output}\n---\n---\n\n"
                curated_cheatsheet += "### PREVIOUS SOLUTIONS (END)"
            
            generator_prompt = generator_template.replace("[[QUESTION]]", input_txt).replace("[[CHEATSHEET]]", curated_cheatsheet)
            generator_history = [{"role": "user", "content": generator_prompt}]
            generator_output = self.generate(
                    history=generator_history,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    allow_code_execution=allow_code_execution,
                    code_execution_flag=code_execution_flag,
                )
            generator_answer = extract_answer(generator_output)
            return {
                "input_txt": input_txt,
                "steps": [], 
                "previous_answers": [],
                "final_answer": generator_answer,
                "final_cheatsheet": curated_cheatsheet, 
                "final_output": generator_output,
            }
        else:
            raise ValueError(f"Unknown approach_name: {approach_name}")
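
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the module's public surface). It assumes
# a valid API key for the chosen model is available in the environment, and the
# template below is a hypothetical stand-in: real generator templates must contain
# the [[QUESTION]] and [[CHEATSHEET]] placeholders expected by advanced_generate.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    lm = LanguageModel(model_name="openai/gpt-4o-mini")

    demo_generator_template = (
        "You are a careful problem solver.\n\n"
        "CHEATSHEET:\n[[CHEATSHEET]]\n\n"
        "QUESTION:\n[[QUESTION]]\n\n"
        "End with 'FINAL ANSWER:' followed by your answer."
    )

    result = lm.advanced_generate(
        approach_name="default",
        input_txt="What is 17 * 23?",
        generator_template=demo_generator_template,
        temperature=0.0,
        max_tokens=512,
        allow_code_execution=False,
    )
    print(result["final_answer"])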