Spaces:
Running
Running
File size: 10,287 Bytes
521c1f0 e4611cf 78af081 521c1f0 cd3a11d 78af081 521c1f0 cd3a11d 5b73cc5 78af081 f9b55bc 43bee1c 78af081 f9b55bc 78af081 f9b55bc e4611cf 521c1f0 cd3a11d 0f2aa55 cd3a11d 521c1f0 2ebf628 0f2aa55 521c1f0 0f2aa55 78af081 0f2aa55 cd3a11d 5b73cc5 0f2aa55 521c1f0 cd3a11d 0f2aa55 9e36f0e 521c1f0 9e36f0e cd3a11d 86c6ea5 cd3a11d 0f2aa55 9e36f0e cd3a11d 86c6ea5 cd3a11d 9e36f0e cd3a11d 78af081 cd3a11d f9b55bc 86c6ea5 78af081 86c6ea5 78af081 86c6ea5 78af081 521c1f0 86c6ea5 78af081 521c1f0 86c6ea5 521c1f0 86c6ea5 521c1f0 86c6ea5 78af081 86c6ea5 521c1f0 78af081 cd3a11d 86c6ea5 5b73cc5 78af081 f9b55bc 86c6ea5 f9b55bc 0f2aa55 9e36f0e 0f2aa55 521c1f0 0f2aa55 7c08af8 86c6ea5 7c08af8 86c6ea5 7c08af8 86c6ea5 0f2aa55 86c6ea5 5b73cc5 86c6ea5 1ca242c 86c6ea5 78af081 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
import os
import io
import time
import base64
import logging
import fitz # PyMuPDF
from PIL import Image
import gradio as gr
from openai import OpenAI # Use the OpenAI client that supports multimodal messages
# Load the API key from the environment (e.g. a deployment secret).
# The value is read from OPENAI_TOKEN; the HF_API_KEY name is kept so the
# module-level identifier stays unchanged.
HF_API_KEY = os.getenv("OPENAI_TOKEN")
if not HF_API_KEY:
    # Name the variable that is actually read so the error is actionable.
    raise ValueError("OPENAI_TOKEN environment variable not set")

# Create an OpenAI-compatible client pointed at the OpenRouter API endpoint.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=HF_API_KEY,
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# -------------------------------
# Document State and File Processing
# -------------------------------
class DocumentState:
    """Mutable holder for the document currently loaded into the app.

    Attributes set by ``clear()`` (and therefore by construction):
      * ``current_doc_images`` - list of page/images, empty when nothing is loaded
      * ``current_doc_text``   - extracted text, empty string when nothing is loaded
      * ``doc_type``           - document kind marker, ``None`` when nothing is loaded
    """

    def __init__(self):
        # A new instance starts in exactly the state that clear() produces.
        self.clear()

    def clear(self):
        """Reset every field to its empty value."""
        self.current_doc_images, self.current_doc_text, self.doc_type = [], "", None


# Single shared instance used by the rest of the module.
doc_state = DocumentState()
def process_pdf_file(file_path, zoom=3, max_size=1600):
    """Convert PDF pages to images and extract their text using PyMuPDF.

    Args:
        file_path: Path of the PDF file to open.
        zoom: Scale factor applied to both axes when rendering a page.
        max_size: Longest allowed side of a rendered image, in pixels; larger
            renders are scaled down proportionally.

    Returns:
        A ``(images, text)`` tuple. ``images`` is a list of RGB PIL images,
        one per page that rendered successfully. ``text`` is the concatenated
        text of every page that has any, each prefixed with ``Page N:``.

    Raises:
        ValueError: If no page could be rendered to an image.
        Exception: Any other failure (e.g. opening the file) is logged and
            re-raised unchanged.
    """
    try:
        images = []
        text_parts = []
        # The render matrix does not depend on the page, so build it once.
        matrix = fitz.Matrix(zoom, zoom)

        # The context manager closes the document even if iteration fails.
        with fitz.open(file_path) as doc:
            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    page_text = page.get_text("text")
                    if page_text.strip():
                        text_parts.append(f"Page {page_num + 1}:\n{page_text}\n\n")

                    # Render the page as an image with the zoom factor.
                    pix = page.get_pixmap(matrix=matrix, alpha=False)
                    img = Image.open(io.BytesIO(pix.tobytes("png"))).convert("RGB")

                    # Resize if the image is too large.
                    longest_side = max(img.size)
                    if longest_side > max_size:
                        ratio = max_size / longest_side
                        # Clamp to 1px so extreme aspect ratios cannot
                        # produce a zero-sized dimension.
                        new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
                        img = img.resize(new_size, Image.Resampling.LANCZOS)
                    images.append(img)
                except Exception as e:
                    # One bad page must not abort the whole document; report
                    # it with the same 1-based number used in the text output.
                    logger.error("Error processing page %d: %s", page_num + 1, e)
                    continue

        if not images:
            raise ValueError("No valid images could be extracted from the PDF")
        return images, "".join(text_parts)
    except Exception as e:
        logger.error("Error processing PDF file: %s", e)
        raise
