Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import json | |
| import os | |
| import time | |
| from collections import defaultdict | |
| BASE_URL = "https://api.jigsawstack.com/v1" | |
| headers = { | |
| "x-api-key": os.getenv("JIGSAWSTACK_API_KEY") | |
| } | |
| # Rate limiting configuration | |
| request_times = defaultdict(list) | |
| MAX_REQUESTS = 10 # Maximum requests per time window | |
| TIME_WINDOW = 60 # Time window in seconds | |
| def get_real_ip(request: gr.Request): | |
| """Extract real IP address using x-forwarded-for header or fallback""" | |
| if not request: | |
| return "unknown" | |
| forwarded = request.headers.get("x-forwarded-for") | |
| if forwarded: | |
| ip = forwarded.split(",")[0].strip() # First IP in the list is the client's | |
| else: | |
| ip = request.client.host # fallback | |
| return ip | |
| def check_rate_limit(request: gr.Request): | |
| """Check if the current request exceeds rate limits""" | |
| if not request: | |
| return True, "Rate limit check failed - no request info" | |
| ip = get_real_ip(request) | |
| now = time.time() | |
| print(f"Ip: {ip}") | |
| print(f"Now: {now}") | |
| print(f"Request times: {request_times[ip]}") | |
| # Clean up old timestamps outside the time window | |
| request_times[ip] = [t for t in request_times[ip] if now - t < TIME_WINDOW] | |
| print(f"Request times: {request_times[ip]}") | |
| # Check if rate limit exceeded | |
| if len(request_times[ip]) >= MAX_REQUESTS: | |
| time_remaining = int(TIME_WINDOW - (now - request_times[ip][0])) | |
| return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {TIME_WINDOW} seconds. Try again in {time_remaining} seconds." | |
| # Add current request timestamp | |
| request_times[ip].append(now) | |
| return True, "" | |
| def generate_embedding(input_type, text_content, url, content_type, token_overflow_mode, request: gr.Request): | |
| """Generate embeddings using JigsawStack Embedding API with rate limiting""" | |
| # Check rate limit first | |
| rate_limit_ok, rate_limit_msg = check_rate_limit(request) | |
| if not rate_limit_ok: | |
| return rate_limit_msg, "" | |
| # Validate inputs | |
| if input_type == "Text" and not text_content: | |
| return "Error: Please provide text content.", "" | |
| if input_type == "URL" and not url: | |
| return "Error: Please provide a URL.", "" | |
| try: | |
| payload = { | |
| "type": content_type, | |
| "token_overflow_mode": token_overflow_mode | |
| } | |
| if input_type == "Text": | |
| payload["text"] = text_content.strip() | |
| elif input_type == "URL": | |
| payload["url"] = url.strip() | |
| response = requests.post( | |
| f"{BASE_URL}/embedding", | |
| headers=headers, | |
| json=payload | |
| ) | |
| response.raise_for_status() | |
| result = response.json() | |
| if not result.get("success"): | |
| error_msg = f"Error: API call failed - {result.get('message', 'Unknown error')}" | |
| return error_msg, "" | |
| embedding = result.get("embeddings", []) | |
| embedding_str = json.dumps(embedding, indent=2) | |
| return "Embedding generated successfully!", embedding_str | |
| except requests.exceptions.RequestException as e: | |
| return f"Request failed: {str(e)}", "" | |
| except Exception as e: | |
| return f"An unexpected error occurred: {str(e)}", "" | |
| with gr.Blocks() as demo: | |
| gr.Markdown(""" | |
| <div style='text-align: center; margin-bottom: 24px;'> | |
| <h1 style='font-size:2.2em; margin-bottom: 0.2em;'>Vector Embedding Generator</h1> | |
| <p style='font-size:1.2em; margin-top: 0;'>Generate vector embeddings from various content types including text, images, audio, and PDF files.</p> | |
| <p style='font-size:1em; margin-top: 0.5em;'>Supported types: text, text-other, image, audio, pdf</p> | |
| <p style='font-size:0.9em; margin-top: 0.5em; color: #666;'>Rate limit: {MAX_REQUESTS} requests per {TIME_WINDOW} seconds per IP</p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("#### Content Input") | |
| input_type = gr.Radio([ | |
| "Text", | |
| "URL" | |
| ], value="Text", label="Select Input Type") | |
| text_content = gr.Textbox( | |
| label="Text Content", | |
| placeholder="Enter the text content to generate embeddings for...", | |
| visible=True, | |
| lines=5 | |
| ) | |
| url = gr.Textbox( | |
| label="URL", | |
| placeholder="Enter the URL of the resource...", | |
| visible=False | |
| ) | |
| content_type = gr.Dropdown( | |
| choices=["text", "text-other", "image", "audio", "pdf"], | |
| value="text", | |
| label="Content Type", | |
| info="Select the type of content being processed" | |
| ) | |
| token_overflow_mode = gr.Radio( | |
| choices=["error", "truncate"], | |
| value="error", | |
| label="Token Overflow Mode", | |
| info="How to handle input that exceeds token limits" | |
| ) | |
| generate_btn = gr.Button("Generate Embedding", variant="primary") | |
| with gr.Column(): | |
| gr.Markdown("#### Embedding Result") | |
| status_message = gr.Textbox(label="Status", interactive=False) | |
| embedding_result = gr.Textbox( | |
| label="Vector Embedding", | |
| interactive=False, | |
| lines=15, | |
| max_lines=25 | |
| ) | |
| def toggle_inputs(selected): | |
| if selected == "Text": | |
| return gr.update(visible=True), gr.update(visible=False) | |
| else: # URL | |
| return gr.update(visible=False), gr.update(visible=True) | |
| input_type.change(toggle_inputs, inputs=[input_type], outputs=[text_content, url]) | |
| generate_btn.click( | |
| generate_embedding, | |
| inputs=[input_type, text_content, url, content_type, token_overflow_mode], | |
| outputs=[status_message, embedding_result] | |
| ) | |
| demo.launch() | |