Aigg / app.py
Athspi's picture
Update app.py
a487cc2 verified
raw
history blame
9.84 kB
import os
import zipfile
import uuid
import google.generativeai as genai
from flask import Flask, request, jsonify, render_template, session, send_from_directory
from dotenv import load_dotenv
# --- API and App Configuration ---
load_dotenv() # Load environment variables from .env file
# Initialize the Flask app
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", 'your_default_secret_key_change_me')
# Configure the Gemini API client
try:
genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
model = genai.GenerativeModel('gemini-1.5-flash') # Or 'gemini-pro'
except Exception as e:
print(f"Error configuring Gemini API: {e}")
model = None
# Configure the upload folder
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# --- Helper Functions ---
def get_project_files(project_path):
"""Gets the file tree and content of all files in the project directory."""
file_data = {}
for root, _, files in os.walk(project_path):
for file in files:
# Skip .DS_Store and other unwanted files
if file == '.DS_Store':
continue
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, project_path)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_data[relative_path] = content
except Exception as e:
file_data[relative_path] = f"Error reading file: {e}"
return file_data
# --- Flask Routes ---
@app.route('/')
def index():
"""Renders the main page and initializes the session."""
if 'project_id' not in session:
session['project_id'] = str(uuid.uuid4())
session['chat_history'] = []
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_project():
"""Handles the project zip file upload."""
# FIX: Ensure session is initialized to prevent KeyError
if 'project_id' not in session:
session['project_id'] = str(uuid.uuid4())
session['chat_history'] = []
if 'project_zip' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['project_zip']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
if file and file.filename.endswith('.zip'):
project_id = session['project_id']
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
if not os.path.exists(project_path):
os.makedirs(project_path)
zip_path = os.path.join(project_path, file.filename)
file.save(zip_path)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(project_path)
os.remove(zip_path)
project_files = get_project_files(project_path)
session['project_files'] = project_files
initial_context = "The user has uploaded a new project. Here is the file structure and content:\n\n"
for path, content in project_files.items():
initial_context += f"**File:** `{path}`\n```\n{content}\n```\n\n"
session['chat_history'] = [
{"role": "user", "parts": [initial_context]},
{"role": "model", "parts": ["I have analyzed the project. I am ready to help. How can I assist you with your code?"]}
]
frontend_chat_history = [
{"role": "user", "content": initial_context},
{"role": "assistant", "content": "I have analyzed the project. I am ready to help. How can I assist you with your code?"}
]
return jsonify({
"message": "Project uploaded and analyzed.",
"file_tree": list(project_files.keys()),
"chat_history": frontend_chat_history
})
return jsonify({"error": "Invalid file type. Please upload a .zip file."}), 400
@app.route('/chat', methods=['POST'])
def chat():
"""Handles chat interaction with the Gemini API."""
if not model:
return jsonify({"error": "Gemini API is not configured. Please check your API key."}), 500
user_message = request.json.get('message')
if not user_message:
return jsonify({"error": "Empty message"}), 400
project_id = session.get('project_id')
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
# Add user message to history
session['chat_history'].append({"role": "user", "parts": [user_message]})
session.modified = True
chat_session = model.start_chat(history=session['chat_history'])
try:
system_instruction = """You are an expert programmer AI assistant. You must follow these rules:
1. When asked to modify a file, you MUST respond with the full file path on the first line, followed by the complete, updated code for that file inside a markdown code block. Example:
`src/app.js`
```javascript
// new updated javascript code
```
2. When asked to create a new file, respond with the new file path and its content in the same format.
3. When asked to delete a file, respond ONLY with 'DELETE: [file_path]'.
4. For general conversation, just respond naturally."""
# The history already provides context. We send the latest user message.
# The system instruction is part of the model's configuration or initial prompt.
# For simplicity here, we prepend it to guide this specific turn.
# A more advanced setup might use the system_instruction parameter in genai.configure
prompt_with_instruction = f"{system_instruction}\n\n## Project Context:\n{get_project_files(project_path)}\n\n## Conversation History:\n{session['chat_history']}\n\n## User's Latest Message:\n{user_message}"
response = chat_session.send_message(user_message) # Send only the new message
ai_response = response.text
# Add AI response to chat history
session['chat_history'].append({"role": "model", "parts": [ai_response]})
session.modified = True
# --- AI File Operation Logic ---
# A more robust solution might involve structured JSON output from the LLM
if "```" in ai_response:
lines = ai_response.split('\n')
file_path_line = lines[0].strip()
if file_path_line.startswith('`') and file_path_line.endswith('`'):
file_path = file_path_line.strip('`')
try:
content_start = ai_response.find('```')
code_block = ai_response[content_start:]
# Extract language and content
code_content = '\n'.join(code_block.split('\n')[1:-1])
full_path = os.path.join(project_path, file_path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, 'w', encoding='utf-8') as f:
f.write(code_content)
except IndexError:
pass # Ignore malformed code blocks for now
elif ai_response.strip().startswith("DELETE:"):
file_to_delete = ai_response.strip().split(":", 1)[1].strip()
full_path = os.path.join(project_path, file_to_delete)
if os.path.exists(full_path):
try:
os.remove(full_path)
except OSError as e:
print(f"Error deleting file {full_path}: {e}")
return jsonify({"reply": ai_response})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/file_tree')
def get_file_tree():
"""Returns the current file structure of the project."""
project_id = session.get('project_id')
if not project_id:
return jsonify({"error": "No project in session."}), 400
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
if not os.path.exists(project_path):
return jsonify({"error": "Project files not found."}), 404
file_tree = list(get_project_files(project_path).keys())
return jsonify({"file_tree": file_tree})
@app.route('/file_content')
def get_file_content():
"""Returns the content of a specific file."""
project_id = session.get('project_id')
file_path = request.args.get('path')
if not project_id or not file_path:
return jsonify({"error": "Missing project or file path"}), 400
full_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id, file_path)
if not os.path.exists(full_path):
return jsonify({"error": "File not found"}), 404
try:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
return jsonify({"path": file_path, "content": content})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/download')
def download_project():
"""Zips and sends the current project state for download."""
project_id = session.get('project_id')
if not project_id:
return "No project to download.", 404
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
zip_filename = f"project_{project_id}.zip"
zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(project_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, project_path)
zipf.write(file_path, arcname)
return send_from_directory(app.config['UPLOAD_FOLDER'], zip_filename, as_attachment=True)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=7860)