|
import os |
|
import tempfile |
|
import uuid |
|
import pandas as pd |
|
import logging |
|
import json |
|
import yaml |
|
import time |
|
import datetime |
|
import asyncio |
|
import warnings |
|
from pathlib import Path |
|
|
|
|
|
# Configure the root logger once at import time: DEBUG level, with a
# "timestamp - level - message" record layout.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
|
|
|
|
|
from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader, PromptTemplate, ServiceContext |
|
from llama_index.core.retrievers import VectorIndexRetriever |
|
from llama_index.core.query_engine import RetrieverQueryEngine |
|
from llama_index.core.postprocessor import SimilarityPostprocessor, KeywordNodePostprocessor |
|
from llama_index.core.node_parser import MarkdownNodeParser, SentenceSplitter |
|
from llama_index.llms.cerebras import Cerebras |
|
from llama_index.embeddings.nomic import NomicEmbedding |
|
from llama_index.readers.docling import DoclingReader |
|
from llama_index.core.response_synthesizers import CompactAndRefine |
|
from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter |
|
from llama_index.vector_stores.faiss import FaissVectorStore |
|
|
|
|
|
from groq import Groq |
|
|
|
|
|
import gradio as gr |
|
import gradio.themes as themes |
|
|
|
|
|
# Suppress warnings whose message mentions `clean_up_tokenization_spaces`.
warnings.filterwarnings("ignore", message=".*clean_up_tokenization_spaces.*")


def _require_env(name):
    """Return environment variable *name*; raise ValueError if unset or empty."""
    value = os.getenv(name)
    if not value:
        raise ValueError(f"{name} belum diset.")
    return value


# All three credentials are mandatory: importing the module fails fast
# (ValueError) when any of them is missing.
CEREBRAS_API_KEY = _require_env("CEREBRAS_API_KEY")
GROQ_API_KEY = _require_env("GROQ_API_KEY")
NOMIC_API_KEY = _require_env("NOMIC_API_KEY")

# Maps a generated document key ("doc-<uuid>") to the query engine built for
# that upload.
global_file_cache = {}

# Shared Groq client used for speech-to-text and text-to-speech calls.
logging.info("Inisialisasi Groq client")
groq_client = Groq(api_key=GROQ_API_KEY)
|
|
|
|
|
def load_cerebras_llm():
    """Create the Cerebras LLM client used for answer generation.

    Returns:
        A ``Cerebras`` instance for model ``llama-3.3-70b`` with
        temperature 0.1, top_p 0.9 and at most 1024 output tokens.

    Raises:
        Exception: Whatever the constructor raises; it is logged and re-raised.
    """
    logging.info("Memuat Cerebras LLM")
    try:
        cerebras_options = {
            "model": "llama-3.3-70b",
            "api_key": CEREBRAS_API_KEY,
            "temperature": 0.1,
            "max_tokens": 1024,
            "top_p": 0.9,
        }
        client = Cerebras(**cerebras_options)
    except Exception as exc:
        logging.error(f"Error load_cerebras_llm: {exc}")
        raise
    else:
        logging.debug("Cerebras LLM berhasil dimuat")
        return client
|
|
|
def create_embedding():
    """Create the Nomic embedding model and register it as the global default.

    Besides returning the model, this assigns it to ``Settings.embed_model``.

    Returns:
        The configured ``NomicEmbedding`` instance.

    Raises:
        Exception: Whatever construction or registration raises; it is logged
            and re-raised.
    """
    logging.info("Menginisialisasi embedding model dengan NomicEmbedding")
    try:
        nomic_model = NomicEmbedding(
            model_name="nomic-embed-text-v1.5",
            vision_model_name="nomic-embed-vision-v1.5",
            api_key=NOMIC_API_KEY,
            embed_batch_size=10,
        )
        Settings.embed_model = nomic_model
    except Exception as exc:
        logging.error(f"Error create_embedding: {exc}")
        raise
    else:
        logging.debug("Embedding model berhasil di-set")
        return nomic_model
