|
import os |
|
import zipfile |
|
import uuid |
|
import google.generativeai as genai |
|
from flask import Flask, request, jsonify, render_template, session, send_from_directory |
|
from flask_session import Session |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Read a local .env file (if any) before the configuration below consults
# os.environ.
load_dotenv()

app = Flask(__name__)

# Never fall back to a secret that is published in the source code: anyone who
# can read the repository could forge data signed with it. When the variable is
# missing or empty, warn and use a random key instead. That key only lives for
# this process, so FLASK_SECRET_KEY should be set explicitly for any deployment
# that restarts or runs several workers.
_secret_key = os.environ.get("FLASK_SECRET_KEY")
if not _secret_key:
    print("Warning: FLASK_SECRET_KEY not found. Using a random per-process secret key.")
    _secret_key = os.urandom(32).hex()
app.config["SECRET_KEY"] = _secret_key

# Server-side sessions stored on the local filesystem.
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
app.config['SESSION_FILE_DIR'] = './flask_session'
Session(app)
|
|
|
|
|
|
|
|
|
# Configure the Gemini client once at import time. `model` remains None when
# the API key is missing or when configuring the client fails, and the rest of
# the module checks for that before using it.
model = None
gemini_api_key = os.environ.get("GEMINI_API_KEY")
if gemini_api_key:
    try:
        genai.configure(api_key=gemini_api_key)
        model = genai.GenerativeModel('gemini-1.5-flash')
    except Exception as e:
        print(f"Error configuring Gemini API: {e}")
        model = None
else:
    print("Warning: GEMINI_API_KEY not found. The AI will not function.")
|
|
|
|
|
# Directory that holds one sub-directory of extracted files per session.
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# exist_ok=True replaces the separate "exists?" check, so two processes that
# start at the same time cannot race between the check and the creation.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs('./flask_session', exist_ok=True)
|
|
|
|
|
def get_project_files(project_path):
    """Read every file below *project_path* into a dictionary.

    macOS archive metadata is skipped: files named ``.DS_Store`` and the whole
    ``__MACOSX`` directory tree.

    Args:
        project_path: Directory to walk recursively.

    Returns:
        A dict mapping each file's path relative to *project_path* to its text
        content. Files are decoded as UTF-8 with undecodable bytes dropped. If
        a file cannot be read, its value is an ``"Error reading file: ..."``
        message instead of the content.
    """
    ignored_names = {'.DS_Store', '__MACOSX'}
    file_data = {}
    for root, dirs, files in os.walk(project_path):
        # `__MACOSX` is a directory, so it never shows up in `files`; prune it
        # in place so os.walk does not descend into it at all.
        dirs[:] = [name for name in dirs if name not in ignored_names]
        for file in files:
            if file in ignored_names:
                continue
            file_path = os.path.join(root, file)
            relative_path = os.path.relpath(file_path, project_path)
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    file_data[relative_path] = f.read()
            except OSError as e:
                # Best effort: record the failure and keep walking.
                file_data[relative_path] = f"Error reading file: {e}"
    return file_data
|
|
|
|
|
@app.route('/')
def index():
    """Serve the main page, starting a fresh session when none exists yet."""
    if 'project_id' in session:
        return render_template('index.html')

    # First visit: give the session its own project id and an empty chat log.
    session['chat_history'] = []
    session['project_id'] = str(uuid.uuid4())
    return render_template('index.html')
