import os import re import json import time import tempfile from typing import Dict, Any, List, Optional from transformers import AutoTokenizer from sentence_transformers import SentenceTransformer from huggingface_hub import login from src.prompts import SYSTEM_PROMPT # Define the prompt templates directly in this file since they're referenced but missing SUMMARY_PROMPT_TEMPLATE = """You are an expert content analyst specialized in creating professional, actionable summaries of educational content. Please analyze the following text to create a comprehensive yet concise summary that will be valuable to readers. Break down the content into 2-3 meaningful segments, each focused on a key topic or theme. For each segment of the content, provide: 1. A descriptive topic name 2. 3-5 key concepts or terms that are central to understanding this segment 3. A concise summary paragraph (3-5 sentences) that captures the essential information The text to analyze is: {text} FORMAT YOUR RESPONSE STRICTLY AS A JSON OBJECT AS FOLLOWS (with no other text, explanation or formatting): { "segments": [ { "topic_name": "Title for the first segment", "key_concepts": ["Key concept 1", "Key concept 2", "Key concept 3"], "summary": "Concise summary paragraph for this segment that captures the essential information." }, { "topic_name": "Title for the second segment", "key_concepts": ["Key concept 1", "Key concept 2", "Key concept 3"], "summary": "Concise summary paragraph for this segment that captures the essential information." } ] }""" QUIZ_PROMPT_TEMPLATE = """You are an expert quiz creator specialized in creating educational assessments. Please analyze the following text and create 5 multiple-choice quiz questions that test understanding of the key concepts and information presented in the text. For each question: 1. Write a clear, concise question 2. Create 4 answer options (A, B, C, D) with exactly one correct answer The text to analyze is: {text} FORMAT YOUR RESPONSE STRICTLY AS A JSON OBJECT AS FOLLOWS (with no other text, explanation or formatting): { "quiz_questions": [ { "question": "The full text of the question?", "options": [ { "text": "First option text", "correct": false }, { "text": "Second option text", "correct": true }, { "text": "Third option text", "correct": false }, { "text": "Fourth option text", "correct": false } ] } ] }""" GEMINI_MODEL = "gemini-2.0-flash" DEFAULT_TEMPERATURE = 0.7 TOKENIZER_MODEL = "answerdotai/ModernBERT-base" SENTENCE_TRANSFORMER_MODEL = "all-MiniLM-L6-v2" hf_token = os.environ.get('HF_TOKEN', None) login(token=hf_token) tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_MODEL) sentence_model = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL) def clean_text(text): text = re.sub(r'\[speaker_\d+\]', '', text) text = re.sub(r'\s+', ' ', text).strip() return text def split_text_by_tokens(text, max_tokens=12000): text = clean_text(text) tokens = tokenizer.encode(text) if len(tokens) <= max_tokens: return [text] split_point = len(tokens) // 2 sentences = re.split(r'(?<=[.!?])\s+', text) first_half = [] second_half = [] current_tokens = 0 for sentence in sentences: sentence_tokens = len(tokenizer.encode(sentence)) if current_tokens + sentence_tokens <= split_point: first_half.append(sentence) current_tokens += sentence_tokens else: second_half.append(sentence) return [" ".join(first_half), " ".join(second_half)] def generate_with_gemini(text, api_key, language, content_type="summary"): from langchain_google_genai import ChatGoogleGenerativeAI os.environ["GOOGLE_API_KEY"] = api_key llm = ChatGoogleGenerativeAI( model=GEMINI_MODEL, temperature=DEFAULT_TEMPERATURE, max_retries=3 ) if content_type == "summary": base_prompt = SUMMARY_PROMPT_TEMPLATE.format(text=text) else: base_prompt = QUIZ_PROMPT_TEMPLATE.format(text=text) language_instruction = f"\nIMPORTANT: Generate ALL content in {language} language." prompt = base_prompt + language_instruction try: messages = [ {"role": "system", "content": "You are a helpful AI assistant that creates high-quality text summaries and quizzes."}, {"role": "user", "content": prompt} ] response = llm.invoke(messages) try: content = response.content # First try to find JSON within code blocks json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', content) if json_match: json_str = json_match.group(1) else: # Then try to find JSON with curly braces json_match = re.search(r'(\{[\s\S]*\})', content) if json_match: json_str = json_match.group(1) else: # If we still don't have JSON, try to clean and parse the content directly json_str = content # Clean up the JSON string json_str = json_str.strip() # Try to parse the JSON try: function_call = json.loads(json_str) return function_call except json.JSONDecodeError: # If direct parsing fails, try to fix common issues # Remove markdown formatting or extra text cleaned_json = re.sub(r'^[^{]*', '', json_str) cleaned_json = re.sub(r'[^}]*$', '', cleaned_json) return json.loads(cleaned_json) except json.JSONDecodeError as e: # Fall back to a default structure if content_type == "summary": return { "segments": [ { "topic_name": "Content Analysis", "key_concepts": ["AI Processing", "Text Analysis"], "summary": "The model was unable to produce a properly formatted JSON response. Please try again with a different text sample." } ] } else: return { "quiz_questions": [ { "question": "Unable to generate quiz questions from the provided text.", "options": [ {"text": "Try again", "correct": true}, {"text": "Use different text", "correct": false}, {"text": "Adjust the prompt", "correct": false}, {"text": "Contact support", "correct": false} ] } ] } except Exception as e: raise Exception(f"Error calling API: {str(e)}") def format_summary_for_display(results, language="English"): output = [] if language == "Uzbek": segment_header = "QISM" key_concepts_header = "ASOSIY TUSHUNCHALAR" summary_header = "QISQACHA MAZMUN" elif language == "Russian": segment_header = "СЕГМЕНТ" key_concepts_header = "КЛЮЧЕВЫЕ ПОНЯТИЯ" summary_header = "КРАТКОЕ СОДЕРЖАНИЕ" else: segment_header = "SEGMENT" key_concepts_header = "KEY CONCEPTS" summary_header = "SUMMARY" segments = results.get("segments", []) if not segments: return "No segments were generated. Please try again with a different text sample." for i, segment in enumerate(segments): topic = segment["topic_name"] segment_num = i + 1 output.append(f"\n\n{'='*40}") output.append(f"{segment_header} {segment_num}: {topic}") output.append(f"{'='*40}\n") output.append(f"{key_concepts_header}:") for concept in segment["key_concepts"]: output.append(f"• {concept}") output.append(f"\n{summary_header}:") output.append(segment["summary"]) return "\n".join(output) def format_quiz_for_display(results, language="English"): output = [] if language == "Uzbek": quiz_questions_header = "TEST SAVOLLARI" elif language == "Russian": quiz_questions_header = "ТЕСТОВЫЕ ВОПРОСЫ" else: quiz_questions_header = "QUIZ QUESTIONS" output.append(f"{'='*40}") output.append(f"{quiz_questions_header}") output.append(f"{'='*40}\n") quiz_questions = results.get("quiz_questions", []) if not quiz_questions: return "No quiz questions were generated. Please try again with a different text sample." for i, q in enumerate(quiz_questions): output.append(f"\n{i+1}. {q['question']}") for j, option in enumerate(q['options']): letter = chr(97 + j).upper() correct_marker = " ✓" if option["correct"] else "" output.append(f" {letter}. {option['text']}{correct_marker}") return "\n".join(output) def analyze_document(text, gemini_api_key, language, content_type="summary"): try: if not text or len(text.strip()) < 100: return "Error: Text is too short to analyze. Please provide a longer text sample.", None, None start_time = time.time() text_parts = split_text_by_tokens(text) input_tokens = 0 output_tokens = 0 if content_type == "summary": all_results = {"segments": []} segment_counter = 1 for part in text_parts: actual_prompt = SUMMARY_PROMPT_TEMPLATE.format(text=part) prompt_tokens = len(tokenizer.encode(actual_prompt)) input_tokens += prompt_tokens analysis = generate_with_gemini(part, gemini_api_key, language, "summary") if "segments" in analysis and analysis["segments"]: for segment in analysis["segments"]: segment["segment_number"] = segment_counter all_results["segments"].append(segment) segment_counter += 1 else: # Add a default segment if none were returned all_results["segments"].append({ "segment_number": segment_counter, "topic_name": "Content Analysis", "key_concepts": ["Text Processing", "AI Analysis", "Document Summarization"], "summary": "The system was unable to generate detailed segments from this text portion. This may be due to the complexity of the content or formatting issues. Consider breaking the text into smaller, more focused sections." }) segment_counter += 1 formatted_output = format_summary_for_display(all_results, language) else: # Quiz generation all_results = {"quiz_questions": []} for part in text_parts: actual_prompt = QUIZ_PROMPT_TEMPLATE.format(text=part) prompt_tokens = len(tokenizer.encode(actual_prompt)) input_tokens += prompt_tokens analysis = generate_with_gemini(part, gemini_api_key, language, "quiz") if "quiz_questions" in analysis and analysis["quiz_questions"]: remaining_slots = 10 - len(all_results["quiz_questions"]) if remaining_slots > 0: questions_to_add = analysis["quiz_questions"][:remaining_slots] all_results["quiz_questions"].extend(questions_to_add) else: # Add a default question if none were returned if len(all_results["quiz_questions"]) < 10: all_results["quiz_questions"].append({ "question": "What is the main purpose of text analysis in educational contexts?", "options": [ {"text": "To change the original meaning of the text", "correct": False}, {"text": "To extract key concepts and facilitate understanding", "correct": True}, {"text": "To reduce text to exactly half its original length", "correct": False}, {"text": "To eliminate all technical terminology", "correct": False} ] }) formatted_output = format_quiz_for_display(all_results, language) end_time = time.time() total_time = end_time - start_time output_tokens = len(tokenizer.encode(formatted_output)) token_info = f"Input tokens: {input_tokens}\nOutput tokens: {output_tokens}\nTotal tokens: {input_tokens + output_tokens}\n" formatted_text = f"Total Processing time: {total_time:.2f}s\n{token_info}\n" + formatted_output json_path = tempfile.mktemp(suffix='.json') with open(json_path, 'w', encoding='utf-8') as json_file: json.dump(all_results, json_file, indent=2) txt_path = tempfile.mktemp(suffix='.txt') with open(txt_path, 'w', encoding='utf-8') as txt_file: txt_file.write(formatted_text) return formatted_text, json_path, txt_path except Exception as e: error_message = f"Error processing document: {str(e)}" return error_message, None, None