Spaces:
Running
Running
File size: 9,620 Bytes
521c1f0 e4611cf 78af081 521c1f0 cd3a11d 78af081 521c1f0 8adc570 521c1f0 edaf4b6 521c1f0 edaf4b6 521c1f0 cd3a11d 5b73cc5 78af081 f9b55bc 43bee1c 78af081 f9b55bc 78af081 f9b55bc e4611cf 521c1f0 cd3a11d 0f2aa55 edaf4b6 521c1f0 2ebf628 0f2aa55 521c1f0 78af081 0f2aa55 cd3a11d 5b73cc5 0f2aa55 521c1f0 cd3a11d 0f2aa55 9e36f0e 8adc570 9e36f0e cd3a11d 86c6ea5 cd3a11d 0f2aa55 9e36f0e cd3a11d 86c6ea5 cd3a11d 9e36f0e cd3a11d 78af081 cd3a11d f9b55bc 86c6ea5 78af081 86c6ea5 78af081 86c6ea5 8adc570 86c6ea5 78af081 521c1f0 86c6ea5 8adc570 86c6ea5 8adc570 86c6ea5 8adc570 86c6ea5 8adc570 86c6ea5 8adc570 86c6ea5 78af081 521c1f0 86c6ea5 521c1f0 86c6ea5 8adc570 86c6ea5 521c1f0 8adc570 78af081 86c6ea5 521c1f0 78af081 cd3a11d 86c6ea5 edaf4b6 8adc570 86c6ea5 8adc570 86c6ea5 8adc570 86c6ea5 5b73cc5 78af081 f9b55bc edaf4b6 86c6ea5 edaf4b6 86c6ea5 f9b55bc 0f2aa55 9e36f0e 0f2aa55 521c1f0 0f2aa55 7c08af8 86c6ea5 8adc570 86c6ea5 7c08af8 86c6ea5 8adc570 7c08af8 86c6ea5 0f2aa55 86c6ea5 edaf4b6 86c6ea5 8adc570 86c6ea5 edaf4b6 8adc570 edaf4b6 86c6ea5 78af081 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
import os
import io
import time
import base64
import logging
import fitz # PyMuPDF
from PIL import Image
import gradio as gr
from openai import OpenAI # Use the OpenAI client that supports multimodal messages
# The API key is read from the OPENAI_TOKEN environment variable; refuse to
# start when it is missing or empty.
HF_API_KEY = os.environ.get("OPENAI_TOKEN")
if not HF_API_KEY:
    raise ValueError("OPENAI_TOKEN environment variable not set")

# OpenAI-compatible client pointed at the OpenRouter inference endpoint.
client = OpenAI(api_key=HF_API_KEY, base_url="https://openrouter.ai/api/v1")

# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# -------------------------------
# Document State and File Processing
# -------------------------------
class DocumentState:
    """Hold the document currently loaded for the chat.

    Attributes:
        current_doc_images: Images of the loaded document (a single entry for
            an uploaded image, one entry per rendered page for a PDF).
        current_doc_text: Text extracted from the document, or ``""``.
        doc_type: ``'pdf'`` or ``'image'`` once a document has been loaded,
            otherwise ``None``.
    """

    def __init__(self) -> None:
        # Delegate to clear() so the "empty" state is defined in one place
        # and the constructor and the reset path cannot drift apart.
        self.clear()

    def clear(self) -> None:
        """Reset every field to its empty value."""
        self.current_doc_images: list = []
        self.current_doc_text: str = ""
        self.doc_type = None


# Module-level instance used by the upload and chat handlers in this file.
doc_state = DocumentState()
def process_pdf_file(file_path, zoom=3, max_size=1600):
    """Render each page of a PDF to an RGB image and collect the page text.

    Args:
        file_path: Path of the PDF, opened with PyMuPDF.
        zoom: Scale factor applied to both axes when rendering a page.
        max_size: Longest edge, in pixels, allowed for a rendered page;
            larger renders are scaled down proportionally.

    Returns:
        A ``(images, text)`` tuple. ``images`` contains one PIL image per
        page that rendered successfully. ``text`` is the concatenation of the
        non-blank page texts, each preceded by a ``Page N:`` header.

    Raises:
        ValueError: If no page could be rendered to an image.
        Exception: Any other failure (for example while opening the file) is
            logged and re-raised unchanged.
    """
    try:
        images = []
        text_parts = []
        doc = fitz.open(file_path)
        try:
            matrix = fitz.Matrix(zoom, zoom)
            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    page_text = page.get_text("text")
                    if page_text.strip():
                        text_parts.append(f"Page {page_num + 1}:\n{page_text}\n\n")

                    # Render the page to PNG bytes and load them with PIL.
                    pix = page.get_pixmap(matrix=matrix, alpha=False)
                    img_data = pix.tobytes("png")
                    img = Image.open(io.BytesIO(img_data)).convert("RGB")

                    # Downscale oversized renders, keeping the aspect ratio.
                    longest_edge = max(img.size)
                    if longest_edge > max_size:
                        ratio = max_size / longest_edge
                        # Clamp to 1 px so an extreme aspect ratio cannot
                        # round a dimension down to zero.
                        new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
                        img = img.resize(new_size, Image.Resampling.LANCZOS)
                    images.append(img)
                except Exception as e:
                    # A broken page is skipped; the remaining pages are still
                    # processed. Report the 1-based number used in the text.
                    logger.error("Error processing page %d: %s", page_num + 1, e)
        finally:
            # Release the document even if something unexpected propagates.
            doc.close()

        if not images:
            raise ValueError("No valid images could be extracted from the PDF")
        return images, "".join(text_parts)
    except Exception as e:
        logger.error("Error processing PDF file: %s", e)
        raise
