"""
SEO Analyzer UI with Auto Ad Generator Tab
"""
import gradio as gr
import logging
import json
from typing import Dict, List, Any, Tuple, Optional
from urllib.parse import urlparse
import tldextract
from openai import OpenAI
import time
import os
import threading
import queue
import shutil
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import tempfile
from bs4 import BeautifulSoup
import requests
from crawler import Crawler
from frontier import URLFrontier
from models import URL, Page
import config
from run_crawler import reset_databases
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

IS_DEPLOYMENT = os.getenv('DEPLOYMENT', 'false').lower() == 'true'
# Custom CSS for better styling
CUSTOM_CSS = """
.container {
    max-width: 1200px !important;
    margin: auto;
    padding: 20px;
}
.header {
    text-align: center;
    margin-bottom: 2rem;
}
.header h1 {
    color: #2d3748;
    font-size: 2.5rem;
    font-weight: 700;
    margin-bottom: 1rem;
}
.header p {
    color: #4a5568;
    font-size: 1.1rem;
    max-width: 800px;
    margin: 0 auto;
}
.input-section {
    background: #f7fafc;
    border-radius: 12px;
    padding: 24px;
    margin-bottom: 24px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.analysis-section {
    background: white;
    border-radius: 12px;
    padding: 24px;
    margin-top: 24px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.log-section {
    font-family: monospace;
    background: #1a202c;
    color: #e2e8f0;
    padding: 16px;
    border-radius: 8px;
    margin-top: 24px;
}
/* Custom styling for inputs */
.input-container {
    background: white;
    padding: 16px;
    border-radius: 8px;
    margin-bottom: 16px;
}
/* Custom styling for the slider */
.slider-container {
    padding: 12px;
    background: white;
    border-radius: 8px;
}
/* Custom styling for buttons */
.primary-button {
    background: #4299e1 !important;
    color: white !important;
    padding: 12px 24px !important;
    border-radius: 8px !important;
    font-weight: 600 !important;
    transition: all 0.3s ease !important;
}
.primary-button:hover {
    background: #3182ce !important;
    transform: translateY(-1px) !important;
}
/* Markdown output styling */
.markdown-output {
    font-family: system-ui, -apple-system, sans-serif;
    line-height: 1.6;
}
.markdown-output h1 {
    color: #2d3748;
    border-bottom: 2px solid #e2e8f0;
    padding-bottom: 0.5rem;
}
.markdown-output h2 {
    color: #4a5568;
    margin-top: 2rem;
}
.markdown-output h3 {
    color: #718096;
    margin-top: 1.5rem;
}
/* Progress bar styling */
.progress-bar {
    height: 8px !important;
    border-radius: 4px !important;
    background: #ebf8ff !important;
}
.progress-bar-fill {
    background: #4299e1 !important;
    border-radius: 4px !important;
}
/* Add some spacing between sections */
.gap {
    margin: 2rem 0;
}
"""
# Create a custom handler that will store logs in a queue
class QueueHandler(logging.Handler):
    def __init__(self, log_queue):
        super().__init__()
        self.log_queue = log_queue

    def emit(self, record):
        log_entry = self.format(record)
        try:
            self.log_queue.put_nowait(f"{datetime.now().strftime('%H:%M:%S')} - {log_entry}")
        except queue.Full:
            pass  # Ignore if queue is full
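
# Illustrative sketch (not called by the app): how QueueHandler can be attached to a logger
# so records land in a queue that a UI loop drains later. The logger name and message below
# are made up for demonstration only.
def _example_queue_handler_usage():
    demo_queue = queue.Queue(maxsize=100)
    handler = QueueHandler(demo_queue)
    handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))

    demo_logger = logging.getLogger("queue_handler_demo")
    demo_logger.setLevel(logging.INFO)
    demo_logger.addHandler(handler)

    demo_logger.info("hello from the demo logger")

    # Drain whatever has accumulated, exactly like the UI code below does.
    lines = []
    while not demo_queue.empty():
        lines.append(demo_queue.get_nowait())
    return "\n".join(lines)
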
# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format='%(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info(f"IS_DEPLOYMENT: {IS_DEPLOYMENT}")

class InMemoryStorage:
    """Simple in-memory storage for deployment mode"""

    def __init__(self):
        self.urls = {}
        self.pages = {}

    def reset(self):
        self.urls.clear()
        self.pages.clear()

    def add_url(self, url_obj):
        self.urls[url_obj.url] = url_obj

    def add_page(self, page_obj):
        self.pages[page_obj.url] = page_obj

    def get_url(self, url):
        return self.urls.get(url)

    def get_page(self, url):
        return self.pages.get(url)
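
# Illustrative sketch (not called by the app): InMemoryStorage only relies on the stored
# objects exposing a `.url` attribute, so a SimpleNamespace stands in for the real
# `Page`/`URL` models here; that stand-in is an assumption for demonstration only, the
# crawler itself passes its own model instances.
def _example_in_memory_storage_usage():
    from types import SimpleNamespace

    storage = InMemoryStorage()
    fake_page = SimpleNamespace(url="https://example.com/", content="<html>...</html>")
    storage.add_page(fake_page)

    assert storage.get_page("https://example.com/") is fake_page
    storage.reset()
    assert storage.get_page("https://example.com/") is None
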

class SEOAnalyzer:
    """
    SEO Analyzer that combines web crawler with OpenAI analysis
    """

    def __init__(self, api_key: str):
        """Initialize SEO Analyzer"""
        self.client = OpenAI(api_key=api_key)
        self.crawler = None
        self.crawled_pages = []
        self.pages_crawled = 0
        self.max_pages = 0
        self.crawl_complete = threading.Event()
        self.log_queue = queue.Queue(maxsize=1000)
        self.session_id = str(uuid.uuid4())
        self.storage = InMemoryStorage() if IS_DEPLOYMENT else None

        # Add queue handler to logger
        queue_handler = QueueHandler(self.log_queue)
        queue_handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
        logger.addHandler(queue_handler)

    def _setup_session_storage(self) -> Tuple[str, str, str]:
        """
        Set up session-specific storage directories

        Returns:
            Tuple of (storage_path, html_path, log_path)
        """
        # Create session-specific paths
        session_storage = os.path.join(config.STORAGE_PATH, self.session_id)
        session_html = os.path.join(session_storage, "html")
        session_logs = os.path.join(session_storage, "logs")

        # Create directories
        os.makedirs(session_storage, exist_ok=True)
        os.makedirs(session_html, exist_ok=True)
        os.makedirs(session_logs, exist_ok=True)

        logger.info(f"Created session storage at {session_storage}")
        return session_storage, session_html, session_logs

    def _cleanup_session_storage(self):
        """Clean up session-specific storage"""
        session_path = os.path.join(config.STORAGE_PATH, self.session_id)
        try:
            if os.path.exists(session_path):
                shutil.rmtree(session_path)
                logger.info(f"Cleaned up session storage at {session_path}")
        except Exception as e:
            logger.error(f"Error cleaning up session storage: {e}")

    def _reset_storage(self):
        """Reset storage based on deployment mode"""
        if IS_DEPLOYMENT:
            self.storage.reset()
        else:
            reset_databases()

    def analyze_website(self, url: str, max_pages: int = 10, progress: gr.Progress = gr.Progress()) -> Tuple[str, List[Dict], str]:
        """
        Crawl website and analyze SEO using OpenAI

        Args:
            url: Seed URL to crawl
            max_pages: Maximum number of pages to crawl
            progress: Gradio progress indicator

        Returns:
            Tuple of (overall analysis, list of page-specific analyses, log output)
        """
        temp_dir = None  # Only set in deployment mode; checked again during cleanup
        try:
            # Reset state
            self.crawled_pages = []
            self.pages_crawled = 0
            self.max_pages = max_pages
            self.crawl_complete.clear()

            # Set up storage
            if IS_DEPLOYMENT:
                # Use temporary directory for file storage in deployment
                temp_dir = tempfile.mkdtemp()
                session_storage = temp_dir
                session_html = os.path.join(temp_dir, "html")
                session_logs = os.path.join(temp_dir, "logs")
                os.makedirs(session_html, exist_ok=True)
                os.makedirs(session_logs, exist_ok=True)
            else:
                session_storage, session_html, session_logs = self._setup_session_storage()

            # Update config paths for this session
            config.HTML_STORAGE_PATH = session_html
            config.LOG_PATH = session_logs

            # Clear log queue
            while not self.log_queue.empty():
                self.log_queue.get_nowait()

            logger.info(f"Starting analysis of {url} with max_pages={max_pages}")

            # Reset storage
            logger.info("Resetting storage...")
            self._reset_storage()
            logger.info("Storage reset completed")
            # Create new crawler instance with appropriate storage
            logger.info("Creating crawler instance...")
            if IS_DEPLOYMENT:
                # In deployment mode, use in-memory storage
                self.crawler = Crawler(storage=self.storage)
                # Set frontier to use memory mode
                self.crawler.frontier = URLFrontier(use_memory=True)
            else:
                # In local mode, use MongoDB and Redis
                self.crawler = Crawler()
            logger.info("Crawler instance created successfully")

            # Extract domain for filtering
            domain = self._extract_domain(url)
            logger.info(f"Analyzing domain: {domain}")

            # Add seed URL and configure domain filter
            self.crawler.add_seed_urls([url])
            config.ALLOWED_DOMAINS = [domain]
            logger.info("Added seed URL and configured domain filter")

            # Override the crawler's _process_url method to capture pages
            original_process_url = self.crawler._process_url

            def wrapped_process_url(url_obj):
                if self.pages_crawled >= self.max_pages:
                    self.crawler.running = False  # Signal crawler to stop
                    self.crawl_complete.set()
                    return

                original_process_url(url_obj)

                # Get the page based on storage mode
                if IS_DEPLOYMENT:
                    # In deployment mode, get page from in-memory storage
                    page = self.storage.get_page(url_obj.url)
                    if page:
                        _, metadata = self.crawler.parser.parse(page)
                        self.crawled_pages.append({
                            'url': url_obj.url,
                            'content': page.content,
                            'metadata': metadata
                        })
                        self.pages_crawled += 1
                        logger.info(f"Crawled page {self.pages_crawled}/{max_pages}: {url_obj.url}")
                else:
                    # In local mode, get page from MongoDB
                    page_data = self.crawler.pages_collection.find_one({'url': url_obj.url})
                    if page_data and page_data.get('content'):
                        _, metadata = self.crawler.parser.parse(Page(**page_data))
                        self.crawled_pages.append({
                            'url': url_obj.url,
                            'content': page_data['content'],
                            'metadata': metadata
                        })
                        self.pages_crawled += 1
                        logger.info(f"Crawled page {self.pages_crawled}/{max_pages}: {url_obj.url}")

                if self.pages_crawled >= self.max_pages:
                    self.crawler.running = False  # Signal crawler to stop
                    self.crawl_complete.set()

            self.crawler._process_url = wrapped_process_url
            def run_crawler():
                try:
                    # Skip signal handler registration
                    self.crawler.running = True
                    with ThreadPoolExecutor(max_workers=1) as executor:
                        try:
                            futures = [executor.submit(self.crawler._crawl_worker)]
                            for future in futures:
                                future.result()
                        except Exception as e:
                            logger.error(f"Error in crawler worker: {e}")
                        finally:
                            self.crawler.running = False
                            self.crawl_complete.set()
                except Exception as e:
                    logger.error(f"Error in run_crawler: {e}")
                    self.crawl_complete.set()

            # Start crawler in a thread
            crawler_thread = threading.Thread(target=run_crawler)
            crawler_thread.daemon = True
            crawler_thread.start()

            # Wait for completion or timeout with progress updates
            timeout = 300  # 5 minutes
            start_time = time.time()
            last_progress = 0
            while not self.crawl_complete.is_set() and time.time() - start_time < timeout:
                current_progress = min(0.8, self.pages_crawled / max_pages)
                if current_progress != last_progress:
                    progress(current_progress, f"Crawled {self.pages_crawled}/{max_pages} pages")
                    last_progress = current_progress
                time.sleep(0.1)  # More frequent updates

            if time.time() - start_time >= timeout:
                logger.warning("Crawler timed out")
                self.crawler.running = False

            # Wait for thread to finish
            crawler_thread.join(timeout=10)

            # Restore original method
            self.crawler._process_url = original_process_url

            # Collect all logs
            logs = []
            while not self.log_queue.empty():
                logs.append(self.log_queue.get_nowait())
            log_output = "\n".join(logs)
            if not self.crawled_pages:
                self._cleanup_session_storage()
                return "No pages were successfully crawled.", [], log_output

            logger.info("Starting OpenAI analysis...")
            progress(0.9, "Analyzing crawled pages with OpenAI...")

            # Analyze crawled pages with OpenAI
            overall_analysis = self._get_overall_analysis(self.crawled_pages)
            progress(0.95, "Generating page-specific analyses...")
            page_analyses = self._get_page_analyses(self.crawled_pages)

            logger.info("Analysis complete")
            progress(1.0, "Analysis complete")

            # Format the results
            formatted_analysis = f"""
# SEO Analysis Report for {domain}

## Overall Analysis
{overall_analysis}

## Page-Specific Analyses
"""
            for page_analysis in page_analyses:
                formatted_analysis += f"""
### {page_analysis['url']}
{page_analysis['analysis']}
"""

            # Clean up all resources
            logger.info("Cleaning up resources...")
            if IS_DEPLOYMENT:
                if temp_dir:
                    shutil.rmtree(temp_dir, ignore_errors=True)
                self.storage.reset()
            else:
                self._cleanup_session_storage()
                self._reset_storage()
            logger.info("All resources cleaned up")

            return formatted_analysis, page_analyses, log_output

        except Exception as e:
            logger.error(f"Error analyzing website: {e}")
            # Clean up all resources even on error
            if IS_DEPLOYMENT:
                if temp_dir:
                    shutil.rmtree(temp_dir, ignore_errors=True)
                self.storage.reset()
            else:
                self._cleanup_session_storage()
                self._reset_storage()

            # Collect all logs
            logs = []
            while not self.log_queue.empty():
                logs.append(self.log_queue.get_nowait())
            log_output = "\n".join(logs)

            return f"Error analyzing website: {str(e)}", [], log_output

    def _extract_domain(self, url: str) -> str:
        """Extract the registered domain from a URL, e.g. "https://blog.example.co.uk/page" -> "example.co.uk"."""
        extracted = tldextract.extract(url)
        return f"{extracted.domain}.{extracted.suffix}"

    def _get_overall_analysis(self, pages: List[Dict]) -> str:
        """Get overall SEO analysis using OpenAI"""
        try:
            # Prepare site overview for analysis
            site_overview = {
                'num_pages': len(pages),
                'pages': [{
                    'url': page['url'],
                    'metadata': page['metadata']
                } for page in pages]
            }

            # Create analysis prompt
            prompt = f"""
You are an expert SEO consultant. Analyze this website's SEO based on the crawled data:
{json.dumps(site_overview, indent=2)}

Provide a comprehensive SEO analysis including:
1. Overall site structure and navigation
2. Common SEO issues across pages
3. Content quality and optimization
4. Technical SEO recommendations
5. Priority improvements

Format your response in Markdown.
"""

            # Get analysis from OpenAI
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are an expert SEO consultant providing detailed website analysis."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=2000
            )

            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"Error getting overall analysis: {e}")
            return f"Error generating overall analysis: {str(e)}"

    def _get_page_analyses(self, pages: List[Dict]) -> List[Dict]:
        """Get page-specific SEO analyses using OpenAI"""
        page_analyses = []
        for page in pages:
            try:
                # Create page analysis prompt
                prompt = f"""
Analyze this page's SEO:
URL: {page['url']}
Metadata: {json.dumps(page['metadata'], indent=2)}

Provide specific recommendations for:
1. Title and meta description
2. Heading structure
3. Content optimization
4. Internal linking
5. Technical improvements

Format your response in Markdown.
"""

                # Get analysis from OpenAI
                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are an expert SEO consultant providing detailed page analysis."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1000
                )

                page_analyses.append({
                    'url': page['url'],
                    'analysis': response.choices[0].message.content
                })

                # Sleep to respect rate limits
                time.sleep(1)
            except Exception as e:
                logger.error(f"Error analyzing page {page['url']}: {e}")
                page_analyses.append({
                    'url': page['url'],
                    'analysis': f"Error analyzing page: {str(e)}"
                })

        return page_analyses
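
# Illustrative sketch (not called by the app): driving SEOAnalyzer programmatically, outside
# the Gradio UI. Assumes OPENAI_API_KEY is set and that the crawler backend configured in
# `config` is reachable (or DEPLOYMENT=true for in-memory mode); the URL is a placeholder.
# Whether the default gr.Progress() argument is a harmless no-op outside a Gradio event is
# version-dependent, so treat this strictly as a sketch.
def _example_seo_analyzer_usage():
    analyzer = SEOAnalyzer(api_key=os.environ["OPENAI_API_KEY"])
    report_md, page_reports, crawl_logs = analyzer.analyze_website("https://example.com", max_pages=3)

    print(report_md)  # Markdown report for the whole site
    print(len(page_reports), "page-level analyses")
    print(crawl_logs.splitlines()[-5:] if crawl_logs else "no logs captured")
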

def create_ui() -> gr.Blocks:
    """Create Gradio interface"""

    def analyze(url: str, api_key: str, max_pages: int, progress: gr.Progress = gr.Progress()) -> Tuple[str, str]:
        """Gradio interface function"""
        try:
            # Initialize analyzer
            analyzer = SEOAnalyzer(api_key)

            # Run analysis with progress updates
            analysis, _, logs = analyzer.analyze_website(url, max_pages, progress)

            # Start from the logs returned by analyze_website, then drain anything still queued
            log_output = (logs + "\n") if logs else ""
            while not analyzer.log_queue.empty():
                try:
                    log_output += analyzer.log_queue.get_nowait() + "\n"
                except queue.Empty:
                    break

            # Set progress to complete
            progress(1.0, "Analysis complete")

            # Return results
            return analysis, log_output
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, error_msg

    # Create markdown content for the about section
    about_markdown = """
# 🔍 SEO Analyzer Pro

Analyze your website's SEO performance using advanced crawling and AI technology.

### Features:
- 🕷️ Intelligent Web Crawling
- 🧠 AI-Powered Analysis
- 📊 Comprehensive Reports
- 🚀 Performance Insights

### How to Use:
1. Enter your website URL
2. Provide your OpenAI API key
3. Choose how many pages to analyze
4. Click Analyze and watch the magic happen!

### What You'll Get:
- Detailed SEO analysis
- Content quality assessment
- Technical recommendations
- Performance insights
- Actionable improvements
"""

    # Create the interface with custom styling
    with gr.Blocks(css=CUSTOM_CSS) as iface:
        gr.Markdown(about_markdown)

        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group(elem_classes="input-section"):
                    gr.Markdown("### 📝 Enter Website Details")
                    url_input = gr.Textbox(
                        label="Website URL",
                        placeholder="https://example.com",
                        elem_classes="input-container",
                        info="Enter the full URL of the website you want to analyze (e.g., https://example.com)"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        elem_classes="input-container",
                        info="Your OpenAI API key is required for AI-powered analysis. Keep this secure!"
                    )
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1,
                        label="Maximum Pages to Crawl",
                        elem_classes="slider-container",
                        info="Choose how many pages to analyze. More pages = more comprehensive analysis but takes longer"
                    )
                    analyze_btn = gr.Button(
                        "🔍 Analyze Website",
                        elem_classes="primary-button"
                    )

        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="analysis-section"):
                    gr.Markdown("### 📊 Analysis Results")
                    analysis_output = gr.Markdown(
                        label="SEO Analysis",
                        elem_classes="markdown-output"
                    )

        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="log-section"):
                    gr.Markdown("### 📋 Process Logs")
                    logs_output = gr.Textbox(
                        label="Logs",
                        lines=10,
                        elem_classes="log-output"
                    )

        # Connect the button click to the analyze function
        analyze_btn.click(
            fn=analyze,
            inputs=[url_input, api_key, max_pages],
            outputs=[analysis_output, logs_output],
        )

    return iface
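
# Illustrative sketch (not called by the app): create_ui() builds a standalone Blocks app,
# so it can be launched on its own instead of being mounted as a tab. The port below is an
# arbitrary example value.
def _example_standalone_launch():
    ui = create_ui()
    ui.launch(server_name="0.0.0.0", server_port=7861, share=False)
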

# ----- SEO Analyzer UI (as a function) -----
def seo_analyzer_ui():
    def analyze(url: str, api_key: str, max_pages: int, progress: gr.Progress = gr.Progress()) -> Tuple[str, str]:
        try:
            analyzer = SEOAnalyzer(api_key)
            analysis, _, logs = analyzer.analyze_website(url, max_pages, progress)

            # Start from the logs returned by analyze_website, then drain anything still queued
            log_output = (logs + "\n") if logs else ""
            while not analyzer.log_queue.empty():
                try:
                    log_output += analyzer.log_queue.get_nowait() + "\n"
                except queue.Empty:
                    break

            progress(1.0, "Analysis complete")
            return analysis, log_output
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, error_msg

    about_markdown = """
# 🔍 SEO Analyzer Pro

Analyze your website's SEO performance using advanced crawling and AI technology.
...
"""

    with gr.Blocks() as seo_tab:
        gr.Markdown(about_markdown)

        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group(elem_classes="input-section"):
                    gr.Markdown("### 📝 Enter Website Details")
                    url_input = gr.Textbox(
                        label="Website URL",
                        placeholder="https://example.com",
                        elem_classes="input-container",
                        info="Enter the full URL of the website you want to analyze (e.g., https://example.com)"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        elem_classes="input-container",
                        info="Your OpenAI API key is required for AI-powered analysis. Keep this secure!"
                    )
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1,
                        label="Maximum Pages to Crawl",
                        elem_classes="slider-container",
                        info="Choose how many pages to analyze. More pages = more comprehensive analysis but takes longer"
                    )
                    analyze_btn = gr.Button(
                        "🔍 Analyze Website",
                        elem_classes="primary-button"
                    )

        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="analysis-section"):
                    gr.Markdown("### 📊 Analysis Results")
                    analysis_output = gr.Markdown(
                        label="SEO Analysis",
                        elem_classes="markdown-output"
                    )

        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="log-section"):
                    gr.Markdown("### 📋 Process Logs")
                    logs_output = gr.Textbox(
                        label="Logs",
                        lines=10,
                        elem_classes="log-output"
                    )

        analyze_btn.click(
            fn=analyze,
            inputs=[url_input, api_key, max_pages],
            outputs=[analysis_output, logs_output],
        )

    return seo_tab

# ---- Auto Ad Generator UI as a function ----
def auto_ad_generator_ui():
    # Use the same OpenAI v1 client style as the rest of the app (the legacy module-level
    # `openai.api_key` / `openai.ChatCompletion` API is not imported in this file).
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def extract_text_from_url(url):
        try:
            resp = requests.get(url, timeout=30, headers={
                "User-Agent": "Mozilla/5.0 (compatible; Bot/1.0)"
            })
            soup = BeautifulSoup(resp.content, "html.parser")
            candidates = soup.find_all(['h1', 'h2', 'h3', 'h4', 'p', 'span', 'li'])
            text = ' '.join([c.get_text(strip=True) for c in candidates])
            text = text[:4000]
            if len(text) < 100:
                raise ValueError("Could not extract enough content (site may require JavaScript). Please enter keywords manually.")
            return text
        except Exception as e:
            raise ValueError(f"URL extraction error: {e}")
    def extract_keywords(text):
        prompt = f"""
Extract up to 10 concise, relevant SEO keywords suitable for an automotive advertisement from the following content:
{text}

Keywords:
"""
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.6,
            max_tokens=100
        )
        output = response.choices[0].message.content.strip()
        if ',' in output:
            keywords = output.split(',')
        else:
            keywords = output.split('\n')
        return [kw.strip() for kw in keywords if kw.strip()]
    def generate_ad_copy(platform, keywords):
        prompt = f"""
Create a compelling, SEO-optimized {platform} ad using these keywords: {', '.join(keywords)}.
Include a clear and enticing call-to-action.
"""
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=300
        )
        return response.choices[0].message.content.strip()

    def generate_ad_image(keywords):
        kw_str = ", ".join(keywords)
        image_prompt = (
            f"High-quality, photorealistic automotive ad photo of a luxury car. "
            f"Clean background, professional lighting, stylish dealership setting. "
            f"Keywords: {kw_str}. Room for text overlay, wide format, visually appealing."
        )
        # 512x512 output requires the DALL·E 2 image model
        response = client.images.generate(
            model="dall-e-2",
            prompt=image_prompt,
            n=1,
            size="512x512"
        )
        image_url = response.data[0].url
        img_data = requests.get(image_url).content
        img_file = "generated_ad_image.png"
        with open(img_file, "wb") as f:
            f.write(img_data)
        return img_file
    def platform_html(platform, ad_text):
        if platform == "Facebook":
            color = "#1877F2"
            icon = "🌐"
        elif platform == "Instagram":
            color = "linear-gradient(90deg, #f58529 0%, #dd2a7b 50%, #8134af 100%)"
            icon = "📸"
        elif platform == "X (Twitter)":
            color = "#14171A"
            icon = "🐦"
        else:  # Google Search
            color = "#4285F4"
            icon = "🔍"

        if platform == "Instagram":
            content = f"""
<div style="background: {color}; padding: 2px; border-radius: 12px; margin-bottom:16px;">
    <div style="background: white; color: #333; padding: 18px 20px; border-radius: 10px;">
        <span style="font-size: 1.5em;">{icon} <b>{platform}</b></span>
        <div style="margin-top: 12px; font-size: 1.1em; line-height:1.6;">{ad_text}</div>
    </div>
</div>
"""
        else:
            content = f"""
<div style="background: {color}; color: white; padding: 18px 20px; border-radius: 12px; margin-bottom:16px; min-height: 120px;">
    <span style="font-size: 1.5em;">{icon} <b>{platform}</b></span>
    <div style="margin-top: 12px; font-size: 1.1em; line-height:1.6;">{ad_text}</div>
</div>
"""
        return content
    def main_workflow(input_mode, url_or_keywords):
        error = None
        keywords = []
        ad_copies = {}
        image_path = None

        if input_mode == "URL":
            try:
                text = extract_text_from_url(url_or_keywords)
                keywords = extract_keywords(text)
            except Exception as e:
                return None, None, None, f"{e}"
        else:
            keywords = [kw.strip() for kw in url_or_keywords.split(",") if kw.strip()]
            if not keywords:
                return None, None, None, "Please provide at least one keyword."

        # Generate ad copies
        platforms = ["Facebook", "Instagram", "X (Twitter)", "Google Search"]
        for platform in platforms:
            ad_copies[platform] = generate_ad_copy(platform, keywords)

        # Generate image
        try:
            image_path = generate_ad_image(keywords)
        except Exception as e:
            error = f"Image generation error: {e}"

        # Save ads to txt
        output_txt = "generated_ads.txt"
        with open(output_txt, "w", encoding="utf-8") as f:
            for platform, content in ad_copies.items():
                f.write(f"--- {platform} Ad Copy ---\n{content}\n\n")

        return keywords, ad_copies, image_path, error

    def run_space(input_mode, url, keywords):
        url_or_keywords = url if input_mode == "URL" else keywords
        keywords, ad_copies, image_path, error = main_workflow(input_mode, url_or_keywords)

        ad_previews = ""
        if ad_copies:
            for platform, ad in ad_copies.items():
                ad_previews += platform_html(platform, ad)

        return (
            keywords,
            ad_previews,
            image_path,
            "generated_ads.txt" if ad_copies else None,
            error
        )
    with gr.Blocks() as ad_tab:
        gr.Markdown("# 🚗 Auto Ad Generator\nPaste a car listing URL **or** enter your own keywords, then preview AI-generated ads for each social media platform, plus an auto-generated image!")
        input_mode = gr.Radio(["URL", "Keywords"], value="URL", label="Input Type")
        url_input = gr.Textbox(label="Listing URL", placeholder="https://www.cars.com/listing/...", visible=True)
        kw_input = gr.Textbox(label="Manual Keywords (comma separated)", placeholder="e.g. BMW, used car, sunroof", visible=False)
        submit_btn = gr.Button("Generate Ads")

        gr.Markdown("## Keywords")
        kw_out = gr.JSON(label="Extracted/Provided Keywords")
        gr.Markdown("## Ad Copy Previews")
        ad_out = gr.HTML(label="Ad Copy Preview")
        gr.Markdown("## Generated Ad Image")
        img_out = gr.Image(label="Generated Ad Image", type="filepath")
        gr.Markdown("## Download Ad Copies")
        file_out = gr.File(label="Download TXT")
        err_out = gr.Textbox(label="Errors", interactive=False)

        def show_hide_fields(choice):
            return (
                gr.update(visible=choice == "URL"),
                gr.update(visible=choice == "Keywords"),
            )

        input_mode.change(show_hide_fields, input_mode, [url_input, kw_input])

        submit_btn.click(
            run_space,
            inputs=[input_mode, url_input, kw_input],
            outputs=[kw_out, ad_out, img_out, file_out, err_out]
        )

    return ad_tab
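
# Illustrative sketch (not called by the app): the comma/newline split heuristic used by
# extract_keywords above, shown on a canned model response so the parsing behaviour is easy
# to see without calling the API. The sample string is made up.
def _example_keyword_parsing():
    sample_output = "BMW 3 Series, certified pre-owned, sunroof, low mileage"
    parts = sample_output.split(',') if ',' in sample_output else sample_output.split('\n')
    keywords = [kw.strip() for kw in parts if kw.strip()]
    assert keywords == ["BMW 3 Series", "certified pre-owned", "sunroof", "low mileage"]
    return keywords
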

# ---- Main App: Two Tabs ----
if __name__ == "__main__":
    os.makedirs(config.STORAGE_PATH, exist_ok=True)

    with gr.Blocks(css=CUSTOM_CSS) as demo:
        with gr.Tab("SEO Analyzer"):
            seo_analyzer_ui()
        with gr.Tab("Auto Ad Generator"):
            auto_ad_generator_ui()

    demo.launch(
        share=False,
        server_name="0.0.0.0",
        show_api=False,
        show_error=True,
    )