|
|
|
|
|
# Answer prompt: the response synthesizer substitutes {context_str} with the
# retrieved text and {query_str} with the user's question.
_QA_TEMPLATE = """
Kamu adalah asisten yang sangat hati-hati yang hanya menjawab berdasarkan informasi yang ada dalam dokumen.
Jika pertanyaan tidak dapat dijawab hanya berdasarkan konteks, katakan "Maaf, saya tidak menemukan informasi tersebut dalam dokumen yang diberikan."

Jika pertanyaannya tidak relevan dengan dokumen, katakan "Pertanyaan ini tidak relevan dengan dokumen yang sedang dianalisis."

Jangan pernah mengada-ada atau membuat informasi. Jika kamu tidak yakin, katakan bahwa kamu tidak bisa menjawab dengan pasti berdasarkan dokumen.

Saat menjawab, selalu berikan kembali sumber informasimu dengan format yang jelas.

Konteks Dokumen:
{context_str}

Pertanyaan: {query_str}

Jawabanmu (hanya berdasarkan konteks dokumen):
"""


def _read_uploaded_documents(file_list):
    """Read every uploaded file and tag each document with its file name.

    Returns:
        A ``(documents, doc_names)`` pair: the loaded documents and the base
        names of the files they came from.
    """
    documents = []
    doc_names = []
    for file_obj in file_list:
        # Accept plain path strings as well as upload objects exposing `.name`.
        file_path = getattr(file_obj, "name", file_obj)
        file_name = os.path.basename(file_path)
        doc_names.append(file_name)
        logging.debug(f"Memuat file: {file_name}")
        try:
            loader = SimpleDirectoryReader(
                input_files=[file_path],
                file_extractor={".xlsx": DoclingReader()},
            )
        except Exception as exc:
            # Best-effort: fall back to the default readers, but leave a trace
            # of why the Docling-backed reader could not be set up.
            logging.warning(f"Gagal menyiapkan DoclingReader, memakai reader bawaan: {exc}")
            loader = SimpleDirectoryReader(input_files=[file_path])
        for doc in loader.load_data():
            # "source" is what the chat step later shows as the citation.
            doc.metadata["source"] = file_name
            doc.metadata["file_name"] = file_name
            documents.append(doc)
    return documents, doc_names


def _build_query_engine(documents):
    """Chunk, embed and index *documents*; return a retrieval query engine."""
    llm = load_cerebras_llm()
    embed_model = create_embedding()
    # Models are registered on the global Settings object and also passed
    # explicitly below. The deprecated ServiceContext container is no longer
    # used (see the LlamaIndex ServiceContext -> Settings migration guide).
    Settings.llm = llm
    Settings.embed_model = embed_model

    node_parser = SentenceSplitter(
        chunk_size=512,
        chunk_overlap=50,
        separator=" ",
        paragraph_separator="\n\n",
        # NOTE(review): "γ" may be an encoding artifact of other punctuation
        # characters; kept as found, please confirm.
        secondary_chunking_regex="[^,.;γ]+[,.;γ]?",
    )
    nodes = node_parser.get_nodes_from_documents(documents)

    # Let the index embed the nodes itself (batched through the embedding
    # model) instead of embedding them one by one and pushing them into the
    # vector store by hand.
    # NOTE(review): the previous code built `FaissVectorStore(dim=...)` and
    # called `vector_store.add(embedding, node_id, node)`; per the LlamaIndex
    # API reference the store is constructed from a prebuilt faiss index and
    # `add` takes a list of nodes. Building that index needs the `faiss`
    # module, which this file does not import, so the default in-memory
    # vector store is used here. Switch back to FAISS via a StorageContext if
    # persistence or scale requires it.
    index = VectorStoreIndex(nodes=nodes, embed_model=embed_model, show_progress=True)
    logging.info(f"Berhasil embedding {len(nodes)} nodes ke vector store")

    # Plain dense retrieval; "hybrid" mode (and its `alpha` weight) is only
    # meaningful for vector stores that implement hybrid search.
    retriever = VectorIndexRetriever(index=index, similarity_top_k=5)

    # A keyword post-processor with no required keywords filters nothing, so
    # only the similarity cutoff is kept.
    postprocessors = [SimilarityPostprocessor(similarity_cutoff=0.7)]

    # The refine step keeps the library's default template: the QA template
    # has no {existing_answer}/{context_msg} placeholders, so it cannot serve
    # as a refine prompt.
    response_synthesizer = CompactAndRefine(
        llm=llm,
        text_qa_template=PromptTemplate(_QA_TEMPLATE),
        verbose=True,
    )

    return RetrieverQueryEngine(
        retriever=retriever,
        response_synthesizer=response_synthesizer,
        node_postprocessors=postprocessors,
    )


