# Spaces: Runtime error (page-status text captured with the source; not part of the program)
import io
import logging
import os
import subprocess
import sys
import tempfile

import ffmpeg  # Replaced moviepy with ffmpeg-python
import librosa
import pdfplumber
import soundfile as sf
import spacy
import torch
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from sentence_transformers import SentenceTransformer, util
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
# Suppress warnings: quiet TensorFlow's C++ logging and pin CUDA to device 0.
os.environ.update({"TF_CPP_MIN_LOG_LEVEL": "3", "CUDA_VISIBLE_DEVICES": "0"})

# Ensure the GPU is used whenever torch reports one; otherwise fall back to CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
# Load the spaCy English model, downloading it on first use.
# The download is installed with --user (original intent: prevent permission errors).
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # Run the download with the interpreter that is executing this file, so the
    # model is installed for *this* environment rather than whatever "python"
    # resolves to on PATH. check=True surfaces a failed download immediately
    # instead of letting it reappear as a second, less helpful OSError below.
    subprocess.run(
        [sys.executable, "-m", "spacy", "download", "en_core_web_sm", "--user"],
        check=True,
    )
    nlp = spacy.load("en_core_web_sm")
# Load NLP models.
# transformers pipelines take a device index: 0 selects the first GPU, -1 the CPU.
_pipeline_device = 0 if torch.cuda.is_available() else -1
try:
    summarizer = pipeline("summarization", model="nsi319/legal-pegasus", device=_pipeline_device)
    embedding_model = SentenceTransformer("all-mpnet-base-v2", device=device)
    ner_model = pipeline(
        "ner",
        model="dslim/bert-base-NER",
        tokenizer="dslim/bert-base-NER",
        device=_pipeline_device,
    )
    speech_to_text = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base",
        device=_pipeline_device,
    )
except Exception as e:
    # Fail fast at startup, keeping the original exception as the explicit cause.
    raise RuntimeError(f"Error loading models: {e}") from e
# Load Falcon 7B (instruct variant) for the chatbot.
MODEL_NAME = "tiiuae/falcon-7b-instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# bfloat16 weights; device placement is delegated to device_map="auto".
chatbot_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16, device_map="auto")

