"""Gradio front-end for an Azure OpenAI chat with chunk-level text editing.

The module wires together three pieces:

* ``ConversationMemory`` - a SQLite table holding conversation chunks.
* ``TextEditor`` - cut/copy/paste/prefix/suffix operations on stored chunks.
* ``OpenAIApi`` - a thin Azure OpenAI chat client that exposes the editor
  operations to the model as tools.

The remaining functions are Gradio event handlers plus ``create_ui``.
"""

import asyncio
import datetime
import difflib
import html
import json
import logging
import os
import re
import sqlite3
import uuid
from typing import Optional

import gradio as gr
import httpx
import pandas as pd  # noqa: F401  (kept: imported by the original module)
from openai import AzureOpenAI
from tiktoken import get_encoding

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('aiapp.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Clear proxy environment variables to avoid interference
os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None)

DEFAULT_ENDPOINT = "https://T-App-GPT4o.openai.azure.com/"
API_VERSION = "2024-02-15-preview"
# Completion budget sent as ``max_tokens``; also used to detect truncation.
MAX_COMPLETION_TOKENS = 4000

# Clipboard shared by the Text Editor tab. Every UI event builds a short-lived
# TextEditor, so the clipboard has to live outside of it for "Copy" followed
# by "Paste" to work across two button clicks.
_ui_clipboard = ""


