import json
import re
import signal
from typing import Any, Dict, List, Optional
from starfish.common.exceptions import JsonParserError, SchemaValidationError
from starfish.common.logger import get_logger
logger = get_logger(__name__)
# Maximum time (in seconds) to allow for JSON parsing operations
DEFAULT_PARSING_TIMEOUT = 1.0
class TimeoutError(Exception):
    """Exception raised when a parsing operation times out.

    Note: this shadows the builtin TimeoutError within this module.
    """
class JSONParser:
"""Handles parsing and validation of JSON data against schemas.
Provides utilities for JSON schema generation and formatting.
"""
@staticmethod
def _extract_json_from_text(text: str) -> str:
"""Clean a string that might contain JSON with markdown code block markers.
Args:
text: String potentially containing JSON within markdown formatting
Returns:
Cleaned JSON string with markdown and extra text removed
Raises:
JsonParserError: If no valid JSON content can be found in the text
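
        Example (illustrative):
            >>> JSONParser._extract_json_from_text('Sure! ```json\\n{"a": 1}\\n```')
            '{"a": 1}'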
"""
# First try to extract from markdown code blocks
if "```" in text:
# Try extracting from ```json blocks first
if "```json" in text and "```" in text.split("```json", 1)[1]:
json_content = text.split("```json", 1)[1].split("```")[0]
return json_content.strip()
# Try extracting from any code block
parts = text.split("```")
if len(parts) >= 3:
content = parts[1]
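            # Drop a leading language tag (e.g. "json") if the block's first
            # line is not itself the start of the JSON payload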
if "\n" in content:
first_line, rest = content.split("\n", 1)
if not first_line.strip().startswith(("{", "[")):
content = rest
return content.strip()
        # Try to find JSON content directly
        for i, start_char in enumerate(text):
            if start_char in ("{", "["):
                # Find matching closing brace/bracket
                stack = []
                in_string = False
                escaped = False
                for j in range(i, len(text)):
                    ch = text[j]
                    if in_string:
                        if ch == "\\":
                            escaped = not escaped
                        elif ch == '"' and not escaped:
                            in_string = False
                        else:
                            escaped = False
                    else:
                        if ch == '"':
                            in_string = True
                            escaped = False
                        elif ch in ("{", "["):
                            stack.append(ch)
                        elif ch == "}" and stack and stack[-1] == "{":
                            stack.pop()
                        elif ch == "]" and stack and stack[-1] == "[":
                            stack.pop()
                        if not stack:
                            return text[i : j + 1].strip()
raise JsonParserError("No valid JSON content found in the text")
@staticmethod
def _aggressive_escape_all_backslashes(json_text: str) -> str:
"""Apply aggressive backslash escaping to all string literals in JSON.
This is a more heavy-handed approach when selective escaping fails.
Args:
json_text: JSON text with potentially problematic escape sequences
Returns:
JSON text with all backslashes doubled in string literals
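
        Example (illustrative; backslashes below are escaped twice, once for
        the docstring and once for the doctest source):
            >>> print(JSONParser._aggressive_escape_all_backslashes('{"p": "a\\\\b"}'))
            {"p": "a\\\\b"}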
"""
pattern = r'"([^"]*(?:\\.[^"]*)*)"'
def replace_string_content(match):
string_content = match.group(1)
# Replace any single backslash with double backslash
escaped_content = string_content.replace("\\", "\\\\")
return f'"{escaped_content}"'
return re.sub(pattern, replace_string_content, json_text)
@staticmethod
def _sanitize_control_characters(json_text: str) -> str:
"""Remove or escape invalid control characters in JSON string literals.
JSON doesn't allow raw control characters (ASCII 0-31) within strings.
This method identifies and removes or escapes these characters within string literals.
Args:
json_text: JSON text with potentially invalid control characters
Returns:
Sanitized JSON text with control characters properly handled
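
        Example (illustrative; a raw newline inside the string literal becomes
        an escaped \\n):
            >>> print(JSONParser._sanitize_control_characters('{"msg": "a\\nb"}'))
            {"msg": "a\\nb"}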
"""
pattern = r'"([^"]*(?:\\.[^"]*)*)"'
def sanitize_string_content(match):
string_content = match.group(1)
            # Remove control characters that have no dedicated JSON escape
            string_content = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F]", "", string_content)
            # Escape raw tab, newline, and carriage return so they survive as \t, \n, \r
string_content = string_content.replace("\t", "\\t")
string_content = string_content.replace("\n", "\\n")
string_content = string_content.replace("\r", "\\r")
return f'"{string_content}"'
return re.sub(pattern, sanitize_string_content, json_text)
@staticmethod
def _try_parse_json(json_text: str) -> Any:
"""Try to parse JSON text using various strategies.
        This method attempts multiple parsing strategies in sequence to handle LLM-generated JSON:
        1. Parse the raw text directly
        2. Aggressively escape all backslashes and retry
        3. Sanitize control characters in the original text and retry
        4. Sanitize control characters in the backslash-escaped text and retry
Args:
json_text: JSON text to parse
Returns:
Parsed JSON object if successful
Raises:
JsonParserError: If parsing fails after trying all strategies.
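
        Example (illustrative; the invalid \\d escape is repaired by the
        backslash-doubling strategy; backslashes are escaped twice, once for
        the docstring and once for the doctest source):
            >>> JSONParser._try_parse_json('{"path": "C:\\\\docs"}')
            {'path': 'C:\\\\docs'}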
"""
# Keep a list of all errors for comprehensive error reporting
errors = []
# Strategy 1: Try parsing directly
try:
return json.loads(json_text)
except json.JSONDecodeError as e:
errors.append(f"Direct parsing: {e}")
logger.debug(f"Direct JSON parsing failed: {e}. Trying aggressive escaping.")
# Strategy 2: Try aggressive backslash escaping
try:
aggressive_text = JSONParser._aggressive_escape_all_backslashes(json_text)
return json.loads(aggressive_text)
except json.JSONDecodeError as e2:
errors.append(f"Backslash escaping: {e2}")
logger.debug(f"Aggressive escaping failed: {e2}. Trying control character sanitization.")
# Strategy 3: Try sanitizing control characters
try:
# First sanitize control characters in the original text
sanitized_text = JSONParser._sanitize_control_characters(json_text)
return json.loads(sanitized_text)
except json.JSONDecodeError as e3:
errors.append(f"Control character sanitization: {e3}")
# Strategy 4: Try sanitizing after aggressive escaping
# (combines both approaches)
try:
sanitized_aggressive = JSONParser._sanitize_control_characters(aggressive_text)
return json.loads(sanitized_aggressive)
except json.JSONDecodeError as e4:
errors.append(f"Sanitized + escaped: {e4}")
logger.error(f"All JSON parsing strategies failed. Errors: {', '.join(errors)}")
# If we've exhausted all options, raise a comprehensive error
raise JsonParserError(f"Failed to parse JSON after trying all strategies. Errors: {' | '.join(errors)}") from e4
@staticmethod
def _unwrap_json_data(json_data: Any, json_wrapper_key: Optional[str] = None) -> List[Dict[str, Any]]:
"""Extract and normalize data from parsed JSON.
Args:
json_data: Parsed JSON data
json_wrapper_key: Optional key that may wrap the actual data
Returns:
List of data items, ensuring the result is always a list
Raises:
TypeError: If data is not a dict or list
KeyError: If json_wrapper_key is not found in the data
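
        Example (illustrative):
            >>> JSONParser._unwrap_json_data({"items": [{"a": 1}]}, json_wrapper_key="items")
            [{'a': 1}]
            >>> JSONParser._unwrap_json_data({"a": 1})
            [{'a': 1}]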
"""
if json_wrapper_key and isinstance(json_data, dict):
# Let KeyError propagate naturally if key doesn't exist
result = json_data[json_wrapper_key]
else:
result = json_data
if not isinstance(result, (dict, list)):
raise TypeError(f"Expected dict or list, got {type(result).__name__}")
return [result] if isinstance(result, dict) else result
@staticmethod
def convert_to_schema(fields: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate a JSON schema from field definitions.
Args:
fields: List of field definitions with name, type, description, and required flag
Returns:
A JSON schema dictionary
Raises:
TypeError: If fields is not a list or field is not a dict
KeyError: If required field attributes are missing
ValueError: If field type is invalid
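
        Example (illustrative):
            >>> schema = JSONParser.convert_to_schema([{"name": "age", "type": "int"}])
            >>> schema["properties"]["age"]["type"], schema["required"]
            ('integer', ['age'])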
"""
if not isinstance(fields, list):
raise TypeError(f"Expected list of fields, got {type(fields)}")
schema = {"type": "object", "properties": {}, "required": []}
type_mapping = {
"str": {"type": "string"},
"int": {"type": "integer"},
"float": {"type": "number"},
"bool": {"type": "boolean"},
"list": {"type": "array"},
"dict": {"type": "object"},
"null": {"type": "null"},
}
for field in fields:
if not isinstance(field, dict):
raise TypeError(f"Expected dict for field definition, got {type(field)}")
# Let KeyError propagate naturally for missing required attributes
name = field["name"]
field_type = field["type"]
description = field.get("description", "")
required = field.get("required", True)
if field_type == "list" and "items" in field:
schema["properties"][name] = {"type": "array", "items": field["items"], "description": description}
elif field_type == "dict" and "properties" in field:
schema["properties"][name] = {"type": "object", "properties": field["properties"], "description": description}
if "required" in field:
schema["properties"][name]["required"] = field["required"]
elif field_type in type_mapping:
schema["properties"][name] = {**type_mapping[field_type], "description": description}
else:
raise ValueError(f"Invalid field type '{field_type}' for field '{name}'")
if required:
schema["required"].append(name)
return schema
@staticmethod
def get_format_instructions(schema: Dict[str, Any], json_wrapper_key: Optional[str] = None, show_array_items: int = 1) -> str:
"""Format a JSON schema into human-readable instructions.
Args:
schema: A JSON schema dictionary
json_wrapper_key: Optional key to wrap the schema in an array
show_array_items: Number of example items to show in an array wrapper
Returns:
Formatted string with schema instructions
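
        Example (illustrative output shape for a single required string field "name"):
            [
              {
                "name": "" // (required)
              }
              ...
            ]

            Required fields: name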
"""
def format_property(name: str, prop: Dict[str, Any], required: List[str], indent_level: int = 1) -> List[str]:
lines = []
indent = " " * indent_level
field_type = prop.get("type", "string")
description = prop.get("description", "")
is_required = name in required
comment = f"// {description}" + (" (required)" if is_required else " (optional)")
if field_type == "object" and "properties" in prop:
lines.append(f'{indent}"{name}": {{ {comment}')
nested_props = prop.get("properties", {})
nested_required = prop.get("required", [])
# Recursively format properties of the nested object
formatted_props = []
for i, (nested_name, nested_prop) in enumerate(nested_props.items()):
# Increase indent level for properties inside the object
prop_lines = format_property(nested_name, nested_prop, nested_required, indent_level + 1)
# Add comma if not the last property
if i < len(nested_props) - 1 and prop_lines:
prop_lines[-1] = prop_lines[-1] + ","
formatted_props.extend(prop_lines)
lines.extend(formatted_props)
# End of recursive formatting
lines.append(f"{indent}}}")
elif field_type == "array" and "items" in prop:
items = prop.get("items", {})
item_type = items.get("type")
lines.append(f'{indent}"{name}": [ {comment}') # Start array
# Check if items are objects and have properties
if item_type == "object" and "properties" in items:
lines.append(f"{indent} {{") # Start example object in array
nested_props = items.get("properties", {})
nested_required = items.get("required", [])
# Recursively format the properties of the object within the array item
formatted_props = []
for i, (nested_name, nested_prop) in enumerate(nested_props.items()):
# Increase indent level for properties inside the object
prop_lines = format_property(nested_name, nested_prop, nested_required, indent_level + 2)
# Add comma if not the last property
if i < len(nested_props) - 1 and prop_lines:
prop_lines[-1] = prop_lines[-1] + ","
formatted_props.extend(prop_lines)
lines.extend(formatted_props)
# End of recursive formatting for array item properties
lines.append(f"{indent} }}") # End example object
lines.append(f"{indent} // ... more items ...") # Indicate potential for more items
# Handle arrays of simple types (optional, could add examples here too)
# elif item_type in type_mapping:
# lines.append(f"{indent} // Example: {type_mapping[item_type]}")
else:
lines.append(f"{indent} // Example items of type {item_type}")
lines.append(f"{indent}]") # End array
            else:
                example_values = {
                    "string": '""',
                    "integer": "number",
                    "number": "number",
                    "boolean": "true or false",
                    "array": "[]",
                }
                example_value = example_values.get(field_type, "{}")
                lines.append(f'{indent}"{name}": {example_value} {comment}')
return lines
schema_lines = []
if json_wrapper_key:
schema_lines.extend(["{", f' "{json_wrapper_key}": ['])
properties = schema.get("properties", {})
required = schema.get("required", [])
for item_idx in range(show_array_items):
schema_lines.append(" {")
for i, (name, prop) in enumerate(properties.items()):
prop_lines = format_property(name, prop, required, indent_level=3)
if i < len(properties) - 1 and prop_lines:
prop_lines[-1] = prop_lines[-1] + ","
schema_lines.extend(prop_lines)
schema_lines.append(" }" + ("," if item_idx < show_array_items - 1 else ""))
schema_lines.append(" ...")
schema_lines.extend([" ]", "}"])
else:
# Always format as a list structure
schema_lines.append("[")
properties = schema.get("properties", {})
required = schema.get("required", [])
for item_idx in range(show_array_items):
schema_lines.append(" {")
for i, (name, prop) in enumerate(properties.items()):
prop_lines = format_property(name, prop, required, indent_level=2)
if i < len(properties) - 1 and prop_lines:
prop_lines[-1] = prop_lines[-1] + ","
schema_lines.extend(prop_lines)
schema_lines.append(" }" + ("," if item_idx < show_array_items - 1 else ""))
schema_lines.append(" ...")
schema_lines.append("]")
if schema.get("title") or schema.get("description"):
schema_lines.append("")
if schema.get("title"):
schema_lines.append(schema["title"])
if schema.get("description"):
schema_lines.append(schema["description"])
required = schema.get("required", [])
if required:
schema_lines.append(f"\nRequired fields: {', '.join(required)}")
return "\n".join(schema_lines)
@staticmethod
def validate_against_schema(data: List[Dict[str, Any]], schema: Dict[str, Any], type_check: bool = False) -> None:
"""Validate data against a JSON schema.
Args:
data: List of data items to validate
schema: JSON schema to validate against
type_check: If True, check field types against schema. If False, skip type validation.
Raises:
TypeError: If data or schema have invalid types
KeyError: If schema is missing required fields
SchemaValidationError: If validation fails with specific validation errors
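
        Example (illustrative; a conforming item passes without raising):
            >>> schema = {"properties": {"age": {"type": "integer"}}, "required": ["age"]}
            >>> JSONParser.validate_against_schema([{"age": 30}], schema, type_check=True)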
"""
properties = schema["properties"]
required_fields = schema.get("required", [])
type_mapping = {"string": str, "integer": int, "number": (int, float), "boolean": bool, "array": list, "object": dict}
validation_errors = []
for index, item in enumerate(data):
if not isinstance(item, dict):
raise TypeError(f"Item {index}: expected dict, got {type(item)}")
# Check required fields
for field_name in required_fields:
if field_name not in item:
validation_errors.append(f"Item {index}: Missing required field '{field_name}'")
# Check unexpected fields
for field_name in item:
if field_name not in properties:
validation_errors.append(f"Item {index}: Unexpected field '{field_name}' not defined in schema")
# Check field types only if type_check is True
if type_check:
for field_name, field_schema in properties.items():
if field_name not in item:
continue
field_value = item[field_name]
if field_value is None:
if field_schema.get("type") != "null" and "null" not in field_schema.get("type", []):
validation_errors.append(f"Item {index}: Field '{field_name}' is null but type should be {field_schema['type']}")
continue
# Let KeyError propagate naturally
expected_type = field_schema["type"]
expected_python_type = type_mapping.get(expected_type)
if expected_python_type and not isinstance(field_value, expected_python_type):
validation_errors.append(f"Item {index}: Field '{field_name}' has type {type(field_value).__name__} " f"but should be {expected_type}")
# Validate nested objects
if expected_type == "object" and isinstance(field_value, dict):
# Let KeyError propagate naturally
nested_schema = {"properties": field_schema["properties"], "required": field_schema.get("required", [])}
try:
JSONParser.validate_against_schema([field_value], nested_schema, type_check=type_check)
except SchemaValidationError as e:
for error in e.details["errors"]:
validation_errors.append(error.replace("Item 0:", f"Item {index}: Field '{field_name}'"))
# Validate arrays
if expected_type == "array" and isinstance(field_value, list):
# Let KeyError propagate naturally
items_schema = field_schema["items"]
if items_schema.get("type") == "object":
nested_schema = {"properties": items_schema["properties"], "required": items_schema.get("required", [])}
for array_idx, array_item in enumerate(field_value):
if not isinstance(array_item, dict):
validation_errors.append(f"Item {index}: Field '{field_name}[{array_idx}]' should be an object")
continue
try:
JSONParser.validate_against_schema([array_item], nested_schema, type_check=type_check)
except SchemaValidationError as e:
for error in e.details["errors"]:
validation_errors.append(error.replace("Item 0:", f"Item {index}: Field '{field_name}[{array_idx}]'"))
if validation_errors:
raise SchemaValidationError("Schema validation failed", details={"errors": validation_errors})
@staticmethod
def parse_llm_output(
text: str,
schema: Optional[Dict[str, Any]] = None,
json_wrapper_key: Optional[str] = None,
strict: bool = False,
type_check: bool = False,
timeout: float = DEFAULT_PARSING_TIMEOUT,
) -> Optional[Any]:
"""Complete JSON parsing pipeline for LLM outputs with configurable error handling.
Args:
text: Raw text from LLM that may contain JSON
schema: Optional JSON schema to validate against
json_wrapper_key: Optional key that may wrap the actual data
            strict: If True, raise errors. If False, return None and log a warning
            type_check: If True, check field types against schema. If False, skip type validation.
            timeout: Maximum time in seconds to allow for parsing (default: 1 second; 0 disables the timeout)
Returns:
Parsed data if successful, None if parsing fails in non-strict mode
        Raises:
            JsonParserError: If extraction or parsing fails, or the timeout expires, in strict mode
            SchemaValidationError: If schema validation fails in strict mode
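
        Example (illustrative; timeout=0 skips the SIGALRM timer, which is
        available only on POSIX systems and only in the main thread):
            >>> JSONParser.parse_llm_output('Sure! {"name": "Ada"}', timeout=0)
            [{'name': 'Ada'}]
            >>> JSONParser.parse_llm_output('{"items": [{"x": 1}]}', json_wrapper_key="items", timeout=0)
            [{'x': 1}]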
"""
def timeout_handler(signum, frame):
raise TimeoutError(f"JSON parsing operation timed out after {timeout} seconds")
        try:
            # SIGALRM-based timeouts work only on POSIX systems and only in
            # the main thread; a timeout of 0 disables the timer
            if timeout > 0:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.setitimer(signal.ITIMER_REAL, timeout)
try:
# Step 1: Extract potential JSON content from the text
extracted_json = JSONParser._extract_json_from_text(text)
# Step 2: Try to parse the JSON with multiple strategies
parsed_json = JSONParser._try_parse_json(extracted_json)
if parsed_json is None:
raise JsonParserError("Failed to parse JSON content after trying all strategies")
# Step 3: Unwrap the parsed JSON data
data = JSONParser._unwrap_json_data(parsed_json, json_wrapper_key)
# Step 4: Validate against schema if provided
if schema:
JSONParser.validate_against_schema(data, schema, type_check=type_check)
return data
finally:
# Cancel the timeout regardless of whether an exception occurred
if timeout > 0:
signal.setitimer(signal.ITIMER_REAL, 0)
except TimeoutError as e:
# Handle timeout
logger.warning(f"JSON parsing timeout: {str(e)}")
if strict:
raise JsonParserError(f"Parsing timed out: {str(e)}") from e
return None
except JsonParserError as e:
# Handle JSON extraction errors
if strict:
raise
logger.warning(f"Failed to extract JSON from LLM response: {str(e)}")
return None
except json.JSONDecodeError as e:
# Handle JSON syntax errors
if strict:
raise JsonParserError(f"Invalid JSON syntax: {str(e)}") from e
logger.warning(f"Invalid JSON syntax in LLM response: {str(e)}")
return None
except SchemaValidationError as e:
# Handle schema validation errors
if strict:
raise
logger.warning(f"LLM response failed schema validation: {str(e)}")
if e.details and "errors" in e.details:
for error in e.details["errors"]:
logger.debug(f"- {error}")
return None
except (TypeError, KeyError) as e:
# Handle data structure errors
if strict:
raise JsonParserError(f"Data structure error: {str(e)}") from e
logger.warning(f"Data structure error in LLM response: {str(e)}")
return None