Spaces:
Running
Running
from flask import Flask, request, jsonify, send_from_directory | |
from PIL import Image | |
from io import BytesIO | |
from prodiapy import Prodia | |
import requests | |
import os | |
import psutil | |
import time | |
import datetime | |
import json | |
import subprocess | |
import string | |
import random | |
# NOTE(review): no route/hook decorators are visible on the view functions in
# this copy of the file -- confirm how they are registered on `app`.
app = Flask(__name__)

# Directory that contains this module; every static asset path below is
# resolved relative to it.
static_dir = os.path.dirname(os.path.abspath(__file__))
def css(filename):
    """Serve ``filename`` from the ``css`` folder located under ``static_dir``."""
    css_root = os.path.join(static_dir, 'css')
    return send_from_directory(css_root, filename)
def js(filename):
    """Serve ``filename`` from the ``js`` folder located under ``static_dir``."""
    js_root = os.path.join(static_dir, 'js')
    return send_from_directory(js_root, filename)
def img(filename):
    """Serve ``filename`` from the ``img`` folder located under ``static_dir``."""
    img_root = os.path.join(static_dir, 'img')
    return send_from_directory(img_root, filename)
def index():
    """Serve ``index.html`` straight from ``static_dir``."""
    landing_page = 'index.html'
    return send_from_directory(static_dir, landing_page)
def ai():
    """Serve ``ai.html`` from the ``views`` folder located under ``static_dir``."""
    views_root = os.path.join(static_dir, 'views')
    return send_from_directory(views_root, 'ai.html')
