import os
import asyncio

import gradio as gr
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langchain.schema import AIMessage, HumanMessage, SystemMessage
from langgraph.prebuilt import create_react_agent

# Import the CodeAct agent functionality from the local agent module
from agent import FileInjectedPyodideSandbox, create_pyodide_eval_fn, create_codeact
# Load environment variables
load_dotenv(find_dotenv())
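# The model calls below read OPENAI_API_KEY from the environment; a minimal
# .env file for local runs would contain (placeholder, not a real key):
#   OPENAI_API_KEY=sk-...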
# Initialize OpenAI model
openai_model = init_chat_model(
    model="gpt-4.1-nano-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)
# Create the basic chat agent
chat_agent = create_react_agent(openai_model, tools=[])
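# With an empty tool list, the ReAct agent reduces to a plain chat loop:
# the model answers each turn directly, with no tool-calling steps.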
# Initialize CodeAct model for file analysis
codeact_model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")

# Store uploaded file path globally
uploaded_file_path = None
def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
):
    """
    Main chat function that processes user input and streams the AI response.

    Note: max_tokens, temperature, and top_p are received from the UI sliders
    but are not yet wired into the model call.
    """
    try:
        # Convert history to LangChain message format
        messages = [SystemMessage(content=system_message)]

        # Add conversation history (assistant turns are AIMessage, not SystemMessage)
        for user_msg, assistant_msg in history:
            if user_msg:
                messages.append(HumanMessage(content=user_msg))
            if assistant_msg:
                messages.append(AIMessage(content=assistant_msg))

        # Add current user message
        messages.append(HumanMessage(content=message))
        # Prepare input for the agent
        input_data = {"messages": messages}

        # Stream the response, yielding the accumulated text as it grows
        response_text = ""
        for chunk in chat_agent.stream(input_data, stream_mode="values"):
            if "messages" in chunk and chunk["messages"]:
                latest_message = chunk["messages"][-1]
                if hasattr(latest_message, "content"):
                    current_content = latest_message.content
                    if current_content and len(current_content) > len(response_text):
                        response_text = current_content
                        yield response_text

        # Ensure we return something even if streaming doesn't work
        if not response_text:
            yield "I'm sorry, I couldn't process your message. Please check your OpenAI API key."

    except Exception as e:
        yield f"Error: {str(e)}. Please make sure your OpenAI API key is set correctly."
def handle_file_upload(file):
    """Handle file upload and store the path globally"""
    global uploaded_file_path
    if file is not None:
        # gr.File(type="filepath") passes the temp file's path as a string
        uploaded_file_path = file
        return f"✅ File uploaded successfully: {os.path.basename(file)}"
    else:
        uploaded_file_path = None
        return "❌ No file uploaded"
async def analyze_uploaded_file():
    """Analyze the uploaded file using the CodeAct agent"""
    global uploaded_file_path

    if not uploaded_file_path or not os.path.exists(uploaded_file_path):
        return "❌ No file uploaded or file not found. Please upload a file first."

    try:
        # Create sandbox with the uploaded file injected at a virtual path
        sandbox = FileInjectedPyodideSandbox(
            file_path=uploaded_file_path,
            virtual_path="/uploaded_file.log",
            sessions_dir=None,  # Will create temp directory automatically
            allow_net=True,
        )
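        # Wire the sandbox into a CodeAct agent: eval_fn executes the
        # model-generated Python inside the sandbox, and compile() turns the
        # CodeAct graph into a runnable agent.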
        eval_fn = create_pyodide_eval_fn(sandbox)
        code_act = create_codeact(codeact_model, [], eval_fn)
        agent = code_act.compile()
        # Create analysis query based on file type
        file_ext = os.path.splitext(uploaded_file_path)[1].lower()

        if file_ext in ['.log', '.txt']:
            query = """
Analyze this uploaded file and provide:

1. **Content Overview** - What type of data/logs this file contains
2. **Key Patterns** - Important patterns, trends, or anomalies found
3. **Statistical Summary** - Basic statistics (line count, data distribution, etc.)
4. **Insights & Findings** - Key takeaways from the analysis
5. **Recommendations** - Suggested actions based on the analysis

DATA SOURCES AVAILABLE:
- `file_content`: Raw file content as a string
- `log_lines`: List of individual lines
- `total_lines`: Number of lines in the file
- File path: `/uploaded_file.log` (can be read with open('/uploaded_file.log', 'r'))

Generate Python code to analyze the file and provide comprehensive insights.
"""
        else:
            query = f"""
Analyze this uploaded {file_ext} file and provide:

1. **File Type Analysis** - What type of file this is and its structure
2. **Content Summary** - Overview of the file contents
3. **Key Information** - Important data points or patterns found
4. **Statistical Analysis** - Basic statistics and data distribution
5. **Recommendations** - Suggested next steps or insights

DATA SOURCES AVAILABLE:
- `file_content`: Raw file content as a string
- `log_lines`: List of individual lines
- `total_lines`: Number of lines in the file
- File path: `/uploaded_file.log`

Generate Python code to analyze this file and provide comprehensive insights.
"""
        # Run the analysis, collecting streamed output as it arrives
        result_parts = []
        async for typ, chunk in agent.astream(
            {"messages": [HumanMessage(content=query)]},
            stream_mode=["values", "messages"],
        ):
            if typ == "messages":
                # Token-level stream: chunk is a (message_chunk, metadata) tuple
                result_parts.append(chunk[0].content)
            elif typ == "values":
                # Full state snapshots: capture the final message content
                if chunk and "messages" in chunk:
                    final_message = chunk["messages"][-1]
                    if hasattr(final_message, "content"):
                        result_parts.append(f"\n\n**Final Analysis:**\n{final_message.content}")

        return "\n".join(result_parts) if result_parts else "Analysis completed but no output generated."
    except Exception as e:
        return f"❌ Error analyzing file: {str(e)}"
def run_file_analysis():
    """Wrapper to run async file analysis in a sync context"""
    # Gradio executes sync handlers in a worker thread with no running event
    # loop, so asyncio.run() is safe here.
    return asyncio.run(analyze_uploaded_file())
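# Design note: Gradio event handlers can also be async functions, so
# analyze_uploaded_file could be passed to analyze_btn.click() directly;
# the sync wrapper is kept to make the asyncio boundary explicit.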
# Create the Gradio interface
with gr.Blocks(title="DataForge - AI Assistant with File Analysis") as demo:
    gr.Markdown("# 🚀 DataForge - AI Assistant with File Analysis")
    gr.Markdown("Upload files for analysis or chat with the AI assistant.")
    with gr.Tab("💬 Chat Assistant"):
        chat_interface = gr.ChatInterface(
            respond,
            additional_inputs=[
                gr.Textbox(
                    value="You are a helpful AI assistant. Be friendly, informative, and concise in your responses.",
                    label="System message",
                ),
                gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
                gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
                gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-p (nucleus sampling)",
                ),
            ],
            title="Chat with AI Assistant",
            description="Ask questions or get help with any topic.",
        )
    with gr.Tab("📁 File Analysis"):
        gr.Markdown("## Upload and Analyze Files")
        gr.Markdown("Upload log files, text files, or other data files for comprehensive AI-powered analysis.")

        with gr.Row():
            with gr.Column(scale=1):
                file_upload = gr.File(
                    label="Upload File for Analysis",
                    file_types=[".txt", ".log", ".csv", ".json", ".xml", ".py", ".js", ".html", ".md"],
                    type="filepath",
                )
                upload_status = gr.Textbox(
                    label="Upload Status",
                    value="No file uploaded",
                    interactive=False,
                )
                analyze_btn = gr.Button("🔍 Analyze File", variant="primary", size="lg")

            with gr.Column(scale=2):
                analysis_output = gr.Textbox(
                    label="Analysis Results",
                    lines=20,
                    max_lines=30,
                    placeholder="Upload a file and click 'Analyze File' to see detailed analysis results here...",
                    interactive=False,
                )
    # Event handlers
    file_upload.change(
        fn=handle_file_upload,
        inputs=[file_upload],
        outputs=[upload_status],
    )

    analyze_btn.click(
        fn=run_file_analysis,
        inputs=[],
        outputs=[analysis_output],
    )
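    # Note: the click handler declares no inputs; run_file_analysis reads the
    # module-level uploaded_file_path that handle_file_upload stored earlier.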
if __name__ == "__main__":
    demo.launch()