Spaces:
Paused
Paused
| """ | |
| Flare – API Executor (v2.0 · session-aware token management) | |
| """ | |
| from __future__ import annotations | |
| import json, re, time, requests | |
| from typing import Any, Dict, Optional, Union | |
| from logger import log_info, log_error, log_warning, log_debug, LogTimer | |
| from config_provider import ConfigProvider, APIConfig | |
| from session import Session | |
| import os | |
# Upper bound, in bytes, on the response body that call_api is willing to buffer.
MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10MB
# Fallback request timeout in seconds; read once at import time from the
# API_TIMEOUT_SECONDS environment variable ("30" when the variable is unset).
DEFAULT_TIMEOUT = int(os.getenv("API_TIMEOUT_SECONDS", "30"))
# Matches a `{{ path }}` template placeholder; group 1 captures the text
# between the braces (whitespace around it is normally left out of the group).
_placeholder = re.compile(r"\{\{\s*([^\}]+?)\s*\}\}")
def _get_variable_value(session: Session, var_path: str) -> Any:
    """Look up the value that a template path refers to.

    Supported path forms:

    * ``variables.<name>`` - entry of ``session.variables``
    * ``auth_tokens.<api>.<field>`` - field of the token record cached for
      ``<api>`` in ``session._auth_tokens``
    * ``config.<attr>`` - attribute of the global configuration object

    Args:
        session: Session whose variables and cached auth tokens are consulted.
        var_path: Dotted path taken from inside a ``{{ ... }}`` placeholder.

    Returns:
        The stored value with its original type, or ``None`` when the path has
        an unknown prefix, is too short, or has no value.
    """
    if var_path.startswith("variables."):
        var_name = var_path.split(".", 1)[1]
        return session.variables.get(var_name)

    if var_path.startswith("auth_tokens."):
        parts = var_path.split(".")
        if len(parts) < 3:
            # "auth_tokens.<api>" without a field name cannot be resolved
            return None
        token_data = session._auth_tokens.get(parts[1], {})
        return token_data.get(parts[2])

    if var_path.startswith("config."):
        # Only this branch needs the configuration, so it is loaded here
        # rather than on every variable/token lookup.
        cfg = ConfigProvider.get()
        attr_name = var_path.split(".", 1)[1]
        return getattr(cfg.global_config, attr_name, None)

    return None
def _render_value(value: Any) -> Union[str, int, float, bool, None]:
    """Convert a resolved template value to the most specific JSON scalar.

    ``None``, booleans and numbers are returned unchanged. Strings are
    inspected: an all-digit string becomes an ``int``, any other string that
    parses as a finite number becomes a ``float``, ``"true"``/``"false"``
    (case-insensitive) become booleans, and everything else stays a string.
    Values of any other type are converted with ``str()``.

    Args:
        value: The value looked up for a placeholder.

    Returns:
        A value of type ``str``, ``int``, ``float``, ``bool`` or ``None``.
    """
    import math  # local import: keeps this block independent of the file's import list

    # bool is a subclass of int, so one check covers None/bool/int/float
    if value is None or isinstance(value, (bool, int, float)):
        return value
    if not isinstance(value, str):
        return str(value)

    if value.isdigit():
        try:
            return int(value)
        except ValueError:
            # str.isdigit() also accepts characters such as superscript
            # digits ("²") that int() rejects; treat those as plain text.
            pass

    try:
        number = float(value)
    except ValueError:
        number = None
    # float() accepts "nan", "inf" and "infinity"; those are not valid JSON
    # numbers, so such strings are kept as text instead of being converted.
    if number is not None and math.isfinite(number):
        return number

    lowered = value.lower()
    if lowered in ('true', 'false'):
        return lowered == 'true'

    return value
