Spaces:
Sleeping
Sleeping
import os | |
import zipfile | |
import shutil | |
import time | |
from PIL import Image | |
import io | |
from rembg import remove | |
import gradio as gr | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
from transformers import pipeline | |
def colors_within_tolerance(color1, color2, tolerance):
    """Report whether two colors agree channel-by-channel within a tolerance.

    Channels are paired positionally. If one color carries more channels than
    the other, the surplus channels are not compared.

    Args:
        color1: Iterable of numeric channel values.
        color2: Iterable of numeric channel values.
        tolerance: Largest absolute per-channel difference still accepted.

    Returns:
        True when every compared channel pair differs by at most
        ``tolerance`` (also when there is nothing to compare), else False.
    """
    for channel_a, channel_b in zip(color1, color2):
        if not abs(channel_a - channel_b) <= tolerance:
            return False
    return True
def check_border_colors(image_path, tolerance):
    """Check whether the left and right edge columns are each uniformly colored.

    Every pixel of the leftmost column is compared with that column's top
    pixel, and every pixel of the rightmost column with its own top pixel.
    The two columns are not compared with each other.

    Args:
        image_path: Path of the image file to inspect.
        tolerance: Maximum per-channel difference passed on to
            ``colors_within_tolerance``.

    Returns:
        True if both edge columns stay within ``tolerance`` of their top
        pixel, False as soon as one pixel deviates.
    """
    # The context manager releases the file handle on every exit path,
    # including the early ``return False`` below.
    with Image.open(image_path) as image:
        # Normalise to RGBA so each pixel is a 4-tuple; palette and grayscale
        # images would otherwise yield plain ints that cannot be compared
        # channel by channel.
        rgba_image = image.convert("RGBA")

    pixels = rgba_image.load()
    width, height = rgba_image.size

    # Left column first, then right column, each against its own top pixel.
    for x in (0, width - 1):
        reference_color = pixels[x, 0]
        for y in range(height):
            if not colors_within_tolerance(pixels[x, y], reference_color, tolerance):
                return False
    return True
def resize_and_crop_image(image_path, target_size=(1080, 1080), crop_mode='center'):
    """Scale an image to cover ``target_size`` and crop it to exactly that size.

    The image is enlarged or shrunk uniformly until both dimensions are at
    least as large as the target, then the overflow is cut away according to
    ``crop_mode``.

    Args:
        image_path: Path of the image file to read.
        target_size: ``(width, height)`` of the returned image in pixels.
        crop_mode: Which part to keep: ``'center'``, ``'top'``, ``'bottom'``,
            ``'left'`` or ``'right'``. For top/bottom the crop is centered
            horizontally; for left/right it is centered vertically.

    Returns:
        A new PIL image of size ``target_size``.

    Raises:
        ValueError: If ``crop_mode`` is not one of the supported values.
    """
    print(f"Resizing and cropping image: {image_path}")

    supported_modes = ('center', 'top', 'bottom', 'left', 'right')
    if crop_mode not in supported_modes:
        # Fail with a clear message instead of an unbound-variable error
        # further down.
        raise ValueError(
            f"Unsupported crop_mode {crop_mode!r}; expected one of {supported_modes}"
        )

    target_width, target_height = target_size

    with Image.open(image_path) as img:
        width, height = img.size
        print(f"Original image size: {width}x{height}")

        # Scale so the image covers the target in both dimensions.
        scaling_factor = max(target_width / width, target_height / height)

        # Floating-point error can leave the product a hair under the target
        # (e.g. 1079.9999); never let the resized image be smaller than the
        # crop box.
        new_size = (
            max(target_width, round(width * scaling_factor)),
            max(target_height, round(height * scaling_factor)),
        )
        # Resize the image with high-quality resampling
        resized_img = img.resize(new_size, Image.LANCZOS)
    print(f"Resized image size: {new_size}")

    surplus_width = resized_img.width - target_width
    surplus_height = resized_img.height - target_height

    # Integer offsets keep the crop box exactly target-sized.
    if crop_mode == 'left':
        left = 0
    elif crop_mode == 'right':
        left = surplus_width
    else:
        left = surplus_width // 2

    if crop_mode == 'top':
        top = 0
    elif crop_mode == 'bottom':
        top = surplus_height
    else:
        top = surplus_height // 2

    # Crop the image
    cropped_img = resized_img.crop((left, top, left + target_width, top + target_height))
    print(f"Cropped image size: {cropped_img.size}")
    return cropped_img
