|
import asyncio |
|
from python.helpers.extension import Extension |
|
from python.helpers.memory import Memory |
|
from python.helpers.dirty_json import DirtyJson |
|
from agent import LoopData |
|
from python.helpers.log import LogItem |
|
|
|
|
|
class MemorizeSolutions(Extension): |
|
|
|
REPLACE_THRESHOLD = 0.9 |
|
|
|
async def execute(self, loop_data: LoopData = LoopData(), **kwargs): |
|
|
|
|
|
|
|
self.agent.context.log.log( |
|
type="info", content="Memorizing succesful solutions...", temp=True |
|
) |
|
|
|
|
|
log_item = self.agent.context.log.log( |
|
type="util", |
|
heading="Memorizing succesful solutions...", |
|
) |
|
|
|
|
|
asyncio.create_task(self.memorize(loop_data, log_item)) |
|
|
|
async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs): |
|
|
|
system = self.agent.read_prompt("memory.solutions_sum.sys.md") |
|
msgs_text = self.agent.concat_messages(self.agent.history) |
|
|
|
|
|
async def log_callback(content): |
|
log_item.stream(content=content) |
|
|
|
|
|
solutions_json = await self.agent.call_utility_model( |
|
system=system, |
|
message=msgs_text, |
|
callback=log_callback, |
|
background=True, |
|
) |
|
|
|
solutions = DirtyJson.parse_string(solutions_json) |
|
|
|
if not isinstance(solutions, list) or len(solutions) == 0: |
|
log_item.update(heading="No successful solutions to memorize.") |
|
return |
|
else: |
|
log_item.update( |
|
heading=f"{len(solutions)} successful solutions to memorize." |
|
) |
|
|
|
|
|
db = await Memory.get(self.agent) |
|
|
|
solutions_txt = "" |
|
rem = [] |
|
for solution in solutions: |
|
|
|
txt = f"# Problem\n {solution['problem']}\n# Solution\n {solution['solution']}" |
|
solutions_txt += txt + "\n\n" |
|
|
|
|
|
if self.REPLACE_THRESHOLD > 0: |
|
rem += await db.delete_documents_by_query( |
|
query=txt, |
|
threshold=self.REPLACE_THRESHOLD, |
|
filter=f"area=='{Memory.Area.SOLUTIONS.value}'", |
|
) |
|
if rem: |
|
rem_txt = "\n\n".join(Memory.format_docs_plain(rem)) |
|
log_item.update(replaced=rem_txt) |
|
|
|
|
|
await db.insert_text(text=txt, metadata={"area": Memory.Area.SOLUTIONS.value}) |
|
|
|
solutions_txt = solutions_txt.strip() |
|
log_item.update(solutions=solutions_txt) |
|
log_item.update( |
|
result=f"{len(solutions)} solutions memorized.", |
|
heading=f"{len(solutions)} solutions memorized.", |
|
) |
|
if rem: |
|
log_item.stream(result=f"\nReplaced {len(rem)} previous solutions.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|