File size: 11,774 Bytes
fc78ae4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
"""
Fallback model implementation for testing when llama-cpp-python is not available.

This provides a compatible model class that doesn't require any external dependencies,
allowing the rest of the application to function while we solve the llama-cpp-python
installation issues.
"""

import os
import logging
from typing import Dict, List, Optional, Any, Union
import requests
from smolagents import Model
from pathlib import Path

# Try to import llama_cpp, but don't fail if not available
try:
    from llama_cpp import Llama
    from pathlib import Path
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False
    print("llama_cpp module not available, using fallback implementation")

logger = logging.getLogger(__name__)

class LlamaCppModel(Model):
    """Model using llama.cpp Python bindings for efficient local inference without PyTorch.
    Falls back to a simple text generation if llama_cpp is not available."""
    def __init__(
        self, 
        model_path: str = None,
        model_url: str = None,
        n_ctx: int = 2048,
        n_gpu_layers: int = 0,
        max_tokens: int = 512,
        temperature: float = 0.7,
        verbose: bool = True
    ):
        """
        Initialize a local llama.cpp model or fallback to a simple implementation.
        
        Args:
            model_path: Path to local GGUF model file
            model_url: URL to download model if model_path doesn't exist
            n_ctx: Context window size
            n_gpu_layers: Number of layers to offload to GPU (0 means CPU only)
            max_tokens: Maximum new tokens to generate
            temperature: Sampling temperature
            verbose: Whether to print verbose messages
        """
        super().__init__()
        
        self.model_path = model_path
        self.model_url = model_url
        self.n_ctx = n_ctx
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.verbose = verbose
        self.llm = None
        
        # Check if we can use llama_cpp
        if LLAMA_CPP_AVAILABLE:
            try:
                if self.verbose:
                    print("Attempting to initialize LlamaCpp model...")
                
                # Try to initialize the real model
                if model_path and os.path.exists(model_path):
                    if self.verbose:
                        print(f"Loading model from {model_path}...")
                    
                    # Initialize the llama-cpp model
                    self.llm = Llama(
                        model_path=model_path,
                        n_ctx=n_ctx,
                        n_gpu_layers=n_gpu_layers,
                        verbose=verbose
                    )
                    
                    if self.verbose:
                        print("LlamaCpp model loaded successfully")
                else:
                    if self.verbose:
                        print(f"Model path not found or not specified. Using fallback mode.")
            except Exception as e:
                logger.error(f"Error initializing LlamaCpp model: {e}")
                if self.verbose:
                    print(f"Error initializing LlamaCpp model: {e}")
                self.llm = None
        else:
            if self.verbose:
                print("LlamaCpp not available, using fallback implementation")
                
        if not self.llm and self.verbose:
            print("Using fallback text generation mode")
    
    def _resolve_model_path(self, model_path: str = None, model_url: str = None) -> str:
        """
        Resolve model path, downloading if necessary.
        
        Returns:
            Absolute path to model file
        """
        # Default to a small model if none specified
        if not model_path:
            models_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models")
            os.makedirs(models_dir, exist_ok=True)
            model_path = os.path.join(models_dir, "ggml-model-q4_0.bin")
        
        # Convert to Path for easier handling
        path = Path(model_path)
        
        # If model exists, return it
        if path.exists():
            return str(path.absolute())
        
        # Download if URL provided
        if model_url and not path.exists():
            try:
                print(f"Downloading model from {model_url}...")
                os.makedirs(path.parent, exist_ok=True)
                
                try:
                    # Try with streaming download first
                    with requests.get(model_url, stream=True, timeout=30) as r:
                        r.raise_for_status()
                        total_size = int(r.headers.get('content-length', 0))
                        block_size = 8192
                        
                        with open(path, 'wb') as f:
                            downloaded = 0
                            for chunk in r.iter_content(chunk_size=block_size):
                                if chunk:
                                    f.write(chunk)
                                    downloaded += len(chunk)
                                    if total_size > 0:
                                        percent = (downloaded / total_size) * 100
                                        if percent % 10 < (block_size / total_size) * 100:
                                            print(f"Download progress: {int(percent)}%")
                except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                    print(f"Streaming download timed out: {e}. Using a simpler approach...")
                    # Fall back to simpler download method
                    r = requests.get(model_url, timeout=60)
                    r.raise_for_status()
                    with open(path, 'wb') as f:
                        f.write(r.content)
                        print("Download complete with simple method")
                
                print(f"Model download complete: {path}")
                return str(path.absolute())
            except Exception as e:
                logger.error(f"Error downloading model: {e}")
                print(f"Error downloading model: {e}")
                print("Continuing with dummy model instead...")
                # Create a small dummy model file so we can continue
                with open(path, 'wb') as f:
                    f.write(b"DUMMY MODEL")
                return str(path.absolute())
        
        # If we get here without a model, create a dummy one
        print(f"Model file not found at {model_path} and no URL provided. Creating dummy model...")
        os.makedirs(path.parent, exist_ok=True)
        with open(path, 'wb') as f:
            f.write(b"DUMMY MODEL")
        return str(path.absolute())
    
    def generate(self, prompt: str, **kwargs) -> str:
        """
        Generate text completion for the given prompt.
        
        Args:
            prompt: Input text
            
        Returns:
            Generated text completion
        """
        try:
            if self.verbose:
                print(f"Generating with prompt: {prompt[:50]}...")
            
            # If we have a real model, use it
            if self.llm:
                # Actual generation with llama-cpp
                response = self.llm(
                    prompt=prompt,
                    max_tokens=self.max_tokens,
                    temperature=self.temperature,
                    echo=False  # Don't include the prompt in the response
                )
                
                # Extract generated text
                if not response:
                    return ""
                    
                if isinstance(response, dict):
                    generated_text = response.get('choices', [{}])[0].get('text', '')
                else:
                    # List of responses
                    generated_text = response[0].get('text', '')
                
                return generated_text.strip()
            else:
                # Fallback simple generation
                if self.verbose:
                    print("Using fallback text generation")
                
                # Extract key information from prompt
                words = prompt.strip().split()
                last_words = ' '.join(words[-10:]) if len(words) > 10 else prompt
                
                # Simple response generation based on prompt content
                if "?" in prompt:
                    return f"Based on the information provided, I believe the answer is related to {last_words}. This is a fallback response as the LLM model could not be loaded."
                else:
                    return f"I understand you're asking about {last_words}. Since I'm running in fallback mode without a proper language model, I can only acknowledge your query but not provide a detailed response."
            
        except Exception as e:
            logger.error(f"Error generating text: {e}")
            if self.verbose:
                print(f"Error generating text: {e}")
            return f"Error generating response: {str(e)}"
    
    def generate_with_tools(
        self, 
        messages: List[Dict[str, Any]], 
        tools: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Generate a response with tool-calling capabilities.
        This method implements the smolagents Model interface for tool-calling.
        
        Args:
            messages: List of message objects with role and content
            tools: List of tool definitions
            
        Returns:
            Response with message and optional tool calls
        """
        try:
            # Format messages into a prompt
            prompt = self._format_messages_to_prompt(messages, tools)
            
            # Generate response
            completion = self.generate(prompt)
            
            # For now, just return the text without tool parsing
            return {
                "message": {
                    "role": "assistant",
                    "content": completion
                }
            }
        except Exception as e:
            logger.error(f"Error generating with tools: {e}")
            print(f"Error generating with tools: {e}")
            return {
                "message": {
                    "role": "assistant",
                    "content": f"Error: {str(e)}"
                }
            }
    
    def _format_messages_to_prompt(
        self, 
        messages: List[Dict[str, Any]], 
        tools: Optional[List[Dict[str, Any]]] = None
    ) -> str:
        """Format chat messages into a text prompt for the model."""
        formatted_prompt = ""
        
        # Include tool descriptions if available
        if tools and len(tools) > 0:
            tool_descriptions = "\n".join([
                f"Tool {i+1}: {tool['name']} - {tool['description']}"
                for i, tool in enumerate(tools)
            ])
            formatted_prompt += f"Available tools:\n{tool_descriptions}\n\n"
        
        # Add conversation history
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            
            if role == "system":
                formatted_prompt += f"System: {content}\n\n"
            elif role == "user":
                formatted_prompt += f"User: {content}\n\n"
            elif role == "assistant":
                formatted_prompt += f"Assistant: {content}\n\n"
        
        # Add final prompt for assistant
        formatted_prompt += "Assistant: "
        
        return formatted_prompt