def ai_file(filename):
    """Serve a file from the ``ai`` folder, executing it first when it is a ``.py`` script.

    Args:
        filename: Name of the file inside ``<static_dir>/ai``.

    Returns:
        For names ending in ``.py``: the combined stdout/stderr of running the
        script's source with ``python -c``, decoded as UTF-8. For every other
        name: the ``send_from_directory`` response for that file.

    Raises:
        OSError: If the ``.py`` file cannot be opened.
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    if not filename.endswith('.py'):
        return send_from_directory(os.path.join(static_dir, 'ai'), filename)

    # NOTE(review): this executes a script selected by ``filename`` and, unlike
    # send_from_directory, the path below is not checked for ``..`` components.
    # Confirm that ``filename`` can never be attacker-controlled.
    script_path = os.path.join(static_dir, 'ai', filename)
    with open(script_path, 'r') as script_file:
        code = script_file.read()

    # The argument list must be passed WITHOUT shell=True: on POSIX, combining a
    # list with shell=True runs only the first item ("python") as the command and
    # hands the remaining items to the shell itself, so the script never ran.
    output = subprocess.check_output(
        ["python", "-c", code],
        stderr=subprocess.STDOUT,
    )
    return output.decode('utf-8')
def info():
    """Return the requester's remote address and the server's local time (HH:MM:SS) as JSON."""
    payload = {
        'ip': request.remote_addr,
        'current_time': datetime.datetime.now().strftime("%H:%M:%S"),
    }
    return jsonify(payload)
# Visitor counters. They are plain module-level variables, so they start from
# zero every time the module is loaded.
visitor_count = visitor_today = visitor_total = 0

# Date on which ``visitor_today`` was last reset.
last_update_date = datetime.datetime.now().date()
def update_visitor_counts():
    """Increment the module-level visitor counters for requests under counted paths.

    Only requests whose path starts with ``/ai``, ``/api`` or ``/tool`` are
    counted. ``visitor_today`` is zeroed the first time a counted request
    arrives on a new calendar date.

    NOTE(review): indentation was lost in the copy this was reconstructed from;
    the nesting below (everything inside the path check) is an assumption --
    confirm against the deployed file.
    """
    global visitor_count, visitor_today, last_update_date, visitor_total

    counted_prefixes = ('/ai', '/api', '/tool')
    if not request.path.startswith(counted_prefixes):
        return

    today = datetime.datetime.now().date()
    if today != last_update_date:
        # First counted hit of a new day: restart the per-day counter.
        visitor_today = 0
        last_update_date = today

    visitor_count += 1
    visitor_today += 1
    visitor_total += 1

    # NOTE(review): ``reset_visitor_count`` is not defined anywhere in the visible
    # source; if it does not exist elsewhere, any counted request arriving at
    # 00:00 raises NameError. Confirm and either define or remove this call.
    if datetime.datetime.now().hour == 0 and datetime.datetime.now().minute == 0:
        reset_visitor_count()
def count():
    """Return the three module-level visitor counters as a JSON object."""
    snapshot = {
        'visitor_count': visitor_count,
        'visitor_today': visitor_today,
        'visitor_total': visitor_total,
    }
    return jsonify(snapshot)
# Define the status route
def status():
    """Return machine uptime and memory figures as JSON.

    ``runtime`` is the time elapsed since system boot (``psutil.boot_time``),
    formatted via ``datetime.timedelta``; ``memory`` is
    ``"<available bytes> / <total bytes>"``.
    """
    seconds_since_boot = int(time.time() - psutil.boot_time())
    available_bytes = psutil.virtual_memory().available
    total_bytes = psutil.virtual_memory().total
    return jsonify({
        'runtime': str(datetime.timedelta(seconds=seconds_since_boot)),
        'memory': f'{available_bytes} / {total_bytes}',
    })
# Handle 404 errors
def page_not_found(e):
    """Serve ``404.html`` from ``static_dir`` together with HTTP status 404.

    Args:
        e: The error being handled; it is not inspected.
    """
    not_found_page = send_from_directory(static_dir, '404.html')
    return not_found_page, 404
# Prodia API keys.
#
# NOTE(review): credentials are committed in source. Prefer supplying them via
# the PRODIA_API_KEYS environment variable (comma-separated) and rotating the
# embedded values; the literal list below is kept only as a backward-compatible
# fallback for deployments that do not set the variable.
apiKeys = [
    key.strip()
    for key in os.environ.get("PRODIA_API_KEYS", "").split(",")
    if key.strip()
] or [
    "2021e94a-1385-4ddc-905b-c050cfb5af32",
    "0bfe0e6d-6bf9-4984-ab07-3a9410a551ad",
    "1452e7a5-d6e2-4600-9641-1c2debde397a",
    "f4b18c3c-ea4d-4b18-be47-f5ad29d70936",
    "688659c2-b2e9-4524-8a91-1c72735ec068",
    "aa64f14e-18d8-44df-91cc-6d4e20051ca3",
]
# Read the style definitions once at import time. The path is relative, so it
# is resolved against the process's current working directory.
with open("style.json", "r") as style_file:
    styleList = json.loads(style_file.read())
def getRandomApiKey():
    """Return one entry of ``apiKeys`` chosen with ``random.choice``."""
    chosen_key = random.choice(apiKeys)
    return chosen_key
def getRandomSeed():
    """Return a random integer seed between 1 and 18446744073709552000, inclusive."""
    lowest, highest = 1, 18446744073709552000
    return random.randint(lowest, highest)
def getAvailableStyles():
    """Return the ``name`` of every entry in ``styleList`` joined by ``", "``."""
    style_names = (entry["name"] for entry in styleList)
    return ', '.join(style_names)
# The key is drawn once, when the module is loaded, so this client keeps using
# that same key for as long as the process runs.
_startup_api_key = getRandomApiKey()
prodia = Prodia(_startup_api_key)
def get_styles():
    """Return ``{"styles": [...]}`` with the ``name`` of each entry in ``style.json``.

    The file is re-read on every call rather than taken from ``styleList``.
    """
    with open("style.json", "r") as style_file:
        names = [entry["name"] for entry in json.load(style_file)]
    return jsonify({"styles": names})
def upload_image():
    """Download the image named by the ``url`` query parameter and re-post it to Discord as a PNG.

    Returns:
        A ``(json_response, status_code)`` tuple:

        * ``400`` -- ``url`` is missing, or the download did not answer HTTP 200.
        * ``200`` -- ``success`` message plus ``discord_cdn_url`` (the URL of the
          first attachment reported by Discord, or ``None`` if none is reported).
        * ``500`` -- any other failure; ``error`` carries the exception text.
    """
    # NOTE(review): the webhook URL embeds its secret token in source; consider
    # moving it to configuration and rotating it.
    discord_webhook_url = "https://discord.com/api/webhooks/1217109788656406588/sh0LG9VH5wmxSWP8OBwfHxfbbMHleUX6eQ8-xULIEo5m4IASfNm7jCNrZFZZweKaNGTM"
    image_name = "image.png"

    try:
        url = request.args.get('url')
        if not url:
            return jsonify({'error': 'URL parameter is missing'}), 400

        # NOTE(review): ``url`` is caller-supplied and fetched server-side;
        # confirm that reaching internal addresses this way is acceptable.
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            return jsonify({'error': 'Failed to download image'}), 400

        # Re-encode in memory. The previous implementation wrote a fixed
        # "image.png" to disk, which concurrent requests overwrote, which was
        # left behind on errors, and whose file handle was never closed.
        png_buffer = BytesIO()
        with Image.open(BytesIO(response.content)) as picture:
            picture.save(png_buffer, "PNG")
        png_buffer.seek(0)

        # ``wait=true`` makes Discord reply with the created message (including
        # its attachments); without it the webhook answers 204 with no body and
        # ``.json()`` below cannot succeed.
        webhook_response = requests.post(
            discord_webhook_url,
            params={'wait': 'true'},
            files={'file': (image_name, png_buffer, 'image/png')},
            timeout=30,
        )
        # Do not report success when Discord rejected the upload.
        webhook_response.raise_for_status()

        # Guard against a missing or empty attachment list.
        attachments = webhook_response.json().get('attachments') or [{}]
        discord_cdn_url = attachments[0].get('url')

        return jsonify({
            'success': 'Image uploaded and sent to Discord',
            'discord_cdn_url': discord_cdn_url
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
async def generate_image():
    """Generate an SDXL image with Prodia for the requested style and publish it via Discord.

    Expects a JSON object body with:

    * ``prompt`` (optional, default ``""``) -- substituted for ``{prompt}`` in the
      selected style's prompt template.
    * ``style`` (required) -- matched case-insensitively against ``styleList`` names.
    * ``seed`` / ``guidance_scale`` (optional) -- must be integers;
      ``guidance_scale`` must be 1..100 when non-zero.

    NOTE(review): ``seed`` and ``guidance_scale`` are validated but not forwarded
    to Prodia (``cfg_scale`` comes from the style entry) -- confirm whether they
    were meant to be passed through.
    NOTE(review): declared ``async`` although nothing is awaited; kept as-is so
    the way the view is registered/called does not change.

    Returns:
        A ``(json_response, status_code)`` tuple: ``400`` for invalid input,
        ``200`` with ``status='success'`` and the Discord CDN ``url`` on success,
        ``500`` if the webhook does not answer 200 or any other error occurs.
    """
    # NOTE(review): the webhook URL embeds its secret token in source; consider
    # moving it to configuration and rotating it.
    discord_webhook_url = "https://discord.com/api/webhooks/1217084642675654717/FpiXr5sLPmFNZ0xDz5HNClwn6NCYNmL2JvwdcGwb7V9FZd9bdPfSPZR41HmGzGD3uR8d"
    # File name under which the image is attached to the Discord message.
    image_name = "invite_1080035826051854356_best_bot_ever.png"

    try:
        # A missing or non-JSON body is a client error, not a server error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"status": "error", "error": "Request body must be a JSON object."}), 400

        prompt = data.get('prompt', '')
        userStyle = data.get('style')

        try:
            seed = int(data.get('seed', getRandomSeed()))  # validated only; see docstring
            guidance_scale = int(data.get('guidance_scale', 0))
        except (TypeError, ValueError):
            return jsonify({"status": "error", "error": "seed and guidance_scale must be integers."}), 400

        if not userStyle:
            return jsonify({"status": "error", "error": "Style is required. Available styles: " + getAvailableStyles()}), 400

        # str() keeps a non-string ``style`` from crashing on .lower(); it simply
        # fails to match and is reported as an invalid style.
        wanted_style = str(userStyle).lower()
        selectedStyle = next(
            (style for style in styleList if style["name"].lower() == wanted_style),
            None,
        )
        if not selectedStyle:
            return jsonify({"status": "error", "error": "Invalid style. Available styles: " + getAvailableStyles()}), 400

        # 0 means "not supplied"; any other value must fall within 1..100.
        if guidance_scale and not 1 <= guidance_scale <= 100:
            return jsonify({"status": "error", "error": "guidance_scale must be an integer between 1 and 100."}), 400

        job = prodia.sdxl.generate(
            prompt=selectedStyle["prompt"].replace('{prompt}', prompt),
            model="sd_xl_base_1.0.safetensors [be9edd61]",
            negative_prompt=selectedStyle["negative_prompt"] + ", duplicate",
            sampler="DPM++ 2M Karras",
            cfg_scale=selectedStyle.get('cfg_scale', 7),
            steps=selectedStyle.get('steps', 20),
            height=1024,
            width=1024)
        result = prodia.wait(job)

        generated = requests.get(result.image_url, timeout=60)
        generated.raise_for_status()

        # Re-encode in memory. The previous implementation wrote a fixed file
        # name to disk, which concurrent requests overwrote, which was left
        # behind on every failure path, and whose file handle was never closed.
        png_buffer = BytesIO()
        with Image.open(BytesIO(generated.content)) as picture:
            picture.save(png_buffer, "PNG")
        png_buffer.seek(0)

        # ``wait=true`` makes Discord answer 200 with the created message
        # (including attachments); without it the webhook answers 204 with no
        # body, so the ``== 200`` check below could never succeed.
        webhook_response = requests.post(
            discord_webhook_url,
            params={'wait': 'true'},
            files={'file': (image_name, png_buffer, 'image/png')},
            timeout=30,
        )
        # Print the response for debugging
        print(webhook_response.text)

        if webhook_response.status_code != 200:
            return jsonify({"status": "error", "error": "Failed to send image through webhook"}), 500

        # Guard against a missing or empty attachment list.
        attachments = webhook_response.json().get('attachments') or [{}]
        discord_cdn_url = attachments[0].get('url')
        return jsonify({
            'status': 'success',
            'url': discord_cdn_url
        }), 200
    except Exception as e:
        print('Error:', str(e))
        return jsonify({"status": "error", "error": "Internal Server Error"}), 500
if __name__ == "__main__":
    # The Werkzeug interactive debugger lets anyone who can reach the server run
    # arbitrary Python, and this server listens on every interface. Keep it off
    # unless it is explicitly requested via FLASK_DEBUG=1 (or "true").
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)