def load_documents(file_list):
    """Load uploaded files, index them and cache a query engine for them.

    Args:
        file_list: Uploaded files, as path strings or objects exposing a
            ``name`` attribute holding the path.

    Returns:
        A ``(status, file_key)`` tuple. ``status`` is a human-readable message;
        ``file_key`` is the key under which the query engine was stored in
        ``global_file_cache``, or ``None`` when loading failed.

    Errors are never raised to the caller: they are logged (with traceback)
    and reported through ``status``.
    """
    logging.info("Memuat dokumen yang diunggah")
    if not file_list:
        logging.error("Tidak ada file yang diunggah.")
        return "Error: Tidak ada file yang diunggah.", None

    try:
        documents, doc_names = _read_uploaded_documents(file_list)
        if not documents:
            logging.error("Tidak ditemukan dokumen yang valid.")
            return "Tidak ditemukan dokumen yang valid.", None

        query_engine = _build_query_engine(documents)

        file_key = f"doc-{uuid.uuid4()}"
        global_file_cache[file_key] = query_engine
        loaded_summary = f"Berhasil memuat {len(documents)} dokumen: {', '.join(doc_names)}"
        logging.info(f"{loaded_summary} dengan file_key: {file_key}")
        return f"{loaded_summary}.", file_key
    except Exception as e:
        # logging.exception keeps the traceback, which logging.error dropped.
        logging.exception(f"Error loading documents: {e}")
        return f"Error loading documents: {str(e)}", None
|
|
|
|
|
def _format_source_entry(position, node):
    """Render one retrieved node as a numbered citation with a short preview."""
    source = node.metadata.get('source', 'Tidak ada sumber')
    # Scores are optional: a missing or non-numeric score must not break the
    # ".2f" formatting.
    try:
        relevance = f"{float(getattr(node, 'score', None)):.2f}"
    except (TypeError, ValueError):
        relevance = "N/A"
    content = node.get_content()
    content_preview = content[:100] + "..." if len(content) > 100 else content
    return f"[{position}] Sumber: {source} (Relevansi: {relevance})\nPreview: {content_preview}"


