# AI_SEO_Crawler / seo_analyzer_ui.py
# Hugging Face file-viewer header (page residue, not part of the program):
# last commit "Update seo_analyzer_ui.py" (615b4c7, verified) by IAMTFRMZA; file size 34.9 kB.
"""
SEO Analyzer UI with Auto Ad Generator Tab
"""
import gradio as gr
import logging
import json
from typing import Dict, List, Any, Tuple, Optional
from urllib.parse import urlparse
import tldextract
from openai import OpenAI
import time
import os
import threading
import queue
import shutil
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import tempfile
from bs4 import BeautifulSoup
import requests
from crawler import Crawler
from frontier import URLFrontier
from models import URL, Page
import config
from run_crawler import reset_databases
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
# Deployment switch: True only when the DEPLOYMENT environment variable equals
# "true" (case-insensitive); an unset variable falls back to 'false'.
# The rest of this module uses it to choose between in-memory/temp-dir storage
# and the session-directory/database-backed storage path.
IS_DEPLOYMENT = os.getenv('DEPLOYMENT', 'false').lower() == 'true'
# Custom CSS for better styling.
# This stylesheet is handed to gr.Blocks(css=CUSTOM_CSS); the class names below
# correspond to the elem_classes values given to the Gradio components
# (e.g. "input-section", "analysis-section", "primary-button").
CUSTOM_CSS = """
.container {
max-width: 1200px !important;
margin: auto;
padding: 20px;
}
.header {
text-align: center;
margin-bottom: 2rem;
}
.header h1 {
color: #2d3748;
font-size: 2.5rem;
font-weight: 700;
margin-bottom: 1rem;
}
.header p {
color: #4a5568;
font-size: 1.1rem;
max-width: 800px;
margin: 0 auto;
}
.input-section {
background: #f7fafc;
border-radius: 12px;
padding: 24px;
margin-bottom: 24px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.analysis-section {
background: white;
border-radius: 12px;
padding: 24px;
margin-top: 24px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.log-section {
font-family: monospace;
background: #1a202c;
color: #e2e8f0;
padding: 16px;
border-radius: 8px;
margin-top: 24px;
}
/* Custom styling for inputs */
.input-container {
background: white;
padding: 16px;
border-radius: 8px;
margin-bottom: 16px;
}
/* Custom styling for the slider */
.slider-container {
padding: 12px;
background: white;
border-radius: 8px;
}
/* Custom styling for buttons */
.primary-button {
background: #4299e1 !important;
color: white !important;
padding: 12px 24px !important;
border-radius: 8px !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
.primary-button:hover {
background: #3182ce !important;
transform: translateY(-1px) !important;
}
/* Markdown output styling */
.markdown-output {
font-family: system-ui, -apple-system, sans-serif;
line-height: 1.6;
}
.markdown-output h1 {
color: #2d3748;
border-bottom: 2px solid #e2e8f0;
padding-bottom: 0.5rem;
}
.markdown-output h2 {
color: #4a5568;
margin-top: 2rem;
}
.markdown-output h3 {
color: #718096;
margin-top: 1.5rem;
}
/* Progress bar styling */
.progress-bar {
height: 8px !important;
border-radius: 4px !important;
background: #ebf8ff !important;
}
.progress-bar-fill {
background: #4299e1 !important;
border-radius: 4px !important;
}
/* Add some spacing between sections */
.gap {
margin: 2rem 0;
}
"""
# Custom logging handler that stores formatted log lines in a queue
class QueueHandler(logging.Handler):
    """Logging handler that buffers formatted records in a queue.

    Each record is formatted with the handler's formatter, prefixed with the
    current wall-clock time (``HH:MM:SS``) and pushed onto ``log_queue``
    without blocking.

    Args:
        log_queue: Queue-like object exposing ``put_nowait``; entries are
            plain strings of the form ``"HH:MM:SS - <formatted record>"``.
    """

    def __init__(self, log_queue):
        super().__init__()
        self.log_queue = log_queue

    def emit(self, record):
        """Format ``record`` and enqueue it; never raise into the caller.

        A full queue drops the entry on purpose (best-effort buffering).
        Any other failure (for example a record whose arguments do not match
        its format string) is routed to ``handleError`` so that a logging
        problem cannot break the code that emitted the log call.
        """
        try:
            log_entry = self.format(record)
            self.log_queue.put_nowait(f"{datetime.now().strftime('%H:%M:%S')} - {log_entry}")
        except queue.Full:
            pass  # Ignore if queue is full
        except Exception:
            self.handleError(record)
