Spaces:
Runtime error
Runtime error
| import os | |
| import base64 | |
| import time | |
| from fastapi import FastAPI, APIRouter, Body | |
| from facefusion.api.model import Params, print_globals | |
| import facefusion.globals as globals | |
| import facefusion.processors.frame.globals as frame_processors_globals | |
| from facefusion import core | |
| from facefusion.utilities import normalize_output_path | |
# ASGI application object and the router that is mounted on it further down.
app = FastAPI()
router = APIRouter()

# Directory holding this module. abspath() guards against a relative
# __file__ (e.g. when run as a script on older interpreters), where a bare
# dirname() would return '' and work_dir would collapse to the root '/'.
app_dir = os.path.dirname(os.path.abspath(__file__))

# Parent of app_dir, kept with a trailing slash because the request handler
# builds file paths from it by plain string concatenation.
work_dir = os.path.dirname(app_dir) + '/'

# The label printed below is Chinese for "working directory".
print("工作目录", work_dir)
def root():
    """Return the fixed greeting payload ``{"API": "hello"}``.

    NOTE(review): no route decorator is visible on this function in this
    view of the file; confirm how it is registered with the application.
    """
    greeting = {"API": "hello"}
    return greeting
async def process_frames(params: Params = Body(...)) -> dict:
    """Run one processing job on base64-encoded inputs and return the result.

    The source and target payloads are decoded to temporary files under
    ``work_dir``, the facefusion globals are pointed at those files,
    ``core.api_conditional_process()`` is invoked, and the produced output
    file is returned base64-encoded.

    Args:
        params: Request body. Fields read here are ``source``, ``target``,
            ``user_id``, ``source_type`` and ``target_type``; every non-None
            field is also copied onto the facefusion globals.

    Returns:
        ``{'output': <base64 str>}`` on success,
        ``{"message": "Source image or path is required"}`` when an input
        payload is missing, or ``{"message": "Error"}`` when processing
        raises.

    NOTE(review): no route decorator is visible on this function in this
    view of the file; confirm it is registered on ``router``.
    """
    # Both payloads are decoded and written to disk below, so both must be
    # present. The previous check only rejected the request when *both*
    # were missing and then crashed while decoding the absent one.
    if not (params.source and params.target):
        return {"message": "Source image or path is required"}

    update_global_variables(params)

    # Take a single timestamp so the three file names always share the same
    # suffix, even if the clock ticks over between assignments.
    stamp = int(time.time())
    # NOTE(review): user_id and the *_type fields come straight from the
    # request and are concatenated into a filesystem path without any
    # sanitising; confirm they are validated upstream.
    prefix = work_dir + params.user_id
    source_path = f"{prefix}source-{stamp}.{params.source_type}"
    target_path = f"{prefix}target-{stamp}.{params.target_type}"
    output_path = f"{prefix}output-{stamp}.{params.target_type}"

    globals.source_path = source_path
    globals.target_path = target_path
    globals.output_path = output_path
    print(globals.output_path)
    print_globals()

    try:
        save_file(source_path, params.source)
        save_file(target_path, params.target)
        try:
            core.api_conditional_process()
        except Exception as e:
            print(e)
            return {"message": "Error"}
        # Read the location back from the globals, as the processing step
        # is the one that ultimately decides where the output lands.
        output_path = globals.output_path
        output = image_to_base64_str(output_path)
    finally:
        # Always remove the temporary files, including on failure paths.
        # Cleanup is best-effort: a file that was never created is fine.
        for path in (source_path, target_path, output_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
    return {'output': output}
def update_global_variables(params: Params):
    """Copy every non-None attribute of *params* onto a settings module.

    For each attribute, ``globals`` is tried first and
    ``frame_processors_globals`` second; the value is written to the first
    module that already defines an attribute of that name. Attributes known
    to neither module are skipped.
    """
    settings_modules = (globals, frame_processors_globals)
    for attr_name, attr_value in vars(params).items():
        if attr_value is None:
            continue
        owner = next(
            (module for module in settings_modules if hasattr(module, attr_name)),
            None,
        )
        if owner is not None:
            setattr(owner, attr_name, attr_value)
def image_to_base64_str(image_path):
    """Read the file at *image_path* and return its bytes as base64 text."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    return base64.b64encode(raw_bytes).decode('utf-8')
def save_file(file_path: str, encoded_image: str):
    """Decode base64 *encoded_image* and write the bytes to *file_path*.

    Missing parent directories are created. Decoding happens before any
    filesystem change, so invalid input leaves nothing behind.

    Args:
        file_path: Destination path; may be a bare file name.
        encoded_image: Base64-encoded file content.

    Raises:
        binascii.Error: If *encoded_image* is not valid base64.
        OSError: If the directory or file cannot be created.
    """
    data = base64.b64decode(encoded_image)
    directory = os.path.dirname(file_path)
    # A bare file name has an empty dirname, and os.makedirs('') raises.
    # exist_ok also removes the check-then-create race between callers.
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(file_path, "wb") as file:
        file.write(data)
def delete_files_in_directory(directory_path):
    """Delete the regular files directly inside *directory_path*.

    Sub-directories are left alone and are not descended into. Each removed
    path is reported on stdout.
    """
    for entry_name in os.listdir(directory_path):
        candidate = os.path.join(directory_path, entry_name)
        if not os.path.isfile(candidate):
            continue
        os.remove(candidate)
        print(f"Deleted {candidate}")
# Mount the router on the application.
app.include_router(router)


def launch(host: str = "0.0.0.0", port: int = 7860) -> None:
    """Serve ``app`` with uvicorn; blocks until the server stops.

    Args:
        host: Interface to bind. The default listens on all interfaces.
        port: TCP port to listen on.
    """
    # Imported lazily so the module can be imported without uvicorn.
    import uvicorn

    uvicorn.run(app, host=host, port=port)