Spaces:
Sleeping
Sleeping
import json
import os
from functools import lru_cache
from typing import Any, Dict, List, Optional

import pandas as pd
import pdfplumber
import requests
import sqlalchemy
import streamlit as st

# Provider clients with import guards: a missing SDK leaves its name bound to
# None so the rest of the module can test for availability.
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None

try:
    import groq
except ImportError:
    groq = None

try:
    import google.generativeai as genai
    from google.generativeai import GenerativeModel, configure
except ImportError:
    GenerativeModel = None
    configure = None
    genai = None

# ``Part`` is guarded on its own so that its absence from the installed SDK
# does not disable the whole Google provider.
try:
    from google.generativeai import Part
except ImportError:
    Part = None
class SyntheticDataGenerator:
    """Multi-provider text generation front end backed by Streamlit session state.

    The class reads API keys, sampling options and the output format from
    ``st.session_state`` and records call/token/error counters there.
    """

    PROVIDER_CONFIG = {
        "Deepseek": {
            "base_url": "https://api.deepseek.com/v1",
            "models": ["deepseek-chat"],
            "requires_library": "openai"
        },
        "OpenAI": {
            "base_url": "https://api.openai.com/v1",
            "models": ["gpt-4-turbo", "gpt-3.5-turbo"],
            "requires_library": "openai"
        },
        "Groq": {
            "base_url": "https://api.groq.com/openai/v1",
            "models": ["mixtral-8x7b-32768", "llama2-70b-4096"],
            "requires_library": "groq"
        },
        "HuggingFace": {
            "base_url": "https://api-inference.huggingface.co/models/",
            "models": ["gpt2", "llama-2-13b-chat"],
            "requires_library": None
        },
        "Google": {
            "models": ["gemini-1.5-flash-latest", "gemini-1.5-pro-latest", "gemini-pro", "gemini-pro-vision"],
            "requires_library": "google.generativeai"
        }
    }

    # Maps each "requires_library" value to the module-level name that the
    # guarded imports bind (and set to None when the import fails).
    _LIBRARY_GUARDS = {
        "openai": "OpenAI",
        "groq": "groq",
        "google.generativeai": "genai",
    }

    _JSON_WARNING = "Could not parse response as valid JSON. Returning raw text."

    def __init__(self):
        self._init_session_state()
        self._setup_input_handlers()
        self._setup_providers()

    def _init_session_state(self):
        """Seed ``st.session_state`` with defaults for keys that are not set yet."""
        defaults = {
            "active_provider": "OpenAI",
            "api_keys": {},
            "input_sources": [],
            "generation_results": [],
            "system_metrics": {
                "api_calls": 0,
                "tokens_used": 0,
                "error_count": 0
            },
            "debug_mode": False,
            "google_configured": False,
            "advanced_options": {
                "temperature": 0.7,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 2000
            },
            "generation_format": "json",
            "csv_schema": ""
        }
        for key, val in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = val

    def _setup_providers(self):
        """Populate ``self.available_providers`` with providers whose SDK imported.

        A provider is listed when it needs no library, or when the module-level
        name bound by that library's guarded import is not ``None``.
        """
        module_globals = globals()
        self.available_providers = []
        for provider, config in self.PROVIDER_CONFIG.items():
            library = config["requires_library"]
            if library:
                # Unknown libraries fall back to their top-level package name.
                guard = self._LIBRARY_GUARDS.get(library, library.split(".")[0])
                if module_globals.get(guard) is None:
                    continue
            self.available_providers.append(provider)

    def _setup_input_handlers(self):
        """Build the mapping from input-kind name to its processor method."""
        self.input_processors = {
            "text": self._process_text,
            "pdf": self._process_pdf,
            "csv": self._process_csv,
            "api": self._process_api,
            "database": self._process_database,
            "web": self._process_web,
            "image": self._process_image
        }

    # --- Core Generation Engine ---
    def generate(self, provider: str, model: str, prompt: Any) -> Dict[str, Any]:
        """Generate with ``provider``/``model``; on any failure try the other providers.

        Args:
            provider: Key of ``PROVIDER_CONFIG``.
            model: Model identifier passed to the provider.
            prompt: A string, or a list of parts for multimodal Google requests.

        Returns:
            The provider response as produced by ``_execute_generation``.

        Raises:
            RuntimeError: If the requested provider and every backup fail.
        """
        try:
            if provider not in self.available_providers:
                raise ValueError(f"Provider {provider} not available")
            client = self._get_client(provider, model)
            if not client:
                raise ConnectionError("Client initialization failed")
            return self._execute_generation(client, provider, model, prompt)
        except Exception as e:
            self._log_error(f"Generation Error: {str(e)}")
            return self._failover_generation(prompt, failed_provider=provider)

    def _get_client(self, provider: str, model: Optional[str] = None) -> Any:
        """Build the client object for ``provider``.

        Args:
            provider: Key of ``PROVIDER_CONFIG``.
            model: Model name; only used for Google, where the client is bound
                to a model. Defaults to the provider's first configured model.

        Returns:
            The client, or ``None`` if construction failed (the error is logged).

        Raises:
            ValueError: If a non-Google provider has no API key in session state.
        """
        config = self.PROVIDER_CONFIG[provider]
        api_key = st.session_state.api_keys.get(provider, "")
        if not api_key and provider != "Google":
            raise ValueError("API key required")
        try:
            if provider == "Groq":
                return groq.Groq(api_key=api_key)
            if provider == "HuggingFace":
                # HuggingFace is called over plain HTTP, so the "client" is
                # just the auth headers.
                return {"headers": {"Authorization": f"Bearer {api_key}"}}
            if provider == "Google":
                return self._build_google_model(config, api_key, model)
            return OpenAI(
                base_url=config["base_url"],
                api_key=api_key,
                timeout=30
            )
        except Exception as e:
            self._log_error(f"Client Init Failed: {str(e)}")
            return None

    def _build_google_model(self, config: Dict[str, Any], api_key: str, model: Optional[str]) -> Any:
        """Configure the Google SDK once per session and return a ``GenerativeModel``."""
        if not st.session_state.google_configured:
            # The environment variable takes precedence over the key typed in the app.
            api_key = os.environ.get("GOOGLE_API_KEY") or api_key
            if not api_key:
                raise ValueError(
                    "Google API key is required. Please set it in the app or as the GOOGLE_API_KEY environment variable.")
            try:
                configure(api_key=api_key)
            except Exception as e:
                raise ValueError(f"Error configuring Google API: {e}") from e
            st.session_state.google_configured = True

        options = st.session_state.advanced_options
        generation_config = genai.GenerationConfig(
            temperature=options["temperature"],
            top_p=options["top_p"],
            top_k=options["top_k"],
            max_output_tokens=options["max_output_tokens"]
        )
        safety_settings = [
            {"category": category, "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
            for category in (
                "HARM_CATEGORY_HARASSMENT",
                "HARM_CATEGORY_HATE_SPEECH",
                "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                "HARM_CATEGORY_DANGEROUS_CONTENT",
            )
        ]
        return GenerativeModel(
            model_name=model or config["models"][0],
            generation_config=generation_config,
            safety_settings=safety_settings
        )

    def _parse_output(self, content: Any) -> Dict[str, Any]:
        """Shape raw model text according to the session's ``generation_format``.

        In "json" mode the text is parsed; if parsing fails the raw text is
        returned together with a warning. Any other mode returns the raw text.
        """
        if st.session_state.generation_format != "json":
            return {"content": content}
        try:
            return json.loads(content)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a missing (None) message body.
            return {"content": content, "warning": self._JSON_WARNING}

    def _execute_generation(self, client, provider: str, model: str, prompt: Any) -> Dict[str, Any]:
        """Send ``prompt`` to the provider and return its response.

        HuggingFace returns the decoded JSON body of the HTTP call. Google
        errors are logged and returned as ``{"error": ..., "content": ""}``
        instead of being raised. Every other provider goes through the
        OpenAI-style chat completions API. Google and chat-completion text is
        shaped by ``_parse_output``.
        """
        st.session_state.system_metrics["api_calls"] += 1
        if provider == "HuggingFace":
            response = requests.post(
                self.PROVIDER_CONFIG[provider]["base_url"] + model,
                headers=client["headers"],
                json={"inputs": prompt},
                timeout=30
            )
            response.raise_for_status()
            return response.json()

        if provider == "Google":
            try:
                # generate_content takes both plain strings and lists of parts.
                response = client.generate_content(prompt)
                return self._parse_output(response.text)
            except Exception as e:
                self._log_error(f"Google Generation Error: {str(e)}")
                return {"error": str(e), "content": ""}

        options = st.session_state.advanced_options
        completion = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=options["temperature"],
            max_tokens=options["max_output_tokens"]
        )
        usage = getattr(completion, "usage", None)
        if usage is not None:
            st.session_state.system_metrics["tokens_used"] += usage.total_tokens
        return self._parse_output(completion.choices[0].message.content)

    def _failover_generation(self, prompt: str, failed_provider: Optional[str] = None) -> Dict[str, Any]:
        """Try every other available provider once, using its first configured model.

        Args:
            prompt: The prompt to resend.
            failed_provider: Provider to skip; defaults to the session's
                active provider.

        Raises:
            RuntimeError: If no backup provider produced a response.
        """
        skipped = failed_provider if failed_provider is not None else st.session_state.active_provider
        for backup_provider in self.available_providers:
            if backup_provider == skipped:
                continue
            backup_model = self.PROVIDER_CONFIG[backup_provider]["models"][0]
            try:
                # Call the client directly instead of generate() so a failing
                # backup cannot re-enter failover and recurse without bound.
                client = self._get_client(backup_provider, backup_model)
                if not client:
                    continue
                return self._execute_generation(client, backup_provider, backup_model, prompt)
            except Exception as e:
                self._log_error(f"Failover Error ({backup_provider}): {str(e)}")
        raise RuntimeError("All generation providers unavailable")

    # --- Input Processors ---
    def _process_pdf(self, file) -> str:
        """Return the text of every page joined by newlines, or "" on error."""
        try:
            with pdfplumber.open(file) as pdf:
                return "\n".join(page.extract_text() or "" for page in pdf.pages)
        except Exception as e:
            self._log_error(f"PDF Processing Error: {str(e)}")
            return ""

    def _process_web(self, url: str) -> str:
        """Fetch ``url`` and return the response body, or "" on error."""
        try:
            response = requests.get(url, headers={
                "User-Agent": "Mozilla/5.0 (compatible; SyntheticBot/1.0)"
            }, timeout=10)
            # Treat HTTP error statuses as failures rather than as page content.
            response.raise_for_status()
            return response.text
        except Exception as e:
            self._log_error(f"Web Extraction Error: {str(e)}")
            return ""

    def _process_csv(self, file) -> str:
        """Read a CSV, store its column names/dtypes in session state, return it as text."""
        try:
            df = pd.read_csv(file)
            column_names = df.columns.tolist()
            data_types = [str(dtype) for dtype in df.dtypes]
            st.session_state.csv_schema = f"Column Names: {column_names}\nData Types: {data_types}"
            return df.to_string()
        except Exception as e:
            self._log_error(f"CSV Processing Error: {str(e)}")
            return ""

    def _process_text(self, text: str) -> str:
        """Return ``text`` unchanged."""
        return text

    def _process_api(self, url: str, method="GET", headers: Optional[Dict[str, str]] = None,
                     data: Optional[Dict[str, Any]] = None) -> str:
        """Call an HTTP endpoint and return its body.

        JSON bodies are pretty-printed; anything else is returned as raw text.
        Request failures are logged and yield "".

        Raises:
            ValueError: If ``method`` is neither GET nor POST.
        """
        verb = method.upper()
        try:
            if verb == "GET":
                response = requests.get(url, headers=headers or {}, timeout=10)
            elif verb == "POST":
                response = requests.post(url, headers=headers or {}, json=data, timeout=10)
            else:
                raise ValueError("Unsupported HTTP method.")
            response.raise_for_status()
            try:
                return json.dumps(response.json(), indent=2)
            except ValueError:
                # Every JSON decode error requests can raise subclasses ValueError.
                return response.text
        except requests.exceptions.RequestException as e:
            self._log_error(f"API Processing Error: {str(e)}")
            return ""

    def _process_database(self, connection_string: str, query: str) -> str:
        """Run ``query`` through SQLAlchemy and return the rows as text, or "" on error."""
        engine = None
        try:
            engine = sqlalchemy.create_engine(connection_string)
            with engine.connect() as connection:
                result = connection.execute(sqlalchemy.text(query))
                df = pd.DataFrame(result.fetchall(), columns=result.keys())
            return df.to_string()
        except Exception as e:
            self._log_error(f"Database Processing Error: {str(e)}")
            return ""
        finally:
            # Release pooled connections; the engine is not reused.
            if engine is not None:
                engine.dispose()

    def _process_image(self, image_file) -> list:
        """Wrap an uploaded image as a one-element list of parts for Google Gemini.

        Uses ``Part.from_data`` when the guarded ``Part`` import provides it,
        otherwise a ``{"mime_type", "data"}`` dict. Returns ``[]`` on error.
        """
        try:
            image_data = image_file.read()
            part_cls = globals().get("Part")
            if part_cls is not None and hasattr(part_cls, "from_data"):
                return [part_cls.from_data(image_data, mime_type=image_file.type)]
            return [{"mime_type": image_file.type, "data": image_data}]
        except Exception as e:
            self._log_error(f"Image Processing Error: {str(e)}")
            return []

    # --- Enterprise Features ---
    def _log_error(self, message: str) -> None:
        """Count the error, append it to ``error_logs``, and show it in debug mode."""
        st.session_state.system_metrics["error_count"] += 1
        st.session_state.error_logs = st.session_state.get("error_logs", []) + [message]
        if st.session_state.debug_mode:
            st.error(f"[DEBUG] {message}")

    def health_check(self) -> Dict[str, Any]:
        """Return available providers, a per-provider connectivity flag, and the metrics."""
        return {
            "providers_available": self.available_providers,
            "api_connectivity": {
                provider: self._test_provider_connectivity(provider)
                for provider in self.available_providers
            },
            "system_metrics": st.session_state.system_metrics
        }

    def _test_provider_connectivity(self, provider: str) -> bool:
        """Return True if a minimal request to ``provider`` succeeds.

        A missing API key or failed client construction yields False without
        probing the network.
        """
        try:
            client = self._get_client(provider)
        except Exception:
            return False
        if not client:
            return False
        try:
            if provider == "HuggingFace":
                response = requests.get(
                    self.PROVIDER_CONFIG[provider]["base_url"],
                    headers=client["headers"],
                    timeout=5
                )
                return response.status_code == 200
            if provider == "Google":
                client.generate_content("test")
                return True
            client.models.list()
            return True
        except Exception as e:
            self._log_error(f"Connectivity Check Failed ({provider}): {str(e)}")
            return False
