# ── app.py ───────────────────────────────────────────────────────────
import os, logging, textwrap
import gradio as gr
from transformers import pipeline, AutoTokenizer
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
import concurrent.futures
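# A small Swedish RAG chat app: PDFs in DOCS_DIR are split into chunks, embedded with a
# Swedish sentence-BERT model and stored in a FAISS index; at query time the most similar
# chunks are packed into a prompt for a GPT-SW3 model, and the chat is served via Gradio.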
# ── CONFIG ────────────────────────────────────────────────────────────
DOCS_DIR = "document"
INDEX_DIR = "faiss_index"
EMB_MODEL = "KBLab/sentence-bert-swedish-cased"
#LLM_MODEL = "tiiuae/falcon-rw-1b"                  # Poor results
#LLM_MODEL = "google/flan-t5-base"                  # Poor results
#LLM_MODEL = "bigscience/bloom-560m"                # Poor results
#LLM_MODEL = "NbAiLab/nb-gpt-j-6B"                  # Restricted access
#LLM_MODEL = "datificate/gpt2-small-swedish"        # Not available on Hugging Face
#LLM_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
# timpal0l/mdeberta-v3-base-squad2 is small and a possible option for Swedish
#LLM_MODEL = "AI-Sweden-Models/gpt-sw3-1.3B"        # available in several sizes: 126M, 356M, 1.3B, 6.7B, 20B, 40B
LLM_MODEL = "AI-Sweden-Models/gpt-sw3-356M"
#LLM_MODEL = "AI-Sweden-Models/Llama-3-8B-instruct" # possibly too large
# https://www.ai.se/en/ai-labs/natural-language-understanding/models-resources
CHUNK_SIZE = 400
CHUNK_OVERLAP = 40
CTX_TOK_MAX = 750        # leaves headroom for the question + answer
MAX_NEW_TOKENS = 512
K = 5
DEFAULT_TEMP = 0.8
GEN_TIMEOUT = 180        # generation timeout in seconds
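# Note (assumption based on the published model cards): the gpt-sw3 models use a context
# window of roughly 2048 tokens, so CTX_TOK_MAX plus the prompt scaffolding and
# MAX_NEW_TOKENS should stay well within that budget.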
# ── LOGGING ───────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
# ── 1) Index (build or load) ──────────────────────────────────────────
emb = HuggingFaceEmbeddings(model_name=EMB_MODEL)
INDEX_PATH = os.path.join(INDEX_DIR, "index.faiss")
if os.path.isfile(INDEX_PATH):
    log.info(f"Loading index from {INDEX_DIR}")
    # Recent langchain_community versions require an explicit opt-in to pickle deserialization when loading a local index
    vs = FAISS.load_local(INDEX_DIR, emb, allow_dangerous_deserialization=True)
else:
    splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    docs, pdfs = [], []
    for fn in os.listdir(DOCS_DIR):
        if fn.lower().endswith(".pdf"):
            chunks = splitter.split_documents(PyPDFLoader(os.path.join(DOCS_DIR, fn)).load())
            for c in chunks:
                c.metadata["source"] = fn
            docs.extend(chunks)
            pdfs.append(fn)
    vs = FAISS.from_documents(docs, emb)
    vs.save_local(INDEX_DIR)
    log.info(f"Built index: {len(pdfs)} PDFs / {len(docs)} chunks")
retriever = vs.as_retriever(search_kwargs={"k": K})
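# The saved index is reused on the next start; delete the faiss_index folder to force a
# rebuild after adding PDFs or changing CHUNK_SIZE / CHUNK_OVERLAP.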
# ── 2) LLM pipeline & tokenizer ───────────────────────────────────────
log.info("Initializing LLM ...")
gen_pipe = pipeline("text-generation", model=LLM_MODEL, device=-1, max_new_tokens=MAX_NEW_TOKENS)
tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
log.info("LLM ready")
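# device=-1 keeps the pipeline on CPU; gpt-sw3-356M is small enough for CPU inference on a
# basic Space, though generation is slow (hence the timeout handling further down).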
# ── 3) Helper functions ───────────────────────────────────────────────
def build_prompt(query: str, docs):
    """
    Packs as many chunks as fit within CTX_TOK_MAX tokens into the prompt context.
    """
    context_parts = []
    total_ctx_tok = 0
    for d in docs:
        tok_len = len(tokenizer.encode(d.page_content))
        if total_ctx_tok + tok_len > CTX_TOK_MAX:
            break
        context_parts.append(d.page_content)
        total_ctx_tok += tok_len
    context = "\n\n---\n\n".join(context_parts)
    return textwrap.dedent(f"""\
Du är en hjälpsam assistent som svarar på svenska.
Kontext (hämtat ur PDF-dokument):
{context}
Fråga: {query}
Svar (svenska):""").strip()
def test_retrieval(q):  # quick retrieval test without the LLM
    docs = retriever.invoke(q)
    return "\n\n".join([f"{i+1}. ({d.metadata['source']}) {d.page_content[:160]}…" for i, d in enumerate(docs)]) or "Inga träffar"
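# test_retrieval is wired to the "Testa" button in the UI below, so retrieval quality can
# be checked without running the LLM at all.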
def chat_fn(q, temp, max_new_tokens, k, ctx_tok_max, history):
    history = history or []
    history.append({"role": "user", "content": q})
    # Fetch the chunks together with their similarity scores
    docs_and_scores = vs.similarity_search_with_score(q, k=int(k))
    docs = [doc for doc, score in docs_and_scores]
    scores = [score for doc, score in docs_and_scores]
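    # Note: FAISS returns raw distance scores here (lower generally means more similar), not normalized relevance values.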
    if not docs:
        history.append({"role": "assistant", "content": "Hittade inget relevant."})
        return history, history
    # Show the chunks and scores that will be used
    chunk_info = "\n\n".join([
        f"{i+1}. ({d.metadata['source']}) score={scores[i]:.3f}\n{d.page_content[:160]}…"
        for i, d in enumerate(docs)
    ])
history.append({"role": "system", "content": f"π Chunkar som anvΓ€nds:\n{chunk_info}"}) | |
    def build_prompt_dynamic(query, docs, ctx_tok_max):
        context_parts = []
        total_ctx_tok = 0
        for d in docs:
            tok_len = len(tokenizer.encode(d.page_content))
            if total_ctx_tok + tok_len > int(ctx_tok_max):
                break
            context_parts.append(d.page_content)
            total_ctx_tok += tok_len
        context = "\n\n---\n\n".join(context_parts)
        return textwrap.dedent(f"""\
Du är en hjälpsam assistent som svarar på svenska.
Kontext (hämtat ur PDF-dokument):
{context}
Fråga: {query}
Svar (svenska):""").strip()
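    # build_prompt_dynamic mirrors build_prompt above, but takes the context budget from the UI slider instead of the fixed CTX_TOK_MAX.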
    prompt = build_prompt_dynamic(q, docs, ctx_tok_max)
    log.info(f"Prompt tokens={len(tokenizer.encode(prompt))} temp={temp} max_new_tokens={max_new_tokens} k={k} ctx_tok_max={ctx_tok_max}")
    def generate():
        return gen_pipe(
            prompt,
            temperature=float(temp),
            max_new_tokens=int(max_new_tokens),
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
            do_sample=True,            # sampling makes the temperature setting effective
            return_full_text=False     # return only the generated answer, not the prompt
        )[0]["generated_text"]
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(generate)
    try:
        ans = future.result(timeout=GEN_TIMEOUT)  # timeout in seconds
    except concurrent.futures.TimeoutError:
        ans = f"Ingen respons från modellen inom {GEN_TIMEOUT} sekunder."
    except Exception as e:
        log.exception("Generation error")
        ans = f"Fel vid generering: {type(e).__name__}: {e}\n\nPrompt:\n{prompt}"
    finally:
        # Do not block on the worker thread; using a with-block here would wait for the
        # generation to finish and defeat the timeout.
        executor.shutdown(wait=False)
    src_hint = docs[0].metadata["source"] if docs else "Ingen källa"
    history.append({"role": "assistant", "content": f"**(Källa: {src_hint})**\n\n{ans}"})
    return history, history
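# Note: chat_fn returns the history twice because it feeds both the Chatbot display and the
# gr.State value that is passed back in on the next call.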
# ── 4) Gradio UI ──────────────────────────────────────────────────────
with gr.Blocks() as demo:
    gr.Markdown("# Svensk RAG-chat")
    gr.Markdown(f"**PDF-filer:** {', '.join(os.listdir(DOCS_DIR)) or '-'}")
    gr.Markdown(f"**LLM-modell som används:** `{LLM_MODEL}`", elem_id="llm-info")
    with gr.Row():
        q_test = gr.Textbox(label="Test-Retrieval")
        b_test = gr.Button("Testa")
        o_test = gr.Textbox(label="Chunkar")
    with gr.Row():
        q_in = gr.Textbox(label="Fråga", placeholder="Ex: Vad är förvaltningsöverlämnande?")
        temp = gr.Slider(0, 1, value=DEFAULT_TEMP, step=0.05, label="Temperatur")
        max_new_tokens = gr.Slider(32, 1024, value=MAX_NEW_TOKENS, step=8, label="Max svarslängd (tokens)")
        k = gr.Slider(1, 10, value=K, step=1, label="Antal chunkar (K)")
        ctx_tok_max = gr.Slider(100, 2000, value=CTX_TOK_MAX, step=50, label="Max kontexttokens")
        b_send = gr.Button("Skicka")
        b_stop = gr.Button("Stoppa")  # stop button to cancel a running generation
    chat = gr.Chatbot(type="messages", label="Chat")
    chat_hist = gr.State([])
    b_test.click(test_retrieval, inputs=[q_test], outputs=[o_test])
    send_event = b_send.click(
        chat_fn,
        inputs=[q_in, temp, max_new_tokens, k, ctx_tok_max, chat_hist],
        outputs=[chat, chat_hist]
    )
    b_stop.click(None, cancels=[send_event])
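    # Note: cancelling the click event stops Gradio from waiting on chat_fn, but a generation
    # already submitted to the executor runs to completion in the background.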
if __name__ == "__main__":
    demo.launch(share=True)  # remove share=True if you want to keep the app private