|
import os |
|
import zipfile |
|
import uuid |
|
import google.generativeai as genai |
|
from flask import Flask, request, jsonify, render_template, session, send_from_directory |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Pull settings from a local .env file into the process environment before
# anything below reads os.environ.
load_dotenv()

app = Flask(__name__)
# NOTE(review): the fallback value is a publicly known placeholder. Anyone who
# knows it can forge session cookies, so FLASK_SECRET_KEY should always be set
# in real deployments.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", 'your_default_secret_key_change_me')

# `model` stays None whenever Gemini cannot be set up; the /chat route checks
# it and reports a configuration error instead of crashing.
model = None
# GOOGLE_API_KEY is accepted as well because the SDK presumably falls back to
# it when no key is passed explicitly -- TODO confirm against the SDK docs.
_gemini_api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not _gemini_api_key:
    # configure() is not guaranteed to raise on a missing key, so check here;
    # otherwise the failure would only surface on the first chat request.
    print("Error configuring Gemini API: GEMINI_API_KEY is not set")
else:
    try:
        genai.configure(api_key=_gemini_api_key)
        model = genai.GenerativeModel('gemini-1.5-flash')
    except Exception as e:
        print(f"Error configuring Gemini API: {e}")
        model = None

# Every uploaded project is unpacked into its own sub-directory of this folder.
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# exist_ok avoids the check-then-create race when several workers start at once.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
|
|
|
|
def get_project_files(project_path):
    """Collect the text content of every file below ``project_path``.

    The tree is walked in sorted order so the resulting mapping (and anything
    derived from it, such as the file tree shown to the user) is stable across
    runs and filesystems. ``.DS_Store`` files are skipped.

    Args:
        project_path: Root directory of the project to scan.

    Returns:
        A dict mapping each file's path relative to ``project_path`` to its
        UTF-8 decoded content. A file that cannot be opened or decoded maps to
        the string ``"Error reading file: <reason>"`` instead.
    """
    file_data = {}
    for root, dirs, files in os.walk(project_path):
        # Sorting in place also fixes the order in which os.walk descends.
        dirs.sort()
        for name in sorted(files):
            if name == '.DS_Store':
                continue
            file_path = os.path.join(root, name)
            relative_path = os.path.relpath(file_path, project_path)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    file_data[relative_path] = f.read()
            except (OSError, UnicodeError) as e:
                # Binary or unreadable files are reported, not fatal.
                file_data[relative_path] = f"Error reading file: {e}"
    return file_data
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Serve the main page, giving first-time visitors a fresh session.

    A session without a ``project_id`` receives a new random id and an empty
    ``chat_history`` before ``index.html`` is rendered.
    """
    is_new_visitor = 'project_id' not in session
    if is_new_visitor:
        new_id = uuid.uuid4()
        session['project_id'] = str(new_id)
        session['chat_history'] = []
    return render_template('index.html')
|
|
|
|
|
@app.route('/upload', methods=['POST'])
def upload_project():
    """Receive a project zip, unpack it and seed the chat with its contents.

    The archive is expected in the ``project_zip`` form field. It is extracted
    into the session's project directory, every file is read, and the chat
    history is reset to a first exchange that carries the whole project.

    Returns:
        On success, JSON with ``message``, ``file_tree`` (relative paths) and
        ``chat_history`` (the seeded exchange in frontend format). On bad
        input, JSON with ``error`` and HTTP 400.
    """
    if 'project_id' not in session:
        session['project_id'] = str(uuid.uuid4())
        session['chat_history'] = []

    if 'project_zip' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['project_zip']
    # The filename may be empty (or missing) when nothing was selected.
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400
    if not file.filename.lower().endswith('.zip'):
        return jsonify({"error": "Invalid file type. Please upload a .zip file."}), 400

    project_id = session['project_id']
    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
    os.makedirs(project_path, exist_ok=True)

    # The client-supplied filename is untrusted (it may contain "../"), so the
    # archive is stored under a server-generated name. Keeping it outside the
    # project directory also stops an archive member from overwriting it.
    zip_path = os.path.join(app.config['UPLOAD_FOLDER'], f"upload_{uuid.uuid4().hex}.zip")
    file.save(zip_path)
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(project_path)
    except zipfile.BadZipFile:
        return jsonify({"error": "Uploaded file is not a valid zip archive."}), 400
    finally:
        # Always drop the temporary archive, even when extraction failed.
        os.remove(zip_path)

    project_files = get_project_files(project_path)
    # NOTE(review): if this app uses Flask's default cookie-backed session,
    # storing whole file contents here can exceed the browser's cookie size
    # limit -- confirm a server-side session store is configured.
    session['project_files'] = project_files

    initial_context = (
        "The user has uploaded a new project. Here is the file structure and content:\n\n"
        + "".join(
            f"**File:** `{path}`\n```\n{content}\n```\n\n"
            for path, content in project_files.items()
        )
    )
    greeting = "I have analyzed the project. I am ready to help. How can I assist you with your code?"

    # History in the shape the Gemini SDK expects ("parts" lists, "model" role).
    session['chat_history'] = [
        {"role": "user", "parts": [initial_context]},
        {"role": "model", "parts": [greeting]},
    ]

    # The same exchange in the shape the frontend renders.
    frontend_chat_history = [
        {"role": "user", "content": initial_context},
        {"role": "assistant", "content": greeting},
    ]

    return jsonify({
        "message": "Project uploaded and analyzed.",
        "file_tree": list(project_files.keys()),
        "chat_history": frontend_chat_history,
    })
|
|
|
|
|
def _resolve_project_path(project_path, relative_path):
    """Return the real path of ``relative_path`` inside ``project_path``, or None if it escapes it."""
    try:
        root = os.path.realpath(project_path)
        candidate = os.path.realpath(os.path.join(root, relative_path))
        if candidate == root or os.path.commonpath([root, candidate]) != root:
            return None
    except ValueError:
        # Raised for embedded NUL bytes or paths on different drives.
        return None
    return candidate


def _parse_file_update(ai_response):
    """Extract ``(file_path, code)`` from a "path line + fenced block" reply, or return None."""
    header, _, rest = ai_response.strip().partition('\n')
    header = header.strip()
    if not (header.startswith('`') and header.endswith('`')):
        return None
    file_path = header.strip('`').strip()
    if not file_path:
        return None

    fence_open = rest.find('```')
    if fence_open == -1:
        return None
    body_start = rest.find('\n', fence_open)
    # Use the LAST fence as the terminator so code that itself contains
    # fences (e.g. a Markdown file) is kept whole, and any text after the
    # closing fence is ignored rather than written into the file.
    fence_close = rest.rfind('```')
    if body_start == -1 or fence_close <= body_start:
        # Unterminated block: better to skip than to write a truncated file.
        return None

    code = rest[body_start + 1:fence_close]
    if code.endswith('\n'):
        code = code[:-1]
    return file_path, code


@app.route('/chat', methods=['POST'])
def chat():
    """Send the user's message to Gemini and apply any file change it returns.

    Expects a JSON body with a ``message`` field. The reply is appended to the
    session's chat history. If the reply follows the file protocol described
    in the system instruction, the file is written to (or deleted from) the
    session's project directory; paths that resolve outside that directory
    are refused.

    Returns:
        JSON ``{"reply": ...}`` on success; JSON ``{"error": ...}`` with 400
        for bad input or a missing project, and 500 when Gemini is not
        configured or the request fails.
    """
    if not model:
        return jsonify({"error": "Gemini API is not configured. Please check your API key."}), 500

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    user_message = payload.get('message')
    if not user_message:
        return jsonify({"error": "Empty message"}), 400

    project_id = session.get('project_id')
    if not project_id:
        return jsonify({"error": "No project in session."}), 400
    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)

    history = list(session.get('chat_history') or [])

    system_instruction = (
        "You are an expert programmer AI assistant. You must follow these rules:\n"
        "1. When asked to modify a file, you MUST respond with the full file path on the first line, followed by the complete, updated code for that file inside a markdown code block. Example:\n"
        "`src/app.js`\n"
        "```javascript\n"
        "// new updated javascript code\n"
        "```\n"
        "2. When asked to create a new file, respond with the new file path and its content in the same format.\n"
        "3. When asked to delete a file, respond ONLY with 'DELETE: [file_path]'.\n"
        "4. For general conversation, just respond naturally."
    )

    try:
        # Start from the PRIOR turns only: send_message() contributes the new
        # user turn itself, so adding it to the history first would submit it
        # twice.
        chat_session = model.start_chat(history=history)
        # The reply protocol parsed below only works if the model is told
        # about it, so the instruction travels with every message. The stored
        # history keeps just the raw user text to stay small.
        response = chat_session.send_message(
            f"{system_instruction}\n\n## User's Latest Message:\n{user_message}"
        )
        ai_response = response.text

        # Record the exchange only once it has succeeded.
        history.append({"role": "user", "parts": [user_message]})
        history.append({"role": "model", "parts": [ai_response]})
        session['chat_history'] = history
        session.modified = True

        if "```" in ai_response:
            update = _parse_file_update(ai_response)
            if update is not None:
                file_path, code_content = update
                full_path = _resolve_project_path(project_path, file_path)
                if full_path is None:
                    print(f"Refusing to write outside the project: {file_path}")
                else:
                    os.makedirs(os.path.dirname(full_path), exist_ok=True)
                    with open(full_path, 'w', encoding='utf-8') as f:
                        f.write(code_content)
        elif ai_response.strip().startswith("DELETE:"):
            file_to_delete = ai_response.strip().split(":", 1)[1].strip()
            full_path = _resolve_project_path(project_path, file_to_delete)
            if full_path is None:
                print(f"Refusing to delete outside the project: {file_to_delete}")
            elif os.path.exists(full_path):
                try:
                    os.remove(full_path)
                except OSError as e:
                    print(f"Error deleting file {full_path}: {e}")

        return jsonify({"reply": ai_response})

    except Exception as e:
        # Request boundary: report any SDK or filesystem failure to the client.
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
@app.route('/file_tree')
def get_file_tree():
    """List the relative paths of all files in the session's project.

    Returns:
        JSON ``{"file_tree": [...]}``; JSON ``{"error": ...}`` with 400 when
        the session has no project and 404 when its directory is missing.
    """
    project_id = session.get('project_id')
    if project_id:
        project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
        if os.path.exists(project_path):
            # Iterating the mapping yields its keys, i.e. the relative paths.
            paths = [*get_project_files(project_path)]
            return jsonify({"file_tree": paths})
        return jsonify({"error": "Project files not found."}), 404
    return jsonify({"error": "No project in session."}), 400
|
|
|
|
|
@app.route('/file_content')
def get_file_content():
    """Return the UTF-8 content of one file of the session's project.

    The file is named by the ``path`` query parameter, relative to the project
    directory. Paths that resolve outside that directory (via ``..``, an
    absolute path or a symlink) are treated as not found.

    Returns:
        JSON ``{"path": ..., "content": ...}`` on success; JSON
        ``{"error": ...}`` with 400 when the project or path is missing, 404
        when the file does not exist, and 500 when it cannot be read.
    """
    project_id = session.get('project_id')
    file_path = request.args.get('path')
    if not project_id or not file_path:
        return jsonify({"error": "Missing project or file path"}), 400

    # `path` comes straight from the query string, so resolve it and make sure
    # it still lies inside this session's project directory.
    project_root = os.path.realpath(os.path.join(app.config['UPLOAD_FOLDER'], project_id))
    try:
        full_path = os.path.realpath(os.path.join(project_root, file_path))
        inside_project = os.path.commonpath([project_root, full_path]) == project_root
    except ValueError:
        # Raised for embedded NUL bytes or paths on different drives.
        inside_project = False

    # isfile (not exists) so that a directory yields 404 instead of a 500.
    if not inside_project or not os.path.isfile(full_path):
        return jsonify({"error": "File not found"}), 404

    try:
        with open(full_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeError) as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"path": file_path, "content": content})
|
|
|
|
|
@app.route('/download')
def download_project():
    """Zip the current state of the session's project and send it as a download.

    Returns:
        The archive ``project_<project_id>.zip`` as an attachment, or a plain
        text message with 404 when the session has no project or its
        directory does not exist.
    """
    project_id = session.get('project_id')
    if not project_id:
        return "No project to download.", 404

    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
    # Without this check a missing directory would produce an empty archive.
    if not os.path.isdir(project_path):
        return "No project to download.", 404

    # Use one absolute directory for both writing and serving: a relative
    # directory is resolved against the working directory by zipfile/os but
    # against the application root by send_from_directory.
    upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    zip_filename = f"project_{project_id}.zip"
    zip_path = os.path.join(upload_root, zip_filename)

    # The archive is rebuilt (overwritten) on every request and is left in the
    # upload folder afterwards.
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(project_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, project_path)
                zipf.write(file_path, arcname)

    return send_from_directory(upload_root, zip_filename, as_attachment=True)
|
|
|
|
|
if __name__ == '__main__':
    # Direct execution only: listen on all network interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")