# --- Enterprise UI Components --- | |
def provider_config_ui(gen: "SyntheticDataGenerator"):
    """Render the sidebar for choosing a provider, API key, model and options.

    Writes the selections to ``st.session_state`` (``active_provider``,
    ``api_keys``, ``active_model``, ``advanced_options``,
    ``generation_format``) and offers a button that displays
    ``gen.health_check()``.

    Args:
        gen: Generator whose ``available_providers`` and ``PROVIDER_CONFIG``
            populate the widgets.
    """
    with st.sidebar:
        st.header("⚙️ AI Engine Configuration")

        # Provider selection with availability checks
        provider = st.selectbox(
            "AI Provider",
            gen.available_providers,
            help="Available providers based on system configuration"
        )
        st.session_state.active_provider = provider

        # API key management
        api_key = st.text_input(
            f"{provider} API Key",
            type="password",
            value=st.session_state.api_keys.get(provider, ""),
            help=f"Obtain API key from {provider} portal"
        )
        st.session_state.api_keys[provider] = api_key

        # Model selection
        model = st.selectbox(
            "Model",
            gen.PROVIDER_CONFIG[provider]["models"],
            help="Select model version based on your API plan"
        )
        st.session_state.active_model = model

        # Advanced options: temperature and max tokens are sent to Google and
        # to every OpenAI-compatible provider; only the HuggingFace HTTP call
        # ignores them, so only that provider hides the section.
        if provider != "HuggingFace":
            options = st.session_state.advanced_options
            st.subheader("Advanced Generation Options")
            options["temperature"] = st.slider(
                "Temperature", min_value=0.0, max_value=1.0,
                value=options["temperature"], step=0.05,
                help="Controls randomness. Lower values = more deterministic.")
            if provider == "Google":
                # top_p / top_k are only passed to the Google generation config.
                options["top_p"] = st.slider(
                    "Top P", min_value=0.0, max_value=1.0,
                    value=options["top_p"], step=0.05,
                    help="Nucleus sampling: Considers the most probable tokens.")
                options["top_k"] = st.slider(
                    "Top K", min_value=1, max_value=100,
                    value=options["top_k"], step=1,
                    help="Considers the top K most probable tokens.")
            options["max_output_tokens"] = st.number_input(
                "Max Output Tokens", min_value=50, max_value=4096,
                value=options["max_output_tokens"], step=50,
                help="Maximum number of tokens in the generated output.")
            st.session_state.generation_format = st.selectbox(
                "Output Format", ["json", "text"],
                help="Choose the desired output format.")

        # System monitoring
        if st.button("Run Health Check"):
            report = gen.health_check()
            st.json(report)