def process_uploaded_file(file):
    """Process an uploaded file (PDF or image) and update the document state.

    Args:
        file: The upload value from the Gradio file component. Accepted forms
            are ``None``, a path (``str`` / ``os.PathLike``), a dict with a
            ``"name"`` key, or an object with a ``.name`` attribute.

    Returns:
        A human-readable status string describing success or failure. This
        function does not raise; unexpected errors are logged and reported
        through the returned message.

    Side effects:
        Clears ``doc_state`` first, then fills it in only if processing
        succeeds, so a failed upload never leaves a stale ``doc_type``.
    """
    try:
        doc_state.clear()
        if file is None:
            return "No file uploaded. Please upload a file."

        # Resolve the on-disk path from whichever form the upload arrived in.
        if isinstance(file, (str, os.PathLike)):
            file_path = os.fspath(file)
        elif isinstance(file, dict):
            file_path = file["name"]
        else:
            file_path = file.name

        # splitext yields "" for extension-less names instead of the whole path.
        file_ext = os.path.splitext(file_path)[1].lower().lstrip(".")
        image_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'}

        if file_ext == 'pdf':
            try:
                images, text = process_pdf_file(file_path)
            except Exception as e:
                return f"Error processing PDF: {str(e)}. Please try a different PDF file."
            doc_state.current_doc_images = images
            doc_state.current_doc_text = text
            doc_state.doc_type = 'pdf'
            return f"PDF processed successfully. Total pages: {len(images)}. You can now chat with the bot."

        if file_ext in image_extensions:
            try:
                # convert() returns an independent in-memory copy, so the
                # underlying file handle can be closed right away.
                with Image.open(file_path) as source:
                    img = source.convert("RGB")
                max_size = 1600
                longest_side = max(img.size)
                if longest_side > max_size:
                    ratio = max_size / longest_side
                    # Clamp to 1px so extreme aspect ratios stay valid.
                    new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
                    img = img.resize(new_size, Image.Resampling.LANCZOS)
            except Exception as e:
                return f"Error processing image: {str(e)}. Please try a different image file."
            doc_state.current_doc_images = [img]
            doc_state.doc_type = 'image'
            return "Image loaded successfully. You can now chat with the bot."

        shown_ext = file_ext or "unknown"
        return f"Unsupported file type: {shown_ext}. Please upload a PDF or image file (PNG, JPG, JPEG, GIF, BMP, WEBP)."
    except Exception as e:
        logger.error("Error in process_uploaded_file: %s", e)
        return "An error occurred while processing the file. Please try again."
def clear_context():
    """Reset the shared document state and hand back an empty chat history.

    Returns:
        A ``(status_message, history)`` pair where ``history`` is a new
        empty list.
    """
    doc_state.clear()
    status_message = "Document context cleared. You can upload a new document."
    empty_history = []
    return status_message, empty_history
# -------------------------------
# Predetermined Prompts
# -------------------------------
# Instruction text for the "Software Tester" prompt option.
_SOFTWARE_TESTER_PROMPT = (
    "Act as a software tester. "
    "Analyze the uploaded image of a software interface and generate comprehensive test cases for its features. "
    "For each feature, provide test steps, expected results, and any necessary preconditions. "
    "Be as detailed as possible."
)

# Maps each selectable prompt name to its instruction text.
predetermined_prompts = {"Software Tester": _SOFTWARE_TESTER_PROMPT}
# -------------------------------
# Chat Function with Streaming and Conversation History
# -------------------------------
def _image_to_data_uri(image):
    """Encode a PIL image as a base64 PNG ``data:`` URI."""
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    encoded = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return f"data:image/png;base64,{encoded}"


def _build_chat_messages(history, images):
    """Turn ``[user, assistant]`` pairs into multimodal chat API messages.

    The first image in ``images`` (if any) is attached to the very first
    user message only.
    """
    messages = []
    for index, (user_msg, assistant_msg) in enumerate(history):
        user_content = [{"type": "text", "text": user_msg}]
        if index == 0 and images:
            user_content.append({
                "type": "image_url",
                "image_url": {"url": _image_to_data_uri(images[0])},
            })
        messages.append({"role": "user", "content": user_content})
        # The pending (last) turn has an empty assistant slot; skip it.
        if assistant_msg:
            messages.append({
                "role": "assistant",
                "content": [{"type": "text", "text": assistant_msg}],
            })
    return messages


