import datetime
import json
import os
import tempfile
import threading
import webbrowser

import faiss
import numpy as np
import pandas as pd
import pyttsx3
import speech_recognition as sr
from flask import Flask, request, jsonify, send_from_directory
from huggingface_hub import login
from langdetect import detect
from pydub import AudioSegment
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering, AutoModelForSeq2SeqLM
from werkzeug.utils import secure_filename

app = Flask(__name__, static_folder='.')
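

# Authenticate with the Hugging Face Hub. The token is read from the
# environment, falling back to a .env file via python-dotenv.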
hf_token = os.environ.get("API_KEY")
if not hf_token:
    from dotenv import load_dotenv
    load_dotenv()
    hf_token = os.environ.get("API_KEY")
if not hf_token:
    raise ValueError("Hugging Face API key not found. Please set 'API_KEY' as an environment variable or in a .env file.")

login(token=hf_token)
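

# Extractive question-answering model (RoBERTa fine-tuned on SQuAD 2.0).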
qa_model = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-base-squad2")
qa_tokenizer = AutoTokenizer.from_pretrained("deepset/roberta-base-squad2")
qa_pipeline = pipeline("question-answering", model=qa_model, tokenizer=qa_tokenizer)
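

# Abstractive summarizer (BART) used to condense long answers and documents.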
summarizer_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn")
summarizer_tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")
summarizer_pipeline = pipeline("summarization", model=summarizer_model, tokenizer=summarizer_tokenizer)
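

# Sentence-embedding model used for dense retrieval.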
embed_model = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L6-v2")
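
# Build the retrieval corpus from a Parquet dataset plus a JSON dump of PDF text.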
df_parquet = pd.read_parquet("ibtehaj dataset.parquet")
corpus_parquet = df_parquet["text"].dropna().tolist()

with open("pdf_data.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)

corpus_json = []
for entry in json_data:
    if isinstance(entry, dict) and "text" in entry:
        text = entry["text"].strip()
        if text:
            corpus_json.append(text)

corpus = corpus_parquet + corpus_json
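

# Embed every corpus entry and index the vectors with FAISS (exact L2 search).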
embeddings = embed_model.encode(corpus, show_progress_bar=True, batch_size=16)

index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(np.array(embeddings))
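

# Retrieval-augmented answering: fetch the k nearest corpus chunks, run
# extractive QA over them, and summarize the answer span if it is long.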
def rag_answer(question: str, k: int = 3) -> str:
    q_emb = embed_model.encode([question])
    D, I = index.search(q_emb, k)
    context = "\n\n".join(corpus[i] for i in I[0] if 0 <= i < len(corpus))

    if not context.strip():
        return "Context is empty. Try rephrasing the question."

    try:
        result = qa_pipeline(question=question, context=context)
        raw_answer = result.get("answer", "No answer found.")

        if len(raw_answer.split()) > 40 or len(raw_answer) > 300:
            summary = summarizer_pipeline(raw_answer, max_length=50, min_length=15, do_sample=False)
            summarized_answer = summary[0]['summary_text']
        else:
            summarized_answer = raw_answer

        return f"Answer: {summarized_answer}\n\n[Context Used]:\n{context[:500]}..."
    except Exception as e:
        return f"Error: {e}"
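

# Server-side text-to-speech engine (pyttsx3), initialized once and reused.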
tts_engine = None


def init_tts_engine():
    global tts_engine
    if tts_engine is None:
        tts_engine = pyttsx3.init()
        tts_engine.setProperty('rate', 150)
        tts_engine.setProperty('volume', 1.0)
        # Prefer a female voice (e.g. Microsoft Zira on Windows) when available.
        voices = tts_engine.getProperty('voices')
        for v in voices:
            if "zira" in v.name.lower() or "female" in v.name.lower():
                tts_engine.setProperty('voice', v.id)
                break


init_tts_engine()
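

# In-memory conversation state shared across requests.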
conversation_history = []
last_question_text = ""
last_answer_text = ""
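

# Serve the front-end and any static assets from the working directory.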
@app.route('/')
def serve_index():
    return send_from_directory('.', 'index.html')


@app.route('/<path:path>')
def serve_static_files(path):
    return send_from_directory('.', path)
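

# POST /answer: run the RAG pipeline over the submitted question and log
# both sides of the exchange to the conversation history.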
@app.route('/answer', methods=['POST'])
def generate_answer_endpoint():
    global last_question_text, last_answer_text, conversation_history
    data = request.get_json()
    question = data.get('question', '').strip()

    if not question:
        return jsonify({"answer": "Please provide a question."}), 400

    last_question_text = question
    timestamp = datetime.datetime.now().strftime("%H:%M:%S")
    conversation_history.append({"role": "user", "time": timestamp, "text": question})

    ans = rag_answer(question)
    last_answer_text = ans
    conversation_history.append({"role": "bot", "time": timestamp, "text": ans})

    return jsonify({"answer": ans})
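

# POST /read-aloud: synthesize the given text to a temporary audio file on the
# server, then clean the file up.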
@app.route('/read-aloud', methods=['POST'])
def read_aloud_endpoint():
    data = request.get_json()
    text_to_read = data.get('text', '').strip()

    if not text_to_read:
        return jsonify({"status": "No text provided to read."}), 400

    temp_audio_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
            temp_audio_path = fp.name

        tts_engine.save_to_file(text_to_read, temp_audio_path)
        tts_engine.runAndWait()

        return jsonify({"status": "TTS audio generated (server-side)."})
    except Exception as e:
        return jsonify({"status": f"Error during TTS: {str(e)}"}), 500
    finally:
        # Guard against the temp file never having been created.
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
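

# POST /upload-mp3: convert the uploaded MP3 to WAV with pydub, then
# transcribe it with the SpeechRecognition Google Web Speech backend.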
@app.route('/upload-mp3', methods=['POST'])
def upload_mp3_endpoint():
    global last_question_text, last_answer_text, conversation_history

    if 'file' not in request.files:
        return jsonify({"message": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"message": "No selected file"}), 400

    filename = secure_filename(file.filename)

    with tempfile.TemporaryDirectory() as tmpdir:
        mp3_path = os.path.join(tmpdir, filename)
        file.save(mp3_path)

        wav_path = os.path.join(tmpdir, filename.replace(".mp3", ".wav"))
        try:
            sound = AudioSegment.from_mp3(mp3_path)
            sound.export(wav_path, format="wav")
        except Exception as e:
            return jsonify({"message": f"Error converting MP3 to WAV: {e}"}), 500

        try:
            recognizer = sr.Recognizer()
            with sr.AudioFile(wav_path) as src:
                audio = recognizer.record(src)
                text = recognizer.recognize_google(audio)
        except sr.UnknownValueError:
            return jsonify({"message": "Speech not understood."}), 400
        except sr.RequestError as e:
            return jsonify({"message": f"Speech recognition service error: {e}"}), 500

        # The transcript file lives only as long as the temporary directory.
        transcript_path = os.path.join(tmpdir, "transcription.txt")
        with open(transcript_path, "w", encoding="utf-8") as f:
            f.write(text)

        return jsonify({
            "message": "MP3 transcribed successfully.",
            "transcription": text
        })
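

# POST /summarize: split long input into chunks, summarize each chunk, then
# summarize the concatenated chunk summaries into one final summary.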
@app.route('/summarize', methods=['POST'])
def summarize_endpoint():
    data = request.get_json()
    text_to_summarize = data.get('text', '').strip()

    if not text_to_summarize:
        return jsonify({"summary": "No text provided for summarization."}), 400

    def chunk_text(text, max_chunk_size=4000):
        sentences = text.split(". ")
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 2 < max_chunk_size:
                current_chunk += sentence + ". "
            else:
                chunks.append(current_chunk.strip())
                current_chunk = sentence + ". "
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks

    try:
        chunks = chunk_text(text_to_summarize)
        summaries = [
            summarizer_pipeline(chunk, max_length=150, min_length=50, do_sample=False)[0]["summary_text"]
            for chunk in chunks
        ]
        final_input = " ".join(summaries)
        final_summary = summarizer_pipeline(final_input, max_length=150, min_length=50, do_sample=False)[0]["summary_text"]
        return jsonify({"summary": final_summary})
    except Exception as e:
        return jsonify({"summary": f"Error during summarization: {e}"}), 500
@app.route('/history', methods=['GET'])
def get_history():
    return jsonify({"history": conversation_history})


if __name__ == '__main__':
    app.run(debug=True)
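
# Example request (a sketch, assuming the default Flask dev server on port 5000
# and a hypothetical question; adjust host, port, and payload to your setup):
#   curl -X POST http://127.0.0.1:5000/answer \
#        -H "Content-Type: application/json" \
#        -d '{"question": "What topics does the dataset cover?"}'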