async def document_chat(file_key: str, prompt: str, audio_file=None, translate_audio: bool = False, history=None):
    """Answer a question against a previously loaded document set.

    Args:
        file_key: Key returned by ``load_documents``; selects the cached
            query engine.
        prompt: The typed question (may be empty when audio is supplied).
        audio_file: Optional path to a recording whose transcript is appended
            to the prompt.
        translate_audio: Translate the recording instead of transcribing it.
        history: Existing chat history; it is not modified.

    Returns:
        A new list: the given history plus one ``(user, assistant)`` pair
        holding the answer or an error message. Exceptions are logged and
        reported in that pair rather than raised.
    """
    history = list(history or [])
    logging.info(f"Memproses dokumen chat untuk file_key: {file_key} dengan prompt: {prompt}")
    if file_key not in global_file_cache:
        logging.error("File key dokumen tidak ditemukan pada cache global.")
        return history + [("Error", "Silakan muat dokumen terlebih dahulu.")]
    query_engine = global_file_cache[file_key]
    prompt = prompt or ""
    try:
        if audio_file:
            logging.info("Audio file diterima, memulai transkripsi/terjemahan")
            # The speech-to-text call blocks, so keep it off the event loop.
            transcription = await asyncio.to_thread(
                transcribe_or_translate_audio, audio_file, translate=translate_audio
            )
            transcription = transcription or ""
            logging.debug(f"Hasil transkripsi: {transcription}")
            # The transcription helper reports failures as a string with this
            # prefix; surface the failure instead of asking the LLM about it.
            if transcription.startswith("Error processing audio:"):
                return history + [(prompt, transcription)]
            prompt = f"{prompt} {transcription}".strip()

        if not prompt.strip():
            return history + [("", "Pertanyaan tidak boleh kosong. Silakan ajukan pertanyaan.")]

        # query() is synchronous; run it in a worker thread.
        response = await asyncio.to_thread(query_engine.query, prompt)
        answer = str(response)

        source_nodes = getattr(response, "source_nodes", None) or []
        sources_text = ""
        if source_nodes:
            sources = [
                _format_source_entry(position, node)
                for position, node in enumerate(source_nodes, 1)
            ]
            sources_text = "\n\n" + "Sumber Informasi:\n" + "\n".join(sources)
        elif "tidak menemukan informasi" not in answer.lower():
            # Nothing was retrieved, so any answer would be ungrounded:
            # replace it with the standard "not found" message.
            answer = "Maaf, saya tidak menemukan informasi yang relevan dalam dokumen yang diberikan."

        final_answer = answer + sources_text
        return history + [(prompt, final_answer)]
    except Exception as e:
        logging.exception(f"Error processing document_chat: {e}")
        return history + [(prompt, f"Error processing query: {str(e)}")]
|
|
|
|
|
def transcribe_or_translate_audio(audio_file, translate=False):
    """Convert a recorded audio file to text with Groq's Whisper endpoint.

    Args:
        audio_file: Path of the audio file to read.
        translate: When true, call the translations endpoint; otherwise the
            transcriptions endpoint.

    Returns:
        The recognised text. On any failure the error is logged and a string
        starting with ``"Error processing audio:"`` is returned instead of
        raising.
    """
    logging.info(f"Memulai proses {'terjemahan' if translate else 'transkripsi'} audio")
    try:
        with open(audio_file, "rb") as file:
            file_content = file.read()
        logging.debug("File audio berhasil dibaca")

        # Both endpoints take the same arguments; only the target differs.
        if translate:
            endpoint = groq_client.audio.translations
            done_message = "Terjemahan audio berhasil diproses"
        else:
            endpoint = groq_client.audio.transcriptions
            done_message = "Transkripsi audio berhasil diproses"

        result = endpoint.create(
            file=(audio_file, file_content),
            model="whisper-large-v3",
            response_format="json",
            temperature=0.0,
        )
        logging.debug(done_message)
        return result.text
    except Exception as e:
        # logging.exception records the traceback alongside the message.
        logging.exception(f"Error processing audio: {e}")
        return f"Error processing audio: {str(e)}"
