# sythenticdata / app.py
# mgbam's picture
# Update app.py
# 8cd330b verified
# raw
# history blame
# 23.7 kB
import requests
import streamlit as st
import pdfplumber
import pandas as pd
import sqlalchemy
from typing import Any, Dict, List, Optional
from functools import lru_cache
import os
# Provider clients with import guards
# Each optional SDK is imported behind its own guard; on failure the name is
# bound to None so the rest of the module can test for availability.
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None
try:
    import groq
except ImportError:
    groq = None
try:
    import google.generativeai as genai
    from google.generativeai import GenerativeModel, configure
except ImportError:
    GenerativeModel = None
    configure = None
    genai = None
# ``Part`` gets its own guard: if the installed google.generativeai package
# does not export it at top level (NOTE(review): it is not among the names the
# SDK documents there - confirm against the installed version), importing it
# together with the SDK would null ``genai``/``GenerativeModel``/``configure``
# as well and silently disable the Google provider.
try:
    from google.generativeai import Part
except ImportError:
    Part = None
import json
class SyntheticDataGenerator:
    """Multi-provider synthetic data generation engine for the Streamlit app.

    Wraps several LLM back ends (OpenAI-compatible HTTP APIs, Groq, the
    HuggingFace inference endpoint and Google Gemini) behind ``generate`` and
    provides input processors that turn files, URLs and queries into prompt
    material. API keys, metrics and generation options are kept in
    ``st.session_state``.
    """

    PROVIDER_CONFIG = {
        "Deepseek": {
            "base_url": "https://api.deepseek.com/v1",
            "models": ["deepseek-chat"],
            "requires_library": "openai"
        },
        "OpenAI": {
            "base_url": "https://api.openai.com/v1",
            "models": ["gpt-4-turbo", "gpt-3.5-turbo"],
            "requires_library": "openai"
        },
        "Groq": {
            "base_url": "https://api.groq.com/openai/v1",
            "models": ["mixtral-8x7b-32768", "llama2-70b-4096"],
            "requires_library": "groq"
        },
        "HuggingFace": {
            "base_url": "https://api-inference.huggingface.co/models/",
            "models": ["gpt2", "llama-2-13b-chat"],
            "requires_library": None
        },
        "Google": {
            "models": ["gemini-1.5-flash-latest", "gemini-1.5-pro-latest", "gemini-pro", "gemini-pro-vision"],
            "requires_library": "google.generativeai"
        }
    }

    # Module-level name bound by the guarded import of each optional library.
    # That name is None when the import failed, which is how availability is
    # detected in _setup_providers.
    _LIBRARY_SYMBOLS = {
        "openai": "OpenAI",
        "groq": "groq",
        "google.generativeai": "genai",
    }

    _JSON_WARNING = "Could not parse response as valid JSON. Returning raw text."

    def __init__(self):
        self._init_session_state()
        self._setup_input_handlers()
        self._setup_providers()

    def _init_session_state(self):
        """Seed ``st.session_state`` with defaults for keys that are not set yet."""
        defaults = {
            "active_provider": "OpenAI",
            "api_keys": {},
            "input_sources": [],
            "generation_results": [],
            "system_metrics": {
                "api_calls": 0,
                "tokens_used": 0,
                "error_count": 0
            },
            "debug_mode": False,
            "google_configured": False,
            "advanced_options": {
                "temperature": 0.7,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 2000
            },
            "generation_format": "json",
            "csv_schema": ""
        }
        for key, val in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = val

    def _setup_providers(self):
        """Populate ``self.available_providers`` with the usable providers.

        A provider is usable when it needs no client library, or when the
        module-level name bound by that library's guarded import is not None.
        The previous check title-cased the library name ("Openai", "Groq",
        "Google"), which never matched any module-level name, so every
        library-backed provider was dropped.
        """
        module_names = globals()
        self.available_providers = []
        for provider, config in self.PROVIDER_CONFIG.items():
            library = config["requires_library"]
            if library:
                symbol = self._LIBRARY_SYMBOLS.get(library, library.split('.')[0])
                if module_names.get(symbol) is None:
                    continue
            self.available_providers.append(provider)

    def _setup_input_handlers(self):
        """Build the mapping from input-kind name to its processor method."""
        self.input_processors = {
            "text": self._process_text,
            "pdf": self._process_pdf,
            "csv": self._process_csv,
            "api": self._process_api,
            "database": self._process_database,
            "web": self._process_web,
            "image": self._process_image
        }

    # --- Core Generation Engine ---
    def generate(self, provider: str, model: str, prompt: Any) -> Dict[str, Any]:
        """Generate with ``provider``/``model``; fall back to other providers on error.

        ``prompt`` is either a string or, for multimodal Google requests, a
        list of parts. The call is deliberately not memoised: an ``lru_cache``
        here fails on list prompts (unhashable), would replay stale output
        after the API key or generation options change, and keeps the instance
        alive for the lifetime of the cache.

        Raises:
            RuntimeError: when the requested provider and every backup fail.
        """
        try:
            return self._generate_once(provider, model, prompt)
        except Exception as e:
            self._log_error(f"Generation Error: {str(e)}")
            return self._failover_generation(prompt, failed_provider=provider)

    def _generate_once(self, provider: str, model: str, prompt: Any) -> Dict[str, Any]:
        """Run one generation attempt against ``provider`` without any failover."""
        if provider not in self.available_providers:
            raise ValueError(f"Provider {provider} not available")
        client = self._get_client(provider, model)
        if not client:
            raise ConnectionError("Client initialization failed")
        return self._execute_generation(client, provider, model, prompt)

    def _get_client(self, provider: str, model: Optional[str] = None) -> Any:
        """Create the client object for ``provider``.

        Args:
            provider: key of ``PROVIDER_CONFIG``.
            model: model name, used only by the Google branch (the model is
                bound when the client is built); defaults to the provider's
                first configured model.

        Returns:
            The provider client, a headers dict for HuggingFace, or None when
            construction failed (the failure is logged).

        Raises:
            ValueError: when a non-Google provider has no API key stored.
        """
        config = self.PROVIDER_CONFIG[provider]
        api_key = st.session_state.api_keys.get(provider, "")
        if not api_key and provider != "Google":
            raise ValueError("API key required")
        try:
            if provider == "Groq":
                return groq.Groq(api_key=api_key)
            if provider == "HuggingFace":
                return {"headers": {"Authorization": f"Bearer {api_key}"}}
            if provider == "Google":
                return self._build_google_model(model or config["models"][0])
            return OpenAI(
                base_url=config["base_url"],
                api_key=api_key,
                timeout=30
            )
        except Exception as e:
            self._log_error(f"Client Init Failed: {str(e)}")
            return None

    def _build_google_model(self, model_name: str) -> Any:
        """Configure the Google SDK once per session and return a model handle."""
        if not st.session_state.google_configured:
            # The environment variable wins over the key typed into the app.
            api_key = os.environ.get("GOOGLE_API_KEY") or st.session_state.api_keys.get("Google", "")
            if not api_key:
                raise ValueError(
                    "Google API key is required. Please set it in the app or as the GOOGLE_API_KEY environment variable.")
            try:
                genai.configure(api_key=api_key)
            except Exception as e:
                raise ValueError(f"Error configuring Google API: {e}") from e
            st.session_state.google_configured = True
        options = st.session_state.advanced_options
        generation_config = genai.GenerationConfig(
            temperature=options["temperature"],
            top_p=options["top_p"],
            top_k=options["top_k"],
            max_output_tokens=options["max_output_tokens"]
        )
        safety_settings = [
            {"category": category, "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
            for category in (
                "HARM_CATEGORY_HARASSMENT",
                "HARM_CATEGORY_HATE_SPEECH",
                "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                "HARM_CATEGORY_DANGEROUS_CONTENT",
            )
        ]
        return genai.GenerativeModel(
            model_name=model_name,
            generation_config=generation_config,
            safety_settings=safety_settings
        )

    @staticmethod
    def _parse_model_output(content: Any, as_json: bool) -> Dict[str, Any]:
        """Wrap raw model text, decoding it as JSON first when ``as_json`` is set.

        Text that cannot be decoded is returned under ``"content"`` together
        with a ``"warning"`` entry.
        """
        if not as_json:
            return {"content": content}
        try:
            return json.loads(content)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a missing (None) message body.
            return {"content": content,
                    "warning": SyntheticDataGenerator._JSON_WARNING}

    def _execute_generation(self, client, provider: str, model: str, prompt: Any) -> Dict[str, Any]:
        """Send ``prompt`` through ``client`` using the provider's calling convention.

        HuggingFace: POSTs to ``base_url + model`` and returns the decoded
        JSON body. Google: calls ``generate_content``; errors are logged and
        returned as ``{"error": ..., "content": ""}`` instead of raised.
        Everything else: OpenAI-style chat completion whose reply is decoded
        as JSON when possible.
        """
        st.session_state.system_metrics["api_calls"] += 1
        if provider == "HuggingFace":
            response = requests.post(
                self.PROVIDER_CONFIG[provider]["base_url"] + model,
                headers=client["headers"],
                json={"inputs": prompt},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        if provider == "Google":
            try:
                # generate_content takes a plain string and a list of parts alike.
                response = client.generate_content(prompt)
                return self._parse_model_output(
                    response.text, st.session_state.generation_format == "json")
            except Exception as e:
                self._log_error(f"Google Generation Error: {str(e)}")
                return {"error": str(e), "content": ""}
        completion = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=st.session_state.advanced_options["temperature"],
            max_tokens=st.session_state.advanced_options["max_output_tokens"]
        )
        usage = getattr(completion, "usage", None)
        if usage is not None:  # usage may be absent on some responses
            st.session_state.system_metrics["tokens_used"] += usage.total_tokens
        return self._parse_model_output(completion.choices[0].message.content, True)

    def _failover_generation(self, prompt: Any, failed_provider: Optional[str] = None) -> Dict[str, Any]:
        """Try each remaining provider once with its first configured model.

        The active provider and ``failed_provider`` are skipped. Attempts go
        through ``_generate_once`` rather than ``generate`` so that a failing
        backup cannot trigger another round of failover (unbounded recursion).

        Raises:
            RuntimeError: when no backup provider produced a result.
        """
        skipped = {st.session_state.active_provider, failed_provider}
        for backup_provider in self.available_providers:
            if backup_provider in skipped:
                continue
            models = self.PROVIDER_CONFIG[backup_provider]["models"]
            if not models:
                continue
            try:
                return self._generate_once(backup_provider, models[0], prompt)
            except Exception as e:
                self._log_error(f"Failover via {backup_provider} failed: {str(e)}")
        raise RuntimeError("All generation providers unavailable")

    # --- Input Processors ---
    def _process_pdf(self, file) -> str:
        """Return the text of every page joined by newlines ("" on failure)."""
        try:
            with pdfplumber.open(file) as pdf:
                return "\n".join(page.extract_text() or "" for page in pdf.pages)
        except Exception as e:
            self._log_error(f"PDF Processing Error: {str(e)}")
            return ""

    def _process_web(self, url: str) -> str:
        """Fetch ``url`` and return the response body ("" on failure).

        HTTP error statuses count as failures so that an error page is not
        handed to the model as source material.
        """
        try:
            response = requests.get(url, headers={
                "User-Agent": "Mozilla/5.0 (compatible; SyntheticBot/1.0)"
            }, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            self._log_error(f"Web Extraction Error: {str(e)}")
            return ""

    def _process_csv(self, file) -> str:
        """Load a CSV, store its schema summary in session state, return it as text."""
        try:
            df = pd.read_csv(file)
            column_names = df.columns.tolist()
            data_types = [str(df[col].dtype) for col in df.columns]
            schema_prompt = f"Column Names: {column_names}\nData Types: {data_types}"
            st.session_state.csv_schema = schema_prompt
            return df.to_string()
        except Exception as e:
            self._log_error(f"CSV Processing Error: {str(e)}")
            return ""

    def _process_text(self, text: str) -> str:
        """Return ``text`` unchanged."""
        return text

    def _process_api(self, url: str, method="GET", headers: Optional[Dict[str, str]] = None,
                     data: Optional[Dict[str, Any]] = None) -> str:
        """Call an HTTP endpoint and return its body as text.

        JSON bodies are pretty-printed; anything else is returned verbatim.
        Request failures are logged and yield "".

        Raises:
            ValueError: when ``method`` is neither GET nor POST.
        """
        verb = method.upper()
        if verb not in ("GET", "POST"):
            raise ValueError("Unsupported HTTP method.")
        try:
            if verb == "GET":
                response = requests.get(url, headers=headers or {}, timeout=10)
            else:
                response = requests.post(url, headers=headers or {}, json=data, timeout=10)
            response.raise_for_status()
            try:
                return json.dumps(response.json(), indent=2)
            except ValueError:
                # Every JSON decode error requests can raise derives from
                # ValueError, whichever json backend it was built against.
                return response.text
        except requests.exceptions.RequestException as e:
            self._log_error(f"API Processing Error: {str(e)}")
            return ""

    def _process_database(self, connection_string: str, query: str) -> str:
        """Run ``query`` and return the result set rendered as text ("" on failure).

        NOTE(review): ``query`` is executed as given; callers must not pass
        untrusted SQL.
        """
        engine = None
        try:
            engine = sqlalchemy.create_engine(connection_string)
            with engine.connect() as connection:
                result = connection.execute(sqlalchemy.text(query))
                # Materialise the rows while the connection is still open.
                df = pd.DataFrame(result.fetchall(), columns=result.keys())
            return df.to_string()
        except Exception as e:
            self._log_error(f"Database Processing Error: {str(e)}")
            return ""
        finally:
            if engine is not None:
                engine.dispose()  # release pooled connections of this one-off engine

    def _process_image(self, image_file) -> list:
        """Read an uploaded image into a one-element list of prompt parts.

        The part is a ``{"mime_type", "data"}`` mapping rather than a ``Part``
        object, so it does not depend on the optional ``Part`` import.
        NOTE(review): relies on the Google SDK accepting blob dicts of this
        shape as content - confirm against the SDK reference.
        Returns [] when the file cannot be read or is empty.
        """
        try:
            image_data = image_file.read()
            if not image_data:
                raise ValueError("image file is empty")
            return [{"mime_type": image_file.type, "data": image_data}]
        except Exception as e:
            self._log_error(f"Image Processing Error: {str(e)}")
            return []

    # --- Enterprise Features ---
    def _log_error(self, message: str) -> None:
        """Count the error, append it to the session error log, echo it in debug mode."""
        st.session_state.system_metrics["error_count"] += 1
        st.session_state.error_logs = st.session_state.get("error_logs", []) + [message]
        if st.session_state.debug_mode:
            st.error(f"[DEBUG] {message}")

    def health_check(self) -> Dict[str, Any]:
        """Return available providers, a per-provider connectivity flag and the metrics."""
        return {
            "providers_available": self.available_providers,
            "api_connectivity": {
                provider: self._test_provider_connectivity(provider)
                for provider in self.available_providers
            },
            "system_metrics": st.session_state.system_metrics
        }

    def _test_provider_connectivity(self, provider: str) -> bool:
        """Return True when a minimal request to ``provider`` succeeds.

        A missing API key or a client that cannot be built yields False
        without an extra log entry; a failing probe request is logged.
        """
        try:
            client = self._get_client(provider)
        except Exception:
            return False
        if client is None:
            return False
        try:
            if provider == "HuggingFace":
                response = requests.get(
                    self.PROVIDER_CONFIG[provider]["base_url"],
                    headers=client["headers"],
                    timeout=5
                )
                return response.status_code == 200
            if provider == "Google":
                client.generate_content("test")
                return True
            client.models.list()
            return True
        except Exception as e:
            self._log_error(f"Connectivity check failed for {provider}: {str(e)}")
            return False
# --- Enterprise UI Components ---
def provider_config_ui(gen: SyntheticDataGenerator):
    """Render the sidebar controls for provider, API key, model and options.

    Every choice is written straight into ``st.session_state``
    (``active_provider``, ``api_keys``, ``active_model``,
    ``advanced_options``, ``generation_format``). The advanced controls are
    only drawn for the Google and OpenAI providers; top-p/top-k only for
    Google. A button runs ``gen.health_check()`` and shows the report.
    """
    state = st.session_state
    with st.sidebar:
        st.header("⚙️ AI Engine Configuration")

        # Only providers the generator reported as available are offered.
        chosen = st.selectbox(
            "AI Provider",
            gen.available_providers,
            help="Available providers based on system configuration",
        )
        state.active_provider = chosen

        # The key field is pre-filled with whatever was stored for this provider.
        state.api_keys[chosen] = st.text_input(
            f"{chosen} API Key",
            type="password",
            value=state.api_keys.get(chosen, ""),
            help=f"Obtain API key from {chosen} portal",
        )

        state.active_model = st.selectbox(
            "Model",
            gen.PROVIDER_CONFIG[chosen]["models"],
            help="Select model version based on your API plan",
        )

        if chosen in ("Google", "OpenAI"):
            options = state.advanced_options
            st.subheader("Advanced Generation Options")
            options["temperature"] = st.slider(
                "Temperature",
                min_value=0.0,
                max_value=1.0,
                value=options["temperature"],
                step=0.05,
                help="Controls randomness. Lower values = more deterministic.",
            )
            if chosen == "Google":
                options["top_p"] = st.slider(
                    "Top P",
                    min_value=0.0,
                    max_value=1.0,
                    value=options["top_p"],
                    step=0.05,
                    help="Nucleus sampling: Considers the most probable tokens.",
                )
                options["top_k"] = st.slider(
                    "Top K",
                    min_value=1,
                    max_value=100,
                    value=options["top_k"],
                    step=1,
                    help="Considers the top K most probable tokens.",
                )
            options["max_output_tokens"] = st.number_input(
                "Max Output Tokens",
                min_value=50,
                max_value=4096,
                value=options["max_output_tokens"],
                step=50,
                help="Maximum number of tokens in the generated output.",
            )
            state.generation_format = st.selectbox(
                "Output Format",
                ["json", "text"],
                help="Choose the desired output format.",
            )

        # On-demand diagnostics.
        if st.button("Run Health Check"):
            st.json(gen.health_check())
def input_ui():
    """Render the input-method selector and the widgets for the chosen method.

    Returns:
        A ``(input_method, input_content, additional_instructions)`` tuple.
        ``input_content`` is the entered text/URL, the uploaded file object,
        or None when nothing was provided. ``additional_instructions`` is the
        free-text requirements for the structured prompt, the optional text
        instructions for an image, and "" for every other method.
    """
    input_method = st.selectbox("Input Method",
                                ["Text", "PDF", "Web URL", "CSV", "Image",
                                 "Structured Prompt (Advanced)"])
    input_content = None
    additional_instructions = ""
    if input_method == "Text":
        input_content = st.text_area("Enter Text", height=200)
    elif input_method == "PDF":
        # file_uploader yields None until a file has been chosen.
        input_content = st.file_uploader("Upload a PDF file", type=["pdf"])
    elif input_method == "Web URL":
        input_content = st.text_input("Enter Web URL")
    elif input_method == "CSV":
        input_content = st.file_uploader("Upload a CSV file", type=["csv"])
        # "csv_schema" may be present in session state as an empty string, so
        # test the value, not the key, to avoid rendering an empty schema.
        if input_content is not None and st.session_state.get("csv_schema"):
            st.write("Inferred CSV Schema:")
            st.write(st.session_state.csv_schema)
    elif input_method == "Image":
        input_content = st.file_uploader("Upload an Image file", type=["png", "jpg", "jpeg"])
        # Text that accompanies the image in a multimodal prompt.
        additional_instructions = st.text_area("Instructions for the image (optional)", height=100)
    elif input_method == "Structured Prompt (Advanced)":
        st.subheader("Structured Prompt")
        input_content = st.text_area("Enter the base prompt/instructions", height=100)
        additional_instructions = st.text_area("Specify constraints, data format, or other requirements:",
                                               height=100)
    return input_method, input_content, additional_instructions
def main():
    """Build the page, collect the input and run one generation per button click.

    Flow: page setup -> sidebar configuration -> input widgets -> on
    "Generate Data", turn the input into a prompt, call ``gen.generate`` with
    the active provider/model and display the result. Problems are reported
    through ``st.warning`` / ``st.error``.
    """
    st.set_page_config(
        page_title="Synthetic Data Factory Pro",
        page_icon="🏭",
        layout="wide"
    )
    gen = SyntheticDataGenerator()
    st.title("🏭 Synthetic Data Factory Pro")
    st.markdown("""
**World's Most Advanced Synthetic Data Generation Platform**
*Multi-provider AI Engine | Enterprise Input Processors | Real-time Monitoring*
""")
    provider_config_ui(gen)
    input_method, input_content, additional_instructions = input_ui()

    if not st.button("Generate Data"):
        return
    if not (input_content or input_method == "Structured Prompt (Advanced)"):
        st.warning("Please provide input data.")
        return

    instructions = additional_instructions or ""
    if input_method == "Structured Prompt (Advanced)":
        combined = (input_content or "") + "\n" + instructions
        # A prompt consisting only of whitespace is treated as no input.
        processed_input = combined if combined.strip() else None
    elif input_method == "Image":
        # Image parts are only sent to the Google provider; refuse early
        # instead of passing them to a text-only chat API.
        if st.session_state.active_provider != "Google":
            st.error("Image input is only supported with the Google provider.")
            return
        processed_input = gen._process_image(input_content)
        if not processed_input:
            st.error("Error processing image.")
            return
    else:
        processors = {
            "Text": gen._process_text,
            "PDF": gen._process_pdf,
            "Web URL": gen._process_web,
            "CSV": gen._process_csv,
        }
        processor = processors.get(input_method)
        processed_input = processor(input_content) if processor else None

    if not processed_input:
        st.warning("No data to process. Please check your input.")
        return

    if input_method == "Image":
        # A multimodal prompt is text followed by the image parts. The
        # uploaded file object itself must not be placed in the prompt.
        text_part = instructions.strip() or "Generate synthetic data based on the attached image."
        prompt = [text_part] + processed_input
    else:
        prompt = processed_input

    try:
        result = gen.generate(st.session_state.active_provider, st.session_state.active_model, prompt)
        st.subheader("Generated Output:")
        st.json(result)
    except Exception as e:
        st.error(f"Error during generation: {e}")


if __name__ == "__main__":
    main()