def process_uploaded_file(file):
    """Load an uploaded PDF or image into ``doc_state`` and report the result.

    Any previously loaded document is discarded first. ``doc_state`` is only
    populated when loading succeeds, so a failed upload leaves it empty.

    Args:
        file: The uploaded file as a path (``str`` or path-like), a dict
            with a ``"name"`` key, or an object with a ``name`` attribute.
            ``None`` means nothing was uploaded.

    Returns:
        A human-readable status message. Errors are reported through this
        message rather than raised.
    """
    try:
        doc_state.clear()
        if file is None:
            return "No file uploaded. Please upload a file."

        # Accept every shape the upload may arrive in.
        if isinstance(file, (str, os.PathLike)):
            file_path = os.fspath(file)
        elif isinstance(file, dict):
            file_path = file["name"]
        else:
            file_path = file.name

        # splitext only inspects the final path component, so a dotted
        # directory name or an extension-less file is not misread.
        file_ext = os.path.splitext(file_path)[1].lower().lstrip(".")
        image_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'}

        if file_ext == 'pdf':
            try:
                images, text = process_pdf_file(file_path)
            except Exception as e:
                return f"Error processing PDF: {str(e)}. Please try a different PDF file."
            doc_state.current_doc_images = images
            doc_state.current_doc_text = text
            doc_state.doc_type = 'pdf'
            return f"PDF processed successfully. Total pages: {len(images)}. You can now chat with the bot."

        if file_ext in image_extensions:
            try:
                # convert() returns an independent in-memory copy, so the
                # underlying file can be closed as soon as it is done.
                with Image.open(file_path) as source:
                    img = source.convert("RGB")
                max_size = 1600
                longest_edge = max(img.size)
                if longest_edge > max_size:
                    ratio = max_size / longest_edge
                    # Clamp to 1 px so an extreme aspect ratio cannot round
                    # a dimension down to zero.
                    new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
                    img = img.resize(new_size, Image.Resampling.LANCZOS)
            except Exception as e:
                logger.error("Error processing image: %s", e)
                return f"Error processing image: {str(e)}. Please try a different image file."
            doc_state.current_doc_images = [img]
            doc_state.doc_type = 'image'
            return "Image loaded successfully. You can now chat with the bot."

        shown_ext = file_ext or "(none)"
        return f"Unsupported file type: {shown_ext}. Please upload a PDF or image file (PNG, JPG, JPEG, GIF, BMP, WEBP)."
    except Exception as e:
        logger.error("Error in process_uploaded_file: %s", e)
        return "An error occurred while processing the file. Please try again."
def clear_context():
    """Reset the loaded document and return a status plus an empty history.

    Returns:
        A ``(status_message, history)`` tuple where ``history`` is a new
        empty list.
    """
    doc_state.clear()
    status_message = "Document context cleared. You can upload a new document."
    empty_history = []
    return status_message, empty_history
# -------------------------------
# Predetermined Prompts
# -------------------------------
# Instruction text for the "Software Tester" prompt option.
_SOFTWARE_TESTER_PROMPT = (
    "Act as a software tester. "
    "Analyze the uploaded image of a software interface and generate comprehensive test cases for its features. "
    "For each feature, provide test steps, expected results, and any necessary preconditions. "
    "Be as detailed as possible."
)

# Maps a prompt option name to the instruction text used for it.
predetermined_prompts = {"Software Tester": _SOFTWARE_TESTER_PROMPT}
# -------------------------------
# Chat Function (Non-streaming Version)
# -------------------------------
def _pairs_to_chat_messages(history):
    """Convert ``[user, assistant]`` pairs into role/content dicts for display."""
    chat_messages = []
    for user_text, assistant_text in history:
        chat_messages.append({"role": "user", "content": user_text})
        if assistant_text:
            chat_messages.append({"role": "assistant", "content": assistant_text})
    return chat_messages


def _image_to_data_uri(image):
    """Encode a PIL image as a base64 PNG ``data:`` URI."""
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    img_b64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return f"data:image/png;base64,{img_b64}"


