import json
import re
import os
import logging
from pathlib import Path
from typing import Dict, List, Optional, Union

# Third-party dependencies: pdfminer.six (pdfminer.high_level), python-docx
# (docx), and transformers with a backend such as PyTorch for the NER pipeline.
from pdfminer.high_level import extract_text as pdf_extract_text
from docx import Document
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ResumeParser:
    def __init__(self):
        self.ner_pipeline = None
        self.model_loaded = False
        self._load_model()

    def _load_model(self):
        """Load the NER model with error handling and a fallback."""
        try:
            # Try the resume-specific model first
            MODEL_NAME = "manishiitg/resume-ner"
            logger.info(f"Attempting to load model: {MODEL_NAME}")
            tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
            model = AutoModelForTokenClassification.from_pretrained(MODEL_NAME)
            self.ner_pipeline = pipeline(
                "ner",
                model=model,
                tokenizer=tokenizer,
                aggregation_strategy="simple",
                device=0 if os.environ.get("L4_GPU", "false").lower() == "true" else -1,
            )
            self.model_loaded = True
            logger.info("Model loaded successfully")
        except Exception as e:
            logger.warning(f"Failed to load primary model: {e}")
            try:
                # Fall back to a widely used general-purpose NER model
                MODEL_NAME = "dbmdz/bert-large-cased-finetuned-conll03-english"
                logger.info(f"Trying fallback model: {MODEL_NAME}")
                self.ner_pipeline = pipeline(
                    "ner",
                    model=MODEL_NAME,
                    aggregation_strategy="simple",
                    device=0 if os.environ.get("L4_GPU", "false").lower() == "true" else -1,
                )
                self.model_loaded = True
                logger.info("Fallback model loaded successfully")
            except Exception as e2:
                logger.error(f"Failed to load fallback model: {e2}")
                self.model_loaded = False

    def extract_text(self, file_path: str) -> str:
        """Extract text from PDF or DOCX files with error handling."""
        try:
            path = Path(file_path)
            if not path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")
            if path.suffix.lower() == ".pdf":
                text = pdf_extract_text(file_path)
                # Clean up PDF extraction artifacts: collapse runs of spaces
                # and tabs but keep line breaks, which the section regexes and
                # the name heuristics below depend on
                text = re.sub(r'[ \t]+', ' ', text)
                text = re.sub(r'\n{3,}', '\n\n', text).strip()
                logger.info(f"Extracted {len(text)} characters from PDF")
                return text
            elif path.suffix.lower() == ".docx":
                doc = Document(file_path)
                text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
                logger.info(f"Extracted {len(text)} characters from DOCX")
                return text
            else:
                raise ValueError(f"Unsupported file format: {path.suffix}")
        except Exception as e:
            logger.error(f"Error extracting text: {e}")
            raise

    def extract_with_regex(self, text: str) -> Dict[str, List[str]]:
        """Regex-based field extraction."""
        # Case-insensitivity is encoded inline ((?i)/(?i:...)) where wanted, so
        # the name pattern's capitalization requirement is not washed out by a
        # global IGNORECASE flag
        patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'(?:\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}',
            'skills': r'(?i)(?:skills?|technologies?|tools?|expertise)[:\-\s]*(.*?)(?:\n\n|\n\s*\n|$)',
            'education': r'(?i)(?:education|degree|university|college|bachelor|master|phd)[:\-\s]*(.*?)(?:\n\n|\n\s*\n|$)',
            'experience': r'(?i)(?:experience|work\shistory|employment|job\shistory)[:\-\s]*(.*?)(?:\n\n|\n\s*\n|$)',
            # Non-capturing lookahead so findall returns the full match
            'name': r'^(?!(?i:resume|cv|curriculum vitae|\d))[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+',
        }
        results = {}
        for key, pattern in patterns.items():
            matches = re.findall(pattern, text, re.MULTILINE)
            if key == 'name' and matches:
                # Take the first likely name match
                results[key] = [matches[0].strip()]
            else:
                # Clean and filter matches
                cleaned = [m.strip() for m in matches if m.strip()]
                if cleaned:
                    results[key] = cleaned
        return results
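    # Illustrative shape of `extract_with_regex` output (the values here are
    # invented for the example, not drawn from real data):
    #   {"email": ["jane.doe@example.com"], "phone": ["555-123-4567"],
    #    "name": ["Jane Doe"], "skills": ["Python, SQL, Docker"]}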
    def extract_name_from_text(self, text: str) -> str:
        """Heuristic name extraction."""
        # First try the capitalized-words pattern
        name_match = re.search(
            r'^(?!(?i:resume|cv|curriculum vitae|\d))[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+',
            text,
            re.MULTILINE,
        )
        if name_match:
            return name_match.group(0).strip()

        # Fall back to a line-based approach
        lines = text.split('\n')
        for line in lines[:10]:  # Check the first 10 lines
            line = line.strip()
            if line and 2 <= len(line.split()) <= 4:
                # Check that it looks like a name (no email, phone digits, etc.)
                if not re.search(r'[@\d+\-()]', line):
                    if line[0].isupper() and not line.lower().startswith(('resume', 'cv', 'curriculum')):
                        return line
        return "Not Found"

    def process_ner_entities(self, entities: List[Dict]) -> Dict[str, List[str]]:
        """Filter NER entities by confidence and normalize their labels."""
        results = {
            "name": [],
            "skills": [],
            "education": [],
            "experience": [],
        }
        logger.info(f"Processing {len(entities)} entities")
        for ent in entities:
            label = ent.get("entity_group", "").upper()
            value = ent.get("word", "").strip()
            confidence = ent.get("score", 0)

            # Skip low-confidence entities and empty values
            if confidence < 0.7 or not value:
                continue

            # Normalize labels across the two possible models
            if label in ["PERSON", "PER", "NAME"]:
                results["name"].append(value)
            elif label in ["SKILL", "TECH", "TECHNOLOGY"]:
                results["skills"].append(value)
            elif label in ["EDUCATION", "DEGREE", "EDU"] or (
                # ORG entities (from the fallback CoNLL model) only count as
                # education when they actually name a university
                label == "ORG" and "university" in value.lower()
            ):
                results["education"].append(value)
            elif label in ["EXPERIENCE", "JOB", "ROLE", "POSITION", "WORK"]:
                results["experience"].append(value)

        # Deduplicate while preserving order
        for key in results:
            results[key] = list(dict.fromkeys(results[key]))
        return results

    def merge_results(self, ner_results: Dict, regex_results: Dict) -> Dict[str, str]:
        """Merge NER and regex results, preferring the more reliable source per field."""
        merged = {
            "name": "Not Found",
            "email": "Not Found",
            "phone": "Not Found",
            "skills": "Not Found",
            "education": "Not Found",
            "experience": "Not Found",
        }

        # Name - prefer NER, then regex (a further text/filename fallback
        # happens in parse_resume)
        if ner_results.get("name"):
            merged["name"] = ner_results["name"][0]  # Take the first name entity only
        elif regex_results.get("name"):
            merged["name"] = regex_results["name"][0]

        # Email and phone - regex only
        if regex_results.get("email"):
            merged["email"] = regex_results["email"][0]
        if regex_results.get("phone"):
            merged["phone"] = regex_results["phone"][0]

        # Skills - combine both sources
        all_skills = []
        if ner_results.get("skills"):
            all_skills.extend(ner_results["skills"])
        if regex_results.get("skills"):
            all_skills.extend(regex_results["skills"])
        if all_skills:
            merged["skills"] = ", ".join(list(dict.fromkeys(all_skills))[:10])  # Limit to 10 skills

        # Education - combine both sources
        all_edu = []
        if ner_results.get("education"):
            all_edu.extend(ner_results["education"])
        if regex_results.get("education"):
            all_edu.extend(regex_results["education"])
        if all_edu:
            merged["education"] = ", ".join(list(dict.fromkeys(all_edu))[:3])  # Limit to 3 items

        # Experience - combine both sources
        all_exp = []
        if ner_results.get("experience"):
            all_exp.extend(ner_results["experience"])
        if regex_results.get("experience"):
            all_exp.extend(regex_results["experience"])
        if all_exp:
            merged["experience"] = ", ".join(list(dict.fromkeys(all_exp))[:3])  # Limit to 3 items

        return merged

    def parse_resume(self, file_path: str, filename: Optional[str] = None) -> Dict[str, str]:
        """Parse a resume using multiple extraction methods."""
        try:
            # Extract text
            text = self.extract_text(file_path)
            if not text or len(text.strip()) < 10:
                raise ValueError("Extracted text is too short or empty")
            logger.info(f"Text preview: {text[:200]}...")

            # Initialize results
            ner_results = {
                "name": [],
                "skills": [],
                "education": [],
                "experience": [],
            }

            # Method 1: the NER model, if it loaded
            if self.model_loaded and self.ner_pipeline:
                try:
                    logger.info("Using NER model for extraction")
                    entities = self.ner_pipeline(text[:5120])  # Limit input size for NER
                    ner_results = self.process_ner_entities(entities)
                    logger.info(f"NER results: {json.dumps(ner_results, indent=2)}")
                except Exception as e:
                    logger.warning(f"NER extraction failed: {e}")

            # Method 2: regex extraction
            logger.info("Using regex patterns for extraction")
            regex_results = self.extract_with_regex(text)
            logger.info(f"Regex results: {json.dumps(regex_results, indent=2)}")

            # Method 3: name extraction fallback
            if not ner_results.get("name") and not regex_results.get("name"):
                name = self.extract_name_from_text(text)
                if name != "Not Found":
                    regex_results["name"] = [name]

            # Merge all results
            final_results = self.merge_results(ner_results, regex_results)

            # If the name is still not found, try the filename
            # (common pattern: "Firstname Lastname - Resume.pdf")
            if final_results["name"] == "Not Found" and filename:
                name_from_file = re.sub(r'[-_].*', '', Path(filename).stem).strip()
                if len(name_from_file.split()) >= 2:
                    final_results["name"] = name_from_file

            logger.info("Parsing completed successfully")
            return final_results
        except Exception as e:
            logger.error(f"Error parsing resume: {e}")
            return {
                "name": "Error",
                "email": "Error",
                "phone": "Error",
                "skills": "Error",
                "education": "Error",
                "experience": "Error",
                "error": str(e),
            }
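# The model load in ResumeParser.__init__ is slow, so a single module-level
# instance is created below at import time and reused by every call.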
[], "skills": [], "education": [], "experience": [] } # Method 1: Try NER model if available if self.model_loaded and self.ner_pipeline: try: logger.info("Using NER model for extraction") entities = self.ner_pipeline(text[:5120]) # Limit input size for NER ner_results = self.process_ner_entities(entities) logger.info(f"NER results: {json.dumps(ner_results, indent=2)}") except Exception as e: logger.warning(f"NER extraction failed: {e}") # Method 2: Regex extraction logger.info("Using regex patterns for extraction") regex_results = self.extract_with_regex(text) logger.info(f"Regex results: {json.dumps(regex_results, indent=2)}") # Method 3: Name extraction fallback if not ner_results.get("name") and not regex_results.get("name"): name = self.extract_name_from_text(text) if name != "Not Found": regex_results["name"] = [name] # Merge all results final_results = self.merge_results(ner_results, regex_results) # If name still not found, try filename if final_results["name"] == "Not Found" and filename: # Try to extract name from filename (common pattern: "Firstname Lastname - Resume.pdf") name_from_file = re.sub(r'[-_].*', '', filename).strip() if len(name_from_file.split()) >= 2: final_results["name"] = name_from_file logger.info("Parsing completed successfully") return final_results except Exception as e: logger.error(f"Error parsing resume: {e}") return { "name": "Error", "email": "Error", "phone": "Error", "skills": "Error", "education": "Error", "experience": "Error", "error": str(e) } # Create global instance resume_parser = ResumeParser() def parse_resume(file_path: str, filename: str = None) -> Dict[str, str]: """Main function to parse resume""" return resume_parser.parse_resume(file_path, filename) if __name__ == "__main__": # Test the parser test_file = input("Enter path to resume file: ") if os.path.exists(test_file): results = parse_resume(test_file, os.path.basename(test_file)) print("\nParsing Results:") print(json.dumps(results, indent=2)) else: print("File not found")