copypasta / app.py
Deadmon's picture
Create app.py
1004b9c verified
raw
history blame
15.4 kB
```python
import asyncio
import datetime
import difflib
import html
import json
import sqlite3
import uuid

import gradio as gr
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AsyncAzureOpenAI, AzureOpenAI
from tiktoken import get_encoding
# Reusing ConversationMemory and TextEditor classes from previous artifact
class ConversationMemory:
    """SQLite-backed store for conversation chunks.

    A chunk is one piece of text plus the role that produced it, an ISO-8601
    timestamp, an intent label and the text's token count under the
    ``cl100k_base`` encoding.  Each instance owns one SQLite connection;
    call :meth:`close` (or use the instance in a ``with`` statement) to
    release it when done.
    """

    # Columns read back by the query methods, in the order used to build the
    # result dictionaries.  The ``embedding`` column is created but is not
    # read or written by this class.
    _COLUMNS = ("chunk_id", "text", "role", "timestamp", "intent", "token_count")

    def __init__(self, db_path="conversation.db"):
        """Open (or create) the database at ``db_path`` and ensure the table exists."""
        self.conn = sqlite3.connect(db_path)
        self.create_table()
        self.tokenizer = get_encoding("cl100k_base")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def create_table(self):
        """Create the ``conversation_chunks`` table if it does not exist yet."""
        # ``with self.conn`` commits on success and rolls back on error.
        with self.conn:
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS conversation_chunks (
                    chunk_id TEXT PRIMARY KEY,
                    text TEXT,
                    role TEXT,
                    timestamp DATETIME,
                    intent TEXT,
                    token_count INTEGER,
                    embedding BLOB
                )
            """)

    def _row_to_dict(self, row):
        """Map a result row (selected in ``_COLUMNS`` order) to a dictionary."""
        return dict(zip(self._COLUMNS, row))

    def add_chunk(self, text, role, intent="general"):
        """Store a new chunk and return its generated UUID string.

        Args:
            text: The chunk text.
            role: Who produced the text (e.g. ``"user"``, ``"assistant"``).
            intent: Free-form intent label; defaults to ``"general"``.
        """
        chunk_id = str(uuid.uuid4())
        token_count = len(self.tokenizer.encode(text))
        timestamp = datetime.datetime.now().isoformat()
        with self.conn:
            self.conn.execute(
                """
                INSERT INTO conversation_chunks
                    (chunk_id, text, role, timestamp, intent, token_count)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (chunk_id, text, role, timestamp, intent, token_count),
            )
        return chunk_id

    def get_chunk(self, chunk_id):
        """Return the chunk with ``chunk_id`` as a dict, or ``None`` if absent."""
        # Columns are named explicitly so the mapping does not depend on the
        # physical column order of the table.
        row = self.conn.execute(
            """
            SELECT chunk_id, text, role, timestamp, intent, token_count
            FROM conversation_chunks
            WHERE chunk_id = ?
            """,
            (chunk_id,),
        ).fetchone()
        return self._row_to_dict(row) if row else None

    def update_chunk(self, chunk_id, text):
        """Replace a chunk's text and recompute its token count."""
        token_count = len(self.tokenizer.encode(text))
        with self.conn:
            self.conn.execute(
                """
                UPDATE conversation_chunks SET text = ?, token_count = ?
                WHERE chunk_id = ?
                """,
                (text, token_count, chunk_id),
            )

    def get_recent_chunks(self, limit=10):
        """Return up to ``limit`` chunks as dicts, newest first."""
        # ``rowid`` breaks ties between rows that share a timestamp so the
        # ordering stays deterministic.
        cursor = self.conn.execute(
            """
            SELECT chunk_id, text, role, timestamp, intent, token_count
            FROM conversation_chunks
            ORDER BY timestamp DESC, rowid DESC
            LIMIT ?
            """,
            (limit,),
        )
        return [self._row_to_dict(row) for row in cursor]
class TextEditor:
    """Clipboard-style editing operations on stored conversation chunks.

    ``memory`` must provide ``get_chunk(chunk_id)`` (returning a dict with a
    ``"text"`` key, or ``None``) and ``update_chunk(chunk_id, text)``.
    Every editing method returns ``None`` when the chunk does not exist and
    leaves both the store and the clipboard untouched in that case.
    """

    def __init__(self, memory):
        self.memory = memory
        self.clipboard = ""

    def _load_text(self, chunk_id):
        """Return the chunk's current text, or ``None`` if there is no such chunk."""
        chunk = self.memory.get_chunk(chunk_id)
        return chunk["text"] if chunk else None

    def _store(self, chunk_id, text):
        """Persist ``text`` for the chunk and return it."""
        self.memory.update_chunk(chunk_id, text)
        return text

    @staticmethod
    def _clamp_range(text, start, end):
        """Resolve ``start``/``end`` to a non-reversed ``(lo, hi)`` pair within ``text``."""
        lo, hi, _ = slice(start, end).indices(len(text))
        # A reversed range selects nothing.  Without this clamp,
        # ``text[:start] + text[end:]`` would duplicate the overlap instead
        # of removing anything.
        return lo, max(lo, hi)

    def cut(self, chunk_id, start, end):
        """Move ``text[start:end]`` to the clipboard and return the remaining text."""
        text = self._load_text(chunk_id)
        if text is None:
            return None
        lo, hi = self._clamp_range(text, start, end)
        self.clipboard = text[lo:hi]
        return self._store(chunk_id, text[:lo] + text[hi:])

    def copy(self, chunk_id, start, end):
        """Copy ``text[start:end]`` to the clipboard and return the copied text."""
        text = self._load_text(chunk_id)
        if text is None:
            return None
        lo, hi = self._clamp_range(text, start, end)
        self.clipboard = text[lo:hi]
        return self.clipboard

    def paste(self, chunk_id, position):
        """Insert the clipboard at ``position`` and return the new text."""
        text = self._load_text(chunk_id)
        if text is None:
            return None
        return self._store(chunk_id, text[:position] + self.clipboard + text[position:])

    def add_prefix(self, chunk_id, prefix):
        """Prepend ``prefix`` to the chunk and return the new text."""
        text = self._load_text(chunk_id)
        if text is None:
            return None
        return self._store(chunk_id, prefix + text)

    def add_suffix(self, chunk_id, suffix):
        """Append ``suffix`` to the chunk and return the new text."""
        text = self._load_text(chunk_id)
        if text is None:
            return None
        return self._store(chunk_id, text + suffix)

    def diff(self, chunk_id, original_text):
        """Return a line diff from ``original_text`` to the chunk's current text.

        The result is the ``difflib.Differ`` output joined with newlines, or
        ``""`` when the chunk does not exist.
        """
        text = self._load_text(chunk_id)
        if text is None:
            return ""
        before = (original_text or "").splitlines()
        return "\n".join(difflib.Differ().compare(before, text.splitlines()))
class OpenAIApi:
    """Azure OpenAI chat client with conversation memory and text-editing tools.

    Each instance owns a ``ConversationMemory`` and a ``TextEditor`` over it.
    ``fetch_response`` sends the prompt together with recent history to the
    Chat Completions API, streams the reply, and executes any editing tools
    the model calls.
    """

    # NOTE(review): ``azure_endpoint`` is usually the resource root URL; the
    # default below carries an ``/openai/v1/`` path.  Confirm it resolves
    # correctly with the Azure client before relying on the default.
    def __init__(self, preprompt="", endpoint="https://T-App-GPT4o.openai.azure.com/openai/v1/", model="gpt-4o"):
        """Create the client, memory, editor and tool definitions.

        Args:
            preprompt: Optional system prompt sent ahead of the conversation.
            endpoint: Azure OpenAI endpoint URL.
            model: Model / deployment name passed to the API.
        """
        token_provider = get_bearer_token_provider(
            DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
        )
        # fetch_response awaits the request and consumes the stream with
        # ``async for``; only the asynchronous client supports that.
        self.client = AsyncAzureOpenAI(
            azure_endpoint=endpoint,
            azure_ad_token_provider=token_provider,
            api_version="2025-01-01-preview",
        )
        self.model = model
        self.preprompt = preprompt
        self.memory = ConversationMemory()
        self.editor = TextEditor(self.memory)

        def chunk_id():
            return {"type": "string", "description": "ID of the conversation chunk"}

        def index(description):
            return {"type": "integer", "description": description}

        self.functions = [
            self._function_tool(
                "cut_text",
                "Cut text from a conversation chunk.",
                {"chunk_id": chunk_id(), "start": index("Start index"), "end": index("End index")},
            ),
            self._function_tool(
                "copy_text",
                "Copy text from a conversation chunk to clipboard.",
                {"chunk_id": chunk_id(), "start": index("Start index"), "end": index("End index")},
            ),
            self._function_tool(
                "paste_text",
                "Paste clipboard content into a conversation chunk.",
                {"chunk_id": chunk_id(), "position": index("Position to paste")},
            ),
            self._function_tool(
                "add_prefix",
                "Add a prefix to a conversation chunk.",
                {"chunk_id": chunk_id(), "prefix": {"type": "string", "description": "Prefix to add"}},
            ),
            self._function_tool(
                "add_suffix",
                "Add a suffix to a conversation chunk.",
                {"chunk_id": chunk_id(), "suffix": {"type": "string", "description": "Suffix to add"}},
            ),
        ]

    @staticmethod
    def _function_tool(name, description, properties):
        """Build one Chat Completions tool entry; every property is required."""
        # Chat Completions expects the function schema nested under the
        # "function" key (the flat layout belongs to a different API).
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": list(properties),
                },
            },
        }

    @staticmethod
    def _merge_tool_call_deltas(pending, deltas):
        """Fold streamed tool-call fragments into ``pending``, keyed by call index.

        While streaming, one tool call arrives as several deltas: its name
        and JSON arguments are string fragments that must be concatenated
        before the arguments can be parsed.
        """
        for delta in deltas:
            entry = pending.setdefault(delta.index, {"name": "", "arguments": ""})
            function = delta.function
            if function is None:
                continue
            if function.name:
                entry["name"] += function.name
            if function.arguments:
                entry["arguments"] += function.arguments

    def _run_tool_call(self, call):
        """Execute one assembled tool call and record its outcome as a system chunk."""
        handlers = {
            "cut_text": ("Cut", lambda a: self.editor.cut(a["chunk_id"], a["start"], a["end"])),
            "copy_text": ("Copy", lambda a: self.editor.copy(a["chunk_id"], a["start"], a["end"])),
            "paste_text": ("Paste", lambda a: self.editor.paste(a["chunk_id"], a["position"])),
            "add_prefix": ("Prefix", lambda a: self.editor.add_prefix(a["chunk_id"], a["prefix"])),
            "add_suffix": ("Suffix", lambda a: self.editor.add_suffix(a["chunk_id"], a["suffix"])),
        }
        handler = handlers.get(call["name"])
        if handler is None:
            # Unknown tool names are ignored.
            return
        label, action = handler
        try:
            result = action(json.loads(call["arguments"] or "{}"))
        except (ValueError, KeyError, TypeError) as exc:
            # Malformed or incomplete arguments from the model must not
            # discard the reply that was already streamed.
            self.memory.add_chunk(f"{label} failed: {exc}", "system")
            return
        self.memory.add_chunk(f"{label} result: {result}", "system")

    async def fetch_response(self, raw_prompt, continue_response=False):
        """Send ``raw_prompt`` with recent history and return the model's reply.

        Args:
            raw_prompt: The user's message; it is stripped and HTML-escaped
                before being stored and sent.
            continue_response: Accepted for compatibility; currently unused.

        Returns:
            On success, ``{"content", "continue", "chunk_id"}`` where
            ``continue`` indicates the reply was probably cut off by the
            token limit and ``chunk_id`` identifies the stored reply.
            On failure, ``{"error": message}``.
        """
        sanitized_prompt = html.escape(raw_prompt.strip())
        # Read history BEFORE storing the new prompt so the prompt is not
        # sent twice, and flip newest-first into chronological order.
        context = self.memory.get_recent_chunks(limit=5)
        context.reverse()
        self.memory.add_chunk(sanitized_prompt, "user")

        messages = []
        if self.preprompt:
            messages.append({"role": "system", "content": self.preprompt})
        messages.extend({"role": c["role"], "content": c["text"]} for c in context)
        messages.append({"role": "user", "content": sanitized_prompt})

        try:
            stream = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.5,
                max_tokens=4000,
                top_p=1.0,
                frequency_penalty=0,
                presence_penalty=0,
                tools=self.functions,
                stream=True,
            )
            pieces = []
            pending_calls = {}
            finish_reason = None
            async for chunk in stream:
                if not chunk.choices:
                    continue
                choice = chunk.choices[0]
                delta = choice.delta
                if delta is not None:
                    if delta.content:
                        pieces.append(delta.content)
                    if delta.tool_calls:
                        self._merge_tool_call_deltas(pending_calls, delta.tool_calls)
                if choice.finish_reason:
                    finish_reason = choice.finish_reason

            full_response = "".join(pieces)
            response_chunk_id = self.memory.add_chunk(full_response, "assistant")
            for call_index in sorted(pending_calls):
                self._run_tool_call(pending_calls[call_index])

            # The API reports truncation directly; the token-count check is
            # kept as a fallback.
            continue_flag = (
                finish_reason == "length"
                or len(self.memory.tokenizer.encode(full_response)) >= 4000
            )
            return {"content": full_response, "continue": continue_flag, "chunk_id": response_chunk_id}
        except Exception as e:
            # Boundary handler: the UI expects an error dict, not an exception.
            error_msg = f"API Error: {str(e)}"
            self.memory.add_chunk(error_msg, "system")
            return {"error": error_msg}
# Gradio UI
async def chat_submit(user_input, chat_history, preprompt):
    """Send one chat message and return the updated history.

    Args:
        user_input: Text typed by the user.
        chat_history: Existing ``[user, assistant]`` pairs; ``None`` is
            treated as an empty history.
        preprompt: System prompt forwarded to the API client.

    Returns:
        ``(history, preprompt)`` where ``history`` is a new list with the
        latest exchange appended.  API failures are shown as the assistant
        reply rather than raised.  Blank input is ignored and no request is
        made.
    """
    history = list(chat_history or [])
    if not user_input or not user_input.strip():
        # Nothing to send; avoid an API round-trip and an empty stored chunk.
        return history, preprompt

    api = OpenAIApi(preprompt=preprompt)
    response = await api.fetch_response(user_input)
    if "error" in response:
        reply = f"Error: {response['error']}"
    else:
        reply = response["content"]
    history.append([user_input, reply])
    return history, preprompt
def get_history():
    """Return the ten most recent chunks as table rows, newest first.

    Each row is a list ordered as ``chunk_id, text, role, timestamp, intent,
    token_count`` so it lines up with the headers of the history table.
    """
    columns = ("chunk_id", "text", "role", "timestamp", "intent", "token_count")
    memory = ConversationMemory()
    try:
        chunks = memory.get_recent_chunks(limit=10)
    finally:
        # This store is only needed for the single query above.
        memory.conn.close()
    # The table component takes rows (lists), not dictionaries.
    return [[chunk[name] for name in columns] for chunk in chunks]
def select_chunk(evt: gr.SelectData):
    """Return ``(chunk_id, text)`` for the history row the user clicked.

    For a table selection ``evt.value`` holds only the clicked cell, so the
    id and text are taken from the whole row instead.
    """
    # NOTE(review): ``row_value`` is provided for Dataframe selections in
    # recent Gradio releases -- confirm against the pinned version.
    row = getattr(evt, "row_value", None)
    if row is not None and len(row) >= 2:
        # Row order follows the table headers: chunk_id first, then text.
        return row[0], row[1]
    if isinstance(evt.value, dict):
        # Kept for callers that deliver the whole record as the value.
        return evt.value["chunk_id"], evt.value["text"]
    # Row contents unavailable: leave the current selection unchanged.
    return gr.update(), gr.update()
# Clipboard text carried between button clicks.  Every handler below builds
# a fresh OpenAIApi (and with it a new TextEditor whose clipboard starts
# empty), so without this a "Copy" or "Cut" would be forgotten before the
# next "Paste".
_editor_clipboard = ""


def _apply_edit(chunk_id, operation, with_diff=True):
    """Run ``operation(editor)`` on a chunk and return ``(result, diff)``.

    The diff compares the text as it was BEFORE the operation with the
    stored text afterwards.  Returns ``(None, "")`` when the chunk does not
    exist (including when no chunk has been selected yet).
    """
    global _editor_clipboard
    api = OpenAIApi()
    try:
        editor = api.editor
        editor.clipboard = _editor_clipboard
        before = api.memory.get_chunk(chunk_id)
        if before is None:
            return None, ""
        result = operation(editor)
        _editor_clipboard = editor.clipboard
        diff = editor.diff(chunk_id, before["text"]) if with_diff else ""
        return result, diff
    finally:
        # The per-call store is not reused; release its connection.
        api.memory.conn.close()


async def edit_cut(chunk_id, start, end):
    """Cut ``[start, end)`` from the chunk; return the new text and its diff."""
    return _apply_edit(chunk_id, lambda editor: editor.cut(chunk_id, int(start), int(end)))


async def edit_copy(chunk_id, start, end):
    """Copy ``[start, end)`` to the clipboard; return the copied text and an empty diff."""
    return _apply_edit(
        chunk_id,
        lambda editor: editor.copy(chunk_id, int(start), int(end)),
        with_diff=False,
    )


async def edit_paste(chunk_id, position):
    """Paste the clipboard at ``position``; return the new text and its diff."""
    return _apply_edit(chunk_id, lambda editor: editor.paste(chunk_id, int(position)))


async def edit_prefix(chunk_id, prefix):
    """Prepend ``prefix`` to the chunk; return the new text and its diff."""
    return _apply_edit(chunk_id, lambda editor: editor.add_prefix(chunk_id, prefix))


async def edit_suffix(chunk_id, suffix):
    """Append ``suffix`` to the chunk; return the new text and its diff."""
    return _apply_edit(chunk_id, lambda editor: editor.add_suffix(chunk_id, suffix))
def create_ui():
    """Build the Gradio interface and return it without launching.

    The app has three tabs: a chat view, a table of recent conversation
    chunks, and a text editor that operates on the chunk selected in that
    table.
    """
    with gr.Blocks(title="Azure OpenAI Chat & Text Editor") as demo:
        gr.Markdown("# Azure OpenAI Chat with Text Editing")

        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(label="Conversation")
            user_input = gr.Textbox(label="Your Message", placeholder="Type your message or editing command...")
            preprompt = gr.Textbox(label="System Prompt", value="You are a helpful assistant with text editing capabilities.")
            submit_btn = gr.Button("Send")
            submit_btn.click(
                fn=chat_submit,
                inputs=[user_input, chatbot, preprompt],
                outputs=[chatbot, preprompt]
            )

        with gr.Tab("Conversation History"):
            history = gr.Dataframe(
                label="Recent Chunks",
                headers=["chunk_id", "text", "role", "timestamp", "intent", "token_count"],
                datatype=["str", "str", "str", "str", "str", "number"]
            )
            history_btn = gr.Button("Refresh History")
            history_btn.click(fn=get_history, outputs=history)

        with gr.Tab("Text Editor"):
            chunk_id = gr.Textbox(label="Selected Chunk ID")
            chunk_text = gr.Textbox(label="Chunk Text", interactive=False)
            # Selecting a row in the history table fills the two boxes above.
            history.select(fn=select_chunk, outputs=[chunk_id, chunk_text])
            with gr.Row():
                start = gr.Number(label="Start Index", precision=0)
                end = gr.Number(label="End Index", precision=0)
                position = gr.Number(label="Paste Position", precision=0)
            with gr.Row():
                prefix = gr.Textbox(label="Prefix")
                suffix = gr.Textbox(label="Suffix")
            with gr.Row():
                cut_btn = gr.Button("Cut")
                copy_btn = gr.Button("Copy")
                paste_btn = gr.Button("Paste")
                prefix_btn = gr.Button("Add Prefix")
                suffix_btn = gr.Button("Add Suffix")
            diff_output = gr.Textbox(label="Diff Output", interactive=False)
            cut_btn.click(fn=edit_cut, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            copy_btn.click(fn=edit_copy, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            paste_btn.click(fn=edit_paste, inputs=[chunk_id, position], outputs=[chunk_text, diff_output])
            prefix_btn.click(fn=edit_prefix, inputs=[chunk_id, prefix], outputs=[chunk_text, diff_output])
            suffix_btn.click(fn=edit_suffix, inputs=[chunk_id, suffix], outputs=[chunk_text, diff_output])

        # ``%Z`` is empty for a naive datetime, so attach the local zone
        # first.  The value is computed once, when the UI is built.
        built_at = datetime.datetime.now().astimezone()
        gr.Markdown(f"Current Time: {built_at.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    return demo
# Script entry point: build the interface and start the Gradio server.
if __name__ == "__main__":
    demo = create_ui()
    demo.launch()