# Root logging setup: the level name is looked up on the logging module from
# config.LOG_LEVEL, and every record is rendered as "LEVEL - message".
logging.basicConfig(
    format='%(levelname)s - %(message)s',
    level=getattr(logging, config.LOG_LEVEL),
)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
# Record the storage mode at import time.
logger.info(f"IS_DEPLOYMENT: {IS_DEPLOYMENT}")
class InMemoryStorage:
    """Simple in-memory storage for deployment mode

    Two dictionaries keyed by URL string: ``urls`` holds URL objects and
    ``pages`` holds page objects. Stored objects only need a ``url``
    attribute.
    """

    def __init__(self):
        self.urls, self.pages = {}, {}

    def reset(self):
        """Empty both tables in place (the dict objects themselves are kept)."""
        for table in (self.urls, self.pages):
            table.clear()

    def add_url(self, url_obj):
        """Store ``url_obj`` under its ``url`` attribute, replacing any previous entry."""
        self.urls.update({url_obj.url: url_obj})

    def add_page(self, page_obj):
        """Store ``page_obj`` under its ``url`` attribute, replacing any previous entry."""
        self.pages.update({page_obj.url: page_obj})

    def get_url(self, url):
        """Return the URL object stored for ``url``, or None when absent."""
        return self.urls[url] if url in self.urls else None

    def get_page(self, url):
        """Return the page object stored for ``url``, or None when absent."""
        return self.pages[url] if url in self.pages else None
