# copypasta / app.py
# Deadmon's picture
# Update app.py
# 9eb270c verified
# raw
# history blame
# 15.8 kB
import asyncio
import datetime
import difflib
import html
import json
import os
import sqlite3
import uuid

import gradio as gr
from openai import AsyncAzureOpenAI, AzureOpenAI
from tiktoken import get_encoding
# Remove any proxy settings inherited from the environment so they cannot
# interfere with outgoing requests; variables that are not set are ignored.
for _proxy_var in ("HTTP_PROXY", "HTTPS_PROXY"):
    os.environ.pop(_proxy_var, None)
# ConversationMemory class
class ConversationMemory:
    """SQLite-backed store of conversation chunks and their token counts.

    Every chunk is one row of the ``conversation_chunks`` table. The table
    also declares an ``embedding`` column, but nothing in this class writes
    or reads it.

    The instance owns a SQLite connection; call :meth:`close` (or use the
    instance as a context manager) to release it when done.
    """

    # Columns returned to callers, in the order ``_row_to_dict`` maps them.
    # Naming them explicitly keeps the mapping correct even if the table
    # gains or reorders columns (``SELECT *`` would silently shift indexes).
    _COLUMNS = ("chunk_id", "text", "role", "timestamp", "intent", "token_count")

    def __init__(self, db_path="conversation.db"):
        """Open (or create) the database at *db_path* and load the tokenizer."""
        self.conn = sqlite3.connect(db_path)
        self.create_table()
        self.tokenizer = get_encoding("cl100k_base")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    @classmethod
    def _select_clause(cls):
        """Return the ``SELECT ... FROM`` prefix shared by the read queries."""
        return f"SELECT {', '.join(cls._COLUMNS)} FROM conversation_chunks"

    @classmethod
    def _row_to_dict(cls, row):
        """Map a result row (in ``_COLUMNS`` order) to a plain dict."""
        return dict(zip(cls._COLUMNS, row))

    def create_table(self):
        """Create the ``conversation_chunks`` table if it does not exist yet."""
        # ``with self.conn`` commits on success and rolls back on error.
        with self.conn:
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS conversation_chunks (
                    chunk_id TEXT PRIMARY KEY,
                    text TEXT,
                    role TEXT,
                    timestamp DATETIME,
                    intent TEXT,
                    token_count INTEGER,
                    embedding BLOB
                )
            """)

    def add_chunk(self, text, role, intent="general"):
        """Store *text* as a new chunk and return its generated chunk id.

        The chunk gets a random UUID, the current local time as an ISO-8601
        timestamp, and the number of tokens the tokenizer produces for *text*.
        """
        chunk_id = str(uuid.uuid4())
        token_count = len(self.tokenizer.encode(text))
        timestamp = datetime.datetime.now().isoformat()
        with self.conn:
            self.conn.execute(
                """
                INSERT INTO conversation_chunks (chunk_id, text, role, timestamp, intent, token_count)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (chunk_id, text, role, timestamp, intent, token_count),
            )
        return chunk_id

    def get_chunk(self, chunk_id):
        """Return the chunk with *chunk_id* as a dict, or ``None`` if absent."""
        cursor = self.conn.execute(
            f"{self._select_clause()} WHERE chunk_id = ?", (chunk_id,)
        )
        row = cursor.fetchone()
        return self._row_to_dict(row) if row else None

    def update_chunk(self, chunk_id, text):
        """Replace the text of *chunk_id* and recompute its token count."""
        token_count = len(self.tokenizer.encode(text))
        with self.conn:
            self.conn.execute(
                """
                UPDATE conversation_chunks SET text = ?, token_count = ?
                WHERE chunk_id = ?
                """,
                (text, token_count, chunk_id),
            )

    def get_recent_chunks(self, limit=10):
        """Return up to *limit* chunks as dicts, newest first."""
        # rowid breaks ties between chunks that share a timestamp, so the
        # order is deterministic (later inserts count as newer).
        cursor = self.conn.execute(
            f"{self._select_clause()} ORDER BY timestamp DESC, rowid DESC LIMIT ?",
            (limit,),
        )
        return [self._row_to_dict(row) for row in cursor]
