# document_analyzer.py
# Enhanced document analysis module for healthcare fraud detection with Llama 4 (text-only)

import torch
import re
from typing import List, Dict, Any
import nltk
from nltk.tokenize import sent_tokenize

try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

class HealthcareFraudAnalyzer:
    def __init__(self, model, tokenizer, device=None):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device if device else "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()
        
        self.fraud_categories = [
            "Consent violations",
            "Documentation issues",
            "Visitation restrictions",
            "Medication misuse",
            "Chemical restraint",
            "Fraudulent billing",
            "False testimony",
            "Information concealment",
            "Patient neglect",
            "Hospice certification issues"
        ]
        
        self.key_terms = {
            "medication": ["haloperidol", "lorazepam", "sedation", "chemical", "restraint", 
                         "prn", "as needed", "antipsychotic", "sedative", "benadryl", 
                         "ativan", "seroquel", "comfort kit", "medication"],
            "documentation": ["record", "documentation", "log", "chart", "note", "missing", 
                           "altered", "backdated", "omit", "selective", "inconsistent"],
            "visitation": ["visit", "restriction", "limit", "family", "spouse", "access", 
                         "barrier", "monitor", "disruptive", "uncooperative"],
            "consent": ["consent", "authorize", "approval", "permission", "against wishes", 
                     "refused", "decline", "without knowledge"],
            "hospice": ["hospice", "terminal", "end of life", "palliative", "comfort care", 
                      "six months", "6 months", "prognosis", "certification"],
            "billing": ["charge", "bill", "payment", "medicare", "medicaid", "insurance", 
                      "reimbursement", "fee", "additional", "extra"]
        }
    
    def chunk_document(self, text: str, chunk_size: int = 1024, overlap: int = 256) -> List[str]:
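        """Split text into sentence-aligned chunks of roughly `chunk_size` characters,
        carrying the last `overlap` characters of each chunk into the next for context."""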
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            if len(current_chunk) + len(sentence) <= chunk_size:
                current_chunk += sentence + " "
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                # Carry the trailing `overlap` characters into the next chunk for context
                overlap_start = max(0, len(current_chunk) - overlap)
                current_chunk = current_chunk[overlap_start:] + sentence + " "
        
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
            
        return chunks
    
    def analyze_chunk(self, chunk: str) -> Dict[str, Any]:
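        """Prompt the model to analyze a single chunk for fraud indicators and
        pair its analysis with keyword matches found in the chunk."""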
        prompt = f"""<s>[INST] Analyze the following healthcare document text for evidence of fraud, neglect, abuse, or criminal conduct.
Focus on: {', '.join(self.fraud_categories)}.
Provide specific indicators and cite the relevant text.

DOCUMENT TEXT:
{chunk}

ANALYSIS: [/INST]"""
        
        # Single sequence, so no padding is needed (some Llama tokenizers define no pad token)
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048).to(self.device)
        
        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=512,
                do_sample=True,  # temperature/top_p only take effect when sampling is enabled
                temperature=0.1,
                top_p=0.9,
                repetition_penalty=1.2
            )
        
        response = self.tokenizer.decode(output[0], skip_special_tokens=True)
        analysis = response.split("ANALYSIS:")[-1].strip()
        
        term_matches = self._find_key_terms(chunk)
        
        return {
            "analysis": analysis,
            "term_matches": term_matches,
            "chunk_text": chunk[:200] + "..." if len(chunk) > 200 else chunk
        }
    
    def _find_key_terms(self, text: str) -> Dict[str, List[str]]:
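        """Return, per term category, short snippets (up to 50 characters of context
        on each side) around every key term found in the text."""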
        text = text.lower()
        results = {}
        
        for category, terms in self.key_terms.items():
            matches = []
            for term in terms:
                pattern = r'.{0,50}' + re.escape(term) + r'.{0,50}'
                for match in re.finditer(pattern, text):
                    matches.append("..." + match.group(0) + "...")
            
            if matches:
                results[category] = matches
                
        return results
    
    def analyze_document(self, document_text: str) -> Dict[str, Any]:
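        """Normalize whitespace, chunk the document, analyze each chunk with the model,
        and consolidate the results into a summary plus detailed findings."""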
        document_text = document_text.replace('\n', ' ').replace('\r', ' ')
        document_text = re.sub(r'\s+', ' ', document_text)
        
        chunks = self.chunk_document(document_text)
        chunk_analyses = [self.analyze_chunk(chunk) for chunk in chunks]
        consolidated_findings = self._consolidate_analyses(chunk_analyses)
        
        return {
            "summary": self._generate_summary(consolidated_findings, document_text),
            "detailed_findings": consolidated_findings,
            "chunk_analyses": chunk_analyses,
            "document_metadata": {
                "length": len(document_text),
                "chunk_count": len(chunks)
            }
        }
    
    def _consolidate_analyses(self, chunk_analyses: List[Dict[str, Any]]) -> Dict[str, Any]:
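        """Merge per-chunk term matches (deduplicated, capped at five per category)
        and group model-generated findings under the fraud categories they mention."""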
        all_term_matches = {category: [] for category in self.key_terms.keys()}
        
        for analysis in chunk_analyses:
            for category, matches in analysis.get("term_matches", {}).items():
                all_term_matches[category].extend(matches)
        
        for category in all_term_matches:
            if all_term_matches[category]:
                deduplicated = []
                for match in all_term_matches[category]:
                    if not any(match in other and match != other for other in all_term_matches[category]):
                        deduplicated.append(match)
                all_term_matches[category] = deduplicated[:5]
        
        categorized_findings = {category: [] for category in self.fraud_categories}
        
        for analysis in chunk_analyses:
            analysis_text = analysis.get("analysis", "")
            for category in self.fraud_categories:
                if category.lower() in analysis_text.lower():
                    sentences = sent_tokenize(analysis_text)
                    relevant = [s for s in sentences if category.lower() in s.lower()]
                    if relevant:
                        categorized_findings[category].extend(relevant)
        
        return {
            "term_matches": all_term_matches,
            "categorized_findings": categorized_findings
        }
    
    def _generate_summary(self, findings: Dict[str, Any], full_text: str) -> str:
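        """Build a Markdown summary covering the top-ranked fraud categories and
        recommended follow-up actions based on the number of indicators found."""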
        indicator_counts = {
            category: len(findings["categorized_findings"].get(category, []))
            for category in self.fraud_categories
        }
        
        term_match_counts = {
            category: len(matches) 
            for category, matches in findings["term_matches"].items()
        }
        
        sorted_categories = sorted(
            self.fraud_categories,
            key=lambda x: indicator_counts.get(x, 0) + term_match_counts.get(x, 0),
            reverse=True
        )
        
        summary_lines = ["# Healthcare Fraud Detection Analysis", ""]
        summary_lines.append("## Key Concerns Identified")
        
        for category in sorted_categories[:3]:
            if indicator_counts.get(category, 0) > 0 or term_match_counts.get(category, 0) > 0:
                summary_lines.append(f"### {category}")
                
                if findings["categorized_findings"].get(category):
                    summary_lines.append("Model analysis indicates:")
                    for finding in findings["categorized_findings"].get(category, [])[:3]:
                        summary_lines.append(f"- {finding}")
                
                # Attach term matches whose category keyword (e.g. "medication")
                # appears in this fraud category name (e.g. "Medication misuse")
                for term_category, matches in findings["term_matches"].items():
                    if term_category.lower() in category.lower() and matches:
                        summary_lines.append("Key terms identified:")
                        for match in matches[:3]:
                            summary_lines.append(f"- {match}")
                
                summary_lines.append("")
        
        summary_lines.append("## Recommended Actions")
        if sum(indicator_counts.values()) > 5:
            summary_lines.append("- **Urgent review recommended** - Multiple indicators of potential fraud detected")
            summary_lines.append("- Consider referral to appropriate regulatory authorities")
            summary_lines.append("- Document preservation should be prioritized")
        elif sum(indicator_counts.values()) > 2:
            summary_lines.append("- **Further investigation recommended** - Several potential indicators identified")
            summary_lines.append("- Conduct interviews with involved personnel")
            summary_lines.append("- Secure additional documentation for verification")
        else:
            summary_lines.append("- **Monitor situation** - Limited indicators detected")
            summary_lines.append("- Consider more specific document analysis")
        
        return "\n".join(summary_lines)
    
    def print_report(self, results: Dict[str, Any]) -> None:
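        """Print the summary, detailed findings, and key term matches to stdout."""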
        print("\n" + "="*80)
        print("HEALTHCARE FRAUD DETECTION REPORT")
        print("="*80 + "\n")
        
        print(results["summary"])
        
        print("\n" + "="*80)
        print("DETAILED FINDINGS")
        print("="*80)
        
        for category, findings in results["detailed_findings"]["categorized_findings"].items():
            if findings:
                print(f"\n## {category.upper()}")
                for i, finding in enumerate(findings, 1):
                    print(f"{i}. {finding}")
        
        print("\n" + "="*80)
        print("KEY TERM MATCHES")
        print("="*80)
        
        for category, matches in results["detailed_findings"]["term_matches"].items():
            if matches:
                print(f"\n## {category.upper()}")
                for match in matches:
                    print(f"- {match}")
                    
        print("\n" + "="*80 + "\n")

def analyze_pdf_for_fraud(pdf_path, model, tokenizer):
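    """Extract text from a PDF with pdfplumber, run the full fraud analysis,
    print the report, and return the results dictionary."""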
    import pdfplumber
    
    with pdfplumber.open(pdf_path) as pdf:
        text = ""
        for page in pdf.pages:
            # Separate pages with a newline so words do not run together across page breaks
            text += (page.extract_text() or "") + "\n"
    
    analyzer = HealthcareFraudAnalyzer(model, tokenizer)
    results = analyzer.analyze_document(text)
    
    analyzer.print_report(results)
    return results
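

# Minimal usage sketch. The checkpoint identifier and PDF path below are illustrative
# placeholders, not part of this module; substitute the fine-tuned model and document
# actually being analyzed.
if __name__ == "__main__":
    from transformers import AutoModelForCausalLM, AutoTokenizer

    model_id = "path/to/finetuned-llama-checkpoint"  # placeholder checkpoint
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16)

    report = analyze_pdf_for_fraud("example_records.pdf", model, tokenizer)  # placeholder path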