# copypasta / app.py
# Deadmon's picture
# Update app.py
# c9fc375 verified
import gradio as gr
import asyncio
import json
import html
import os
import uuid
import sqlite3
import datetime
import difflib
import logging
import pandas as pd
from tiktoken import get_encoding
from openai import AzureOpenAI
import httpx
import re
# Logging goes both to the file 'aiapp.log' (read back by the Logs tab) and to
# the console, at INFO level with a timestamped format.
logging.basicConfig(
    handlers=[logging.FileHandler('aiapp.log'), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Drop the upper-case proxy variables from the environment so they do not
# interfere with outgoing requests; a missing variable is not an error.
os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None)
# ConversationMemory class
class ConversationMemory:
    """SQLite-backed store of conversation chunks.

    Every chunk is a piece of text tagged with a role, an ISO-8601 timestamp,
    an intent label and a token count computed with the ``cl100k_base``
    tokenizer. Instances can be used as context managers so the underlying
    connection is closed deterministically.
    """

    # Columns exposed to callers, in the order used to build chunk dicts.
    # The table also has an ``embedding`` BLOB column that is never read here,
    # so it is deliberately left out of every SELECT.
    _COLUMNS = ("chunk_id", "text", "role", "timestamp", "intent", "token_count")
    _SELECT = (
        "SELECT chunk_id, text, role, timestamp, intent, token_count "
        "FROM conversation_chunks"
    )

    def __init__(self, db_path="conversation.db"):
        """Open (or create) the database at *db_path* and ensure the table exists."""
        self.conn = sqlite3.connect(db_path)
        self.create_table()
        self.tokenizer = get_encoding("cl100k_base")
        logger.info(f"Initialized ConversationMemory with db_path: {db_path}")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None lets any in-flight exception propagate.
        self.close()

    def close(self):
        """Close the SQLite connection; the instance must not be used afterwards."""
        self.conn.close()

    @classmethod
    def _row_to_chunk(cls, row):
        """Turn one result row (selected with ``_SELECT``) into a chunk dict."""
        return dict(zip(cls._COLUMNS, row))

    def create_table(self):
        """Create the ``conversation_chunks`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS conversation_chunks (
                chunk_id TEXT PRIMARY KEY,
                text TEXT,
                role TEXT,
                timestamp DATETIME,
                intent TEXT,
                token_count INTEGER,
                embedding BLOB
            )
        """)
        self.conn.commit()
        logger.info("Created conversation table")

    def add_chunk(self, text, role, intent="general"):
        """Store *text* as a new chunk and return its generated UUID4 string."""
        chunk_id = str(uuid.uuid4())
        token_count = len(self.tokenizer.encode(text))
        timestamp = datetime.datetime.now().isoformat()
        self.conn.execute("""
            INSERT INTO conversation_chunks (chunk_id, text, role, timestamp, intent, token_count)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (chunk_id, text, role, timestamp, intent, token_count))
        self.conn.commit()
        logger.info(f"Added chunk: {chunk_id}, role: {role}, intent: {intent}, token_count: {token_count}")
        return chunk_id

    def get_chunk(self, chunk_id):
        """Return the chunk dict for *chunk_id*, or ``None`` when it is unknown."""
        cursor = self.conn.execute(f"{self._SELECT} WHERE chunk_id = ?", (chunk_id,))
        row = cursor.fetchone()
        if row is None:
            logger.warning(f"Chunk not found: {chunk_id}")
            return None
        logger.info(f"Retrieved chunk: {chunk_id}")
        return self._row_to_chunk(row)

    def update_chunk(self, chunk_id, text):
        """Replace the text of *chunk_id* and recompute its token count.

        Returns ``True`` when a row was updated and ``False`` when no chunk
        with that id exists (nothing is written in that case).
        """
        token_count = len(self.tokenizer.encode(text))
        cursor = self.conn.execute("""
            UPDATE conversation_chunks SET text = ?, token_count = ?
            WHERE chunk_id = ?
        """, (text, token_count, chunk_id))
        self.conn.commit()
        if cursor.rowcount <= 0:
            logger.warning(f"Chunk not found, nothing updated: {chunk_id}")
            return False
        logger.info(f"Updated chunk: {chunk_id}, new token_count: {token_count}")
        return True

    def get_recent_chunks(self, limit=10):
        """Return up to *limit* chunks, newest first.

        ``rowid`` breaks ties between chunks that share a timestamp so the
        order is deterministic even when the clock did not advance between
        two inserts.
        """
        cursor = self.conn.execute(
            f"{self._SELECT} ORDER BY timestamp DESC, rowid DESC LIMIT ?", (limit,)
        )
        chunks = [self._row_to_chunk(row) for row in cursor]
        logger.info(f"Retrieved {len(chunks)} recent chunks")
        return chunks
# TextEditor class
class TextEditor:
    """Clipboard-style editing of chunks held in a conversation memory.

    *memory* must provide ``get_chunk(chunk_id)`` (returning a dict with a
    ``"text"`` key, or ``None``) and ``update_chunk(chunk_id, text)``.
    Editing methods return the resulting text, or the string
    ``"Error: Chunk not found"`` when the chunk does not exist.
    """

    _NOT_FOUND = "Error: Chunk not found"

    def __init__(self, memory):
        self.memory = memory
        self.clipboard = ""
        logger.info("Initialized TextEditor")

    @staticmethod
    def _normalize_range(text, start, end):
        """Resolve *start*/*end* against *text* the way slicing does.

        Negative and out-of-range indices are mapped like ``text[start:end]``
        would map them; an inverted range collapses to an empty one at
        ``start`` so that removing it can never duplicate text.
        """
        low, high, _ = slice(start, end).indices(len(text))
        return low, max(low, high)

    def cut(self, chunk_id, start, end):
        """Move ``text[start:end]`` of the chunk to the clipboard and remove it."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to cut text, chunk not found: {chunk_id}")
            return self._NOT_FOUND
        text = chunk['text']
        low, high = self._normalize_range(text, start, end)
        self.clipboard = text[low:high]
        # Using the normalized bounds keeps prefix and suffix disjoint; the raw
        # ``text[:start] + text[end:]`` duplicates text when start > end.
        remaining = text[:low] + text[high:]
        self.memory.update_chunk(chunk_id, remaining)
        logger.info(f"Cut text from chunk: {chunk_id}, start: {start}, end: {end}, clipboard: {self.clipboard}")
        return remaining

    def copy(self, chunk_id, start, end):
        """Copy ``text[start:end]`` of the chunk to the clipboard and return it."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to copy text, chunk not found: {chunk_id}")
            return self._NOT_FOUND
        low, high = self._normalize_range(chunk['text'], start, end)
        self.clipboard = chunk['text'][low:high]
        logger.info(f"Copied text from chunk: {chunk_id}, start: {start}, end: {end}, clipboard: {self.clipboard}")
        return self.clipboard

    def paste(self, chunk_id, position):
        """Insert the clipboard into the chunk at *position* and return the new text."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to paste text, chunk not found: {chunk_id}")
            return self._NOT_FOUND
        text = chunk['text']
        updated = text[:position] + self.clipboard + text[position:]
        self.memory.update_chunk(chunk_id, updated)
        logger.info(f"Pasted text to chunk: {chunk_id}, position: {position}, clipboard: {self.clipboard}")
        return updated

    def add_prefix(self, chunk_id, prefix):
        """Prepend *prefix* to the chunk text and return the new text."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to add prefix, chunk not found: {chunk_id}")
            return self._NOT_FOUND
        updated = prefix + chunk['text']
        self.memory.update_chunk(chunk_id, updated)
        logger.info(f"Added prefix to chunk: {chunk_id}, prefix: {prefix}")
        return updated

    def add_suffix(self, chunk_id, suffix):
        """Append *suffix* to the chunk text and return the new text."""
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to add suffix, chunk not found: {chunk_id}")
            return self._NOT_FOUND
        updated = chunk['text'] + suffix
        self.memory.update_chunk(chunk_id, updated)
        logger.info(f"Added suffix to chunk: {chunk_id}, suffix: {suffix}")
        return updated

    def diff(self, chunk_id, original_text):
        """Return a line diff from *original_text* to the chunk's current text.

        The result is ``difflib.Differ`` output joined with newlines, or an
        empty string when the chunk does not exist.
        """
        chunk = self.memory.get_chunk(chunk_id)
        if not chunk:
            logger.warning(f"Failed to generate diff, chunk not found: {chunk_id}")
            return ""
        lines = difflib.Differ().compare(
            original_text.splitlines(), chunk['text'].splitlines()
        )
        logger.info(f"Generated diff for chunk: {chunk_id}")
        return '\n'.join(lines)
# OpenAIApi class
class OpenAIApi:
    """Chat client for an Azure OpenAI deployment with text-editing tools.

    Prompts and responses are recorded in a ``ConversationMemory``; the model
    is offered five tools (cut/copy/paste/prefix/suffix) that are executed
    locally through a ``TextEditor``. Tool results are stored as ``system``
    chunks; they are not sent back to the model within the same request.
    """

    API_VERSION = "2024-02-15-preview"
    MAX_TOKENS = 4000

    def __init__(self, preprompt="", endpoint="https://T-App-GPT4o.openai.azure.com/", model="gpt-4o", api_key=None):
        """Create the Azure client, the conversation memory and the editor.

        *api_key* falls back to the ``AZURE_OPENAI_API_KEY`` environment
        variable. Any exception raised while building the client is logged
        and re-raised.
        """
        # Validate endpoint format (warning only; the endpoint is still used).
        if not re.match(r"^https://[a-zA-Z0-9-]+\.openai\.azure\.com/?$", endpoint):
            logger.warning(f"Endpoint format may be incorrect: {endpoint}. Expected format: https://<resource-name>.openai.azure.com/")
        # Use a minimal httpx.Client to avoid proxies parameter
        http_client = httpx.Client()
        try:
            self.client = AzureOpenAI(
                azure_endpoint=endpoint.rstrip('/'),  # Ensure no trailing slash
                api_key=api_key or os.getenv("AZURE_OPENAI_API_KEY"),
                api_version=self.API_VERSION,
                http_client=http_client,
            )
        except Exception as e:
            # The client never took ownership of the transport, so release it.
            http_client.close()
            logger.error(f"Failed to initialize AzureOpenAI client: {str(e)}")
            raise
        self.endpoint = endpoint
        self.model = model
        self.preprompt = preprompt
        self.memory = ConversationMemory()
        self.editor = TextEditor(self.memory)
        logger.info(f"Initialized OpenAIApi with endpoint: {endpoint}, model: {model}, api_version: {self.API_VERSION}")

        chunk_field = ("string", "ID of the conversation chunk")
        start_field = ("integer", "Start index")
        end_field = ("integer", "End index")
        self.functions = [
            self._tool_spec(
                "cut_text", "Cut text from a conversation chunk.",
                {"chunk_id": chunk_field, "start": start_field, "end": end_field},
            ),
            self._tool_spec(
                "copy_text", "Copy text from a conversation chunk to clipboard.",
                {"chunk_id": chunk_field, "start": start_field, "end": end_field},
            ),
            self._tool_spec(
                "paste_text", "Paste clipboard content into a conversation chunk.",
                {"chunk_id": chunk_field, "position": ("integer", "Position to paste")},
            ),
            self._tool_spec(
                "add_prefix", "Add a prefix to a conversation chunk.",
                {"chunk_id": chunk_field, "prefix": ("string", "Prefix to add")},
            ),
            self._tool_spec(
                "add_suffix", "Add a suffix to a conversation chunk.",
                {"chunk_id": chunk_field, "suffix": ("string", "Suffix to add")},
            ),
        ]

    @staticmethod
    def _tool_spec(name, description, properties):
        """Build one tool definition; every listed property is required.

        *properties* maps a parameter name to a ``(json_type, description)``
        pair.
        """
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        field: {"type": kind, "description": text}
                        for field, (kind, text) in properties.items()
                    },
                    "required": list(properties),
                },
            },
        }

    @staticmethod
    def _collect_stream(stream):
        """Drain a streamed completion.

        Returns ``(text, tool_calls, finish_reason)`` where *tool_calls* is a
        list of ``{"name": str, "arguments": str}`` dicts. Streamed tool calls
        arrive as fragments sharing an ``index``; the argument JSON is only
        valid once all fragments of one index have been concatenated.
        """
        text_parts = []
        calls_by_index = {}
        finish_reason = None
        for chunk in stream:
            logger.debug(f"Received chunk: {chunk}")
            if not chunk.choices:
                continue
            choice = chunk.choices[0]
            if choice.finish_reason:
                finish_reason = choice.finish_reason
            delta = choice.delta
            if delta is None:
                continue
            if delta.content:
                text_parts.append(delta.content)
            for fragment in delta.tool_calls or []:
                call = calls_by_index.setdefault(fragment.index, {"name": "", "arguments": ""})
                function = fragment.function
                if function is None:
                    continue
                if function.name:
                    call["name"] = function.name
                if function.arguments:
                    call["arguments"] += function.arguments
        ordered_calls = [calls_by_index[index] for index in sorted(calls_by_index)]
        return "".join(text_parts), ordered_calls, finish_reason

    def _run_tool_call(self, name, raw_arguments):
        """Execute one tool call and record its result as a system chunk.

        Unknown tools and malformed or incomplete arguments are logged and
        skipped so one bad call does not discard the model's response.
        """
        handlers = {
            "cut_text": ("Cut", lambda a: self.editor.cut(a["chunk_id"], a["start"], a["end"])),
            "copy_text": ("Copy", lambda a: self.editor.copy(a["chunk_id"], a["start"], a["end"])),
            "paste_text": ("Paste", lambda a: self.editor.paste(a["chunk_id"], a["position"])),
            "add_prefix": ("Prefix", lambda a: self.editor.add_prefix(a["chunk_id"], a["prefix"])),
            "add_suffix": ("Suffix", lambda a: self.editor.add_suffix(a["chunk_id"], a["suffix"])),
        }
        if name not in handlers:
            logger.warning(f"Ignoring unknown tool call: {name}")
            return
        label, handler = handlers[name]
        try:
            args = json.loads(raw_arguments or "{}")
            logger.info(f"Processing tool call: {name}, args: {args}")
            result = handler(args)
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            logger.warning(f"Skipping tool call {name} with invalid arguments {raw_arguments!r}: {e}")
            return
        self.memory.add_chunk(f"{label} result: {result}", "system")

    async def fetch_response(self, raw_prompt, continue_response=False):
        """Send *raw_prompt* with recent context and return the model's answer.

        Returns ``{"content", "continue", "chunk_id"}`` on success or
        ``{"error": message}`` when the request fails. *continue_response* is
        accepted for interface compatibility and is currently unused.
        """
        # NOTE(review): the prompt is HTML-escaped before it is stored and sent,
        # so the model sees e.g. "&lt;" instead of "<" - confirm this is intended.
        sanitized_prompt = html.escape(raw_prompt.strip())
        # Read the context before storing the new prompt, otherwise the prompt
        # would be part of the context and be sent to the model twice.
        context = self.memory.get_recent_chunks(limit=5)
        self.memory.add_chunk(sanitized_prompt, "user")
        messages = []
        if self.preprompt:
            messages.append({"role": "system", "content": self.preprompt})
        # get_recent_chunks returns newest first; the API expects oldest first.
        messages.extend(
            {"role": c["role"], "content": c["text"]} for c in reversed(context) if c["text"]
        )
        messages.append({"role": "user", "content": sanitized_prompt})
        logger.info(f"Sending request to model: {self.model}, endpoint: {self.endpoint}, messages: {json.dumps(messages, ensure_ascii=False)}")

        def run_completion():
            # Both the request and the stream iteration block, so the whole
            # exchange runs in a worker thread instead of on the event loop.
            stream = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.5,
                max_tokens=self.MAX_TOKENS,
                top_p=1.0,
                frequency_penalty=0,
                presence_penalty=0,
                tools=self.functions,
                stream=True,
            )
            return self._collect_stream(stream)

        try:
            logger.debug("Processing stream in separate thread")
            full_response, tool_calls, finish_reason = await asyncio.to_thread(run_completion)
            logger.debug("Stream processing completed")
            response_chunk_id = self.memory.add_chunk(full_response, "assistant")
            logger.info(f"Received response for chunk: {response_chunk_id}, length: {len(full_response)}")
            for call in tool_calls:
                self._run_tool_call(call["name"], call["arguments"])
            # A "length" finish reason means the model was cut off by
            # max_tokens; the token count is kept as a fallback signal.
            continue_flag = (
                finish_reason == "length"
                or len(self.memory.tokenizer.encode(full_response)) >= self.MAX_TOKENS
            )
            return {"content": full_response, "continue": continue_flag, "chunk_id": response_chunk_id}
        except Exception as e:
            error_msg = f"API Error: {str(e)}"
            logger.error(f"API request failed: {error_msg}, endpoint: {self.endpoint}, model: {self.model}")
            self.memory.add_chunk(error_msg, "system")
            return {"error": error_msg}
# Gradio UI
async def chat_submit(user_input, chat_history, preprompt):
    """Handle the chat "Send" button.

    Appends the user's message and the assistant's reply (or an error
    message) to *chat_history* and returns ``(chat_history, preprompt)``.
    Blank input leaves the history untouched.
    """
    if chat_history is None:
        chat_history = []
    if not user_input or not user_input.strip():
        logger.warning("Ignoring empty chat message")
        return chat_history, preprompt

    # Record the user's turn up front so it stays visible even when the
    # request fails.
    chat_history.append({"role": "user", "content": user_input})
    try:
        api = OpenAIApi(preprompt=preprompt, api_key=os.getenv("AZURE_OPENAI_API_KEY"))
        response = await api.fetch_response(user_input)
    except Exception as e:
        # UI boundary: client construction can fail with exception types other
        # than ValueError, and an uncaught error would surface as a raw
        # Gradio failure instead of a chat message.
        error_msg = f"Configuration Error: {str(e)}"
        logger.exception(error_msg)
        chat_history.append({"role": "assistant", "content": error_msg})
        return chat_history, preprompt

    if "error" in response:
        chat_history.append({"role": "assistant", "content": f"Error: {response['error']}"})
        logger.warning(f"Chat error: {response['error']}")
    else:
        chat_history.append({"role": "assistant", "content": response["content"]})
        logger.info("Chat response added to history")
    return chat_history, preprompt
def get_history(limit=10):
    """Return the most recent chunks as rows for the history Dataframe.

    Each row is ``[chunk_id, text, role, timestamp, intent, token_count]``;
    at most *limit* rows are returned, newest first.
    """
    memory = ConversationMemory()
    try:
        chunks = memory.get_recent_chunks(limit=limit)
    finally:
        # This connection is only needed for the one query above.
        memory.conn.close()
    # Convert to list of lists for Gradio Dataframe
    columns = ("chunk_id", "text", "role", "timestamp", "intent", "token_count")
    data = [[chunk[column] for column in columns] for chunk in chunks]
    logger.info(f"Returning {len(data)} chunks for history: {json.dumps(data, ensure_ascii=False)}")
    return data
async def async_get_history():
    """Wait briefly, then return the rows produced by ``get_history()``."""
    debounce_seconds = 0.2  # 200ms delay for debounce
    await asyncio.sleep(debounce_seconds)
    rows = get_history()
    return rows
def get_logs(log_path="aiapp.log", max_lines=None):
    """Return the contents of the application log as one string.

    Args:
        log_path: File to read; defaults to the file the logger writes to.
        max_lines: When given, only the last *max_lines* lines are returned;
            ``None`` returns the whole file.

    Returns:
        The log text, or ``"Error reading logs: ..."`` when the file cannot
        be read.
    """
    # Local import keeps this function self-contained.
    from collections import deque

    try:
        with open(log_path, "r") as f:
            if max_lines is None:
                logs = f.read()
            else:
                # A bounded deque keeps only the tail while streaming the file.
                logs = "".join(deque(f, maxlen=max(0, int(max_lines))))
    except (OSError, UnicodeError) as e:
        logger.error(f"Failed to read logs: {str(e)}")
        return f"Error reading logs: {str(e)}"
    logger.info(f"Retrieved logs from {log_path}")
    return logs
def select_chunk(evt: gr.SelectData):
    """Resolve a click in the history table to ``(chunk_id, chunk_text)``.

    On failure the first element is ``""`` and the second an error message.
    """
    logger.info(f"Selected chunk raw data: {evt.value}")
    # Prefer the whole selected row so that clicking any cell works, not only
    # the chunk_id cell. ``row_value`` is read defensively because it may be
    # absent on the event object; then the clicked value itself is used.
    row = getattr(evt, "row_value", None)
    if isinstance(row, (list, tuple)) and row:
        candidate = row[0]
    elif isinstance(evt.value, list) and evt.value:
        candidate = evt.value[0]
    else:
        candidate = evt.value
    # Non-string cells (e.g. a token count) can never be a chunk id.
    chunk_id = candidate if isinstance(candidate, str) else ""
    if not chunk_id:
        logger.warning(f"Invalid selection data: No chunk_id found in {evt.value}")
        return "", "Error: No chunk_id selected"
    try:
        uuid.UUID(chunk_id, version=4)  # Validate chunk_id
    except ValueError:
        logger.warning(f"Invalid chunk_id selected: {chunk_id}")
        return "", "Error: Invalid chunk_id selected"

    memory = ConversationMemory()
    try:
        chunk = memory.get_chunk(chunk_id)
    finally:
        memory.conn.close()
    if chunk:
        logger.info(f"Selected chunk: {chunk_id}")
        return chunk_id, chunk["text"]
    logger.warning(f"Chunk not found for chunk_id: {chunk_id}")
    return "", "Error: Chunk not found"
# Clipboard shared by the Text Editor handlers below. Every handler call builds
# a fresh TextEditor, so without this the text placed on the clipboard by
# "Cut"/"Copy" would be gone by the time "Paste" runs.
# NOTE(review): this is process-wide, i.e. shared by all browser sessions.
_EDIT_CLIPBOARD = {"text": ""}


def _is_valid_chunk_id(chunk_id):
    """Return True when *chunk_id* can be parsed as a UUID string."""
    try:
        uuid.UUID(chunk_id, version=4)
    except (ValueError, TypeError, AttributeError):
        return False
    return True


def _as_index(value):
    """Return *value* as an int, or None when it is blank or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _apply_edit(chunk_id, operation, with_diff=True):
    """Run *operation(editor)* on a chunk and return ``(result, diff)``.

    The diff compares the chunk's text from before the operation with its
    text afterwards. The shared clipboard is loaded into the editor before
    and saved after the operation.
    """
    memory = ConversationMemory()
    try:
        before = memory.get_chunk(chunk_id)
        if before is None:
            return "Error: Chunk not found", ""
        editor = TextEditor(memory)
        editor.clipboard = _EDIT_CLIPBOARD["text"]
        result = operation(editor)
        _EDIT_CLIPBOARD["text"] = editor.clipboard
        diff = editor.diff(chunk_id, before["text"]) if with_diff else ""
        return result, diff
    finally:
        memory.conn.close()


async def edit_cut(chunk_id, start, end):
    """Cut ``[start, end)`` from the chunk; returns ``(new_text, diff)``."""
    logger.info(f"edit_cut called with chunk_id: {chunk_id}, start: {start}, end: {end}")
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", "Invalid chunk_id selected"
    first, last = _as_index(start), _as_index(end)
    if first is None or last is None:
        return "Error: Start and end must be integers", ""
    return _apply_edit(chunk_id, lambda editor: editor.cut(chunk_id, first, last))


async def edit_copy(chunk_id, start, end):
    """Copy ``[start, end)`` of the chunk; returns ``(copied_text, "")``."""
    logger.info(f"edit_copy called with chunk_id: {chunk_id}, start: {start}, end: {end}")
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    first, last = _as_index(start), _as_index(end)
    if first is None or last is None:
        return "Error: Start and end must be integers", ""
    return _apply_edit(
        chunk_id, lambda editor: editor.copy(chunk_id, first, last), with_diff=False
    )


async def edit_paste(chunk_id, position):
    """Paste the clipboard at *position*; returns ``(new_text, diff)``."""
    logger.info(f"edit_paste called with chunk_id: {chunk_id}, position: {position}")
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    offset = _as_index(position)
    if offset is None:
        return "Error: Position must be an integer", ""
    return _apply_edit(chunk_id, lambda editor: editor.paste(chunk_id, offset))


async def edit_prefix(chunk_id, prefix):
    """Prepend *prefix* to the chunk; returns ``(new_text, diff)``."""
    logger.info(f"edit_prefix called with chunk_id: {chunk_id}, prefix: {prefix}")
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    return _apply_edit(chunk_id, lambda editor: editor.add_prefix(chunk_id, prefix or ""))


async def edit_suffix(chunk_id, suffix):
    """Append *suffix* to the chunk; returns ``(new_text, diff)``."""
    logger.info(f"edit_suffix called with chunk_id: {chunk_id}, suffix: {suffix}")
    if not _is_valid_chunk_id(chunk_id):
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    return _apply_edit(chunk_id, lambda editor: editor.add_suffix(chunk_id, suffix or ""))
async def generate_and_edit(source_text, target_start, target_end, response_prompt):
    """Replace ``source_text[target_start:target_end]`` with a model response.

    The source is stored as a chunk, the target span is cut out, the model is
    asked *response_prompt*, and its answer is pasted where the span was.

    Returns:
        ``(result_text, diff)`` where *diff* compares the original source
        with the edited chunk, or ``(error_message, "")`` on failure.
    """
    try:
        start, end = int(target_start), int(target_end)
    except (TypeError, ValueError):
        return "Error: Target start and end must be integers", ""
    if start < 0 or end < start:
        # An inverted or negative span has no well-defined "hole" to fill.
        return "Error: Target range is invalid", ""
    start, end = min(start, len(source_text)), min(end, len(source_text))

    # Step 1: Store the source paragraph/code, reusing the API object's memory
    # instead of opening a second connection.
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    chunk_id = api.memory.add_chunk(source_text, "user")
    logger.info(f"Generated source chunk: {chunk_id}")
    # Step 2: Cut out the target text
    api.editor.cut(chunk_id, start, end)
    logger.info(f"Cut target text from chunk: {chunk_id}, start: {start}, end: {end}")
    # Step 3: Generate response
    response = await api.fetch_response(response_prompt)
    if "error" in response:
        # Put the stored chunk back so a failed request does not leave it
        # with the target span missing.
        api.memory.update_chunk(chunk_id, source_text)
        return "Error: Failed to generate response", ""
    response_text = response["content"]
    # Extract only the response part after "Response for [TARGET]:" if present
    response_match = re.search(r"Response for \[TARGET\]:\s*(.+)", response_text, re.DOTALL)
    if response_match:
        api.editor.clipboard = response_match.group(1).strip()
    else:
        api.editor.clipboard = response_text.strip()  # Fallback to full response if no match
    logger.info(f"Generated and set response to clipboard: {api.editor.clipboard}")
    # Step 4: Paste response into the target hole
    paste_result = api.editor.paste(chunk_id, start)
    logger.info(f"Pasted response into chunk: {chunk_id}, position: {start}")
    # Diff against the original source; diffing against paste_result would
    # compare the edited text with itself and always show no change.
    diff = api.editor.diff(chunk_id, source_text)
    return paste_result, diff
def create_ui():
    """Build the Gradio Blocks application and return it (not yet launched).

    The app has five tabs: Chat, Conversation History, Text Editor, Advanced
    Text Manipulation and Logs; each button is wired to the matching
    module-level handler.
    """
    # NOTE(review): nesting below follows the most conventional reading of the
    # layout (the diff box sits under the button row, the "Current Time" note
    # under the tabs) - confirm against the running app.
    with gr.Blocks(title="Azure OpenAI Chat & Text Editor") as ui:
        gr.Markdown("# Azure OpenAI Chat with Text Editing")
        gr.Markdown("**Note**: Using Azure OpenAI endpoint: https://T-App-GPT4o.openai.azure.com/")

        with gr.Tab("Chat"):
            conversation = gr.Chatbot(type="messages", label="Conversation")
            message_box = gr.Textbox(
                label="Your Message",
                placeholder="Type your message or editing command...",
            )
            system_prompt_box = gr.Textbox(
                label="System Prompt",
                value="You are a helpful assistant with text editing capabilities.",
            )
            send_button = gr.Button("Send")
            send_button.click(
                fn=chat_submit,
                inputs=[message_box, conversation, system_prompt_box],
                outputs=[conversation, system_prompt_box],
            )

        with gr.Tab("Conversation History"):
            history_table = gr.Dataframe(
                label="Recent Chunks",
                headers=["chunk_id", "text", "role", "timestamp", "intent", "token_count"],
                datatype=["str", "str", "str", "str", "str", "number"],
                interactive=False,
                key="history_df",
            )
            refresh_history_button = gr.Button("Refresh History")
            refresh_history_button.click(
                fn=async_get_history,
                outputs=history_table,
                api_name="refresh_history",
            )

        with gr.Tab("Text Editor"):
            selected_id_box = gr.Textbox(label="Selected Chunk ID", interactive=False)
            selected_text_box = gr.Textbox(label="Chunk Text", interactive=False)
            # Selecting a cell in the history table fills the two boxes above.
            history_table.select(fn=select_chunk, outputs=[selected_id_box, selected_text_box])
            with gr.Row():
                start_input = gr.Number(label="Start Index", precision=0)
                end_input = gr.Number(label="End Index", precision=0)
                position_input = gr.Number(label="Paste Position", precision=0)
            with gr.Row():
                prefix_box = gr.Textbox(label="Prefix")
                suffix_box = gr.Textbox(label="Suffix")
            with gr.Row():
                cut_button = gr.Button("Cut")
                copy_button = gr.Button("Copy")
                paste_button = gr.Button("Paste")
                prefix_button = gr.Button("Add Prefix")
                suffix_button = gr.Button("Add Suffix")
            diff_box = gr.Textbox(label="Diff Output", interactive=False)
            # Every edit button writes to the chunk text box and the diff box.
            edit_wiring = (
                (cut_button, edit_cut, [selected_id_box, start_input, end_input]),
                (copy_button, edit_copy, [selected_id_box, start_input, end_input]),
                (paste_button, edit_paste, [selected_id_box, position_input]),
                (prefix_button, edit_prefix, [selected_id_box, prefix_box]),
                (suffix_button, edit_suffix, [selected_id_box, suffix_box]),
            )
            for button, handler, handler_inputs in edit_wiring:
                button.click(
                    fn=handler,
                    inputs=handler_inputs,
                    outputs=[selected_text_box, diff_box],
                )

        with gr.Tab("Advanced Text Manipulation"):
            source_box = gr.Textbox(
                label="Source Text",
                value="This is a sample paragraph. [TARGET] This is the rest of the text.",
            )
            target_start_input = gr.Number(label="Target Start Index", value=21, precision=0)
            target_end_input = gr.Number(label="Target End Index", value=28, precision=0)
            prompt_box = gr.Textbox(
                label="Response Prompt",
                value="Generate a response for the target section.",
            )
            generate_button = gr.Button("Generate and Edit")
            result_box = gr.Textbox(label="Result Text", interactive=False)
            result_diff_box = gr.Textbox(label="Result Diff", interactive=False)
            generate_button.click(
                fn=generate_and_edit,
                inputs=[source_box, target_start_input, target_end_input, prompt_box],
                outputs=[result_box, result_diff_box],
            )

        with gr.Tab("Logs"):
            logs_box = gr.Textbox(label="Application Logs", interactive=False)
            refresh_logs_button = gr.Button("Refresh Logs")
            refresh_logs_button.click(fn=get_logs, outputs=logs_box)

        # The timestamp is rendered once, when the UI is built.
        gr.Markdown(f"Current Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %Z')}")

    logger.info("Created Gradio UI")
    return ui
# Script entry point: build the UI and serve it on all interfaces, port 7860.
if __name__ == "__main__":
    logger.info("Starting application")
    demo = create_ui()
    demo.launch(server_port=7860, server_name="0.0.0.0")