# TextEditor class
class TextEditor:
    """Clipboard-style editing operations on chunks held in a memory store.

    *memory* must provide ``get_chunk(chunk_id)`` (returning a dict with a
    ``'text'`` key, or a falsy value when the chunk is unknown) and
    ``update_chunk(chunk_id, text)``.

    Every operation returns ``""`` and changes nothing when the chunk does
    not exist.
    """

    def __init__(self, memory):
        self.memory = memory
        self.clipboard = ""

    @staticmethod
    def _span(text, start, end):
        """Resolve *start*/*end* against *text* the way a slice would.

        Negative and out-of-range indexes are normalised, and ``end`` is never
        allowed to precede ``start``: a reversed range selects nothing, so a
        cut cannot duplicate the characters between the two indexes.
        """
        low, high, _ = slice(start, end).indices(len(text))
        return low, max(low, high)

    def _text_of(self, chunk_id):
        """Return the chunk's text, or ``None`` when the chunk is unknown."""
        chunk = self.memory.get_chunk(chunk_id)
        return chunk['text'] if chunk else None

    def cut(self, chunk_id, start, end):
        """Move ``text[start:end]`` to the clipboard; return the remaining text."""
        text = self._text_of(chunk_id)
        if text is None:
            return ""
        low, high = self._span(text, start, end)
        self.clipboard = text[low:high]
        remaining = text[:low] + text[high:]
        self.memory.update_chunk(chunk_id, remaining)
        return remaining

    def copy(self, chunk_id, start, end):
        """Copy ``text[start:end]`` to the clipboard and return the copied text."""
        text = self._text_of(chunk_id)
        if text is None:
            # Leave the clipboard untouched rather than reporting stale content.
            return ""
        low, high = self._span(text, start, end)
        self.clipboard = text[low:high]
        return self.clipboard

    def paste(self, chunk_id, position):
        """Insert the clipboard at *position*; return the updated text."""
        text = self._text_of(chunk_id)
        if text is None:
            return ""
        updated = text[:position] + self.clipboard + text[position:]
        self.memory.update_chunk(chunk_id, updated)
        return updated

    def add_prefix(self, chunk_id, prefix):
        """Prepend *prefix* to the chunk; return the updated text."""
        text = self._text_of(chunk_id)
        if text is None:
            return ""
        updated = prefix + text
        self.memory.update_chunk(chunk_id, updated)
        return updated

    def add_suffix(self, chunk_id, suffix):
        """Append *suffix* to the chunk; return the updated text."""
        text = self._text_of(chunk_id)
        if text is None:
            return ""
        updated = text + suffix
        self.memory.update_chunk(chunk_id, updated)
        return updated

    def diff(self, chunk_id, original_text):
        """Return a line diff from *original_text* to the chunk's current text."""
        text = self._text_of(chunk_id)
        if text is None:
            return ""
        delta = difflib.Differ().compare(original_text.splitlines(), text.splitlines())
        return '\n'.join(delta)
