Severian commited on
Commit
7e0a2e6
·
verified ·
1 Parent(s): 565b90d

Update json_parser.py

Browse files
Files changed (1) hide show
  1. json_parser.py +152 -102
json_parser.py CHANGED
@@ -1,140 +1,190 @@
1
  from logger_config import setup_logger
2
- from typing import Dict, Any, Optional, List, Union
3
- from dataclasses import dataclass, asdict
4
  from enum import Enum
5
  import json
6
- from dify_client_python.dify_client.models.stream import (
7
- StreamEvent,
8
- StreamResponse,
9
- build_chat_stream_response
10
- )
11
  import re
12
 
13
  logger = setup_logger()
14
 
15
- class EventType(Enum):
16
- AGENT_THOUGHT = "agent_thought"
17
- AGENT_MESSAGE = "agent_message"
18
- MESSAGE_END = "message_end"
19
- PING = "ping"
20
-
21
- @dataclass
22
- class ToolCall:
23
- tool_name: str
24
- tool_input: Dict[str, Any]
25
- tool_output: Optional[str]
26
- tool_labels: Dict[str, Dict[str, str]]
27
-
28
- @dataclass
29
- class Citation:
30
- dataset_id: str
31
- dataset_name: str
32
- document_id: str
33
- document_name: str
34
- segment_id: str
35
- score: float
36
- content: str
37
-
38
- @dataclass
39
- class ProcessedResponse:
40
- event_type: EventType
41
- task_id: str
42
- message_id: str
43
- conversation_id: str
44
- content: str
45
- tool_calls: List[ToolCall]
46
- citations: List[Citation]
47
- metadata: Dict[str, Any]
48
- created_at: int
49
-
50
- class EnumEncoder(json.JSONEncoder):
51
- def default(self, obj):
52
- if isinstance(obj, Enum):
53
- return obj.value
54
- if hasattr(obj, 'dict'):
55
- return obj.dict()
56
- return super().default(obj)
57
 
58
  class SSEParser:
59
  def __init__(self):
60
  self.logger = setup_logger("sse_parser")
 
 
 
 
 
 
 
61
 
62
- def parse_sse_event(self, data: str) -> Optional[Dict]:
63
- """Parse SSE event data with improved mermaid handling"""
64
- self.logger.debug("Parsing SSE event")
65
-
66
  try:
67
- # Extract the data portion
68
- if "data:" in data:
69
- data = data.split("data:", 1)[1].strip()
 
 
 
 
 
 
 
 
70
 
71
- # Parse JSON data
72
- parsed_data = json.loads(data)
 
 
 
 
 
 
 
 
 
 
 
73
 
74
- # Enhanced mermaid diagram handling
75
- if "observation" in parsed_data:
76
- try:
77
- observation = parsed_data["observation"]
78
- if observation and isinstance(observation, str):
79
- if "mermaid_diagram" in observation:
80
- try:
81
- tool_data = json.loads(observation)
82
- if isinstance(tool_data, dict):
83
- mermaid_content = tool_data.get(
84
- "mermaid_diagram", ""
85
- )
86
- if mermaid_content:
87
- # Clean and format mermaid content
88
- cleaned_content = self.clean_mermaid_content(
89
- mermaid_content
90
- )
91
- parsed_data["observation"] = json.dumps({
92
- "mermaid_diagram": cleaned_content
93
- })
94
- except json.JSONDecodeError:
95
- self.logger.warning(
96
- "Failed to parse mermaid diagram content"
97
- )
98
- except Exception as e:
99
- self.logger.error(f"Error processing observation: {str(e)}")
100
 
101
- return parsed_data
 
 
102
 
