Andy Lee committed on
Commit
04ae29a
·
1 Parent(s): adf0c49

feat: force model to react with lat and lon for guessing

Browse files
Files changed (1) hide show
  1. geo_bot.py +118 -58
geo_bot.py CHANGED
@@ -3,6 +3,7 @@ import json
3
  import re
4
  from io import BytesIO
5
  from typing import Tuple, List, Optional, Dict, Any, Type
 
6
 
7
  from PIL import Image
8
  from langchain_core.messages import HumanMessage, BaseMessage
@@ -37,7 +38,10 @@ AGENT_PROMPT_TEMPLATE = """
37
 
38
  4. **Be Decisive:** A unique, definitive clue (full address, rare town name, etc.) ⇒ `GUESS` immediately.
39
 
40
- 5. **Final-Step Rule:** If **Remaining Steps = 1**, you **MUST** `GUESS` and you should carefully check the image and the surroundings.
 
 
 
41
 
42
  ────────────────────────────────
43
  **Context & Task:**
@@ -136,21 +140,33 @@ class GeoBot:
136
  )
137
  ]
138
 
139
- def _parse_agent_response(self, response: BaseMessage) -> Optional[Dict[str, Any]]:
 
 
140
  """
141
- Robustly parses JSON from the LLM response, handling markdown code blocks.
142
  """
143
  try:
144
  assert isinstance(response.content, str), "Response content is not a string"
145
  content = response.content.strip()
 
 
 
146
  match = re.search(r"```json\s*(\{.*?\})\s*```", content, re.DOTALL)
147
  if match:
148
  json_str = match.group(1)
 
149
  else:
150
  json_str = content
151
- return json.loads(json_str)
 
 
 
 
 
152
  except (json.JSONDecodeError, AttributeError) as e:
153
- print(f"Invalid JSON from LLM: {e}\nFull response was:\n{response.content}")
 
154
  return None
155
 
156
  def init_history(self) -> List[Dict[str, Any]]:
@@ -222,7 +238,8 @@ class GeoBot:
222
  prompt, image_b64_for_prompt[-1:]
223
  )
224
  response = self.model.invoke(message)
225
- decision = self._parse_agent_response(response)
 
226
  except Exception as e:
227
  print(f"Error during model invocation: {e}")
228
  decision = None
@@ -259,15 +276,7 @@ class GeoBot:
259
  self, max_steps: int = 10, step_callback=None
260
  ) -> Optional[Tuple[float, float]]:
261
  """