def remove_background_rembg(input_path):
    """Strip the background from an image file using rembg.

    The file's raw bytes are handed to rembg's ``remove`` and the bytes it
    returns are decoded into a PIL image.

    Args:
        input_path: Path of the image file to process.

    Returns:
        The decoded result as a PIL image in RGBA mode.
    """
    print(f"Removing background using rembg for image: {input_path}")
    with open(input_path, 'rb') as source_file:
        raw_bytes = source_file.read()
    cutout_bytes = remove(raw_bytes)
    return Image.open(io.BytesIO(cutout_bytes)).convert("RGBA")
# Holds the lazily-built segmentation pipeline so the model is loaded once
# per process rather than once per image.
_BRIA_PIPELINE_CACHE = {}


def _get_bria_pipeline():
    """Return the shared briaai/RMBG-1.4 pipeline, building it on first use."""
    pipe = _BRIA_PIPELINE_CACHE.get("pipe")
    if pipe is None:
        # NOTE(review): trust_remote_code=True executes Python code shipped in
        # the model repository; keep the model id pinned to a trusted source.
        # Two threads racing here may each build a pipeline once; the last
        # one wins and both results are usable, so no lock is taken.
        pipe = pipeline("image-segmentation", model="briaai/RMBG-1.4", trust_remote_code=True)
        _BRIA_PIPELINE_CACHE["pipe"] = pipe
    return pipe


def remove_background_bria(input_path):
    """Strip the background from an image file using the briaai/RMBG-1.4 model.

    Args:
        input_path: Path of the image file to process.

    Returns:
        Whatever the pipeline returns for the path; per the original author's
        note this is the input with the mask applied, as a Pillow image.
    """
    print(f"Removing background using bria for image: {input_path}")
    # Reuse the cached pipeline: constructing it reloads the model, which is
    # far more expensive than a single inference.
    pipe = _get_bria_pipeline()
    pillow_image = pipe(input_path)
    return pillow_image
def _compose_on_background(image, color):
    """Return ``image`` alpha-composited over an opaque canvas of ``color``."""
    rgba_image = image.convert("RGBA")
    backdrop = Image.new("RGBA", rgba_image.size, color)
    return Image.alpha_composite(backdrop, rgba_image)


def _save_processed_image(image, path, output_format):
    """Write ``image`` to ``path`` as JPEG when ``output_format`` is 'JPG', else PNG."""
    if output_format == 'JPG':
        # JPEG has no alpha channel. Dropping alpha with a bare convert('RGB')
        # exposes whatever RGB values sit under transparent pixels, so flatten
        # onto white first (a no-op for images that are already opaque).
        _compose_on_background(image, "WHITE").convert('RGB').save(path, format='JPEG')
    else:
        image.save(path, format='PNG')


