File size: 5,705 Bytes
d470c63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
576620d
d470c63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78e025e
d470c63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6217c50
 
d470c63
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import gradio as gr
import json


import gradio as gr

# !python -c "import torch; assert torch.cuda.get_device_capability()[0] >= 8, 'Hardware not supported for Flash Attention'"
import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, GemmaTokenizer,  StoppingCriteria, StoppingCriteriaList, GenerationConfig
# from google.colab import userdata
import os

model_id = "somosnlp/Sam_Diagnostic"
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)
max_seq_length=2048

# if torch.cuda.get_device_capability()[0] >= 8:
#     # print("Flash Attention")
#     attn_implementation="flash_attention_2"
# else:
#     attn_implementation=None
attn_implementation=None

tokenizer = AutoTokenizer.from_pretrained(model_id,
                                          max_length = max_seq_length)
model = AutoModelForCausalLM.from_pretrained(model_id,
                                            #  quantization_config=bnb_config,
                                             device_map = {"":0},
                                              attn_implementation = attn_implementation, # A100 o H100
                                             ).eval()



class ListOfTokensStoppingCriteria(StoppingCriteria):
    """
    Clase para definir un criterio de parada basado en una lista de tokens específicos.
    """
    def __init__(self, tokenizer, stop_tokens):
        self.tokenizer = tokenizer
        # Codifica cada token de parada y guarda sus IDs en una lista
        self.stop_token_ids_list = [tokenizer.encode(stop_token, add_special_tokens=False) for stop_token in stop_tokens]

    def __call__(self, input_ids, scores, **kwargs):
        # Verifica si los últimos tokens generados coinciden con alguno de los conjuntos de tokens de parada
        for stop_token_ids in self.stop_token_ids_list:
            len_stop_tokens = len(stop_token_ids)
            if len(input_ids[0]) >= len_stop_tokens:
                if input_ids[0, -len_stop_tokens:].tolist() == stop_token_ids:
                    return True
        return False

# Uso del criterio de parada personalizado
stop_tokens = ["<end_of_turn>"]  # Lista de tokens de parada

# Inicializa tu criterio de parada con el tokenizer y la lista de tokens de parada
stopping_criteria = ListOfTokensStoppingCriteria(tokenizer, stop_tokens)

# Añade tu criterio de parada a una StoppingCriteriaList
stopping_criteria_list = StoppingCriteriaList([stopping_criteria])

def generate_text(prompt, idioma_entrada, idioma_salida, max_length=2100):
  prompt=prompt.replace(". ", ".\n").strip()

  input_text = f'''<bos><start_of_turn>system
You are a helpful AI assistant.
Responde en formato json.
Eres un agente experto en medicina.
Lista de codigos linguisticos disponibles: ["{idioma_entrada}", "{idioma_salida}"]<end_of_turn>
<start_of_turn>user
{prompt}<end_of_turn>
<start_of_turn>model
'''

  inputs = tokenizer.encode(input_text,
                            return_tensors="pt",
                            add_special_tokens=False).to("cuda:0")
  max_new_tokens=max_length
  generation_config = GenerationConfig(
                max_new_tokens=max_new_tokens,
                temperature=0.35, #55
                #top_p=0.9,
                top_k=50, # 45
                repetition_penalty=1.,  #1.1
                do_sample=True,
            )
  outputs = model.generate(generation_config=generation_config,
                          input_ids=inputs,
                          stopping_criteria=stopping_criteria_list,)
  return tokenizer.decode(outputs[0], skip_special_tokens=False) #True

def mostrar_respuesta(pregunta, idioma_entrada, idioma_salida):
    try:
      lista_codigo_lin = {
            "español": "es",
            "ingles": "en",
        }
      # Utiliza los parámetros de idioma para obtener los códigos de idioma correspondientes.
      codigo_lin_entrada = lista_codigo_lin[idioma_entrada.lower()]
      codigo_lin_salida = lista_codigo_lin[idioma_salida.lower()]
      
      res= generate_text(pregunta, codigo_lin_entrada, codigo_lin_salida, max_length=1500)
      inicio_json = res.find('{')
      fin_json = res.rfind('}') + 1
      json_str = res[inicio_json:fin_json]
      json_obj = json.loads(json_str)
      return json_obj["description"], json_obj["medical_specialty"], json_obj["principal_diagnostic"]
    except:
      json_obj={}
      json_obj['description']='Error diagnostico'
      json_obj['medical_specialty']='Error diagnostico'
      json_obj['principal_diagnostic']='Error diagnostico'
      return json_obj["description"], json_obj["medical_specialty"], json_obj["principal_diagnostic"]

# Ejemplos de preguntas
ejemplos = [
    ["CHIEF COMPLAINT:,  Left wrist pain.,HISTORY OF PRESENT PROBLEM"],
    ["INDICATIONS:  ,Chest pain.,STRESS TECHNIQUE:,"],
    ["MOTIVO DE CONSULTA: Una niña de 2 meses"],
]

idiomas = ["español", "ingles"]


iface = gr.Interface(
    fn=mostrar_respuesta,
    inputs=[
        gr.Textbox(label="Pregunta", placeholder="Introduce tu consulta médica aquí..."),
        gr.Dropdown(label="Idioma de Entrada", choices=idiomas, default="español"),
        gr.Dropdown(label="Idioma de Salida", choices=idiomas, default="español"),
    ],
    outputs=[
        gr.Textbox(label="Description", lines=2),
        gr.Textbox(label="Medical specialty", lines=1),
        gr.Textbox(label="Principal diagnostic", lines=1)
    ],
    title="Consultas medicas",
    description="Introduce tu diagnostico.",
    examples=ejemplos,
    concurrency_limit=20
)

iface.queue(max_size=14).launch(share=True,debug=True, ) # share=True,debug=True