Spaces:
Runtime error
Runtime error
import requests | |
import json | |
import gradio as gr | |
import pdfplumber | |
import pandas as pd | |
import time | |
from cnocr import CnOcr | |
from sentence_transformers import SentenceTransformer, models, util | |
word_embedding_model = models.Transformer('uer/sbert-base-chinese-nli', do_lower_case=True) # BERT模型 | |
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(), pooling_mode='cls') # 取cls向量作为句向量 | |
embedder = SentenceTransformer(modules=[word_embedding_model, pooling_model]) # 定义模型 | |
ocr = CnOcr() # 初始化ocr模型 | |
chat_url = 'https://souljoy-my-api.hf.space/chatgpt' # 你的url | |
headers = { | |
'Content-Type': 'application/json', | |
} # 你的headers | |
history_max_len = 500 # 机器人记忆的最大长度 | |
all_max_len = 3000 # 输入的最大长度 | |
def doc_emb(doc): # 文档向量化 | |
texts = doc.split('\n') # 按行切分 | |
emb_list = embedder.encode(texts) # 句向量化 | |
print('\n'.join(texts)) | |
return texts, emb_list, gr.Textbox.update(visible=True), gr.Button.update(visible=True), gr.Markdown.update( | |
value="""操作说明 step 3:PDF解析提交成功! 🙋 可以开始对话啦~"""), gr.Chatbot.update(visible=True) | |
def get_response(open_ai_key, msg, bot, doc_text_list, doc_embeddings): # 获取机器人回复 | |
now_len = len(msg) # 当前输入的长度 | |
his_bg = -1 # 历史记录的起始位置 | |
for i in range(len(bot) - 1, -1, -1): # 从后往前遍历历史记录 | |
if now_len + len(bot[i][0]) + len(bot[i][1]) > history_max_len: # 如果超过了历史记录的最大长度,就不再加入 | |
break | |
now_len += len(bot[i][0]) + len(bot[i][1]) # 更新当前长度 | |
his_bg = i # 更新历史记录的起始位置 | |
history = [] if his_bg == -1 else bot[his_bg:] # 获取历史记录 | |
query_embedding = embedder.encode([msg]) # 输入向量化 | |
cos_scores = util.cos_sim(query_embedding, doc_embeddings)[0] # 计算相似度 | |
score_index = [[score, index] for score, index in zip(cos_scores, [i for i in range(len(cos_scores))])] # 相似度和索引对应 | |
score_index.sort(key=lambda x: x[0], reverse=True) # 按相似度排序 | |
print('score_index:\n', score_index) | |
index_set, sub_doc_list = set(), [] # 用于存储最终的索引和文档 | |
for s_i in score_index: # 遍历相似度和索引对应 | |
doc = doc_text_list[s_i[1]] # 获取文档 | |
if now_len + len(doc) > all_max_len: # 如果超过了最大长度,就不再加入 | |
break | |
index_set.add(s_i[1]) # 加入索引 | |
now_len += len(doc) # 更新当前长度 | |
# 可能段落截断错误,所以把上下段也加入进来 | |
if s_i[1] > 0 and s_i[1] - 1 not in index_set: # 如果上一段没有加入 | |
doc = doc_text_list[s_i[1] - 1] # 获取上一段 | |
if now_len + len(doc) > all_max_len: # 如果超过了最大长度,就不再加入 | |
break | |
index_set.add(s_i[1] - 1) # 加入索引 | |
now_len += len(doc) # 更新当前长度 | |
if s_i[1] + 1 < len(doc_text_list) and s_i[1] + 1 not in index_set: # 如果下一段没有加入 | |
doc = doc_text_list[s_i[1] + 1] # 获取下一段 | |
if now_len + len(doc) > all_max_len: # 如果超过了最大长度,就不再加入 | |
break | |
index_set.add(s_i[1] + 1) # 加入索引 | |
now_len += len(doc) # 更新当前长度 | |
index_list = list(index_set) # 转换成list | |
index_list.sort() # 排序 | |
for i in index_list: # 遍历索引 | |
sub_doc_list.append(doc_text_list[i]) # 加入文档 | |
document = '' if len(sub_doc_list) == 0 else '\n'.join(sub_doc_list) # 拼接文档 | |
messages = [{ | |
"role": "system", | |
"content": "你是一个有用的助手,可以使用文章内容准确地回答问题。使用提供的文章来生成你的答案,但避免逐字复制文章。尽可能使用自己的话。准确、有用、简洁、清晰。" | |
}, {"role": "system", "content": "文章内容:\n" + document}] # 角色人物定义 | |
for his in history: # 遍历历史记录 | |
messages.append({"role": "user", "content": his[0]}) # 加入用户的历史记录 | |
messages.append({"role": "assistant", "content": his[1]}) # 加入机器人的历史记录 | |
messages.append({"role": "user", "content": msg}) # 加入用户的当前输入 | |
req_json = {'messages': messages, 'key': open_ai_key, 'model': "gpt-3.5-turbo"} # 请求json | |
data = {"content": json.dumps(req_json)} # 请求data | |
print('data:\n', req_json) | |
result = requests.post(url=chat_url, | |
data=json.dumps(data), | |
headers=headers | |
) # 请求 | |
res = result.json()['content'] # 获取回复 | |
bot.append([msg, res]) # 加入历史记录 | |
return bot[max(0, len(bot) - 3):] # 返回最近3轮的历史记录 | |
def up_file(files): # 上传文件 | |
doc_text_list = [] # 用于存储文档 | |
for idx, file in enumerate(files): # 遍历文件 | |
print(file.name) | |
with pdfplumber.open(file.name) as pdf: # 打开pdf | |
for i in range(len(pdf.pages)): # 遍历pdf的每一页 | |
# 读取PDF文档第i+1页 | |
page = pdf.pages[i] | |
res_list = page.extract_text().split('\n')[:-1] # 提取文本 | |
for j in range(len(page.images)): # 遍历图片 | |
# 获取图片的二进制流 | |
img = page.images[j] | |
file_name = '{}-{}-{}.png'.format(str(time.time()), str(i), str(j)) # 生成文件名 | |
with open(file_name, mode='wb') as f: # 保存图片 | |
f.write(img['stream'].get_data()) | |
try: | |
res = ocr.ocr(file_name) # 识别图片 | |
except Exception as e: | |
res = [] # 识别失败 | |
if len(res) > 0: # 如果识别成功 | |
res_list.append(' '.join([re['text'] for re in res])) # 加入识别结果 | |
tables = page.extract_tables() # 提取表格 | |
for table in tables: # 遍历表格 | |
# 第一列当成表头: | |
df = pd.DataFrame(table[1:], columns=table[0]) | |
try: | |
records = json.loads(df.to_json(orient="records", force_ascii=False)) # 转换成json | |
for rec in records: # 遍历json | |
res_list.append(json.dumps(rec, ensure_ascii=False)) # 加入json | |
except Exception as e: | |
res_list.append(str(df)) # 如果转换识别,直接把表格转为str | |
doc_text_list += res_list # 加入文档 | |
doc_text_list = [str(text).strip() for text in doc_text_list if len(str(text).strip()) > 0] # 去除空格 | |
print(doc_text_list) | |
return gr.Textbox.update(value='\n'.join(doc_text_list), visible=True), gr.Button.update( | |
visible=True), gr.Markdown.update( | |
value="操作说明 step 2:确认PDF解析结果(可修正),点击“提交解析结果”,随后进行对话") | |
with gr.Blocks() as demo: | |
with gr.Row(): | |
with gr.Column(): | |
open_ai_key = gr.Textbox(label='OpenAI API Key', placeholder='输入你的OpenAI API Key') # 你的OpenAI API Key | |
file = gr.File(file_types=['.pdf'], label='点击上传PDF,进行解析(支持多文档、表格、OCR)', | |
file_count='multiple') # 支持多文档、表格、OCR | |
txt = gr.Textbox(label='PDF解析结果', visible=False) # PDF解析结果 | |
doc_bu = gr.Button(value='提交解析结果', visible=False) # 提交解析结果 | |
doc_text_state = gr.State([]) # 存储PDF解析结果 | |
doc_emb_state = gr.State([]) # 存储PDF解析结果的embedding | |
with gr.Column(): | |
md = gr.Markdown("""操作说明 step 1:点击左侧区域,上传PDF,进行解析""") # 操作说明 | |
chat_bot = gr.Chatbot(visible=False) # 聊天机器人 | |
msg_txt = gr.Textbox(label='消息框', placeholder='输入消息,点击发送', visible=False) # 消息框 | |
with gr.Row(): | |
chat_bu = gr.Button(value='发送', visible=False) # 发送按钮 | |
file.change(up_file, [file], [txt, doc_bu, md]) # 上传文件 | |
doc_bu.click(doc_emb, [txt], [doc_text_state, doc_emb_state, msg_txt, chat_bu, md, chat_bot]) # 提交解析结果 | |
chat_bu.click(get_response, [open_ai_key, msg_txt, chat_bot, doc_text_state, doc_emb_state], [chat_bot]) # 发送消息 | |
if __name__ == "__main__": | |
demo.queue().launch() | |