Spaces:
Running
Running
# app.py | |
# Phiên bản cuối cùng: Thêm Voice Activity Detection (VAD) để lọc bỏ khoảng lặng. | |
import os | |
import joblib | |
import numpy as np | |
import librosa | |
from flask import Flask, request, jsonify, render_template | |
from werkzeug.utils import secure_filename | |
import traceback | |
import collections | |
# --- Thư viện mới để đọc audio, giảm nhiễu và VAD --- | |
from pydub import AudioSegment | |
import noisereduce as nr | |
import webrtcvad | |
# --- Cấu hình TensorFlow và các thư viện AI --- | |
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
import tensorflow as tf | |
from transformers import Wav2Vec2Processor, Wav2Vec2Model | |
import torch | |
# --- KHỞI TẠO ỨNG DỤNG FLASK --- | |
app = Flask(__name__) | |
UPLOAD_FOLDER = 'uploads/' | |
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER | |
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
# --- TẢI TẤT CẢ CÁC MÔ HÌNH KHI SERVER KHỞI ĐỘNG --- | |
print(">>> Đang tải các mô hình AI, quá trình này có thể mất một lúc...") | |
try: | |
MODEL_PATH = 'models/' | |
scaler = joblib.load(os.path.join(MODEL_PATH, 'scaler.pkl')) | |
label_encoder = joblib.load(os.path.join(MODEL_PATH, 'label_encoder.pkl')) | |
model_xgb = joblib.load(os.path.join(MODEL_PATH, 'xgboost.pkl')) | |
model_lgb = joblib.load(os.path.join(MODEL_PATH, 'lightgbm.pkl')) | |
model_cnn = tf.keras.models.load_model(os.path.join(MODEL_PATH, 'cnn.keras')) | |
wav2vec_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base") | |
wav2vec_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base") | |
print(">>> OK! Tất cả các mô hình đã được tải thành công!") | |
except Exception as e: | |
print(f"!!! LỖI NGHIÊM TRỌNG: Không thể tải một hoặc nhiều mô hình. Lỗi: {e}") | |
traceback.print_exc() | |
exit() | |
# --- CÁC HÀM TRÍCH XUẤT ĐẶC TRƯNG (KHÔNG ĐỔI) --- | |
SAMPLE_RATE = 22050 | |
MAX_LENGTH_SECONDS = 5.0 | |
MAX_SAMPLES = int(SAMPLE_RATE * MAX_LENGTH_SECONDS) | |
N_MELS = 128 | |
TRADITIONAL_FEATURE_SIZE = 570 | |
WAV2VEC_FEATURE_SIZE = 768 | |
SPECTROGRAM_SHAPE = (224, 224, 3) | |
def _extract_traditional_features(y, sr): | |
try: | |
mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=N_MELS) | |
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max) | |
features = np.mean(mel_spec_db, axis=1) | |
features = np.append(features, np.std(mel_spec_db, axis=1)) | |
features = np.append(features, np.max(mel_spec_db, axis=1)) | |
features = np.append(features, np.min(mel_spec_db, axis=1)) | |
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13) | |
features = np.append(features, np.mean(mfccs, axis=1)) | |
features = np.append(features, np.std(mfccs, axis=1)) | |
if len(features) > TRADITIONAL_FEATURE_SIZE: | |
features = features[:TRADITIONAL_FEATURE_SIZE] | |
elif len(features) < TRADITIONAL_FEATURE_SIZE: | |
features = np.pad(features, (0, TRADITIONAL_FEATURE_SIZE - len(features)), mode='constant') | |
return features | |
except Exception as e: | |
print(f"Lỗi trích xuất đặc trưng truyền thống: {e}") | |
return np.zeros(TRADITIONAL_FEATURE_SIZE) | |
def _extract_wav2vec_features(y, sr): | |
try: | |
y_16k = librosa.resample(y, orig_sr=sr, target_sr=16000) | |
inputs = wav2vec_processor(y_16k, sampling_rate=16000, return_tensors="pt", padding=True) | |
with torch.no_grad(): | |
outputs = wav2vec_model(**inputs) | |
features = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy() | |
return features | |
except Exception as e: | |
print(f"Lỗi trích xuất Wav2Vec2: {e}") | |
return np.zeros(WAV2VEC_FEATURE_SIZE) | |
def _create_spectrogram_image(y, sr): | |
try: | |
mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=SPECTROGRAM_SHAPE[0]) | |
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max) | |
mel_spec_norm = ((mel_spec_db - mel_spec_db.min()) / (mel_spec_db.max() - mel_spec_db.min() + 1e-8) * 255).astype(np.uint8) | |
img = tf.keras.preprocessing.image.array_to_img(np.stack([mel_spec_norm]*3, axis=-1)) | |
img = img.resize((SPECTROGRAM_SHAPE[1], SPECTROGRAM_SHAPE[0])) | |
return np.array(img) | |
except Exception as e: | |
print(f"Lỗi tạo ảnh spectrogram: {e}") | |
return np.zeros(SPECTROGRAM_SHAPE) | |
# --- HÀM XỬ LÝ AUDIO ĐÃ ĐƯỢC CẬP NHẬT VỚI VAD --- | |
def process_audio_file(file_path): | |
""" | |
Hàm tổng hợp phiên bản cuối cùng: Thêm Voice Activity Detection (VAD) để | |
lọc bỏ các khoảng lặng trước khi xử lý. | |
""" | |
try: | |
# 1. Dùng pydub để mở file audio | |
audio = AudioSegment.from_file(file_path) | |
# 2. Chuẩn hóa âm lượng | |
target_dbfs = -20.0 | |
change_in_dbfs = target_dbfs - audio.dBFS | |
audio = audio.apply_gain(change_in_dbfs) | |
# 3. Đảm bảo audio là mono và có sample rate đúng cho VAD | |
# VAD hoạt động tốt nhất ở các sample rate 8000, 16000, 32000, 48000 | |
vad_sample_rate = 32000 | |
audio = audio.set_channels(1) | |
audio = audio.set_frame_rate(vad_sample_rate) | |
# 4. **BƯỚC MỚI: VOICE ACTIVITY DETECTION** | |
print("DEBUG | Bắt đầu Voice Activity Detection (VAD)...") | |
vad = webrtcvad.Vad(3) # Mức độ mạnh nhất (0-3) | |
frame_duration_ms = 30 # 30ms mỗi frame | |
frame_bytes = int(vad_sample_rate * frame_duration_ms / 1000) * audio.sample_width | |
frames = [audio[i:i + frame_duration_ms] for i in range(0, len(audio), frame_duration_ms)] | |
voiced_frames = [f for f in frames if vad.is_speech(f.raw_data, vad_sample_rate)] | |
if voiced_frames: | |
# Nối các frame có tiếng nói lại với nhau | |
audio_voiced = sum(voiced_frames, AudioSegment.empty()) | |
print(f"DEBUG | VAD hoàn tất. Giữ lại {len(audio_voiced)}ms âm thanh.") | |
else: | |
# Nếu không tìm thấy tiếng nói, dùng lại audio gốc | |
audio_voiced = audio | |
print("DEBUG | VAD không tìm thấy âm thanh, sử dụng audio gốc.") | |
# 5. Chuyển đổi audio đã lọc về sample rate mục tiêu | |
audio_final = audio_voiced.set_frame_rate(SAMPLE_RATE) | |
samples = np.array(audio_final.get_array_of_samples()).astype(np.float32) | |
y = samples / (2**(audio_final.sample_width * 8 - 1)) | |
# 6. Giảm nhiễu trên tín hiệu đã được lọc | |
print("DEBUG | Bắt đầu giảm nhiễu...") | |
y_reduced_noise = nr.reduce_noise(y=y, sr=SAMPLE_RATE, prop_decrease=0.8) | |
print("DEBUG | Giảm nhiễu hoàn tất.") | |
# 7. Chuẩn hóa độ dài | |
if len(y_reduced_noise) > MAX_SAMPLES: | |
y_final = y_reduced_noise[:MAX_SAMPLES] | |
else: | |
y_final = np.pad(y_reduced_noise, (0, MAX_SAMPLES - len(y_reduced_noise)), mode='constant') | |
# 8. Trích xuất đặc trưng | |
traditional_features = _extract_traditional_features(y_final, SAMPLE_RATE) | |
wav2vec_features = _extract_wav2vec_features(y_final, SAMPLE_RATE) | |
spectrogram = _create_spectrogram_image(y_final, SAMPLE_RATE) | |
return traditional_features, wav2vec_features, spectrogram | |
except Exception as e: | |
print(f"Lỗi nghiêm trọng khi xử lý file audio {file_path}: {e}") | |
traceback.print_exc() | |
return None, None, None | |
# --- ĐỊNH NGHĨA CÁC ROUTE CỦA ỨNG DỤNG --- | |
def home(): | |
return render_template('index.html') | |
def predict(): | |
if 'audio_file' not in request.files: | |
return jsonify({'error': 'Không có file audio nào trong yêu cầu.'}), 400 | |
file = request.files['audio_file'] | |
if file.filename == '': | |
return jsonify({'error': 'Tên file không hợp lệ.'}), 400 | |
try: | |
filename = secure_filename(file.filename) | |
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
file.save(filepath) | |
trad_feats, w2v_feats, spec_img = process_audio_file(filepath) | |
if trad_feats is None: | |
return jsonify({'error': 'Không thể xử lý file audio.'}), 500 | |
print("\n--- BẮT ĐẦU CHẨN ĐOÁN DỮ LIỆU ĐẦU VÀO (SAU KHI VAD & GIẢM NHIỄU) ---") | |
print(f"DEBUG | trad_feats stats: mean={np.mean(trad_feats):.2f}, std={np.std(trad_feats):.2f}, min={np.min(trad_feats):.2f}, max={np.max(trad_feats):.2f}") | |
print(f"DEBUG | w2v_feats stats: mean={np.mean(w2v_feats):.2f}, std={np.std(w2v_feats):.2f}, min={np.min(w2v_feats):.2f}, max={np.max(w2v_feats):.2f}") | |
print(f"DEBUG | spec_img stats: mean={np.mean(spec_img):.2f}, std={np.std(spec_img):.2f}, min={np.min(spec_img):.2f}, max={np.max(spec_img):.2f}") | |
combined_feats = np.concatenate([trad_feats, w2v_feats]).reshape(1, -1) | |
scaled_feats = scaler.transform(combined_feats) | |
spec_img = spec_img / 255.0 | |
spec_img = np.expand_dims(spec_img, axis=0) | |
pred_xgb = model_xgb.predict_proba(scaled_feats)[0][1] | |
pred_lgb = model_lgb.predict_proba(scaled_feats)[0][1] | |
pred_cnn = model_cnn.predict(spec_img, verbose=0)[0][0] | |
print(f"DEBUG | Individual probabilities (for Male): XGB={pred_xgb:.4f}, LGBM={pred_lgb:.4f}, CNN={pred_cnn:.4f}") | |
final_prediction_prob = (pred_xgb + pred_lgb + pred_cnn) / 3 | |
final_prediction_label_index = 1 if final_prediction_prob > 0.5 else 0 | |
result_label_text = label_encoder.inverse_transform([final_prediction_label_index])[0] | |
os.remove(filepath) | |
print(f"Phân tích hoàn tất. Kết quả: {result_label_text.upper()} (Xác suất: {final_prediction_prob:.2f})") | |
return jsonify({ | |
'prediction': result_label_text.capitalize(), | |
'probability': f"{final_prediction_prob:.2f}" | |
}) | |
except Exception as e: | |
print(f"Đã xảy ra lỗi trong quá trình dự đoán: {e}") | |
traceback.print_exc() | |
return jsonify({'error': 'Đã xảy ra lỗi không xác định trên máy chủ.'}), 500 | |
# --- ĐIỂM BẮT ĐẦU CHẠY ỨNG DỤNG --- | |
if __name__ == '__main__': | |
port = int(os.environ.get("PORT", 7860)) | |
app.run(host='0.0.0.0', port=port) | |