import os import re import random import requests import gradio as gr import numpy as np import nltk from newspaper import Article from nltk.tokenize import sent_tokenize from sentence_transformers import SentenceTransformer, util import spacy import nltkmodule import en_core_sci_lg from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline import torch #nltk.download('punkt') # --- Models (load once, globally) --- scispacy = en_core_sci_lg.load() sbert_keybert = SentenceTransformer("pritamdeka/BioBERT-mnli-snli-scinli-scitail-mednli-stsb") sbert_rerank = SentenceTransformer("pritamdeka/S-PubMedBert-MS-MARCO") NLI_MODEL_NAME = "pritamdeka/PubMedBERT-MNLI-MedNLI" nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL_NAME) nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL_NAME) NLI_LABELS = ['CONTRADICTION', 'NEUTRAL', 'ENTAILMENT'] PUBMED_N = 100 TOP_ABSTRACTS = 10 # --- Sentence section classifier model (BioBert-PubMed200kRCT) --- EVIDENCE_MODEL = "pritamdeka/BioBert-PubMed200kRCT" evidence_tokenizer = AutoTokenizer.from_pretrained(EVIDENCE_MODEL) evidence_model = AutoModelForSequenceClassification.from_pretrained(EVIDENCE_MODEL) label_map = {0: "BACKGROUND", 1: "OBJECTIVE", 2: "METHODS", 3: "RESULTS", 4: "CONCLUSIONS"} def extract_evidence_sentences_from_abstract(abstract, keep_labels=("RESULTS", "CONCLUSIONS")): sents = sent_tokenize(abstract) evidence_sents = [] for s in sents: inputs = evidence_tokenizer(s, return_tensors="pt", truncation=True, padding=True) with torch.no_grad(): logits = evidence_model(**inputs).logits pred = torch.argmax(logits, dim=1).item() label = label_map[pred] if label in keep_labels: evidence_sents.append((label, s)) return evidence_sents # --- Europe PMC retrieval --- def retrieve_europepmc_abstracts_simple(text, n=TOP_ABSTRACTS): query = get_keybert_query(text, top_n=7) print("Trying Europe PMC query:", query) url = f'https://www.ebi.ac.uk/europepmc/webservices/rest/search?query={query}&resulttype=core&format=json&pageSize={n}' r = requests.get(url) results = r.json().get('resultList', {}).get('result', []) titles = [res.get('title', '') for res in results] abstracts = [res.get('abstractText', '') for res in results] return titles, abstracts # --- Utility: get robust keybert-style query --- def get_keybert_query(text, top_n=10): doc = scispacy(text) phrases = [ent.text for ent in doc.ents] if not phrases: phrases = [chunk.text for chunk in doc.noun_chunks] phrases = list(set([ph.strip() for ph in phrases if len(ph) > 2])) if not phrases: return "" doc_emb = sbert_keybert.encode([text]) phrase_embs = sbert_keybert.encode(phrases) sims = np.array(util.pytorch_cos_sim(doc_emb, phrase_embs))[0] top_idxs = sims.argsort()[-top_n:] keywords = [phrases[i] for i in top_idxs] query = " OR ".join(f'"{kw}"' for kw in keywords) return query # --- Claim extraction --- indicator_phrases = [ "found that", "findings suggest", "shows that", "showed that", "demonstrated", "demonstrates", "revealed", "reveals", "suggests", "suggested", "indicated", "indicates", "reported", "reports", "was reported", "concluded", "concludes", "conclusion", "authors state", "stated", "data suggest", "observed", "observes", "study suggests", "study shows", "study found", "researchers found", "results indicate", "results show", "confirmed", "confirm", "confirming", "point to", "documented", "document", "evidence of", "evidence suggests", "associated with", "correlated with", "link between", "linked to", "relationship between", "was linked", "connected to", "relationship with", "tied to", "association with", "increase", "increases", "increased", "decrease", "decreases", "decreased", "greater risk", "lower risk", "higher risk", "reduced risk", "raises the risk", "reduces the risk", "risk of", "risk for", "likelihood of", "probability of", "chance of", "rate of", "incidence of", "prevalence of", "mortality", "survival rate", "death rate", "odds of", "number of", "percentage of", "percent of", "caused by", "causes", "cause", "resulted in", "results in", "leads to", "led to", "contributed to", "responsible for", "due to", "as a result", "because of", "randomized controlled trial", "RCT", "clinical trial", "participants", "enrolled", "sample size", "statistically significant", "compared to", "compared with", "versus", "compared against", "more than", "less than", "greater than", "lower than", "higher than", "significantly higher", "significantly lower", "significantly increased", "significantly decreased", "significant difference", "effect of", "impact of", "influence of", "predictor of", "predicts", "predictive of", "factor for", "determinant of", "plays a role in", "contributes to", "related to", "affects", "influences", "difference between", "according to", "a recent study", "researchers from" ] def extract_claims_pattern(article_text): sentences = sent_tokenize(article_text) claims = [ s for s in sentences if any(phrase in s.lower() for phrase in indicator_phrases) or re.search(r"\b\d+(\.\d+)?%?\b", s) ] return list(dict.fromkeys(claims)) def match_claims_to_headline(claims, headline): emb_model = sbert_keybert headline_emb = emb_model.encode([headline]) claim_embs = emb_model.encode(claims) sims = util.pytorch_cos_sim(headline_emb, claim_embs)[0] matched_claims = [claim for claim, sim in zip(claims, sims) if sim >= 0.6] if not matched_claims and claims: idxs = np.argsort(-sims.cpu().numpy())[:min(3, len(claims))] matched_claims = [claims[i] for i in idxs] return matched_claims # --- Semantic reranking --- def semantic_rerank_claim_abstracts(claim, titles, abstracts, top_k=TOP_ABSTRACTS): doc_texts = [f"{t}. {a}" for t, a in zip(titles, abstracts)] doc_embs = sbert_rerank.encode(doc_texts) claim_emb = sbert_rerank.encode([claim]) sims = util.pytorch_cos_sim(claim_emb, doc_embs)[0] idxs = np.argsort(-sims.cpu().numpy())[:top_k] return [titles[i] for i in idxs], [abstracts[i] for i in idxs] # --- NLI evidence extraction (run only on results/conclusion sentences) --- def extract_evidence_nli(claim, evidence_sentences): evidence = [] for sent in evidence_sentences: encoding = nli_tokenizer( sent, claim, return_tensors='pt', truncation=True, max_length=256, padding=True ) with torch.no_grad(): outputs = nli_model(**encoding) probs = torch.softmax(outputs.logits, dim=1).cpu().numpy().flatten() max_idx = probs.argmax() label = NLI_LABELS[max_idx] score = float(probs[max_idx]) evidence.append({ "sentence": sent, "label": label, "score": score }) return evidence # --- Summarizer model options (now with Mistral API!) --- model_options = { "Mistral Small (API, fast/free)": "mistral-small-2503", "Llama-3.2-1B-Instruct (Meta, gated)": "meta-llama/Llama-3.2-1B-Instruct", "Gemma-3-1B-it (Google, gated)": "google/gemma-3-1b-it", "TinyLlama-1.1B-Chat (Open)": "TinyLlama/TinyLlama-1.1B-Chat-v1.0" } pipe_cache = {} # --- Mistral API summarization --- def summarize_with_mistral_api(prompt, model_name="mistral-small", max_tokens=128, temperature=0.1): api_key = os.getenv("MISTRAL_API_KEY") if not api_key: return "Missing MISTRAL_API_KEY secret/env variable!" endpoint = "https://api.mistral.ai/v1/chat/completions" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } data = { "model": model_name, "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": temperature } response = requests.post(endpoint, headers=headers, json=data, timeout=30) if response.status_code == 200: content = response.json()["choices"][0]["message"]["content"] return content.strip() else: return f"API Error ({response.status_code}): {response.text}" def get_summarizer(model_choice): model_id = model_options[model_choice] if model_id in pipe_cache: return pipe_cache[model_id] kwargs = { "model": model_id, "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32, "device_map": "auto", "max_new_tokens": 128 } if any(gated in model_id for gated in ["meta-llama", "gemma"]): hf_token = os.environ.get("HF_TOKEN", None) if hf_token: kwargs["token"] = hf_token else: raise RuntimeError(f"Model '{model_choice}' requires a Hugging Face access token. Please set 'HF_TOKEN' as a Space secret or environment variable.") pipe_cache[model_id] = pipeline("text-generation", **kwargs) return pipe_cache[model_id] def summarize_evidence_llm(claim, evidence_list, model_choice): support = [ev['sentence'] for ev in evidence_list if ev['label'] == 'ENTAILMENT'] contradict = [ev['sentence'] for ev in evidence_list if ev['label'] == 'CONTRADICTION'] user_prompt = ( f"Claim: {claim}\n" f"Supporting evidence:\n" + ("\n".join(support) if support else "None") + "\n" f"Contradicting evidence:\n" + ("\n".join(contradict) if contradict else "None") + "\n" "Explain to a layperson: Is this claim likely true, false, or uncertain based on the evidence above? Give a brief and simple explanation in 2-3 sentences." ) if model_choice in [ "Mistral Small (API, fast/free)", "Mistral Medium (API, free tier)", "Mistral Large (API, may require paid)" ]: mistral_name = model_options[model_choice] return summarize_with_mistral_api(user_prompt, model_name=mistral_name) try: pipe = get_summarizer(model_choice) outputs = pipe( [ {"role": "system", "content": "You are a helpful biomedical assistant. Summarize scientific evidence in plain English for the general public."}, {"role": "user", "content": user_prompt} ], max_new_tokens=128, do_sample=False, temperature=0.1, ) out = outputs[0]["generated_text"] if isinstance(out, list) and "content" in out[-1]: return out[-1]["content"].strip() return out.strip() except Exception as e: return f"Summary could not be generated: {e}" def format_evidence_html(evidence_list): color_map = {"ENTAILMENT":"#e6ffe6", "CONTRADICTION":"#ffe6e6", "NEUTRAL":"#f8f8f8"} html = "" for ev in evidence_list: color = color_map[ev["label"]] html += ( f'