|
import os |
|
import uuid |
|
import zipfile |
|
from flask import Flask, request, jsonify, render_template, session, send_from_directory |
|
from flask_session import Session |
|
from dotenv import load_dotenv |
|
import google.generativeai as genai |
|
|
|
|
|
# Load environment variables (python-dotenv) before any os.getenv lookups.
load_dotenv()

# The Gemini client is configured only when a key is present; the routes
# check `gemini_key` themselves before talking to the API.
gemini_key = os.getenv("GEMINI_API_KEY")
if gemini_key:
    genai.configure(api_key=gemini_key)

# Flask application with server-side sessions stored on the filesystem.
app = Flask(__name__)
app.config.update(
    # NOTE(review): the fallback value is a placeholder; set FLASK_SECRET_KEY
    # in any real deployment.
    SECRET_KEY=os.getenv("FLASK_SECRET_KEY", "super-secret-key-change-me"),
    SESSION_TYPE='filesystem',
    SESSION_PERMANENT=False,
    SESSION_FILE_DIR='./flask_session',
)
Session(app)

# Uploaded projects are stored under this folder, one sub-directory per
# session project id.
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Make sure both working directories exist at start-up.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(app.config['SESSION_FILE_DIR'], exist_ok=True)
|
|
|
def get_project_file_tree(project_path):
    """Return the relative paths of all non-hidden files in a project directory.

    Args:
        project_path: Directory to scan.

    Returns:
        A list of file paths relative to ``project_path``, in ``os.walk``
        order. Files whose name starts with ``.`` are omitted, as is
        everything below a directory whose name starts with ``.``. A
        directory that does not exist yields an empty list.
    """
    file_list = []
    for root, dirnames, filenames in os.walk(project_path):
        # Prune hidden directories in place so os.walk never descends into
        # them. Hiddenness is judged only on names *below* project_path, so a
        # dot-prefixed component in the project path itself (e.g. "./uploads")
        # no longer hides every file.
        dirnames[:] = [d for d in dirnames if not d.startswith('.')]

        for filename in filenames:
            if filename.startswith('.'):
                continue
            rel_path = os.path.relpath(os.path.join(root, filename), project_path)
            file_list.append(rel_path)
    return file_list
|
|
|
@app.route('/')
def index():
    """Render the main page, starting a fresh project session when needed."""
    has_project = 'project_id' in session
    if not has_project:
        # First visit for this session: assign a project id and start with
        # an empty conversation.
        new_project_id = uuid.uuid4()
        session['project_id'] = str(new_project_id)
        session['chat_history'] = []
    return render_template('index.html')
|
|
|
@app.route('/upload', methods=['POST'])
def upload():
    """Handle the upload and initial AI analysis of a .zip project.

    Expects a multipart form with the archive in the ``project_zip`` field.
    The archive is saved and extracted under ``UPLOAD_FOLDER/<project_id>``
    (replacing any earlier upload for this session), uploaded to the Gemini
    API, and analysed by the model. The resulting Gemini file name and the
    first two chat entries are stored in the session.

    Returns:
        On success, JSON with ``message``, ``file_tree`` and ``chat_history``.
        A 400 JSON error when the file part is missing, the file name is not
        a ``.zip``, or the archive is invalid. A 500 JSON error when the API
        key is not configured or the Gemini call fails.
    """
    import shutil

    if not gemini_key:
        return jsonify({'error': 'Gemini API key is not configured on the server.'}), 500

    if 'project_zip' not in request.files:
        return jsonify({'error': 'No file part in the request.'}), 400

    file = request.files['project_zip']

    # The file name is client-controlled: it may be missing, use any letter
    # case, or contain directory parts such as "../../x.zip". Keep only the
    # final path component so the archive is always written inside the
    # project directory.
    raw_name = file.filename or ''
    safe_name = os.path.basename(raw_name.replace('\\', '/'))
    if not safe_name.lower().endswith('.zip'):
        return jsonify({'error': 'Invalid file. Please upload a .zip file.'}), 400

    # Reuse the session's project id, or create one if the session has none.
    project_id = session.get('project_id', str(uuid.uuid4()))
    session['project_id'] = project_id
    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)

    # Start from an empty project directory so files from an earlier upload
    # never mix with the new one.
    if os.path.exists(project_path):
        shutil.rmtree(project_path)
    os.makedirs(project_path)

    # The archive itself is kept next to the extracted files.
    zip_path = os.path.join(project_path, safe_name)
    file.save(zip_path)

    try:
        with zipfile.ZipFile(zip_path, 'r') as z:
            z.extractall(project_path)
    except zipfile.BadZipFile:
        return jsonify({'error': 'The uploaded file is not a valid zip archive.'}), 400

    try:
        print("Uploading file to Gemini API...")
        project_file = genai.upload_file(path=zip_path, display_name=safe_name)
        print(f"File uploaded successfully. Name: {project_file.name}")

        # Remember the remote file so follow-up chat requests can reference it.
        session['gemini_file_name'] = project_file.name

        model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest")
        prompt = [
            "You are an expert AI code assistant.",
            "Analyze the entire codebase provided in this zip file. Give me a high-level summary of the project, identify potential bugs, suggest improvements or refactoring, and point out any security vulnerabilities.",
            "Structure your response clearly using Markdown.",
            project_file,
        ]
        print("Generating initial analysis...")
        response = model.generate_content(prompt)
        ai_response = response.text

        # A new upload resets the conversation to the initial analysis.
        session['chat_history'] = [
            {"role": "user", "content": "Project uploaded for analysis."},
            {"role": "assistant", "content": ai_response},
        ]
        session.modified = True

        file_tree = get_project_file_tree(project_path)

        return jsonify({
            "message": "Project uploaded and analyzed.",
            "file_tree": file_tree,
            "chat_history": session['chat_history'],
        })

    except Exception as e:
        # Boundary handler: any failure in the Gemini round trip is reported
        # to the client as a JSON 500 instead of an unhandled exception.
        print(f"Error during Gemini API call: {e}")
        return jsonify({"error": f"An error occurred while communicating with the AI model: {e}"}), 500