def chat_respond(user_message, history, prompt_option):
    """Stream the assistant's reply for a new user message.

    On the first turn the predetermined prompt selected by ``prompt_option``
    is used as the message (when the user typed nothing) or prepended to it.
    The full conversation, plus the first uploaded image if there is one, is
    sent to the chat completions API with streaming enabled.

    Args:
        user_message: Text typed by the user; ``None`` is treated as empty.
        history: List of ``[user_text, assistant_text]`` pairs so far.
        prompt_option: Key into ``predetermined_prompts``.

    Yields:
        ``(history, history)`` after each received piece of text: once for
        the chat display and once for the stored state. On failure a single
        pair is yielded whose last assistant entry holds an error message
        (or the partial reply if some text had already arrived).
    """
    user_message = user_message or ""
    # Work on a copy so the caller's list is never mutated.
    history = list(history or [])

    if not history:
        prompt_text = predetermined_prompts.get(prompt_option, "")
        if not user_message.strip():
            user_message = prompt_text or "Hello"
        elif prompt_text:
            # Only prepend when a prompt exists, avoiding a stray leading newline.
            user_message = f"{prompt_text}\n{user_message}"
    elif not user_message.strip():
        # Nothing to send on a follow-up turn; leave the conversation as is.
        yield history, history
        return

    # Append the new user message with an empty assistant response.
    history.append([user_message, ""])
    messages = _build_chat_messages(history, doc_state.current_doc_images)

    try:
        stream = client.chat.completions.create(
            model="google/gemini-2.0-pro-exp-02-05:free",
            messages=messages,
            max_tokens=8192,
            stream=True
        )
        buffer = ""
        for chunk in stream:
            # Some chunks carry no choices or no text (e.g. the final one).
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if not delta:
                continue
            buffer += delta
            history[-1][1] = buffer
            yield history, history
    except Exception as e:
        logger.error("Error calling the API: %s", e)
        # Keep any partial reply; otherwise surface a generic error message.
        if not history[-1][1]:
            history[-1][1] = "An error occurred while processing your request. Please try again."
        yield history, history
        return

    if not history[-1][1]:
        # The stream produced no text; still publish the user's turn.
        yield history, history
# -------------------------------
# Create the Gradio Interface
# -------------------------------
# Build the UI: upload row, prompt selector and clear button, chat display,
# and message input, then wire the event handlers.
with gr.Blocks() as demo:
    gr.Markdown("# Document Analyzer & Software Testing Chatbot")
    gr.Markdown(
        "Upload a PDF or an image (PNG, JPG, JPEG, GIF, BMP, WEBP). Then choose a prompt from the dropdown. "
        "For example, select **Software Tester** to have the bot analyze an image of a software interface "
        "and generate test cases. Chat with the bot in the conversation below."
    )
    with gr.Row():
        file_upload = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"]
        )
        upload_status = gr.Textbox(label="Upload Status", interactive=False)
    with gr.Row():
        prompt_dropdown = gr.Dropdown(
            label="Select Prompt",
            choices=[
                "Software Tester"
            ],
            value="Software Tester"
        )
        clear_btn = gr.Button("Clear Document Context & Chat History")
    chatbot = gr.Chatbot(label="Chat History", elem_id="chatbot")
    with gr.Row():
        user_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", show_label=False)
        send_btn = gr.Button("Send")

    # State to hold the conversation history
    chat_state = gr.State([])

    def _clear_all():
        """Clear the document context and reset both the state and the chat display."""
        status, empty_history = clear_context()
        # The extra output empties the visible Chatbot; without it the old
        # conversation stays on screen after the state has been reset.
        return status, empty_history, []

    # When a file is uploaded, process it.
    file_upload.change(fn=process_uploaded_file, inputs=file_upload, outputs=upload_status)

    # Clear the document context, the stored history, and the displayed chat.
    clear_btn.click(fn=_clear_all, outputs=[upload_status, chat_state, chatbot])

    # Send on button click or when Enter is pressed in the textbox.
    send_btn.click(fn=chat_respond,
                   inputs=[user_input, chat_state, prompt_dropdown],
                   outputs=[chatbot, chat_state])
    user_input.submit(fn=chat_respond,
                      inputs=[user_input, chat_state, prompt_dropdown],
                      outputs=[chatbot, chat_state])

demo.launch(debug=True)
|