def _build_api_messages(history, first_image=None):
    """Build the multimodal request messages; the image rides on the first turn."""
    messages = []
    for index, (user_text, assistant_text) in enumerate(history):
        user_content = [{"type": "text", "text": user_text}]
        if index == 0 and first_image is not None:
            user_content.append({
                "type": "image_url",
                "image_url": {"url": _image_to_data_uri(first_image)},
            })
        messages.append({"role": "user", "content": user_content})
        if assistant_text:
            messages.append({
                "role": "assistant",
                "content": [{"type": "text", "text": assistant_text}],
            })
    return messages


def chat_respond(user_message, history, prompt_option):
    """Send the next user turn to the model and return the updated chat.

    On the first turn the predetermined prompt for ``prompt_option`` is used:
    alone when the message is blank (falling back to ``"Hello"`` for an
    unknown option), otherwise prepended to the message. A blank message on a
    later turn is ignored and the chat is returned unchanged.

    Only the first image in ``doc_state.current_doc_images`` is attached, and
    only to the first user message; extracted document text is not sent.

    Args:
        user_message: Text typed by the user; ``None`` is treated as empty.
        history: Conversation so far as a list of ``[user_text,
            assistant_text]`` pairs. It is not modified in place.
        prompt_option: Key into ``predetermined_prompts``.

    Returns:
        A ``(chat_messages, history)`` tuple. ``chat_messages`` is the
        conversation as ``{"role", "content"}`` dicts, the format a
        ``gr.Chatbot(type="messages")`` expects. ``history`` is the updated
        list of pairs to store as state. API failures are reported as the
        assistant's reply instead of being raised.
    """
    user_message = user_message or ""
    history = list(history or [])

    if not history:
        preset = predetermined_prompts.get(prompt_option, "")
        if not user_message.strip():
            user_message = predetermined_prompts.get(prompt_option, "Hello")
        elif preset:
            user_message = preset + "\n" + user_message
    elif not user_message.strip():
        # Nothing to send: avoid posting an empty turn to the API.
        return _pairs_to_chat_messages(history), history

    history.append([user_message, ""])

    images = doc_state.current_doc_images
    messages = _build_api_messages(history, images[0] if images else None)

    # The response is streamed but gathered in full before returning.
    try:
        stream = client.chat.completions.create(
            model="google/gemini-2.0-pro-exp-02-05:free",
            messages=messages,
            max_tokens=8192,
            stream=True,
        )
        parts = []
        for chunk in stream:
            # Some chunks carry no choices or no text content; skip them
            # instead of concatenating None.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                parts.append(delta)
    except Exception as e:
        logger.error("Error calling the API: %s", e)
        history[-1][1] = "An error occurred while processing your request. Please check your API credentials."
        return _pairs_to_chat_messages(history), history

    history[-1][1] = "".join(parts)
    return _pairs_to_chat_messages(history), history
# -------------------------------
# Create the Gradio Interface
# -------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Document Analyzer & Software Testing Chatbot")
    gr.Markdown(
        "Upload a PDF or an image (PNG, JPG, JPEG, GIF, BMP, WEBP). Then choose a prompt from the dropdown. "
        "For example, select **Software Tester** to have the bot analyze an image of a software interface "
        "and generate test cases. You can also chat with the model—the conversation history is preserved."
    )

    with gr.Row():
        file_upload = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"],
        )
        upload_status = gr.Textbox(label="Upload Status", interactive=False)

    with gr.Row():
        # Offer exactly the prompts that are defined, so the dropdown cannot
        # drift from the prompt table.
        prompt_choices = list(predetermined_prompts)
        prompt_dropdown = gr.Dropdown(
            label="Select Prompt",
            choices=prompt_choices,
            value=prompt_choices[0] if prompt_choices else None,
        )
        clear_btn = gr.Button("Clear Document Context & Chat History")

    # type='messages' avoids deprecation warnings; values sent to this
    # component must be lists of {"role", "content"} dicts.
    chatbot = gr.Chatbot(label="Chat History", type="messages", elem_id="chatbot")

    with gr.Row():
        user_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", show_label=False)
        send_btn = gr.Button("Send")

    # State to hold the conversation history
    chat_state = gr.State([])

    # When a file is uploaded, process it.
    file_upload.change(fn=process_uploaded_file, inputs=file_upload, outputs=upload_status)

    # Clear the document context and stored history, then empty the visible
    # chat as well; clear_context itself only resets the status and state.
    clear_btn.click(
        fn=clear_context,
        outputs=[upload_status, chat_state],
    ).then(fn=lambda: [], inputs=None, outputs=chatbot)

    # When the user clicks Send, process the message and update the chat.
    send_btn.click(
        fn=chat_respond,
        inputs=[user_input, chat_state, prompt_dropdown],
        outputs=[chatbot, chat_state],
    )

# Launch only when run as a script, so importing this module has no side effect.
if __name__ == "__main__":
    demo.launch(debug=True)
|