|
|
|
|
|
def convert_text_to_speech(text, voice):
    """Synthesize *text* to a WAV file using Groq's ``playai-tts`` model.

    Args:
        text: Text to speak. Empty or ``None`` skips synthesis.
        voice: Voice identifier passed through to the TTS API.

    Returns:
        Path of the generated temporary ``.wav`` file, or ``None`` when there
        is nothing to speak or synthesis failed. The result is therefore
        always either a usable file path or ``None``; failures are logged,
        never returned as text.
    """
    logging.info("Memulai konversi teks ke suara dengan TTS")
    model = "playai-tts"
    response_format = "wav"
    if not text:
        logging.warning("Input teks kosong, TTS tidak dijalankan.")
        return None

    temp_wav_path = None
    try:
        logging.debug(f"Parameter TTS: model={model}, voice={voice}, panjang teks={len(text)} karakter")
        response = groq_client.audio.speech.create(
            model=model,
            voice=voice,
            input=text,
            response_format=response_format,
        )
        logging.debug("Response TTS diterima dari Groq API")

        # Reserve a file name only; delete=False lets the file outlive this
        # function so the UI can serve it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_wav:
            temp_wav_path = temp_wav.name
        response.write_to_file(temp_wav_path)

        if not os.path.exists(temp_wav_path):
            logging.error("File audio TTS tidak ditemukan setelah disimpan.")
            return None
        logging.info(f"Audio TTS berhasil disimpan di {temp_wav_path}")
        return temp_wav_path
    except Exception as e:
        logging.exception(f"Error converting text to speech: {e}")
        # Do not leave a half-written temp file behind.
        if temp_wav_path and os.path.exists(temp_wav_path):
            try:
                os.remove(temp_wav_path)
            except OSError:
                pass
        return None
|
|
|
|
|
def _history_to_messages(history):
    """Normalise mixed chat history into a list of role/content dicts.

    Dict entries are kept as they are; 2-item ``(user, assistant)`` pairs are
    expanded into two messages; anything else is skipped with a warning.
    """
    messages = []
    for entry in history:
        if isinstance(entry, dict):
            messages.append(entry)
        elif isinstance(entry, (list, tuple)):
            if len(entry) == 2:
                user_msg, assistant_msg = entry
                messages.append({"role": "user", "content": user_msg})
                messages.append({"role": "assistant", "content": assistant_msg})
            else:
                logging.warning("Entry in history does not have exactly 2 elements, skipping.")
        else:
            logging.warning("Unexpected entry type in history, skipping.")
    return messages


def doc_chat_with_tts(prompt, history, file_key, audio_file, translate, voice, enable_tts):
    """Run one document-chat turn and optionally speak the answer.

    Args:
        prompt: Typed question.
        history: Current chat history (message dicts and/or pairs), or None.
        file_key: Key of the loaded document set.
        audio_file: Optional recorded question (file path).
        translate: Translate the recording instead of transcribing it.
        voice: TTS voice identifier.
        enable_tts: Whether to synthesize the last assistant message.

    Returns:
        ``(messages, audio_path)``: the history as role/content dicts, and the
        path of the generated audio file or ``None``.
    """
    logging.info("Memproses document chat dengan TTS (opsional)")

    previous_history = list(history or [])
    try:
        updated_history = asyncio.run(
            document_chat(file_key, prompt, audio_file, translate, previous_history)
        )
        logging.debug("Updated history dari document_chat diterima")
    except Exception as e:
        # Keep the conversation as it was if the chat step itself blew up.
        logging.exception(f"Error dalam document_chat: {e}")
        updated_history = previous_history

    new_messages = _history_to_messages(updated_history)

    if not enable_tts:
        logging.info("TTS tidak diaktifkan, sehingga tidak menghasilkan audio.")
        return new_messages, None

    last_assistant = next(
        (msg.get("content", "") for msg in reversed(new_messages) if msg.get("role") == "assistant"),
        "",
    )
    # Only plain, non-blank text can be spoken.
    if not isinstance(last_assistant, str) or last_assistant.strip() == "":
        logging.warning("Tidak ada pesan asisten yang valid untuk TTS.")
        return new_messages, None

    logging.info("Memulai konversi jawaban akhir ke audio dengan TTS")
    # Speak only the answer, not the appended source list.
    if "Sumber Informasi:" in last_assistant:
        tts_text = last_assistant.split("Sumber Informasi:")[0].strip()
    else:
        tts_text = last_assistant

    audio_path = convert_text_to_speech(tts_text, voice)
    # The TTS helper may hand back an error message instead of a path; only an
    # existing file is a valid value for the audio output.
    if not (isinstance(audio_path, str) and os.path.isfile(audio_path)):
        audio_path = None
    logging.info(f"Audio output dihasilkan: {audio_path}")
    return new_messages, audio_path
