# NOTE: removed non-code residue ("Spaces:" / "Running" hosting-UI status
# text captured during extraction) that would break this Python file.
# --- Dependencies ---------------------------------------------------------
import streamlit as st
import pandas as pd
import sqlite3
import os
from datetime import datetime
import time

from scraper import LinkedInScraper
from email_gen import EmailGenerator

# --- Streamlit page configuration ----------------------------------------
st.set_page_config(
    page_title="Cold Email Outreach Assistant",
    page_icon="π§",
    layout="wide",
)

# --- Session-state defaults ----------------------------------------------
# Survive reruns: the last processed batch, and the (expensive to build)
# email-generator instance. setdefault only assigns when the key is absent,
# matching the original "if key not in session_state" guards.
st.session_state.setdefault('processed_data', None)
st.session_state.setdefault('email_generator', None)
def init_database(db_path='leads.db'):
    """Create the SQLite cache table if it does not exist yet.

    Args:
        db_path: Path of the SQLite database file. Defaults to the
            historical hard-coded 'leads.db' so existing callers are
            unaffected.
    """
    conn = sqlite3.connect(db_path)
    try:
        # "with conn" wraps the DDL in a transaction: commit on success,
        # rollback on error. It does NOT close the connection — the
        # finally block does, so we no longer leak it on failure.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS scraped_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    email TEXT,
                    company TEXT,
                    linkedin_url TEXT,
                    scraped_info TEXT,
                    generated_subject TEXT,
                    generated_email TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
    finally:
        conn.close()