# ConversationMemory class
class ConversationMemory:
    """SQLite-backed store of conversation chunks.

    Instances own one ``sqlite3`` connection. Call ``close()`` (or use the
    instance as a context manager) when done; the connection must be used
    from the thread that created it (sqlite3's default).
    """

    _SELECT_COLUMNS = (
        "SELECT chunk_id, text, role, timestamp, intent, token_count "
        "FROM conversation_chunks"
    )

    def __init__(self, db_path="conversation.db"):
        self.conn = sqlite3.connect(db_path)
        self.create_table()
        self.tokenizer = get_encoding("cl100k_base")
        logger.info(f"Initialized ConversationMemory with db_path: {db_path}")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self) -> None:
        """Close the underlying SQLite connection (safe to call twice)."""
        self.conn.close()

    @staticmethod
    def _row_to_chunk(row) -> dict:
        """Map a row selected with ``_SELECT_COLUMNS`` to a chunk dict."""
        return {
            "chunk_id": row[0],
            "text": row[1],
            "role": row[2],
            "timestamp": row[3],
            "intent": row[4],
            "token_count": row[5],
        }

    def create_table(self):
        """Create the ``conversation_chunks`` table if it does not exist."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS conversation_chunks (
                chunk_id TEXT PRIMARY KEY,
                text TEXT,
                role TEXT,
                timestamp DATETIME,
                intent TEXT,
                token_count INTEGER,
                embedding BLOB
            )
        """)
        self.conn.commit()
        logger.info("Created conversation table")

    def add_chunk(self, text, role, intent="general"):
        """Insert a new chunk and return its generated UUID4 string."""
        chunk_id = str(uuid.uuid4())
        token_count = len(self.tokenizer.encode(text))
        timestamp = datetime.datetime.now().isoformat()
        # ``with self.conn`` commits on success and rolls back on error.
        with self.conn:
            self.conn.execute(
                """
                INSERT INTO conversation_chunks
                    (chunk_id, text, role, timestamp, intent, token_count)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (chunk_id, text, role, timestamp, intent, token_count),
            )
        logger.info(f"Added chunk: {chunk_id}, role: {role}, intent: {intent}, token_count: {token_count}")
        return chunk_id

    def get_chunk(self, chunk_id) -> Optional[dict]:
        """Return the chunk dict for ``chunk_id`` or ``None`` when absent."""
        cursor = self.conn.execute(
            f"{self._SELECT_COLUMNS} WHERE chunk_id = ?", (chunk_id,)
        )
        row = cursor.fetchone()
        if row:
            logger.info(f"Retrieved chunk: {chunk_id}")
            return self._row_to_chunk(row)
        logger.warning(f"Chunk not found: {chunk_id}")
        return None

    def update_chunk(self, chunk_id, text):
        """Replace the text of ``chunk_id`` and refresh its token count."""
        token_count = len(self.tokenizer.encode(text))
        with self.conn:
            self.conn.execute(
                """
                UPDATE conversation_chunks
                SET text = ?, token_count = ?
                WHERE chunk_id = ?
                """,
                (text, token_count, chunk_id),
            )
        logger.info(f"Updated chunk: {chunk_id}, new token_count: {token_count}")

    def get_recent_chunks(self, limit=10):
        """Return up to ``limit`` chunks, newest first."""
        cursor = self.conn.execute(
            # rowid breaks ties between chunks sharing a timestamp.
            f"{self._SELECT_COLUMNS} ORDER BY timestamp DESC, rowid DESC LIMIT ?",
            (limit,),
        )
        chunks = [self._row_to_chunk(row) for row in cursor]
        logger.info(f"Retrieved {len(chunks)} recent chunks")
        return chunks


# TextEditor class
class TextEditor:
    """Clipboard-style edits on chunks stored in a ``ConversationMemory``.

    Every mutating method returns the chunk's new text, or the string
    ``"Error: Chunk not found"`` when the chunk does not exist.
    """

    def __init__(self, memory):
        self.memory = memory
        self.clipboard = ""
        logger.info("Initialized TextEditor")

    @staticmethod
    def _clamp_range(text, start, end):
        """Normalise ``start``/``end`` to 0 <= start <= end <= len(text)."""
        low, high, _ = slice(start, end).indices(len(text))
        return low, max(low, high)

    def cut(self, chunk_id, start, end):
        """Move ``text[start:end]`` to the clipboard and remove it."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to cut text, chunk not found: {chunk_id}")
            return "Error: Chunk not found"
        text = chunk['text']
        # Without clamping, start > end would duplicate text[end:start]
        # instead of cutting nothing.
        low, high = self._clamp_range(text, start, end)
        self.clipboard = text[low:high]
        remaining = text[:low] + text[high:]
        self.memory.update_chunk(chunk_id, remaining)
        logger.info(f"Cut text from chunk: {chunk_id}, start: {start}, end: {end}, clipboard: {self.clipboard}")
        return remaining

    def copy(self, chunk_id, start, end):
        """Copy ``text[start:end]`` to the clipboard and return it."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to copy text, chunk not found: {chunk_id}")
            return "Error: Chunk not found"
        self.clipboard = chunk['text'][start:end]
        logger.info(f"Copied text from chunk: {chunk_id}, start: {start}, end: {end}, clipboard: {self.clipboard}")
        return self.clipboard

    def paste(self, chunk_id, position):
        """Insert the clipboard content at ``position``."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to paste text, chunk not found: {chunk_id}")
            return "Error: Chunk not found"
        text = chunk['text']
        updated = text[:position] + self.clipboard + text[position:]
        self.memory.update_chunk(chunk_id, updated)
        logger.info(f"Pasted text to chunk: {chunk_id}, position: {position}, clipboard: {self.clipboard}")
        return updated

    def add_prefix(self, chunk_id, prefix):
        """Prepend ``prefix`` to the chunk text."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to add prefix, chunk not found: {chunk_id}")
            return "Error: Chunk not found"
        updated = prefix + chunk['text']
        self.memory.update_chunk(chunk_id, updated)
        logger.info(f"Added prefix to chunk: {chunk_id}, prefix: {prefix}")
        return updated

    def add_suffix(self, chunk_id, suffix):
        """Append ``suffix`` to the chunk text."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to add suffix, chunk not found: {chunk_id}")
            return "Error: Chunk not found"
        updated = chunk['text'] + suffix
        self.memory.update_chunk(chunk_id, updated)
        logger.info(f"Added suffix to chunk: {chunk_id}, suffix: {suffix}")
        return updated

    def diff(self, chunk_id, original_text):
        """Return a line diff from ``original_text`` to the stored text.

        Returns an empty string when the chunk does not exist.
        """
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to generate diff, chunk not found: {chunk_id}")
            return ""
        lines = difflib.Differ().compare(
            original_text.splitlines(), chunk['text'].splitlines()
        )
        logger.info(f"Generated diff for chunk: {chunk_id}")
        return '\n'.join(lines)