def _render_json(obj: Any, session: Session, api_name: str) -> Any:
    """Expand placeholders inside a JSON-like structure while keeping types.

    Dicts and lists are processed recursively. A string that consists of a
    single placeholder (ignoring surrounding whitespace) is replaced by the
    typed value produced by ``_render_value``; any other string has each
    placeholder substituted textually, with missing values rendered as an
    empty string. Non-string scalars are returned untouched.

    Args:
        obj: The template structure (dict, list, string or scalar).
        session: Session used to resolve placeholder paths.
        api_name: Passed through the recursion; not otherwise used here.

    Returns:
        A new structure with every placeholder resolved.
    """
    if isinstance(obj, dict):
        return {key: _render_json(item, session, api_name) for key, item in obj.items()}
    if isinstance(obj, list):
        return [_render_json(item, session, api_name) for item in obj]
    if not isinstance(obj, str):
        # Numbers, booleans and None need no rendering
        return obj

    # A pure template such as "{{variables.pnr}}" keeps the value's own type
    whole = _placeholder.fullmatch(obj.strip())
    if whole is not None:
        resolved = _get_variable_value(session, whole.group(1).strip())
        return _render_value(resolved)

    # Otherwise substitute every embedded placeholder as text
    def _substitute(found):
        resolved = _get_variable_value(session, found.group(1).strip())
        return "" if resolved is None else str(resolved)

    return _placeholder.sub(_substitute, obj)
def _render(obj: Any, session: Session, api_name: str) -> Any:
    """Expand placeholders textually, for headers and other string-only data.

    Dicts and lists are processed recursively. In strings, every placeholder
    is replaced by ``str()`` of its resolved value, or by an empty string
    when the value is ``None``. Anything else is returned unchanged.

    Args:
        obj: The template structure (dict, list, string or other value).
        session: Session used to resolve placeholder paths.
        api_name: Passed through the recursion; not otherwise used here.

    Returns:
        A new structure with every placeholder substituted as text.
    """
    if isinstance(obj, dict):
        return {key: _render(item, session, api_name) for key, item in obj.items()}
    if isinstance(obj, list):
        return [_render(item, session, api_name) for item in obj]
    if not isinstance(obj, str):
        return obj

    def _substitute(found):
        resolved = _get_variable_value(session, found.group(1).strip())
        return "" if resolved is None else str(resolved)

    return _placeholder.sub(_substitute, obj)
def _fetch_token(api: APIConfig, session: Session) -> None:
    """Request a brand-new auth token for ``api`` and cache it on the session.

    Does nothing when the API has no auth section or auth is disabled.
    Otherwise the rendered ``token_request_body`` is POSTed as JSON to
    ``token_endpoint``, the token is read from the JSON response by following
    the dot-separated ``response_token_path``, and the result is stored in
    ``session._auth_tokens[api.name]`` together with an expiry timestamp and
    the response's top-level ``refresh_token`` (if any).

    Args:
        api: API configuration carrying the ``auth`` settings.
        session: Session that receives the token record.

    Raises:
        ValueError: If the token path cannot be resolved in the response.
        Exception: Any other failure (HTTP error, invalid JSON, ...) is
            logged and re-raised unchanged.
    """
    if not api.auth or not api.auth.enabled:
        return

    log_info(f"🔑 Fetching token for {api.name}")

    try:
        # Use _render_json for the body so typed placeholders keep their types
        body = _render_json(api.auth.token_request_body, session, api.name)
        headers = {"Content-Type": "application/json"}

        response = requests.post(
            str(api.auth.token_endpoint),
            json=body,
            headers=headers,
            # requests waits indefinitely when timeout is None, so fall back
            # to the module default the same way call_api does.
            timeout=api.timeout_seconds or DEFAULT_TIMEOUT,
        )
        response.raise_for_status()
        json_data = response.json()

        # Extract the token by walking the configured path
        token = json_data
        for path_part in api.auth.response_token_path.split("."):
            # A non-dict in the middle of the path means the path is absent;
            # report it as such instead of failing with an AttributeError.
            token = token.get(path_part) if isinstance(token, dict) else None
            if token is None:
                raise ValueError(f"Token path {api.auth.response_token_path} not found in response")

        # Store in session. The lifetime is a fixed 3500 seconds; the
        # response is not consulted for an expiry value.
        session._auth_tokens[api.name] = {
            "token": token,
            "expiry": time.time() + 3500,  # ~1 hour
            "refresh_token": json_data.get("refresh_token")
        }

        log_info(f"✅ Token obtained for {api.name}")
    except Exception as e:
        log_error(f"❌ Token fetch failed for {api.name}", e)
        raise