|
|
|
|
|
@app.route('/chat', methods=['POST'])
def chat():
    """Handle a follow-up chat message about the uploaded project.

    Expects a JSON object body with a non-empty string ``message``. The
    message is sent to the model together with the Gemini file recorded in
    the session by the upload step. The stored chat history is appended to
    but is not sent to the model.

    Returns:
        JSON ``{"reply": ...}`` on success. A 400 JSON error when the body is
        not a JSON object, the message is missing/blank/not a string, or no
        project has been uploaded. A 500 JSON error when the API key is not
        configured or the Gemini call fails.
    """
    if not gemini_key:
        return jsonify({"error": "Gemini API key not configured."}), 500

    # silent=True returns None for a missing or malformed JSON body instead
    # of raising, so bad requests get the same JSON 400 as an empty message.
    payload = request.get_json(silent=True)
    message = payload.get("message") if isinstance(payload, dict) else None
    if not isinstance(message, str) or not message.strip():
        return jsonify({"error": "Empty message received."}), 400

    gemini_file_name = session.get('gemini_file_name')
    if not gemini_file_name:
        return jsonify({"error": "No project context found. Please upload a project first."}), 400

    try:
        # Look up the previously uploaded archive by the name kept in the session.
        project_file = genai.get_file(name=gemini_file_name)

        model = genai.GenerativeModel(model_name="gemini-2.0-flash")
        prompt = [
            message,
            project_file,
        ]
        response = model.generate_content(prompt)
        reply = response.text

        history = session.setdefault('chat_history', [])
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": reply})
        # The list is mutated in place, so flag the session as changed explicitly.
        session.modified = True

        return jsonify({"reply": reply})

    except Exception as e:
        # Boundary handler: report any Gemini failure as a JSON 500.
        print(f"Error during Gemini chat: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/file_tree')
def file_tree():
    """Return the session project's file listing as JSON.

    The ``file_tree`` list is empty when the session has no project id or
    the project directory does not exist.
    """
    listing = []
    current_id = session.get('project_id')
    if current_id:
        project_dir = os.path.join(app.config['UPLOAD_FOLDER'], current_id)
        if os.path.exists(project_dir):
            listing = get_project_file_tree(project_dir)
    return jsonify({"file_tree": listing})
|
|
|
@app.route('/file_content')
def file_content():
    """Return the text content of one file from the session's project.

    Query parameters:
        path: File path relative to the project directory.

    Returns:
        JSON ``{"content": ...}`` with the file decoded as UTF-8
        (undecodable bytes are dropped). A 400 JSON error when the project
        id or path is missing, 403 when the path resolves outside the
        project directory, 404 when it is not an existing file, and 500 for
        any other read error.
    """
    project_id = session.get('project_id')
    file_path = request.args.get('path')

    if not project_id or not file_path:
        return jsonify({"error": "Missing project or file path."}), 400

    base_path = os.path.abspath(os.path.join(app.config['UPLOAD_FOLDER'], project_id))
    full_path = os.path.abspath(os.path.join(base_path, file_path))

    # Compare whole path components rather than string prefixes: a plain
    # startswith() would also accept siblings such as "<base_path>.zip" or
    # "<base_path>-other/...".
    try:
        inside_project = os.path.commonpath([base_path, full_path]) == base_path
    except ValueError:
        # Raised when the two paths cannot share a root (e.g. different drives).
        inside_project = False
    if not inside_project:
        return jsonify({"error": "Access denied."}), 403

    # Directories and missing paths are reported as "not found" instead of
    # surfacing as a 500 from open().
    if not os.path.isfile(full_path):
        return jsonify({"error": "File not found."}), 404

    try:
        with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
            return jsonify({"content": f.read()})
    except FileNotFoundError:
        # The file vanished between the check above and the open.
        return jsonify({"error": "File not found."}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/download')
def download():
    """Send the current state of the session's project as a zip attachment.

    The archive is written to ``UPLOAD_FOLDER/<project_id>.zip`` and then
    served from there. ``.zip`` files located directly in the project
    directory (where the upload step stores the original archive) are left
    out; ``.zip`` files in sub-directories are included.

    Returns:
        The zip file as an attachment named ``project_<project_id>.zip``, or
        a plain-text 404 when the session has no project id or no project
        directory exists for it.
    """
    project_id = session.get('project_id')
    if not project_id:
        return "No project found in session.", 404

    project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
    if not os.path.isdir(project_path):
        # Nothing has been uploaded for this session; answer 404 rather than
        # building and sending an empty archive.
        return "No uploaded project found for this session.", 404

    zip_filename = f"{project_id}.zip"
    zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename)

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(project_path):
            # Only the top level of the project is filtered; comparing the
            # full path avoids also matching a nested directory that happens
            # to share the project id as its name.
            if root == project_path:
                files = [f for f in files if not f.endswith('.zip')]

            for file in files:
                file_full_path = os.path.join(root, file)
                # Store entries relative to the project root.
                zipf.write(file_full_path, os.path.relpath(file_full_path, project_path))

    return send_from_directory(
        app.config['UPLOAD_FOLDER'],
        zip_filename,
        as_attachment=True,
        download_name=f"project_{project_id}.zip",
    )
|
|
|
if __name__ == "__main__":
    # Script entry point: serve on all network interfaces, port 7860.
    app.run(port=7860, host='0.0.0.0')