# OpenAIApi class
class OpenAIApi:
    """Chat client for an Azure OpenAI deployment with text-editing tools.

    Each instance owns a ``ConversationMemory`` (conversation log) and a
    ``TextEditor`` operating on it. The model is offered five tools
    (``cut_text``, ``copy_text``, ``paste_text``, ``add_prefix``,
    ``add_suffix``); tool calls found in a streamed reply are executed against
    the editor and their outcome is logged as ``system`` chunks.

    NOTE(review): the default *endpoint* already ends in ``/openai/v1/``;
    confirm this is the form the client's ``azure_endpoint`` expects.
    """

    # Completion budget per request; a reply this long is reported as
    # needing continuation.
    MAX_TOKENS = 4000
    # Number of stored chunks replayed to the model as context.
    CONTEXT_CHUNKS = 5

    def __init__(self, preprompt="", endpoint="https://T-App-GPT4o.openai.azure.com/openai/v1/", model="gpt-4o", api_key=None):
        """Create the client, memory, editor and tool specifications.

        Args:
            preprompt: Optional system prompt sent first in every request.
            endpoint: Azure OpenAI endpoint URL.
            model: Model/deployment name passed to the API.
            api_key: API key; falls back to ``AZURE_OPENAI_API_KEY``.
        """
        # fetch_response awaits the request and consumes the stream with
        # ``async for``, which only the asynchronous client supports.
        self.client = AsyncAzureOpenAI(
            azure_endpoint=endpoint,
            api_key=api_key or os.getenv("AZURE_OPENAI_API_KEY"),
            api_version="2025-01-01-preview",
        )
        self.model = model
        self.preprompt = preprompt
        self.memory = ConversationMemory()
        self.editor = TextEditor(self.memory)
        self.functions = self._build_tool_specs()

    @staticmethod
    def _build_tool_specs():
        """Return the tool list in the chat-completions ``tools`` format."""

        def integer(description):
            return {"type": "integer", "description": description}

        def string(description):
            return {"type": "string", "description": description}

        def tool(name, description, **extra_properties):
            # Every tool addresses a chunk and requires all of its parameters.
            properties = {"chunk_id": string("ID of the conversation chunk")}
            properties.update(extra_properties)
            return {
                "type": "function",
                # Chat completions expects the definition nested under
                # "function", not flattened next to "type".
                "function": {
                    "name": name,
                    "description": description,
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": list(properties),
                    },
                },
            }

        return [
            tool(
                "cut_text",
                "Cut text from a conversation chunk.",
                start=integer("Start index"),
                end=integer("End index"),
            ),
            tool(
                "copy_text",
                "Copy text from a conversation chunk to clipboard.",
                start=integer("Start index"),
                end=integer("End index"),
            ),
            tool(
                "paste_text",
                "Paste clipboard content into a conversation chunk.",
                position=integer("Position to paste"),
            ),
            tool(
                "add_prefix",
                "Add a prefix to a conversation chunk.",
                prefix=string("Prefix to add"),
            ),
            tool(
                "add_suffix",
                "Add a suffix to a conversation chunk.",
                suffix=string("Suffix to add"),
            ),
        ]

    def _build_messages(self, prompt):
        """Assemble system prompt, stored context (oldest first) and *prompt*.

        Must be called before *prompt* is stored, otherwise the prompt would
        appear both in the context and as the final user message.
        """
        messages = []
        if self.preprompt:
            messages.append({"role": "system", "content": self.preprompt})
        recent = self.memory.get_recent_chunks(limit=self.CONTEXT_CHUNKS)
        # The memory returns newest first; the API needs chronological order.
        messages.extend({"role": c["role"], "content": c["text"]} for c in reversed(recent))
        messages.append({"role": "user", "content": prompt})
        return messages

    @staticmethod
    def _merge_tool_fragment(pending, fragment):
        """Fold one streamed tool-call fragment into *pending* (keyed by index).

        A streamed tool call arrives in pieces: the name in one fragment and
        the JSON arguments spread over several, so the argument text must be
        concatenated before it can be parsed.
        """
        function = getattr(fragment, "function", None)
        if function is None:
            return
        index = getattr(fragment, "index", None)
        if index is None:
            index = len(pending)
        call = pending.setdefault(index, {"name": "", "arguments": ""})
        if getattr(function, "name", None):
            call["name"] = function.name
        if getattr(function, "arguments", None):
            call["arguments"] += function.arguments

    def _run_tool_call(self, call):
        """Execute one assembled tool call and log its outcome as a system chunk."""
        editor = self.editor
        handlers = {
            "cut_text": ("Cut", lambda a: editor.cut(a["chunk_id"], a["start"], a["end"])),
            "copy_text": ("Copy", lambda a: editor.copy(a["chunk_id"], a["start"], a["end"])),
            "paste_text": ("Paste", lambda a: editor.paste(a["chunk_id"], a["position"])),
            "add_prefix": ("Prefix", lambda a: editor.add_prefix(a["chunk_id"], a["prefix"])),
            "add_suffix": ("Suffix", lambda a: editor.add_suffix(a["chunk_id"], a["suffix"])),
        }
        entry = handlers.get(call["name"])
        if entry is None:
            # Unknown tool names are ignored.
            return
        label, handler = entry
        try:
            result = handler(json.loads(call["arguments"] or "{}"))
        except (ValueError, KeyError, TypeError) as exc:
            # Malformed or incomplete arguments must not discard the reply
            # that has already been produced; record the failure instead.
            self.memory.add_chunk(f"{label} failed: {exc}", "system")
            return
        self.memory.add_chunk(f"{label} result: {result}", "system")

    async def fetch_response(self, raw_prompt, continue_response=False):
        """Send *raw_prompt* to the model and return the streamed reply.

        Args:
            raw_prompt: User text; it is stripped and HTML-escaped before
                being stored and sent.
            continue_response: Accepted for interface compatibility; unused.

        Returns:
            ``{"content", "continue", "chunk_id"}`` on success, where
            ``continue`` is true when the reply hit the token budget, or
            ``{"error": message}`` if the request or its processing failed.
        """
        # NOTE(review): escaping here means characters such as < and & reach
        # the model as HTML entities - confirm this is intended.
        sanitized_prompt = html.escape(raw_prompt.strip())
        messages = self._build_messages(sanitized_prompt)
        self.memory.add_chunk(sanitized_prompt, "user")
        try:
            stream = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.5,
                max_tokens=self.MAX_TOKENS,
                top_p=1.0,
                frequency_penalty=0,
                presence_penalty=0,
                tools=self.functions,
                stream=True,
            )
            text_parts = []
            pending_calls = {}
            finish_reason = None
            async for event in stream:
                if not event.choices:
                    continue
                choice = event.choices[0]
                finish_reason = getattr(choice, "finish_reason", None) or finish_reason
                delta = getattr(choice, "delta", None)
                if delta is None:
                    continue
                if delta.content:
                    text_parts.append(delta.content)
                for fragment in delta.tool_calls or ():
                    self._merge_tool_fragment(pending_calls, fragment)
            full_response = "".join(text_parts)
            response_chunk_id = self.memory.add_chunk(full_response, "assistant")
            for index in sorted(pending_calls):
                self._run_tool_call(pending_calls[index])
            token_count = len(self.memory.tokenizer.encode(full_response))
            continue_flag = finish_reason == "length" or token_count >= self.MAX_TOKENS
            return {"content": full_response, "continue": continue_flag, "chunk_id": response_chunk_id}
        except Exception as e:
            # Boundary handler: report any failure to the caller as data.
            error_msg = f"API Error: {str(e)}"
            self.memory.add_chunk(error_msg, "system")
            return {"error": error_msg}
