"""Gradio app that ingests Markdown, DOCX, and PDF documents, splits them into
token-based chunks, embeds each chunk, and writes a JSONL file ready to load
into Amazon S3 Vectors."""

import json
import re
import uuid
from pathlib import Path

import gradio as gr
import PyPDF2
import tiktoken
import yaml
from docx import Document
from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 produces 384-dimensional sentence embeddings; cl100k_base is
# used only to count tokens when splitting documents into chunks.
model = SentenceTransformer('all-MiniLM-L6-v2')
tokenizer = tiktoken.get_encoding("cl100k_base")


def extract_front_matter_and_body(text: str):
    """Split a Markdown document into its YAML front matter (as a dict) and body."""
    fm_regex = r"^---\n(.*?)\n---\n(.*)$"
    m = re.match(fm_regex, text, re.DOTALL)
    if m:
        meta = yaml.safe_load(m.group(1)) or {}
        body = m.group(2)
    else:
        meta = {}
        body = text
    return meta, body
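
# A minimal illustration of the helper's behaviour on a hypothetical document:
#
#   meta, body = extract_front_matter_and_body("---\ntitle: Demo\n---\nHello")
#   # meta -> {'title': 'Demo'}, body -> 'Hello'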


def chunk_text(text: str, max_tokens: int = 500, overlap: int = 50):
    """Split text into overlapping windows of at most max_tokens tokens."""
    tokens = tokenizer.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunks.append(tokenizer.decode(tokens[start:end]))
        if end == len(tokens):
            break  # avoid emitting a trailing chunk that is pure overlap
        start += max_tokens - overlap
    return chunks
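
# With the defaults, the window advances by max_tokens - overlap = 450 tokens,
# so a document of roughly 1000 tokens yields chunks covering tokens
# [0, 500), [450, 950), [900, 1000), with consecutive chunks sharing 50 tokens
# of context.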


def process_file(path: str, vertical: str, language: str):
    """Read one document, chunk and embed it, and return vector records."""
    ext = Path(path).suffix.lower()
    if ext in ['.md', '.markdown']:
        raw = Path(path).read_text(encoding='utf-8')
        meta, body = extract_front_matter_and_body(raw)
    elif ext == '.docx':
        doc = Document(path)
        body = "\n".join(p.text for p in doc.paragraphs)
        meta = {}
    elif ext == '.pdf':
        reader = PyPDF2.PdfReader(path)
        pages = [page.extract_text() or "" for page in reader.pages]
        body = "\n".join(pages)
        meta = {}
    else:
        # Unsupported extension: skip the file.
        return []

    # Values found in the file's front matter take precedence over the form defaults.
    default_meta = {
        'vertical': vertical,
        'language': language,
        'source': Path(path).name
    }
    meta = {**default_meta, **meta}

    records = []
    for i, chunk in enumerate(chunk_text(body)):
        emb = model.encode(chunk).tolist()
        metadata = {
            'id': f"{Path(path).stem}-chunk-{i+1:04d}",
            'chunk_index': i + 1,
            **meta
        }
        records.append({'vector': emb, 'metadata': metadata})
    return records
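
# For a hypothetical guide.md processed with vertical="SEO" and language="es",
# each record looks roughly like:
#
#   {
#       "vector": [...384 floats from all-MiniLM-L6-v2...],
#       "metadata": {
#           "id": "guide-chunk-0001",
#           "chunk_index": 1,
#           "vertical": "SEO",
#           "language": "es",
#           "source": "guide.md",
#           # ...plus any keys found in the file's YAML front matter
#       }
#   }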


def run_pipeline(files, vertical, language):
    """Process every uploaded file and write all records to a JSONL file."""
    all_records = []
    for file_path in files or []:
        all_records.extend(process_file(file_path, vertical, language))

    out_file = f"/tmp/{uuid.uuid4().hex}.jsonl"
    with open(out_file, 'w', encoding='utf-8') as f:
        for rec in all_records:
            json.dump({'id': rec['metadata']['id'],
                       'vector': rec['vector'],
                       'metadata': rec['metadata']},
                      f, ensure_ascii=False)
            f.write("\n")
    return out_file
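
# The pipeline can also be driven without the UI; with hypothetical inputs:
#
#   out = run_pipeline(["guide.md", "faq.pdf"], "SEO", "es")
#
# Each line of the resulting JSONL is one {"id", "vector", "metadata"} object,
# i.e. one embedded chunk.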


# Minimal Gradio UI: upload documents, set default metadata, download the JSONL.
demo = gr.Blocks()
with demo:
    gr.Markdown("## Ingestion for Amazon S3 Vector Features")
    with gr.Row():
        uploader = gr.File(label="Upload your documents", file_count="multiple", type="filepath")
        vertical = gr.Textbox(label="Vertical (e.g. SEO, eCommerce)", value="general")
        language = gr.Textbox(label="Language", value="es")
    btn = gr.Button("Process and generate JSONL")
    output = gr.File(label="Download the JSONL")

    btn.click(fn=run_pipeline, inputs=[uploader, vertical, language], outputs=output)


if __name__ == "__main__":
    demo.launch()
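
# A minimal sketch of loading the generated JSONL into an S3 Vectors index with
# boto3. This assumes the "s3vectors" client and its put_vectors operation from
# recent boto3 releases; the bucket name, index name, and jsonl_path below are
# placeholders, and the exact request shape should be checked against the
# current AWS documentation.
#
#   import boto3
#
#   s3v = boto3.client("s3vectors")
#   with open(jsonl_path, encoding="utf-8") as fh:  # path returned by run_pipeline
#       vectors = [{"key": rec["id"],
#                   "data": {"float32": rec["vector"]},
#                   "metadata": rec["metadata"]}
#                  for rec in map(json.loads, fh)]
#   s3v.put_vectors(vectorBucketName="my-vector-bucket",
#                   indexName="my-index",
#                   vectors=vectors)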