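# Spaces: Sleeping
# NOTE: the line above is residue from the hosting page header captured with
# this file; it is not part of the program.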
import html
import os
import re

import gradio as gr
import pandas as pd
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from openai import OpenAI

from prompt import Prompt
class Backend:
    """State holder and event handlers for a document question-answering review UI.

    One instance keeps the uploaded document text, the questions, the model's
    per-question results and the reviewer's corrections. The public methods are
    written to be used as UI callbacks: they return the values to display and
    raise ``gr.Error`` for conditions the user can fix.

    Attributes set by ``process_file`` (names are kept as originally spelled,
    since code outside this class may read them):
        questions: the questions passed in by the caller.
        text: full text of the uploaded document.
        gpt_result: mapping ``'Question N'`` -> dict with the keys ``'answer'``
            and ``'original sentences'``.
        curret_question: zero-based index of the question being shown. It may
            rest one step outside the valid range (``-1`` or
            ``totel_question``) after navigating past either end.
        totel_question: number of entries in ``gpt_result``.
        ori_answer_df / answer_df: the untouched and the reviewer-corrected
            result tables (one row per question).
        answer / highlighted_out: answer and newline-joined reference
            sentences of the question being shown.
    """

    # User-facing strings shared by several handlers.
    _NO_MORE = "No more questions!"
    _SAVE_HINT = 'Still need to click the button above to save the results'
    _JUDGE_HINT = 'Please judge on the generated answer'
    _NEED_DOCUMENT = "You need to submit the document first"

    def __init__(self):
        # NOTE(review): `OpenAI` is later invoked as a plain callable with
        # prompt/model/api_key arguments, so it is presumably a project
        # wrapper rather than the official client class -- confirm.
        self.agent = OpenAI()
        self.prompt = Prompt()

    def read_file(self, file):
        """Return the text content of an uploaded file.

        Args:
            file: object exposing the on-disk path as ``file.name``, or None.

        Raises:
            gr.Error: if no file was uploaded.
        """
        if file is None:
            raise gr.Error("You need to upload a file first")
        with open(file.name, 'r') as f:
            return f.read()

    @staticmethod
    def _split_sentences(text):
        """Split text on '.' and newlines, dropping empty/whitespace-only pieces."""
        return [
            piece
            for chunk in text.split('.')
            for piece in chunk.split('\n')
            if piece.strip()
        ]

    def highlight_text(self, text, highlight_list):
        """Render the document as HTML with the referenced sentences highlighted.

        Each entry of ``highlight_list`` is fuzzy-matched (partial ratio)
        against the sentences of ``text``; the best-matching sentence is
        wrapped in a ``<mark>`` element followed by its match score.

        Args:
            text: the full document text.
            highlight_list: iterable of reference sentences to locate.

        Returns:
            An HTML string: a fixed-height scrollable ``<div>`` holding the
            escaped document text with ``<br />`` line breaks.
        """
        candidates = self._split_sentences(text)

        # Best score per matched sentence. Collapsing duplicates prevents the
        # same sentence from being wrapped in markup more than once.
        scores = {}
        if candidates:
            for hl in highlight_list:
                best_match = process.extractOne(hl, candidates, scorer=fuzz.partial_ratio)
                if best_match is None:
                    continue
                sentence, score = best_match[0], best_match[1]
                if score > scores.get(sentence, -1):
                    scores[sentence] = score

        if scores:
            # Single pass over the ORIGINAL text so that a later match can
            # never land inside markup inserted for an earlier one. Longest
            # alternatives first, so a sentence that is a substring of another
            # does not pre-empt the longer match.
            pattern = re.compile(
                '|'.join(re.escape(s) for s in sorted(scores, key=len, reverse=True))
            )
            pieces = []
            cursor = 0
            for found in pattern.finditer(text):
                sentence = found.group(0)
                pieces.append(html.escape(text[cursor:found.start()], quote=False))
                pieces.append(
                    f'<mark style="background: #A5D2F1">{html.escape(sentence, quote=False)}</mark>'
                    f'<mark style="background: #FFC0CB"><font color="red">'
                    f' (match score:{scores[sentence]})</font></mark>'
                )
                cursor = found.end()
            pieces.append(html.escape(text[cursor:], quote=False))
            body = ''.join(pieces)
        else:
            # The document comes from a user upload: escape it so its content
            # is displayed as text instead of being interpreted as HTML.
            body = html.escape(text, quote=False)

        # add line break
        body = body.replace('\n', " <br /> ")
        # add scroll bar
        return f'<div style="height: 500px; overflow: auto;">{body}</div>'

    def _load_current_question(self):
        """Make question ``curret_question`` current; return (question, html)."""
        res = self.gpt_result[f'Question {self.curret_question + 1}']
        question = self.questions[self.curret_question]
        self.answer = res['answer']
        sentences = res['original sentences']
        highlighted_out_html = self.highlight_text(self.text, sentences)
        self.highlighted_out = '\n'.join(sentences)
        return question, highlighted_out_html

    def process_file(self, file, questions, openai_key):
        """Read the document, query the model and show the first question.

        Args:
            file: uploaded file object (see ``read_file``).
            questions: the questions to ask; indexed by question position.
            openai_key: API key forwarded to the agent call.

        Returns:
            Tuple ``(question, answer, highlighted_html, answer, references)``
            for the first question.

        Raises:
            gr.Error: if no file was uploaded.
        """
        # record the questions
        self.questions = questions
        # get the text
        self.text = self.read_file(file)
        # make the prompt
        prompt = self.prompt.get(self.text, questions, 'v3')
        # interact with openai
        res = self.agent(
            prompt,
            with_history=False,
            temperature=0.1,
            model='gpt-3.5-turbo-16k',
            api_key=openai_key,
        )
        res = self.prompt.process_result(res, 'v3')

        # for multiple questions
        self.gpt_result = res
        self.curret_question = 0
        self.totel_question = len(res)

        # Two independent tables: one stays as generated, one receives the
        # reviewer's corrections.
        self.ori_answer_df = pd.DataFrame(res).T
        self.answer_df = pd.DataFrame(res).T

        # default first question
        question, highlighted_out_html = self._load_current_question()
        return question, self.answer, highlighted_out_html, self.answer, self.highlighted_out

    def process_results(self, answer_correct, correct_answer, reference_correct, correct_reference):
        """Store the reviewer's judgement for the current question in ``answer_df``.

        Args:
            answer_correct: the reviewer's verdict on the generated answer.
            correct_answer: replacement answer, used when the answer was not
                marked correct.
            reference_correct: the reviewer's verdict on the highlighted reference.
            correct_reference: replacement reference, used when the reference
                was not marked correct.

        Returns:
            The status string ``"Results saved!"``.

        Raises:
            gr.Error: if a judgement is missing, no document was submitted, or
                the current position is past either end of the question list.
        """
        if not hasattr(self, 'clicked_correct_answer'):
            raise gr.Error("You need to judge whether the generated answer is correct first")
        if not hasattr(self, 'clicked_correct_reference'):
            raise gr.Error("You need to judge whether the highlighted reference is correct first")
        if not hasattr(self, 'answer_df'):
            raise gr.Error(self._NEED_DOCUMENT)
        if not 0 <= self.curret_question < self.totel_question:
            raise gr.Error("No more questions, please return back")
        # Finish all validation before writing, so a failed save never leaves
        # a half-updated row behind.
        if self.clicked_correct_answer and not hasattr(self, 'answer'):
            raise gr.Error(self._NEED_DOCUMENT)
        if self.clicked_correct_reference and not hasattr(self, 'highlighted_out'):
            raise gr.Error(self._NEED_DOCUMENT)

        # record the answer
        row = f'Question {self.curret_question + 1}'
        self.answer_df.loc[row, 'answer_correct'] = answer_correct
        self.answer_df.loc[row, 'reference_correct'] = reference_correct
        # When marked correct, the generated value itself is the correct one.
        self.answer_df.loc[row, 'correct_answer'] = (
            self.answer if self.clicked_correct_answer else correct_answer
        )
        self.answer_df.loc[row, 'correct_reference'] = (
            self.highlighted_out if self.clicked_correct_reference else correct_reference
        )
        gr.Info('Results saved!')
        return "Results saved!"

    def _step(self, delta):
        """Move ``delta`` questions and return the 8 values the UI displays."""
        if not hasattr(self, 'gpt_result'):
            raise gr.Error(self._NEED_DOCUMENT)

        # Allow resting exactly one step past either end (so saving there is
        # refused by process_results) but never further: otherwise a later
        # step in the opposite direction would land on a still-out-of-range
        # index and look up a question that does not exist.
        self.curret_question = max(-1, min(self.totel_question, self.curret_question + delta))

        # A judgement belongs to one question; forget it when moving on.
        for flag in ('clicked_correct_answer', 'clicked_correct_reference'):
            if hasattr(self, flag):
                delattr(self, flag)

        if not 0 <= self.curret_question < self.totel_question:
            return (self._NO_MORE,) * 5 + (self._SAVE_HINT, None, None)

        question, highlighted_out_html = self._load_current_question()
        return (
            question,
            self.answer,
            highlighted_out_html,
            self._JUDGE_HINT,
            self._JUDGE_HINT,
            self._SAVE_HINT,
            None,
            None,
        )

    def process_next(self):
        """Advance to the next question.

        Returns:
            8-tuple: question, answer, highlighted HTML, two hint strings, the
            save reminder, and two ``None`` values. Past the last question the
            first five entries are ``"No more questions!"``.

        Raises:
            gr.Error: if no document has been submitted yet.
        """
        return self._step(1)

    def process_last(self):
        """Go back to the previous question.

        Returns:
            8-tuple shaped like the one from ``process_next``. Before the
            first question the first five entries are ``"No more questions!"``.

        Raises:
            gr.Error: if no document has been submitted yet.
        """
        return self._step(-1)

    def _export(self, attr, path, name):
        """Write the DataFrame stored under ``attr`` to ``path/name``; return that path."""
        frame = getattr(self, attr, None)
        if frame is None:
            raise gr.Error(self._NEED_DOCUMENT)
        os.makedirs(path, exist_ok=True)
        target = os.path.join(path, name)
        # index=False: the 'Question N' row labels are not written to the sheet.
        frame.to_excel(target, index=False)
        return target

    def download_answer(self, path='./tmp', name='answer.xlsx'):
        """Export the originally generated results to an Excel file.

        Args:
            path: output directory, created if missing.
            name: output file name.

        Returns:
            Path of the written file.

        Raises:
            gr.Error: if no document has been submitted yet.
        """
        return self._export('ori_answer_df', path, name)

    def download_corrected(self, path='./tmp', name='corrected_answer.xlsx'):
        """Export the reviewer-corrected results to an Excel file.

        Args:
            path: output directory, created if missing.
            name: output file name.

        Returns:
            Path of the written file.

        Raises:
            gr.Error: if no document has been submitted yet.
        """
        return self._export('answer_df', path, name)

    def change_correct_answer(self, correctness):
        """Record whether the generated answer was judged correct.

        Args:
            correctness: the string ``"Correct"`` or any other value.

        Returns:
            ``"No need to change"`` when judged correct; otherwise the current
            answer (as a starting point for editing), or a notice if there is
            no answer yet -- in which case no judgement is recorded.
        """
        if correctness == "Correct":
            self.clicked_correct_answer = True
            return "No need to change"
        if not hasattr(self, 'answer'):
            return "No answer yet, you need to submit the document first"
        self.clicked_correct_answer = False
        return self.answer

    def change_correct_reference(self, correctness):
        """Record whether the highlighted reference was judged correct.

        Args:
            correctness: the string ``"Correct"`` or any other value.

        Returns:
            ``"No need to change"`` when judged correct; otherwise the current
            reference text (as a starting point for editing), or a notice if
            there is none yet -- in which case no judgement is recorded.
        """
        if correctness == "Correct":
            self.clicked_correct_reference = True
            return "No need to change"
        if not hasattr(self, 'highlighted_out'):
            return "No answer yet, you need to submit the document first"
        self.clicked_correct_reference = False
        return self.highlighted_out