def _tool_schema(name, description, **params):
    """Build one tool definition; every property is marked required.

    ``params`` maps a property name to ``(json_type, description)``. The
    ``chunk_id`` property is always present and comes first.
    """
    properties = {
        "chunk_id": {"type": "string", "description": "ID of the conversation chunk"},
    }
    for param_name, (json_type, param_description) in params.items():
        properties[param_name] = {"type": json_type, "description": param_description}
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": list(properties),
            },
        },
    }


# OpenAIApi class
class OpenAIApi:
    """Azure OpenAI chat client with conversation memory and editor tools.

    Each instance owns an ``httpx.Client`` and a ``ConversationMemory``;
    call ``close()`` when the instance is no longer needed.
    """

    def __init__(self, preprompt="", endpoint=DEFAULT_ENDPOINT, model="gpt-4o", api_key=None):
        # Validate endpoint format
        if not re.match(r"^https://[a-zA-Z0-9-]+\.openai\.azure\.com/?$", endpoint):
            logger.warning(
                f"Endpoint format may be incorrect: {endpoint}. "
                "Expected format: https://YOUR-RESOURCE.openai.azure.com/"
            )

        # Use a minimal httpx.Client to avoid proxies parameter
        self._http_client = httpx.Client()
        try:
            self.client = AzureOpenAI(
                azure_endpoint=endpoint.rstrip('/'),  # Ensure no trailing slash
                api_key=api_key or os.getenv("AZURE_OPENAI_API_KEY"),
                api_version=API_VERSION,
                http_client=self._http_client,
            )
        except Exception as e:
            logger.error(f"Failed to initialize AzureOpenAI client: {str(e)}")
            self._http_client.close()  # do not leak the pool on failure
            raise

        self.model = model
        self.preprompt = preprompt
        self.memory = ConversationMemory()
        self.editor = TextEditor(self.memory)
        logger.info(f"Initialized OpenAIApi with endpoint: {endpoint}, model: {model}, api_version: {API_VERSION}")

        self.functions = [
            _tool_schema(
                "cut_text", "Cut text from a conversation chunk.",
                start=("integer", "Start index"),
                end=("integer", "End index"),
            ),
            _tool_schema(
                "copy_text", "Copy text from a conversation chunk to clipboard.",
                start=("integer", "Start index"),
                end=("integer", "End index"),
            ),
            _tool_schema(
                "paste_text", "Paste clipboard content into a conversation chunk.",
                position=("integer", "Position to paste"),
            ),
            _tool_schema(
                "add_prefix", "Add a prefix to a conversation chunk.",
                prefix=("string", "Prefix to add"),
            ),
            _tool_schema(
                "add_suffix", "Add a suffix to a conversation chunk.",
                suffix=("string", "Suffix to add"),
            ),
        ]

    def close(self) -> None:
        """Release the HTTP connection pool and the SQLite connection."""
        self._http_client.close()
        self.memory.close()

    def _stream_completion(self, messages):
        """Send ``messages`` and consume the streamed reply (blocking).

        Runs in a worker thread, so it must not touch ``self.memory`` (its
        SQLite connection belongs to the calling thread).

        Returns ``(text, tool_calls, finish_reason)`` where ``tool_calls`` is
        a list of ``(name, raw_json_arguments)`` tuples.
        """
        stream = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.5,
            max_tokens=MAX_COMPLETION_TOKENS,
            top_p=1.0,
            frequency_penalty=0,
            presence_penalty=0,
            tools=self.functions,
            stream=True,
        )
        text_parts = []
        # Streamed tool calls arrive as fragments keyed by ``index``: the name
        # comes once, the JSON arguments are split across many deltas.
        pending = {}
        finish_reason = None
        for chunk in stream:
            logger.debug(f"Received chunk: {chunk}")
            if not chunk.choices:
                continue
            choice = chunk.choices[0]
            if choice.finish_reason:
                finish_reason = choice.finish_reason
            delta = choice.delta
            if delta is None:
                continue
            if delta.content:
                text_parts.append(delta.content)
            for fragment in delta.tool_calls or []:
                slot = pending.setdefault(
                    getattr(fragment, "index", 0), {"name": "", "arguments": []}
                )
                function = getattr(fragment, "function", None)
                if function is None:
                    continue
                if function.name:
                    slot["name"] = function.name
                if function.arguments:
                    slot["arguments"].append(function.arguments)
        tool_calls = [
            (slot["name"], "".join(slot["arguments"]))
            for _, slot in sorted(pending.items())
        ]
        return "".join(text_parts), tool_calls, finish_reason

    def _run_tool_call(self, func_name, raw_arguments) -> None:
        """Execute one model-requested edit and record its result as a chunk."""
        dispatch = {
            "cut_text": ("Cut", lambda a: self.editor.cut(a["chunk_id"], a["start"], a["end"])),
            "copy_text": ("Copy", lambda a: self.editor.copy(a["chunk_id"], a["start"], a["end"])),
            "paste_text": ("Paste", lambda a: self.editor.paste(a["chunk_id"], a["position"])),
            "add_prefix": ("Prefix", lambda a: self.editor.add_prefix(a["chunk_id"], a["prefix"])),
            "add_suffix": ("Suffix", lambda a: self.editor.add_suffix(a["chunk_id"], a["suffix"])),
        }
        entry = dispatch.get(func_name)
        if entry is None:
            logger.warning(f"Ignoring unknown tool call: {func_name}")
            return
        label, handler = entry
        try:
            args = json.loads(raw_arguments or "{}")
            logger.info(f"Processing tool call: {func_name}, args: {args}")
            result = handler(args)
        except (KeyError, TypeError, ValueError) as exc:
            # Malformed arguments from the model must not discard the reply
            # that has already been stored.
            logger.warning(f"Skipping malformed tool call {func_name}: {exc!r}")
            return
        self.memory.add_chunk(f"{label} result: {result}", "system")

    async def fetch_response(self, raw_prompt, continue_response=False):
        """Send ``raw_prompt`` with recent context and return the reply.

        Args:
            raw_prompt: User text; it is stripped and HTML-escaped before
                being stored and sent.
            continue_response: Accepted for interface compatibility; unused.

        Returns:
            ``{"content", "continue", "chunk_id"}`` on success, or
            ``{"error": message}`` when the request or processing fails.
        """
        sanitized_prompt = html.escape((raw_prompt or "").strip())
        # Read the context BEFORE storing the new prompt, otherwise the
        # prompt would be part of the context and be sent twice.
        context = self.memory.get_recent_chunks(limit=5)
        self.memory.add_chunk(sanitized_prompt, "user")

        messages = []
        if self.preprompt:
            messages.append({"role": "system", "content": self.preprompt})
        # get_recent_chunks is newest-first; the API expects chronological
        # order. Empty chunks (e.g. tool-call-only replies) are skipped.
        messages.extend(
            {"role": c["role"], "content": c["text"]}
            for c in reversed(context)
            if c["text"]
        )
        messages.append({"role": "user", "content": sanitized_prompt})

        logger.info(f"Sending request to model: {self.model}, endpoint: {self.client.base_url}, messages: {json.dumps(messages, ensure_ascii=False)}")

        try:
            # Both the request and the stream consumption block, so the
            # whole exchange runs in a worker thread.
            logger.debug("Processing stream in separate thread")
            full_response, tool_calls, finish_reason = await asyncio.to_thread(
                self._stream_completion, messages
            )
            logger.debug("Stream processing completed")

            response_chunk_id = self.memory.add_chunk(full_response, "assistant")
            logger.info(f"Received response for chunk: {response_chunk_id}, length: {len(full_response)}")

            for func_name, raw_arguments in tool_calls:
                self._run_tool_call(func_name, raw_arguments)

            token_count = len(self.memory.tokenizer.encode(full_response))
            continue_flag = (
                finish_reason == "length" or token_count >= MAX_COMPLETION_TOKENS
            )
            return {"content": full_response, "continue": continue_flag, "chunk_id": response_chunk_id}
        except Exception as e:  # boundary: callers expect an error dict
            error_msg = f"API Error: {str(e)}"
            logger.exception(f"API request failed: {error_msg}, endpoint: {self.client.base_url}, model: {self.model}")
            self.memory.add_chunk(error_msg, "system")
            return {"error": error_msg}


