Spaces:
Runtime error
Runtime error
File size: 5,705 Bytes
d470c63 576620d d470c63 78e025e d470c63 6217c50 d470c63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import gradio as gr
import json
import gradio as gr
# !python -c "import torch; assert torch.cuda.get_device_capability()[0] >= 8, 'Hardware not supported for Flash Attention'"
import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, GemmaTokenizer, StoppingCriteria, StoppingCriteriaList, GenerationConfig
# from google.colab import userdata
import os
model_id = "somosnlp/Sam_Diagnostic"
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16
)
max_seq_length=2048
# if torch.cuda.get_device_capability()[0] >= 8:
# # print("Flash Attention")
# attn_implementation="flash_attention_2"
# else:
# attn_implementation=None
attn_implementation=None
tokenizer = AutoTokenizer.from_pretrained(model_id,
max_length = max_seq_length)
model = AutoModelForCausalLM.from_pretrained(model_id,
# quantization_config=bnb_config,
device_map = {"":0},
attn_implementation = attn_implementation, # A100 o H100
).eval()
class ListOfTokensStoppingCriteria(StoppingCriteria):
"""
Clase para definir un criterio de parada basado en una lista de tokens específicos.
"""
def __init__(self, tokenizer, stop_tokens):
self.tokenizer = tokenizer
# Codifica cada token de parada y guarda sus IDs en una lista
self.stop_token_ids_list = [tokenizer.encode(stop_token, add_special_tokens=False) for stop_token in stop_tokens]
def __call__(self, input_ids, scores, **kwargs):
# Verifica si los últimos tokens generados coinciden con alguno de los conjuntos de tokens de parada
for stop_token_ids in self.stop_token_ids_list:
len_stop_tokens = len(stop_token_ids)
if len(input_ids[0]) >= len_stop_tokens:
if input_ids[0, -len_stop_tokens:].tolist() == stop_token_ids:
return True
return False
# Uso del criterio de parada personalizado
stop_tokens = ["<end_of_turn>"] # Lista de tokens de parada
# Inicializa tu criterio de parada con el tokenizer y la lista de tokens de parada
stopping_criteria = ListOfTokensStoppingCriteria(tokenizer, stop_tokens)
# Añade tu criterio de parada a una StoppingCriteriaList
stopping_criteria_list = StoppingCriteriaList([stopping_criteria])
def generate_text(prompt, idioma_entrada, idioma_salida, max_length=2100):
prompt=prompt.replace(". ", ".\n").strip()
input_text = f'''<bos><start_of_turn>system
You are a helpful AI assistant.
Responde en formato json.
Eres un agente experto en medicina.
Lista de codigos linguisticos disponibles: ["{idioma_entrada}", "{idioma_salida}"]<end_of_turn>
<start_of_turn>user
{prompt}<end_of_turn>
<start_of_turn>model
'''
inputs = tokenizer.encode(input_text,
return_tensors="pt",
add_special_tokens=False).to("cuda:0")
max_new_tokens=max_length
generation_config = GenerationConfig(
max_new_tokens=max_new_tokens,
temperature=0.35, #55
#top_p=0.9,
top_k=50, # 45
repetition_penalty=1., #1.1
do_sample=True,
)
outputs = model.generate(generation_config=generation_config,
input_ids=inputs,
stopping_criteria=stopping_criteria_list,)
return tokenizer.decode(outputs[0], skip_special_tokens=False) #True
def mostrar_respuesta(pregunta, idioma_entrada, idioma_salida):
try:
lista_codigo_lin = {
"español": "es",
"ingles": "en",
}
# Utiliza los parámetros de idioma para obtener los códigos de idioma correspondientes.
codigo_lin_entrada = lista_codigo_lin[idioma_entrada.lower()]
codigo_lin_salida = lista_codigo_lin[idioma_salida.lower()]
res= generate_text(pregunta, codigo_lin_entrada, codigo_lin_salida, max_length=1500)
inicio_json = res.find('{')
fin_json = res.rfind('}') + 1
json_str = res[inicio_json:fin_json]
json_obj = json.loads(json_str)
return json_obj["description"], json_obj["medical_specialty"], json_obj["principal_diagnostic"]
except:
json_obj={}
json_obj['description']='Error diagnostico'
json_obj['medical_specialty']='Error diagnostico'
json_obj['principal_diagnostic']='Error diagnostico'
return json_obj["description"], json_obj["medical_specialty"], json_obj["principal_diagnostic"]
# Ejemplos de preguntas
ejemplos = [
["CHIEF COMPLAINT:, Left wrist pain.,HISTORY OF PRESENT PROBLEM"],
["INDICATIONS: ,Chest pain.,STRESS TECHNIQUE:,"],
["MOTIVO DE CONSULTA: Una niña de 2 meses"],
]
idiomas = ["español", "ingles"]
iface = gr.Interface(
fn=mostrar_respuesta,
inputs=[
gr.Textbox(label="Pregunta", placeholder="Introduce tu consulta médica aquí..."),
gr.Dropdown(label="Idioma de Entrada", choices=idiomas, default="español"),
gr.Dropdown(label="Idioma de Salida", choices=idiomas, default="español"),
],
outputs=[
gr.Textbox(label="Description", lines=2),
gr.Textbox(label="Medical specialty", lines=1),
gr.Textbox(label="Principal diagnostic", lines=1)
],
title="Consultas medicas",
description="Introduce tu diagnostico.",
examples=ejemplos,
concurrency_limit=20
)
iface.queue(max_size=14).launch(share=True,debug=True, ) # share=True,debug=True
|