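# Gradio front end for an Azure OpenAI chat assistant with a SQLite-backed
# conversation memory and clipboard-style text-editing tools that the model
# can invoke through function calling.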
import gradio as gr
import asyncio
import json
import html
import os
import uuid
import sqlite3
import datetime
import difflib
import logging
import pandas as pd
from tiktoken import get_encoding
from openai import AzureOpenAI
import httpx
import re

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('aiapp.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

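# Drop any proxy settings inherited from the environment so requests to the
# Azure endpoint are not routed through a proxy.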
os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None)

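# ConversationMemory persists conversation chunks (text, role, intent, token
# count) in a local SQLite database and uses the cl100k_base tokenizer to
# count tokens.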
class ConversationMemory:
    def __init__(self, db_path="conversation.db"):
        self.conn = sqlite3.connect(db_path)
        self.create_table()
        self.tokenizer = get_encoding("cl100k_base")
        logger.info(f"Initialized ConversationMemory with db_path: {db_path}")

    def create_table(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS conversation_chunks (
                chunk_id TEXT PRIMARY KEY,
                text TEXT,
                role TEXT,
                timestamp DATETIME,
                intent TEXT,
                token_count INTEGER,
                embedding BLOB
            )
        """)
        self.conn.commit()
        logger.info("Created conversation table")

    def add_chunk(self, text, role, intent="general"):
        chunk_id = str(uuid.uuid4())
        tokens = self.tokenizer.encode(text)
        token_count = len(tokens)
        timestamp = datetime.datetime.now().isoformat()
        self.conn.execute("""
            INSERT INTO conversation_chunks (chunk_id, text, role, timestamp, intent, token_count)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (chunk_id, text, role, timestamp, intent, token_count))
        self.conn.commit()
        logger.info(f"Added chunk: {chunk_id}, role: {role}, intent: {intent}, token_count: {token_count}")
        return chunk_id

    def get_chunk(self, chunk_id):
        cursor = self.conn.execute("SELECT * FROM conversation_chunks WHERE chunk_id = ?", (chunk_id,))
        row = cursor.fetchone()
        if row:
            chunk = {
                "chunk_id": row[0], "text": row[1], "role": row[2],
                "timestamp": row[3], "intent": row[4], "token_count": row[5]
            }
            logger.info(f"Retrieved chunk: {chunk_id}")
            return chunk
        logger.warning(f"Chunk not found: {chunk_id}")
        return None

    def update_chunk(self, chunk_id, text):
        tokens = self.tokenizer.encode(text)
        token_count = len(tokens)
        self.conn.execute("""
            UPDATE conversation_chunks SET text = ?, token_count = ?
            WHERE chunk_id = ?
        """, (text, token_count, chunk_id))
        self.conn.commit()
        logger.info(f"Updated chunk: {chunk_id}, new token_count: {token_count}")

    def get_recent_chunks(self, limit=10):
        cursor = self.conn.execute("SELECT * FROM conversation_chunks ORDER BY timestamp DESC LIMIT ?", (limit,))
        chunks = [{"chunk_id": row[0], "text": row[1], "role": row[2], "timestamp": row[3], "intent": row[4], "token_count": row[5]} for row in cursor]
        logger.info(f"Retrieved {len(chunks)} recent chunks")
        return chunks

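# TextEditor performs clipboard-style edits (cut/copy/paste, prefix/suffix) on
# stored conversation chunks and can diff a chunk against an earlier version
# of its text.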
class TextEditor:
    def __init__(self, memory):
        self.memory = memory
        self.clipboard = ""
        logger.info("Initialized TextEditor")

    def cut(self, chunk_id, start, end):
        chunk = self.memory.get_chunk(chunk_id)
        if chunk:
            self.clipboard = chunk['text'][start:end]
            chunk['text'] = chunk['text'][:start] + chunk['text'][end:]
            self.memory.update_chunk(chunk_id, chunk['text'])
            logger.info(f"Cut text from chunk: {chunk_id}, start: {start}, end: {end}, clipboard: {self.clipboard}")
            return chunk['text']
        logger.warning(f"Failed to cut text, chunk not found: {chunk_id}")
        return "Error: Chunk not found"

    def copy(self, chunk_id, start, end):
        chunk = self.memory.get_chunk(chunk_id)
        if chunk:
            self.clipboard = chunk['text'][start:end]
            logger.info(f"Copied text from chunk: {chunk_id}, start: {start}, end: {end}, clipboard: {self.clipboard}")
            return self.clipboard
        logger.warning(f"Failed to copy text, chunk not found: {chunk_id}")
        return "Error: Chunk not found"

    def paste(self, chunk_id, position):
        chunk = self.memory.get_chunk(chunk_id)
        if chunk:
            chunk['text'] = chunk['text'][:position] + self.clipboard + chunk['text'][position:]
            self.memory.update_chunk(chunk_id, chunk['text'])
            logger.info(f"Pasted text to chunk: {chunk_id}, position: {position}, clipboard: {self.clipboard}")
            return chunk['text']
        logger.warning(f"Failed to paste text, chunk not found: {chunk_id}")
        return "Error: Chunk not found"

    def add_prefix(self, chunk_id, prefix):
        chunk = self.memory.get_chunk(chunk_id)
        if chunk:
            chunk['text'] = prefix + chunk['text']
            self.memory.update_chunk(chunk_id, chunk['text'])
            logger.info(f"Added prefix to chunk: {chunk_id}, prefix: {prefix}")
            return chunk['text']
        logger.warning(f"Failed to add prefix, chunk not found: {chunk_id}")
        return "Error: Chunk not found"

    def add_suffix(self, chunk_id, suffix):
        chunk = self.memory.get_chunk(chunk_id)
        if chunk:
            chunk['text'] = chunk['text'] + suffix
            self.memory.update_chunk(chunk_id, chunk['text'])
            logger.info(f"Added suffix to chunk: {chunk_id}, suffix: {suffix}")
            return chunk['text']
        logger.warning(f"Failed to add suffix, chunk not found: {chunk_id}")
        return "Error: Chunk not found"

    def diff(self, chunk_id, original_text):
        chunk = self.memory.get_chunk(chunk_id)
        if chunk:
            differ = difflib.Differ()
            diff = list(differ.compare(original_text.splitlines(), chunk['text'].splitlines()))
            logger.info(f"Generated diff for chunk: {chunk_id}")
            return '\n'.join(diff)
        logger.warning(f"Failed to generate diff, chunk not found: {chunk_id}")
        return ""

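# OpenAIApi wraps the Azure OpenAI chat completions API, streams responses,
# and exposes the TextEditor operations to the model as tool (function) calls.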
class OpenAIApi:
    def __init__(self, preprompt="", endpoint="https://T-App-GPT4o.openai.azure.com/", model="gpt-4o", api_key=None):
        if not re.match(r"^https://[a-zA-Z0-9-]+\.openai\.azure\.com/?$", endpoint):
            logger.warning(f"Endpoint format may be incorrect: {endpoint}. Expected format: https://<resource-name>.openai.azure.com/")

        # Keep the normalized endpoint around for logging instead of reaching
        # into the client's private attributes later.
        self.endpoint = endpoint.rstrip('/')
        http_client = httpx.Client()
        try:
            self.client = AzureOpenAI(
                azure_endpoint=self.endpoint,
                api_key=api_key or os.getenv("AZURE_OPENAI_API_KEY"),
                api_version="2024-02-15-preview",
                http_client=http_client
            )
        except Exception as e:
            logger.error(f"Failed to initialize AzureOpenAI client: {str(e)}")
            raise
        self.model = model
        self.preprompt = preprompt
        self.memory = ConversationMemory()
        self.editor = TextEditor(self.memory)
        logger.info(f"Initialized OpenAIApi with endpoint: {self.endpoint}, model: {model}, api_version: 2024-02-15-preview")

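        # JSON schemas for the text-editing tools the model is allowed to call
        # via tool (function) calling.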
        self.functions = [
            {
                "type": "function",
                "function": {
                    "name": "cut_text",
                    "description": "Cut text from a conversation chunk.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "chunk_id": {"type": "string", "description": "ID of the conversation chunk"},
                            "start": {"type": "integer", "description": "Start index"},
                            "end": {"type": "integer", "description": "End index"}
                        },
                        "required": ["chunk_id", "start", "end"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "copy_text",
                    "description": "Copy text from a conversation chunk to clipboard.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "chunk_id": {"type": "string", "description": "ID of the conversation chunk"},
                            "start": {"type": "integer", "description": "Start index"},
                            "end": {"type": "integer", "description": "End index"}
                        },
                        "required": ["chunk_id", "start", "end"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "paste_text",
                    "description": "Paste clipboard content into a conversation chunk.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "chunk_id": {"type": "string", "description": "ID of the conversation chunk"},
                            "position": {"type": "integer", "description": "Position to paste"}
                        },
                        "required": ["chunk_id", "position"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "add_prefix",
                    "description": "Add a prefix to a conversation chunk.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "chunk_id": {"type": "string", "description": "ID of the conversation chunk"},
                            "prefix": {"type": "string", "description": "Prefix to add"}
                        },
                        "required": ["chunk_id", "prefix"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "add_suffix",
                    "description": "Add a suffix to a conversation chunk.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "chunk_id": {"type": "string", "description": "ID of the conversation chunk"},
                            "suffix": {"type": "string", "description": "Suffix to add"}
                        },
                        "required": ["chunk_id", "suffix"]
                    }
                }
            }
        ]

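    # fetch_response stores the user prompt, builds the message list from recent
    # context, streams the model's reply, applies any requested tool calls, and
    # returns the assistant text plus a continuation flag.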
    async def fetch_response(self, raw_prompt, continue_response=False):
        sanitized_prompt = html.escape(raw_prompt.strip())
        chunk_id = self.memory.add_chunk(sanitized_prompt, "user")

        messages = []
        if self.preprompt:
            messages.append({"role": "system", "content": self.preprompt})
        # get_recent_chunks returns newest-first and already contains the prompt
        # just stored, so restore chronological order and skip the current chunk
        # to avoid sending it twice.
        context = self.memory.get_recent_chunks(limit=5)
        for c in reversed(context):
            if c["chunk_id"] != chunk_id:
                messages.append({"role": c["role"], "content": c["text"]})
        messages.append({"role": "user", "content": sanitized_prompt})

        logger.info(f"Sending request to model: {self.model}, endpoint: {self.endpoint}, messages: {json.dumps(messages, ensure_ascii=False)}")

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.5,
                max_tokens=4000,
                top_p=1.0,
                frequency_penalty=0,
                presence_penalty=0,
                tools=self.functions,
                stream=True
            )

            def process_stream(sync_stream):
                full_response = ""
                tool_calls = {}
                for chunk in sync_stream:
                    logger.debug(f"Received chunk: {chunk}")
                    if not chunk.choices:
                        continue
                    delta = chunk.choices[0].delta
                    if delta.content:
                        full_response += delta.content
                    if delta.tool_calls:
                        # Streamed tool calls arrive as fragments: the name comes in
                        # the first delta and the JSON arguments are spread across
                        # later ones, so accumulate them by index before parsing.
                        for tc in delta.tool_calls:
                            call = tool_calls.setdefault(tc.index, {"name": "", "arguments": ""})
                            if tc.function and tc.function.name:
                                call["name"] = tc.function.name
                            if tc.function and tc.function.arguments:
                                call["arguments"] += tc.function.arguments
                return full_response, list(tool_calls.values())

            logger.debug("Processing stream in separate thread")
            full_response, tool_calls = await asyncio.to_thread(process_stream, response)
            logger.debug("Stream processing completed")

            response_chunk_id = self.memory.add_chunk(full_response, "assistant")
            logger.info(f"Received response for chunk: {response_chunk_id}, length: {len(full_response)}")

            for call in tool_calls:
                func_name = call["name"]
                if not func_name:
                    continue
                try:
                    args = json.loads(call["arguments"]) if call["arguments"] else {}
                except json.JSONDecodeError:
                    logger.warning(f"Could not parse arguments for tool call {func_name}: {call['arguments']}")
                    continue
                logger.info(f"Processing tool call: {func_name}, args: {args}")
                if func_name == "cut_text":
                    result = self.editor.cut(args["chunk_id"], args["start"], args["end"])
                    self.memory.add_chunk(f"Cut result: {result}", "system")
                elif func_name == "copy_text":
                    result = self.editor.copy(args["chunk_id"], args["start"], args["end"])
                    self.memory.add_chunk(f"Copy result: {result}", "system")
                elif func_name == "paste_text":
                    result = self.editor.paste(args["chunk_id"], args["position"])
                    self.memory.add_chunk(f"Paste result: {result}", "system")
                elif func_name == "add_prefix":
                    result = self.editor.add_prefix(args["chunk_id"], args["prefix"])
                    self.memory.add_chunk(f"Prefix result: {result}", "system")
                elif func_name == "add_suffix":
                    result = self.editor.add_suffix(args["chunk_id"], args["suffix"])
                    self.memory.add_chunk(f"Suffix result: {result}", "system")

            continue_flag = len(self.memory.tokenizer.encode(full_response)) >= 4000
            return {"content": full_response, "continue": continue_flag, "chunk_id": response_chunk_id}

        except Exception as e:
            error_msg = f"API Error: {str(e)}"
            logger.error(f"API request failed: {error_msg}, endpoint: {self.endpoint}, model: {self.model}")
            self.memory.add_chunk(error_msg, "system")
            return {"error": error_msg}

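# --- Gradio callbacks ---
# Each handler below constructs its own OpenAIApi/ConversationMemory instance;
# they all share the same conversation.db file.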
async def chat_submit(user_input, chat_history, preprompt):
    try:
        api = OpenAIApi(preprompt=preprompt, api_key=os.getenv("AZURE_OPENAI_API_KEY"))
        response = await api.fetch_response(user_input)
        if "error" in response:
            chat_history.append({"role": "assistant", "content": f"Error: {response['error']}"})
            logger.warning(f"Chat error: {response['error']}")
        else:
            chat_history.append({"role": "user", "content": user_input})
            chat_history.append({"role": "assistant", "content": response["content"]})
            logger.info("Chat response added to history")
        return chat_history, preprompt
    except Exception as e:
        # Catch client-construction and configuration failures so they surface in
        # the chat instead of crashing the callback.
        error_msg = f"Error: {str(e)}"
        logger.error(error_msg)
        chat_history.append({"role": "assistant", "content": error_msg})
        return chat_history, preprompt

def get_history():
    memory = ConversationMemory()
    chunks = memory.get_recent_chunks(limit=10)

    data = [[chunk["chunk_id"], chunk["text"], chunk["role"], chunk["timestamp"], chunk["intent"], chunk["token_count"]] for chunk in chunks]
    logger.info(f"Returning {len(data)} chunks for history: {json.dumps(data, ensure_ascii=False)}")
    return data


async def async_get_history():
    await asyncio.sleep(0.2)
    return get_history()

def get_logs():
    try:
        with open("aiapp.log", "r") as f:
            logs = f.read()
        logger.info("Retrieved logs from aiapp.log")
        return logs
    except Exception as e:
        logger.error(f"Failed to read logs: {str(e)}")
        return f"Error reading logs: {str(e)}"

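# select_chunk resolves a click on the history Dataframe to a chunk_id; only a
# click on the chunk_id column yields a valid UUID, anything else returns an
# error message.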
def select_chunk(evt: gr.SelectData):
    logger.info(f"Selected chunk raw data: {evt.value}")

    chunk_id = evt.value if isinstance(evt.value, str) else (evt.value[0] if isinstance(evt.value, list) and len(evt.value) > 0 else "")
    if not chunk_id:
        logger.warning(f"Invalid selection data: No chunk_id found in {evt.value}")
        return "", "Error: No chunk_id selected"
    try:
        uuid.UUID(chunk_id, version=4)
        memory = ConversationMemory()
        chunk = memory.get_chunk(chunk_id)
        if chunk:
            logger.info(f"Selected chunk: {chunk_id}")
            return chunk_id, chunk["text"]
        logger.warning(f"Chunk not found for chunk_id: {chunk_id}")
        return "", "Error: Chunk not found"
    except ValueError:
        logger.warning(f"Invalid chunk_id selected: {chunk_id}")
        return "", "Error: Invalid chunk_id selected"

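# The edit_* handlers validate the chunk_id, apply one editor operation, and
# return the updated chunk text together with a diff against the pre-edit text.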
async def edit_cut(chunk_id, start, end):
    logger.info(f"edit_cut called with chunk_id: {chunk_id}, start: {start}, end: {end}")
    try:
        uuid.UUID(chunk_id, version=4)
    except ValueError:
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", "Invalid chunk_id selected"
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    # Capture the text before editing so the diff shows what actually changed.
    original = api.memory.get_chunk(chunk_id)
    original_text = original["text"] if original else ""
    result = api.editor.cut(chunk_id, int(start), int(end))
    diff = api.editor.diff(chunk_id, original_text) if "Error" not in result else ""
    return result, diff


async def edit_copy(chunk_id, start, end):
    logger.info(f"edit_copy called with chunk_id: {chunk_id}, start: {start}, end: {end}")
    try:
        uuid.UUID(chunk_id, version=4)
    except ValueError:
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    result = api.editor.copy(chunk_id, int(start), int(end))
    return result, ""


async def edit_paste(chunk_id, position):
    logger.info(f"edit_paste called with chunk_id: {chunk_id}, position: {position}")
    try:
        uuid.UUID(chunk_id, version=4)
    except ValueError:
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    original = api.memory.get_chunk(chunk_id)
    original_text = original["text"] if original else ""
    result = api.editor.paste(chunk_id, int(position))
    return result, api.editor.diff(chunk_id, original_text)


async def edit_prefix(chunk_id, prefix):
    logger.info(f"edit_prefix called with chunk_id: {chunk_id}, prefix: {prefix}")
    try:
        uuid.UUID(chunk_id, version=4)
    except ValueError:
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    original = api.memory.get_chunk(chunk_id)
    original_text = original["text"] if original else ""
    result = api.editor.add_prefix(chunk_id, prefix)
    return result, api.editor.diff(chunk_id, original_text)


async def edit_suffix(chunk_id, suffix):
    logger.info(f"edit_suffix called with chunk_id: {chunk_id}, suffix: {suffix}")
    try:
        uuid.UUID(chunk_id, version=4)
    except ValueError:
        logger.warning(f"Invalid chunk_id: {chunk_id} is not a valid UUID")
        return "Error: Invalid chunk_id", ""
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    original = api.memory.get_chunk(chunk_id)
    original_text = original["text"] if original else ""
    result = api.editor.add_suffix(chunk_id, suffix)
    return result, api.editor.diff(chunk_id, original_text)

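# generate_and_edit: store the source text as a chunk, cut out the target span,
# ask the model for replacement text, then paste the reply back in at the target
# position and return the edited text with a diff against the original source.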
async def generate_and_edit(source_text, target_start, target_end, response_prompt):
    api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY"))
    chunk_id = api.memory.add_chunk(source_text, "user")
    logger.info(f"Generated source chunk: {chunk_id}")

    cut_result = api.editor.cut(chunk_id, int(target_start), int(target_end))
    logger.info(f"Cut target text from chunk: {chunk_id}, start: {target_start}, end: {target_end}")

    response = await api.fetch_response(response_prompt)
    if "error" in response:
        return "Error: Failed to generate response", ""
    response_text = response["content"]

    response_match = re.search(r"Response for \[TARGET\]:\s*(.+)", response_text, re.DOTALL)
    if response_match:
        api.editor.clipboard = response_match.group(1).strip()
    else:
        api.editor.clipboard = response_text.strip()
    logger.info(f"Generated and set response to clipboard: {api.editor.clipboard}")

    paste_result = api.editor.paste(chunk_id, int(target_start))
    logger.info(f"Pasted response into chunk: {chunk_id}, position: {target_start}")

    diff = api.editor.diff(chunk_id, source_text) if "Error" not in paste_result else ""
    return paste_result, diff

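# create_ui builds the Gradio Blocks interface: a chat tab, a conversation-history
# tab, a text-editor tab wired to the history table, an advanced manipulation tab,
# and a log viewer.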
def create_ui():
    with gr.Blocks(title="Azure OpenAI Chat & Text Editor") as demo:
        gr.Markdown("# Azure OpenAI Chat with Text Editing")
        gr.Markdown("**Note**: Using Azure OpenAI endpoint: https://T-App-GPT4o.openai.azure.com/")

        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(label="Conversation", type="messages")
            user_input = gr.Textbox(label="Your Message", placeholder="Type your message or editing command...")
            preprompt = gr.Textbox(label="System Prompt", value="You are a helpful assistant with text editing capabilities.")
            submit_btn = gr.Button("Send")
            submit_btn.click(
                fn=chat_submit,
                inputs=[user_input, chatbot, preprompt],
                outputs=[chatbot, preprompt]
            )

        with gr.Tab("Conversation History"):
            history = gr.Dataframe(
                label="Recent Chunks",
                headers=["chunk_id", "text", "role", "timestamp", "intent", "token_count"],
                datatype=["str", "str", "str", "str", "str", "number"],
                interactive=False,
                key="history_df"
            )
            history_btn = gr.Button("Refresh History")
            history_btn.click(fn=async_get_history, outputs=history, api_name="refresh_history")

        with gr.Tab("Text Editor"):
            chunk_id = gr.Textbox(label="Selected Chunk ID", interactive=False)
            chunk_text = gr.Textbox(label="Chunk Text", interactive=False)
            history.select(fn=select_chunk, outputs=[chunk_id, chunk_text])

            with gr.Row():
                start = gr.Number(label="Start Index", precision=0)
                end = gr.Number(label="End Index", precision=0)
                position = gr.Number(label="Paste Position", precision=0)
            with gr.Row():
                prefix = gr.Textbox(label="Prefix")
                suffix = gr.Textbox(label="Suffix")
            with gr.Row():
                cut_btn = gr.Button("Cut")
                copy_btn = gr.Button("Copy")
                paste_btn = gr.Button("Paste")
                prefix_btn = gr.Button("Add Prefix")
                suffix_btn = gr.Button("Add Suffix")
            diff_output = gr.Textbox(label="Diff Output", interactive=False)

            cut_btn.click(fn=edit_cut, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            copy_btn.click(fn=edit_copy, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output])
            paste_btn.click(fn=edit_paste, inputs=[chunk_id, position], outputs=[chunk_text, diff_output])
            prefix_btn.click(fn=edit_prefix, inputs=[chunk_id, prefix], outputs=[chunk_text, diff_output])
            suffix_btn.click(fn=edit_suffix, inputs=[chunk_id, suffix], outputs=[chunk_text, diff_output])

with gr.Tab("Advanced Text Manipulation"): |
|
source_text = gr.Textbox(label="Source Text", value="This is a sample paragraph. [TARGET] This is the rest of the text.") |
|
target_start = gr.Number(label="Target Start Index", value=21, precision=0) |
|
target_end = gr.Number(label="Target End Index", value=28, precision=0) |
|
response_prompt = gr.Textbox(label="Response Prompt", value="Generate a response for the target section.") |
|
generate_btn = gr.Button("Generate and Edit") |
|
result_text = gr.Textbox(label="Result Text", interactive=False) |
|
result_diff = gr.Textbox(label="Result Diff", interactive=False) |
|
generate_btn.click( |
|
fn=generate_and_edit, |
|
inputs=[source_text, target_start, target_end, response_prompt], |
|
outputs=[result_text, result_diff] |
|
) |
|
|
|
with gr.Tab("Logs"): |
|
logs = gr.Textbox(label="Application Logs", interactive=False) |
|
logs_btn = gr.Button("Refresh Logs") |
|
logs_btn.click(fn=get_logs, outputs=logs) |
|
|
|
gr.Markdown(f"Current Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %Z')}") |
|
|
|
logger.info("Created Gradio UI") |
|
return demo |
|
|
|
if __name__ == "__main__":
    logger.info("Starting application")
    demo = create_ui()
    demo.launch(server_name="0.0.0.0", server_port=7860)