File size: 26,376 Bytes
5301c48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
import json
import re
import signal
from typing import Any, Dict, List, Optional

from starfish.common.exceptions import JsonParserError, SchemaValidationError
from starfish.common.logger import get_logger

logger = get_logger(__name__)

# Maximum time (in seconds) to allow for JSON parsing operations
DEFAULT_PARSING_TIMEOUT = 1.0


class TimeoutError(Exception):
    """Exception raised when a parsing operation times out.

    NOTE(review): this intentionally-or-not shadows the builtin
    ``TimeoutError`` (an ``OSError`` subclass). The ``except TimeoutError``
    clause in ``parse_llm_output`` catches *this* class; a builtin
    ``TimeoutError`` raised by other code would also match it only by name
    lookup in this module. Renaming would change the module's public surface,
    so it is left as-is.
    """

    pass


class JSONParser:
    """Handles parsing and validation of JSON data against schemas.

    Provides utilities for JSON schema generation and formatting.
    """

    @staticmethod
    def _extract_json_from_text(text: str) -> str:
        """Clean a string that might contain JSON with markdown code block markers.

        Tries, in order: a fenced ```json block, any fenced code block, then a
        raw scan for the first balanced {...} / [...] span in the text.

        Args:
            text: String potentially containing JSON within markdown formatting

        Returns:
            Cleaned JSON string with markdown and extra text removed

        Raises:
            JsonParserError: If no valid JSON content can be found in the text
        """
        # First try to extract from markdown code blocks
        if "```" in text:
            # Try extracting from ```json blocks first
            # (only if the opening ```json fence also has a closing fence)
            if "```json" in text and "```" in text.split("```json", 1)[1]:
                json_content = text.split("```json", 1)[1].split("```")[0]
                return json_content.strip()

            # Try extracting from any code block
            parts = text.split("```")
            if len(parts) >= 3:
                content = parts[1]
                if "\n" in content:
                    first_line, rest = content.split("\n", 1)
                    # Drop a leading language tag (e.g. "python") unless the
                    # first line already starts the JSON payload itself.
                    if not first_line.strip().startswith(("{", "[")):
                        content = rest
                return content.strip()

        # Try to find JSON content directly: scan for the first "{" or "[",
        # then walk forward tracking string literals and a bracket stack so
        # braces inside quoted strings are ignored; return the balanced span.
        for i, char in enumerate(text):
            if char in ["{", "["]:
                # Find matching closing brace/bracket
                stack = []
                in_string = False
                escaped = False

                for j in range(i, len(text)):
                    char = text[j]

                    if in_string:
                        if char == "\\":
                            # Toggle so an escaped backslash ("\\\\") does not
                            # make the following character look escaped.
                            escaped = not escaped
                        elif char == '"' and not escaped:
                            in_string = False
                        else:
                            escaped = False
                    else:
                        if char == '"':
                            in_string = True
                            escaped = False
                        elif char in ["{", "["]:
                            stack.append(char)
                        elif char == "}" and stack and stack[-1] == "{":
                            stack.pop()
                        elif char == "]" and stack and stack[-1] == "[":
                            stack.pop()

                    # Stack empty means the opening bracket at i just closed.
                    if not stack:
                        return text[i : j + 1].strip()

        raise JsonParserError("No valid JSON content found in the text")

    @staticmethod
    def _aggressive_escape_all_backslashes(json_text: str) -> str:
        """Apply aggressive backslash escaping to all string literals in JSON.

        This is a more heavy-handed approach when selective escaping fails.

        Args:
            json_text: JSON text with potentially problematic escape sequences

        Returns:
            JSON text with all backslashes doubled in string literals
        """
        pattern = r'"([^"]*(?:\\.[^"]*)*)"'

        def replace_string_content(match):
            string_content = match.group(1)
            # Replace any single backslash with double backslash
            escaped_content = string_content.replace("\\", "\\\\")
            return f'"{escaped_content}"'

        return re.sub(pattern, replace_string_content, json_text)

    @staticmethod
    def _sanitize_control_characters(json_text: str) -> str:
        """Remove or escape invalid control characters in JSON string literals.

        JSON doesn't allow raw control characters (ASCII 0-31) within strings.
        This method identifies and removes or escapes these characters within string literals.

        Args:
            json_text: JSON text with potentially invalid control characters

        Returns:
            Sanitized JSON text with control characters properly handled
        """
        pattern = r'"([^"]*(?:\\.[^"]*)*)"'

        def sanitize_string_content(match):
            string_content = match.group(1)
            # Replace any control characters with proper escapes or remove them
            # First replace common ones with their escape sequences
            string_content = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F]", "", string_content)
            # Make sure \t, \n, \r are preserved as actual escape sequences
            string_content = string_content.replace("\t", "\\t")
            string_content = string_content.replace("\n", "\\n")
            string_content = string_content.replace("\r", "\\r")
            return f'"{string_content}"'

        return re.sub(pattern, sanitize_string_content, json_text)

    @staticmethod
    def _try_parse_json(json_text: str) -> Any:
        """Try to parse JSON text using various strategies.

        This method attempts multiple parsing strategies in sequence to handle LLM-generated JSON:
        1. Parse the raw text directly
        2. Try aggressive escaping of all backslashes
        3. Try sanitizing control characters
        4. Try combinations of the above approaches

        Args:
            json_text: JSON text to parse

        Returns:
            Parsed JSON object if successful

        Raises:
            JsonParserError: If parsing fails after trying all strategies.
        """
        # Keep a list of all errors for comprehensive error reporting
        errors = []

        # Strategy 1: Try parsing directly
        try:
            return json.loads(json_text)
        except json.JSONDecodeError as e:
            errors.append(f"Direct parsing: {e}")
            logger.debug(f"Direct JSON parsing failed: {e}. Trying aggressive escaping.")

            # Strategy 2: Try aggressive backslash escaping
            try:
                aggressive_text = JSONParser._aggressive_escape_all_backslashes(json_text)
                return json.loads(aggressive_text)
            except json.JSONDecodeError as e2:
                errors.append(f"Backslash escaping: {e2}")
                logger.debug(f"Aggressive escaping failed: {e2}. Trying control character sanitization.")

                # Strategy 3: Try sanitizing control characters
                try:
                    # First sanitize control characters in the original text
                    sanitized_text = JSONParser._sanitize_control_characters(json_text)
                    return json.loads(sanitized_text)
                except json.JSONDecodeError as e3:
                    errors.append(f"Control character sanitization: {e3}")

                    # Strategy 4: Try sanitizing after aggressive escaping
                    # (combines both approaches)
                    try:
                        sanitized_aggressive = JSONParser._sanitize_control_characters(aggressive_text)
                        return json.loads(sanitized_aggressive)
                    except json.JSONDecodeError as e4:
                        errors.append(f"Sanitized + escaped: {e4}")
                        logger.error(f"All JSON parsing strategies failed. Errors: {', '.join(errors)}")

                        # If we've exhausted all options, raise a comprehensive error
                        raise JsonParserError(f"Failed to parse JSON after trying all strategies. Errors: {' | '.join(errors)}") from e4

    @staticmethod
    def _unwrap_json_data(json_data: Any, json_wrapper_key: Optional[str] = None) -> List[Dict[str, Any]]:
        """Extract and normalize data from parsed JSON.

        Args:
            json_data: Parsed JSON data
            json_wrapper_key: Optional key that may wrap the actual data

        Returns:
            List of data items, ensuring the result is always a list

        Raises:
            TypeError: If data is not a dict or list
            KeyError: If json_wrapper_key is not found in the data
        """
        if json_wrapper_key and isinstance(json_data, dict):
            # Let KeyError propagate naturally if key doesn't exist
            result = json_data[json_wrapper_key]
        else:
            result = json_data

        if not isinstance(result, (dict, list)):
            raise TypeError(f"Expected dict or list, got {type(result).__name__}")

        return [result] if isinstance(result, dict) else result

    @staticmethod
    def convert_to_schema(fields: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate a JSON schema from field definitions.

        Args:
            fields: List of field definitions with name, type, description, and required flag

        Returns:
            A JSON schema dictionary

        Raises:
            TypeError: If fields is not a list or field is not a dict
            KeyError: If required field attributes are missing
            ValueError: If field type is invalid
        """
        if not isinstance(fields, list):
            raise TypeError(f"Expected list of fields, got {type(fields)}")

        schema = {"type": "object", "properties": {}, "required": []}

        type_mapping = {
            "str": {"type": "string"},
            "int": {"type": "integer"},
            "float": {"type": "number"},
            "bool": {"type": "boolean"},
            "list": {"type": "array"},
            "dict": {"type": "object"},
            "null": {"type": "null"},
        }

        for field in fields:
            if not isinstance(field, dict):
                raise TypeError(f"Expected dict for field definition, got {type(field)}")

            # Let KeyError propagate naturally for missing required attributes
            name = field["name"]
            field_type = field["type"]
            description = field.get("description", "")
            required = field.get("required", True)

            if field_type == "list" and "items" in field:
                schema["properties"][name] = {"type": "array", "items": field["items"], "description": description}
            elif field_type == "dict" and "properties" in field:
                schema["properties"][name] = {"type": "object", "properties": field["properties"], "description": description}
                if "required" in field:
                    schema["properties"][name]["required"] = field["required"]
            elif field_type in type_mapping:
                schema["properties"][name] = {**type_mapping[field_type], "description": description}
            else:
                raise ValueError(f"Invalid field type '{field_type}' for field '{name}'")

            if required:
                schema["required"].append(name)

        return schema

    @staticmethod
    def get_format_instructions(schema: Dict[str, Any], json_wrapper_key: Optional[str] = None, show_array_items: int = 1) -> str:
        """Format a JSON schema into human-readable instructions.

        Produces a pseudo-JSON template (with ``//`` comments, so it is NOT
        valid JSON) showing an LLM the exact output shape expected. The output
        is always an array of example objects, optionally wrapped under
        ``json_wrapper_key``.

        Args:
            schema: A JSON schema dictionary
            json_wrapper_key: Optional key to wrap the schema in an array
            show_array_items: Number of example items to show in an array wrapper

        Returns:
            Formatted string with schema instructions
        """

        # Renders one property as template lines; recurses into nested
        # objects and arrays-of-objects.
        def format_property(name: str, prop: Dict[str, Any], required: List[str], indent_level: int = 1) -> List[str]:
            lines = []
            indent = "  " * indent_level
            field_type = prop.get("type", "string")
            description = prop.get("description", "")
            is_required = name in required

            comment = f"// {description}" + (" (required)" if is_required else " (optional)")

            if field_type == "object" and "properties" in prop:
                lines.append(f'{indent}"{name}": {{  {comment}')
                nested_props = prop.get("properties", {})
                nested_required = prop.get("required", [])

                # Recursively format properties of the nested object
                formatted_props = []
                for i, (nested_name, nested_prop) in enumerate(nested_props.items()):
                    # Increase indent level for properties inside the object
                    prop_lines = format_property(nested_name, nested_prop, nested_required, indent_level + 1)
                    # Add comma if not the last property
                    if i < len(nested_props) - 1 and prop_lines:
                        prop_lines[-1] = prop_lines[-1] + ","
                    formatted_props.extend(prop_lines)
                lines.extend(formatted_props)
                # End of recursive formatting

                lines.append(f"{indent}}}")

            elif field_type == "array" and "items" in prop:
                items = prop.get("items", {})
                item_type = items.get("type")
                lines.append(f'{indent}"{name}": [  {comment}')  # Start array

                # Check if items are objects and have properties
                if item_type == "object" and "properties" in items:
                    lines.append(f"{indent}  {{")  # Start example object in array
                    nested_props = items.get("properties", {})
                    nested_required = items.get("required", [])

                    # Recursively format the properties of the object within the array item
                    formatted_props = []
                    for i, (nested_name, nested_prop) in enumerate(nested_props.items()):
                        # Increase indent level for properties inside the object
                        prop_lines = format_property(nested_name, nested_prop, nested_required, indent_level + 2)
                        # Add comma if not the last property
                        if i < len(nested_props) - 1 and prop_lines:
                            prop_lines[-1] = prop_lines[-1] + ","
                        formatted_props.extend(prop_lines)
                    lines.extend(formatted_props)
                    # End of recursive formatting for array item properties

                    lines.append(f"{indent}  }}")  # End example object
                    lines.append(f"{indent}  // ... more items ...")  # Indicate potential for more items
                # Handle arrays of simple types (optional, could add examples here too)
                # elif item_type in type_mapping:
                #     lines.append(f"{indent}  // Example: {type_mapping[item_type]}")
                else:
                    lines.append(f"{indent}  // Example items of type {item_type}")

                lines.append(f"{indent}]")  # End array
            else:
                # Scalar placeholder value chosen by declared type.
                example_value = (
                    '""'
                    if field_type == "string"
                    else "number"
                    if field_type in ["integer", "number"]
                    else "true or false"
                    if field_type == "boolean"
                    else "[]"
                    if field_type == "array"
                    else "{}"
                )
                lines.append(f'{indent}"{name}": {example_value}  {comment}')

            return lines

        schema_lines = []

        # NOTE(review): the two branches below are intentionally near-identical;
        # only the wrapper lines and the starting indent level differ. They are
        # kept separate because the emitted indentation is load-bearing text.
        if json_wrapper_key:
            schema_lines.extend(["{", f'  "{json_wrapper_key}": ['])

            properties = schema.get("properties", {})
            required = schema.get("required", [])

            for item_idx in range(show_array_items):
                schema_lines.append("    {")

                for i, (name, prop) in enumerate(properties.items()):
                    prop_lines = format_property(name, prop, required, indent_level=3)
                    if i < len(properties) - 1 and prop_lines:
                        prop_lines[-1] = prop_lines[-1] + ","
                    schema_lines.extend(prop_lines)

                schema_lines.append("    }" + ("," if item_idx < show_array_items - 1 else ""))

            schema_lines.append("    ...")

            schema_lines.extend(["  ]", "}"])
        else:
            # Always format as a list structure
            schema_lines.append("[")
            properties = schema.get("properties", {})
            required = schema.get("required", [])

            for item_idx in range(show_array_items):
                schema_lines.append("    {")

                for i, (name, prop) in enumerate(properties.items()):
                    prop_lines = format_property(name, prop, required, indent_level=2)
                    if i < len(properties) - 1 and prop_lines:
                        prop_lines[-1] = prop_lines[-1] + ","
                    schema_lines.extend(prop_lines)

                schema_lines.append("    }" + ("," if item_idx < show_array_items - 1 else ""))

            schema_lines.append("    ...")
            schema_lines.append("]")

        # Optional schema title/description appended after the template.
        if schema.get("title") or schema.get("description"):
            schema_lines.append("")
            if schema.get("title"):
                schema_lines.append(schema["title"])
            if schema.get("description"):
                schema_lines.append(schema["description"])

        required = schema.get("required", [])
        if required:
            schema_lines.append(f"\nRequired fields: {', '.join(required)}")

        return "\n".join(schema_lines)

    @staticmethod
    def validate_against_schema(data: List[Dict[str, Any]], schema: Dict[str, Any], type_check: bool = False) -> None:
        """Validate data against a JSON schema.

        Args:
            data: List of data items to validate
            schema: JSON schema to validate against
            type_check: If True, check field types against schema. If False, skip type validation.

        Raises:
            TypeError: If data or schema have invalid types
            KeyError: If schema is missing required fields
            SchemaValidationError: If validation fails with specific validation errors
        """
        properties = schema["properties"]
        required_fields = schema.get("required", [])

        type_mapping = {"string": str, "integer": int, "number": (int, float), "boolean": bool, "array": list, "object": dict}

        validation_errors = []

        for index, item in enumerate(data):
            if not isinstance(item, dict):
                raise TypeError(f"Item {index}: expected dict, got {type(item)}")

            # Check required fields
            for field_name in required_fields:
                if field_name not in item:
                    validation_errors.append(f"Item {index}: Missing required field '{field_name}'")

            # Check unexpected fields
            for field_name in item:
                if field_name not in properties:
                    validation_errors.append(f"Item {index}: Unexpected field '{field_name}' not defined in schema")

            # Check field types only if type_check is True
            if type_check:
                for field_name, field_schema in properties.items():
                    if field_name not in item:
                        continue

                    field_value = item[field_name]
                    if field_value is None:
                        if field_schema.get("type") != "null" and "null" not in field_schema.get("type", []):
                            validation_errors.append(f"Item {index}: Field '{field_name}' is null but type should be {field_schema['type']}")
                        continue

                    # Let KeyError propagate naturally
                    expected_type = field_schema["type"]
                    expected_python_type = type_mapping.get(expected_type)

                    if expected_python_type and not isinstance(field_value, expected_python_type):
                        validation_errors.append(f"Item {index}: Field '{field_name}' has type {type(field_value).__name__} " f"but should be {expected_type}")

                    # Validate nested objects
                    if expected_type == "object" and isinstance(field_value, dict):
                        # Let KeyError propagate naturally
                        nested_schema = {"properties": field_schema["properties"], "required": field_schema.get("required", [])}
                        try:
                            JSONParser.validate_against_schema([field_value], nested_schema, type_check=type_check)
                        except SchemaValidationError as e:
                            for error in e.details["errors"]:
                                validation_errors.append(error.replace("Item 0:", f"Item {index}: Field '{field_name}'"))

                    # Validate arrays
                    if expected_type == "array" and isinstance(field_value, list):
                        # Let KeyError propagate naturally
                        items_schema = field_schema["items"]
                        if items_schema.get("type") == "object":
                            nested_schema = {"properties": items_schema["properties"], "required": items_schema.get("required", [])}
                            for array_idx, array_item in enumerate(field_value):
                                if not isinstance(array_item, dict):
                                    validation_errors.append(f"Item {index}: Field '{field_name}[{array_idx}]' should be an object")
                                    continue

                                try:
                                    JSONParser.validate_against_schema([array_item], nested_schema, type_check=type_check)
                                except SchemaValidationError as e:
                                    for error in e.details["errors"]:
                                        validation_errors.append(error.replace("Item 0:", f"Item {index}: Field '{field_name}[{array_idx}]'"))

        if validation_errors:
            raise SchemaValidationError("Schema validation failed", details={"errors": validation_errors})

    @staticmethod
    def parse_llm_output(
        text: str,
        schema: Optional[Dict[str, Any]] = None,
        json_wrapper_key: Optional[str] = None,
        strict: bool = False,
        type_check: bool = False,
        timeout: float = DEFAULT_PARSING_TIMEOUT,
    ) -> Optional[Any]:
        """Complete JSON parsing pipeline for LLM outputs with configurable error handling.

        Args:
            text: Raw text from LLM that may contain JSON
            schema: Optional JSON schema to validate against
            json_wrapper_key: Optional key that may wrap the actual data
            strict: If True, raise errors. If False, return None and log warning
            type_check: If True, check field types against schema. If False, skip type validation.
            timeout: Maximum time in seconds to allow for parsing (default: 1 second)

        Returns:
            Parsed data if successful, None if parsing fails in non-strict mode

        Raises:
            JsonParserError: If parsing fails in strict mode
            SchemaValidationError: If schema validation fails in strict mode
            json.JSONDecodeError: If JSON syntax is invalid in strict mode
            TimeoutError: If parsing takes longer than the specified timeout
        """

        def timeout_handler(signum, frame):
            raise TimeoutError(f"JSON parsing operation timed out after {timeout} seconds")

        # SIGALRM-based timeouts only exist on Unix and can only be armed from
        # the main thread. Previously this crashed on Windows (no SIGALRM) and
        # in worker threads (ValueError), and it clobbered any handler that was
        # already installed. Now we degrade to "no timeout" and restore the
        # previous handler afterwards.
        previous_handler = None
        alarm_armed = False

        try:
            if timeout > 0 and hasattr(signal, "SIGALRM"):
                try:
                    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
                    signal.setitimer(signal.ITIMER_REAL, timeout)
                    alarm_armed = True
                except ValueError:
                    # signal.signal raises ValueError outside the main thread.
                    logger.debug("Cannot arm SIGALRM outside the main thread; parsing without a timeout.")

            try:
                # Step 1: Extract potential JSON content from the text
                extracted_json = JSONParser._extract_json_from_text(text)

                # Step 2: Try to parse the JSON with multiple strategies
                parsed_json = JSONParser._try_parse_json(extracted_json)
                if parsed_json is None:
                    # NOTE(review): a bare JSON `null` also parses to None and is
                    # therefore treated as a failure here — confirm that is intended.
                    raise JsonParserError("Failed to parse JSON content after trying all strategies")

                # Step 3: Unwrap the parsed JSON data
                data = JSONParser._unwrap_json_data(parsed_json, json_wrapper_key)

                # Step 4: Validate against schema if provided
                if schema:
                    JSONParser.validate_against_schema(data, schema, type_check=type_check)

                return data

            finally:
                # Cancel the timer and restore the prior handler regardless of
                # whether an exception occurred.
                if alarm_armed:
                    signal.setitimer(signal.ITIMER_REAL, 0)
                    signal.signal(signal.SIGALRM, previous_handler if previous_handler is not None else signal.SIG_DFL)

        except TimeoutError as e:
            # Handle timeout
            logger.warning(f"JSON parsing timeout: {str(e)}")
            if strict:
                raise JsonParserError(f"Parsing timed out: {str(e)}") from e
            return None

        except JsonParserError as e:
            # Handle JSON extraction errors
            if strict:
                raise
            logger.warning(f"Failed to extract JSON from LLM response: {str(e)}")
            return None

        except json.JSONDecodeError as e:
            # Handle JSON syntax errors
            if strict:
                raise JsonParserError(f"Invalid JSON syntax: {str(e)}") from e
            logger.warning(f"Invalid JSON syntax in LLM response: {str(e)}")
            return None

        except SchemaValidationError as e:
            # Handle schema validation errors
            if strict:
                raise
            logger.warning(f"LLM response failed schema validation: {str(e)}")
            if e.details and "errors" in e.details:
                for error in e.details["errors"]:
                    logger.debug(f"- {error}")
            return None

        except (TypeError, KeyError) as e:
            # Handle data structure errors
            if strict:
                raise JsonParserError(f"Data structure error: {str(e)}") from e
            logger.warning(f"Data structure error in LLM response: {str(e)}")
            return None