def process_single_image(image_path, output_folder, crop_mode, bg_method, output_format, bg_choice, custom_color, watermark_path=None):
    """Remove the background of one image and write 1080x1080 result files.

    Steps: remove the background with the chosen method, decide via
    ``check_border_colors`` whether to pad the cutout onto a square canvas or
    to scale-and-crop it, apply the requested background, then save a version
    without watermark and, if a watermark is given, a version with it.

    Args:
        image_path: Path of the source image.
        output_folder: Existing directory that receives the temp and output files.
        crop_mode: Crop anchor forwarded to ``resize_and_crop_image``.
        bg_method: ``'rembg'`` or ``'bria'``.
        output_format: ``'JPG'`` for JPEG output; anything else saves PNG.
        bg_choice: ``'transparent'``, ``'white'`` or ``'custom'``.
        custom_color: Color used when ``bg_choice`` is ``'custom'``.
        watermark_path: Optional path of an image pasted at the top-left corner.

    Returns:
        List of written output paths (one or two entries), or None if
        processing failed; the error is printed, not raised.
    """
    filename = os.path.basename(image_path)
    base_name = os.path.splitext(filename)[0]
    temp_image_path = os.path.join(output_folder, f"temp_{filename}")
    canvas_side = 1080
    try:
        print(f"Processing image: {filename}")
        if bg_method == 'rembg':
            # Remove background using rembg
            image_with_no_bg = remove_background_rembg(image_path)
        elif bg_method == 'bria':
            # Remove background using bria
            image_with_no_bg = remove_background_bria(image_path)
        else:
            raise ValueError(f"Unsupported background removal method: {bg_method!r}")

        image_with_no_bg.save(temp_image_path, format='PNG')

        # Check border colors and categorize
        if check_border_colors(temp_image_path, tolerance=50):
            print(f"Border colors are the same for image: {filename}")
            # Always start from a transparent canvas. The background step
            # below fills it uniformly; pre-filling with custom_color here
            # made 'white' output show custom-colored margins.
            new_image = Image.new("RGBA", (canvas_side, canvas_side), (255, 255, 255, 0))
            # Scale image to fit inside the canvas without stretching
            width, height = image_with_no_bg.size
            scaling_factor = min(canvas_side / width, canvas_side / height)
            # Clamp to 1px so extreme aspect ratios cannot produce a zero-sized resize.
            new_size = (
                max(1, int(width * scaling_factor)),
                max(1, int(height * scaling_factor)),
            )
            resized_img = image_with_no_bg.resize(new_size, Image.LANCZOS)
            print(f"Resized image size: {new_size}")
            paste_offset = (
                (canvas_side - resized_img.width) // 2,
                (canvas_side - resized_img.height) // 2,
            )
            new_image.paste(resized_img, paste_offset)
        else:
            print(f"Border colors are different for image: {filename}")
            new_image = resize_and_crop_image(temp_image_path, crop_mode=crop_mode)

        # Change background color if needed
        if bg_choice == 'white':
            new_image = _compose_on_background(new_image, "WHITE")
        elif bg_choice == 'custom':
            new_image = _compose_on_background(new_image, custom_color)

        # Save both versions of the image (with and without watermark)
        images_paths = []
        output_ext = 'jpg' if output_format == 'JPG' else 'png'

        # Save without watermark
        output_path_without_watermark = os.path.join(output_folder, f"without_watermark_{base_name}.{output_ext}")
        _save_processed_image(new_image, output_path_without_watermark, output_format)
        images_paths.append(output_path_without_watermark)

        # Apply watermark if provided and save the version with watermark
        if watermark_path:
            with Image.open(watermark_path) as watermark_file:
                watermark = watermark_file.convert("RGBA")
            new_image_with_watermark = new_image.copy()
            # The watermark doubles as its own mask so its transparency is respected.
            new_image_with_watermark.paste(watermark, (0, 0), watermark)
            output_path_with_watermark = os.path.join(output_folder, f"with_watermark_{base_name}.{output_ext}")
            _save_processed_image(new_image_with_watermark, output_path_with_watermark, output_format)
            images_paths.append(output_path_with_watermark)

        print(f"Processed image paths: {images_paths}")
        return images_paths
    except Exception as e:
        # Worker boundary: report the failure and let the batch continue.
        print(f"Error processing {filename}: {e}")
        return None
    finally:
        # Remove the temporary file on success and on failure alike.
        try:
            os.remove(temp_image_path)
        except FileNotFoundError:
            pass  # the failure happened before the temp file was written
        except OSError as e:
            print(f"Could not remove temporary file {temp_image_path}: {e}")
def _collect_image_files(input_folder):
    """Return sorted paths of all images under ``input_folder``, moved to its top level."""
    image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')
    collected = []
    for root, _dirs, names in os.walk(input_folder):
        for name in names:
            # Dot-files include macOS "._*" resource forks, which carry image
            # extensions but are not decodable images.
            if name.startswith('.') or not name.lower().endswith(image_extensions):
                continue
            path = os.path.join(root, name)
            if root != input_folder:
                # Output and temp names are derived from the basename, so give
                # nested files a unique top-level name built from their
                # relative path.
                flat_name = os.path.relpath(path, input_folder).replace(os.sep, "_")
                flat_path = os.path.join(input_folder, flat_name)
                if os.path.exists(flat_path):
                    print(f"Skipping {path}: name collides with {flat_path}")
                    continue
                shutil.move(path, flat_path)
                path = flat_path
            collected.append(path)
    return sorted(collected)


