Spaces:
Running
Running
import os | |
import json | |
from llama_cpp import Llama | |
import re | |
from huggingface_hub import hf_hub_download | |
import random | |
class EmailGenerator:
    """Generate personalised B2B cold emails.

    A local GGUF model (loaded through llama-cpp-python) is tried first. When
    no model is available, generation fails, or the generated email scores
    poorly, a template-based fallback keyed on tone and detected industry is
    used instead, so ``generate_email`` always returns a usable email.
    """

    # Sequences that end a completion; shared by the first attempt and the retry.
    _STOP_SEQUENCES = ("</s>", "\n\n\n", "EXAMPLE", "Now write")

    # A sign-off followed by a capitalised name, e.g. "Best,\nSam".
    _SIGNED_CLOSING = re.compile(
        r'(Best regards|Best|Sincerely|Cheers),?\s*\n?[A-Z][a-z]+'
    )
    # A sign-off alone on the last line, with no name after it.
    _BARE_CLOSING = re.compile(
        r'(?:\A|\n)[ \t]*(Best regards|Best|Sincerely|Cheers),?\s*\Z'
    )
    # A final "-Name" style signature (hyphen, en dash or em dash).
    _DASH_SIGNATURE = re.compile(r'\n[ \t]*[-\u2013\u2014][ \t]*[A-Z][a-z]+\s*\Z')

    def __init__(self, custom_model_path=None):
        """Resolve the model path, load the model and the prompt templates.

        Args:
            custom_model_path: Path to a local GGUF file. When falsy, the
                default model is downloaded instead.
        """
        self.model = None
        self.model_path = custom_model_path or self._download_model()
        self._load_model()
        self.prompt_templates = self._load_prompt_templates()

    def _download_model(self):
        """Download the Vicuna-7B GGUF model from Hugging Face.

        Returns:
            The local path of the downloaded file, or ``None`` when the
            download fails (the caller then runs in fallback-only mode).
        """
        try:
            model_name = "TheBloke/vicuna-7B-v1.5-GGUF"
            filename = "vicuna-7b-v1.5.Q4_K_M.gguf"
            print("Downloading Vicuna-7B model... This may take a while.")
            model_path = hf_hub_download(
                repo_id=model_name,
                filename=filename,
                cache_dir="./models",
            )
            print(f"Model downloaded to: {model_path}")
            return model_path
        except Exception as e:
            # Best effort: a missing model is not fatal, templates still work.
            print(f"Error downloading model: {e}")
            return None

    def _load_model(self):
        """Load the GGUF model with llama-cpp-python and smoke-test it.

        Leaves ``self.model`` as ``None`` when the path is missing, loading
        raises, or the smoke test returns no ``choices``.
        """
        try:
            if self.model_path and os.path.exists(self.model_path):
                print(f"Loading language model from: {self.model_path}")
                self.model = Llama(
                    model_path=self.model_path,
                    n_ctx=2048,       # Context length
                    n_threads=2,      # Reduced for stability
                    n_batch=512,      # Batch size
                    verbose=False,
                    use_mmap=True,    # Memory mapping for efficiency
                    use_mlock=False,  # Don't lock memory
                )
                print("Model loaded successfully!")
                # Run one tiny completion so a broken model is caught here
                # rather than on the first real request.
                test_response = self.model("Test", max_tokens=5, temperature=0.1)
                if test_response and 'choices' in test_response:
                    print("Model test successful")
                else:
                    print("Model test failed, will use fallback")
                    self.model = None
            else:
                print("No valid model path found. Using advanced fallback generation.")
                self.model = None
        except Exception as e:
            # Best effort: any loader failure degrades to template generation.
            print(f"Error loading model: {e}")
            print("Will use advanced fallback generation system")
            self.model = None

    def _complete(self, prompt, max_tokens, temperature, top_p):
        """Run one completion on the loaded model and return its stripped text."""
        response = self.model(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=list(self._STOP_SEQUENCES),
            echo=False,
        )
        return response['choices'][0]['text'].strip()

    def _generate_with_model(self, prompt, max_tokens=250, temperature=0.7):
        """Generate text with the loaded model, retrying once on weak output.

        Args:
            prompt: Full prompt passed to the model.
            max_tokens: Completion length limit.
            temperature: Sampling temperature for the first attempt; the
                retry uses ``temperature + 0.2`` capped at 1.0.

        Returns:
            The generated text, or ``""`` when no model is loaded or the
            model call raises. Callers treat an empty string as "use the
            template fallback".
        """
        if not self.model:
            return ""
        try:
            result = self._complete(prompt, max_tokens, temperature, top_p=0.9)
            if self._is_valid_output(result):
                return result
            # Retry with different sampling parameters; the retry result is
            # returned as-is and judged later by the caller's quality check.
            print("First attempt failed, retrying with adjusted parameters...")
            return self._complete(
                prompt, max_tokens, min(temperature + 0.2, 1.0), top_p=0.8
            )
        except Exception as e:
            print(f"Error generating with model: {e}")
            return ""

    def _is_valid_output(self, output):
        """Return True when *output* looks like a usable completion.

        Rejects short or empty text, JSON that opens but never closes, and
        text containing refusal phrases, placeholders or prompt echoes.
        """
        if not output or len(output) < 20:
            return False
        # An opening brace without a closing one means truncated JSON.
        if '{' in output and '}' not in output:
            return False
        failure_patterns = [
            'I cannot', 'I apologize', 'I\'m sorry',
            '[Your Name]', '[Company]', '[Product]',
            'EXAMPLE', 'Now write',
        ]
        return not any(pattern in output for pattern in failure_patterns)

    def _parse_json_response(self, response):
        """Extract ``(subject, body)`` from a model response.

        The response is expected to contain a JSON object with ``subject``
        and ``body`` keys, possibly surrounded by other text. Anything that
        cannot be parsed as such is handed to ``_extract_fallback_content``.

        Returns:
            A ``(subject, body)`` tuple of strings.
        """
        response = response.strip()
        # Pull the JSON object out of any surrounding chatter.
        json_match = re.search(r'\{[^}]*"subject"[^}]*\}', response, re.DOTALL)
        if json_match:
            response = json_match.group(0)
        try:
            # strict=False accepts literal newlines inside string values,
            # which models routinely emit for multi-paragraph bodies.
            data = json.loads(response, strict=False)
        except json.JSONDecodeError as e:
            print(f"JSON parsing error: {e}")
            return self._extract_fallback_content(response)
        if not isinstance(data, dict):
            print("JSON parsing error: expected a JSON object")
            return self._extract_fallback_content(response)

        def as_text(value):
            # Tolerate null / non-string values instead of raising.
            if value is None:
                return ''
            return value if isinstance(value, str) else str(value)

        subject = as_text(data.get('subject')).strip().strip('"\'')
        body = as_text(data.get('body')).strip().strip('"\'')
        return subject, body

    def _extract_fallback_content(self, response):
        """Extract ``(subject, body)`` from a non-JSON response.

        Looks for a ``Subject:``-style line and for the start of the body (a
        ``Body:``/``Email:`` label or a greeting). Generic defaults are used
        for whichever part cannot be found.
        """
        lines = response.split('\n')

        subject = ""
        for line in lines:
            if any(marker in line.lower() for marker in ('subject:', 'subj:', 'sub:')):
                subject = re.sub(r'^[^:]*:', '', line).strip()
                break

        body_started = False
        body_lines = []
        body_markers = ('body:', 'email:', 'hi ', 'dear ', 'hello ')
        for line in lines:
            if body_started:
                if line.strip():
                    body_lines.append(line.strip())
            elif any(marker in line.lower() for marker in body_markers):
                body_started = True
                # Drop a leading "Body:" style label but keep the text after it.
                clean_line = re.sub(r'^[^:]*:', '', line).strip()
                if clean_line and not clean_line.lower().startswith(('body', 'email')):
                    body_lines.append(clean_line)
        body = '\n'.join(body_lines) if body_lines else response

        if not subject:
            subject = "Partnership opportunity"
        if not body or len(body) < 20:
            body = "Hi,\n\nI'd love to explore how we can help your business grow.\n\nInterested in a quick call?\n\nBest regards"
        return subject, body

    def _advanced_fallback_generation(self, name, company, company_info, tone="Professional"):
        """Build an email from tone- and industry-specific templates.

        Args:
            name: Recipient name.
            company: Recipient company name.
            company_info: Free-text company description used to pick the
                industry wording; may be empty or ``None``.
            tone: ``"Friendly"``, ``"Direct"`` or anything else for the
                professional templates (case-insensitive).

        Returns:
            A ``(subject, body)`` tuple chosen at random from the two
            templates available for the tone.
        """
        hints = self._extract_industry_details(company_info)
        tone_key = (tone or "Professional").lower()

        if tone_key == "friendly":
            templates = [
                {
                    "subject": f"Love what {company} is doing{hints['subject_suffix']}",
                    "body": f"Hi {name},\n\nJust came across {company}{hints['context']} - really impressive work!\n\nWe've helped similar {hints['industry']} companies {hints['benefit']}. Mind if I share a quick example?\n\n15-minute call work for you?\n\nCheers,\nAlex",
                },
                {
                    "subject": f"Quick idea for {company}",
                    "body": f"Hi {name},\n\n{company}'s {hints['focus']} caught my eye. We just helped a similar company {hints['specific_result']}.\n\nWorth exploring for {company}?\n\nBest,\nSam",
                },
            ]
        elif tone_key == "direct":
            templates = [
                {
                    "subject": f"{company} + {hints['solution']}?",
                    "body": f"Hi {name},\n\n{hints['direct_opener']} for {company}.\n\nResult: {hints['specific_result']}.\n\nInterested? 10-minute call?\n\n-Alex",
                },
                {
                    "subject": f"ROI opportunity for {company}",
                    "body": f"{name},\n\nQuick question: Is {company} looking to {hints['goal']}?\n\nWe reduced costs by 35% for a similar {hints['industry']} company.\n\nWorth a conversation?\n\nBest,\nSam",
                },
            ]
        else:  # Professional
            templates = [
                {
                    "subject": f"Operational efficiency opportunity - {company}",
                    "body": f"Hi {name},\n\nI noticed {company} specializes in {hints['specialty']}. We recently helped a similar organization {hints['professional_result']}.\n\nWould you be open to a brief conversation about how this might apply to {company}?\n\nBest regards,\nAlex Thompson",
                },
                {
                    "subject": f"Thought on {company}'s {hints['focus']}",
                    "body": f"Hi {name},\n\n{company}'s work in {hints['area']} is impressive. We've developed solutions that help {hints['industry']} companies {hints['benefit']}.\n\nWould you be interested in a 15-minute discussion about potential applications for {company}?\n\nBest regards,\nSarah Chen",
                },
            ]

        template = random.choice(templates)
        return template["subject"], template["body"]

    @staticmethod
    def _mentions_any(text, keywords):
        """Return True when lower-cased *text* mentions any of *keywords*.

        Keywords of three characters or fewer (``ai``, ``app``) must appear
        as whole words, optionally plural, so that "retail" or "approach" do
        not count as technology. Longer keywords match as substrings, which
        keeps stems such as ``manufactur`` and compounds such as "fintech"
        working.
        """
        for keyword in keywords:
            if len(keyword) <= 3:
                if re.search(rf'\b{re.escape(keyword)}s?\b', text):
                    return True
            elif keyword in text:
                return True
        return False

    def _extract_industry_details(self, company_info):
        """Return the wording used to personalise fallback templates.

        Args:
            company_info: Free-text company description; may be ``None``.

        Returns:
            A dict of phrase fragments (``industry``, ``specialty``,
            ``focus``, ``area``, ``benefit``, ``goal``, ``solution``,
            ``context``, ``subject_suffix``, ``direct_opener``,
            ``specific_result``, ``professional_result``) for the first
            matching industry: tech, manufacturing, healthcare, else generic.
        """
        info_lower = company_info.lower() if company_info else ""

        if self._mentions_any(info_lower, ['tech', 'software', 'saas', 'ai', 'digital']):
            return {
                'industry': 'tech',
                'specialty': 'technology solutions',
                'focus': 'innovation',
                'area': 'technology',
                'benefit': 'scale their platforms and reduce technical debt',
                'goal': 'optimize your development pipeline',
                'solution': 'DevOps automation',
                'context': ' and their tech stack',
                'subject_suffix': ' with tech',
                'direct_opener': 'We implemented automated testing',
                'specific_result': 'reduced deployment time by 60%',
                'professional_result': 'achieve 40% faster time-to-market for new features',
            }
        if self._mentions_any(info_lower, ['manufactur', 'industrial', 'equipment', 'materials']):
            return {
                'industry': 'manufacturing',
                'specialty': 'industrial operations',
                'focus': 'production efficiency',
                'area': 'manufacturing',
                'benefit': 'optimize their production lines and reduce waste',
                'goal': 'increase production efficiency',
                'solution': 'process optimization',
                'context': ' and their manufacturing capabilities',
                'subject_suffix': ' in manufacturing',
                'direct_opener': 'We streamlined production workflows',
                'specific_result': 'increased throughput by 45%',
                'professional_result': 'achieve 30% improvement in production efficiency',
            }
        if self._mentions_any(info_lower, ['health', 'medical', 'pharma', 'clinical']):
            return {
                'industry': 'healthcare',
                'specialty': 'healthcare solutions',
                'focus': 'patient outcomes',
                'area': 'healthcare',
                'benefit': 'improve patient outcomes while reducing costs',
                'goal': 'enhance patient care efficiency',
                'solution': 'workflow optimization',
                'context': ' and their patient care approach',
                'subject_suffix': ' in healthcare',
                'direct_opener': 'We optimized patient flow systems',
                'specific_result': 'reduced wait times by 50%',
                'professional_result': 'achieve 25% improvement in patient satisfaction scores',
            }
        return {
            'industry': 'business',
            'specialty': 'business operations',
            'focus': 'growth',
            'area': 'operations',
            'benefit': 'streamline operations and drive growth',
            'goal': 'scale your operations',
            'solution': 'process optimization',
            'context': ' and their business model',
            'subject_suffix': '',
            'direct_opener': 'We automated key business processes',
            'specific_result': 'increased efficiency by 40%',
            'professional_result': 'achieve 35% operational cost reduction',
        }

    def _load_prompt_templates(self):
        """Return the prompt templates used for model generation.

        ``few_shot_template`` is a ``str.format`` template with ``name``,
        ``company``, ``company_context`` and ``tone`` fields (literal braces
        in the JSON example are doubled). ``industry_specific`` maps an
        industry key to a one-line instruction.
        """
        return {
            "few_shot_template": '''You are an elite B2B sales copywriter. Write ONE personalized cold email that sounds natural and converts.
<examples>
EXAMPLE 1:
SUBJECT: Quick question about Acme's EU expansion
BODY: Hi Sarah,
Saw Acme just launched in Berlin — congrats! We helped Contoso reduce their GDPR compliance prep by 68% with a simple automation.
Worth a 10-minute chat about how this could apply to your EU rollout?
Best,
Alex
EXAMPLE 2:
SUBJECT: Thought on TechCorp's materials testing
BODY: Hi John,
Noticed TechCorp specializes in X-ray spectroscopy equipment. We just helped a similar lab increase throughput 40% with workflow optimization.
Mind if I share what worked for them? 15-minute call?
Best,
Sam
EXAMPLE 3:
SUBJECT: Manufacturing efficiency idea for IndustrialCorp
BODY: Hi Mike,
IndustrialCorp's production line setup caught my attention. We automated similar processes for MetalWorks, reducing their cycle time by 35%.
Open to a brief conversation about applications for your facility?
Best regards,
Jennifer
</examples>
Now write an email for:
Name: {name}
Company: {company}
Company Info: {company_context}
Tone: {tone}
Requirements:
- Use the company info naturally in the first 2 lines
- Maximum 70 words in body (excluding signature)
- Clear yes/no question at the end
- No placeholders like [Your Name] or [Company]
- Professional but conversational
- Include specific benefit or result if possible
Return ONLY this JSON format:
{{"subject": "...", "body": "..."}}''',
            "industry_specific": {
                "technology": '''Write a cold email for a tech company. Focus on efficiency, scalability, and competitive advantage.''',
                "healthcare": '''Write a cold email for a healthcare company. Focus on patient outcomes, compliance, and cost reduction.''',
                "manufacturing": '''Write a cold email for a manufacturing company. Focus on production efficiency, quality, and cost savings.''',
                "services": '''Write a cold email for a service company. Focus on client satisfaction, process improvement, and growth.''',
                "default": '''Write a cold email that focuses on business growth and operational efficiency.''',
            },
        }

    def _extract_industry(self, company_info):
        """Classify *company_info* into an ``industry_specific`` template key.

        Returns:
            ``'technology'``, ``'healthcare'``, ``'manufacturing'``,
            ``'services'`` or ``'default'`` (also for empty/``None`` input).
        """
        company_lower = company_info.lower() if company_info else ""
        industry_keywords = (
            ('technology', ['tech', 'software', 'saas', 'ai', 'digital', 'app', 'platform']),
            ('healthcare', ['health', 'medical', 'pharma', 'hospital', 'clinic']),
            ('manufacturing', ['manufactur', 'factory', 'production', 'industrial', 'equipment']),
            ('services', ['service', 'consulting', 'agency', 'firm']),
        )
        # First match wins, so the order above is significant.
        for industry, keywords in industry_keywords:
            if self._mentions_any(company_lower, keywords):
                return industry
        return 'default'

    def _create_company_context(self, company, company_info):
        """Condense *company_info* into at most two key points for the prompt.

        Only the first three sentences are considered; sentences of 20
        characters or fewer and ones starting with ``Title:`` are skipped,
        and scraper labels (``Description:`` etc.) are removed.

        Returns:
            The key points joined with ``' | '``, or a generic sentence
            about *company* when nothing usable is found.
        """
        context_parts = []
        if company_info and len(company_info) > 10:
            sentences = re.split(r'[.!?]+', company_info)
            for sentence in sentences[:3]:
                sentence = sentence.strip()
                if len(sentence) > 20 and not sentence.startswith('Title:'):
                    sentence = re.sub(r'Description:\s*', '', sentence)
                    sentence = re.sub(r'Company Website:\s*', '', sentence)
                    sentence = re.sub(r'LinkedIn:\s*', '', sentence)
                    if sentence:
                        context_parts.append(sentence)
        if not context_parts:
            context_parts.append(f"{company} is a company in their industry")
        return ' | '.join(context_parts[:2])

    def generate_email(self, name, company, company_info, tone="Professional", temperature=0.7):
        """Generate a subject line and email body.

        The model is tried first when one is loaded. Its output is parsed,
        polished and scored; it is returned only when the score is at least
        70 and it contains no placeholders. Otherwise the template fallback
        is used.

        Args:
            name: Recipient name.
            company: Recipient company name.
            company_info: Free-text company description.
            tone: ``"Professional"``, ``"Friendly"`` or ``"Direct"``.
            temperature: Sampling temperature; values above 1.0 are reset
                to 0.8.

        Returns:
            A ``(subject, body)`` tuple of strings.
        """
        tone = tone or "Professional"
        company_context = self._create_company_context(company, company_info)

        # Out-of-range temperatures are reset to a production-safe value.
        if temperature > 1.0:
            temperature = 0.8

        if self.model:
            try:
                prompt = self.prompt_templates["few_shot_template"].format(
                    name=name,
                    company=company,
                    company_context=company_context,
                    tone=tone.lower(),
                )
                response = self._generate_with_model(
                    prompt, max_tokens=200, temperature=temperature
                )
                if response:
                    subject, body = self._parse_json_response(response)
                    subject, body = self._polish_email_content(subject, body)
                    quality_score, issues = self._validate_email_quality(
                        subject, body, name, company
                    )
                    if quality_score >= 70 and 'placeholders' not in issues:
                        print(f"AI generated email (Quality: {quality_score}%)")
                        return subject, body
                    print(f"AI output quality too low ({quality_score}%), using advanced fallback")
                else:
                    # An empty response means the model call itself failed.
                    print("AI generation produced no output, using advanced fallback")
            except Exception as e:
                print(f"AI generation failed: {e}, using advanced fallback")

        print("Using advanced fallback generation")
        subject, body = self._advanced_fallback_generation(name, company, company_info, tone)
        subject, body = self._polish_email_content(subject, body)
        return subject, body

    def _clean_subject(self, subject, company):
        """Return a cleaned subject line, or a default one when it is too short.

        Strips a leading ``Subject:`` label and surrounding quotes, and
        truncates to 60 characters (57 plus an ellipsis).
        """
        if not subject or len(subject) < 5:
            return f"Quick question about {company}"
        subject = re.sub(r'^(Subject|SUBJECT):\s*', '', subject, flags=re.IGNORECASE)
        subject = subject.strip('"\'')
        if len(subject) > 60:
            subject = subject[:57] + "..."
        return subject

    def _clean_body(self, body, name):
        """Return a cleaned body, or a default one when it is too short.

        Strips a leading ``Body:``/``Email:`` label, prepends a greeting when
        none is present and appends ``Best regards`` when no closing phrase
        is found.
        """
        if not body or len(body) < 20:
            return f"Hi {name},\n\nI'd love to discuss how we can help your business grow.\n\nInterested in a quick call?\n\nBest regards"
        body = re.sub(r'^(Body|BODY|Email|EMAIL):\s*', '', body, flags=re.IGNORECASE)
        if not body.lower().startswith(('hi ', 'hello ', 'dear ')):
            body = f"Hi {name},\n\n{body}"
        closing_patterns = ['best regards', 'best,', 'sincerely', 'regards,', 'cheers,']
        has_closing = any(pattern in body.lower() for pattern in closing_patterns)
        if not has_closing:
            if not body.endswith('\n'):
                body += '\n'
            body += '\nBest regards'
        return body

    def _polish_email_content(self, subject, body):
        """Normalise whitespace, closing and subject line of an email.

        Paragraph breaks in *body* are preserved: only runs of spaces/tabs
        are collapsed, and a single line break between two words is widened
        to a blank line. A body without a signed closing gets one; a bare
        sign-off on the last line is completed with a name instead of being
        duplicated. The subject is trimmed, limited to 65 characters and
        capitalised.

        Returns:
            The polished ``(subject, body)`` tuple.
        """
        # Collapse horizontal whitespace only, so paragraphs survive.
        body = re.sub(r'[ \t]+', ' ', body)
        # Ensure a space after sentence punctuation within a line.
        body = re.sub(r'([.!?])[ \t]*([a-z])', r'\1 \2', body)
        # Separate consecutive text lines with a blank line.
        body = re.sub(r'(\w)\s*\n\s*(\w)', r'\1\n\n\2', body)
        body = body.strip()

        is_signed = (
            self._SIGNED_CLOSING.search(body) or self._DASH_SIGNATURE.search(body)
        )
        if not is_signed:
            bare_closing = self._BARE_CLOSING.search(body)
            if bare_closing:
                # Complete "Best regards" with a name rather than adding a
                # second sign-off underneath it.
                body = body[:bare_closing.start(1)] + bare_closing.group(1) + ',\nAlex'
            else:
                body = body + '\n\nBest regards,\nAlex'

        subject = subject.strip()
        if len(subject) > 65:
            subject = subject[:62] + "..."
        if subject and subject[0].islower():
            subject = subject[0].upper() + subject[1:]
        return subject, body

    def _validate_email_quality(self, subject, body, name, company):
        """Score an email and list the quality checks it fails.

        Checks: subject length (10-65 characters), body length (20-150
        words), absence of placeholders, presence of *name* and *company* in
        the body, and presence of a call-to-action word.

        Returns:
            ``(quality_score, issues)`` where the score starts at 100 and
            loses 15 per issue (never below 0), and ``issues`` is a list of
            issue keys.
        """
        issues = []
        if len(subject) < 10 or len(subject) > 65:
            issues.append("subject_length")
        words = len(body.split())
        if words < 20 or words > 150:
            issues.append("body_length")
        if '[Your Name]' in body or '[Company]' in body or '{{' in body:
            issues.append("placeholders")
        # str() keeps a missing (None) name or company from raising here.
        if str(name) not in body or str(company) not in body:
            issues.append("personalization")
        cta_phrases = ['call', 'conversation', 'chat', 'discuss', 'talk', 'meeting', 'connect']
        if not any(phrase in body.lower() for phrase in cta_phrases):
            issues.append("no_cta")
        quality_score = max(0, 100 - (len(issues) * 15))
        return quality_score, issues

    def generate_multiple_variations(self, name, company, company_info, num_variations=3, tone="Professional"):
        """Generate several email variations with rotating tone and temperature.

        Args:
            name: Recipient name.
            company: Recipient company name.
            company_info: Free-text company description.
            num_variations: Number of variations to produce.
            tone: Tone of the first variation; later variations cycle through
                the remaining tones. An unrecognised tone starts the cycle at
                ``"Professional"``.

        Returns:
            A list of dicts with keys ``variation``, ``tone``,
            ``temperature``, ``subject`` and ``email_body``.
        """
        tones = ["Professional", "Friendly", "Direct"]
        temperatures = [0.6, 0.7, 0.8]
        requested = (tone or "").strip().lower()
        start = next(
            (index for index, known in enumerate(tones) if known.lower() == requested),
            0,
        )

        variations = []
        for i in range(num_variations):
            current_tone = tones[(start + i) % len(tones)]
            current_temp = temperatures[i % len(temperatures)]
            subject, email_body = self.generate_email(
                name, company, company_info,
                tone=current_tone, temperature=current_temp,
            )
            variations.append({
                'variation': i + 1,
                'tone': current_tone,
                'temperature': current_temp,
                'subject': subject,
                'email_body': email_body,
            })
        return variations

    def generate_email_v2(self, recipient_name, recipient_email, company_name, company_data, tone="professional", temperature=0.7):
        """Compatibility wrapper around ``generate_email``.

        Args:
            recipient_name: Recipient name.
            recipient_email: Accepted for signature compatibility; not used.
            company_name: Recipient company name.
            company_data: Either a dict with a ``description`` key or any
                value that is converted to text with ``str``.
            tone: Passed through to ``generate_email``.
            temperature: Passed through to ``generate_email``.

        Returns:
            A dict with ``subject``, ``content`` and ``quality_score`` (the
            validator's score rescaled to 0-10).
        """
        default_info = f"Company: {company_name}"
        if isinstance(company_data, dict):
            company_info = company_data.get('description') or default_info
        else:
            company_info = str(company_data) if company_data else default_info

        subject, body = self.generate_email(
            name=recipient_name,
            company=company_name,
            company_info=company_info,
            tone=tone,
            temperature=temperature,
        )
        quality_score, _ = self._validate_email_quality(
            subject, body, recipient_name, company_name
        )
        return {
            'subject': subject,
            'content': body,
            'quality_score': round(quality_score / 10, 1),
        }