|
|
|
|
|
# Voices offered in the TTS dropdown.
_TTS_VOICE_CHOICES = [
    "Arista-PlayAI", "Atlas-PlayAI", "Basil-PlayAI", "Briggs-PlayAI",
    "Calum-PlayAI", "Celeste-PlayAI", "Cheyenne-PlayAI", "Chip-PlayAI",
    "Cillian-PlayAI", "Deedee-PlayAI", "Fritz-PlayAI", "Gail-PlayAI",
    "Indigo-PlayAI", "Mamaw-PlayAI", "Mason-PlayAI", "Mikail-PlayAI",
    "Mitch-PlayAI", "Quinn-PlayAI", "Thunder-PlayAI",
]

_app_theme = themes.Base(primary_hue="teal", secondary_hue="teal", neutral_hue="slate")

with gr.Blocks(theme=_app_theme) as demo:
    # --- Output widgets and TTS settings ---
    doc_chat_history = gr.Chatbot(label="Riwayat Chat", type="messages")
    doc_audio_output = gr.Audio(label="Audio Output", type="filepath")
    doc_voice = gr.Dropdown(
        label="Pilih Suara untuk TTS",
        choices=_TTS_VOICE_CHOICES,
        value="Fritz-PlayAI",
    )
    enable_tts = gr.Checkbox(label="Aktifkan TTS", value=True)

    # --- Document upload row ---
    with gr.Row():
        doc_file_input = gr.File(label="Unggah Dokumen", file_count="multiple")
        load_doc_btn = gr.Button("Muat Dokumen")
        doc_load_status = gr.Textbox(label="Status Dokumen")
    # Per-session holder for the key of the loaded document set.
    doc_file_key = gr.State()

    # --- Question row (typed or recorded) ---
    with gr.Row():
        doc_chat_input = gr.Textbox(label="Masukkan Pertanyaan")
        doc_audio_input = gr.Microphone(label="Record", type="filepath")
        doc_translate = gr.Checkbox(label="Terjemahkan Audio ke Bahasa Inggris", value=False)

    def load_doc(files):
        """Button callback: index the uploaded files, return (status, key)."""
        logging.info("Callback load_doc dipanggil")
        return load_documents(files)

    load_doc_btn.click(
        load_doc,
        inputs=[doc_file_input],
        outputs=[doc_load_status, doc_file_key],
    )

    def process_doc_chat(prompt, history, file_key, audio_file, translate, voice, enable_tts):
        """Chat callback: delegate one turn to doc_chat_with_tts."""
        logging.info("Callback process_doc_chat dipanggil")
        return doc_chat_with_tts(prompt, history, file_key, audio_file, translate, voice, enable_tts)

    # Submitting the text box and changing the recording trigger the same
    # callback with the same inputs and outputs.
    _chat_inputs = [
        doc_chat_input,
        doc_chat_history,
        doc_file_key,
        doc_audio_input,
        doc_translate,
        doc_voice,
        enable_tts,
    ]
    _chat_outputs = [doc_chat_history, doc_audio_output]
    doc_chat_input.submit(process_doc_chat, inputs=_chat_inputs, outputs=_chat_outputs)
    doc_audio_input.change(process_doc_chat, inputs=_chat_inputs, outputs=_chat_outputs)

    clear_btn = gr.Button("Clear All")

    def clear_all():
        """Drop every cached query engine and return a confirmation message."""
        global global_file_cache
        global_file_cache = {}
        return "Cache dikosongkan."

    clear_status = gr.Textbox(label="Clear Status")
    clear_btn.click(clear_all, outputs=[clear_status])

# Enable request queuing, then start the server in debug mode.
demo.queue()
demo.launch(debug=True)
|
|