# Spaces: Sleeping  (page status banner captured along with the source; not part of the program)
import functools
import io
import os
import shutil
import time
import zipfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import gradio as gr
from PIL import Image
from rembg import remove
from transformers import pipeline
def resize_and_crop_image(image_path, target_size=(1080, 1080), crop_mode='center'):
    """Scale an image so it covers ``target_size``, then crop it to exactly that size.

    Args:
        image_path: Location of the image, passed straight to ``Image.open``.
        target_size: ``(width, height)`` of the result in pixels.
        crop_mode: Which part of the scaled image to keep: ``'center'``,
            ``'top'``, ``'bottom'``, ``'left'`` or ``'right'``. ``'top'`` and
            ``'bottom'`` stay horizontally centered; ``'left'`` and ``'right'``
            stay vertically centered.

    Returns:
        The cropped PIL image, ``target_size`` pixels large.

    Raises:
        ValueError: If ``crop_mode`` is not one of the supported values.
    """
    valid_modes = ('center', 'top', 'bottom', 'left', 'right')
    if crop_mode not in valid_modes:
        # Previously an unknown mode fell through every branch and failed later
        # with an unrelated-looking UnboundLocalError.
        raise ValueError(
            f"Unsupported crop_mode {crop_mode!r}; expected one of {valid_modes}"
        )

    target_width, target_height = target_size

    with Image.open(image_path) as img:
        width, height = img.size
        # The larger ratio guarantees both dimensions reach the target.
        scaling_factor = max(target_width / width, target_height / height)
        # Round rather than truncate, and clamp to the target: floating-point
        # error can leave the product a hair under the target, which truncation
        # would turn into an image one pixel too small.
        new_size = (
            max(target_width, round(width * scaling_factor)),
            max(target_height, round(height * scaling_factor)),
        )
        # Resize with high-quality resampling.
        resized_img = img.resize(new_size, Image.LANCZOS)

    # Pixels left over on each axis after the resize (always >= 0).
    spare_width = resized_img.width - target_width
    spare_height = resized_img.height - target_height

    if crop_mode == 'left':
        left = 0
    elif crop_mode == 'right':
        left = spare_width
    else:
        left = spare_width // 2

    if crop_mode == 'top':
        top = 0
    elif crop_mode == 'bottom':
        top = spare_height
    else:
        top = spare_height // 2

    # Integer box, exactly target_size large.
    return resized_img.crop((left, top, left + target_width, top + target_height))
def remove_background_rembg(input_path):
    """Strip the background from the image at ``input_path`` using rembg.

    The file is read as raw bytes, handed to ``rembg.remove``, and the bytes
    that come back are decoded into a PIL image.

    Returns:
        The resulting image converted to RGBA mode.
    """
    with open(input_path, 'rb') as source_file:
        original_bytes = source_file.read()
    cutout_bytes = remove(original_bytes)
    cutout = Image.open(io.BytesIO(cutout_bytes))
    return cutout.convert("RGBA")
@functools.lru_cache(maxsize=1)
def _get_bria_pipeline():
    """Build the briaai/RMBG-1.4 segmentation pipeline once and reuse it afterwards."""
    # NOTE(review): trust_remote_code=True runs Python code shipped in the model
    # repository; keep the model id fixed to a source you trust.
    return pipeline("image-segmentation", model="briaai/RMBG-1.4", trust_remote_code=True)


def remove_background_bria(input_path):
    """Strip the background from the image at ``input_path`` using briaai/RMBG-1.4.

    The pipeline is created on first use and cached, instead of being rebuilt
    (and the model reloaded) for every single image.

    Returns:
        The pipeline output; it is used as a PIL image by the caller (the mask
        is applied to the input).
    """
    # NOTE(review): the cached pipeline is shared by all worker threads;
    # inference is assumed safe to run concurrently on it - confirm.
    pipe = _get_bria_pipeline()
    return pipe(input_path)
def _save_output_image(image, output_folder, prefix, stem, output_format):
    """Save ``image`` as ``<prefix>_<stem>.<ext>`` in ``output_folder`` and return its path."""
    if output_format == 'JPG':
        output_path = os.path.join(output_folder, f"{prefix}_{stem}.jpg")
        # JPEG cannot store an alpha channel, so convert to RGB first.
        image.convert('RGB').save(output_path, format='JPEG')
    else:
        output_path = os.path.join(output_folder, f"{prefix}_{stem}.png")
        image.save(output_path, format='PNG')
    return output_path


