# @title Think Paraguayo
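"""Think Paraguayo: a RAG chat Space about the book Ñande Ypykuéra.

The script downloads a quantized Mistral-7B GGUF fine-tune (gua-a), chunks the
book with llama-index, indexes the chunks with a Spanish ColBERTv2 model via
RAGatouille, and serves a streaming chat UI with Gradio.
"""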
import os
| os.system("pip install gradio, llama_index, ragatouille, llama-cpp-python") | |
| os.system("git clone https://github.com/EnPaiva93/think-paraguayo-space-aux.git") | |
| os.system("wget https://huggingface.co/thinkPy/gua-a_v0.2-dpo_mistral-7b_GGUF/resolve/main/gua-a_v0.2-dpo_mistral-7b_q4_K_M.gguf -O model.gguf") | |
from llama_cpp import Llama
import gradio as gr
from ragatouille import RAGPretrainedModel
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
max_seq_length = 512  # Context window size; kept in sync with n_ctx for the Llama instance below.
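# Prompt template (kept in Spanish to match the model's fine-tuning language):
# the first slot is the retrieved context, the second is the user question,
# and the third is left empty so the model completes the answer.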
| prompt = """Responde a preguntas de forma clara, amable, concisa y solamente en el lenguaje español, sobre el libro Ñande Ypykuéra. | |
| Contexto | |
| ------------------------- | |
| {} | |
| ------------------------- | |
| ### Pregunta: | |
| {} | |
| ### Respuesta: | |
| {}""" | |
# Initialize the LLM from the downloaded GGUF file.
llm = Llama(model_path="model.gguf",
            n_ctx=max_seq_length,
            n_threads=2)
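# Optional smoke test (a sketch, not part of the app flow): verify the model
# loads and can complete text before wiring up retrieval.
# print(llm.create_completion(prompt="Hola", max_tokens=8)["choices"][0]["text"])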

BASE_PATH = "/home/user/app/think-paraguayo-space-aux/"
DOC_PATH = BASE_PATH + "index"
print(os.listdir())
# Load the book and split it into small overlapping chunks for retrieval.
documents = SimpleDirectoryReader(input_files=[BASE_PATH + "libro.txt"]).load_data()
parser = SentenceSplitter(chunk_size=128, chunk_overlap=64)
nodes = parser.get_nodes_from_documents(documents, show_progress=False)
list_nodes = [node.text for node in nodes]
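# Each entry in list_nodes is a short (~128-token) passage of libro.txt that
# the ColBERT retriever below can match against user questions.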
print(os.getcwd())

# Reuse the pre-built ColBERT index if present; otherwise build one from the
# chunks with a Spanish ColBERTv2 checkpoint.
if os.path.exists(DOC_PATH):
    RAG = RAGPretrainedModel.from_index(DOC_PATH)
else:
    RAG = RAGPretrainedModel.from_pretrained("AdrienB134/ColBERTv2.0-spanish-mmarcoES")
    index_path = RAG.index(index_name=DOC_PATH, max_document_length=100, collection=list_nodes)
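# Note: RAGatouille typically writes new indexes under .ragatouille/; the
# branch above loads the index shipped with the cloned repo when DOC_PATH
# already exists, so the build step only runs on a cold start.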

def reformat_rag(results_rag):
    # Flatten RAGatouille search results into a plain list of passage strings.
    if results_rag is not None:
        return [result["content"] for result in results_rag]
    else:
        return [""]

def chat_stream_completion(message, history):
    # Retrieve the single best-matching chunk and fill the prompt template with it.
    context = reformat_rag(RAG.search(message, k=1))
    context = " \n ".join(context)
    full_prompt = prompt.format(context, message, "")
    print(full_prompt)
    response = llm.create_completion(
        prompt=full_prompt,
        temperature=0.01,
        max_tokens=256,
        stream=True
    )
    # Accumulate streamed tokens and yield the growing reply so the UI updates live.
    message_repl = ""
    for chunk in response:
        if len(chunk['choices'][0]["text"]) != 0:
            message_repl = message_repl + chunk['choices'][0]["text"]
            yield message_repl
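# Local sanity check (a sketch; assumes the model and index above are loaded):
# for partial in chat_stream_completion("¿Quién es gua'a?", []):
#     print(partial)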
| print("Importación Completada.. OK") | |
| css = """ | |
| h1 { | |
| font-size: 32px; | |
| text-align: center; | |
| } | |
| h2 { | |
| text-align: center; | |
| } | |
| img { | |
| height: 750px; /* Reducing the image height */ | |
| } | |
| """ | |

def launcher():
    with gr.Blocks(css=css) as demo:
        gr.Markdown("# Think Paraguayo")
        gr.Markdown("## Conoce la cultura guaraní!!")
        with gr.Row(variant='panel'):
            with gr.Column(scale=1):
                gr.Image(value=BASE_PATH + "think_paraguayo.jpeg", type="filepath", label="Imagen Estática")
            with gr.Column(scale=1):
                chatbot = gr.ChatInterface(
                    fn=chat_stream_completion,
                    retry_btn=None,
                    stop_btn=None,
                    undo_btn=None
                ).queue()
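                # Note: retry_btn/stop_btn/undo_btn are pre-Gradio-5
                # ChatInterface options; newer Gradio releases removed them.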
    demo.launch(share=True, inline=False, debug=True)  # share=True is ignored on Hugging Face Spaces.

if __name__ == "__main__":
    launcher()