Spaces:
Sleeping
Sleeping
""" | |
TutorX MCP Server | |
This is the main entry point for the TutorX MCP server. | |
""" | |
import base64 | |
import os | |
import sys | |
from pathlib import Path | |
# Add the current directory to the Python path | |
current_dir = Path(__file__).parent | |
sys.path.insert(0, str(current_dir)) | |
import uvicorn | |
from fastapi import FastAPI, HTTPException, UploadFile, File, Form | |
from fastapi.middleware.cors import CORSMiddleware | |
from mcp.server.fastmcp import FastMCP | |
# Import all tools to register them with MCP | |
from tools import ( | |
concept_tools, | |
lesson_tools, | |
quiz_tools, | |
interaction_tools, | |
ocr_tools, | |
learning_path_tools | |
) | |
# Import resources | |
from resources import concept_graph, curriculum_standards | |
# Create FastAPI app | |
api_app = FastAPI( | |
title="TutorX MCP Server", | |
description="Model Context Protocol server for TutorX educational platform", | |
version="1.0.0" | |
) | |
# Add CORS middleware | |
api_app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Import the shared mcp instance | |
from mcp_server.mcp_instance import mcp | |
# Explicitly import all tool modules so their @mcp.tool() decorators run | |
from mcp_server.tools import concept_tools | |
from mcp_server.tools import lesson_tools | |
from mcp_server.tools import quiz_tools | |
from mcp_server.tools import interaction_tools | |
from mcp_server.tools import ocr_tools | |
from mcp_server.tools import learning_path_tools | |
from mcp_server.tools import concept_graph_tools | |
from mcp_server.tools import ai_tutor_tools | |
from mcp_server.tools import content_generation_tools | |
# Mount the SSE transport for MCP at '/sse/' (with trailing slash) | |
api_app.mount("/sse", mcp.sse_app()) | |
# Health check endpoint | |
async def health_check(): | |
return {"status": "healthy", "service": "tutorx-mcp"} | |
# API endpoints - Concepts | |
async def get_concept_graph(concept_id: str = None): | |
if concept_id: | |
concept = concept_graph.get_concept(concept_id) | |
if not concept: | |
raise HTTPException(status_code=404, detail={"error": f"Concept {concept_id} not found"}) | |
return concept | |
return {"concepts": list(concept_graph.get_concept_graph().values())} | |
async def get_concept_endpoint(concept_id: str): | |
concept = concept_graph.get_concept(concept_id) | |
if not concept: | |
raise HTTPException(status_code=404, detail=f"Concept {concept_id} not found") | |
return concept | |
async def list_concepts(): | |
return concept_graph.get_all_concepts() | |
# API endpoints - Curriculum Standards | |
async def get_curriculum_standards(country: str = "us"): | |
return curriculum_standards.get_curriculum_standards(country) | |
# API endpoints - Text Interaction | |
async def text_interaction_endpoint(request: dict): | |
query = request.get("query") | |
student_id = request.get("student_id") | |
if not query or not student_id: | |
raise HTTPException(status_code=400, detail="Both query and student_id are required") | |
return await interaction_tools.text_interaction(query, student_id) | |
# API endpoints - Submission Originality Check | |
async def check_originality_endpoint(request: dict): | |
submission = request.get("submission") | |
reference_sources = request.get("reference_sources", []) | |
if not submission or not isinstance(reference_sources, list): | |
raise HTTPException(status_code=400, detail="submission (string) and reference_sources (array) are required") | |
return await interaction_tools.check_submission_originality(submission, reference_sources) | |
# API endpoints - Document OCR | |
async def document_ocr_endpoint( | |
file: UploadFile = File(...) | |
): | |
try: | |
# Save the uploaded file to a temporary location | |
import tempfile | |
import os | |
# Get the file extension | |
file_extension = os.path.splitext(file.filename)[1].lower() | |
# Create a temporary file with the same extension | |
with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file: | |
content = await file.read() | |
temp_file.write(content) | |
temp_file_path = temp_file.name | |
try: | |
# Upload the file to storage and get the URL | |
from mcp_server.utils.azure_upload import upload_to_azure | |
document_url = upload_to_azure(temp_file_path) | |
# Process the document with OCR | |
result = await ocr_tools.mistral_document_ocr(document_url) | |
return result | |
finally: | |
# Clean up the temporary file | |
try: | |
os.unlink(temp_file_path) | |
except: | |
pass | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error processing document: {str(e)}") | |
# API endpoints - Learning Path | |
async def learning_path_endpoint(request: dict): | |
student_id = request.get("student_id") | |
concept_ids = request.get("concept_ids", []) | |
student_level = request.get("student_level") | |
if not student_id or not concept_ids: | |
raise HTTPException(status_code=400, detail="student_id and concept_ids are required") | |
return await learning_path_tools.get_learning_path( | |
student_id=student_id, | |
concept_ids=concept_ids, | |
student_level=student_level | |
) | |
# API endpoints - Assess Skill | |
from mcp_server.tools.concept_tools import assess_skill_tool | |
async def assess_skill_endpoint(request: dict): | |
student_id = request.get("student_id") | |
concept_id = request.get("concept_id") | |
if not student_id or not concept_id: | |
raise HTTPException(status_code=400, detail="Both student_id and concept_id are required") | |
return await assess_skill_tool(student_id, concept_id) | |
# API endpoints - Generate Lesson | |
from mcp_server.tools.lesson_tools import generate_lesson_tool | |
async def generate_lesson_endpoint(request: dict): | |
topic = request.get("topic") | |
grade_level = request.get("grade_level") | |
duration_minutes = request.get("duration_minutes") | |
if not topic or grade_level is None or duration_minutes is None: | |
raise HTTPException(status_code=400, detail="topic, grade_level, and duration_minutes are required") | |
return await generate_lesson_tool(topic, grade_level, duration_minutes) | |
# API endpoints - Generate Quiz | |
from mcp_server.tools.quiz_tools import ( | |
generate_quiz_tool, | |
start_interactive_quiz_tool, | |
submit_quiz_answer_tool, | |
get_quiz_hint_tool, | |
get_quiz_session_status_tool | |
) | |
async def generate_quiz_endpoint(request: dict): | |
concept = request.get("concept", "") | |
difficulty = request.get("difficulty", 2) | |
if not concept or not isinstance(concept, str) or not concept.strip(): | |
raise HTTPException(status_code=400, detail="concept must be a non-empty string") | |
if isinstance(difficulty, (int, float)): | |
if difficulty <= 2: | |
difficulty = "easy" | |
elif difficulty <= 4: | |
difficulty = "medium" | |
else: | |
difficulty = "hard" | |
if difficulty not in ["easy", "medium", "hard"]: | |
difficulty = "medium" | |
return await generate_quiz_tool(concept.strip(), difficulty) | |
async def start_interactive_quiz_endpoint(request: dict): | |
quiz_data = request.get("quiz_data") | |
student_id = request.get("student_id", "anonymous") | |
if not quiz_data: | |
raise HTTPException(status_code=400, detail="quiz_data is required") | |
return await start_interactive_quiz_tool(quiz_data, student_id) | |
async def submit_quiz_answer_endpoint(request: dict): | |
session_id = request.get("session_id") | |
question_id = request.get("question_id") | |
selected_answer = request.get("selected_answer") | |
if not all([session_id, question_id, selected_answer]): | |
raise HTTPException(status_code=400, detail="session_id, question_id, and selected_answer are required") | |
return await submit_quiz_answer_tool(session_id, question_id, selected_answer) | |
async def get_quiz_hint_endpoint(request: dict): | |
session_id = request.get("session_id") | |
question_id = request.get("question_id") | |
if not all([session_id, question_id]): | |
raise HTTPException(status_code=400, detail="session_id and question_id are required") | |
return await get_quiz_hint_tool(session_id, question_id) | |
async def get_quiz_session_status_endpoint(request: dict): | |
session_id = request.get("session_id") | |
if not session_id: | |
raise HTTPException(status_code=400, detail="session_id is required") | |
return await get_quiz_session_status_tool(session_id) | |
# API endpoints - AI Tutoring | |
from mcp_server.tools.ai_tutor_tools import ( | |
start_tutoring_session, | |
ai_tutor_chat, | |
get_step_by_step_guidance, | |
get_alternative_explanations, | |
update_student_understanding, | |
get_tutoring_session_status, | |
end_tutoring_session, | |
list_active_tutoring_sessions | |
) | |
async def start_tutoring_session_endpoint(request: dict): | |
student_id = request.get("student_id") | |
subject = request.get("subject", "general") | |
learning_objectives = request.get("learning_objectives", []) | |
if not student_id: | |
raise HTTPException(status_code=400, detail="student_id is required") | |
return await start_tutoring_session(student_id, subject, learning_objectives) | |
async def ai_tutor_chat_endpoint(request: dict): | |
session_id = request.get("session_id") | |
student_query = request.get("student_query") | |
request_type = request.get("request_type", "explanation") | |
if not session_id or not student_query: | |
raise HTTPException(status_code=400, detail="session_id and student_query are required") | |
return await ai_tutor_chat(session_id, student_query, request_type) | |
async def step_by_step_guidance_endpoint(request: dict): | |
session_id = request.get("session_id") | |
concept = request.get("concept") | |
current_step = request.get("current_step", 1) | |
if not session_id or not concept: | |
raise HTTPException(status_code=400, detail="session_id and concept are required") | |
return await get_step_by_step_guidance(session_id, concept, current_step) | |
async def alternative_explanations_endpoint(request: dict): | |
session_id = request.get("session_id") | |
concept = request.get("concept") | |
explanation_types = request.get("explanation_types", []) | |
if not session_id or not concept: | |
raise HTTPException(status_code=400, detail="session_id and concept are required") | |
return await get_alternative_explanations(session_id, concept, explanation_types) | |
async def update_student_understanding_endpoint(request: dict): | |
session_id = request.get("session_id") | |
concept = request.get("concept") | |
understanding_level = request.get("understanding_level") | |
feedback = request.get("feedback", "") | |
if not session_id or not concept or understanding_level is None: | |
raise HTTPException(status_code=400, detail="session_id, concept, and understanding_level are required") | |
return await update_student_understanding(session_id, concept, understanding_level, feedback) | |
async def tutoring_session_status_endpoint(session_id: str): | |
return await get_tutoring_session_status(session_id) | |
async def end_tutoring_session_endpoint(request: dict): | |
session_id = request.get("session_id") | |
session_summary = request.get("session_summary", "") | |
if not session_id: | |
raise HTTPException(status_code=400, detail="session_id is required") | |
return await end_tutoring_session(session_id, session_summary) | |
async def active_tutoring_sessions_endpoint(student_id: str = None): | |
return await list_active_tutoring_sessions(student_id) | |
# API endpoints - Content Generation | |
from mcp_server.tools.content_generation_tools import ( | |
generate_interactive_exercise, | |
generate_adaptive_content_sequence, | |
generate_scenario_based_learning, | |
generate_multimodal_content, | |
generate_adaptive_assessment, | |
generate_gamified_content, | |
validate_generated_content | |
) | |
async def generate_interactive_exercise_endpoint(request: dict): | |
concept = request.get("concept") | |
exercise_type = request.get("exercise_type", "problem_solving") | |
difficulty_level = request.get("difficulty_level", 0.5) | |
student_level = request.get("student_level", "intermediate") | |
if not concept: | |
raise HTTPException(status_code=400, detail="concept is required") | |
return await generate_interactive_exercise(concept, exercise_type, difficulty_level, student_level) | |
async def generate_adaptive_content_sequence_endpoint(request: dict): | |
topic = request.get("topic") | |
student_profile = request.get("student_profile", {}) | |
sequence_length = request.get("sequence_length", 5) | |
if not topic: | |
raise HTTPException(status_code=400, detail="topic is required") | |
return await generate_adaptive_content_sequence(topic, student_profile, sequence_length) | |
async def generate_scenario_based_learning_endpoint(request: dict): | |
concept = request.get("concept") | |
scenario_type = request.get("scenario_type", "real_world") | |
complexity_level = request.get("complexity_level", "moderate") | |
if not concept: | |
raise HTTPException(status_code=400, detail="concept is required") | |
return await generate_scenario_based_learning(concept, scenario_type, complexity_level) | |
async def generate_multimodal_content_endpoint(request: dict): | |
concept = request.get("concept") | |
modalities = request.get("modalities", ["visual", "auditory", "kinesthetic", "reading"]) | |
target_audience = request.get("target_audience", "general") | |
if not concept: | |
raise HTTPException(status_code=400, detail="concept is required") | |
return await generate_multimodal_content(concept, modalities, target_audience) | |
async def generate_adaptive_assessment_endpoint(request: dict): | |
concept = request.get("concept") | |
assessment_type = request.get("assessment_type", "formative") | |
student_data = request.get("student_data", {}) | |
if not concept: | |
raise HTTPException(status_code=400, detail="concept is required") | |
return await generate_adaptive_assessment(concept, assessment_type, student_data) | |
async def generate_gamified_content_endpoint(request: dict): | |
concept = request.get("concept") | |
game_type = request.get("game_type", "quest") | |
target_age_group = request.get("target_age_group", "teen") | |
if not concept: | |
raise HTTPException(status_code=400, detail="concept is required") | |
return await generate_gamified_content(concept, game_type, target_age_group) | |
async def validate_content_endpoint(request: dict): | |
content_data = request.get("content_data") | |
validation_criteria = request.get("validation_criteria", {}) | |
if not content_data: | |
raise HTTPException(status_code=400, detail="content_data is required") | |
return await validate_generated_content(content_data, validation_criteria) | |
# Entrypoint for running with MCP SSE transport | |
if __name__ == "__main__": | |
mcp.run(transport="sse") | |