def process_single_image(image_path, output_folder, crop_mode, remove_bg, bg_method, output_format, bg_choice, watermark_path=None):
    """Process one image: optional background removal, resize/crop, optional watermark.

    Args:
        image_path: Path of the source image.
        output_folder: Directory that receives the results (and a short-lived
            temporary file when the background is removed).
        crop_mode: Forwarded to ``resize_and_crop_image``.
        remove_bg: ``'yes'`` to remove the background; anything else skips it.
        bg_method: ``'rembg'`` or ``'bria'``; only consulted when
            ``remove_bg == 'yes'``.
        output_format: ``'JPG'`` writes JPEG files; any other value writes PNG.
        bg_choice: ``'white'`` flattens a removed background onto white;
            any other value leaves the image as the remover produced it.
        watermark_path: Optional image pasted at the top-left corner to create
            a second, watermarked copy.

    Returns:
        A list with the path of the un-watermarked file and, when a watermark
        was supplied, the watermarked file; ``None`` if processing failed
        (the error is printed).
    """
    filename = os.path.basename(image_path)
    stem = os.path.splitext(filename)[0]
    temp_image_path = None
    try:
        if remove_bg == 'yes':
            if bg_method == 'rembg':
                image_with_no_bg = remove_background_rembg(image_path)
            elif bg_method == 'bria':
                image_with_no_bg = remove_background_bria(image_path)
            else:
                raise ValueError(f"Unknown background removal method: {bg_method!r}")
            # resize_and_crop_image works on a path, so park the cut-out on disk.
            temp_image_path = os.path.join(output_folder, f"temp_{filename}")
            image_with_no_bg.save(temp_image_path, format='PNG')
            source_path = temp_image_path
        else:
            source_path = image_path

        # Resize and crop the image with or without background removal
        new_image = resize_and_crop_image(source_path, crop_mode=crop_mode)

        if remove_bg == 'yes' and bg_choice == 'white':
            # Honour the "white" background choice by compositing the cut-out
            # onto an opaque white canvas.
            cutout = new_image.convert('RGBA')
            white_canvas = Image.new('RGBA', cutout.size, (255, 255, 255, 255))
            new_image = Image.alpha_composite(white_canvas, cutout).convert('RGB')

        # Save both versions of the image (with and without watermark)
        images_paths = [
            _save_output_image(new_image, output_folder, "without_watermark", stem, output_format)
        ]

        if watermark_path:
            # Close the watermark file as soon as its pixels are loaded.
            with Image.open(watermark_path) as watermark_file:
                watermark = watermark_file.convert("RGBA")
            new_image_with_watermark = new_image.copy()
            # The watermark doubles as its own mask so its transparency is kept.
            new_image_with_watermark.paste(watermark, (0, 0), watermark)
            images_paths.append(
                _save_output_image(new_image_with_watermark, output_folder, "with_watermark", stem, output_format)
            )

        return images_paths
    except Exception as e:
        # Best effort: one bad image must not abort the whole batch.
        print(f"Error processing {filename}: {e}")
        return None
    finally:
        # Remove the temporary file on success and on failure alike.
        if temp_image_path and os.path.exists(temp_image_path):
            os.remove(temp_image_path)
def process_images(zip_file, crop_mode='center', remove_bg='yes', bg_method='rembg', watermark_path=None, output_format='PNG', bg_choice='transparent', num_workers=4, progress=gr.Progress()):
    """Process every image in a ZIP archive and bundle the results into a new ZIP.

    Args:
        zip_file: Path (or file object) of the ZIP archive to read.
        crop_mode, remove_bg, bg_method, output_format, bg_choice,
            watermark_path: Forwarded unchanged to ``process_single_image``.
        num_workers: Number of worker threads used to process images.
        progress: Callable invoked as ``progress(fraction, message)`` after
            each image finishes.

    Returns:
        A tuple ``(processed_images, output_zip_path, processing_time)``: the
        list of output file paths, the path of the result archive, and the
        elapsed time in seconds.
    """
    start_time = time.time()

    # Working directories are recreated from scratch on every run.
    input_folder = "temp_input"
    output_folder = "temp_output"
    for folder in (input_folder, output_folder):
        if os.path.exists(folder):
            shutil.rmtree(folder)
    os.makedirs(input_folder)
    os.makedirs(output_folder)

    # Extract the zip file
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall(input_folder)

    # Walk the whole tree: zipping a folder puts the images one level down,
    # where a flat directory listing would not find them.
    image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')
    image_files = []
    seen_names = set()
    for root, dirs, files in os.walk(input_folder):
        # Skip macOS archive metadata and keep the traversal order stable.
        dirs[:] = sorted(d for d in dirs if d != '__MACOSX')
        for name in sorted(files):
            if not name.lower().endswith(image_extensions):
                continue
            if name in seen_names:
                # Outputs are named after the base name only, so a second file
                # with the same name would overwrite the first one's results.
                print(f"Skipping {os.path.join(root, name)}: duplicate file name")
                continue
            seen_names.add(name)
            image_files.append(os.path.join(root, name))
    total_images = len(image_files)

    # The slider may deliver a float; the executor needs a positive int.
    worker_count = max(1, int(num_workers))

    processed_images = []
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(
                process_single_image,
                image_path,
                output_folder,
                crop_mode,
                remove_bg,
                bg_method,
                output_format,
                bg_choice,
                watermark_path,
            )
            for image_path in image_files
        ]
        # Collect in submission order so the output order is predictable.
        for done, future in enumerate(futures, start=1):
            result = future.result()
            if result:
                processed_images.extend(result)
            # Update progress
            progress(done / total_images, f"{done}/{total_images} images processed")

    # Create a zip file of the processed images
    output_zip_path = "processed_images.zip"
    with zipfile.ZipFile(output_zip_path, 'w') as zipf:
        for file in processed_images:
            base_name = os.path.basename(file)
            # Classify by the generated prefix, not by a substring search that
            # could also match text in the user's own file or folder names.
            if base_name.startswith("with_watermark_"):
                subfolder = "with_watermark"
            else:
                subfolder = "without_watermark"
            zipf.write(file, os.path.join(subfolder, base_name))

    processing_time = time.time() - start_time

    # Return the images, the zip file path, and processing time
    return processed_images, output_zip_path, processing_time
