Spaces:
Sleeping
Sleeping
import re | |
import os | |
import json | |
import html | |
from typing import Any | |
def pack_history_conversations(*args: str):
    """Turn alternating utterances into a list of chat-message dicts.

    The positional arguments are read in order; even positions (0, 2, ...)
    are labelled ``"user"`` and odd positions ``"assistant"``.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts, one per argument.
    """
    speakers = ("user", "assistant")
    history = []
    for position, text in enumerate(args):
        history.append({"role": speakers[position % 2], "content": text})
    return history
def split_string_by_multi_markers(content: str, markers: list[str]) -> list[str]:
    """Split a string by multiple markers.

    Args:
        content: The text to split.
        markers: Literal delimiter strings (regex-escaped before use).
            Empty strings are ignored.

    Returns:
        The whitespace-stripped, non-empty pieces of ``content``. When no
        usable marker is supplied, ``[content]`` is returned unchanged.
    """
    # An empty marker yields a pattern that matches at every position, which
    # would make re.split break the content into single characters.
    usable_markers = [marker for marker in markers if marker]
    if not usable_markers:
        return [content]
    pattern = "|".join(re.escape(marker) for marker in usable_markers)
    stripped_pieces = (piece.strip() for piece in re.split(pattern, content))
    return [piece for piece in stripped_pieces if piece]
# Refer to the utility functions of the official GraphRAG implementation:
# https://github.com/microsoft/graphrag
def clean_str(input: Any) -> str:
    """Strip, HTML-unescape, and remove control characters from a string.

    Non-string input is returned unchanged. For strings, surrounding
    whitespace is stripped, HTML entities are unescaped, and every character
    in the ranges ``\\x00-\\x1f`` and ``\\x7f-\\x9f`` is deleted.
    """
    if isinstance(input, str):
        unescaped = html.unescape(input.strip())
        # https://stackoverflow.com/questions/4324790/removing-control-characters-from-a-string-in-python
        return re.sub(r"[\x00-\x1f\x7f-\x9f]", "", unescaped)
    # Anything that is not a string is handed back as-is.
    return input
async def handle_single_entity_extraction(
    record_attributes: list[str],
    chunk_key: str,
):
    """Build an entity dict from the fields of one extraction record.

    Args:
        record_attributes: Fields of a single record. The first field must
            be the literal ``'"entity"'``; fields 1-3 are read as name,
            type, and description.
        chunk_key: Stored verbatim as the entity's ``source_id``.

    Returns:
        A dict with ``entity_name``, ``entity_type``, ``description`` and
        ``source_id`` keys, or ``None`` when the record has fewer than four
        fields, is not tagged as an entity, or its cleaned name is blank.
    """
    is_entity_record = (
        len(record_attributes) >= 4 and record_attributes[0] == '"entity"'
    )
    if not is_entity_record:
        return None

    # Names are upper-cased before cleaning; a blank name rejects the record.
    name = clean_str(record_attributes[1].upper())
    if not name.strip():
        return None

    kind = clean_str(record_attributes[2].upper())
    description = clean_str(record_attributes[3])
    return {
        "entity_name": name,
        "entity_type": kind,
        "description": description,
        "source_id": chunk_key,
    }
def is_float_regex(value):
    """Return True when ``value`` looks like a plain decimal number.

    Accepts an optional sign, optional integer digits, an optional decimal
    point, and at least one trailing digit (e.g. ``"3"``, ``"-.5"``,
    ``"+1.25"``). Exponent notation and a bare trailing dot are rejected.
    """
    matched = re.match(r"^[-+]?[0-9]*\.?[0-9]+$", value)
    return matched is not None
async def handle_single_relationship_extraction(
    record_attributes: list[str],
    chunk_key: str,
):
    """Build an edge dict from the fields of one extraction record.

    Args:
        record_attributes: Fields of a single record. The first field must
            be the literal ``'"relationship"'``; fields 1-3 are read as
            source, target, and description.
        chunk_key: Stored verbatim as the edge's ``source_id``.

    Returns:
        A dict with ``src_id``, ``tgt_id``, ``description`` and ``source_id``
        keys, or ``None`` when the record has fewer than four fields or is
        not tagged as a relationship.
    """
    is_relationship_record = (
        len(record_attributes) >= 4 and record_attributes[0] == '"relationship"'
    )
    if not is_relationship_record:
        return None

    # Endpoint names are upper-cased before cleaning.
    src = clean_str(record_attributes[1].upper())
    tgt = clean_str(record_attributes[2].upper())
    description = clean_str(record_attributes[3])
    return {
        "src_id": src,
        "tgt_id": tgt,
        "description": description,
        "source_id": chunk_key,
    }
def load_json(file_name):
    """Read and parse a UTF-8 JSON file.

    Returns:
        The decoded JSON value, or ``None`` when ``file_name`` does not exist.
    """
    if os.path.exists(file_name):
        with open(file_name, encoding="utf-8") as handle:
            return json.load(handle)
    return None
def write_json(json_obj, file_name):
    """Serialize ``json_obj`` to ``file_name`` as indented UTF-8 JSON.

    The parent directory is created when it is missing. Non-ASCII characters
    are written as-is rather than escaped.

    Args:
        json_obj: Any value ``json.dump`` can serialize.
        file_name: Destination path; may be a bare file name.
    """
    parent_dir = os.path.dirname(file_name)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when one is named.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(file_name, "w", encoding="utf-8") as f:
        json.dump(json_obj, f, indent=4, ensure_ascii=False)