def save_to_database(data, db_path='leads.db'):
    """Persist a batch of processed leads into the cache table.

    NOTE(review): the table's only key is the AUTOINCREMENT id, so
    'INSERT OR REPLACE' can never hit a conflict and every call simply
    appends rows. Kept as-is to preserve behavior; add a UNIQUE(email)
    constraint in init_database if deduplication is actually wanted.

    Args:
        data: DataFrame with name/email/company/linkedin_url columns;
            scraped_info / generated_subject / generated_email are optional
            and default to ''.
        db_path: SQLite file to write to (default matches the historical
            hard-coded 'leads.db').
    """
    # Build all parameter tuples up front so the DB work is one
    # executemany call inside a single transaction instead of a
    # per-row execute + one big commit at the end.
    rows = [
        (
            row['name'], row['email'], row['company'], row['linkedin_url'],
            row.get('scraped_info', ''), row.get('generated_subject', ''),
            row.get('generated_email', ''),
        )
        for _, row in data.iterrows()
    ]
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commit on success, rollback on error
            conn.executemany('''
                INSERT OR REPLACE INTO scraped_data
                (name, email, company, linkedin_url, scraped_info, generated_subject, generated_email)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', rows)
    finally:
        conn.close()  # no longer leaked when an insert raises
def load_from_database(db_path='leads.db'):
    """Return every cached row as a DataFrame, newest first.

    Args:
        db_path: SQLite file to read (default matches the historical
            hard-coded 'leads.db').

    Returns:
        pandas.DataFrame with all columns of scraped_data, ordered by
        created_at descending.
    """
    conn = sqlite3.connect(db_path)
    try:
        return pd.read_sql_query(
            'SELECT * FROM scraped_data ORDER BY created_at DESC', conn
        )
    finally:
        # Close even when read_sql_query raises (e.g. table missing);
        # the original leaked the connection on that path.
        conn.close()
def main():
    """Render the Streamlit UI: sidebar configuration plus three workflow
    tabs (upload & process, results review, processing history)."""
    st.title("π§ Cold Email Outreach Assistant")
    st.markdown("Upload your leads CSV and generate personalized cold emails using AI")

    # The cache table must exist before any tab reads or writes it.
    init_database()

    # ------------- Sidebar: all user-tunable settings -------------
    with st.sidebar:
        st.header("βοΈ Configuration")

        st.subheader("AI Model Settings")
        model_option = st.selectbox(
            "Model Type",
            ["Download Vicuna-7B (Recommended)", "Use Custom Model Path"]
        )
        if model_option == "Use Custom Model Path":
            custom_model_path = st.text_input("Custom Model Path", "")
        else:
            custom_model_path = None

        st.subheader("π§ Email Generation")
        tone = st.selectbox(
            "Email Tone",
            ["Professional", "Friendly", "Direct", "Authoritative"],
            index=0,
            help="Choose the tone for generated emails"
        )
        temperature = st.slider(
            "Creativity Level",
            min_value=0.3,
            max_value=1.0,
            value=0.7,
            step=0.1,
            help="Lower = more conservative, Higher = more creative"
        )
        generate_variations = st.checkbox(
            "Generate Multiple Variations",
            value=False,
            help="Generate 3 different email variations per lead"
        )

        st.subheader("π Scraping Settings")
        scrape_timeout = st.slider("Scrape Timeout (seconds)", 5, 30, 10)
        use_selenium = st.checkbox("Use Selenium (slower but more reliable)", value=False)

    upload_tab, results_tab, history_tab = st.tabs(
        ["π€ Upload & Process", "π Results", "π History"]
    )

    # ------------- Tab 1: upload CSV and kick off processing -------------
    with upload_tab:
        st.header("Upload Your Leads CSV")
        uploaded = st.file_uploader(
            "Choose a CSV file",
            type="csv",
            help="CSV should contain columns: name, email, company, linkedin_url"
        )
        if uploaded is not None:
            try:
                leads = pd.read_csv(uploaded)
                required_columns = ['name', 'email', 'company', 'linkedin_url']
                missing_columns = [col for col in required_columns if col not in leads.columns]
                if missing_columns:
                    st.error(f"Missing required columns: {', '.join(missing_columns)}")
                    st.info("Required columns: name, email, company, linkedin_url")
                else:
                    st.success(f"β CSV loaded successfully! Found {len(leads)} leads")
                    st.dataframe(leads.head())
                    if st.button("π Start Processing", type="primary"):
                        process_leads(leads, scrape_timeout, use_selenium,
                                      custom_model_path, tone, temperature,
                                      generate_variations)
            except Exception as e:
                st.error(f"Error reading CSV: {str(e)}")

    # ------------- Tab 2: review the most recent batch -------------
    with results_tab:
        st.header("Processing Results")
        results = st.session_state.processed_data
        if results is not None:
            st.success(f"β Processed {len(results)} leads successfully!")
            for idx, row in results.iterrows():
                marker = 'π―' if row.get('tone_used') else ''
                with st.expander(f"π {row['name']} - {row['company']} {marker}"):
                    col1, col2, col3 = st.columns([2, 3, 1])

                    with col1:
                        st.subheader("π Scraped Information")
                        st.text_area("Company Info", row.get('scraped_info', 'No info scraped'),
                                     height=100, key=f"info_{idx}")
                        # Generation settings are only present on newer rows.
                        if row.get('tone_used'):
                            st.write(f"**Tone:** {row.get('tone_used', 'N/A')}")
                            st.write(f"**Temperature:** {row.get('temperature_used', 'N/A')}")

                    with col2:
                        st.subheader("π§ Generated Email")
                        subject = row.get('generated_subject', 'No subject generated')
                        email_body = row.get('generated_email', 'No email generated')
                        st.text_area("Subject", subject, height=50, key=f"subject_{idx}")
                        st.text_area("Email Body", email_body, height=250, key=f"email_{idx}")

                    with col3:
                        st.subheader("π Quality")
                        if subject and email_body:
                            subject_len = len(subject)
                            # Measure only the primary email, not the appended
                            # variations block.
                            main_body = email_body.split('--- VARIATIONS ---')[0].strip()
                            body_words = len(main_body.split())

                            # Evaluate each criterion once; reused below for
                            # both the indicators and the overall score.
                            subject_ok = 15 <= subject_len <= 65
                            body_ok = 25 <= body_words <= 100
                            personalized = row['name'] in main_body and row['company'] in main_body
                            cta_words = ['call', 'conversation', 'chat', 'discuss', 'talk', 'meeting']
                            has_cta = any(word in main_body.lower() for word in cta_words)

                            if subject_ok:
                                st.success(f"β Subject: {subject_len} chars")
                            else:
                                st.warning(f"β οΈ Subject: {subject_len} chars")

                            if body_ok:
                                st.success(f"β Body: {body_words} words")
                            else:
                                st.warning(f"β οΈ Body: {body_words} words")

                            if '[Your Name]' in email_body or '{' in email_body:
                                st.error("β Contains placeholders")
                            else:
                                st.success("β No placeholders")

                            if personalized:
                                st.success("β Well personalized")
                            else:
                                st.warning("β οΈ Low personalization")

                            if has_cta:
                                st.success("β Has call-to-action")
                            else:
                                st.warning("β οΈ Weak call-to-action")

                            # Weighted score from the same criteria; note the
                            # placeholder term deliberately ignores '{' even
                            # though the indicator above checks for it.
                            quality_score = (
                                20 * subject_ok
                                + 25 * body_ok
                                + 25 * ('[Your Name]' not in email_body)
                                + 20 * personalized
                                + 10 * has_cta
                            )
                            if quality_score >= 80:
                                st.success(f"π Overall: {quality_score}% - Ready to send!")
                            elif quality_score >= 60:
                                st.warning(f"π Overall: {quality_score}% - Needs polish")
                            else:
                                st.error(f"π§ Overall: {quality_score}% - Needs work")

                        # One-box copy/paste of the full email.
                        # NOTE(review): the flattened source made the original
                        # indentation ambiguous here; placed at column level,
                        # outside the quality-check branch — confirm.
                        email_text = f"Subject: {subject}\n\n{email_body}"
                        st.text_area("Copy Email", email_text, height=100, key=f"copy_{idx}")

            # Export the whole batch.
            if st.button("π₯ Export to CSV"):
                csv_data = results.to_csv(index=False)
                st.download_button(
                    label="β¬οΈ Download CSV",
                    data=csv_data,
                    file_name=f"cold_emails_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                    mime="text/csv"
                )
        else:
            st.info("π Upload and process a CSV file to see results here")

    # ------------- Tab 3: historical records from SQLite -------------
    with history_tab:
        st.header("Processing History")
        try:
            history = load_from_database()
            if history.empty:
                st.info("No historical data found")
            else:
                st.dataframe(history)
                if st.button("π₯ Export History"):
                    st.download_button(
                        label="β¬οΈ Download History CSV",
                        data=history.to_csv(index=False),
                        file_name=f"email_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                        mime="text/csv"
                    )
        except Exception as e:
            st.error(f"Error loading history: {str(e)}")
def process_leads(df, scrape_timeout, use_selenium, custom_model_path, tone, temperature, generate_variations):
    """Scrape each lead, generate email(s), and store the results.

    Results go into st.session_state.processed_data and the SQLite cache.

    Fixes vs. the original:
    - Progress/status used the DataFrame *index label* (idx + 1), which is
      wrong whenever df does not have a clean 0..n-1 RangeIndex (e.g. after
      filtering). Now tracks an explicit 0-based position via enumerate.
    - An empty input frame no longer crashes the metrics summary (the
      original indexed columns that don't exist on an empty DataFrame).

    Args:
        df: Leads with name/email/company/linkedin_url columns.
        scrape_timeout: Per-site scrape timeout in seconds.
        use_selenium: Whether the scraper should drive a browser.
        custom_model_path: Optional model path; None downloads the default.
        tone: Tone label passed through to the generator.
        temperature: Sampling temperature passed through to the generator.
        generate_variations: When True, produce 3 variations per lead and
            append them to the primary email body.
    """
    progress_bar = st.progress(0)
    status_text = st.empty()
    try:
        status_text.text("π§ Initializing scraper...")
        scraper = LinkedInScraper(timeout=scrape_timeout, use_selenium=use_selenium)

        status_text.text("π€ Initializing AI model...")
        # Model load is expensive; reuse the instance cached in session state.
        if st.session_state.email_generator is None:
            st.session_state.email_generator = EmailGenerator(custom_model_path)
        email_gen = st.session_state.email_generator

        processed_data = []
        total_leads = len(df)
        # enumerate() gives the true 0-based position regardless of index labels.
        for pos, (_, row) in enumerate(df.iterrows()):
            status_text.text(f"π Processing {row['name']} ({pos + 1}/{total_leads})")

            scraped_info = scraper.scrape_linkedin_or_company(
                row['linkedin_url'],
                row['company']
            )

            status_text.text(f"βοΈ Generating email for {row['name']} ({tone} tone)...")
            if generate_variations:
                variations = email_gen.generate_multiple_variations(
                    row['name'],
                    row['company'],
                    scraped_info,
                    num_variations=3,
                    tone=tone
                )
                # First variation is the primary email; the rest are appended
                # as a readable block after a marker the UI strips back off.
                subject = variations[0]['subject']
                email_body = variations[0]['email_body']
                variations_text = "\n\n--- VARIATIONS ---\n"
                for i, var in enumerate(variations, 1):
                    variations_text += f"\nVariation {i} ({var['tone']}):\n"
                    variations_text += f"Subject: {var['subject']}\n"
                    variations_text += f"Body: {var['email_body']}\n"
                email_body += variations_text
            else:
                subject, email_body = email_gen.generate_email(
                    row['name'],
                    row['company'],
                    scraped_info,
                    tone=tone,
                    temperature=temperature
                )

            processed_data.append({
                'name': row['name'],
                'email': row['email'],
                'company': row['company'],
                'linkedin_url': row['linkedin_url'],
                'scraped_info': scraped_info,
                'generated_subject': subject,
                'generated_email': email_body,
                'tone_used': tone,
                'temperature_used': temperature
            })
            progress_bar.progress((pos + 1) / total_leads)

        result_df = pd.DataFrame(processed_data)
        st.session_state.processed_data = result_df
        save_to_database(result_df)

        status_text.text("β Processing completed!")
        st.success("π All leads processed successfully!")

        # Guard: an empty batch has no generated_subject/email columns.
        if not result_df.empty:
            avg_subject_length = result_df['generated_subject'].str.len().mean()
            avg_body_length = result_df['generated_email'].str.split().str.len().mean()
            st.info(f"π Quality Metrics: Avg subject length: {avg_subject_length:.0f} chars, Avg body length: {avg_body_length:.0f} words")
    except Exception as e:
        st.error(f"β Error during processing: {str(e)}")
        status_text.text("β Processing failed")
# Entry point when executed directly (streamlit run app.py).
if __name__ == "__main__":
    main()