class SEOAnalyzer:
    """
    SEO Analyzer that combines web crawler with OpenAI analysis

    One instance drives a crawl through the project's ``Crawler``, captures
    each stored page together with the metadata produced by the crawler's
    parser, and then asks the OpenAI chat API for a site-wide report and a
    per-page report. Log lines emitted through this module's logger while an
    analysis runs are buffered in ``log_queue`` so a UI can display them.
    """

    def __init__(self, api_key: str):
        """Initialize SEO Analyzer

        Args:
            api_key: OpenAI API key used to build the client.
        """
        self.client = OpenAI(api_key=api_key)
        self.crawler = None
        self.crawled_pages = []
        self.pages_crawled = 0
        self.max_pages = 0
        self.crawl_complete = threading.Event()
        self.log_queue = queue.Queue(maxsize=1000)
        self.session_id = str(uuid.uuid4())
        self.storage = InMemoryStorage() if IS_DEPLOYMENT else None
        # Add queue handler to logger. The handler is kept on the instance so
        # analyze_website() can detach it again; otherwise every analyzer ever
        # created would stay registered on the module logger.
        self._queue_handler = QueueHandler(self.log_queue)
        self._queue_handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
        logger.addHandler(self._queue_handler)

    def _drain_log_queue(self) -> List[str]:
        """Remove and return every entry currently buffered in the log queue."""
        entries = []
        while True:
            try:
                entries.append(self.log_queue.get_nowait())
            except queue.Empty:
                return entries

    def _setup_session_storage(self) -> Tuple[str, str, str]:
        """
        Set up session-specific storage directories

        The directories live under ``config.STORAGE_PATH/<session_id>``.

        Returns:
            Tuple of (storage_path, html_path, log_path)
        """
        # Create session-specific paths
        session_storage = os.path.join(config.STORAGE_PATH, self.session_id)
        session_html = os.path.join(session_storage, "html")
        session_logs = os.path.join(session_storage, "logs")
        # Create directories
        os.makedirs(session_storage, exist_ok=True)
        os.makedirs(session_html, exist_ok=True)
        os.makedirs(session_logs, exist_ok=True)
        logger.info(f"Created session storage at {session_storage}")
        return session_storage, session_html, session_logs

    def _cleanup_session_storage(self):
        """Clean up session-specific storage (errors are logged, not raised)"""
        session_path = os.path.join(config.STORAGE_PATH, self.session_id)
        try:
            if os.path.exists(session_path):
                shutil.rmtree(session_path)
                logger.info(f"Cleaned up session storage at {session_path}")
        except Exception as e:
            logger.error(f"Error cleaning up session storage: {e}")

    def _reset_storage(self):
        """Reset storage based on deployment mode"""
        if IS_DEPLOYMENT:
            self.storage.reset()
        else:
            reset_databases()

    def _release_resources(self, temp_dir: Optional[str] = None) -> None:
        """Best-effort cleanup shared by the success, empty-crawl and error paths.

        Args:
            temp_dir: Temporary directory created for this run in deployment
                mode, or None when none was created.

        Never raises: a failing storage reset is logged so that it can neither
        discard a finished report nor hide the error that triggered cleanup.
        """
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
        if not IS_DEPLOYMENT:
            self._cleanup_session_storage()
        try:
            self._reset_storage()
        except Exception as e:
            logger.error(f"Error resetting storage: {e}")

    def analyze_website(self, url: str, max_pages: int = 10, progress: gr.Progress = gr.Progress()) -> Tuple[str, List[Dict], str]:
        """
        Crawl website and analyze SEO using OpenAI

        Args:
            url: Seed URL to crawl
            max_pages: Maximum number of pages to crawl
            progress: Gradio progress indicator

        Returns:
            Tuple of (overall analysis, list of page-specific analyses, log output).
            The log output holds the lines buffered up to the end of the crawl;
            lines logged afterwards remain in ``log_queue`` for the caller to
            drain. On failure the first element is an error message, the list
            is empty and the log output holds everything buffered so far.
        """
        # Bound before the try block so the error path can always clean up.
        temp_dir = None
        # Re-attach the handler when this instance is reused for another run.
        if self._queue_handler not in logger.handlers:
            logger.addHandler(self._queue_handler)
        try:
            # UI sliders may deliver the value as a float.
            max_pages = int(max_pages)
            # Reset state
            self.crawled_pages = []
            self.pages_crawled = 0
            self.max_pages = max_pages
            self.crawl_complete.clear()
            # Set up storage
            if IS_DEPLOYMENT:
                # Use temporary directory for file storage in deployment
                temp_dir = tempfile.mkdtemp()
                session_html = os.path.join(temp_dir, "html")
                session_logs = os.path.join(temp_dir, "logs")
                os.makedirs(session_html, exist_ok=True)
                os.makedirs(session_logs, exist_ok=True)
            else:
                _, session_html, session_logs = self._setup_session_storage()
            # Update config paths for this session
            # NOTE(review): these are module-level settings, so concurrent
            # analyses would overwrite each other's paths.
            config.HTML_STORAGE_PATH = session_html
            config.LOG_PATH = session_logs
            # Clear log queue
            self._drain_log_queue()
            logger.info(f"Starting analysis of {url} with max_pages={max_pages}")
            # Reset storage
            logger.info("Resetting storage...")
            self._reset_storage()
            logger.info("Storage reset completed")
            # Create new crawler instance with appropriate storage
            logger.info("Creating crawler instance...")
            if IS_DEPLOYMENT:
                # In deployment mode, use in-memory storage
                self.crawler = Crawler(storage=self.storage)
                # Set frontier to use memory mode
                self.crawler.frontier = URLFrontier(use_memory=True)
            else:
                # In local mode the crawler uses its default backends
                # (MongoDB and Redis according to the original comments).
                self.crawler = Crawler()
            logger.info("Crawler instance created successfully")
            # Extract domain for filtering
            domain = self._extract_domain(url)
            logger.info(f"Analyzing domain: {domain}")
            # Add seed URL and configure domain filter
            self.crawler.add_seed_urls([url])
            config.ALLOWED_DOMAINS = [domain]
            logger.info("Added seed URL and configured domain filter")
            # Override the crawler's _process_url method to capture pages
            original_process_url = self.crawler._process_url

            def stop_crawl():
                # Signal crawler to stop and wake the waiting loop below.
                self.crawler.running = False
                self.crawl_complete.set()

            def wrapped_process_url(url_obj):
                if self.pages_crawled >= self.max_pages:
                    stop_crawl()
                    return
                original_process_url(url_obj)
                # Fetch what the crawler just stored, based on storage mode.
                captured = None
                if IS_DEPLOYMENT:
                    # In deployment mode, get page from in-memory storage
                    page = self.storage.get_page(url_obj.url)
                    if page:
                        captured = (page, page.content)
                else:
                    # In local mode, get page from MongoDB
                    page_data = self.crawler.pages_collection.find_one({'url': url_obj.url})
                    if page_data and page_data.get('content'):
                        captured = (Page(**page_data), page_data['content'])
                if captured is not None:
                    page_obj, content = captured
                    _, metadata = self.crawler.parser.parse(page_obj)
                    self.crawled_pages.append({
                        'url': url_obj.url,
                        'content': content,
                        'metadata': metadata
                    })
                    self.pages_crawled += 1
                    logger.info(f"Crawled page {self.pages_crawled}/{max_pages}: {url_obj.url}")
                if self.pages_crawled >= self.max_pages:
                    stop_crawl()

            self.crawler._process_url = wrapped_process_url

            def run_crawler():
                try:
                    # Skip signal handler registration
                    self.crawler.running = True
                    with ThreadPoolExecutor(max_workers=1) as executor:
                        try:
                            futures = [executor.submit(self.crawler._crawl_worker)]
                            for future in futures:
                                future.result()
                        except Exception as e:
                            logger.error(f"Error in crawler worker: {e}")
                        finally:
                            self.crawler.running = False
                            self.crawl_complete.set()
                except Exception as e:
                    logger.error(f"Error in run_crawler: {e}")
                    self.crawl_complete.set()

            # Start crawler in a thread
            crawler_thread = threading.Thread(target=run_crawler)
            crawler_thread.daemon = True
            crawler_thread.start()
            # Wait for completion or timeout with progress updates
            timeout = 300  # 5 minutes
            start_time = time.time()
            last_progress = 0
            # Guard the progress ratio against max_pages <= 0.
            progress_total = max(max_pages, 1)
            while not self.crawl_complete.is_set() and time.time() - start_time < timeout:
                # The crawl phase is capped at 80%; the rest is the OpenAI phase.
                current_progress = min(0.8, self.pages_crawled / progress_total)
                if current_progress != last_progress:
                    progress(current_progress, f"Crawled {self.pages_crawled}/{max_pages} pages")
                    last_progress = current_progress
                time.sleep(0.1)  # More frequent updates
            if time.time() - start_time >= timeout:
                logger.warning("Crawler timed out")
                self.crawler.running = False
            # Wait for thread to finish
            crawler_thread.join(timeout=10)
            # Restore original method
            self.crawler._process_url = original_process_url
            # Collect the logs buffered so far. Later lines stay in the queue
            # on purpose: callers drain the remainder after this method returns.
            log_output = "\n".join(self._drain_log_queue())
            if not self.crawled_pages:
                self._release_resources(temp_dir)
                return "No pages were successfully crawled.", [], log_output
            logger.info("Starting OpenAI analysis...")
            progress(0.9, "Analyzing crawled pages with OpenAI...")
            # Analyze crawled pages with OpenAI
            overall_analysis = self._get_overall_analysis(self.crawled_pages)
            progress(0.95, "Generating page-specific analyses...")
            page_analyses = self._get_page_analyses(self.crawled_pages)
            logger.info("Analysis complete")
            progress(1.0, "Analysis complete")
            # Format the results
            formatted_analysis = (
                f"\n# SEO Analysis Report for {domain}\n"
                "## Overall Analysis\n"
                f"{overall_analysis}\n"
                "## Page-Specific Analyses\n"
            )
            formatted_analysis += "".join(
                f"\n### {page_analysis['url']}\n{page_analysis['analysis']}\n"
                for page_analysis in page_analyses
            )
            # Clean up all resources
            logger.info("Cleaning up resources...")
            self._release_resources(temp_dir)
            logger.info("All resources cleaned up")
            return formatted_analysis, page_analyses, log_output
        except Exception as e:
            logger.error(f"Error analyzing website: {e}")
            # Clean up all resources even on error
            self._release_resources(temp_dir)
            # Collect all logs
            log_output = "\n".join(self._drain_log_queue())
            return f"Error analyzing website: {str(e)}", [], log_output
        finally:
            # Detach so finished analyzers do not pile up on the module logger;
            # entries already buffered stay available in log_queue.
            logger.removeHandler(self._queue_handler)

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL

        Returns ``"<domain>.<suffix>"``; hosts without a public suffix
        (e.g. ``localhost`` or an IP address) yield the bare domain instead of
        a string with a trailing dot.
        """
        extracted = tldextract.extract(url)
        if not extracted.suffix:
            return extracted.domain
        return f"{extracted.domain}.{extracted.suffix}"

    def _get_overall_analysis(self, pages: List[Dict]) -> str:
        """Get overall SEO analysis using OpenAI

        Args:
            pages: Crawled page records; the ``url`` and ``metadata`` entries
                of each record are sent to the model.

        Returns:
            The model's Markdown answer, or an error message string when the
            request (or building the prompt) fails.
        """
        try:
            # Prepare site overview for analysis
            site_overview = {
                'num_pages': len(pages),
                'pages': [{
                    'url': page['url'],
                    'metadata': page['metadata']
                } for page in pages]
            }
            # Create analysis prompt
            prompt = f"""