def _refresh_token(api: APIConfig, session: Session) -> bool:
    """Try to renew the cached token for ``api`` using its refresh token.

    The rendered ``token_refresh_body`` (or an empty dict) is extended with
    the cached ``refresh_token`` and POSTed as JSON to
    ``token_refresh_endpoint``. On success the session's token record is
    replaced; the previous refresh token is kept when the response does not
    supply a new one.

    Args:
        api: API configuration carrying the ``auth`` settings.
        session: Session holding the current token record.

    Returns:
        ``True`` when a new token was stored. ``False`` when refreshing is
        not configured, no refresh token is cached, or the attempt failed
        (failures are logged, never raised).
    """
    if not api.auth or not api.auth.token_refresh_endpoint:
        return False

    token_info = session._auth_tokens.get(api.name, {})
    if not token_info.get("refresh_token"):
        return False

    log_info(f"🔄 Refreshing token for {api.name}")

    try:
        # _render_json builds a new dict, so adding a key below does not
        # modify the configured template.
        body = _render_json(api.auth.token_refresh_body or {}, session, api.name)
        body["refresh_token"] = token_info["refresh_token"]

        response = requests.post(
            str(api.auth.token_refresh_endpoint),
            json=body,
            # requests waits indefinitely when timeout is None, so fall back
            # to the module default the same way call_api does.
            timeout=api.timeout_seconds or DEFAULT_TIMEOUT,
        )
        response.raise_for_status()
        json_data = response.json()

        # Extract the new token by walking the configured path
        token = json_data
        for path_part in api.auth.response_token_path.split("."):
            # A non-dict in the middle of the path means the path is absent
            token = token.get(path_part) if isinstance(token, dict) else None
            if token is None:
                raise ValueError(f"Token path {api.auth.response_token_path} not found in refresh response")

        # Update session; the lifetime is a fixed 3500 seconds
        session._auth_tokens[api.name] = {
            "token": token,
            "expiry": time.time() + 3500,
            "refresh_token": json_data.get("refresh_token", token_info["refresh_token"])
        }

        log_info(f"✅ Token refreshed for {api.name}")
        return True
    except Exception as e:
        # Best effort: the caller falls back to fetching a fresh token
        log_error(f"❌ Token refresh failed for {api.name}", e)
        return False
def _ensure_token(api: APIConfig, session: Session) -> None:
    """Make sure the session holds a usable auth token for ``api``.

    Nothing happens when auth is absent or disabled, or when the cached
    token's ``expiry`` timestamp is still in the future. An expired token is
    refreshed when possible; if there is no cached token, or the refresh
    does not succeed, a completely new token is fetched.

    Args:
        api: API configuration carrying the ``auth`` settings.
        session: Session holding the per-API token records.
    """
    auth = api.auth
    if not (auth and auth.enabled):
        return

    cached = session._auth_tokens.get(api.name)
    if cached:
        # Still valid: nothing to do
        if cached.get("expiry", 0) > time.time():
            return
        # Expired: prefer the cheaper refresh round-trip
        if _refresh_token(api, session):
            return

    # No token yet, or the refresh failed
    _fetch_token(api, session)
def _resolve_json_path(data: Any, json_path: str) -> Any:
    """Follow a dot-separated key path through nested dicts; None if unresolved."""
    value = data
    for path_part in json_path.split('.'):
        # A non-dict in the middle of the path means the path does not exist;
        # carrying that value forward would store it under the wrong variable.
        if not isinstance(value, dict):
            return None
        value = value.get(path_part)
        if value is None:
            return None
    return value


