import os
import re
import time
from datetime import datetime
from typing import Optional, List, Mapping, Any

import torch
import requests
import gradio as gr
import mlflow
import dagshub
from dotenv import load_dotenv
from pinecone import Pinecone
from sacrebleu import corpus_bleu
from rouge_score import rouge_scorer
from bert_score import score
from mauve import compute_mauve
from transformers import GPT2LMHeadModel, GPT2Tokenizer, pipeline, AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate
from langchain.chains.llm import LLMChain
from langchain.llms.base import LLM
from langchain_community.embeddings import HuggingFaceEmbeddings

# ------------------ Load Environment ------------------
load_dotenv()
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
mlflow_tracking_uri = os.environ.get("MLFLOW_TRACKING_URI")

# ------------------ DagsHub & MLflow Setup ------------------
try:
    dagshub.init(
        repo_owner='prathamesh.khade20',
        repo_name='Maintenance_AI_website',
        mlflow=True
    )
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment("Maintenance-RAG-Chatbot")
    mlflow.langchain.autolog()
except Exception as e:
    print(f"MLflow/DagsHub initialization failed: {e}")

# ------------------ RAG Evaluator ------------------
class RAGEvaluator:
    def __init__(self):
        try:
            self.gpt2_model, self.gpt2_tokenizer = self.load_gpt2_model()
            self.bias_pipeline = pipeline(
                "zero-shot-classification",
                model="Hate-speech-CNERG/dehatebert-mono-english"
            )
            self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        except Exception as e:
            print(f"Evaluator initialization failed: {e}")

    def load_gpt2_model(self):
        model = GPT2LMHeadModel.from_pretrained('gpt2')
        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        return model, tokenizer

    def evaluate_bleu_rouge(self, candidates, references):
        try:
            bleu_score = corpus_bleu(candidates, [references]).score
            scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
            rouge_scores = [scorer.score(ref, cand) for ref, cand in zip(references, candidates)]
            rouge1 = sum(s['rouge1'].fmeasure for s in rouge_scores) / len(rouge_scores)
            rouge2 = sum(s['rouge2'].fmeasure for s in rouge_scores) / len(rouge_scores)
            rougeL = sum(s['rougeL'].fmeasure for s in rouge_scores) / len(rouge_scores)
            return bleu_score, rouge1, rouge2, rougeL
        except Exception as e:
            print(f"BLEU/ROUGE evaluation failed: {e}")
            return 0, 0, 0, 0

    def evaluate_bert_score(self, candidates, references):
        try:
            P, R, F1 = score(candidates, references, lang="en", model_type='bert-base-multilingual-cased')
            return P.mean().item(), R.mean().item(), F1.mean().item()
        except Exception as e:
            print(f"BERT score evaluation failed: {e}")
            return 0, 0, 0

    def evaluate_perplexity(self, text):
        try:
            encodings = self.gpt2_tokenizer(text, return_tensors='pt')
            max_length = self.gpt2_model.config.n_positions
            stride = 512
            lls = []
            for i in range(0, encodings.input_ids.size(1), stride):
                begin_loc = max(i + stride - max_length, 0)
                end_loc = min(i + stride, encodings.input_ids.size(1))
                trg_len = end_loc - i
                input_ids = encodings.input_ids[:, begin_loc:end_loc]
                target_ids = input_ids.clone()
                target_ids[:, :-trg_len] = -100
                with torch.no_grad():
                    outputs = self.gpt2_model(input_ids, labels=target_ids)
                    log_likelihood = outputs[0] * trg_len
                lls.append(log_likelihood)
            ppl = torch.exp(torch.stack(lls).sum() / end_loc)
            return ppl.item()
        except Exception as e:
            print(f"Perplexity evaluation failed: {e}")
            return 1000.0

    def evaluate_diversity(self, texts):
        try:
            all_tokens = []
            for text in texts:
                tokens = self.tokenizer.tokenize(text)
                all_tokens.extend(tokens)
            unique_bigrams = set()
            for i in range(len(all_tokens) - 1):
                unique_bigrams.add((all_tokens[i], all_tokens[i + 1]))
            return len(unique_bigrams) / len(all_tokens) if all_tokens else 0
        except Exception as e:
            print(f"Diversity evaluation failed: {e}")
            return 0

    def evaluate_racial_bias(self, text):
        try:
            results = self.bias_pipeline([text], candidate_labels=["hate speech", "not hate speech"])
            bias_score = results[0]['scores'][results[0]['labels'].index('hate speech')]
            return bias_score
        except Exception as e:
            print(f"Bias evaluation failed: {e}")
            return 0

    def evaluate_meteor(self, candidates, references):
        """Simplified METEOR-style score: unigram-overlap F-mean with recall weighted 9:1
        (no alignment, synonym matching, or fragmentation penalty)."""
        try:
            meteor_scores = []
            for ref, cand in zip(references, candidates):
                ref_tokens = self.tokenizer.tokenize(ref)
                cand_tokens = self.tokenizer.tokenize(cand)
                common_tokens = set(ref_tokens) & set(cand_tokens)
                precision = len(common_tokens) / len(cand_tokens) if cand_tokens else 0
                recall = len(common_tokens) / len(ref_tokens) if ref_tokens else 0
                if precision + recall == 0:
                    f_score = 0
                else:
                    f_score = (10 * precision * recall) / (9 * precision + recall)
                meteor_scores.append(f_score)
            return sum(meteor_scores) / len(meteor_scores) if meteor_scores else 0
        except Exception as e:
            print(f"METEOR evaluation failed: {e}")
            return 0

    def evaluate_chrf(self, candidates, references):
        """Approximate chrF: F1 over unique character 6-grams only."""
        try:
            chrf_scores = []
            for ref, cand in zip(references, candidates):
                ref_chars = list(ref)
                cand_chars = list(cand)
                ref_ngrams = set()
                cand_ngrams = set()
                for i in range(len(ref_chars) - 5):
                    ref_ngrams.add(tuple(ref_chars[i:i + 6]))
                for i in range(len(cand_chars) - 5):
                    cand_ngrams.add(tuple(cand_chars[i:i + 6]))
                common_ngrams = ref_ngrams & cand_ngrams
                precision = len(common_ngrams) / len(cand_ngrams) if cand_ngrams else 0
                recall = len(common_ngrams) / len(ref_ngrams) if ref_ngrams else 0
                chrf_score = 2 * precision * recall / (precision + recall) if precision + recall else 0
                chrf_scores.append(chrf_score)
            return sum(chrf_scores) / len(chrf_scores) if chrf_scores else 0
        except Exception as e:
            print(f"CHRF evaluation failed: {e}")
            return 0

    def evaluate_readability(self, text):
        """Flesch scores computed with average word length (in characters) as a rough
        proxy for syllables per word, so values are approximate."""
        try:
            words = re.findall(r'\b\w+\b', text.lower())
            sentences = re.split(r'[.!?]+', text)
            num_words = len(words)
            num_sentences = len([s for s in sentences if s.strip()])
            avg_word_length = sum(len(word) for word in words) / num_words if num_words else 0
            words_per_sentence = num_words / num_sentences if num_sentences else 0
            flesch_ease = 206.835 - (1.015 * words_per_sentence) - (84.6 * avg_word_length)
            flesch_grade = (0.39 * words_per_sentence) + (11.8 * avg_word_length) - 15.59
            return flesch_ease, flesch_grade
        except Exception as e:
            print(f"Readability evaluation failed: {e}")
            return 0, 0

    def evaluate_mauve(self, reference_texts, generated_texts):
        try:
            out = compute_mauve(
                p_text=reference_texts,
                q_text=generated_texts,
                device_id=0,
                max_text_length=1024,
                verbose=False
            )
            return out.mauve
        except Exception as e:
            print(f"MAUVE evaluation failed: {e}")
            return 0.0

    def evaluate_all(self, question, response, reference):
        try:
            candidates = [response]
            references = [reference]
            bleu, rouge1, rouge2, rougeL = self.evaluate_bleu_rouge(candidates, references)
            bert_p, bert_r, bert_f1 = self.evaluate_bert_score(candidates, references)
            perplexity = self.evaluate_perplexity(response)
            diversity = self.evaluate_diversity(candidates)
            racial_bias = self.evaluate_racial_bias(response)
            meteor = self.evaluate_meteor(candidates, references)
            chrf = self.evaluate_chrf(candidates, references)
            flesch_ease, flesch_grade = self.evaluate_readability(response)
            mauve_score = self.evaluate_mauve(references, candidates) if len(references) > 1 else 0.0
            return {
                "BLEU": bleu,
                "ROUGE-1": rouge1,
                "ROUGE-2": rouge2,
                "ROUGE-L": rougeL,
                "BERT_Precision": bert_p,
                "BERT_Recall": bert_r,
                "BERT_F1": bert_f1,
                "Perplexity": perplexity,
                "Diversity": diversity,
                "Racial_Bias": racial_bias,
                "MAUVE": mauve_score,
                "METEOR": meteor,
                "CHRF": chrf,
                "Flesch_Reading_Ease": flesch_ease,
                "Flesch_Kincaid_Grade": flesch_grade,
            }
        except Exception as e:
            print(f"Complete evaluation failed: {e}")
            return {"error": str(e)}

# Initialize evaluator
evaluator = RAGEvaluator()

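# Example (illustrative, not executed): scoring a single answer offline.
# The response and reference strings below are hypothetical placeholders.
# metrics = evaluator.evaluate_all(
#     question="How do I troubleshoot a leaking valve?",
#     response="Isolate the valve, depressurize the line, and replace the worn seal.",
#     reference="Shut off and depressurize the valve, then replace the damaged seal.",
# )
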
# ------------------ Pinecone ------------------
def init_pinecone():
    try:
        pc = Pinecone(api_key=pinecone_api_key)
        return pc.Index("rag-granite-index")
    except Exception as e:
        print(f"Pinecone initialization failed: {e}")
        return None

index = init_pinecone()

# ------------------ Embeddings ------------------
try:
    embeddings_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
except Exception as e:
    print(f"Embeddings initialization failed: {e}")
    embeddings_model = None

def get_retrieved_context(query: str, top_k=3):
    if not index or not embeddings_model:
        return "No context available - system initialization failed"
    try:
        start = time.time()
        query_embedding = embeddings_model.embed_query(query)
        if mlflow.active_run():
            mlflow.log_metric("embedding_latency", time.time() - start)
        results = index.query(
            namespace="rag-ns",
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )
        if mlflow.active_run():
            mlflow.log_metric("retrieved_chunks", len(results['matches']))
        context_texts = [m['metadata']['text'] for m in results['matches']]
        return "\n".join(context_texts) if context_texts else "No relevant context found."
    except Exception as e:
        print(f"Context retrieval failed: {e}")
        return f"Context retrieval error: {str(e)}"

# ------------------ Fallback LLM Models ------------------
class FallbackLLM:
    def __init__(self):
        self.models_loaded = False
        self.pipeline = None
        self.load_fallback_models()

    def load_fallback_models(self):
        """Load local models as fallback"""
        try:
            # Use a smaller model for fallback
            self.pipeline = pipeline(
                "text-generation",
                model="microsoft/DialoGPT-small",
                tokenizer="microsoft/DialoGPT-small",
                max_length=150,
                do_sample=True,
                temperature=0.7
            )
            self.models_loaded = True
            print("Fallback model loaded successfully")
        except Exception as e:
            print(f"Fallback model loading failed: {e}")
            self.models_loaded = False

    def generate_response(self, context, question):
        if not self.models_loaded:
            return "I'm currently experiencing technical difficulties. Please try again later."
        try:
            prompt = f"""
Based on the following context, please provide a concise answer to the question.
Context: {context}
Question: {question}
Answer: """
            response = self.pipeline(
                prompt,
                max_new_tokens=100,
                num_return_sequences=1,
                pad_token_id=50256
            )
            if response and len(response) > 0:
                full_response = response[0]['generated_text']
                # Extract only the answer part
                if "Answer:" in full_response:
                    answer = full_response.split("Answer:")[-1].strip()
                    return answer
                return full_response.strip()
            else:
                return "I couldn't generate a response at the moment. Please try again."
        except Exception as e:
            print(f"Fallback model generation failed: {e}")
            return "I'm having trouble generating a response. Please try again later."

# Initialize fallback LLM
fallback_llm = FallbackLLM()

# ------------------ Custom LLM with Fallback ------------------
class RobustLitServeLLM(LLM):
    endpoint_url: str
    use_fallback: bool = True

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        # Try the primary endpoint first
        primary_success, primary_response = self._try_primary_endpoint(prompt)
        if primary_success:
            return primary_response
        # If primary fails and fallback is enabled, use fallback
        if self.use_fallback:
            print("Using fallback LLM due to primary endpoint failure")
            # Extract context and question from prompt
            context, question = self._extract_context_question(prompt)
            return fallback_llm.generate_response(context, question)
        else:
            return "I apologize, but the AI service is currently unavailable. Please try again later."

    def _try_primary_endpoint(self, prompt: str):
        """Try to get response from primary endpoint"""
        try:
            payload = {"prompt": prompt}
            start_time = time.time()
            response = requests.post(self.endpoint_url, json=payload, timeout=30)
            if mlflow.active_run():
                mlflow.log_metric("lit_serve_latency", time.time() - start_time)
            if response.status_code == 200:
                data = response.json()
                if mlflow.active_run():
                    mlflow.log_metric("response_tokens", len(data.get("response", "").split()))
                return True, data.get("response", "").strip()
            else:
                if mlflow.active_run():
                    mlflow.log_metric("request_errors", 1)
                print(f"Primary endpoint failed with status: {response.status_code}")
                return False, ""
        except Exception as e:
            print(f"Primary endpoint error: {e}")
            return False, ""

    def _extract_context_question(self, prompt: str):
        """Extract context and question from the prompt template"""
        try:
            if "Context:" in prompt and "Question:" in prompt:
                context_part = prompt.split("Context:")[1].split("Question:")[0].strip()
                question_part = prompt.split("Question:")[1].split("Answer:")[0].strip()
                return context_part, question_part
            return "", prompt
        except Exception:
            return "", prompt

    # LangChain's LLM base class defines these as properties, so they must be
    # overridden as properties rather than plain methods.
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        return {"endpoint_url": self.endpoint_url, "use_fallback": self.use_fallback}

    @property
    def _llm_type(self) -> str:
        return "robust_litserve_llm"

# Initialize the robust model
try:
    model = RobustLitServeLLM(
        endpoint_url="https://8001-01k2h9d9mervcmgfn66ybkpwvq.cloudspaces.litng.ai/predict",
        use_fallback=True
    )
    print("Robust LLM initialized successfully")
except Exception as e:
    print(f"Robust LLM initialization failed: {e}")
    model = None

# ------------------ Prompt Template ------------------
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""
You are a smart maintenance assistant. Based on the provided context, answer the question concisely in 1-2 lines.
Context:
{context}
Question: {question}
Answer:
"""
)

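# Example (illustrative, not executed): rendering the template directly.
# The context string below is a hypothetical placeholder; the question comes from
# the UI placeholder text used elsewhere in this app.
# rendered = prompt.format(
#     context="Pump seals should be inspected during scheduled preventive maintenance.",
#     question="What's the maintenance schedule for pumps?",
# )
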
# Initialize LLM chain
try:
    if model:
        llm_chain = LLMChain(llm=model, prompt=prompt)
        print("LLM chain initialized successfully")
    else:
        llm_chain = None
        print("LLM chain not initialized - no model available")
except Exception as e:
    print(f"LLM chain initialization failed: {e}")
    llm_chain = None

# ------------------ RAG Pipeline ------------------
def get_rag_response(question):
    """Get the complete RAG response with robust error handling"""
    try:
        if not question.strip():
            return "Please enter a valid question.", ""
        # Get context from Pinecone
        retrieved_context = get_retrieved_context(question)
        # If we have an LLM chain, use it
        if llm_chain:
            result = llm_chain.invoke({
                "context": retrieved_context,
                "question": question
            })
            full_response = result["text"].strip()
            # Clean up the response
            if "Answer:" in full_response:
                full_response = full_response.split("Answer:")[-1].strip()
            return full_response, retrieved_context
        else:
            # Use direct fallback
            fallback_response = fallback_llm.generate_response(retrieved_context, question)
            return fallback_response, retrieved_context
    except Exception as e:
        error_msg = f"Error in RAG pipeline: {str(e)}"
        print(error_msg)
        # Final fallback - simple response based on context
        if "retrieved_context" in locals() and retrieved_context:
            return f"Based on available information: I found relevant maintenance data, but encountered an issue processing it. Context available: {len(retrieved_context)} characters.", retrieved_context
        else:
            return "I apologize, but I'm experiencing technical difficulties. Please try again later or contact support.", "No context retrieved"

def rag_pipeline_stream(question):
    """Streaming version of RAG pipeline"""
    try:
        full_response, _ = get_rag_response(question)
        # Stream word by word for better UX
        words = full_response.split()
        current_text = ""
        for word in words:
            current_text += word + " "
            yield current_text
            time.sleep(0.03)  # Faster streaming
    except Exception as e:
        error_msg = f"Error in streaming: {str(e)}"
        print(error_msg)
        yield "I apologize, but I encountered an error while generating the response."

# ------------------ Gradio UI ------------------
with gr.Blocks(theme=gr.themes.Soft(), title="Maintenance AI Assistant") as demo:
    gr.Markdown("""
# Maintenance AI Assistant
*Your intelligent companion for maintenance queries and troubleshooting*

**Note**: This system uses multiple fallback mechanisms to ensure reliability.
""")

    usage_counter = gr.State(value=0)
    session_start = gr.State(value=datetime.now().isoformat())
    current_response = gr.State(value="")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Chat Interface")
            question_input = gr.Textbox(
                label="Ask your maintenance question",
                placeholder="e.g., How do I troubleshoot a leaking valve? What's the maintenance schedule for pumps?",
                lines=3
            )
            ask_button = gr.Button("Get Answer", variant="primary", size="lg")
            with gr.Row():
                clear_btn = gr.Button("Clear Chat")
                evaluate_btn = gr.Button("Show Metrics", variant="secondary")
            feedback = gr.Radio(
                ["Helpful", "Not Helpful"],
                label="Was this response helpful?",
                info="Your feedback helps improve the system"
            )
        with gr.Column(scale=1):
            gr.Markdown("### AI Response")
            answer_output = gr.Textbox(
                label="Response",
                lines=8,
                interactive=False,
                show_copy_button=True,
                autoscroll=True
            )
            gr.Markdown("### Evaluation Metrics")
            metrics_output = gr.JSON(
                label="Quality Metrics",
                visible=False,
                show_label=True
            )

    def track_usage(question, count, session_start, feedback_value=None):
        """Track usage and get response"""
        if not question.strip():
            return "Please enter a question.", count, session_start, ""
        count += 1
        try:
            # Only use MLflow if properly configured
            if mlflow_tracking_uri:
                with mlflow.start_run(run_name=f"User-Interaction-{count}", nested=True):
                    mlflow.log_param("question", question)
                    mlflow.log_param("session_start", session_start)
                    mlflow.log_param("user_feedback", feedback_value or "No feedback")
                    if feedback_value:
                        mlflow.log_metric("helpful_responses", 1 if feedback_value == "Helpful" else 0)
                    mlflow.log_metric("total_queries", count)
                    # Get response and context
                    response, context = get_rag_response(question)
                    mlflow.log_metric("response_length", len(response))
                    mlflow.log_metric("response_tokens", len(response.split()))
                    mlflow.log_metric("context_length", len(context))
                    return response, count, session_start, response
            else:
                # Without MLflow
                response, context = get_rag_response(question)
                return response, count, session_start, response
        except Exception as e:
            print(f"Tracking error: {e}")
            error_response = f"I encountered a system error. Please try again. Error: {str(e)}"
            return error_response, count, session_start, error_response

    def evaluate_response(question, response):
        """Evaluate the response and return metrics"""
        if not question or not response:
            return gr.update(value={"info": "No question or response to evaluate"}, visible=True)
        # Skip evaluation for error responses
        if any(error_word in response.lower() for error_word in ["error", "apologize", "unavailable", "technical"]):
            return gr.update(value={"info": "Evaluation skipped for error response"}, visible=True)
        try:
            context = get_retrieved_context(question)
            metrics = evaluator.evaluate_all(question, response, context)
            return gr.update(value=metrics, visible=True)
        except Exception as e:
            print(f"Evaluation error: {e}")
            return gr.update(value={"error": f"Evaluation failed: {str(e)}"}, visible=True)

    def clear_chat():
        """Clear the chat interface"""
        return "", "", gr.update(visible=False)

    # Main interaction flow
    ask_button.click(
        fn=lambda: ("", gr.update(visible=False)),  # Clear previous metrics and response
        outputs=[answer_output, metrics_output]
    ).then(
        fn=rag_pipeline_stream,
        inputs=[question_input],
        outputs=[answer_output]
    ).then(
        fn=track_usage,
        inputs=[question_input, usage_counter, session_start, feedback],
        outputs=[answer_output, usage_counter, session_start, current_response]
    )

    # Evaluation flow
    evaluate_btn.click(
        fn=evaluate_response,
        inputs=[question_input, current_response],
        outputs=[metrics_output]
    )

    # Clear chat
    clear_btn.click(
        fn=clear_chat,
        outputs=[question_input, answer_output, metrics_output]
    )

    # Handle feedback
    def handle_feedback(feedback_val):
        try:
            if mlflow_tracking_uri and mlflow.active_run():
                mlflow.log_metric("user_feedback_score", 1 if feedback_val == "Helpful" else 0)
        except Exception:
            pass  # Silently fail if feedback logging doesn't work

    feedback.change(
        fn=handle_feedback,
        inputs=[feedback],
        outputs=[]
    )

if __name__ == "__main__":
    print("Starting Maintenance AI Assistant...")
    print("System initialized with fallback mechanisms")
    print("Web interface available at http://0.0.0.0:7860")
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        debug=False
    )