You are an expert SEO consultant. Analyze this website's SEO based on the crawled data:
{json.dumps(site_overview, indent=2)}
Provide a comprehensive SEO analysis including:
1. Overall site structure and navigation
2. Common SEO issues across pages
3. Content quality and optimization
4. Technical SEO recommendations
5. Priority improvements
Format your response in Markdown.
"""
            # Get analysis from OpenAI
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are an expert SEO consultant providing detailed website analysis."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=2000
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"Error getting overall analysis: {e}")
            return f"Error generating overall analysis: {str(e)}"

    def _get_page_analyses(self, pages: List[Dict]) -> List[Dict]:
        """Get page-specific SEO analyses using OpenAI

        Args:
            pages: Crawled page records with ``url`` and ``metadata`` entries.

        Returns:
            One dict per input page with keys ``url`` and ``analysis``; a page
            whose request fails gets an error message as its analysis.
        """
        page_analyses = []
        for page in pages:
            try:
                # Create page analysis prompt
                prompt = f"""
Analyze this page's SEO:
URL: {page['url']}
Metadata: {json.dumps(page['metadata'], indent=2)}
Provide specific recommendations for:
1. Title and meta description
2. Heading structure
3. Content optimization
4. Internal linking
5. Technical improvements
Format your response in Markdown.
"""
                # Get analysis from OpenAI
                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are an expert SEO consultant providing detailed page analysis."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1000
                )
                page_analyses.append({
                    'url': page['url'],
                    'analysis': response.choices[0].message.content
                })
                # Sleep to respect rate limits
                time.sleep(1)
            except Exception as e:
                logger.error(f"Error analyzing page {page['url']}: {e}")
                page_analyses.append({
                    'url': page['url'],
                    'analysis': f"Error analyzing page: {str(e)}"
                })
        return page_analyses
def create_ui() -> gr.Blocks:
    """Create Gradio interface

    Builds a standalone ``gr.Blocks`` app (styled with ``CUSTOM_CSS``) holding
    the URL / API key / page-count inputs, the Markdown report output and the
    process-log box, and wires the button to the analysis callback.

    Returns:
        The assembled ``gr.Blocks`` object.
    """
    def analyze(url: str, api_key: str, max_pages: int, progress: gr.Progress = gr.Progress()) -> Tuple[str, str]:
        """Gradio interface function

        Returns:
            Tuple of (Markdown report or error message, log text).
        """
        # Fail fast on missing inputs instead of starting a crawl that cannot finish.
        if not url or not url.strip():
            error_msg = "Error: Please enter a website URL."
            return error_msg, error_msg
        if not api_key or not api_key.strip():
            error_msg = "Error: Please provide your OpenAI API key."
            return error_msg, error_msg
        try:
            # Initialize analyzer
            analyzer = SEOAnalyzer(api_key)
            # Run analysis with progress updates
            analysis, _, logs = analyzer.analyze_website(url, max_pages, progress)
            # analyze_website returns the logs it already drained; anything
            # logged after that point is still sitting in the queue.
            remaining = []
            while True:
                try:
                    remaining.append(analyzer.log_queue.get_nowait())
                except queue.Empty:
                    break
            log_output = "\n".join(part for part in (logs, "\n".join(remaining)) if part)
            # Set progress to complete
            progress(1.0, "Analysis complete")
            # Return results
            return analysis, log_output
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, error_msg

    # Create markdown content for the about section
    about_markdown = """