def process_images(zip_file, crop_mode='center', bg_method='rembg', watermark_path=None, output_format='PNG', bg_choice='transparent', custom_color="#ffffff", num_workers=4, progress=gr.Progress()):
    """Process every image in a ZIP archive and bundle the results.

    The archive is extracted into ``temp_input``, each image is run through
    ``process_single_image`` on a thread pool, and the outputs written to
    ``temp_output`` are packed into ``processed_images.zip``.

    NOTE: the working folders and the output archive use fixed relative
    names, so overlapping calls in the same working directory would overwrite
    each other's files.

    Args:
        zip_file: Path (or file object) of the ZIP archive to read.
        crop_mode: Forwarded to ``process_single_image``.
        bg_method: Forwarded to ``process_single_image``.
        watermark_path: Optional watermark image path, forwarded as is.
        output_format: Forwarded to ``process_single_image``.
        bg_choice: Forwarded to ``process_single_image``.
        custom_color: Forwarded to ``process_single_image``.
        num_workers: Number of worker threads; coerced to an int of at least 1.
        progress: Callable invoked as ``progress(fraction, message)`` after
            each image.

    Returns:
        ``(processed_paths, output_zip_path, seconds)``; ``([], None, 0)`` if
        the archive is not a valid ZIP file.
    """
    start_time = time.time()

    # Start from empty working directories on every run.
    input_folder = "temp_input"
    output_folder = "temp_output"
    for folder in (input_folder, output_folder):
        if os.path.exists(folder):
            shutil.rmtree(folder)
    os.makedirs(input_folder)
    os.makedirs(output_folder)

    # Extract the zip file
    try:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            zip_ref.extractall(input_folder)
    except zipfile.BadZipFile as e:
        print(f"Error extracting zip file: {e}")
        return [], None, 0

    processed_images = []
    # Walk recursively: zipping a folder puts the images one level down,
    # where a flat listing of the top level would find nothing.
    image_files = _collect_image_files(input_folder)
    total_images = len(image_files)
    print(f"Total images to process: {total_images}")

    # A UI slider may deliver the value as a float; the pool needs a positive int.
    worker_count = max(1, int(num_workers))

    # One thread pool is enough; the process pool the original also opened
    # never received any work.
    with ThreadPoolExecutor(max_workers=worker_count) as thread_executor:
        submitted = [
            (
                thread_executor.submit(
                    process_single_image, image_path, output_folder, crop_mode,
                    bg_method, output_format, bg_choice, custom_color, watermark_path,
                ),
                image_path,
            )
            for image_path in image_files
        ]
        # Collect in submission order so the returned list is deterministic.
        for done_count, (future, image_path) in enumerate(submitted, start=1):
            try:
                result = future.result()
                if result:
                    processed_images.extend(result)
            except Exception as e:
                print(f"Error processing image {image_path}: {e}")
            # Update progress
            progress(done_count / total_images, f"{done_count}/{total_images} images processed")

    # Create a zip file of the processed images
    output_zip_path = "processed_images.zip"
    with zipfile.ZipFile(output_zip_path, 'w') as zipf:
        for file in processed_images:
            output_name = os.path.basename(file)
            # Decide by the generated prefix, not by a substring of the whole
            # path, so an input whose own name contains "with_watermark" is
            # still filed correctly.
            if output_name.startswith("with_watermark_"):
                zipf.write(file, os.path.join("with_watermark", output_name))
            else:
                zipf.write(file, os.path.join("without_watermark", output_name))

    end_time = time.time()
    processing_time = end_time - start_time
    print(f"Processing time: {processing_time} seconds")

    # Return the images, the zip file path, and processing time
    return processed_images, output_zip_path, processing_time