def input_ui():
    """Render the input-method selector and the widget for the chosen method.

    Returns:
        A tuple ``(input_method, input_content, additional_instructions)``.
        ``input_content`` is the entered text/URL, the uploaded file, or
        ``None`` when nothing was uploaded. ``additional_instructions`` is
        only filled for the structured prompt and is "" otherwise.
    """
    input_method = st.selectbox(
        "Input Method",
        ["Text", "PDF", "Web URL", "CSV", "Image", "Structured Prompt (Advanced)"])
    input_content = None
    additional_instructions = ""  # Only used by the structured prompt

    if input_method == "Text":
        input_content = st.text_area("Enter Text", height=200)
    elif input_method == "PDF":
        uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])
        if uploaded_file is not None:
            input_content = uploaded_file
    elif input_method == "Web URL":
        input_content = st.text_input("Enter Web URL")
    elif input_method == "CSV":
        uploaded_file = st.file_uploader("Upload a CSV file", type=["csv"])
        if uploaded_file is not None:
            input_content = uploaded_file
            # The key can exist while still empty, so test the value rather
            # than key membership before rendering the schema.
            csv_schema = st.session_state.get("csv_schema")
            if csv_schema:
                st.write("Inferred CSV Schema:")
                st.write(csv_schema)
    elif input_method == "Image":
        uploaded_file = st.file_uploader("Upload an Image file", type=["png", "jpg", "jpeg"])
        if uploaded_file is not None:
            input_content = uploaded_file
    elif input_method == "Structured Prompt (Advanced)":
        st.subheader("Structured Prompt")
        input_content = st.text_area("Enter the base prompt/instructions", height=100)
        additional_instructions = st.text_area(
            "Specify constraints, data format, or other requirements:", height=100)

    return input_method, input_content, additional_instructions
