import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional
from datetime import datetime

logger = logging.getLogger(__name__)

@dataclass
class ModelPricing:
    """Pricing information for Azure OpenAI models."""
    model_name: str
    input_cost_per_1k_tokens: float  # Cost per 1000 input tokens
    output_cost_per_1k_tokens: float  # Cost per 1000 output tokens
    description: str

@dataclass
class TokenUsage:
    """Token usage statistics for a single API call."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    model: str
    timestamp: datetime

@dataclass
class CostAnalysis:
    """Cost analysis for document processing."""
    total_input_tokens: int
    total_output_tokens: int
    total_cost: float
    model_breakdown: Dict[str, Dict[str, float]]  # {model: {"input_cost": x, "output_cost": y, "total_cost": z}}
    processing_time: float
    timestamp: datetime

class CostTracker:
    """Tracks token usage and calculates costs for Azure OpenAI API calls."""
    
    # Hardcoded pricing for Azure OpenAI models (current as of 2024)
    # Source: https://azure.microsoft.com/en-us/pricing/details/cognitive-services/openai-service/
    MODEL_PRICING = {
        # Standard model names
        "gpt-4o-mini": ModelPricing(
            model_name="gpt-4o-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="GPT-4o Mini (O3 Mini)"
        ),
        "gpt-4o": ModelPricing(
            model_name="gpt-4o",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,   # $0.01 per 1K output tokens
            description="GPT-4o (O4)"
        ),
        "gpt-35-turbo": ModelPricing(
            model_name="gpt-35-turbo",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015, # $0.0015 per 1K output tokens
            description="GPT-3.5 Turbo (O3)"
        ),
        # Azure deployment names (custom names set in Azure)
        "o3-mini": ModelPricing(
            model_name="o3-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="O3 Mini (GPT-4o Mini)"
        ),
        "o4-mini": ModelPricing(
            model_name="o4-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="O4 Mini (GPT-4o Mini)"
        ),
        "o3": ModelPricing(
            model_name="o3",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015, # $0.0015 per 1K output tokens
            description="O3 (GPT-3.5 Turbo)"
        ),
        "o4": ModelPricing(
            model_name="o4",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,   # $0.01 per 1K output tokens
            description="O4 (GPT-4o)"
        ),
        # Alternative model names that might be used in Azure deployments
        "gpt-4o-mini-2024-07-18": ModelPricing(
            model_name="gpt-4o-mini-2024-07-18",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="GPT-4o Mini (O3 Mini) - Latest"
        ),
        "gpt-4o-2024-05-13": ModelPricing(
            model_name="gpt-4o-2024-05-13",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,   # $0.01 per 1K output tokens
            description="GPT-4o (O4) - Latest"
        ),
        "gpt-35-turbo-0125": ModelPricing(
            model_name="gpt-35-turbo-0125",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015, # $0.0015 per 1K output tokens
            description="GPT-3.5 Turbo (O3) - Latest"
        ),
    }
    
    def __init__(self):
        self.usage_history: list[TokenUsage] = []
        self.current_session_tokens = 0
        self.current_session_cost = 0.0
    
    def record_usage(self, prompt_tokens: int, completion_tokens: int, model: str) -> TokenUsage:
        """Record token usage from an API call."""
        total_tokens = prompt_tokens + completion_tokens
        usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            model=model,
            timestamp=datetime.now()
        )
        
        self.usage_history.append(usage)
        self.current_session_tokens += total_tokens
        
        # Calculate cost for this usage
        cost = self._calculate_cost(prompt_tokens, completion_tokens, model)
        self.current_session_cost += cost
        
        logger.info(f"Recorded usage: {prompt_tokens} input + {completion_tokens} output = {total_tokens} total tokens "
                   f"for model {model}, cost: ${cost:.6f}")
        
        return usage
    
    def _calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        """Calculate cost for given token usage and model."""
        if model not in self.MODEL_PRICING:
            logger.warning(f"Unknown model pricing for {model}, using default pricing")
            # Fall back to the same name-based heuristic used by guess_model_type()
            model = self.guess_model_type(model)
        
        pricing = self.MODEL_PRICING[model]
        
        input_cost = (input_tokens / 1000) * pricing.input_cost_per_1k_tokens
        output_cost = (output_tokens / 1000) * pricing.output_cost_per_1k_tokens
        
        return input_cost + output_cost
    
    def get_session_summary(self) -> Dict[str, Any]:
        """Get summary of current session usage."""
        if not self.usage_history:
            return {
                "total_tokens": 0,
                "total_cost": 0.0,
                "model_breakdown": {},
                "usage_count": 0
            }
        
        model_breakdown = {}
        for usage in self.usage_history:
            if usage.model not in model_breakdown:
                model_breakdown[usage.model] = {
                    "input_tokens": 0,
                    "output_tokens": 0,
                    "total_tokens": 0,
                    "cost": 0.0,
                    "usage_count": 0
                }
            
            model_breakdown[usage.model]["input_tokens"] += usage.prompt_tokens
            model_breakdown[usage.model]["output_tokens"] += usage.completion_tokens
            model_breakdown[usage.model]["total_tokens"] += usage.total_tokens
            model_breakdown[usage.model]["usage_count"] += 1
            model_breakdown[usage.model]["cost"] += self._calculate_cost(
                usage.prompt_tokens, usage.completion_tokens, usage.model
            )
        
        return {
            "total_tokens": self.current_session_tokens,
            "total_cost": self.current_session_cost,
            "model_breakdown": model_breakdown,
            "usage_count": len(self.usage_history)
        }
    
    def reset_session(self):
        """Reset current session statistics."""
        self.usage_history = []
        self.current_session_tokens = 0
        self.current_session_cost = 0.0
        logger.info("Cost tracker session reset")
    
    def get_available_models(self) -> list[str]:
        """Get list of available models with pricing."""
        return list(self.MODEL_PRICING.keys())
    
    def get_model_info(self, model: str) -> Optional[ModelPricing]:
        """Get pricing information for a specific model."""
        return self.MODEL_PRICING.get(model)
    
    def add_deployment_pricing(self, deployment_name: str, model_type: str = "o3-mini"):
        """Add pricing for a custom deployment name by mapping it to an existing model type."""
        if deployment_name in self.MODEL_PRICING:
            return  # Already exists
        
        # Map deployment name to existing model pricing
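        # Note: MODEL_PRICING is a class attribute, so pricing registered here
        # is shared by all CostTracker instances, not just this one.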
        if model_type in self.MODEL_PRICING:
            base_pricing = self.MODEL_PRICING[model_type]
            self.MODEL_PRICING[deployment_name] = ModelPricing(
                model_name=deployment_name,
                input_cost_per_1k_tokens=base_pricing.input_cost_per_1k_tokens,
                output_cost_per_1k_tokens=base_pricing.output_cost_per_1k_tokens,
                description=f"{deployment_name} ({base_pricing.description})"
            )
            logger.info(f"Added pricing for deployment {deployment_name} based on {model_type}")
        else:
            logger.warning(f"Unknown model type {model_type} for deployment {deployment_name}")
    
    def guess_model_type(self, deployment_name: str) -> str:
        """Guess the model type based on deployment name."""
        deployment_lower = deployment_name.lower()
        if "mini" in deployment_lower:
            return "o3-mini"
        elif "o4" in deployment_lower:
            return "o4"
        elif "o3" in deployment_lower:
            return "o3"
        else:
            return "o3-mini"  # Default to cheapest

# Global cost tracker instance
cost_tracker = CostTracker()
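

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only). The token counts below are
    # made-up numbers and "my-custom-deployment" is a hypothetical deployment
    # name; in real use the counts would come from the `usage` field of an
    # Azure OpenAI chat-completion response.
    logging.basicConfig(level=logging.INFO)

    tracker = CostTracker()

    # Register pricing for a hypothetical custom deployment name by mapping it
    # to an existing model entry.
    tracker.add_deployment_pricing("my-custom-deployment", model_type="gpt-4o-mini")

    # Record a couple of calls with example token counts.
    tracker.record_usage(prompt_tokens=1200, completion_tokens=300, model="my-custom-deployment")
    tracker.record_usage(prompt_tokens=800, completion_tokens=150, model="gpt-4o")

    summary = tracker.get_session_summary()
    print(f"Total tokens: {summary['total_tokens']}")
    print(f"Estimated cost: ${summary['total_cost']:.6f}")
    for model_name, stats in summary["model_breakdown"].items():
        print(f"  {model_name}: {stats['total_tokens']} tokens, ${stats['cost']:.6f}")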