def gradio_interface(zip_file, crop_mode, remove_bg, bg_method, watermark, output_format, bg_choice, num_workers, progress=gr.Progress()):
    """Adapt Gradio component values to ``process_images`` arguments.

    Args:
        zip_file: The uploaded archive, either an object with a ``name``
            attribute holding its path or the path itself.
        crop_mode, remove_bg, bg_method, output_format, bg_choice,
            num_workers: Passed through to ``process_images``.
        watermark: Optional uploaded watermark, in the same form as ``zip_file``.
        progress: Progress tracker forwarded to ``process_images``.
            NOTE(review): Gradio is documented to track progress only through
            a ``gr.Progress()`` default on the function it calls as the event
            handler - confirm against the installed Gradio version.

    Returns:
        Whatever ``process_images`` returns.

    Raises:
        gr.Error: If no archive was uploaded.
    """
    if zip_file is None:
        # Clear message instead of an AttributeError on None.
        raise gr.Error("Please upload a ZIP file of images before processing.")

    # Accept both upload representations: file-like object or plain path.
    zip_path = getattr(zip_file, "name", zip_file)
    watermark_path = getattr(watermark, "name", watermark) if watermark else None

    return process_images(zip_path, crop_mode, remove_bg, bg_method, watermark_path, output_format, bg_choice, num_workers, progress)
def show_bg_choice(remove_bg, output_format):
    """Return a Gradio update that shows the target component only when
    background removal is on and the output format is PNG."""
    should_show = remove_bg == 'yes' and output_format == 'PNG'
    return gr.update(visible=should_show)
def show_bg_method(remove_bg):
    """Return a Gradio update that shows the target component only when
    background removal is switched on."""
    return gr.update(visible=(remove_bg == 'yes'))
# Create the Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Image Background Removal and Resizing with Optional Watermark")
    gr.Markdown("Upload a ZIP file containing images, choose the crop mode, optionally upload a watermark image, and select the output format.")

    with gr.Row():
        # Only ZIP is offered: the upload is opened with zipfile, which cannot
        # read RAR archives.
        zip_file = gr.File(label="Upload ZIP file of images", file_types=[".zip"])
        watermark = gr.File(label="Upload Watermark Image (Optional)", file_types=[".png"])

    with gr.Row():
        crop_mode = gr.Radio(choices=["center", "top", "bottom", "left", "right"], label="Crop Mode", value="center")
        remove_bg = gr.Radio(choices=["yes", "no"], label="Remove Background", value="yes")
        output_format = gr.Radio(choices=["PNG", "JPG"], label="Output Format", value="PNG")
        num_workers = gr.Slider(minimum=1, maximum=16, step=1, label="Number of Workers", value=2)

    with gr.Row():
        bg_method = gr.Radio(choices=["bria", "rembg"], label="Background Removal Method", value="bria", visible=True)
        bg_choice = gr.Radio(choices=["transparent", "white"], label="Background Choice", value="transparent", visible=True)

    # Keep the background options in sync with the settings they depend on.
    remove_bg.change(show_bg_choice, inputs=[remove_bg, output_format], outputs=bg_choice)
    remove_bg.change(show_bg_method, inputs=remove_bg, outputs=bg_method)
    output_format.change(show_bg_choice, inputs=[remove_bg, output_format], outputs=bg_choice)

    gallery = gr.Gallery(label="Processed Images")
    output_zip = gr.File(label="Download Processed Images as ZIP")
    processing_time = gr.Textbox(label="Processing Time (seconds)")

    def process(zip_file, crop_mode, remove_bg, bg_method, watermark, output_format, bg_choice, num_workers):
        """Run the batch and format the elapsed time for the textbox."""
        processed_images, zip_path, time_taken = gradio_interface(zip_file, crop_mode, remove_bg, bg_method, watermark, output_format, bg_choice, num_workers)
        return processed_images, zip_path, f"{time_taken:.2f} seconds"

    process_button = gr.Button("Process Images")
    process_button.click(
        process,
        inputs=[zip_file, crop_mode, remove_bg, bg_method, watermark, output_format, bg_choice, num_workers],
        outputs=[gallery, output_zip, processing_time],
    )

# Launch the interface
iface.launch()