# 🔍 SEO Analyzer Pro
Analyze your website's SEO performance using advanced crawling and AI technology.
### Features:
- 🕷️ Intelligent Web Crawling
- 🧠 AI-Powered Analysis
- 📊 Comprehensive Reports
- 🚀 Performance Insights
### How to Use:
1. Enter your website URL
2. Provide your OpenAI API key
3. Choose how many pages to analyze
4. Click Analyze and watch the magic happen!
### What You'll Get:
- Detailed SEO analysis
- Content quality assessment
- Technical recommendations
- Performance insights
- Actionable improvements
"""
    # Create the interface with custom styling
    with gr.Blocks(css=CUSTOM_CSS) as iface:
        gr.Markdown(about_markdown)
        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group(elem_classes="input-section"):
                    gr.Markdown("### 📝 Enter Website Details")
                    url_input = gr.Textbox(
                        label="Website URL",
                        placeholder="https://example.com",
                        elem_classes="input-container",
                        info="Enter the full URL of the website you want to analyze (e.g., https://example.com)"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        elem_classes="input-container",
                        info="Your OpenAI API key is required for AI-powered analysis. Keep this secure!"
                    )
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1,
                        label="Maximum Pages to Crawl",
                        elem_classes="slider-container",
                        info="Choose how many pages to analyze. More pages = more comprehensive analysis but takes longer"
                    )
                    analyze_btn = gr.Button(
                        "🔍 Analyze Website",
                        elem_classes="primary-button"
                    )
        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="analysis-section"):
                    gr.Markdown("### 📊 Analysis Results")
                    analysis_output = gr.Markdown(
                        label="SEO Analysis",
                        elem_classes="markdown-output"
                    )
        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="log-section"):
                    gr.Markdown("### 📋 Process Logs")
                    logs_output = gr.Textbox(
                        label="Logs",
                        lines=10,
                        elem_classes="log-output"
                    )
        # Connect the button click to the analyze function
        analyze_btn.click(
            fn=analyze,
            inputs=[url_input, api_key, max_pages],
            outputs=[analysis_output, logs_output],
        )
    return iface
# ----- SEO Analyzer UI (as a function) -----
def seo_analyzer_ui():
    """Build the SEO Analyzer layout as a ``gr.Blocks`` for embedding in a tab.

    Unlike a standalone app, no CSS is attached here; styling is expected to
    come from the enclosing ``gr.Blocks``.

    Returns:
        The ``gr.Blocks`` object containing inputs, report output and log box.
    """
    def analyze(url: str, api_key: str, max_pages: int, progress: gr.Progress = gr.Progress()) -> Tuple[str, str]:
        """Button callback: run the analysis and return (report, log text)."""
        # Fail fast on missing inputs instead of starting a crawl that cannot finish.
        if not url or not url.strip():
            error_msg = "Error: Please enter a website URL."
            return error_msg, error_msg
        if not api_key or not api_key.strip():
            error_msg = "Error: Please provide your OpenAI API key."
            return error_msg, error_msg
        try:
            analyzer = SEOAnalyzer(api_key)
            analysis, _, logs = analyzer.analyze_website(url, max_pages, progress)
            # analyze_website returns the logs it already drained; anything
            # logged after that point is still sitting in the queue.
            remaining = []
            while True:
                try:
                    remaining.append(analyzer.log_queue.get_nowait())
                except queue.Empty:
                    break
            log_output = "\n".join(part for part in (logs, "\n".join(remaining)) if part)
            progress(1.0, "Analysis complete")
            return analysis, log_output
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, error_msg

    about_markdown = """
