import os
import io
import time
import base64
import logging
import fitz  # PyMuPDF
from PIL import Image
import gradio as gr
from openai import OpenAI  # Use the OpenAI client that supports multimodal messages

# Load the API key from an environment variable (e.g. a Space secret)
API_KEY = os.getenv("OPENAI_TOKEN")
if not API_KEY:
    raise ValueError("OPENAI_TOKEN environment variable not set")

# Create the client pointing to the OpenRouter OpenAI-compatible endpoint
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=API_KEY
)
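# Note: the OpenAI client works against any OpenAI-compatible endpoint. To
# target the official OpenAI API instead, a minimal sketch (illustrative only,
# not used by this app) would be:
# client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))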

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# -------------------------------
# Document State and File Processing
# -------------------------------
class DocumentState:
    def __init__(self):
        self.current_doc_images = []
        self.current_doc_text = ""
        self.doc_type = None

    def clear(self):
        self.current_doc_images = []
        self.current_doc_text = ""
        self.doc_type = None

doc_state = DocumentState()

def process_pdf_file(file_path):
    """Convert PDF pages to images and extract text using PyMuPDF."""
    try:
        doc = fitz.open(file_path)
        images = []
        text = ""
        for page_num in range(doc.page_count):
            try:
                page = doc[page_num]
                page_text = page.get_text("text")
                if page_text.strip():
                    text += f"Page {page_num + 1}:\n{page_text}\n\n"
                
                # Render page as an image with a zoom factor
                zoom = 3
                mat = fitz.Matrix(zoom, zoom)
                pix = page.get_pixmap(matrix=mat, alpha=False)
                img_data = pix.tobytes("png")
                img = Image.open(io.BytesIO(img_data)).convert("RGB")
                
                # Resize if image is too large
                max_size = 1600
                if max(img.size) > max_size:
                    ratio = max_size / max(img.size)
                    new_size = tuple(int(dim * ratio) for dim in img.size)
                    img = img.resize(new_size, Image.Resampling.LANCZOS)
                images.append(img)
            except Exception as e:
                logger.error(f"Error processing page {page_num}: {str(e)}")
                continue
        doc.close()
        if not images:
            raise ValueError("No valid images could be extracted from the PDF")
        return images, text
    except Exception as e:
        logger.error(f"Error processing PDF file: {str(e)}")
        raise
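
# Quick local check for process_pdf_file (a hedged sketch; "sample.pdf" is a
# placeholder path, not a file bundled with this app):
#   images, text = process_pdf_file("sample.pdf")
#   print(f"{len(images)} page images, {len(text)} characters of text")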

def process_uploaded_file(file):
    """Process an uploaded file (PDF or image) and update document state."""
    try:
        doc_state.clear()
        if file is None:
            return "No file uploaded. Please upload a file."
        
        # Get the file path from the Gradio upload (may be a dict or file-like object)
        if isinstance(file, dict):
            file_path = file["name"]
        else:
            file_path = file.name
        file_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
        image_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'}
        
        if file_ext == 'pdf':
            doc_state.doc_type = 'pdf'
            try:
                doc_state.current_doc_images, doc_state.current_doc_text = process_pdf_file(file_path)
                return f"PDF processed successfully. Total pages: {len(doc_state.current_doc_images)}. You can now chat with the bot."
            except Exception as e:
                return f"Error processing PDF: {str(e)}. Please try a different PDF file."
        elif file_ext in image_extensions:
            doc_state.doc_type = 'image'
            try:
                img = Image.open(file_path).convert("RGB")
                max_size = 1600
                if max(img.size) > max_size:
                    ratio = max_size / max(img.size)
                    new_size = tuple(int(dim * ratio) for dim in img.size)
                    img = img.resize(new_size, Image.Resampling.LANCZOS)
                doc_state.current_doc_images = [img]
                return "Image loaded successfully. You can now chat with the bot."
            except Exception as e:
                return f"Error processing image: {str(e)}. Please try a different image file."
        else:
            return f"Unsupported file type: {file_ext}. Please upload a PDF or image file (PNG, JPG, JPEG, GIF, BMP, WEBP)."
    except Exception as e:
        logger.error(f"Error in process_uploaded_file: {str(e)}")
        return "An error occurred while processing the file. Please try again."

def clear_context():
    """Clear the current document context and the chat history."""
    doc_state.clear()
    # Reset the status text, the visible chat, and the stored history state.
    return "Document context cleared. You can upload a new document.", [], []

# -------------------------------
# Predetermined Prompts
# -------------------------------
predetermined_prompts = {
    "Software Tester": (
        "Act as a software tester. Analyze the uploaded image of a software interface and generate comprehensive "
        "test cases for its features. For each feature, provide test steps, expected results, and any necessary "
        "preconditions. Be as detailed as possible."
    )
}
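
# Adding another canned role is a single dict entry, e.g. (illustrative only):
# predetermined_prompts["UI Reviewer"] = (
#     "Act as a UI reviewer. Critique the uploaded interface screenshot for "
#     "layout, accessibility, and consistency issues."
# )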

# -------------------------------
# Chat Function with Streaming and Conversation History
# -------------------------------
def chat_respond(user_message, history, prompt_option):
    """
    Append the user message to the conversation history (if the conversation is
    new and no message was typed, fall back to the predetermined prompt), build
    the multimodal API request from the full history (attaching the uploaded
    image, if any, to the first user turn), and stream the assistant's response
    back while updating the history.

    The history is a list of [user_text, assistant_text] pairs.
    """
    # On the first message, seed the conversation with the predetermined prompt.
    if not history:
        if not user_message.strip():
            # No message was typed: send the predetermined prompt on its own.
            user_message = predetermined_prompts.get(prompt_option, "Hello")
        else:
            # Otherwise, prepend the predetermined prompt to the user's message.
            user_message = predetermined_prompts.get(prompt_option, "") + "\n" + user_message

    # Append the new user message with an empty assistant response.
    history = history + [[user_message, ""]]

    # Build the messages list (for the multimodal API) from the conversation history.
    messages = []
    for i, (user_msg, assistant_msg) in enumerate(history):
        # For the user message:
        user_content = [{"type": "text", "text": user_msg}]
        # Attach the uploaded image (or first PDF page) to the very first user
        # message; only the first image is sent, even for multi-page PDFs.
        if i == 0 and doc_state.current_doc_images:
            buffered = io.BytesIO()
            doc_state.current_doc_images[0].save(buffered, format="PNG")
            img_b64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
            data_uri = f"data:image/png;base64,{img_b64}"
            user_content.append({
                "type": "image_url",
                "image_url": {"url": data_uri}
            })
        messages.append({"role": "user", "content": user_content})
        # For the assistant response, if available.
        if assistant_msg:
            messages.append({
                "role": "assistant",
                "content": [{"type": "text", "text": assistant_msg}]
            })
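
    # The resulting list follows the OpenAI multimodal chat format, roughly
    # (illustrative shape, values truncated):
    # [{"role": "user",
    #   "content": [{"type": "text", "text": "..."},
    #               {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}]},
    #  {"role": "assistant", "content": [{"type": "text", "text": "..."}]}]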

    # Call the inference API with streaming enabled.
    try:
        stream = client.chat.completions.create(
            model="google/gemini-2.0-pro-exp-02-05:free",
            messages=messages,
            max_tokens=8192,
            stream=True
        )
    except Exception as e:
        logger.error(f"Error calling the API: {str(e)}")
        history[-1][1] = "An error occurred while processing your request. Please try again."
        yield history, history
        return  # There is no stream to consume, so stop here.

    # Stream and update the assistant's reply chunk by chunk.
    buffer = ""
    for chunk in stream:
        if not chunk.choices:
            continue  # Skip keep-alive chunks that carry no choices.
        delta = chunk.choices[0].delta.content
        if not delta:
            continue  # Skip role/finish chunks that carry no text.
        buffer += delta
        # Update the assistant part of the latest message in the history.
        history[-1][1] = buffer
        # Yield the updated chat history (for the Chatbot component) and the state.
        yield history, history
        time.sleep(0.01)

    return history, history
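
# chat_respond is a generator, so Gradio streams each yielded (chatbot, state)
# pair to the UI as it arrives. A manual invocation outside Gradio would look
# roughly like this (illustrative sketch):
#   for chat, state in chat_respond("Hello", [], "Software Tester"):
#       print(chat[-1][1])  # progressively longer assistant reply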

# -------------------------------
# Create the Gradio Interface
# -------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Document Analyzer & Software Testing Chatbot")
    gr.Markdown(
        "Upload a PDF or an image (PNG, JPG, JPEG, GIF, BMP, WEBP). Then choose a prompt from the dropdown. "
        "For example, select **Software Tester** to have the bot analyze an image of a software interface "
        "and generate test cases. Chat with the bot in the conversation below."
    )
    
    with gr.Row():
        file_upload = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"]
        )
        upload_status = gr.Textbox(label="Upload Status", interactive=False)
    
    with gr.Row():
        prompt_dropdown = gr.Dropdown(
            label="Select Prompt",
            choices=[
                "Software Tester"
            ],
            value="Software Tester"
        )
        clear_btn = gr.Button("Clear Document Context & Chat History")
    
    chatbot = gr.Chatbot(label="Chat History", elem_id="chatbot")
    
    with gr.Row():
        user_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", show_label=False)
        send_btn = gr.Button("Send")
    
    # State to hold the conversation history
    chat_state = gr.State([])

    # When a file is uploaded, process it.
    file_upload.change(fn=process_uploaded_file, inputs=file_upload, outputs=upload_status)
    
    # Clear both the document context and chat history.
    clear_btn.click(fn=clear_context, outputs=[upload_status, chatbot, chat_state])
    
    # When the user clicks Send, process the message and update the chat.
    send_btn.click(fn=chat_respond,
                   inputs=[user_input, chat_state, prompt_dropdown],
                   outputs=[chatbot, chat_state])
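
    # Optional convenience wiring (sketch): let pressing Enter in the textbox
    # trigger the same handler as the Send button.
    user_input.submit(fn=chat_respond,
                      inputs=[user_input, chat_state, prompt_dropdown],
                      outputs=[chatbot, chat_state])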
    
demo.launch(debug=True)