# Spaces: Running
import io
import json

import pandas as pd
import pdfplumber
from docx import Document
from openpyxl import load_workbook

from rag.RAG import rag
# File extensions (lower-case, without the dot) accepted by FileHandler.
ALLOWED_EXTENSIONS = {'pdf', 'txt', 'docx', 'csv', 'xlsx', 'xls', 'json'}

# Upper bound on the number of characters (after stripping surrounding
# whitespace) of extracted text that will be sent for embedding.
MAX_CHARS = 5000000


class FileHandler:
    """Extract text from an uploaded file and pass it to ``rag.generate_embedding``.

    Each ``read_*`` method takes a file object, extracts its text, enforces
    ``MAX_CHARS`` and returns whatever ``rag.generate_embedding`` returns.
    Any failure inside a reader is re-raised as ``ValueError``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _extension(filename):
        """Return the lower-cased text after the last dot, or '' if there is none."""
        if not filename or '.' not in filename:
            return ''
        return filename.rsplit('.', 1)[1].lower()

    def allowed_file(self, filename):
        """Return True if *filename* has an extension in ``ALLOWED_EXTENSIONS``.

        A missing (``None``) or empty filename is reported as not allowed
        instead of raising.
        """
        return self._extension(filename) in ALLOWED_EXTENSIONS

    def check_char_limit(self, text):
        """Return *text* unchanged, or raise if it is longer than ``MAX_CHARS``.

        The length is measured after stripping surrounding whitespace.

        Raises:
            ValueError: if the stripped text exceeds ``MAX_CHARS`` characters.
        """
        if len(text.strip()) > MAX_CHARS:
            raise ValueError(f"File exceeds the maximum character limit of {MAX_CHARS} characters")
        return text

    def _embed(self, text):
        """Enforce the character limit, then embed the stripped text."""
        return rag.generate_embedding(self.check_char_limit(text).strip())

    def read_pdf(self, file):
        """Extract the text of every page of a PDF and embed it.

        Raises:
            ValueError: if the PDF cannot be read, is too long, or embedding fails.
        """
        try:
            pages = []
            with pdfplumber.open(file) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text(layout=True)
                    stripped = page_text.strip() if page_text else ""
                    if stripped:
                        pages.append(stripped)
            # Separate pages with a newline so the last word of one page does
            # not fuse with the first word of the next.
            return self._embed("\n".join(pages))
        except Exception as e:
            raise ValueError(f"An error occurred while reading the PDF: {e}") from e

    def read_txt(self, file):
        """Decode a UTF-8 text file and embed it.

        Raises:
            ValueError: if the file cannot be decoded, is too long, or embedding fails.
        """
        try:
            # "utf-8-sig" decodes plain UTF-8 too, but drops a leading BOM so
            # it does not end up in the embedded text.
            text = file.read().decode("utf-8-sig")
            return self._embed(text)
        except Exception as e:
            raise ValueError(f"An error occurred while reading the TXT file: {e}") from e

    def read_docx(self, file):
        """Join the paragraphs of a DOCX document with newlines and embed them.

        Raises:
            ValueError: if the document cannot be read, is too long, or embedding fails.
        """
        try:
            doc = Document(file)
            text = "\n".join(paragraph.text.strip() for paragraph in doc.paragraphs)
            return self._embed(text)
        except Exception as e:
            raise ValueError(f"An error occurred while reading the DOCX file: {e}") from e

    def read_csv(self, file):
        """Render a CSV file as a plain-text table (no index column) and embed it.

        Raises:
            ValueError: if the CSV cannot be parsed, is too long, or embedding fails.
        """
        try:
            df = pd.read_csv(file)
            return self._embed(df.to_string(index=False))
        except Exception as e:
            raise ValueError(f"An error occurred while reading the CSV file: {e}") from e

    @staticmethod
    def _format_sheet(sheet_name, rows):
        """Render one sheet as a 'Sheet: <name>' header plus one ' | '-joined line per row.

        ``None`` cells are rendered as empty strings.
        """
        lines = [f"Sheet: {sheet_name}"]
        lines.extend(
            " | ".join("" if cell is None else str(cell) for cell in row)
            for row in rows
        )
        return "\n".join(lines) + "\n"

    @staticmethod
    def _is_legacy_xls(file):
        """Return True if *file* (a path or an object with ``filename``) is named ``*.xls``."""
        name = file if isinstance(file, str) else getattr(file, "filename", None)
        return isinstance(name, str) and name.lower().endswith(".xls")

    def _read_legacy_xls(self, file):
        """Return the formatted text of every sheet of a legacy ``.xls`` workbook.

        Uses pandas because openpyxl cannot open the old binary format; pandas
        in turn needs an xls engine to be installed.
        """
        # Buffer uploads in memory so pandas gets a plain seekable stream.
        source = io.BytesIO(file.read()) if hasattr(file, "read") else file
        frames = pd.read_excel(source, sheet_name=None, header=None)
        sheets = []
        for sheet_name, frame in frames.items():
            rows = []
            for row in frame.itertuples(index=False, name=None):
                # pandas marks empty cells as NaN/NaT; normalise them to None.
                rows.append([None if pd.isna(cell) else cell for cell in row])
            sheets.append(self._format_sheet(sheet_name, rows))
        return sheets

    def read_excel(self, file):
        """Render every sheet of an Excel workbook as text and embed it.

        ``.xlsx`` workbooks are read with openpyxl; files named ``*.xls`` are
        read with pandas. Sheets are separated by a blank line.

        Raises:
            ValueError: if the workbook cannot be read, is too long, or embedding fails.
        """
        try:
            if self._is_legacy_xls(file):
                sheets = self._read_legacy_xls(file)
            else:
                workbook = load_workbook(filename=file)
                sheets = [
                    self._format_sheet(name, workbook[name].iter_rows(values_only=True))
                    for name in workbook.sheetnames
                ]
            return self._embed("\n\n".join(sheets))
        except Exception as e:
            raise ValueError(f"An error occurred while reading the Excel file: {e}") from e

    def read_json(self, file):
        """Parse a JSON file, pretty-print it with a 2-space indent and embed it.

        Raises:
            ValueError: if the JSON is invalid, too long, or embedding fails.
        """
        try:
            data = json.load(file)
            # Keep non-ASCII characters readable instead of \uXXXX escapes so
            # the embedded text matches what the document actually says.
            text = json.dumps(data, indent=2, ensure_ascii=False)
            return self._embed(text)
        except Exception as e:
            raise ValueError(f"An error occurred while reading the JSON file: {e}") from e

    def handle_file(self, file):
        """Dispatch *file* to the reader matching its extension.

        Args:
            file: object with a ``filename`` attribute, accepted by the readers.

        Raises:
            ValueError: if the extension has no reader, or the reader fails.
        """
        filename = file.filename.lower()
        readers = {
            'pdf': self.read_pdf,
            'txt': self.read_txt,
            'docx': self.read_docx,
            'csv': self.read_csv,
            'xlsx': self.read_excel,
            'xls': self.read_excel,
            'json': self.read_json,
        }
        reader = readers.get(self._extension(filename))
        if reader is None:
            raise ValueError(f"Unsupported file type: {filename}")
        return reader(file)

    def process_file(self, file):
        """Validate and process an upload without letting exceptions escape.

        Returns:
            The result of ``handle_file`` on success, otherwise a tuple
            ``({"error": message}, 400)``.
        """
        try:
            if not self.allowed_file(file.filename):
                # Sort so the message is identical on every run; iterating a
                # set of strings directly gives an arbitrary order.
                supported = ', '.join(sorted(ALLOWED_EXTENSIONS))
                return {"error": f"File type not allowed. Supported types: {supported}"}, 400
            return self.handle_file(file)
        except Exception as e:
            return {"error": f"Error processing file: {e}"}, 400


# Shared module-level instance.
file_handler = FileHandler()