def _coerce_mapped_value(value: Any, var_type: str) -> Any:
    """Convert a mapped response value to the variable type named in the mapping."""
    if var_type == 'int':
        return int(value)
    if var_type == 'float':
        return float(value)
    if var_type == 'bool':
        if isinstance(value, str):
            # bool("false") is True because every non-empty string is truthy,
            # so textual flags are interpreted by their content instead.
            return value.strip().lower() in ('true', '1', 'yes', 'y', 'on')
        return bool(value)
    # 'date', 'str' and any unrecognised type are stored as text
    return str(value)


def _apply_response_mappings(api: APIConfig, session: Session, response: requests.Response) -> None:
    """Copy values from a successful JSON response into session variables (best effort)."""
    if response.status_code not in (200, 201, 202, 204):
        return
    if not (hasattr(api, 'response_mappings') and api.response_mappings):
        return
    # A 204 or an empty body carries nothing to map
    if response.status_code == 204 or not response.content:
        return

    try:
        response_json = response.json()
    except Exception as e:
        log_error("⚠️ Response mapping error", error=str(e))
        return

    for mapping in api.response_mappings:
        # Each mapping is handled on its own so that one bad entry (for
        # example a failed int() conversion) does not discard the others.
        try:
            var_name = mapping.get('variable_name')
            var_type = mapping.get('type', 'str')
            json_path = mapping.get('json_path')
            if not all([var_name, json_path]):
                continue

            # Get the value from the JSON path
            value = _resolve_json_path(response_json, json_path)
            if value is None:
                continue

            value = _coerce_mapped_value(value, var_type)

            # Save to the session
            session.variables[var_name] = value
            log_info("📝 Mapped response", variable=var_name, value=value)
        except Exception as e:
            log_error("⚠️ Response mapping error", error=str(e))


