# Hugging Face Spaces page banner captured with this file (status: Running); not part of the program.
import os
import json
import re
import random

# Optional AI model imports. Each dependency is probed on its own and a
# module-level flag records whether it imported, so the rest of the module
# can degrade gracefully instead of failing at import time.
# NOTE(review): the leading symbols in the messages below look like
# mis-decoded emoji in this copy of the file; restore them from upstream.
try:
    from llama_cpp import Llama
    LLAMA_AVAILABLE = True
except ImportError:
    LLAMA_AVAILABLE = False
    print("β οΈ llama_cpp not available. Using fallback generation.")

try:
    from huggingface_hub import hf_hub_download
    HF_AVAILABLE = True
except ImportError:
    HF_AVAILABLE = False
    print("β οΈ huggingface_hub not available. Using fallback generation.")

# Grammar checking
try:
    import language_tool_python
    GRAMMAR_AVAILABLE = True
except ImportError:
    GRAMMAR_AVAILABLE = False
    print("β οΈ language_tool_python not available. Install for grammar checking.")
class EmailGenerator:
    """Generate personalised B2B cold emails with a local GGUF language model.

    The model is loaded through ``llama_cpp`` from a caller-supplied path or
    from a file downloaded with ``huggingface_hub``. When either dependency is
    missing, or loading fails, ``self.model`` stays ``None`` and
    ``generate_email`` returns an explanatory error message instead of an
    email.
    """

    # Subjects that generate_email returns to signal an error, not an email.
    _ERROR_SUBJECTS = ("Setup Required", "AI Model Error", "Generation Error")

    def __init__(self, custom_model_path=None):
        """Set up the generator and try to load a model.

        Args:
            custom_model_path: Optional path to a GGUF file. When omitted the
                model is downloaded via ``_download_model``. Ignored when the
                AI dependencies are not installed.
        """
        self.model = None
        if LLAMA_AVAILABLE and HF_AVAILABLE:
            self.model_path = custom_model_path or self._download_model()
            self._load_model()
        else:
            print("π AI model dependencies not available. Using advanced fallback generation.")
            self.model_path = None
        self.prompt_templates = self._load_prompt_templates()

    def _download_model(self):
        """Download Mistral-7B GGUF model from Hugging Face, with a Vicuna fallback.

        Returns:
            The local path of the downloaded file, or ``None`` when
            ``huggingface_hub`` is unavailable or both downloads fail.
        """
        if not HF_AVAILABLE:
            print("β οΈ Hugging Face Hub not available. Using fallback generation.")
            return None
        try:
            model_name = "QuantFactory/Mistral-7B-Instruct-v0.3-GGUF"
            filename = "Mistral-7B-Instruct-v0.3.Q4_K_M.gguf"
            print("Downloading Mistral-7B v0.3 model... This may take a while.")
            print("π Upgrading to Mistral for 30% better instruction following!")
            model_path = hf_hub_download(
                repo_id=model_name,
                filename=filename,
                cache_dir="./models"
            )
            print(f"β Mistral model downloaded to: {model_path}")
            return model_path
        except Exception as e:
            print(f"Error downloading model: {e}")
            # Fallback to Vicuna if Mistral fails
            try:
                print("π Falling back to Vicuna model...")
                model_name = "TheBloke/vicuna-7B-v1.5-GGUF"
                filename = "vicuna-7b-v1.5.Q4_K_M.gguf"
                model_path = hf_hub_download(
                    repo_id=model_name,
                    filename=filename,
                    cache_dir="./models"
                )
                print(f"β Fallback model downloaded to: {model_path}")
                return model_path
            except Exception as e2:
                print(f"β Both models failed: {e2}")
                return None

    def _load_model(self):
        """Load the GGUF model using llama-cpp-python and smoke-test it.

        Leaves ``self.model`` as ``None`` when the library is missing, the
        path does not exist, loading raises, or the test prompt returns no
        ``choices``.
        """
        if not LLAMA_AVAILABLE:
            print("β οΈ llama_cpp not available. Using advanced fallback generation.")
            self.model = None
            return
        try:
            if self.model_path and os.path.exists(self.model_path):
                print(f"π€ Loading language model from: {self.model_path}")
                self.model = Llama(
                    model_path=self.model_path,
                    n_ctx=2048,  # Context length
                    n_threads=2,  # Reduced for stability
                    n_batch=512,  # Batch size
                    verbose=False,
                    use_mmap=True,  # Memory mapping for efficiency
                    use_mlock=False  # Don't lock memory
                )
                print("β Model loaded successfully!")
                # Test the model with a simple prompt
                test_response = self.model("Test", max_tokens=5, temperature=0.1)
                if test_response and 'choices' in test_response:
                    print("β Model test successful")
                else:
                    print("β οΈ Model test failed, will use fallback")
                    self.model = None
            else:
                print("β No valid model path found. Using advanced fallback generation.")
                self.model = None
        except Exception as e:
            print(f"β Error loading model: {e}")
            print("π Will use advanced fallback generation system")
            self.model = None

    def _generate_with_model(self, prompt, max_tokens=250, temperature=0.7):
        """Generate text using the loaded model, retrying once on invalid output.

        The retry raises the temperature by 0.2 (capped at 1.0) and lowers
        ``top_p`` from 0.9 to 0.8.

        Raises:
            Exception: when no model is loaded, the model call fails, or both
                attempts produce output rejected by ``_is_valid_output``.
        """
        if not self.model:
            raise Exception("AI model not loaded")
        stop_sequences = ["</s>", "\n\n\n", "EXAMPLE", "Now write"]
        # (temperature, top_p) for the first attempt and the retry.
        attempts = (
            (temperature, 0.9),
            (min(temperature + 0.2, 1.0), 0.8),
        )
        try:
            for attempt_index, (attempt_temperature, top_p) in enumerate(attempts):
                if attempt_index:
                    print("First attempt failed, retrying with adjusted parameters...")
                response = self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=attempt_temperature,
                    top_p=top_p,
                    stop=stop_sequences,
                    echo=False
                )
                result = response['choices'][0]['text'].strip()
                if self._is_valid_output(result):
                    return result
            raise Exception("AI model produced invalid output after retry")
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise Exception(f"AI generation failed: {str(e)}") from e

    def _is_valid_output(self, output):
        """Return True when generated text looks like a usable answer.

        Rejects empty or very short text, text with an opening ``{`` but no
        closing ``}``, and text containing refusal phrases, bracket
        placeholders or echoed prompt markers.
        """
        if not output or len(output) < 20:
            return False
        # Check for incomplete JSON
        if '{' in output and '}' not in output:
            return False
        # Check for common failure patterns
        failure_patterns = [
            'I cannot', 'I apologize', 'I\'m sorry',
            '[Your Name]', '[Company]', '[Product]',
            'EXAMPLE', 'Now write'
        ]
        return not any(pattern in output for pattern in failure_patterns)

    def _parse_json_response(self, response):
        """Parse the model's JSON answer into ``(subject, body)``.

        Falls back to ``_extract_fallback_content`` when the text is not a
        JSON object. Missing or null fields become empty strings.
        """
        response = response.strip()
        # Extract JSON if it's embedded in text
        json_match = re.search(r'\{[^}]*"subject"[^}]*\}', response, re.DOTALL)
        if json_match:
            response = json_match.group(0)
        try:
            # strict=False accepts raw newlines/tabs inside string values,
            # which generated JSON frequently contains.
            data = json.loads(response, strict=False)
            if not isinstance(data, dict):
                raise ValueError("JSON response is not an object")
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            print(f"JSON parsing error: {e}")
            return self._extract_fallback_content(response)
        subject = str(data.get('subject') or '').strip()
        body = str(data.get('body') or '').strip()
        # Clean up quotes and formatting
        return subject.strip('"\''), body.strip('"\'')

    def _extract_fallback_content(self, response):
        """Extract subject and body from a non-JSON response.

        The first line carrying a subject label supplies the subject and is
        excluded from the body. The body starts at the first ``body:`` /
        ``email:`` label or greeting line; blank lines are dropped. Generic
        defaults are used when nothing usable is found.
        """
        lines = response.split('\n')
        subject = ""
        subject_index = None
        # Look for subject line
        for index, line in enumerate(lines):
            if any(label in line.lower() for label in ('subject:', 'subj:', 'sub:')):
                subject = re.sub(r'^[^:]*:', '', line).strip()
                subject_index = index
                break
        # Look for body
        body_started = False
        body_lines = []
        for index, line in enumerate(lines):
            if index == subject_index:
                # The subject line must not start the body (e.g. "Subject: Hello ...").
                continue
            stripped = line.strip()
            lowered = stripped.lower()
            if body_started:
                if stripped:
                    body_lines.append(stripped)
            elif any(label in lowered for label in ('body:', 'email:')):
                body_started = True
                clean_line = re.sub(r'^[^:]*:', '', line).strip()
                if clean_line and not clean_line.lower().startswith(('body', 'email')):
                    body_lines.append(clean_line)
            elif lowered.startswith(('hi ', 'hi,', 'dear ', 'hello ', 'hello,')):
                # A greeting line is body text itself; keep it whole.
                body_started = True
                body_lines.append(stripped)
        body = '\n'.join(body_lines) if body_lines else response
        # Fallback if parsing failed
        if not subject:
            subject = "Partnership opportunity"
        if not body or len(body) < 20:
            body = "Hi,\n\nI'd love to explore how we can help your business grow.\n\nInterested in a quick call?\n\nBest regards"
        return subject, body

    def _check_grammar(self, text):
        """Check grammar and return ``(text, issue_count)``.

        With at most two issues the auto-corrected text is returned; with
        more, the text is returned unchanged so the caller can decide to
        regenerate. Returns ``(text, 0)`` when the checker is unavailable or
        fails.
        """
        if not GRAMMAR_AVAILABLE:
            return text, 0
        try:
            # Initialize language tool (cached)
            if not hasattr(self, '_grammar_tool'):
                self._grammar_tool = language_tool_python.LanguageTool('en-US')
            # Check for errors
            matches = self._grammar_tool.check(text)
            # If more than 2 errors, suggest regeneration
            if len(matches) > 2:
                return text, len(matches)
            # Auto-correct simple errors
            corrected = language_tool_python.utils.correct(text, matches)
            return corrected, len(matches)
        except Exception as e:
            print(f"Grammar check failed: {e}")
            return text, 0

    def _advanced_fallback_generation(self, name, company, company_info, tone="Professional"):
        """Build an email from canned templates, without the model.

        One of two templates for the requested tone ("friendly", "direct",
        anything else is treated as professional) is picked at random and
        filled with details from ``_extract_industry_details``.
        """
        # Extract industry and key details from company info
        industry_hints = self._extract_industry_details(company_info)
        tone_key = (tone or "").lower()
        # Create tone-specific templates
        if tone_key == "friendly":
            templates = [
                {
                    "subject": f"Love what {company} is doing{industry_hints['subject_suffix']}",
                    "body": f"Hi {name},\n\nJust came across {company}{industry_hints['context']} - really impressive work!\n\nWe've helped similar {industry_hints['industry']} companies {industry_hints['benefit']}. Mind if I share a quick example?\n\n15-minute call work for you?\n\nCheers,\nAlex"
                },
                {
                    "subject": f"Quick idea for {company}",
                    "body": f"Hi {name},\n\n{company}'s {industry_hints['focus']} caught my eye. We just helped a similar company {industry_hints['specific_result']}.\n\nWorth exploring for {company}?\n\nBest,\nSam"
                }
            ]
        elif tone_key == "direct":
            templates = [
                {
                    "subject": f"{company} + {industry_hints['solution']}?",
                    "body": f"Hi {name},\n\n{industry_hints['direct_opener']} for {company}.\n\nResult: {industry_hints['specific_result']}.\n\nInterested? 10-minute call?\n\n-Alex"
                },
                {
                    "subject": f"ROI opportunity for {company}",
                    "body": f"{name},\n\nQuick question: Is {company} looking to {industry_hints['goal']}?\n\nWe reduced costs by 35% for a similar {industry_hints['industry']} company.\n\nWorth a conversation?\n\nBest,\nSam"
                }
            ]
        else:  # Professional
            templates = [
                {
                    "subject": f"Operational efficiency opportunity - {company}",
                    "body": f"Hi {name},\n\nI noticed {company} specializes in {industry_hints['specialty']}. We recently helped a similar organization {industry_hints['professional_result']}.\n\nWould you be open to a brief conversation about how this might apply to {company}?\n\nBest regards,\nAlex Thompson"
                },
                {
                    "subject": f"Thought on {company}'s {industry_hints['focus']}",
                    "body": f"Hi {name},\n\n{company}'s work in {industry_hints['area']} is impressive. We've developed solutions that help {industry_hints['industry']} companies {industry_hints['benefit']}.\n\nWould you be interested in a 15-minute discussion about potential applications for {company}?\n\nBest regards,\nSarah Chen"
                }
            ]
        template = random.choice(templates)
        return template["subject"], template["body"]

    @staticmethod
    def _mentions_any(text, keywords):
        """Return True when lower-cased *text* mentions any of *keywords*.

        Keywords of three characters or fewer (such as 'ai' or 'app') must
        appear as whole words, optionally pluralised, so that they do not
        fire inside unrelated words like "retail" or "apparel". Longer
        keywords match as substrings, which keeps stems such as 'manufactur'
        working.
        """
        for keyword in keywords:
            if len(keyword) <= 3:
                if re.search(rf'\b{re.escape(keyword)}s?\b', text):
                    return True
            elif keyword in text:
                return True
        return False

    def _extract_industry_details(self, company_info):
        """Return industry-specific phrases used to personalise templates.

        The company description is matched against keyword lists for tech,
        manufacturing and healthcare, in that order; anything else gets the
        generic business phrases. ``None`` or empty input is treated as
        generic.
        """
        info_lower = company_info.lower() if company_info else ""
        if self._mentions_any(info_lower, ['tech', 'software', 'saas', 'ai', 'digital']):
            return {
                'industry': 'tech',
                'specialty': 'technology solutions',
                'focus': 'innovation',
                'area': 'technology',
                'benefit': 'scale their platforms and reduce technical debt',
                'goal': 'optimize your development pipeline',
                'solution': 'DevOps automation',
                'context': ' and their tech stack',
                'subject_suffix': ' with tech',
                'direct_opener': 'We implemented automated testing',
                'specific_result': 'reduced deployment time by 60%',
                'professional_result': 'achieve 40% faster time-to-market for new features'
            }
        elif self._mentions_any(info_lower, ['manufactur', 'industrial', 'equipment', 'materials']):
            return {
                'industry': 'manufacturing',
                'specialty': 'industrial operations',
                'focus': 'production efficiency',
                'area': 'manufacturing',
                'benefit': 'optimize their production lines and reduce waste',
                'goal': 'increase production efficiency',
                'solution': 'process optimization',
                'context': ' and their manufacturing capabilities',
                'subject_suffix': ' in manufacturing',
                'direct_opener': 'We streamlined production workflows',
                'specific_result': 'increased throughput by 45%',
                'professional_result': 'achieve 30% improvement in production efficiency'
            }
        elif self._mentions_any(info_lower, ['health', 'medical', 'pharma', 'clinical']):
            return {
                'industry': 'healthcare',
                'specialty': 'healthcare solutions',
                'focus': 'patient outcomes',
                'area': 'healthcare',
                'benefit': 'improve patient outcomes while reducing costs',
                'goal': 'enhance patient care efficiency',
                'solution': 'workflow optimization',
                'context': ' and their patient care approach',
                'subject_suffix': ' in healthcare',
                'direct_opener': 'We optimized patient flow systems',
                'specific_result': 'reduced wait times by 50%',
                'professional_result': 'achieve 25% improvement in patient satisfaction scores'
            }
        else:
            return {
                'industry': 'business',
                'specialty': 'business operations',
                'focus': 'growth',
                'area': 'operations',
                'benefit': 'streamline operations and drive growth',
                'goal': 'scale your operations',
                'solution': 'process optimization',
                'context': ' and their business model',
                'subject_suffix': '',
                'direct_opener': 'We automated key business processes',
                'specific_result': 'increased efficiency by 40%',
                'professional_result': 'achieve 35% operational cost reduction'
            }

    def _load_prompt_templates(self):
        """Return the prompt templates used for generation.

        ``few_shot_template`` is a ``str.format`` template with the fields
        ``name``, ``company``, ``company_context`` and ``tone``;
        ``industry_specific`` maps an industry key to a one-line instruction.
        """
        # NOTE(review): this copy of the few-shot template has no blank lines
        # between paragraphs; confirm against the upstream file.
        return {
            "few_shot_template": '''You are an elite B2B sales copywriter. Write ONE personalized cold email that sounds natural and converts.
<examples>
EXAMPLE 1:
SUBJECT: Quick question about Acme's EU expansion
BODY: Hi Sarah,
Saw Acme just launched in Berlin β congrats! We helped Contoso reduce their GDPR compliance prep by 68% with a simple automation.
Worth a 10-minute chat about how this could apply to your EU rollout?
Best,
Alex
EXAMPLE 2:
SUBJECT: Thought on TechCorp's materials testing
BODY: Hi John,
Noticed TechCorp specializes in X-ray spectroscopy equipment. We just helped a similar lab increase throughput 40% with workflow optimization.
Mind if I share what worked for them? 15-minute call?
Best,
Sam
EXAMPLE 3:
SUBJECT: Manufacturing efficiency idea for IndustrialCorp
BODY: Hi Mike,
IndustrialCorp's production line setup caught my attention. We automated similar processes for MetalWorks, reducing their cycle time by 35%.
Open to a brief conversation about applications for your facility?
Best regards,
Jennifer
</examples>
Now write an email for:
Name: {name}
Company: {company}
Company Info: {company_context}
Tone: {tone}
Requirements:
- Use the company info naturally in the first 2 lines
- Maximum 70 words in body (excluding signature)
- Clear yes/no question at the end
- No placeholders like [Your Name] or [Company]
- Professional but conversational
- Include specific benefit or result if possible
Return ONLY this JSON format:
{{"subject": "...", "body": "..."}}''',
            "industry_specific": {
                "technology": '''Write a cold email for a tech company. Focus on efficiency, scalability, and competitive advantage.''',
                "healthcare": '''Write a cold email for a healthcare company. Focus on patient outcomes, compliance, and cost reduction.''',
                "manufacturing": '''Write a cold email for a manufacturing company. Focus on production efficiency, quality, and cost savings.''',
                "services": '''Write a cold email for a service company. Focus on client satisfaction, process improvement, and growth.''',
                "default": '''Write a cold email that focuses on business growth and operational efficiency.'''
            }
        }

    def _extract_industry(self, company_info):
        """Return the ``industry_specific`` template key for a company description.

        Returns one of 'technology', 'healthcare', 'manufacturing',
        'services' or 'default'. ``None`` or empty input yields 'default'.
        """
        company_lower = company_info.lower() if company_info else ""
        if self._mentions_any(company_lower, ['tech', 'software', 'saas', 'ai', 'digital', 'app', 'platform']):
            return 'technology'
        elif self._mentions_any(company_lower, ['health', 'medical', 'pharma', 'hospital', 'clinic']):
            return 'healthcare'
        elif self._mentions_any(company_lower, ['manufactur', 'factory', 'production', 'industrial', 'equipment']):
            return 'manufacturing'
        elif self._mentions_any(company_lower, ['service', 'consulting', 'agency', 'firm']):
            return 'services'
        else:
            return 'default'

    def _create_company_context(self, company, company_info):
        """Condense company information into at most two key points for the prompt.

        Only the first three sentences are considered; short sentences and
        ones starting with ``Title:`` are skipped and scraped field labels
        are removed. A generic sentence is returned when nothing qualifies.
        """
        # Extract key information and clean it up
        context_parts = []
        if company_info and len(company_info) > 10:
            # Extract meaningful phrases
            sentences = re.split(r'[.!?]+', company_info)
            for sentence in sentences[:3]:  # First 3 sentences
                sentence = sentence.strip()
                if len(sentence) > 20 and not sentence.startswith('Title:'):
                    # Remove field labels left over from scraped data
                    sentence = re.sub(r'Description:\s*', '', sentence)
                    sentence = re.sub(r'Company Website:\s*', '', sentence)
                    sentence = re.sub(r'LinkedIn:\s*', '', sentence)
                    if sentence:
                        context_parts.append(sentence)
        if not context_parts:
            context_parts.append(f"{company} is a company in their industry")
        return ' | '.join(context_parts[:2])  # Max 2 key points

    def generate_email(self, name, company, company_info, tone="Professional", temperature=0.7):
        """Generate a subject line and email body with the language model.

        Returns:
            ``(subject, body)``. On failure the subject is one of
            "Setup Required", "AI Model Error" or "Generation Error" and the
            body is a human-readable explanation; the method does not raise.
        """
        if not LLAMA_AVAILABLE or not HF_AVAILABLE:
            # Return clear error message instead of fallback
            error_msg = "π§ **Premium AI Model Setup Required**\n\n"
            if not LLAMA_AVAILABLE:
                error_msg += "β **Missing:** llama-cpp-python (Advanced AI Engine)\n"
            if not HF_AVAILABLE:
                error_msg += "β **Missing:** huggingface-hub (Model Download)\n"
            error_msg += "\nπ‘ **To unlock premium AI features:**\n"
            error_msg += "1. Install: `pip install llama-cpp-python huggingface-hub`\n"
            error_msg += "2. Restart the app\n"
            error_msg += "3. First generation will download 1GB AI model\n\n"
            error_msg += "π **What you get:** 40% better personalization, industry insights, AI-powered quality scoring"
            return "Setup Required", error_msg
        # Check if model is properly loaded
        if not self.model:
            error_msg = "β **AI Model Loading Failed**\n\n"
            error_msg += "π‘ **Possible issues:**\n"
            error_msg += "β’ Model download incomplete\n"
            error_msg += "β’ Insufficient disk space (need 1GB+)\n"
            error_msg += "β’ Network connection during first run\n\n"
            error_msg += "π§ **Try:**\n"
            error_msg += "1. Restart the app with stable internet\n"
            error_msg += "2. Check available disk space\n"
            error_msg += "3. Contact support if issue persists"
            return "AI Model Error", error_msg
        # Use AI model for generation
        print("π€ Using premium AI model for generation")
        try:
            company_context = self._create_company_context(company, company_info)
            template = self.prompt_templates["few_shot_template"]
            prompt = template.format(
                name=name,
                company=company,
                company_context=company_context,
                tone=tone
            )
            response = self._generate_with_model(prompt, max_tokens=300, temperature=temperature)
            subject, body = self._parse_json_response(response)
            # Apply grammar checking
            if GRAMMAR_AVAILABLE:
                corrected_body, error_count = self._check_grammar(body)
                # Above two issues _check_grammar returns the text uncorrected.
                if error_count <= 2:
                    body = corrected_body
                    if error_count > 0:
                        print(f"β Fixed {error_count} grammar issues")
            return subject, body
        except Exception as e:
            print(f"AI generation failed: {e}")
            error_msg = "β **AI Generation Failed**\n\n"
            error_msg += f"Error: {str(e)}\n\n"
            error_msg += "π‘ **This could mean:**\n"
            error_msg += "β’ AI model overloaded (try again)\n"
            error_msg += "β’ Memory issues with large model\n"
            error_msg += "β’ Temporary processing error\n\n"
            error_msg += "π§ **Try:** Wait a moment and try again"
            return "Generation Error", error_msg

    def _clean_subject(self, subject, company):
        """Clean and validate a subject line.

        Strips a leading ``Subject:`` label and surrounding quotes, truncates
        to 60 characters, and substitutes a generic subject when fewer than
        five characters remain.
        """
        fallback = f"Quick question about {company}"
        if not subject:
            return fallback
        # Remove common prefixes before judging the length
        subject = re.sub(r'^(Subject|SUBJECT):\s*', '', subject.strip(), flags=re.IGNORECASE)
        subject = subject.strip('"\'').strip()
        if len(subject) < 5:
            return fallback
        # Ensure reasonable length
        if len(subject) > 60:
            subject = subject[:57] + "..."
        return subject

    def _clean_body(self, body, name):
        """Clean and validate an email body.

        Strips a leading ``Body:`` / ``Email:`` label, guarantees a greeting
        and a closing, and substitutes a generic email when fewer than twenty
        characters remain.
        """
        fallback = f"Hi {name},\n\nI'd love to discuss how we can help your business grow.\n\nInterested in a quick call?\n\nBest regards"
        if not body:
            return fallback
        # Remove common prefixes before judging the length
        body = re.sub(r'^(Body|BODY|Email|EMAIL):\s*', '', body.strip(), flags=re.IGNORECASE)
        if len(body) < 20:
            return fallback
        # Ensure proper greeting ("Hi Bob," as well as a bare "Hi,")
        if not re.match(r'(hi|hello|dear)\b', body, flags=re.IGNORECASE):
            body = f"Hi {name},\n\n{body}"
        # Ensure proper closing
        closing_patterns = ['best regards', 'best,', 'sincerely', 'regards,', 'cheers,']
        has_closing = any(pattern in body.lower() for pattern in closing_patterns)
        if not has_closing:
            if not body.endswith('\n'):
                body += '\n'
            body += '\nBest regards'
        return body

    def _polish_email_content(self, subject, body):
        """Polish email content for spacing and professionalism.

        Returns:
            ``(subject, body)`` with runs of spaces collapsed, single line
            breaks between words widened to paragraph breaks, a closing
            appended when none is found, and the subject trimmed to 65
            characters with its first letter capitalised.
        """
        # Collapse runs of spaces/tabs only; newlines carry the paragraph layout.
        body = re.sub(r'[ \t]+', ' ', body)
        # Space after '!' / '?'. A period directly followed by a lower-case
        # letter is left alone: it is usually a domain or abbreviation.
        body = re.sub(r'([!?])[ \t]*(?=[a-z])', r'\1 ', body)
        body = re.sub(r'(\w)\s*\n\s*(\w)', r'\1\n\n\2', body)  # Proper paragraph spacing
        # Ensure professional closing
        if not re.search(r'(Best regards|Best|Sincerely|Cheers),?\s*\n?[A-Z][a-z]+', body):
            body = body.strip() + '\n\nBest regards,\nAlex'
        # Fix subject line
        subject = subject.strip()
        if len(subject) > 65:
            subject = subject[:62] + "..."
        # Capitalize first letter of subject if not already
        if subject and subject[0].islower():
            subject = subject[0].upper() + subject[1:]
        return subject, body

    def _validate_email_quality(self, subject, body, name, company):
        """Score an email and list its issues.

        Up to ten points are awarded for length (3), absence of placeholders
        (3), personalisation (2) and a call-to-action (2), then scaled to
        0-100. Scores of 90 or more are reduced by 2 and capped at 92, and
        the result is never below 50.

        Returns:
            ``(score, issues)`` where ``issues`` may contain "too_short",
            "placeholders" and "no_personalization".
        """
        score = 0.0
        # Word count (0-3 points)
        words = len(body.split())
        if words >= 50:
            score += 3
        elif words >= 30:
            score += 2
        elif words >= 20:
            score += 1
        # No placeholders (0-3 points); '[' also covers '[Your Name]' etc.
        if '[' not in body and '{{' not in body:
            score += 3
        # Personalization (0-2 points)
        if name in body and company in body:
            score += 2
        elif name in body or company in body:
            score += 1
        # Call-to-action (0-2 points)
        cta_phrases = ['call', 'conversation', 'chat', 'discuss', 'talk', 'meeting', 'connect', 'interested', 'open to']
        body_lower = body.lower()
        if any(phrase in body_lower for phrase in cta_phrases):
            score += 2
        # Convert to 0-100 scale
        quality_score = min(100, (score / 10.0) * 100)
        # Never report a perfect score
        if quality_score >= 90:
            quality_score = min(92, quality_score - 2)
        issues = []
        if words < 20:
            issues.append("too_short")
        if '[' in body:
            issues.append("placeholders")
        if name not in body:
            issues.append("no_personalization")
        return max(50, quality_score), issues  # Minimum 5.0/10 for functioning emails

    def generate_multiple_variations(self, name, company, company_info, num_variations=3, tone="Professional"):
        """Generate several email variations with rotating tone and temperature.

        The tone rotation (Professional, Friendly, Direct) starts at *tone*
        when it names one of those tones (case-insensitive), otherwise at
        "Professional".

        Returns:
            A list of dicts with the keys 'variation', 'tone', 'temperature',
            'subject' and 'email_body'.
        """
        variations = []
        tones = ["Professional", "Friendly", "Direct"]
        temperatures = [0.6, 0.7, 0.8]
        requested = (tone or "").strip().lower()
        start = next((i for i, known in enumerate(tones) if known.lower() == requested), 0)
        for i in range(num_variations):
            current_tone = tones[(start + i) % len(tones)]
            current_temp = temperatures[i % len(temperatures)]
            subject, email_body = self.generate_email(
                name, company, company_info,
                tone=current_tone, temperature=current_temp
            )
            variations.append({
                'variation': i + 1,
                'tone': current_tone,
                'temperature': current_temp,
                'subject': subject,
                'email_body': email_body
            })
        return variations

    def generate_email_v2(self, recipient_name, recipient_email, company_name, company_data, tone="professional", temperature=0.7):
        """Compatibility wrapper around ``generate_email`` returning a dict.

        Args:
            recipient_email: Accepted for signature compatibility; unused.
            company_data: A dict whose 'description' is used, or any value
                convertible to ``str``.

        Returns:
            ``{'subject', 'content', 'quality_score'}`` where the score is on
            a 0-10 scale and is 0.0 when generation returned an error.
        """
        default_info = f"Company: {company_name}"
        # Extract company info from company_data if it's a dict
        if isinstance(company_data, dict):
            company_info = company_data.get('description') or default_info
        else:
            company_info = str(company_data) if company_data else default_info
        # Call the main generate_email method
        subject, body = self.generate_email(
            name=recipient_name,
            company=company_name,
            company_info=company_info,
            tone=tone,
            temperature=temperature
        )
        if subject in self._ERROR_SUBJECTS:
            quality_score = 0.0
        else:
            score, _issues = self._validate_email_quality(subject, body, recipient_name, company_name)
            quality_score = score / 10.0
        # Return in the expected format
        return {
            'subject': subject,
            'content': body,
            'quality_score': quality_score
        }
