Spaces:
Sleeping
Sleeping
| # Importing libraries | |
| import pandas as pd | |
| import json | |
| import gradio as gr | |
| from pathlib import Path | |
| from ragatouille import RAGPretrainedModel | |
| from gradio_client import Client | |
| from tempfile import NamedTemporaryFile | |
| from sentence_transformers import CrossEncoder | |
| import numpy as np | |
| from time import perf_counter | |
| from sentence_transformers import CrossEncoder | |
| #calling functions from other files - to call the knowledge database tables (lancedb for accurate mode) for creating quiz | |
| from backend.semantic_search import table, retriever | |
| VECTOR_COLUMN_NAME = "vector" | |
| TEXT_COLUMN_NAME = "text" | |
| proj_dir = Path.cwd() | |
| # Set up logging | |
| import logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Replace Mixtral client with Qwen Client | |
| client = Client("Qwen/Qwen1.5-110B-Chat-demo") | |
| def system_instructions(question_difficulty, topic, documents_str): | |
| return f"""<s> [INST] You are a great teacher and your task is to create 10 questions with 4 choices with {question_difficulty} difficulty about the topic request "{topic}" only from the below given documents, {documents_str}. Then create answers. Index in JSON format, the questions as "Q#":"" to "Q#":"", the four choices as "Q#:C1":"" to "Q#:C4":"", and the answers as "A#":"Q#:C#" to "A#":"Q#:C#". Example: 'A10':'Q10:C3' [/INST]""" | |
| # Ragatouille database for Colbert ie highly accurate mode | |
| RAG_db = gr.State() | |
| quiz_data = None | |
| #defining a function to convert json file to excel file | |
| def json_to_excel(output_json): | |
| # Initialize list for DataFrame | |
| data = [] | |
| gr.Warning('Generating Shareable file link..', duration=30) | |
| for i in range(1, 11): # Assuming there are 10 questions | |
| question_key = f"Q{i}" | |
| answer_key = f"A{i}" | |
| question = output_json.get(question_key, '') | |
| correct_answer_key = output_json.get(answer_key, '') | |
| #correct_answer = correct_answer_key.split(':')[-1] if correct_answer_key else '' | |
| correct_answer = correct_answer_key.split(':')[-1].replace('C', '').strip() if correct_answer_key else '' | |
| # Extract options | |
| option_keys = [f"{question_key}:C{i}" for i in range(1, 6)] | |
| options = [output_json.get(key, '') for key in option_keys] | |
| # Add data row | |
| data.append([ | |
| question, # Question Text | |
| "Multiple Choice", # Question Type | |
| options[0], # Option 1 | |
| options[1], # Option 2 | |
| options[2] if len(options) > 2 else '', # Option 3 | |
| options[3] if len(options) > 3 else '', # Option 4 | |
| options[4] if len(options) > 4 else '', # Option 5 | |
| correct_answer, # Correct Answer | |
| 30, # Time in seconds | |
| '' # Image Link | |
| ]) | |
| # Create DataFrame | |
| df = pd.DataFrame(data, columns=[ | |
| "Question Text", | |
| "Question Type", | |
| "Option 1", | |
| "Option 2", | |
| "Option 3", | |
| "Option 4", | |
| "Option 5", | |
| "Correct Answer", | |
| "Time in seconds", | |
| "Image Link" | |
| ]) | |
| temp_file = NamedTemporaryFile(delete=False, suffix=".xlsx") | |
| df.to_excel(temp_file.name, index=False) | |
| return temp_file.name | |
| # Define a colorful theme | |
| colorful_theme = gr.themes.Default( | |
| primary_hue="cyan", # Set a bright cyan as primary color | |
| secondary_hue="yellow", # Set a bright magenta as secondary color | |
| neutral_hue="purple" # Optionally set a neutral color | |
| ) | |
| #gradio app creation for a user interface | |
| with gr.Blocks(title="Quiz Maker", theme=colorful_theme) as QUIZBOT: | |
| # Create a single row for the HTML and Image | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Image(value='logo.png', height=200, width=200) | |
| with gr.Column(scale=6): | |
| gr.HTML(""" | |
| <center> | |
| <h1><span style="color: purple;">GOVERNMENT HIGH SCHOOL,SUTHUKENY</span> STUDENTS QUIZBOT</h1> | |
| <h2>Generative AI-powered Capacity building for STUDENTS</h2> | |
| <i>β οΈSTUDENTS CAN CREATE QUIZ AND EVALUATE BY THEMSELVES ! β οΈ</i> | |
| </center> | |
| """) | |
| topic = gr.Textbox(label="Enter the Topic for Quiz", placeholder="Write any topic/details from Customs Manual") | |
| with gr.Row(): | |
| difficulty_radio = gr.Radio(["easy", "average", "hard"], label="How difficult should the quiz be?") | |
| model_radio = gr.Radio(choices=[ '(ACCURATE) BGE reranker', '(HIGH ACCURATE) ColBERT'], | |
| value='(ACCURATE) BGE reranker', label="Embeddings", | |
| info="First query to ColBERT may take a little time") | |
| generate_quiz_btn = gr.Button("Generate Quiz!π") | |
| quiz_msg = gr.Textbox() | |
| question_radios = [gr.Radio(visible=False) for _ in range(10)] | |
| def generate_quiz(question_difficulty, topic, cross_encoder): | |
| top_k_rank = 10 | |
| documents = [] | |
| gr.Warning('Generating Quiz may take 1-2 minutes. Please wait.', duration=60) | |
| if cross_encoder == '(HIGH ACCURATE) ColBERT': | |
| gr.Warning('Retrieving using ColBERT.. First-time query will take 2 minute for model to load.. please wait',duration=100) | |
| RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0") | |
| RAG_db.value = RAG.from_index('.ragatouille/colbert/indexes/cbseclass10index') | |
| documents_full = RAG_db.value.search(topic, k=top_k_rank) | |
| documents = [item['content'] for item in documents_full] | |
| else: | |
| document_start = perf_counter() | |
| query_vec = retriever.encode(topic) | |
| doc1 = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank) | |
| documents = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank).to_list() | |
| documents = [doc[TEXT_COLUMN_NAME] for doc in documents] | |
| query_doc_pair = [[topic, doc] for doc in documents] | |
| # if cross_encoder == '(FAST) MiniLM-L6v2': | |
| # cross_encoder1 = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') | |
| if cross_encoder == '(ACCURATE) BGE reranker': | |
| cross_encoder1 = CrossEncoder('BAAI/bge-reranker-base') | |
| cross_scores = cross_encoder1.predict(query_doc_pair) | |
| sim_scores_argsort = list(reversed(np.argsort(cross_scores))) | |
| documents = [documents[idx] for idx in sim_scores_argsort[:top_k_rank]] | |
| #creating a text prompt to Qwen model combining the documents and system instruction | |
| formatted_prompt = system_instructions(question_difficulty, topic, '\n'.join(documents)) | |
| print(' Formatted Prompt : ' ,formatted_prompt) | |
| try: | |
| response = client.predict(query=formatted_prompt, history=[], system="You are a helpful assistant.", api_name="/model_chat") | |
| response1 = response[1][0][1] | |
| # Extract JSON | |
| start_index = response1.find('{') | |
| end_index = response1.rfind('}') | |
| cleaned_response = response1[start_index:end_index + 1] if start_index != -1 and end_index != -1 else '' | |
| print('Cleaned Response :',cleaned_response) | |
| output_json = json.loads(cleaned_response) | |
| # Assign the extracted JSON to quiz_data for use in the comparison function | |
| global quiz_data | |
| quiz_data = output_json | |
| # Generate the Excel file | |
| excel_file = json_to_excel(output_json) | |
| #Create a Quiz display in app | |
| question_radio_list = [] | |
| for question_num in range(1, 11): | |
| question_key = f"Q{question_num}" | |
| answer_key = f"A{question_num}" | |
| question = output_json.get(question_key) | |
| answer = output_json.get(output_json.get(answer_key)) | |
| if not question or not answer: | |
| continue | |
| choice_keys = [f"{question_key}:C{i}" for i in range(1, 5)] | |
| choice_list = [output_json.get(choice_key, "Choice not found") for choice_key in choice_keys] | |
| radio = gr.Radio(choices=choice_list, label=question, visible=True, interactive=True) | |
| question_radio_list.append(radio) | |
| return ['Quiz Generated!'] + question_radio_list + [excel_file] | |
| except json.JSONDecodeError as e: | |
| print(f"Failed to decode JSON: {e}") | |
| check_button = gr.Button("Check Score") | |
| score_textbox = gr.Markdown() | |
| def compare_answers(*user_answers): | |
| user_answer_list = list(user_answers) | |
| answers_list = [] | |
| for question_num in range(1, 20): | |
| answer_key = f"A{question_num}" | |
| answer = quiz_data.get(quiz_data.get(answer_key)) | |
| if not answer: | |
| break | |
| answers_list.append(answer) | |
| score = sum(1 for item in user_answer_list if item in answers_list) | |
| if score > 7: | |
| message = f"### Excellent! You got {score} out of 10!" | |
| elif score > 5: | |
| message = f"### Good! You got {score} out of 10!" | |
| else: | |
| message = f"### You got {score} out of 10! Don't worry. You can prepare well and try better next time!" | |
| return message | |
| QUIZBOT.queue() | |
| QUIZBOT.launch(debug=True) | |