def main():
    """Build the page, collect input, and run generation when the button is pressed.

    The selected input is turned into a prompt by the matching
    ``SyntheticDataGenerator`` processor, sent through ``generate`` with the
    provider and model chosen in the sidebar, and the result is shown as JSON.
    """
    st.set_page_config(
        page_title="Synthetic Data Factory Pro",
        page_icon="🏭",
        layout="wide"
    )
    gen = SyntheticDataGenerator()
    st.title("🏭 Synthetic Data Factory Pro")
    st.markdown("""
    **World's Most Advanced Synthetic Data Generation Platform**
    *Multi-provider AI Engine | Enterprise Input Processors | Real-time Monitoring*
    """)
    provider_config_ui(gen)
    input_method, input_content, additional_instructions = input_ui()

    if not st.button("Generate Data"):
        return

    is_structured = input_method == "Structured Prompt (Advanced)"
    if not (input_content or is_structured):
        st.warning("Please provide input data.")
        return

    provider = st.session_state.active_provider
    if input_method == "Image":
        # Image parts are built for Google Gemini; other providers cannot take them.
        if provider != "Google":
            st.error("Image input is only supported with the Google provider.")
            return
        # The prompt is the list of image parts only. The uploaded file object
        # itself is not a valid prompt part and must not be included.
        processed_input = gen._process_image(input_content)
        if not processed_input:
            st.error("Error processing image.")
            return
    elif is_structured:
        # Strip so that two empty text areas count as "no data" instead of "\n".
        processed_input = f"{input_content or ''}\n{additional_instructions}".strip()
    else:
        processors = {
            "Text": gen._process_text,
            "PDF": gen._process_pdf,
            "Web URL": gen._process_web,
            "CSV": gen._process_csv,
        }
        processor = processors.get(input_method)
        processed_input = processor(input_content) if processor else None

    if not processed_input:
        st.warning("No data to process. Please check your input.")
        return

    try:
        result = gen.generate(provider, st.session_state.active_model, processed_input)
        st.subheader("Generated Output:")
        st.json(result)
    except Exception as e:
        st.error(f"Error during generation: {e}")
# Run the Streamlit app only when executed as a script, not on import.
if __name__ == "__main__":
    main()