262
- Enhanced agent loop that calls a callback function after each step for UI updates.
263
-
264
- Args:
265
- max_steps: Maximum number of steps to take
266
- step_callback: Function called after each step with step info
267
- Signature: callback(step_info: dict) -> None
268
-
269
- Returns:
270
- Final guess coordinates (lat, lon) or None if no guess made
271
  """
272
  history = self.init_history()
273
 
@@ -275,14 +284,24 @@ class GeoBot:
275
  step_num = max_steps - step + 1
276
  print(f"\n--- Step {step_num}/{max_steps} ---")
277
 
278
- # Setup and screenshot
279
- self.controller.setup_clean_environment()
280
- self.controller.label_arrows_on_screen()
 
 
 
 
 
 
 
 
 
 
 
281
 
282
- screenshot_bytes = self.controller.take_street_view_screenshot()
283
  if not screenshot_bytes:
284
- print("Failed to take screenshot. Ending agent loop.")
285
- return None
286
 
287
  current_screenshot_b64 = self.pil_to_base64(
288
  image=Image.open(BytesIO(screenshot_bytes))
@@ -290,36 +309,28 @@ class GeoBot:
290
  available_actions = self.controller.get_available_actions()
291
  print(f"Available actions: {available_actions}")
292
 
293
- # Force guess on final step or get AI decision
294
- if step == 1: # Final step
295
- # Force a guess with fallback logic
296
- decision = {
297
- "reasoning": "Maximum steps reached, forcing final guess.",
298
- "action_details": {"action": "GUESS", "lat": 0.0, "lon": 0.0},
299
- }
300
- # Try to get a real guess from AI
301
- try:
302
- ai_decision = self.execute_agent_step(
303
- history, step, current_screenshot_b64, available_actions
304
- )
305
- if (
306
- ai_decision
307
- and ai_decision.get("action_details", {}).get("action")
308
- == "GUESS"
309
- ):
310
- decision = ai_decision
311
- except Exception as e:
312
- print(
313
- f"\nERROR: An exception occurred during the final GUESS attempt: {e}. Using fallback (0,0).\n"
314
- )
315
  else:
316
- # Normal step execution
317
  decision = self.execute_agent_step(
318
  history, step, current_screenshot_b64, available_actions
319
  )
320
 
321
- # Create step_info with current history BEFORE adding current step
322
- # This shows the history up to (but not including) the current step
 
 
 
 
 
 
 
 
 
 
323
  step_info = {
324
  "step_num": step_num,
325
  "max_steps": max_steps,
@@ -330,7 +341,7 @@ class GeoBot:
330
  "is_final_step": step == 1,
331
  "reasoning": decision.get("reasoning", "N/A"),
332
  "action_details": decision.get("action_details", {"action": "N/A"}),
333
- "history": history.copy(), # History up to current step (excluding current)
334
  }
335
 
336
  action_details = decision.get("action_details", {})
@@ -338,29 +349,78 @@ class GeoBot:
338
  print(f"AI Reasoning: {decision.get('reasoning', 'N/A')}")
339
  print(f"AI Action: {action}")
340
 
341
- # Call UI callback before executing action
342
  if step_callback:
343
  try:
344
  step_callback(step_info)
345
  except Exception as e:
346
- print(f"Warning: UI callback failed: {e}")
347
 
348
- # Add step to history AFTER callback (so next iteration has this step in history)
349
  self.add_step_to_history(history, current_screenshot_b64, decision)
350
 
351
  # Execute action
352
  if action == "GUESS":
353
- lat, lon = action_details.get("lat"), action_details.get("lon")
354
- if lat is not None and lon is not None:
355
- return lat, lon
356
- else:
357
- print("Invalid guess coordinates, using fallback")
358
- return 0.0, 0.0 # Fallback coordinates
 
 
 
 
 
 
 
 
359
  else:
360
  self.execute_action(action)
361
 
362
- print("Max steps reached. Agent did not make a final guess.")
363
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
364
 
365
  def analyze_image(self, image: Image.Image) -> Optional[Tuple[float, float]]:
366
  image_b64 = self.pil_to_base64(image)
 
3
  import re
4
  from io import BytesIO
5
  from typing import Tuple, List, Optional, Dict, Any, Type
6
+ import time
7
 
8
  from PIL import Image
9
  from langchain_core.messages import HumanMessage, BaseMessage
 
38
 
39
  4. **Be Decisive:** A unique, definitive clue (full address, rare town name, etc.) ⇒ `GUESS` immediately.
40
 
41
+ 5. **Final-Step Rule**
42
+ - If **Remaining Steps = 1**, you **MUST** `GUESS` with coordinates.
43
+ - **NO EXCEPTIONS**: Even with limited clues, provide your best estimate.
44
+ - **ALWAYS provide lat/lon numbers** - educated guesses are mandatory.
45
 
46
  ────────────────────────────────
47
  **Context & Task:**
 
140
  )
141
  ]
142
 
143
def _parse_agent_response(
    self, response: BaseMessage, verbose: bool = False
) -> Optional[Dict[str, Any]]:
    """
    Parse the JSON decision object out of an LLM response, logging each stage.

    The JSON is looked up in this order:
      1. the object inside a ```json fenced block, if one is present;
      2. otherwise the whole (stripped) response text;
      3. as a last resort, the span from the first "{" to the last "}" of
         that text, which recovers an object surrounded by prose or by a
         fence without the ``json`` tag.

    Args:
        response: Message returned by the model; only ``response.content``
            is read.
        verbose: When True, also print a preview (first 200 characters) of
            the raw response.

    Returns:
        The decoded JSON object as a dict, or None when the content is not a
        string, cannot be decoded, or decodes to something other than an
        object.
    """
    raw = getattr(response, "content", None)
    # Explicit check instead of `assert`: asserts vanish under `python -O`,
    # and an AssertionError would bypass this method's own error reporting.
    if not isinstance(raw, str):
        print("✗ JSON parsing failed: Response content is not a string")
        print(f"Full response was:\n{raw}")
        return None

    content = raw.strip()
    if verbose:
        # Only advertise truncation when the preview really is truncated.
        ellipsis = "..." if len(content) > 200 else ""
        print(f"Raw AI response: {content[:200]}{ellipsis}")

    match = re.search(r"```json\s*(\{.*?\})\s*```", content, re.DOTALL)
    if match:
        json_str = match.group(1)
        print(f"Extracted JSON: {json_str}")
    else:
        json_str = content
        print("No JSON code block found, trying to parse entire content")

    # Second candidate: the outermost {...} span, tried only if it differs
    # from the text that is attempted first.
    candidates = [json_str]
    start, end = json_str.find("{"), json_str.rfind("}")
    if 0 <= start < end:
        braced = json_str[start : end + 1]
        if braced != json_str:
            candidates.append(braced)

    first_error = None
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError as e:
            if first_error is None:
                first_error = e
            continue
        if isinstance(parsed, dict):
            print(f"Successfully parsed JSON: {parsed}")
            return parsed
        # Valid JSON but not an object (list, number, ...): callers expect a
        # dict, so treat it as a parse failure rather than returning it.
        if first_error is None:
            first_error = TypeError(
                f"expected a JSON object, got {type(parsed).__name__}"
            )

    print(f"✗ JSON parsing failed: {first_error}")
    print(f"Full response was:\n{raw}")
    return None
171
 
172
  def init_history(self) -> List[Dict[str, Any]]:
 
238
  prompt, image_b64_for_prompt[-1:]
239
  )
240
  response = self.model.invoke(message)
241
+ verbose = remaining_steps == 1
242
+ decision = self._parse_agent_response(response, verbose)
243
  except Exception as e:
244
  print(f"Error during model invocation: {e}")
245
  decision = None
 
276
  self, max_steps: int = 10, step_callback=None
277
  ) -> Optional[Tuple[float, float]]:
278
  """
279
+ Agent loop with simple retry logic and clear error coordinates.
 
 
 
 
 
 
 
 
280
  """
281
  history = self.init_history()
282
 
 
284
  step_num = max_steps - step + 1
285
  print(f"\n--- Step {step_num}/{max_steps} ---")
286
 
287
+ # Simple retry for screenshot
288
+ screenshot_bytes = None
289
+ for retry in range(3):
290
+ try:
291
+ self.controller.setup_clean_environment()
292
+ self.controller.label_arrows_on_screen()
293
+ screenshot_bytes = self.controller.take_street_view_screenshot()
294
+ if screenshot_bytes:
295
+ break
296
+ print(f"Screenshot retry {retry + 1}/3")
297
+ except Exception as e:
298
+ print(f"Error in step {step_num}, retry {retry + 1}: {e}")
299
+ if retry < 2:
300
+ time.sleep(2)
301
 
 
302
  if not screenshot_bytes:
303
+ print("Failed to get screenshot after retries")
304
+ return -1.0, -1.0
305
 
306
  current_screenshot_b64 = self.pil_to_base64(
307
  image=Image.open(BytesIO(screenshot_bytes))
 
309
  available_actions = self.controller.get_available_actions()
310
  print(f"Available actions: {available_actions}")
311
 
312
+ # Get AI decision
313
+ if step == 1: # Final step - force guess
314
+ decision = self._get_final_guess(
315
+ history, current_screenshot_b64, available_actions
316
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
317
  else:
 
318
  decision = self.execute_agent_step(
319
  history, step, current_screenshot_b64, available_actions
320
  )
321
 
322
+ if not decision:
323
+ print("No decision from AI, using fallback")
324
+ decision = {
325
+ "reasoning": "AI decision failed",
326
+ "action_details": {
327
+ "action": "GUESS" if step == 1 else "PAN_RIGHT",
328
+ "lat": -1.0,
329
+ "lon": -1.0,
330
+ },
331
+ }
332
+
333
+ # UI callback
334
  step_info = {
335
  "step_num": step_num,
336
  "max_steps": max_steps,
 
341
  "is_final_step": step == 1,
342
  "reasoning": decision.get("reasoning", "N/A"),
343
  "action_details": decision.get("action_details", {"action": "N/A"}),
344
+ "history": history.copy(),
345
  }
346
 
347
  action_details = decision.get("action_details", {})
 
349
  print(f"AI Reasoning: {decision.get('reasoning', 'N/A')}")
350
  print(f"AI Action: {action}")
351
 
 
352
  if step_callback:
353
  try:
354
  step_callback(step_info)
355
  except Exception as e:
356
+ print(f"UI callback error: {e}")
357
 
358
+ # Add to history
359
  self.add_step_to_history(history, current_screenshot_b64, decision)
360
 
361
  # Execute action
362
  if action == "GUESS":
363
+ lat = action_details.get("lat", -1.0)
364
+ lon = action_details.get("lon", -1.0)
365
+ print(f"Final guess: lat={lat}, lon={lon}")
366
+
367
+ # Validate coordinates
368
+ try:
369
+ lat_f, lon_f = float(lat), float(lon)
370
+ if -90 <= lat_f <= 90 and -180 <= lon_f <= 180:
371
+ return lat_f, lon_f
372
+ except (ValueError, TypeError):
373
+ pass
374
+
375
+ print("Invalid coordinates, returning error values")
376
+ return -1.0, -1.0
377
  else:
378
  self.execute_action(action)
379
 
380
+ print("Max steps reached without guess")
381
+ return -1.0, -1.0
382
+
383
def _get_final_guess(self, history, screenshot_b64, available_actions):
    """
    Obtain the mandatory final GUESS from the model, retrying once.

    Attempt 1 goes through ``execute_agent_step`` with one remaining step.
    If that does not yield a GUESS carrying usable coordinates, attempt 2
    sends a dedicated prompt that allows no action other than GUESS and the
    reply is parsed with ``_parse_agent_response``. There is a one-second
    pause between the two attempts.

    Args:
        history: Step history; passed to ``execute_agent_step`` and rendered
            with ``generate_history_text`` for the forced prompt.
        screenshot_b64: Base64-encoded screenshot of the current view.
        available_actions: Passed through to ``execute_agent_step``.

    Returns:
        A decision dict with "reasoning" and "action_details". When neither
        attempt produces a usable GUESS, a fallback GUESS with lat and lon
        set to -1.0 is returned.
    """

    def is_usable_guess(decision):
        # A GUESS only counts when lat/lon are numeric and within range;
        # otherwise the forced retry must still get its chance.
        if not isinstance(decision, dict):
            return False
        details = decision.get("action_details")
        if not isinstance(details, dict) or details.get("action") != "GUESS":
            return False
        try:
            lat = float(details.get("lat"))
            lon = float(details.get("lon"))
        except (TypeError, ValueError):
            return False
        return -90 <= lat <= 90 and -180 <= lon <= 180

    total_attempts = 2
    for attempt in range(1, total_attempts + 1):
        try:
            if attempt == 1:
                decision = self.execute_agent_step(
                    history, 1, screenshot_b64, available_actions
                )
            else:
                # Build the prompt from explicit lines so its whitespace does
                # not depend on how this source file is indented.
                history_text = self.generate_history_text(history)
                force_prompt = "\n".join(
                    [
                        "**FINAL STEP - MANDATORY GUESS**",
                        "You MUST return GUESS with coordinates. No other action allowed.",
                        "Remaining Steps: 1",
                        f"Journey history: {history_text}",
                        "Provide your best lat/lon estimate based on all observed clues.",
                        "**MANDATORY JSON Format:**",
                        '{"reasoning": "your analysis", "action_details": '
                        '{"action": "GUESS", "lat": 45.0, "lon": 2.0} }',
                    ]
                )
                message = self._create_message_with_history(
                    force_prompt, [screenshot_b64]
                )
                response = self.model.invoke(message)
                decision = self._parse_agent_response(response)

            if is_usable_guess(decision):
                return decision
            print(
                "AI didn't return GUESS with valid coordinates, "
                f"retry {attempt}/{total_attempts}"
            )
        except Exception as e:
            # Best-effort boundary: any model or transport error counts as a
            # failed attempt instead of aborting the run.
            print(f"AI call failed, retry {attempt}/{total_attempts}: {e}")

        if attempt < total_attempts:
            time.sleep(1)

    # Fallback
    return {
        "reasoning": "AI failed to provide final guess after retries",
        "action_details": {"action": "GUESS", "lat": -1.0, "lon": -1.0},
    }
424
 
425
  def analyze_image(self, image: Image.Image) -> Optional[Tuple[float, float]]:
426
  image_b64 = self.pil_to_base64(image)