def _is_valid_chunk_id(chunk_id) -> bool:
    """Return True when ``chunk_id`` is a string parseable as a UUID."""
    if not isinstance(chunk_id, str):
        return False
    try:
        uuid.UUID(chunk_id, version=4)
    except ValueError:
        return False
    return True


def _to_index(value, name) -> int:
    """Convert a ``gr.Number`` value to int; raise ValueError when empty."""
    if value is None:
        raise ValueError(f"{name} is required")
    return int(value)


def _apply_edit(chunk_id, operation, *, show_diff=True, invalid_detail=""):
    """Run one editor ``operation`` for a UI handler.

    Validates ``chunk_id``, restores/saves the shared UI clipboard around the
    operation and diffs the stored text against its pre-edit state.

    Returns the ``(chunk_text, diff_output)`` pair expected by the UI.
    """
    global _ui_clipboard
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", invalid_detail
    with ConversationMemory() as memory:
        before = memory.get_chunk(chunk_id)
        if before is None:
            return "Error: Chunk not found", ""
        editor = TextEditor(memory)
        editor.clipboard = _ui_clipboard
        try:
            result = operation(editor)
        except (TypeError, ValueError) as exc:
            logger.warning(f"Invalid edit arguments for chunk {chunk_id}: {exc}")
            return f"Error: {exc}", ""
        _ui_clipboard = editor.clipboard
        # Diff against the text as it was BEFORE the edit.
        diff = editor.diff(chunk_id, before["text"]) if show_diff else ""
    return result, diff