# Initialize the FastAPI application object served at the bottom of this file.
app = FastAPI()
# PDF text extraction
def extract_text_from_pdf(pdf_file):
    """Return the text of every page of a PDF, joined with newlines.

    Args:
        pdf_file: Whatever ``pdfplumber.open`` accepts; passed through as-is.

    Returns:
        The concatenated page text. Pages whose ``extract_text()`` result is
        falsy contribute an empty string.

    Raises:
        HTTPException: Status 400 when pdfplumber fails or when the document
            yields only whitespace.
    """
    try:
        with pdfplumber.open(pdf_file) as document:
            page_texts = []
            for page in document.pages:
                page_texts.append(page.extract_text() or "")
            text = "\n".join(page_texts)
            if text.strip():
                return text
            # Raised inside the try on purpose: it is reported through the
            # same 400 response as any other extraction failure.
            raise ValueError("No readable text found in PDF. It may be a scanned document.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"PDF extraction failed: {str(e)}")
# Video-to-audio extraction (using FFmpeg instead of MoviePy)
def extract_audio_from_video(video_path):
    """Extract the audio track of a video into a WAV file next to it.

    The output path is the input path with its extension replaced by
    ``.wav``. For ``*.mp4`` inputs this is the same path the previous
    ``str.replace`` produced, but it now also works for other containers
    (``.mov``, ``.MP4``, ...) where the old code made the output path equal
    the input path.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the WAV file that was written.

    Raises:
        HTTPException: Status 500 if FFmpeg (or path handling) fails.
    """
    try:
        stem, extension = os.path.splitext(video_path)
        audio_path = f"{stem}.wav"
        if extension.lower() == ".wav":
            # Input is already a .wav file: pick a distinct name so FFmpeg is
            # never asked to write its output over its own input.
            audio_path = f"{stem}_audio.wav"
        ffmpeg.input(video_path).output(audio_path, format="wav").run(overwrite_output=True)
        return audio_path
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Audio extraction failed: {str(e)}") from e
# Speech-to-text transcription (chunked for long audio)
def transcribe_audio(audio_path):
    """Transcribe an audio file with the ``speech_to_text`` pipeline.

    The audio is loaded at 16 kHz. Files of at most 30 seconds are passed to
    the pipeline directly by path; longer files are cut into consecutive
    30-second chunks, each transcribed separately, and the texts are joined
    with single spaces.

    Chunk files are written to a private temporary directory, so concurrent
    requests cannot overwrite each other's chunks and nothing is left behind
    if transcription fails part-way through.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The transcribed text.

    Raises:
        HTTPException: Status 500 if loading, chunking or transcription fails.
    """
    try:
        audio, sr = librosa.load(audio_path, sr=16000)
        chunk_size = 30 * sr  # samples per 30-second chunk

        # Same condition as "duration <= 30 seconds", expressed in samples.
        if len(audio) <= chunk_size:
            return speech_to_text(audio_path)["text"]

        transcripts = []
        with tempfile.TemporaryDirectory(prefix="asr_chunks_") as chunk_dir:
            for idx, start in enumerate(range(0, len(audio), chunk_size)):
                chunk_path = os.path.join(chunk_dir, f"chunk_{idx}.wav")
                sf.write(chunk_path, audio[start:start + chunk_size], sr)
                transcripts.append(speech_to_text(chunk_path)["text"])
        return " ".join(transcripts)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Speech-to-text failed: {str(e)}") from e
# Legal document summarization
async def summarize_legal_document(text):
    """Summarize the beginning of a legal document.

    Only the first 1024 characters of ``text`` are sent to the ``summarizer``
    pipeline (``max_length=200``, ``min_length=50``, no sampling).

    Args:
        text: Full document text.

    Returns:
        The generated summary, or the fixed fallback string
        ``"Summarization failed due to an internal error."`` if anything goes
        wrong. This function is best-effort and does not raise.
    """
    try:
        summary = summarizer(text[:1024], max_length=200, min_length=50, do_sample=False)
        return summary[0]['summary_text']
    except Exception:
        # Keep the best-effort contract, but record the failure with its
        # traceback so it can be diagnosed instead of vanishing silently.
        logging.getLogger(__name__).exception("Legal document summarization failed")
        return "Summarization failed due to an internal error."
# Legal document analysis API
# NOTE(review): no route was registered for this handler in the file as seen;
# the path below is derived from the function name - confirm against clients.
@app.post("/analyze_legal_document")
async def analyze_legal_document(file: UploadFile = File(...)):
    """Extract the text of an uploaded PDF and return a summary of it.

    Args:
        file: The uploaded PDF.

    Returns:
        ``{"status": "success", "summary": ...}`` on success, or
        ``{"status": "error", "detail": ...}`` for unexpected failures.

    Raises:
        HTTPException: Propagated unchanged from the helpers (for example the
            400 raised for an unreadable PDF) so the client receives the
            intended HTTP status instead of a 200 response.
    """
    try:
        content = await file.read()
        text = extract_text_from_pdf(io.BytesIO(content))
        summary = await summarize_legal_document(text)
        return {"status": "success", "summary": summary}
    except HTTPException:
        raise
    except Exception as e:
        return {"status": "error", "detail": str(e)}
# Chatbot API
# NOTE(review): no route was registered for this handler in the file as seen;
# the path below is derived from the function name - confirm against clients.
@app.post("/chatbot")
async def chatbot_endpoint(query: dict):
    """Answer a chatbot query with the Falcon model.

    Args:
        query: Request body; the prompt is read from its ``"query"`` key.

    Returns:
        ``{"status": "success", "answer": ...}`` with the decoded model
        output, or ``{"status": "error", "message": ...}`` if generation
        fails.

    Raises:
        HTTPException: Status 400 when the query is missing or empty.
    """
    input_text = query.get("query", "")
    if not input_text:
        # Validated outside the try block so the 400 reaches the client
        # instead of being converted into a 200 "error" payload.
        raise HTTPException(status_code=400, detail="Query cannot be empty.")
    try:
        # The model is placed by device_map="auto", so send the inputs to the
        # device the model reports rather than to the global default device.
        inputs = tokenizer(input_text, return_tensors="pt").to(chatbot_model.device)
        # max_new_tokens bounds only the generated continuation; max_length
        # also counted the prompt, leaving long prompts no room for an answer.
        outputs = chatbot_model.generate(**inputs, max_new_tokens=200)
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        return {"status": "success", "answer": response}
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Video upload & analysis API
# NOTE(review): no route was registered for this handler in the file as seen;
# the path below is derived from the function name - confirm against clients.
@app.post("/analyze_video")
async def analyze_video(file: UploadFile = File(...)):
    """Transcribe the speech in an uploaded video.

    The upload is written into a private temporary directory, its audio track
    is extracted and transcribed, and the directory (video, audio and any
    other intermediate files created inside it) is removed afterwards,
    whether or not processing succeeded.

    Args:
        file: The uploaded video.

    Returns:
        ``{"status": "success", "transcript": ...}`` on success, or
        ``{"status": "error", "message": ...}`` for unexpected failures.

    Raises:
        HTTPException: Propagated unchanged from the helpers so the client
            receives the intended HTTP status.
    """
    try:
        # The client controls file.filename: keep only its final component so
        # it cannot point outside the temporary directory. The fallback name
        # is used when no filename was sent.
        safe_name = os.path.basename(file.filename or "") or "upload.mp4"
        with tempfile.TemporaryDirectory(prefix="video_upload_") as work_dir:
            video_path = os.path.join(work_dir, f"temp_{safe_name}")
            with open(video_path, "wb") as f:
                f.write(await file.read())
            audio_path = extract_audio_from_video(video_path)
            transcript = transcribe_audio(audio_path)
        return {"status": "success", "transcript": transcript}
    except HTTPException:
        raise
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Run the FastAPI server when this file is executed as a script.
if __name__ == "__main__":
    # Listen on every network interface, port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)