# 🔍 SEO Analyzer Pro
Analyze your website's SEO performance using advanced crawling and AI technology.
...
"""
    with gr.Blocks() as seo_tab:
        gr.Markdown(about_markdown)
        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group(elem_classes="input-section"):
                    gr.Markdown("### 📝 Enter Website Details")
                    url_input = gr.Textbox(
                        label="Website URL",
                        placeholder="https://example.com",
                        elem_classes="input-container",
                        info="Enter the full URL of the website you want to analyze (e.g., https://example.com)"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        elem_classes="input-container",
                        info="Your OpenAI API key is required for AI-powered analysis. Keep this secure!"
                    )
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1,
                        label="Maximum Pages to Crawl",
                        elem_classes="slider-container",
                        info="Choose how many pages to analyze. More pages = more comprehensive analysis but takes longer"
                    )
                    analyze_btn = gr.Button(
                        "🔍 Analyze Website",
                        elem_classes="primary-button"
                    )
        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="analysis-section"):
                    gr.Markdown("### 📊 Analysis Results")
                    analysis_output = gr.Markdown(
                        label="SEO Analysis",
                        elem_classes="markdown-output"
                    )
        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="log-section"):
                    gr.Markdown("### 📋 Process Logs")
                    logs_output = gr.Textbox(
                        label="Logs",
                        lines=10,
                        elem_classes="log-output"
                    )
        # Connect the button click to the analyze function
        analyze_btn.click(
            fn=analyze,
            inputs=[url_input, api_key, max_pages],
            outputs=[analysis_output, logs_output],
        )
    return seo_tab
# ---- Auto Ad Generator UI as a function ----
def auto_ad_generator_ui():
    """Build the Auto Ad Generator layout as a ``gr.Blocks`` for embedding in a tab.

    The tab takes either a listing URL (whose text is scraped and turned into
    keywords by the model) or manually entered keywords, then produces ad copy
    for four platforms, one generated image and a downloadable text file.

    OpenAI calls go through the ``OpenAI`` client class imported by this
    module, authenticated with the ``OPENAI_API_KEY`` environment variable.

    Returns:
        The ``gr.Blocks`` object for the tab.
    """
    import html  # local import: only this block needs it

    platforms = ("Facebook", "Instagram", "X (Twitter)", "Google Search")
    # platform -> (CSS background, icon); unknown platforms use the Google Search style.
    platform_styles = {
        "Facebook": ("#1877F2", "🌐"),
        "Instagram": ("linear-gradient(90deg, #f58529 0%, #dd2a7b 50%, #8134af 100%)", "📸"),
        "X (Twitter)": ("#14171A", "🐦"),
    }
    default_style = ("#4285F4", "🔍")  # Google Search

    def get_client():
        """Create the OpenAI client on demand so a missing key fails the request, not app start-up."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY environment variable is not set.")
        return OpenAI(api_key=api_key)

    def ask_model(prompt, temperature, max_tokens):
        """Send a single-user-message chat request to gpt-4 and return the stripped reply text."""
        response = get_client().chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_tokens
        )
        return (response.choices[0].message.content or "").strip()

    def extract_text_from_url(url):
        """Download ``url`` and return up to 4000 characters of heading/paragraph/list text.

        Raises:
            ValueError: on any download/parse failure or when fewer than 100
                characters of text could be extracted.
        """
        try:
            resp = requests.get(url, timeout=30, headers={
                "User-Agent": "Mozilla/5.0 (compatible; Bot/1.0)"
            })
            # Do not mine keywords from an HTTP error page.
            resp.raise_for_status()
            soup = BeautifulSoup(resp.content, "html.parser")
            candidates = soup.find_all(['h1', 'h2', 'h3', 'h4', 'p', 'span', 'li'])
            text = ' '.join(c.get_text(strip=True) for c in candidates)
            text = text[:4000]
            if len(text) < 100:
                raise ValueError("Could not extract enough content (site may require JavaScript). Please enter keywords manually.")
            return text
        except Exception as e:
            raise ValueError(f"URL extraction error: {e}") from e

    def extract_keywords(text):
        """Ask the model for keywords and split its reply on commas (or on newlines when it has none)."""
        prompt = f"""
Extract up to 10 concise, relevant SEO keywords suitable for an automotive advertisement from the following content:
{text}
Keywords:
"""
        output = ask_model(prompt, temperature=0.6, max_tokens=100)
        separator = ',' if ',' in output else '\n'
        return [kw.strip() for kw in output.split(separator) if kw.strip()]

    def generate_ad_copy(platform, keywords):
        """Return model-written ad copy for ``platform`` built around ``keywords``."""
        prompt = f"""
Create a compelling, SEO-optimized {platform} ad using these keywords: {', '.join(keywords)}.
Include a clear and enticing call-to-action.
"""
        return ask_model(prompt, temperature=0.7, max_tokens=300)

    def generate_ad_image(keywords, out_dir):
        """Generate one 512x512 image, download it into ``out_dir`` and return the file path."""
        kw_str = ", ".join(keywords)
        image_prompt = (
            f"High-quality, photorealistic automotive ad photo of a luxury car. "
            f"Clean background, professional lighting, stylish dealership setting. "
            f"Keywords: {kw_str}. Room for text overlay, wide format, visually appealing."
        )
        response = get_client().images.generate(
            prompt=image_prompt,
            n=1,
            size="512x512"
        )
        image_url = response.data[0].url
        download = requests.get(image_url, timeout=60)
        download.raise_for_status()
        img_file = os.path.join(out_dir, "generated_ad_image.png")
        with open(img_file, "wb") as f:
            f.write(download.content)
        return img_file

    def platform_html(platform, ad_text):
        """Render one ad preview card as an HTML fragment."""
        color, icon = platform_styles.get(platform, default_style)
        # The ad text comes from the model (and, indirectly, from scraped
        # pages), so it must not be interpreted as markup.
        ad_text = html.escape(ad_text)
        if platform == "Instagram":
            content = f"""
<div style="background: {color}; padding: 2px; border-radius: 12px; margin-bottom:16px;">
<div style="background: white; color: #333; padding: 18px 20px; border-radius: 10px;">
<span style="font-size: 1.5em;">{icon} <b>{platform}</b></span>
<div style="margin-top: 12px; font-size: 1.1em; line-height:1.6;">{ad_text}</div>
</div>
</div>
"""
        else:
            content = f"""
<div style="background: {color}; color: white; padding: 18px 20px; border-radius: 12px; margin-bottom:16px; min-height: 120px;">
<span style="font-size: 1.5em;">{icon} <b>{platform}</b></span>
<div style="margin-top: 12px; font-size: 1.1em; line-height:1.6;">{ad_text}</div>
</div>
"""
        return content

    def main_workflow(input_mode, url_or_keywords):
        """Run the whole pipeline.

        Returns:
            Tuple of (keywords, ad_copies, image_path, txt_path, error); values
            that could not be produced are None.
        """
        if input_mode == "URL":
            try:
                text = extract_text_from_url(url_or_keywords)
                keywords = extract_keywords(text)
            except Exception as e:
                return None, None, None, None, f"{e}"
        else:
            keywords = [kw.strip() for kw in (url_or_keywords or "").split(",") if kw.strip()]
        if not keywords:
            return None, None, None, None, "Please provide at least one keyword."
        # Generate ad copies
        try:
            ad_copies = {platform: generate_ad_copy(platform, keywords) for platform in platforms}
        except Exception as e:
            return keywords, None, None, None, f"Ad copy generation error: {e}"
        # Each run writes into its own directory so simultaneous requests
        # cannot overwrite each other's image or text file.
        out_dir = tempfile.mkdtemp(prefix="auto_ads_")
        # Generate image (a failure here still returns the ad copies)
        error = None
        image_path = None
        try:
            image_path = generate_ad_image(keywords, out_dir)
        except Exception as e:
            error = f"Image generation error: {e}"
        # Save ads to txt
        output_txt = os.path.join(out_dir, "generated_ads.txt")
        with open(output_txt, "w", encoding="utf-8") as f:
            for platform, content in ad_copies.items():
                f.write(f"--- {platform} Ad Copy ---\n{content}\n\n")
        return keywords, ad_copies, image_path, output_txt, error

    def run_space(input_mode, url, keywords):
        """Button callback: map the pipeline result onto the five output components."""
        url_or_keywords = url if input_mode == "URL" else keywords
        keywords, ad_copies, image_path, txt_path, error = main_workflow(input_mode, url_or_keywords)
        ad_previews = ""
        if ad_copies:
            ad_previews = "".join(platform_html(platform, ad) for platform, ad in ad_copies.items())
        return (
            keywords,
            ad_previews,
            image_path,
            txt_path if ad_copies else None,
            error
        )

    with gr.Blocks() as ad_tab:
        gr.Markdown("# 🚗 Auto Ad Generator\nPaste a car listing URL **or** enter your own keywords, then preview AI-generated ads for each social media platform, plus an auto-generated image!")
        input_mode = gr.Radio(["URL", "Keywords"], value="URL", label="Input Type")
        url_input = gr.Textbox(label="Listing URL", placeholder="https://www.cars.com/listing/...", visible=True)
        kw_input = gr.Textbox(label="Manual Keywords (comma separated)", placeholder="e.g. BMW, used car, sunroof", visible=False)
        submit_btn = gr.Button("Generate Ads")
        gr.Markdown("## Keywords")
        kw_out = gr.JSON(label="Extracted/Provided Keywords")
        gr.Markdown("## Ad Copy Previews")
        ad_out = gr.HTML(label="Ad Copy Preview")
        gr.Markdown("## Generated Ad Image")
        img_out = gr.Image(label="Generated Ad Image", type="filepath")
        gr.Markdown("## Download Ad Copies")
        file_out = gr.File(label="Download TXT")
        err_out = gr.Textbox(label="Errors", interactive=False)

        def show_hide_fields(choice):
            """Show only the textbox that matches the selected input type."""
            return (
                gr.update(visible=choice == "URL"),
                gr.update(visible=choice == "Keywords"),
            )

        input_mode.change(show_hide_fields, input_mode, [url_input, kw_input])
        submit_btn.click(
            run_space,
            inputs=[input_mode, url_input, kw_input],
            outputs=[kw_out, ad_out, img_out, file_out, err_out]
        )
    return ad_tab
# ---- Main App: Two Tabs ----
if __name__ == "__main__":
    # Make sure the storage root exists before any tab is used.
    os.makedirs(config.STORAGE_PATH, exist_ok=True)
    # (tab title, builder) pairs, rendered in this order.
    tab_builders = (
        ("SEO Analyzer", seo_analyzer_ui),
        ("Auto Ad Generator", auto_ad_generator_ui),
    )
    with gr.Blocks(css=CUSTOM_CSS) as demo:
        for tab_title, build_tab in tab_builders:
            with gr.Tab(tab_title):
                build_tab()
    # Listen on all interfaces, without a public share link or API page.
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",
        "show_api": False,
        "show_error": True,
    }
    demo.launch(**launch_options)