# Gradio UI
async def chat_submit(user_input, chat_history, preprompt):
    """Handle the chat "Send" button; returns ``(chat_history, preprompt)``."""
    if chat_history is None:
        chat_history = []
    try:
        api = OpenAIApi(preprompt=preprompt, api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    except Exception as e:  # UI boundary: report instead of crashing the event
        error_msg = f"Configuration Error: {str(e)}"
        logger.error(error_msg)
        chat_history.append({"role": "assistant", "content": error_msg})
        return chat_history, preprompt

    try:
        response = await api.fetch_response(user_input)
    finally:
        api.close()

    if "error" in response:
        chat_history.append({"role": "assistant", "content": f"Error: {response['error']}"})
        logger.warning(f"Chat error: {response['error']}")
    else:
        chat_history.append({"role": "user", "content": user_input})
        chat_history.append({"role": "assistant", "content": response["content"]})
        logger.info("Chat response added to history")
    return chat_history, preprompt


def get_history():
    """Return the 10 most recent chunks as rows for the history Dataframe."""
    with ConversationMemory() as memory:
        chunks = memory.get_recent_chunks(limit=10)
    # Convert to list of lists for Gradio Dataframe
    data = [
        [chunk["chunk_id"], chunk["text"], chunk["role"],
         chunk["timestamp"], chunk["intent"], chunk["token_count"]]
        for chunk in chunks
    ]
    logger.info(f"Returning {len(data)} chunks for history: {json.dumps(data, ensure_ascii=False)}")
    return data


async def async_get_history():
    """Async wrapper around ``get_history`` with a short delay."""
    await asyncio.sleep(0.2)  # 200ms delay for debounce
    return get_history()


def get_logs():
    """Return the content of ``aiapp.log`` or an error message."""
    try:
        # errors="replace": a stray undecodable byte must not hide the log.
        with open("aiapp.log", "r", errors="replace") as f:
            logs = f.read()
    except OSError as e:
        logger.error(f"Failed to read logs: {str(e)}")
        return f"Error reading logs: {str(e)}"
    logger.info("Retrieved logs from aiapp.log")
    return logs


def select_chunk(evt: gr.SelectData):
    """Handle a Dataframe selection; returns ``(chunk_id, chunk_text)``."""
    logger.info(f"Selected chunk raw data: {evt.value}")
    # Prefer the whole row when this Gradio version provides it, so clicking
    # any cell selects the row's chunk; otherwise fall back to the cell value.
    row = getattr(evt, "row_value", None)
    if isinstance(row, (list, tuple)) and row:
        chunk_id = row[0]
    elif isinstance(evt.value, str):
        chunk_id = evt.value
    elif isinstance(evt.value, list) and evt.value:
        chunk_id = evt.value[0]
    else:
        chunk_id = ""

    if not chunk_id:
        logger.warning(f"Invalid selection data: No chunk_id found in {evt.value}")
        return "", "Error: No chunk_id selected"
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id selected: {chunk_id}")
        return "", "Error: Invalid chunk_id selected"

    with ConversationMemory() as memory:
        chunk = memory.get_chunk(chunk_id)
    if chunk:
        logger.info(f"Selected chunk: {chunk_id}")
        return chunk_id, chunk["text"]
    logger.warning(f"Chunk not found for chunk_id: {chunk_id}")
    return "", "Error: Chunk not found"


async def edit_cut(chunk_id, start, end):
    """UI handler: cut ``[start, end)`` from the selected chunk."""
    logger.info(f"edit_cut called with chunk_id: {chunk_id}, start: {start}, end: {end}")
    return _apply_edit(
        chunk_id,
        lambda editor: editor.cut(chunk_id, _to_index(start, "start"), _to_index(end, "end")),
        invalid_detail="Invalid chunk_id selected",
    )


async def edit_copy(chunk_id, start, end):
    """UI handler: copy ``[start, end)`` of the selected chunk."""
    logger.info(f"edit_copy called with chunk_id: {chunk_id}, start: {start}, end: {end}")
    return _apply_edit(
        chunk_id,
        lambda editor: editor.copy(chunk_id, _to_index(start, "start"), _to_index(end, "end")),
        show_diff=False,  # copying does not change the chunk
    )


async def edit_paste(chunk_id, position):
    """UI handler: paste the clipboard at ``position``."""
    logger.info(f"edit_paste called with chunk_id: {chunk_id}, position: {position}")
    return _apply_edit(
        chunk_id,
        lambda editor: editor.paste(chunk_id, _to_index(position, "position")),
    )


async def edit_prefix(chunk_id, prefix):
    """UI handler: prepend ``prefix`` to the selected chunk."""
    logger.info(f"edit_prefix called with chunk_id: {chunk_id}, prefix: {prefix}")
    return _apply_edit(chunk_id, lambda editor: editor.add_prefix(chunk_id, prefix))


async def edit_suffix(chunk_id, suffix):
    """UI handler: append ``suffix`` to the selected chunk."""
    logger.info(f"edit_suffix called with chunk_id: {chunk_id}, suffix: {suffix}")
    return _apply_edit(chunk_id, lambda editor: editor.add_suffix(chunk_id, suffix))


async def generate_and_edit(source_text, target_start, target_end, response_prompt):
    """Replace ``source_text[target_start:target_end]`` with a model reply.

    Returns ``(updated_text, diff)``; on failure the first element is an
    error message and the diff is empty.
    """
    try:
        start = _to_index(target_start, "target_start")
        end = _to_index(target_end, "target_end")
    except (TypeError, ValueError) as exc:
        return f"Error: {exc}", ""

    try:
        api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    except Exception as exc:  # UI boundary
        logger.error(f"Configuration Error: {exc}")
        return f"Configuration Error: {exc}", ""

    try:
        # Step 1: Generate source paragraph/code
        chunk_id = api.memory.add_chunk(source_text, "user")
        logger.info(f"Generated source chunk: {chunk_id}")

        # Step 2: Cut out the target text
        api.editor.cut(chunk_id, start, end)
        logger.info(f"Cut target text from chunk: {chunk_id}, start: {target_start}, end: {target_end}")

        # Step 3: Generate response
        response = await api.fetch_response(response_prompt)
        if "error" in response:
            return "Error: Failed to generate response", ""
        response_text = response["content"]

        # Extract only the response part after "Response for [TARGET]:" if present
        response_match = re.search(r"Response for \[TARGET\]:\s*(.+)", response_text, re.DOTALL)
        if response_match:
            api.editor.clipboard = response_match.group(1).strip()
        else:
            api.editor.clipboard = response_text.strip()  # Fallback to full response if no match
        logger.info(f"Generated and set response to clipboard: {api.editor.clipboard}")

        # Step 4: Paste response into the target hole
        paste_result = api.editor.paste(chunk_id, start)
        logger.info(f"Pasted response into chunk: {chunk_id}, position: {target_start}")

        # Return updated text and a diff against the ORIGINAL source text
        # (diff() itself yields "" if the chunk is missing).
        return paste_result, api.editor.diff(chunk_id, source_text)
    finally:
        api.close()


def create_ui():
    """Build and return the Gradio ``Blocks`` application."""
    with gr.Blocks(title="Azure OpenAI Chat & Text Editor") as demo:
        gr.Markdown("# Azure OpenAI Chat with Text Editing")
        gr.Markdown(f"**Note**: Using Azure OpenAI endpoint: {DEFAULT_ENDPOINT}")

        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(label="Conversation", type="messages")
            user_input = gr.Textbox(label="Your Message", placeholder="Type your message or editing command...")
            preprompt = gr.Textbox(label="System Prompt", value="You are a helpful assistant with text editing capabilities.")
            submit_btn = gr.Button("Send")
            submit_btn.click(
                fn=chat_submit,
                inputs=[user_input, chatbot, preprompt],
                outputs=[chatbot, preprompt]
            )

        with gr.Tab("Conversation History"):
            history = gr.Dataframe(
                label="Recent Chunks",
                headers=["chunk_id", "text", "role", "timestamp", "intent", "token_count"],
                datatype=["str", "str", "str", "str", "str", "number"],
                interactive=False,
                key="history_df"
            )
            history_btn = gr.Button("Refresh History")
            history_btn.click(fn=async_get_history, outputs=history, api_name="refresh_history")

        with gr.Tab("Text Editor"):
            chunk_id = gr.Textbox(label="Selected Chunk ID", interactive=False)
            chunk_text = gr.Textbox(label="Chunk Text", interactive=False)
            history.select(fn=select_chunk, outputs=[chunk_id, chunk_text])
            with gr.Row():
                start = gr.Number(label="Start Index", precision=0)
                end = gr.Number(label="End Index", precision=0)
                position = gr.Number(label="Paste Position", precision=0)
            with gr.Row():
                prefix = gr.Textbox(label="Prefix")
                suffix = gr.Textbox(label="Suffix")
            with gr.Row():
                cut_btn = gr.Button("Cut")
                copy_btn = gr.Button("Copy")
                paste_btn = gr.Button("Paste")
                prefix_btn = gr.Button("Add Prefix")
                suffix_btn = gr.Button("Add Suffix")
            diff_output = gr.Textbox(label="Diff Output", interactive=False)

            cut_btn.click(fn=edit_cut, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            copy_btn.click(fn=edit_copy, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            paste_btn.click(fn=edit_paste, inputs=[chunk_id, position], outputs=[chunk_text, diff_output])
            prefix_btn.click(fn=edit_prefix, inputs=[chunk_id, prefix], outputs=[chunk_text, diff_output])
            suffix_btn.click(fn=edit_suffix, inputs=[chunk_id, suffix], outputs=[chunk_text, diff_output])

        with gr.Tab("Advanced Text Manipulation"):
            source_text = gr.Textbox(label="Source Text", value="This is a sample paragraph. [TARGET] This is the rest of the text.")
            # "[TARGET]" occupies indices 28..35 of the sample text above.
            target_start = gr.Number(label="Target Start Index", value=28, precision=0)
            target_end = gr.Number(label="Target End Index", value=36, precision=0)
            response_prompt = gr.Textbox(label="Response Prompt", value="Generate a response for the target section.")
            generate_btn = gr.Button("Generate and Edit")
            result_text = gr.Textbox(label="Result Text", interactive=False)
            result_diff = gr.Textbox(label="Result Diff", interactive=False)

            generate_btn.click(
                fn=generate_and_edit,
                inputs=[source_text, target_start, target_end, response_prompt],
                outputs=[result_text, result_diff]
            )

        with gr.Tab("Logs"):
            logs = gr.Textbox(label="Application Logs", interactive=False)
            logs_btn = gr.Button("Refresh Logs")
            logs_btn.click(fn=get_logs, outputs=logs)

        # Rendered once when the UI is built. astimezone() makes the value
        # timezone-aware so that %Z is not empty.
        gr.Markdown(f"Current Time: {datetime.datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')}")

    logger.info("Created Gradio UI")
    return demo


if __name__ == "__main__":
    logger.info("Starting application")
    demo = create_ui()
    demo.launch(server_name="0.0.0.0", server_port=7860)