|
|
|
@app.route('/upload', methods=['POST'])
def upload_project():
    """Accept a zipped project, extract it and seed the chat with its contents.

    Expects a multipart form field named ``project_zip``. The archive is
    extracted into the session's directory under the upload folder, and every
    extracted file is placed into the first message of the chat history.

    Returns:
        JSON with ``message``, ``file_tree`` and ``chat_history`` on success,
        or JSON with an ``error`` key and status 400 when the upload is
        missing, is not a ``.zip`` file, or is not a readable zip archive.
    """
    if 'project_id' not in session:
        session['project_id'] = str(uuid.uuid4())
        session['chat_history'] = []

    if 'project_zip' not in request.files:
        return jsonify({"error": "No file part"}), 400
    upload = request.files['project_zip']
    # The filename is client-supplied and may be missing altogether.
    filename = upload.filename or ''
    if filename == '':
        return jsonify({"error": "No selected file"}), 400
    if not filename.lower().endswith('.zip'):
        return jsonify({"error": "Invalid file type. Please upload a .zip file."}), 400

    project_id = session['project_id']
    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
    os.makedirs(project_path, exist_ok=True)

    # Save under a server-generated name outside the project directory: the
    # client's filename must never become part of a filesystem path, and an
    # archive member must not be able to overwrite the archive being read.
    zip_path = os.path.join(app.config['UPLOAD_FOLDER'], f"upload_{uuid.uuid4().hex}.zip")
    try:
        upload.save(zip_path)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(project_path)
    except zipfile.BadZipFile:
        return jsonify({"error": "The uploaded file is not a valid zip archive."}), 400
    finally:
        # Remove the temporary archive whether or not extraction succeeded.
        if os.path.exists(zip_path):
            os.remove(zip_path)

    project_files = get_project_files(project_path)
    initial_context = (
        "The user has uploaded a new project. Here is the file structure and content:\n\n"
        + "".join(
            f"**File:** `{path}`\n```\n{content}\n```\n\n"
            for path, content in project_files.items()
        )
    )
    ready_reply = "I have analyzed the project. I am ready to help. How can I assist you with your code?"

    # History in the shape passed to the model (role/parts) ...
    session['chat_history'] = [
        {"role": "user", "parts": [initial_context]},
        {"role": "model", "parts": [ready_reply]},
    ]
    # ... and the same two turns in the shape returned to the client.
    frontend_chat_history = [
        {"role": "user", "content": initial_context},
        {"role": "assistant", "content": ready_reply},
    ]
    return jsonify({
        "message": "Project uploaded and analyzed.",
        "file_tree": list(project_files.keys()),
        "chat_history": frontend_chat_history,
    })
|
|
|
def _resolve_project_file(project_path, relative_path):
    """Return the real path of *relative_path* inside *project_path*, or None if it is empty or escapes it."""
    if not relative_path:
        return None
    root = os.path.realpath(project_path)
    candidate = os.path.realpath(os.path.join(root, relative_path))
    if not candidate.startswith(root + os.sep):
        return None
    return candidate


def _apply_ai_file_action(project_path, ai_response):
    """Write or delete one project file when the model's reply uses the expected format."""
    if "```" in ai_response:
        # Write format: first line is the file path wrapped in backticks,
        # followed by a fenced code block holding the new content.
        first_line = ai_response.split('\n')[0].strip()
        if not (first_line.startswith('`') and first_line.endswith('`')):
            return
        file_path = first_line.strip('`')
        if not file_path:
            # The reply simply opened with a bare fence; there is nothing to write.
            return
        full_path = _resolve_project_file(project_path, file_path)
        if full_path is None:
            print(f"Refusing to write outside the project directory: {file_path}")
            return
        # Content of the first fenced block, minus its first (language tag) line.
        code_content = '\n'.join(ai_response.split('```')[1].split('\n')[1:])
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, 'w', encoding='utf-8') as f:
            f.write(code_content)
    elif ai_response.strip().startswith("DELETE:"):
        file_to_delete = ai_response.strip().split(":", 1)[1].strip()
        full_path = _resolve_project_file(project_path, file_to_delete)
        if full_path is None:
            if file_to_delete:
                print(f"Refusing to delete outside the project directory: {file_to_delete}")
            return
        if os.path.exists(full_path):
            try:
                os.remove(full_path)
            except OSError as e:
                print(f"Error deleting file {full_path}: {e}")


