Spaces:
Running
Running
from flask import Flask, request, jsonify, send_from_directory | |
from flask_limiter import Limiter | |
from flask_limiter.util import get_remote_address | |
import google.generativeai as genai | |
from PIL import Image | |
from io import BytesIO | |
from prodiapy import Prodia | |
import requests | |
import os | |
import psutil | |
import time | |
import datetime | |
import json | |
import subprocess | |
import string | |
import random | |
from g4f.client import Client | |
import tempfile | |
from huggingface_hub import HfApi, login | |
import threading | |
client = Client() | |
app = Flask(__name__) | |
HF_TOKEN = os.environ.get('HF_TOKEN') | |
login(token=HF_TOKEN) | |
api = HfApi() | |
limiter = Limiter( | |
app, | |
default_limits=["30 per minute"] | |
) | |
# Define the path to static files | |
static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__))) | |
def css(filename): | |
return send_from_directory(os.path.join(static_dir, 'css'), filename) | |
def js(filename): | |
return send_from_directory(os.path.join(static_dir, 'js'), filename) | |
def img(filename): | |
return send_from_directory(os.path.join(static_dir, 'img'), filename) | |
def index(): | |
return send_from_directory(static_dir, 'index.html') | |
def ai(): | |
return send_from_directory(os.path.join(static_dir, 'views'), 'ai.html') | |
def ai_file(filename): | |
if filename.endswith('.py'): | |
with open(os.path.join(static_dir, 'ai', filename), 'r') as f: | |
code = f.read() | |
output = subprocess.check_output(["python", "-c", code], shell=True, stderr=subprocess.STDOUT) | |
return output.decode('utf-8') | |
else: | |
return send_from_directory(os.path.join(static_dir, 'ai'), filename) | |
def info(): | |
ip = request.remote_addr | |
current_time = datetime.datetime.now().strftime("%H:%M:%S") | |
return jsonify({'ip': ip, 'current_time': current_time}) | |
# Define the visitor count routes | |
visitor_count = 0 | |
visitor_today = 0 | |
last_update_date = datetime.datetime.now().date() | |
visitor_total = 0 | |
def update_visitor_counts(): | |
global visitor_count, visitor_today, last_update_date, visitor_total | |
allowed_paths = ['/ai', '/api', '/tool'] | |
if request.path.startswith(tuple(allowed_paths)): | |
current_date = datetime.datetime.now().date() | |
if current_date != last_update_date: | |
visitor_today = 0 | |
last_update_date = current_date | |
visitor_count += 1 | |
visitor_today += 1 | |
visitor_total += 1 | |
if datetime.datetime.now().hour == 0 and datetime.datetime.now().minute == 0: | |
reset_visitor_count() | |
def count(): | |
return jsonify({ | |
'visitor_count': visitor_count, | |
'visitor_today': visitor_today, | |
'visitor_total': visitor_total | |
}) | |
# Define the status route | |
def status(): | |
uptime_seconds = int(time.time() - psutil.boot_time()) | |
uptime = str(datetime.timedelta(seconds=uptime_seconds)) | |
memory_free = psutil.virtual_memory().available | |
memory_total = psutil.virtual_memory().total | |
return jsonify({'runtime': uptime, 'memory': f'{memory_free} / {memory_total}'}) | |
# Handle 404 errors | |
def page_not_found(e): | |
return send_from_directory(static_dir, '404.html'), 404 | |
apiKeys = [ | |
"f5282cab-1ced-4b6e-80f4-11b2be59af01", | |
"2021e94a-1385-4ddc-905b-c050cfb5af32", | |
"0bfe0e6d-6bf9-4984-ab07-3a9410a551ad", | |
"1452e7a5-d6e2-4600-9641-1c2debde397a", | |
"f4b18c3c-ea4d-4b18-be47-f5ad29d70936", | |
"688659c2-b2e9-4524-8a91-1c72735ec068", | |
"aa64f14e-18d8-44df-91cc-6d4e20051ca3", | |
"7440ab53-6c97-40bc-aad4-50a93e753256", | |
"65f9a7e7-bcf5-4f21-8715-64cdbc3adbdf", | |
"cc9e0170-ffc8-43e0-b01c-eb4847158a72", | |
"ee39894a-3f05-4fca-8d6b-2eaf6443c2d5", | |
"ef9d2480-a655-490e-983a-2a448ead257c", | |
"a888afb5-2e90-4fc0-bb1a-617ba4a24c2f", | |
"201a4f06-ac0d-4877-a8d6-c346d7dc1c9f", | |
"9f604055-793a-4a2f-a528-c7fe283f0fa9" | |
] | |
# Load styles from style.json file | |
with open("style.json", "r") as style_file: | |
styleList = json.load(style_file) | |
def getRandomApiKey(): | |
# Implement your logic to get a random API key here | |
return random.choice(apiKeys) | |
def getRandomSeed(): | |
return random.randint(1, 18446744073709552000) | |
def getAvailableStyles(): | |
return ', '.join([style["name"] for style in styleList]) | |
prodia = Prodia(getRandomApiKey()) | |
def get_styles(): | |
with open("style.json", "r") as style_file: | |
styles = json.load(style_file) | |
return jsonify({'status': 'success', "styles": [style["name"] for style in styles]}) | |
def upload_image(): | |
try: | |
# Get the URL parameter | |
url = request.args.get('url') | |
if not url: | |
return jsonify({'error': 'URL parameter is missing'}), 400 | |
# Download the image | |
response = requests.get(url) | |
if response.status_code != 200: | |
return jsonify({'error': 'Failed to download image'}), 400 | |
image_name = f"image.png" | |
# Save the image | |
img = Image.open(BytesIO(response.content)) | |
img.save(image_name, "PNG") | |
# Send the image to Discord | |
discord_webhook_url = "https://discord.com/api/webhooks/1217109788656406588/sh0LG9VH5wmxSWP8OBwfHxfbbMHleUX6eQ8-xULIEo5m4IASfNm7jCNrZFZZweKaNGTM" | |
files = {'file': open(image_name, 'rb')} | |
webhook_response = requests.post(discord_webhook_url, files=files) | |
# Get the uploaded image URL from Discord CDN | |
discord_cdn_url = webhook_response.json().get('attachments', [{}])[0].get('url') | |
# Delete the temporary image file | |
os.remove(image_name) | |
return jsonify({ | |
'success': f'Image uploaded and sent to Discord', | |
'discord_cdn_url': discord_cdn_url | |
}), 200 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
async def generate_image(): | |
try: | |
data = request.json | |
prompt = data.get('prompt', '') | |
userStyle = data.get('style') | |
seed = int(data.get('seed', getRandomSeed())) | |
guidance_scale = int(data.get('guidance_scale', 0)) | |
if not userStyle: | |
return jsonify({"status": "error", "error": "Style is required. Available styles: " + getAvailableStyles()}), 400 | |
selectedStyle = next((style for style in styleList if style["name"].lower() == userStyle.lower()), None) | |
if not selectedStyle: | |
return jsonify({"status": "error", "error": "Invalid style. Available styles: " + getAvailableStyles()}), 400 | |
if guidance_scale and (guidance_scale < 1 or guidance_scale > 100): | |
return jsonify({"status": "error", "error": "guidance_scale must be an integer between 1 and 100."}), 400 | |
job = prodia.sdxl.generate( | |
prompt=selectedStyle["prompt"].replace('{prompt}', data.get('prompt', '')), | |
model="sd_xl_base_1.0.safetensors [be9edd61]", | |
negative_prompt=selectedStyle["negative_prompt"] + ", duplicate", | |
sampler="DPM++ 2M Karras", | |
cfg_scale=selectedStyle.get('cfg_scale', 7), | |
steps=selectedStyle.get('steps', 20), | |
height=1024, | |
width=1024) | |
wait = prodia.wait(job) | |
url = wait.image_url | |
# Discord Bot setup | |
discord_webhook_url = "https://discord.com/api/webhooks/1217084642675654717/FpiXr5sLPmFNZ0xDz5HNClwn6NCYNmL2JvwdcGwb7V9FZd9bdPfSPZR41HmGzGD3uR8d" | |
# Send the generated image URL through the webhook | |
image_name = f"invite_1080035826051854356_best_bot_ever.png" | |
img = Image.open(BytesIO(requests.get(url).content)) | |
img.save(image_name, "PNG") | |
files = {'file': open(image_name, 'rb')} | |
webhook_response = requests.post(discord_webhook_url, files=files) | |
# Print the response for debugging | |
print(webhook_response.text) | |
# Check if the request was successful | |
if webhook_response.status_code == 200: | |
discord_cdn_url = webhook_response.json().get('attachments', [{}])[0].get('url') | |
# Remove the temporary image file | |
os.remove(image_name) | |
# Return the success response with the generated image URL | |
return jsonify({ | |
'status': 'success', | |
'url': discord_cdn_url | |
}), 200 | |
else: | |
# If the request to the webhook fails, return an error response | |
return jsonify({"status": "error", "error": "Failed to send image through webhook"}), 500 | |
except Exception as e: | |
print('Error:', str(e)) | |
return jsonify({"status": "error", "error": "Internal Server Error"}), 500 | |
genai.configure(api_key="AIzaSyBPIdkEyVTDZnmXrBi4ykf0sOfkbOvxAzo") | |
DEFAULT_MODELS = ["gemini-1.0-pro", "gemini-1.0-pro-001"] | |
def gemini(): | |
data = request.json | |
prompt = data.get('prompt') | |
model_name = data.get('model') | |
messages = data.get('messages', []) | |
if not prompt: | |
return jsonify({'error': 'Prompt parameter is required'}), 400 | |
if model_name and model_name not in DEFAULT_MODELS: | |
return jsonify({'error': f'Model {model_name} not found'}), 400 | |
try: | |
# Use the specified model or default model | |
selected_model = model_name if model_name in DEFAULT_MODELS else DEFAULT_MODELS[0] | |
# Set up the selected model | |
generation_config = { | |
"temperature": 1, | |
"top_p": 1, | |
"top_k": 1, | |
"max_output_tokens": 2048, | |
} | |
safety_settings = [ | |
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
] | |
model = genai.GenerativeModel( | |
model_name=selected_model, | |
generation_config=generation_config, | |
safety_settings=safety_settings | |
) | |
# Start the conversation and generate response | |
convo = model.start_chat(history=messages) | |
convo.send_message(prompt) | |
response = convo.last.text | |
return jsonify({'status': 'success', 'response': response}) | |
except Exception as e: | |
error_message = str(e) | |
app.logger.error("Failed to generate content: %s", error_message) | |
return jsonify({'status': 'error', 'error': 'Failed to generate content.'}), 500 | |
TOKEN_MESSAGES_TMP = {} | |
def generate_chat_id(): | |
"""Generate a random chat ID.""" | |
while True: | |
chat_id = "Kastg_" + ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(15, 30))) | |
if len(chat_id) > 7: | |
return chat_id | |
def chat_tmp(): | |
data = request.json | |
messages = data.get('messages') | |
chat_id = data.get('chat-id') | |
if not messages: | |
return jsonify({'status': 'error', 'error': 'Messages parameter is required'}), 400 | |
if not chat_id: | |
# Generate a new chat ID | |
chat_id = generate_chat_id() | |
elif not chat_id.startswith('Kastg_'): | |
return jsonify({'status': 'error', 'error': 'Chat ID must start with "Kastg_"'}), 400 | |
elif len(chat_id) <= 13: | |
return jsonify({'status': 'error', 'error': 'Chat ID must have more than 7 characters after Kastg_'}), 400 | |
# Save messages under chat ID | |
if chat_id not in TOKEN_MESSAGES_TMP: | |
TOKEN_MESSAGES_TMP[chat_id] = [] | |
for message in messages: | |
TOKEN_MESSAGES_TMP[chat_id].append(message) | |
# Return token and saved messages | |
response_data = {'creator': 'api.Kastg.com', 'status': 'success', 'chat-id': chat_id, 'messages': TOKEN_MESSAGES_TMP[chat_id]} | |
return jsonify(response_data) | |
def handle_message(): | |
try: | |
# Get the data from the request JSON | |
data = request.json | |
messages = data.get('messages', []) | |
model = data.get('model', 'gpt-4o-mini') | |
if not messages: | |
return jsonify({"error": "No messages provided"}), 400 | |
# Validate the structure of messages | |
for message in messages: | |
if 'role' not in message or 'content' not in message: | |
return jsonify({"error": "Invalid message format"}), 400 | |
# Use the G4F client to get a response | |
response = client.chat.completions.create( | |
model=model, | |
messages=messages | |
) | |
# Extract the response content | |
ai_response = response.choices[0].message.content | |
# Return the response as JSON | |
return jsonify({"response": ai_response}) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
def make_text(): | |
query = request.args.get('query') | |
file_name = request.args.get('fileName') | |
repo_id = request.args.get('repoId') | |
repo_type = request.args.get('repoType', 'dataset') | |
if not query or not file_name or not repo_id: | |
return "Parameters 'query', 'fileName', and 'repoId' are required", 400 | |
if repo_type not in ['space', 'dataset', 'model']: | |
return "Invalid 'repoType'. Must be 'space', 'dataset', or 'model'", 400 | |
# Create a temporary file | |
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: | |
temp_file.write(query) | |
temp_file_path = temp_file.name | |
try: | |
# Upload the temporary file to Hugging Face | |
api.upload_file( | |
path_or_fileobj=temp_file_path, | |
path_in_repo=file_name, | |
repo_id=repo_id, | |
repo_type=repo_type, | |
) | |
return f"File '{file_name}' uploaded successfully to {repo_id} ({repo_type})", 200 | |
except Exception as e: | |
return f"Error uploading file: {str(e)}", 500 | |
finally: | |
# Clean up the temporary file | |
os.unlink(temp_file_path) | |
def delete_file_after_delay(file_path, delay_seconds): | |
def delete_file(): | |
time.sleep(delay_seconds) | |
try: | |
os.unlink(file_path) | |
print(f"Temporary file {file_path} deleted after {delay_seconds} seconds.") | |
except Exception as e: | |
print(f"Error deleting temporary file {file_path}: {str(e)}") | |
thread = threading.Thread(target=delete_file) | |
thread.start() | |
def upload_image_dataset(): | |
data = request.json | |
url = data.get('url') | |
file_name = data.get('fileName') | |
repo_id = data.get('repoId') | |
repo_type = data.get('repoType', 'dataset') | |
is_forwarded = data.get('is_forwarded', False) | |
if not url or not file_name or not repo_id: | |
return jsonify({"error": "Parameters 'url', 'fileName', and 'repoId' are required"}), 400 | |
if repo_type not in ['space', 'dataset', 'model']: | |
return jsonify({"error": "Invalid 'repoType'. Must be 'space', 'dataset', or 'model'"}), 400 | |
try: | |
# Download the image | |
headers = {} | |
if is_forwarded: | |
headers['Authorization'] = f'Bearer {HF_TOKEN}' | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
# Create a temporary file | |
with tempfile.NamedTemporaryFile(delete=False) as temp_file: | |
temp_file.write(response.content) | |
temp_file_path = temp_file.name | |
# Schedule file deletion after 1 minute | |
delete_file_after_delay(temp_file_path, 60) | |
# Upload the file to Hugging Face | |
api.upload_file( | |
path_or_fileobj=temp_file_path, | |
path_in_repo=file_name, | |
repo_id=repo_id, | |
repo_type=repo_type, | |
) | |
return jsonify({ | |
"message": f"File '{file_name}' uploaded successfully to {repo_id} ({repo_type})", | |
"size": len(response.content) | |
}), 200 | |
except requests.RequestException as e: | |
return jsonify({"error": f"Error downloading image: {str(e)}"}), 500 | |
except Exception as e: | |
return jsonify({"error": f"Error uploading file: {str(e)}"}), 500 | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860, debug=True) |