"""
Data processing functions.

Supports:

- Segmentation (chunking) of long text
- Segmentation of file content (.pdf, .txt, .docx, .html, .json)
- Loading extraction configuration from YAML
- JSON extraction, string normalization, and evaluation helpers
"""
from langchain_community.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader, BSHTMLLoader, JSONLoader
from nltk.tokenize import sent_tokenize
from collections import Counter
import re
import json
import yaml
import os
import inspect

# The package-level config lives one directory above this file.
with open(os.path.join(os.path.dirname(__file__), "..", "config.yaml")) as file:
    config = yaml.safe_load(file)
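# Note (hedged): chunk_str below reads config['agent']['chunk_token_limit'],
# so the package config.yaml is assumed to contain at least:
#
#   agent:
#     chunk_token_limit: 1024   # illustrative value, not prescribed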
def load_extraction_config(yaml_path):
    """Read model / extraction (and optional construct) settings from a YAML file."""
    if not os.path.exists(yaml_path):
        print(f"Error: The config file '{yaml_path}' does not exist.")
        return {}

    with open(yaml_path, 'r') as file:
        yaml_config = yaml.safe_load(file)

    model_config = yaml_config.get('model', {})
    extraction_config = yaml_config.get('extraction', {})

    result = {
        "model": {
            "model_name_or_path": model_config.get('model_name_or_path', ""),
            "category": model_config.get('category', ""),
            "api_key": model_config.get('api_key', ""),
            "base_url": model_config.get('base_url', ""),
            "vllm_serve": model_config.get('vllm_serve', False),
        },
        "extraction": {
            "task": extraction_config.get('task', ""),
            "instruction": extraction_config.get('instruction', ""),
            "text": extraction_config.get('text', ""),
            "output_schema": extraction_config.get('output_schema', ""),
            "constraint": extraction_config.get('constraint', ""),
            "truth": extraction_config.get('truth', ""),
            "use_file": extraction_config.get('use_file', False),
            "file_path": extraction_config.get('file_path', ""),
            "mode": extraction_config.get('mode', "quick"),
            "update_case": extraction_config.get('update_case', False),
            "show_trajectory": extraction_config.get('show_trajectory', False),
        },
    }

    # The 'construct' section (database connection settings) is optional.
    if 'construct' in yaml_config:
        construct_config = yaml_config.get('construct', {})
        result["construct"] = {
            "database": construct_config.get('database', ""),
            "url": construct_config.get('url', ""),
            "username": construct_config.get('username', ""),
            "password": construct_config.get('password', ""),
        }

    return result
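# Hedged sketch of the YAML layout load_extraction_config expects; the keys are
# taken from the lookups above, the values are purely illustrative, and any
# omitted key simply falls back to its default:
#
#   model:
#     model_name_or_path: gpt-4o
#     category: OpenAI
#     api_key: <your key>
#     base_url: https://api.openai.com/v1
#     vllm_serve: false
#   extraction:
#     task: NER
#     instruction: Extract all person names.
#     use_file: false
#     mode: quick
#   construct:                      # optional section
#     database: Neo4j
#     url: neo4j://localhost:7687
#     username: neo4j
#     password: <your password>
#
#   cfg = load_extraction_config("examples/config.yaml")  # hypothetical path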
def chunk_str(text):
    """Split text into chunks of whole sentences, each within the configured token limit."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        # Approximate the token count by whitespace-separated words.
        token_count = len(sentence.split())
        if current_length + token_count <= config['agent']['chunk_token_limit']:
            current_chunk.append(sentence)
            current_length += token_count
        else:
            if current_chunk:
                chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_length = token_count
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
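# Minimal usage sketch (assumes NLTK's 'punkt' sentence model is available,
# e.g. after nltk.download('punkt')):
#
#   chunks = chunk_str(very_long_document_text)
#   # -> ["sent 1 sent 2 ...", "sent k ...", ...]; each chunk stays within
#   #    config['agent']['chunk_token_limit'] words unless a single sentence
#   #    alone exceeds the limit.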
def chunk_file(file_path):
    """Load a file with the loader matching its extension, then chunk its text."""
    if file_path.endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    elif file_path.endswith(".txt"):
        loader = TextLoader(file_path)
    elif file_path.endswith(".docx"):
        loader = Docx2txtLoader(file_path)
    elif file_path.endswith(".html"):
        loader = BSHTMLLoader(file_path)
    elif file_path.endswith(".json"):
        loader = JSONLoader(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")

    pages = loader.load_and_split()
    # Concatenate all page contents, then re-chunk by sentence.
    docs = "".join(item.page_content for item in pages)
    return chunk_str(docs)
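# Note (hedged): in some langchain_community releases JSONLoader requires a
# jq_schema argument (e.g. JSONLoader(file_path, jq_schema=".",
# text_content=False)), so the bare JSONLoader(file_path) call above only
# works on versions where jq_schema is optional; BSHTMLLoader additionally
# needs beautifulsoup4 installed, and PyPDFLoader needs pypdf.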
def process_single_quotes(text):
    # Replace single quotes that are not flanked by word characters on both
    # sides (i.e. quotes acting as string delimiters) with double quotes,
    # moving quasi-JSON toward valid JSON.
    return re.sub(r"(?<!\w)'|'(?!\w)", '"', text)
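# Illustrative: process_single_quotes("{'a': 'it's ok'}") -> '{"a": "it's ok"}'
# (delimiter quotes become double quotes; in-word apostrophes survive).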
def remove_empty_values(data):
    """Recursively drop None, "", [] and {} entries from nested dicts and lists."""
    def is_empty(value):
        return value is None or value == [] or value == "" or value == {}

    if isinstance(data, dict):
        return {
            k: remove_empty_values(v)
            for k, v in data.items()
            if not is_empty(v)
        }
    elif isinstance(data, list):
        return [
            remove_empty_values(item)
            for item in data
            if not is_empty(item)
        ]
    else:
        return data
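# Illustrative: remove_empty_values({"a": "", "b": [None, "x"], "c": {}})
# -> {"b": ["x"]}. Emptiness is tested before recursion, so a container that
# only becomes empty after cleaning (e.g. {"d": {"e": ""}}) is kept as {"d": {}}.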
def extract_json_dict(text):
    """Pull the last JSON object out of a model response and parse it."""
    if isinstance(text, dict):
        return text
    # Match brace-balanced objects up to three levels of nesting.
    pattern = r'\{(?:[^{}]|(?:\{(?:[^{}]|(?:\{[^{}]*\})*)*\})*)*\}'
    matches = re.findall(pattern, text)
    if matches:
        json_string = matches[-1]
        json_string = process_single_quotes(json_string)
        try:
            json_dict = json.loads(json_string)
            json_dict = remove_empty_values(json_dict)
            # If cleaning removed everything, report that nothing was found.
            if not json_dict:
                return "No valid information found."
            return json_dict
        except json.JSONDecodeError:
            return json_string
    else:
        return text
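# Illustrative behavior:
#
#   extract_json_dict('Answer: {"name": "Alice", "note": ""}')
#   # -> {'name': 'Alice'}        (last {...} parsed, empty values dropped)
#   extract_json_dict("no JSON object here")
#   # -> "no JSON object here"    (input returned unchanged)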
def good_case_wrapper(example: str):
    """Wrap retrieved good cases into a few-shot prompt segment."""
    if example is None or example == "":
        return ""
    return f"\nHere are some examples:\n{example}\n(END OF EXAMPLES)\nRefer to the reasoning steps and analysis in the examples to help complete the extraction task below.\n\n"


def bad_case_wrapper(example: str):
    """Wrap retrieved bad cases into a reflection prompt segment."""
    if example is None or example == "":
        return ""
    return f"\nHere are some examples of bad cases:\n{example}\n(END OF EXAMPLES)\nRefer to the reflection rules and reflection steps in the examples to help optimize the original result below.\n\n"


def example_wrapper(example: str):
    """Wrap plain examples into a few-shot prompt segment."""
    if example is None or example == "":
        return ""
    return f"\nHere are some examples:\n{example}\n(END OF EXAMPLES)\n\n"
def remove_redundant_space(s):
    # Collapse runs of whitespace, then strip spaces around punctuation.
    s = ' '.join(s.split())
    s = re.sub(r"\s*(,|:|\(|\)|\.|_|;|'|-)\s*", r'\1', s)
    return s


def format_string(s):
    """Normalize a string for comparison: trim spacing, lowercase, dedupe punctuation."""
    s = remove_redundant_space(s)
    s = s.lower()
    s = s.replace('{', '').replace('}', '')
    s = re.sub(r',+', ',', s)
    s = re.sub(r'\.+', '.', s)
    s = re.sub(r';+', ';', s)
    s = s.replace('’', "'")
    return s
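# Illustrative: format_string("Barack  Obama , JR.") -> "barack obama,jr."
# (whitespace collapsed, spaces stripped around punctuation, lowercased).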
def calculate_metrics(y_truth: set, y_pred: set):
    """Return (precision, recall, f1) for a predicted set against a gold set."""
    TP = len(y_truth & y_pred)   # true positives: predicted and correct
    FN = len(y_truth - y_pred)   # false negatives: missed gold items
    FP = len(y_pred - y_truth)   # false positives: spurious predictions
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1_score
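# Worked example:
#
#   calculate_metrics({"a", "b", "c"}, {"b", "c", "d"})
#   # TP=2, FN=1, FP=1 -> precision=2/3, recall=2/3, f1=2/3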
def current_function_name():
    """Return the name of the calling function, or None if unavailable."""
    try:
        stack = inspect.stack()
        if len(stack) > 1:
            return stack[1].function
        print("No caller function found")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
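# Illustrative: inside `def report(): return current_function_name()`, the
# call returns "report" (frame 0 is this helper, frame 1 is its caller).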
def normalize_obj(value):
    """Recursively convert a value into a hashable, order-insensitive form."""
    if isinstance(value, dict):
        return frozenset((k, normalize_obj(v)) for k, v in value.items())
    elif isinstance(value, (list, set, tuple)):
        # Treat sequences as multisets: count normalized elements, then sort
        # the (element, count) pairs so the result is order-independent.
        # (Without the sort, the tuple would depend on element order,
        # defeating the purpose of the Counter.)
        return tuple(sorted(Counter(map(normalize_obj, value)).items(), key=repr))
    elif isinstance(value, str):
        return format_string(value)
    return value
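# Illustrative: with the sort above,
#   normalize_obj({"b": [1, 2], "a": "X"}) == normalize_obj({"a": "x", "b": [2, 1]})
# -> True (dict keys unordered, lists compared as multisets, strings normalized).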
def dict_list_to_set(data_list):
    """Convert a list of dicts into a set of normalized value tuples."""
    result_set = set()
    try:
        for dictionary in data_list:
            value_tuple = tuple(format_string(value) for value in dictionary.values())
            result_set.add(value_tuple)
        return result_set
    except Exception:
        print(f"Failed to convert dictionary list to set: {data_list}")
        return result_set
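# Illustrative: dict_list_to_set([{"head": "Paris", "tail": "France"}])
# -> {("paris", "france")}; values are normalized via format_string, so the
# resulting sets can be scored directly with calculate_metrics.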