def call_api(api: APIConfig, session: Session) -> requests.Response:
    """Execute an API call with token management, retries and a size limit.

    Steps: ensure an auth token exists, render headers and body from the
    session, send the request (JSON body for POST/PUT/PATCH, query parameters
    for GET), stream the response while enforcing ``MAX_RESPONSE_SIZE``, and
    on success apply ``api.response_mappings`` to the session variables.

    A 401 answer triggers a fresh token fetch and an immediate retry as long
    as attempts remain. Timeouts, request errors and unexpected errors are
    retried up to ``api.retry.retry_count`` times, sleeping
    ``backoff_seconds`` between attempts (doubling each time for the
    ``"exponential"`` strategy).

    Args:
        api: Configuration of the API to call.
        session: Session supplying template values and receiving mapped
            response values.

    Returns:
        The response, with its body fully read into memory.

    Raises:
        ValueError: When the response exceeds ``MAX_RESPONSE_SIZE``. Any
            other ``ValueError`` raised during an attempt is also re-raised
            immediately without retrying.
        Exception: The last error seen once all attempts are exhausted
            (``requests.exceptions.RequestException`` if none was recorded).
    """
    # Ensure valid token
    _ensure_token(api, session)

    # Prepare request
    headers = _render(api.headers, session, api.name)
    body = _render_json(api.body_template, session, api.name)

    # Get timeout with fallback
    timeout = api.timeout_seconds or DEFAULT_TIMEOUT

    # Handle proxy: either a plain URL string or an object with enabled/url
    proxies = None
    if api.proxy:
        if isinstance(api.proxy, str):
            proxies = {"http": api.proxy, "https": api.proxy}
        elif hasattr(api.proxy, "enabled") and api.proxy.enabled:
            proxy_url = str(api.proxy.url)
            proxies = {"http": proxy_url, "https": proxy_url}

    # Prepare request parameters
    request_params = {
        "method": api.method,
        "url": str(api.url),
        "headers": headers,
        "timeout": timeout,  # Use configured timeout
        "stream": True  # Stream so the body size can be checked while reading
    }

    # Add body based on method
    if api.method in ("POST", "PUT", "PATCH"):
        request_params["json"] = body
    elif api.method == "GET" and body:
        request_params["params"] = body

    if proxies:
        request_params["proxies"] = proxies

    # Execute with retry
    retry_count = api.retry.retry_count if api.retry else 0
    last_error = None

    for attempt in range(retry_count + 1):
        try:
            # Use LogTimer for performance tracking
            with LogTimer(f"API call {api.name}", attempt=attempt + 1):
                log_info(
                    "🌐 API call starting",
                    api=api.name,
                    method=api.method,
                    url=api.url,
                    attempt=f"{attempt + 1}/{retry_count + 1}",
                    timeout=timeout
                )

                if body:
                    log_debug("📋 Request body", body=json.dumps(body, ensure_ascii=False)[:500])

                # Make request with streaming
                response = requests.request(**request_params)

                # Check response size from headers
                content_length = response.headers.get('content-length')
                if content_length and int(content_length) > MAX_RESPONSE_SIZE:
                    response.close()
                    raise ValueError(f"Response too large: {int(content_length)} bytes (max: {MAX_RESPONSE_SIZE})")

                # Handle 401 Unauthorized: get a new token and retry at once
                # (no backoff), but only while attempts remain.
                if response.status_code == 401 and api.auth and api.auth.enabled and attempt < retry_count:
                    log_warning("🔒 Got 401, refreshing token", api=api.name)
                    _fetch_token(api, session)
                    headers = _render(api.headers, session, api.name)
                    request_params["headers"] = headers
                    response.close()
                    continue

                # Read response with size limit (Content-Length may be
                # missing, so the actual byte count is checked as well)
                content_size = 0
                chunks = []
                for chunk in response.iter_content(chunk_size=8192):
                    chunks.append(chunk)
                    content_size += len(chunk)
                    if content_size > MAX_RESPONSE_SIZE:
                        response.close()
                        raise ValueError(f"Response exceeded size limit: {content_size} bytes")

                # Put the buffered body back so .content/.json() work on the
                # already-consumed stream (relies on requests internals).
                response._content = b''.join(chunks)
                response._content_consumed = True

                # Check status
                response.raise_for_status()

                log_info(
                    "✅ API call successful",
                    api=api.name,
                    status_code=response.status_code,
                    response_size=content_size,
                    duration_ms=f"{response.elapsed.total_seconds() * 1000:.2f}"
                )

                # The existing response-mapping behaviour is kept
                _apply_response_mappings(api, session, response)

                return response

        except requests.exceptions.Timeout as e:
            last_error = e
            log_warning(
                "⏱️ API timeout",
                api=api.name,
                attempt=attempt + 1,
                timeout=timeout
            )
        except requests.exceptions.RequestException as e:
            last_error = e
            log_error(
                "❌ API request error",
                api=api.name,
                error=str(e),
                attempt=attempt + 1
            )
        except ValueError as e:  # Size limit exceeded
            log_error(
                "❌ Response size error",
                api=api.name,
                error=str(e)
            )
            raise  # Don't retry for size errors
        except Exception as e:
            last_error = e
            log_error(
                "❌ Unexpected API error",
                api=api.name,
                error=str(e),
                attempt=attempt + 1
            )

        # Retry backoff
        if attempt < retry_count:
            backoff = api.retry.backoff_seconds if api.retry else 2
            if api.retry and api.retry.strategy == "exponential":
                backoff = backoff * (2 ** attempt)
            log_info("⏳ Retry backoff", wait_seconds=backoff, next_attempt=attempt + 2)
            time.sleep(backoff)

    # All retries failed
    error_msg = f"API call failed after {retry_count + 1} attempts"
    log_error(error_msg, api=api.name, last_error=str(last_error))

    if last_error:
        raise last_error
    raise requests.exceptions.RequestException(error_msg)
def format_size(size_bytes: int) -> str:
    """Render a byte count as a two-decimal string with a binary unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit is reached; units run B, KB, MB, GB and finally TB, which is used
    for everything of 1024 GB and above.

    Args:
        size_bytes: Size in bytes.

    Returns:
        Text such as ``"1.50 KB"``.
    """
    scaled = size_bytes
    for suffix in ('B', 'KB', 'MB', 'GB'):
        if scaled < 1024.0:
            break
        scaled /= 1024.0
    else:
        # Loop ran out of units without the value dropping below 1024
        suffix = 'TB'
    return f"{scaled:.2f} {suffix}"