|
import os |
|
import io |
|
import wave |
|
import struct |
|
import logging |
|
import hashlib |
|
from flask import Flask, render_template, request, send_file, jsonify |
|
from werkzeug.utils import secure_filename |
|
from pydub import AudioSegment |
|
import magic |
|
|
|
app = Flask(__name__) |
|
|
|
logging.basicConfig(level=logging.DEBUG) |
|
logger = app.logger |
|
|
|
UPLOAD_FOLDER = '/tmp' |
|
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp3', 'wav', 'txt', 'pdf', 'docx'} |
|
|
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
|
|
|
def allowed_file(filename): |
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS |
|
|
|
@app.route('/') |
|
def index(): |
|
return render_template('index.html') |
|
|
|
@app.route('/process', methods=['POST']) |
|
def process_file(): |
|
logger.info("Processing file request received") |
|
if 'file' not in request.files: |
|
logger.error("No file part in the request") |
|
return jsonify({'error': 'No file part'}), 400 |
|
file = request.files['file'] |
|
if file.filename == '': |
|
logger.error("No selected file") |
|
return jsonify({'error': 'No selected file'}), 400 |
|
if file and allowed_file(file.filename): |
|
filename = secure_filename(file.filename) |
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
|
file.save(filepath) |
|
logger.info(f"File saved: {filepath}") |
|
|
|
factor = int(request.form['factor']) |
|
is_decrypting = filename.lower().endswith('.mp3') |
|
logger.info(f"Processing file: {'decrypting' if is_decrypting else 'encrypting'} with factor {factor}") |
|
|
|
try: |
|
if is_decrypting: |
|
output, output_filename, mimetype = decrypt_file(filepath, factor) |
|
else: |
|
output, output_filename, mimetype = encrypt_file(filepath, factor) |
|
|
|
logger.info(f"File processed successfully. Output size: {len(output)} bytes") |
|
os.remove(filepath) |
|
logger.info(f"Temporary file removed: {filepath}") |
|
|
|
return send_file( |
|
io.BytesIO(output), |
|
mimetype=mimetype, |
|
as_attachment=True, |
|
download_name=output_filename |
|
) |
|
except Exception as e: |
|
logger.error(f"Error processing file: {str(e)}") |
|
os.remove(filepath) |
|
return jsonify({'error': str(e)}), 500 |
|
|
|
logger.error("Invalid file type") |
|
return jsonify({'error': 'Invalid file type'}), 400 |
|
|
|
def calculate_checksum(data): |
|
return hashlib.md5(data).hexdigest() |
|
|
|
def encrypt_file(filepath, factor): |
|
logger.info(f"Encrypting file: {filepath}") |
|
with open(filepath, 'rb') as file: |
|
data = file.read() |
|
|
|
|
|
checksum = calculate_checksum(data) |
|
|
|
|
|
samples = [int(byte * factor) for byte in data] |
|
|
|
|
|
checksum_samples = [ord(byte) for byte in checksum.encode('ascii')] |
|
samples = checksum_samples + samples |
|
|
|
|
|
wav_output = io.BytesIO() |
|
with wave.open(wav_output, 'wb') as wav_file: |
|
wav_file.setnchannels(1) |
|
wav_file.setsampwidth(2) |
|
wav_file.setframerate(44100) |
|
wav_file.writeframes(struct.pack(f'{len(samples)}h', *samples)) |
|
|
|
|
|
wav_output.seek(0) |
|
audio = AudioSegment.from_wav(wav_output) |
|
mp3_output = io.BytesIO() |
|
audio.export(mp3_output, format="mp3") |
|
|
|
logger.info(f"Encryption complete. Output size: {mp3_output.getbuffer().nbytes} bytes") |
|
return mp3_output.getvalue(), 'encrypted.mp3', 'audio/mpeg' |
|
|
|
def decrypt_file(filepath, factor): |
|
logger.info(f"Decrypting file: {filepath}") |
|
|
|
|
|
audio = AudioSegment.from_mp3(filepath) |
|
wav_output = io.BytesIO() |
|
audio.export(wav_output, format="wav") |
|
wav_output.seek(0) |
|
|
|
with wave.open(wav_output, 'rb') as wav_file: |
|
params = wav_file.getparams() |
|
frames = wav_file.readframes(wav_file.getnframes()) |
|
|
|
samples = struct.unpack(f'{len(frames)//2}h', frames) |
|
|
|
|
|
checksum = ''.join(chr(sample // factor) for sample in samples[:32]) |
|
samples = samples[32:] |
|
|
|
|
|
decrypted_bytes = bytearray() |
|
for sample in samples: |
|
byte_value = int(round(sample / factor)) |
|
decrypted_bytes.append(max(0, min(byte_value, 255))) |
|
|
|
decrypted_data = bytes(decrypted_bytes) |
|
|
|
|
|
if calculate_checksum(decrypted_data) != checksum: |
|
raise ValueError("Checksum verification failed. The file may be corrupted or the wrong factor was used.") |
|
|
|
|
|
mime = magic.Magic(mime=True) |
|
file_type = mime.from_buffer(decrypted_data) |
|
file_extension = file_type.split('/')[-1] |
|
|
|
logger.info(f"Decryption complete. Output size: {len(decrypted_data)} bytes") |
|
return decrypted_data, f'decrypted_file.{file_extension}', file_type |
|
|
|
if __name__ == '__main__': |
|
app.run(debug=True) |