# Gradio UI
async def chat_submit(user_input, chat_history, preprompt):
    """Send the user's message to the model and extend the chat history.

    Args:
        user_input: Text typed by the user.
        chat_history: List of ``{"role", "content"}`` messages shown in the
            chatbot; ``None`` is treated as an empty history.
        preprompt: System prompt passed to the API wrapper.

    Returns:
        ``(chat_history, preprompt)``. The user's message is always recorded,
        followed by either the model's reply or an error entry. Blank input
        is ignored and the history is returned unchanged.
    """
    if chat_history is None:
        chat_history = []
    # Nothing to send: avoid an API round trip for an empty message.
    if not user_input or not user_input.strip():
        return chat_history, preprompt

    api = OpenAIApi(preprompt=preprompt, api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    response = await api.fetch_response(user_input)

    # Keep the user's turn visible even when the request failed, so the
    # error entry has the message it refers to right above it.
    chat_history.append({"role": "user", "content": user_input})
    if "error" in response:
        reply = f"Error: {response['error']}"
    else:
        reply = response["content"]
    chat_history.append({"role": "assistant", "content": reply})
    return chat_history, preprompt
def get_history():
    """Return the ten most recent chunks as table rows, newest first.

    Each row is a list ordered as chunk_id, text, role, timestamp, intent,
    token_count, matching the headers of the history table in the UI. Rows
    are lists rather than dicts because that is the row format the table
    component accepts.
    """
    columns = ("chunk_id", "text", "role", "timestamp", "intent", "token_count")
    memory = ConversationMemory()
    try:
        chunks = memory.get_recent_chunks(limit=10)
    finally:
        # A fresh connection is opened on every refresh; release it.
        memory.conn.close()
    return [[chunk[column] for column in columns] for chunk in chunks]
def select_chunk(evt: gr.SelectData):
    """Return ``(chunk_id, text)`` for the row selected in the history table.

    Selecting a table cell reports that single cell in ``evt.value``, so the
    chunk is read from the whole selected row instead.
    Assumes rows are ordered chunk_id, text, ... like the history table's
    headers. A dict-valued ``evt.value`` carrying ``chunk_id``/``text`` keys
    is still honoured. Returns two empty strings when no usable row is
    available.
    """
    selected = evt.value
    if isinstance(selected, dict):
        return selected["chunk_id"], selected["text"]
    row = getattr(evt, "row_value", None) or []
    if len(row) < 2:
        return "", ""
    return row[0], row[1]
# Clipboard text carried between editing requests. Every handler call builds
# a fresh editor, so without this a Copy/Cut would be forgotten before the
# following Paste. The value is process-wide, i.e. shared by all sessions.
_EDITOR_CLIPBOARD = {"text": ""}


def _to_index(value):
    """Convert a number-field value to ``int``; an empty field counts as 0."""
    return int(value) if value is not None else 0


def _run_edit(chunk_id, operation, with_diff=True):
    """Apply *operation* to a fresh editor and return ``(result, diff)``.

    *operation* receives a ``TextEditor`` whose clipboard has been restored
    from the shared clipboard; whatever it leaves on the clipboard is saved
    back afterwards. The diff runs from the chunk's text before the edit to
    its text after it (empty when *with_diff* is false). An unknown chunk
    yields an empty result and a short notice in place of the diff.
    """
    # Editing only needs the local store, so no API client is created here.
    memory = ConversationMemory()
    try:
        before = memory.get_chunk(chunk_id)
        if before is None:
            return "", f"Chunk {chunk_id!r} not found."
        editor = TextEditor(memory)
        editor.clipboard = _EDITOR_CLIPBOARD["text"]
        result = operation(editor)
        _EDITOR_CLIPBOARD["text"] = editor.clipboard
        # Diff against the text captured before the edit; diffing against
        # the result itself would never show a change.
        diff = editor.diff(chunk_id, before["text"]) if with_diff else ""
        return result, diff
    finally:
        memory.conn.close()


async def edit_cut(chunk_id, start, end):
    """Cut ``[start, end)`` from the chunk; return ``(new_text, diff)``."""
    low, high = _to_index(start), _to_index(end)
    return _run_edit(chunk_id, lambda editor: editor.cut(chunk_id, low, high))


async def edit_copy(chunk_id, start, end):
    """Copy ``[start, end)`` to the clipboard; return ``(copied_text, "")``."""
    low, high = _to_index(start), _to_index(end)
    return _run_edit(chunk_id, lambda editor: editor.copy(chunk_id, low, high), with_diff=False)


async def edit_paste(chunk_id, position):
    """Paste the clipboard at *position*; return ``(new_text, diff)``."""
    where = _to_index(position)
    return _run_edit(chunk_id, lambda editor: editor.paste(chunk_id, where))


async def edit_prefix(chunk_id, prefix):
    """Prepend *prefix* to the chunk; return ``(new_text, diff)``."""
    return _run_edit(chunk_id, lambda editor: editor.add_prefix(chunk_id, prefix))


async def edit_suffix(chunk_id, suffix):
    """Append *suffix* to the chunk; return ``(new_text, diff)``."""
    return _run_edit(chunk_id, lambda editor: editor.add_suffix(chunk_id, suffix))
def create_ui():
    """Build and return the Gradio Blocks app.

    The app has three tabs: a chat tab, a table of recent conversation
    chunks, and a text editor that applies cut/copy/paste/prefix/suffix
    operations to the chunk selected in the table.
    """
    with gr.Blocks(title="Azure OpenAI Chat & Text Editor") as demo:
        gr.Markdown("# Azure OpenAI Chat with Text Editing")

        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(label="Conversation", type="messages")
            user_input = gr.Textbox(label="Your Message", placeholder="Type your message or editing command...")
            preprompt = gr.Textbox(label="System Prompt", value="You are a helpful assistant with text editing capabilities.")
            submit_btn = gr.Button("Send")
            submit_btn.click(
                fn=chat_submit,
                inputs=[user_input, chatbot, preprompt],
                outputs=[chatbot, preprompt],
            )

        with gr.Tab("Conversation History"):
            history = gr.Dataframe(
                label="Recent Chunks",
                headers=["chunk_id", "text", "role", "timestamp", "intent", "token_count"],
                datatype=["str", "str", "str", "str", "str", "number"],
            )
            history_btn = gr.Button("Refresh History")
            history_btn.click(fn=get_history, outputs=history)

        with gr.Tab("Text Editor"):
            chunk_id = gr.Textbox(label="Selected Chunk ID")
            chunk_text = gr.Textbox(label="Chunk Text", interactive=False)
            # Selecting a row in the history table loads it into the editor.
            history.select(fn=select_chunk, outputs=[chunk_id, chunk_text])
            with gr.Row():
                start = gr.Number(label="Start Index", precision=0)
                end = gr.Number(label="End Index", precision=0)
                position = gr.Number(label="Paste Position", precision=0)
            with gr.Row():
                prefix = gr.Textbox(label="Prefix")
                suffix = gr.Textbox(label="Suffix")
            with gr.Row():
                cut_btn = gr.Button("Cut")
                copy_btn = gr.Button("Copy")
                paste_btn = gr.Button("Paste")
                prefix_btn = gr.Button("Add Prefix")
                suffix_btn = gr.Button("Add Suffix")
            clipboard_view = gr.Textbox(label="Clipboard", interactive=False)
            diff_output = gr.Textbox(label="Diff Output", interactive=False)
            cut_btn.click(fn=edit_cut, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            # Copy returns the copied text, not the chunk; show it in its own
            # box so the "Chunk Text" display keeps showing the chunk.
            copy_btn.click(fn=edit_copy, inputs=[chunk_id, start, end], outputs=[clipboard_view, diff_output])
            paste_btn.click(fn=edit_paste, inputs=[chunk_id, position], outputs=[chunk_text, diff_output])
            prefix_btn.click(fn=edit_prefix, inputs=[chunk_id, prefix], outputs=[chunk_text, diff_output])
            suffix_btn.click(fn=edit_suffix, inputs=[chunk_id, suffix], outputs=[chunk_text, diff_output])

        # Rendered once, when the UI is built. The timestamp is made
        # timezone-aware so that %Z prints a zone name instead of nothing.
        built_at = datetime.datetime.now().astimezone()
        gr.Markdown(f"Current Time: {built_at.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on every network
    # interface at port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo = create_ui()
    demo.launch(**launch_options)