def gradio_interface(zip_file, crop_mode, bg_method, watermark, output_format, bg_choice, custom_color, num_workers):
    """Adapt Gradio component values to a ``process_images`` call.

    Args:
        zip_file: Uploaded archive: an object exposing ``.name`` or a plain
            path string; None/empty when nothing was uploaded.
        crop_mode: Forwarded to ``process_images``.
        bg_method: Forwarded to ``process_images``.
        watermark: Optional uploaded watermark, in the same forms as ``zip_file``.
        output_format: Forwarded to ``process_images``.
        bg_choice: Forwarded to ``process_images``.
        custom_color: Forwarded to ``process_images``.
        num_workers: Forwarded to ``process_images``.

    Returns:
        The ``(processed_paths, zip_path, seconds)`` tuple from
        ``process_images``, or ``([], None, 0)`` when no archive was supplied.
    """
    if not zip_file:
        # Same empty result shape process_images uses for an unreadable
        # archive, instead of failing on ``None.name``.
        print("No archive uploaded; nothing to process.")
        return [], None, 0

    # Accept both upload shapes: an object carrying the path in ``.name`` or
    # the path string itself.
    zip_path = getattr(zip_file, "name", zip_file)
    watermark_path = getattr(watermark, "name", watermark) if watermark else None

    progress = gr.Progress()  # Initialize progress
    return process_images(zip_path, crop_mode, bg_method, watermark_path, output_format, bg_choice, custom_color, num_workers, progress)
def show_bg_choice(output_format):
    """Build a Gradio update that is visible only when the format is 'PNG'.

    Args:
        output_format: Currently selected output format value.

    Returns:
        ``gr.update(visible=True)`` for ``'PNG'``, otherwise
        ``gr.update(visible=False)``.
    """
    png_selected = output_format == 'PNG'
    return gr.update(visible=png_selected)
def show_color_picker(bg_choice):
    """Build a Gradio update that is visible only when the choice is 'custom'.

    Args:
        bg_choice: Currently selected background choice value.

    Returns:
        ``gr.update(visible=True)`` for ``'custom'``, otherwise
        ``gr.update(visible=False)``.
    """
    custom_selected = bg_choice == 'custom'
    return gr.update(visible=custom_selected)
# Create the Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Image Background Removal and Resizing with Optional Watermark")
    # Only ZIP is offered: the backend extracts uploads with ``zipfile``, so a
    # RAR archive could never be read and always produced an empty result.
    gr.Markdown("Upload a ZIP file containing images, choose the crop mode, optionally upload a watermark image, and select the output format.")

    # Inputs: the image archive and an optional watermark.
    with gr.Row():
        zip_file = gr.File(label="Upload ZIP file of images", file_types=[".zip"])
        watermark = gr.File(label="Upload Watermark Image (Optional)", file_types=[".png"])

    # Processing options.
    with gr.Row():
        crop_mode = gr.Radio(choices=["center", "top", "bottom", "left", "right"], label="Crop Mode", value="center")
        output_format = gr.Radio(choices=["PNG", "JPG"], label="Output Format", value="PNG")
        num_workers = gr.Slider(minimum=1, maximum=16, step=1, label="Number of Workers", value=2)

    # Background handling; the color picker starts hidden.
    with gr.Row():
        bg_method = gr.Radio(choices=["bria", "rembg"], label="Background Removal Method", value="bria", visible=True)
        bg_choice = gr.Radio(choices=["transparent", "white", "custom"], label="Background Choice", value="transparent", visible=True)
        custom_color = gr.ColorPicker(label="Custom Background Color", value="#ffffff", visible=False)

    # Show the color picker only for 'custom', and the background choice only
    # for PNG output.
    bg_choice.change(show_color_picker, inputs=bg_choice, outputs=custom_color)
    output_format.change(show_bg_choice, inputs=output_format, outputs=bg_choice)

    # Outputs.
    gallery = gr.Gallery(label="Processed Images")
    output_zip = gr.File(label="Download Processed Images as ZIP")
    processing_time = gr.Textbox(label="Processing Time (seconds)")

    def process(zip_file, crop_mode, bg_method, watermark, output_format, bg_choice, custom_color, num_workers):
        """Run the batch and format the elapsed time for the textbox."""
        processed_images, zip_path, time_taken = gradio_interface(zip_file, crop_mode, bg_method, watermark, output_format, bg_choice, custom_color, num_workers)
        return processed_images, zip_path, f"{time_taken:.2f} seconds"

    process_button = gr.Button("Process Images")
    process_button.click(process, inputs=[zip_file, crop_mode, bg_method, watermark, output_format, bg_choice, custom_color, num_workers], outputs=[gallery, output_zip, processing_time])

# Launch the interface
iface.launch()