@app.route('/chat', methods=['POST'])
def chat():
    """Send the user's message to the model and apply any file change it requests.

    Expects a JSON body with a ``message`` key.

    Returns:
        JSON ``{"reply": ...}`` on success; JSON with an ``error`` key and
        status 400 for an empty message or a session without a project, or
        status 500 when the model is not configured or the request fails.
    """
    if not model:
        return jsonify({"error": "AI model not configured. The admin needs to set the API key."}), 500

    # Tolerate a missing, malformed or non-object JSON body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    user_message = payload.get('message')
    if not user_message:
        return jsonify({"error": "Empty message"}), 400

    project_id = session.get('project_id')
    if not project_id:
        return jsonify({"error": "Your session has expired. Please upload your project again."}), 400
    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)

    # Start the chat from the turns *before* this message. The message is then
    # passed to send_message once; adding it to the history first would present
    # it to the model twice.
    history = list(session.get('chat_history') or [])
    try:
        chat_session = model.start_chat(history=history)
        response = chat_session.send_message(user_message)
        ai_response = response.text

        # Record both turns only after a successful reply, so a failed request
        # does not leave a user turn without an answer. Assigning a new list
        # (instead of appending in place) makes the change visible to the
        # session object.
        session['chat_history'] = history + [
            {"role": "user", "parts": [user_message]},
            {"role": "model", "parts": [ai_response]},
        ]

        _apply_ai_file_action(project_path, ai_response)
        return jsonify({"reply": ai_response})
    except Exception as e:
        # Request boundary: report any failure to the client as JSON.
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/file_tree')
def get_file_tree():
    """Return the relative paths of the files in the current session's project."""
    project_id = session.get('project_id')
    if not project_id:
        return jsonify({"error": "No project in session."}), 400

    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
    # Nothing uploaded yet for this session means an empty tree.
    tree = []
    if os.path.exists(project_path):
        tree = list(get_project_files(project_path).keys())
    return jsonify({"file_tree": tree})
|
|
|
@app.route('/file_content')
def get_file_content():
    """Return the text content of one file of the current session's project.

    The file is selected with the ``path`` query parameter, relative to the
    project directory.

    Returns:
        JSON ``{"path": ..., "content": ...}`` on success; JSON with an
        ``error`` key and status 400 for a missing or invalid path, 404 when
        the file does not exist, or 500 when it cannot be read.
    """
    project_id = session.get('project_id')
    file_path = request.args.get('path')
    if not project_id or not file_path:
        return jsonify({"error": "Missing project/file path"}), 400

    # `path` comes from the client. Resolve it and make sure it stays inside
    # this session's project directory, so ".." segments or an absolute path
    # cannot be used to read arbitrary files.
    project_root = os.path.realpath(os.path.join(app.config['UPLOAD_FOLDER'], project_id))
    full_path = os.path.realpath(os.path.join(project_root, file_path))
    if not full_path.startswith(project_root + os.sep):
        return jsonify({"error": "Invalid file path"}), 400

    # isfile (rather than exists) so that a directory path is a 404, not a 500.
    if not os.path.isfile(full_path):
        return jsonify({"error": "File not found"}), 404
    try:
        with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        return jsonify({"path": file_path, "content": content})
    except OSError as e:
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/download')
def download_project():
    """Zip the current session's project directory and send it as a download.

    Returns:
        The archive as an attachment, or the plain-text message
        ``"No project to download."`` with status 404 when the session has no
        project id or nothing has been uploaded for it yet.
    """
    project_id = session.get('project_id')
    if not project_id:
        return "No project to download.", 404

    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
    # A session receives its id before anything is uploaded; without this
    # check such a session would be handed an empty archive.
    if not os.path.isdir(project_path):
        return "No project to download.", 404

    zip_filename = f"project_{project_id}.zip"
    # The archive is written next to, not inside, the project directory, so
    # the walk below never picks it up.
    zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(project_path):
            for file in files:
                file_path = os.path.join(root, file)
                # Store paths relative to the project root inside the archive.
                arcname = os.path.relpath(file_path, project_path)
                zipf.write(file_path, arcname)
    return send_from_directory(app.config['UPLOAD_FOLDER'], zip_filename, as_attachment=True)
|
|
|
|
|
if __name__ == '__main__':
    # Only start the server when this file is executed directly. Host 0.0.0.0
    # makes it listen on every network interface, on port 7860.
    app.run(port=7860, host="0.0.0.0")