103
- except json.JSONDecodeError as e:
104
- self.logger.error(f"JSON decode error: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  return None
 
106
  except Exception as e:
107
  self.logger.error(f"Parse error: {str(e)}")
108
  return None
109
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  def clean_mermaid_content(self, content: str) -> str:
111
  """Clean and format mermaid diagram content"""
112
  try:
113
- # If content is JSON string, parse it
114
- if isinstance(content, str) and content.strip().startswith('{'):
115
- content_dict = json.loads(content)
116
- if "mermaid_diagram" in content_dict:
117
- content = content_dict["mermaid_diagram"]
118
-
119
- # Remove markdown code blocks
120
  content = re.sub(r'```mermaid\s*|\s*```', '', content)
121
-
122
- # Remove "tool response:" and any JSON wrapper
123
  content = re.sub(r'tool response:.*?{', '{', content)
124
  content = re.sub(r'}\s*\.$', '}', content)
125
 
126
- # If still JSON, extract mermaid content
127
  if content.strip().startswith('{'):
128
  try:
129
  content_dict = json.loads(content)
130
- if "mermaid_diagram" in content_dict:
131
- content = content_dict["mermaid_diagram"]
132
  except:
133
  pass
134
 
135
- # Final cleanup
136
- content = re.sub(r'\s+', ' ', content.strip())
137
- return content
138
 
139
  except Exception as e:
140
  self.logger.error(f"Error cleaning mermaid content: {e}")
 
1
  from logger_config import setup_logger
2
+ from typing import Dict, Any, Optional, List, Union, Tuple
3
+ from dataclasses import dataclass
4
  from enum import Enum
5
  import json
 
 
 
 
 
6
  import re
7
 
8
  logger = setup_logger()
9
 
10
+ class MessageState:
11
+ def __init__(self):
12
+ self.buffer = ""
13
+ self.is_complete = False
14
+ self.tool_outputs = []
15
+ self.citations = []
16
+ self.metadata = {}
17
+ self.processed_events = set()
18
+ self.current_message_id = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
  class SSEParser:
21
  def __init__(self):
22
  self.logger = setup_logger("sse_parser")
23
+ self.current_message = MessageState()
24
+
25
+ def _extract_json_content(self, data: str) -> Optional[str]:
26
+ """Extract JSON content from SSE data line"""
27
+ if "data:" in data:
28
+ return data.split("data:", 1)[1].strip()
29
+ return None
30
 
31
+ def _is_valid_json(self, content: str) -> bool:
32
+ """Check if content is valid JSON"""
 
 
33
  try:
34
+ json.loads(content)
35
+ return True
36
+ except json.JSONDecodeError:
37
+ return False
38
+
39
+ def _clean_mermaid_content(self, content: str) -> Optional[str]:
40
+ """Clean and extract mermaid diagram content"""
41
+ try:
42
+ # Remove tool response prefix/suffix if present
43
+ if "tool response:" in content:
44
+ content = content.split("tool response:", 1)[1].strip()
45
 
46
+ # Parse JSON if present
47
+ try:
48
+ data = json.loads(content)
49
+ # Handle different mermaid output formats
50
+ if "mermaid_output" in data:
51
+ content = data["mermaid_output"]
52
+ elif "mermaid_diagram" in data:
53
+ content = data["mermaid_diagram"]
54
+ except json.JSONDecodeError:
55
+ pass
56
+
57
+ # Clean up markdown formatting
58
+ content = content.replace("```mermaid\n", "").replace("\n```", "")
59
 
60
+ return content.strip()
61
+ except Exception as e:
62
+ self.logger.error(f"Error cleaning mermaid content: {str(e)}")
63
+ return None
64
+
65
+ def parse_sse_event(self, data: str) -> Optional[Dict]:
66
+ """Parse SSE event data and format for frontend consumption"""
67
+ try:
68
+ # Extract JSON content from SSE data
69
+ json_content = self._extract_json_content(data)
70
+ if not json_content:
71
+ return None
72
+
73
+ # Parse JSON content
74
+ parsed_data = json.loads(json_content)
 
 
 
 
 
 
 
 
 
 
 
75
 
76
+ # Get event details
77
+ event_type = parsed_data.get("event")
78
+ message_id = parsed_data.get("message_id")
79
 
80
+ # Format based on event type
81
+ if event_type == "agent_message":
82
+ return {
83
+ "type": "message",
84
+ "content": parsed_data.get("answer", ""),
85
+ "message_id": message_id
86
+ }
87
+
88
+ elif event_type == "agent_thought":
89
+ thought = parsed_data.get("thought", "")
90
+ observation = parsed_data.get("observation", "")
91
+ tool = parsed_data.get("tool", "")
92
+
93
+ # Handle tool-specific formatting
94
+ if tool == "mermaid_diagrams":
95
+ try:
96
+ cleaned_content = self._clean_mermaid_content(observation)
97
+ if cleaned_content:
98
+ return {
99
+ "type": "tool_output",
100
+ "tool": "mermaid",
101
+ "content": cleaned_content,
102
+ "message_id": message_id
103
+ }
104
+ except Exception as e:
105
+ self.logger.error(f"Failed to parse mermaid data: {str(e)}")
106
+
107
+ return {
108
+ "type": "thought",
109
+ "content": {
110
+ "thought": thought,
111
+ "observation": observation,
112
+ "tool": tool
113
+ },
114
+ "message_id": message_id
115
+ }
116
+
117
+ elif event_type == "message_end":
118
+ return {
119
+ "type": "end",
120
+ "message_id": message_id,
121
+ "metadata": parsed_data.get("metadata", {})
122
+ }
123
+
124
  return None
125
+
126
  except Exception as e:
127
  self.logger.error(f"Parse error: {str(e)}")
128
  return None
129
 
130
+ def _process_observation(self, data: Dict) -> Dict:
131
+ """Process observation content with special handling for tool outputs"""
132
+ try:
133
+ observation = data.get("observation")
134
+ if observation and isinstance(observation, str):
135
+ # Handle tool-specific content
136
+ if "mermaid_diagram" in observation:
137
+ cleaned_content = self.clean_mermaid_content(observation)
138
+ if cleaned_content not in [t.get("content") for t in self.current_message.tool_outputs]:
139
+ self.current_message.tool_outputs.append({
140
+ "type": "mermaid_diagram",
141
+ "content": cleaned_content
142
+ })
143
+ data["observation"] = json.dumps({
144
+ "mermaid_diagram": cleaned_content
145
+ })
146
+ elif self._is_valid_json(observation):
147
+ # Handle other tool outputs
148
+ try:
149
+ tool_data = json.loads(observation)
150
+ if isinstance(tool_data, dict):
151
+ for tool_name, tool_output in tool_data.items():
152
+ if tool_output not in [t.get("content") for t in self.current_message.tool_outputs]:
153
+ self.current_message.tool_outputs.append({
154
+ "type": tool_name,
155
+ "content": tool_output
156
+ })
157
+ except json.JSONDecodeError:
158
+ pass
159
+ except Exception as e:
160
+ self.logger.error(f"Error processing observation: {str(e)}")
161
+ return data
162
+
163
+ def _handle_message_end(self, data: Dict) -> None:
164
+ """Handle message end event and cleanup state"""
165
+ self.current_message.citations = data.get("retriever_resources", [])
166
+ self.current_message.metadata = data.get("metadata", {})
167
+ self.current_message.metadata["tool_outputs"] = self.current_message.tool_outputs
168
+ self.current_message.is_complete = True
169
+
170
  def clean_mermaid_content(self, content: str) -> str:
171
  """Clean and format mermaid diagram content"""
172
  try:
173
+ # Remove markdown and JSON formatting
 
 
 
 
 
 
174
  content = re.sub(r'```mermaid\s*|\s*```', '', content)
 
 
175
  content = re.sub(r'tool response:.*?{', '{', content)
176
  content = re.sub(r'}\s*\.$', '}', content)
177
 
178
+ # Parse JSON if present
179
  if content.strip().startswith('{'):
180
  try:
181
  content_dict = json.loads(content)
182
+ if isinstance(content_dict, dict):
183
+ content = content_dict.get("mermaid_diagram", content)
184
  except:
185
  pass
186
 
187
+ return content.strip()
 
 
188
 
189
  except Exception as e:
190
  self.logger.error(f"Error cleaning mermaid content: {e}")