""" | |
Fallback model implementation for testing when llama-cpp-python is not available. | |
This provides a compatible model class that doesn't require any external dependencies, | |
allowing the rest of the application to function while we solve the llama-cpp-python | |
installation issues. | |
""" | |
import os
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any

import requests
from smolagents import Model

# Try to import llama_cpp, but don't fail if not available
try:
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False
    print("llama_cpp module not available, using fallback implementation")

logger = logging.getLogger(__name__)


class LlamaCppModel(Model):
    """Model using llama.cpp Python bindings for efficient local inference without PyTorch.

    Falls back to simple canned text generation if llama_cpp is not available."""

    def __init__(
        self,
        model_path: Optional[str] = None,
        model_url: Optional[str] = None,
        n_ctx: int = 2048,
        n_gpu_layers: int = 0,
        max_tokens: int = 512,
        temperature: float = 0.7,
        verbose: bool = True,
    ):
        """
        Initialize a local llama.cpp model, or fall back to a simple implementation.

        Args:
            model_path: Path to a local GGUF model file
            model_url: URL to download the model from if model_path doesn't exist
            n_ctx: Context window size
            n_gpu_layers: Number of layers to offload to GPU (0 means CPU only)
            max_tokens: Maximum number of new tokens to generate
            temperature: Sampling temperature
            verbose: Whether to print verbose messages
        """
        super().__init__()
        self.model_path = model_path
        self.model_url = model_url
        self.n_ctx = n_ctx
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.verbose = verbose
        self.llm = None

        # Check if we can use llama_cpp
        if LLAMA_CPP_AVAILABLE:
            try:
                if self.verbose:
                    print("Attempting to initialize LlamaCpp model...")
                # Try to initialize the real model
                if model_path and os.path.exists(model_path):
                    if self.verbose:
                        print(f"Loading model from {model_path}...")
                    # Initialize the llama-cpp model
                    self.llm = Llama(
                        model_path=model_path,
                        n_ctx=n_ctx,
                        n_gpu_layers=n_gpu_layers,
                        verbose=verbose
                    )
                    if self.verbose:
                        print("LlamaCpp model loaded successfully")
                else:
                    if self.verbose:
                        print(f"Model path {model_path!r} not found or not specified. Using fallback mode.")
            except Exception as e:
                logger.error(f"Error initializing LlamaCpp model: {e}")
                if self.verbose:
                    print(f"Error initializing LlamaCpp model: {e}")
                self.llm = None
        else:
            if self.verbose:
                print("LlamaCpp not available, using fallback implementation")

        if not self.llm and self.verbose:
            print("Using fallback text generation mode")
    def _resolve_model_path(self, model_path: Optional[str] = None, model_url: Optional[str] = None) -> str:
        """
        Resolve the model path, downloading the file if necessary.

        Returns:
            Absolute path to the model file
        """
        # Default to a small model if none specified
        if not model_path:
            models_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models")
            os.makedirs(models_dir, exist_ok=True)
            model_path = os.path.join(models_dir, "ggml-model-q4_0.bin")

        # Convert to Path for easier handling
        path = Path(model_path)

        # If the model exists, return it
        if path.exists():
            return str(path.absolute())

        # Download if a URL was provided
        if model_url and not path.exists():
            try:
                print(f"Downloading model from {model_url}...")
                os.makedirs(path.parent, exist_ok=True)
                try:
                    # Try a streaming download first
                    with requests.get(model_url, stream=True, timeout=30) as r:
                        r.raise_for_status()
                        total_size = int(r.headers.get('content-length', 0))
                        block_size = 8192
                        with open(path, 'wb') as f:
                            downloaded = 0
                            for chunk in r.iter_content(chunk_size=block_size):
                                if chunk:
                                    f.write(chunk)
                                    downloaded += len(chunk)
                                    if total_size > 0:
                                        percent = (downloaded / total_size) * 100
                                        if percent % 10 < (block_size / total_size) * 100:
                                            print(f"Download progress: {int(percent)}%")
                except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                    print(f"Streaming download failed: {e}. Using a simpler approach...")
                    # Fall back to a simpler, non-streaming download
                    r = requests.get(model_url, timeout=60)
                    r.raise_for_status()
                    with open(path, 'wb') as f:
                        f.write(r.content)
                    print("Download complete with simple method")
                print(f"Model download complete: {path}")
                return str(path.absolute())
            except Exception as e:
                logger.error(f"Error downloading model: {e}")
                print(f"Error downloading model: {e}")
                print("Continuing with dummy model instead...")
                # Create a small dummy model file so we can continue
                with open(path, 'wb') as f:
                    f.write(b"DUMMY MODEL")
                return str(path.absolute())

        # If we get here without a model, create a dummy one
        print(f"Model file not found at {model_path} and no URL provided. Creating dummy model...")
        os.makedirs(path.parent, exist_ok=True)
        with open(path, 'wb') as f:
            f.write(b"DUMMY MODEL")
        return str(path.absolute())
    def generate(self, prompt: str, **kwargs) -> str:
        """
        Generate text completion for the given prompt.

        Args:
            prompt: Input text

        Returns:
            Generated text completion
        """
        try:
            if self.verbose:
                print(f"Generating with prompt: {prompt[:50]}...")

            # If we have a real model, use it
            if self.llm:
                # Actual generation with llama-cpp
                response = self.llm(
                    prompt=prompt,
                    max_tokens=self.max_tokens,
                    temperature=self.temperature,
                    echo=False  # Don't include the prompt in the response
                )
                # Extract generated text
                if not response:
                    return ""
                if isinstance(response, dict):
                    generated_text = response.get('choices', [{}])[0].get('text', '')
                else:
                    # List of responses
                    generated_text = response[0].get('text', '')
                return generated_text.strip()
            else:
                # Fallback simple generation
                if self.verbose:
                    print("Using fallback text generation")
                # Extract key information from prompt
                words = prompt.strip().split()
                last_words = ' '.join(words[-10:]) if len(words) > 10 else prompt
                # Simple response generation based on prompt content
                if "?" in prompt:
                    return f"Based on the information provided, I believe the answer is related to {last_words}. This is a fallback response as the LLM model could not be loaded."
                else:
                    return f"I understand you're asking about {last_words}. Since I'm running in fallback mode without a proper language model, I can only acknowledge your query but not provide a detailed response."
        except Exception as e:
            logger.error(f"Error generating text: {e}")
            if self.verbose:
                print(f"Error generating text: {e}")
            return f"Error generating response: {str(e)}"
    def generate_with_tools(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Generate a response with tool-calling capabilities.

        This method implements the smolagents Model interface for tool-calling.

        Args:
            messages: List of message objects with role and content
            tools: List of tool definitions

        Returns:
            Response with message and optional tool calls
        """
        try:
            # Format messages into a prompt
            prompt = self._format_messages_to_prompt(messages, tools)
            # Generate response
            completion = self.generate(prompt)
            # For now, just return the text without tool parsing
            return {
                "message": {
                    "role": "assistant",
                    "content": completion
                }
            }
        except Exception as e:
            logger.error(f"Error generating with tools: {e}")
            print(f"Error generating with tools: {e}")
            return {
                "message": {
                    "role": "assistant",
                    "content": f"Error: {str(e)}"
                }
            }
    def _format_messages_to_prompt(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None
    ) -> str:
        """Format chat messages into a text prompt for the model."""
        formatted_prompt = ""

        # Include tool descriptions if available
        if tools and len(tools) > 0:
            tool_descriptions = "\n".join([
                f"Tool {i+1}: {tool.get('name', '')} - {tool.get('description', '')}"
                for i, tool in enumerate(tools)
            ])
            formatted_prompt += f"Available tools:\n{tool_descriptions}\n\n"

        # Add conversation history
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            if role == "system":
                formatted_prompt += f"System: {content}\n\n"
            elif role == "user":
                formatted_prompt += f"User: {content}\n\n"
            elif role == "assistant":
                formatted_prompt += f"Assistant: {content}\n\n"

        # Add final prompt for assistant
        formatted_prompt += "Assistant: "
        return formatted_prompt
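

# --- Example usage (minimal sketch) ----------------------------------------
# This block is illustrative, not part of the class: the model_path below is a
# hypothetical GGUF file. If it does not exist, or llama-cpp-python is not
# installed, LlamaCppModel simply runs in its canned-text fallback mode, so the
# block is safe to execute as a quick smoke test.
if __name__ == "__main__":
    model = LlamaCppModel(
        model_path="models/ggml-model-q4_0.bin",  # hypothetical path; point at a real GGUF file if available
        n_ctx=2048,
        max_tokens=128,
        verbose=True,
    )

    # Plain text completion
    print(model.generate("What is the capital of France?"))

    # Chat-style call through the tool-aware interface (no tools passed here)
    result = model.generate_with_tools(
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Summarize what this module does."},
        ]
    )
    print(result["message"]["content"])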