# Standalone function for easy import
def generate_cold_email(name, company, company_details="", tone="professional", cta_type="meeting_call",
                        industry_template="Generic B2B", sender_signature="Alex Thompson"):
    """
    Generate a cold email using the EmailGenerator class

    Args:
        name (str): Contact name
        company (str): Company name
        company_details (str): Additional company information
        tone (str): Email tone (professional, friendly, etc.)
        cta_type (str): Call-to-action type (accepted but currently unused)
        industry_template (str): Industry template (accepted but currently unused)
        sender_signature (str): Sender name used in place of the default signatures

    Returns:
        tuple: (subject, body, quality_score) with the score on a 0-10 scale.
        On failure the subject is an error label ("Setup Required",
        "AI Model Error", "Generation Error" or "Unknown Error"), the body is
        the error message and the score is 0. The function does not raise.
    """
    try:
        # NOTE(review): a new EmailGenerator is built on every call, which
        # repeats the model load in __init__; consider reusing one instance.
        generator = EmailGenerator()
        # Prepare company info
        company_info = f"{company}. {company_details}".strip()
        # Generate email
        result = generator.generate_email(
            name=name,
            company=company,
            company_info=company_info,
            tone=tone
        )
        if len(result) != 2:
            # generate_email always returns a pair; handle anything else gracefully
            return "Unknown Error", "β Unexpected error in email generation", 0
        subject, body = result
        # Error results carry the message in the body
        if subject in ("Setup Required", "AI Model Error", "Generation Error"):
            return subject, body, 0

        # Replace default signature with custom signature
        if sender_signature and sender_signature != "Alex Thompson":
            name_parts = sender_signature.split()
            first_name = name_parts[0] if name_parts else "Alex"

            def _swap_signature(match):
                # Formal closings get the full signature, casual ones the first name.
                if match.group('formal'):
                    return match.group('formal') + sender_signature
                return match.group('casual') + first_name

            # One pass with a function replacement: the signature is inserted
            # literally (no backslash/group interpretation) and replaced text
            # is never re-matched by a later pattern.
            body = re.sub(
                r'(?P<formal>Best regards,\n)(?:Alex Thompson|Sarah Chen|Jennifer)\b'
                r'|(?P<casual>Best,\n|Cheers,\n|-)(?:Alex|Sam)\b',
                _swap_signature,
                body
            )

        # Calculate quality score (returns tuple: quality_score, issues)
        quality_score, _issues = generator._validate_email_quality(subject, body, name, company)
        # Convert quality score from 0-100 to 0-10 scale
        return subject, body, quality_score / 10.0
    except Exception as e:
        print(f"Error in generate_cold_email: {e}")
        # Return setup error instead of fallback
        return "Setup Required", f"β **Email Generation Failed**\n\nError: {str(e)}\n\nπ‘ **This usually means:**\n- Missing AI dependencies\n- Run: `pip install llama-cpp-python huggingface-hub`\n- Or contact support for setup help", 0