gradio_livelog / README.md
elismasilva's picture
Update README.md
3500a92 verified

A newer version of the Gradio SDK is available: 5.44.1

Upgrade
metadata
tags:
  - gradio-custom-component
  - Code
title: gradio_livelog
short_description: A Live Log component for Gradio Interface
colorFrom: blue
colorTo: yellow
sdk: gradio
pinned: false
app_file: space.py
sdk_version: 5.43.1

gradio_livelog

Static Badge

💻 Component GitHub Code

A Live Log Component for Gradio Interface

Key Features

LiveLog elevates Gradio applications by providing a powerful, terminal-like monitoring experience directly in your UI. It's designed for both simple progress tracking and complex pipeline introspection.

  • Dual-Mode Progress Tracking: A sophisticated progress bar that operates in two modes for maximum accuracy:

    • Internal Rate Calculation: For simple loops, it features a built-in, tqdm-style progress calculator with Exponential Moving Average (EMA) smoothing for a stable and realistic it/s or s/it display.
    • External tqdm Capture: For deep integration, it can directly capture and display the exact rate from an existing tqdm instance running inside a backend library (like diffusers). This eliminates measurement overhead and provides a perfectly synchronized view of your pipeline's true performance.
  • Rich, Real-time Log Streaming: Display log messages as they are generated.

    • Supports standard Python log levels (INFO, WARNING, ERROR, etc.) with corresponding colors.
    • Includes support for custom log levels (like "SUCCESS") for enhanced visual feedback.
  • Advanced Multi-Logger Capture: The capture_logs utility is designed for complex applications.

    • Effortlessly capture logs from multiple, independent Python loggers simultaneously (e.g., your app's logger and a library's internal logger).
    • Correctly handles logger hierarchies and propagation, making it robust for any logging setup.
  • Flexible Display & Layout Control: Adapt the component to any UI layout.

    • Three Display Modes: Show the full component (logs + progress), logs only, or progress bar only.
    • Highly customizable appearance with properties for height, background_color, line_numbers, and autoscrolling.
  • Comprehensive Utility Controls:

    • Built-in header buttons to Clear, Copy, and Download log content.
    • Optionally suppress log output in your Python console to keep it clean while still displaying everything in the UI.

Installation

pip install gradio_livelog

Usage

# demo/app.py

import spaces
import gradio as gr
import torch
from diffusers import StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler
import queue
import threading
import asyncio
import sys
import logging
import random
import numpy as np

# Import the component and ALL its utilities
from gradio_livelog import LiveLog
from gradio_livelog.utils import ProgressTracker, Tee, TqdmToQueueWriter, capture_logs

# --- 1. SETUP ---
MODEL_ID = "SG161222/RealVisXL_V5.0_Lightning"
MAX_SEED = np.iinfo(np.int32).max

# --- 2. LOGIC FOR THE "LIVELOG FEATURE DEMO" TAB ---
app_logger = logging.getLogger("logging_app")
app_logger.setLevel(logging.INFO)
# Avoid adding duplicate handlers if the script is reloaded
if not app_logger.handlers:
    console_handler = logging.StreamHandler()
    console_handler.flush = sys.stderr.flush
    app_logger.addHandler(console_handler)

async def run_process(disable_console: bool, rate_unit: str, run_error_case: bool):
    with capture_logs(log_level=logging.INFO, log_name=["logging_app"], disable_console=disable_console) as get_logs: #You can watch more than one log if you wish in log_name. If you do not pass log_name, the default log will be used.
        total_steps = 100
        tracker = ProgressTracker(total=total_steps, description="Simulating a process...", rate_unit=rate_unit)
        all_logs = []
        last_log_content = None
        
        initial_log = f"Starting simulated process with {total_steps} steps..."
        app_logger.info(initial_log)
        logs = [
            {
                "type": "log",
                "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                "content": record.getMessage()
            }
            for record in get_logs()
        ]
        all_logs.extend(logs)
        last_log_content = logs[-1]["content"] if logs else None        
        yield tracker.update(advance=0, status="running", logs=all_logs, log_content=None)

        for i in range(total_steps):
            await asyncio.sleep(0.03)
            current_step = i + 1
            
            if current_step == 10:
                app_logger.warning(f"Low disk space warning at step {current_step}.")
            elif current_step == 30:
                app_logger.log(logging.INFO + 5, f"Asset pack loaded successfully at step {current_step}.")
            elif current_step == 75:
                app_logger.critical(f"Checksum mismatch! Data may be corrupt at step {current_step}.")
            
            if run_error_case and current_step == 50:
                app_logger.error("A fatal simulation error occurred! Aborting.")
                logs = [
                    {
                        "type": "log",
                        "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                        "content": record.getMessage()
                    }
                    for record in get_logs()
                ]
                all_logs.extend(logs)
                last_log_content = logs[-1]["content"] if logs else last_log_content
                yield tracker.update(advance=0, status="error", logs=all_logs, log_content=last_log_content)
                return
            
            logs = [
                {
                    "type": "log",
                    "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                    "content": record.getMessage()
                }
                for record in get_logs()
            ]
            all_logs.extend(logs)
            if logs:
                last_log_content = logs[-1]["content"]
            yield tracker.update(advance=1, status="running", logs=all_logs, log_content=last_log_content)
        
        final_log = "Process completed successfully!"
        app_logger.log(logging.INFO + 5, final_log)
        logs = [
            {
                "type": "log",
                "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                "content": record.getMessage()
            }
            for record in get_logs()
        ]
        all_logs.extend(logs)
        last_log_content = logs[-1]["content"] if logs else last_log_content
        yield tracker.update(advance=0, status="success", logs=all_logs, log_content=last_log_content)
        
def update_livelog_properties(mode, color, lines, scroll):
    return gr.update(display_mode=mode, background_color=color, line_numbers=lines, autoscroll=scroll)

def clear_output():
    return None

async def run_success_case(disable_console: bool, rate_unit: str):
    yield None    
    async for update in run_process(disable_console=disable_console, rate_unit=rate_unit, run_error_case=False):
        yield update

async def run_error_case(disable_console: bool, rate_unit: str):
    yield None
    async for update in run_process(disable_console=disable_console, rate_unit=rate_unit, run_error_case=True):
        yield update

# --- 3. LOGIC FOR THE "DIFFUSION PIPELINE INTEGRATION" TAB ---
diffusion_pipeline = None
pipeline_lock = threading.Lock()
def load_pipeline(on_load=True):
    """A function to load the model, ensuring it's only done once."""
    global diffusion_pipeline
    with pipeline_lock:
        if diffusion_pipeline is None:
            print("Loading Stable Diffusion model for the first time...")
            pipe = StableDiffusionXLPipeline.from_pretrained(
                MODEL_ID, torch_dtype=torch.float16, use_safetensors=True, add_watermarker=False, device_map="cuda"
            )
            pipe.enable_vae_tiling()
            #pipe.enable_model_cpu_offload() #disable this on huggingface spaces
            pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)        
            diffusion_pipeline = pipe
            print("Model loaded successfully!")
        
    if not on_load:
        return diffusion_pipeline

@spaces.GPU(duration=60, enable_queue=True)
def run_diffusion_in_thread(prompt: str, disable_console: bool, update_queue: queue.Queue):
    """
    This function now uses capture_logs to listen to internal diffusers logs
    while retaining the superior data structure you designed.
    """
    tracker = None    
    with capture_logs(log_level=logging.INFO, log_name=["logging_app"], disable_console=disable_console) as get_logs: #You can watch more than one log if you wish in log_name. If you do not pass log_name, the default log will be used.
        try:            
            pipe = load_pipeline(on_load=False)            
            #We will capture pipeline tqdm s/it progress instead            
            rate_queue = queue.Queue()
            tqdm_writer = TqdmToQueueWriter(rate_queue)
            
            progress_bar_handler = Tee(sys.stderr, tqdm_writer)
            pipe.set_progress_bar_config(file=progress_bar_handler,  #if you dont need to see the tqdm progress on console set file=tqdm_writer instead            
                                        disable=False,  
                                        ncols=100,
                                        dynamic_ncols=True,
                                        ascii=" █")
            
            seed = random.randint(0, MAX_SEED)
            generator = torch.Generator(device="cuda").manual_seed(seed)
            prompt_style = f"hyper-realistic 8K image of {prompt}. ultra-detailed, lifelike, high-resolution, sharp, vibrant colors, photorealistic"
            negative_prompt_style = "cartoonish, low resolution, blurry, simplistic, abstract, deformed, ugly"
            num_inference_steps = 10
            
            all_logs = []
            last_known_rate_data = None

            # Helper function to process and store new logs
            def process_and_send_updates(status="running", advance=0, final_image_payload=None):
                """
                This is the core callback function. It captures new logs, formats them,
                and sends a complete update object (logs + progress) to the UI queue.
                This should also be called after the log record.
                """
                nonlocal all_logs, last_known_rate_data
                new_rate_data = None
                while not rate_queue.empty():
                    try:
                        new_rate_data = rate_queue.get_nowait()
                    except queue.Empty:
                        break
                
                if new_rate_data is not None:
                    last_known_rate_data = new_rate_data
                
                new_records = get_logs()
                if new_records:
                    new_logs = [{
                        "type": "log",
                        "level": "SUCCESS" if r.levelno == logging.INFO + 5 else r.levelname,
                        "content": r.getMessage()
                    } for r in new_records]
                    all_logs.extend(new_logs)
                
                # Use the tracker to generate the progress update dictionary if it exists.
                # If not, create a preliminary update dictionary.
                update_dict = {}
                
                if tracker:
                    update_dict = tracker.update(
                        advance=advance, 
                        status=status, 
                        logs=all_logs,
                        rate_data=last_known_rate_data                               
                    )
                else:
                    # Initial state before the tracker is created.
                    update_dict = {
                        "type": "progress", 
                        "logs": all_logs, 
                        "current": 0, 
                        "total": num_inference_steps, 
                        "desc": "Diffusion Steps" # Description is sent once
                    }

                # Put the update on the queue. The image payload is usually None
                # until the very end.
                update_queue.put((final_image_payload, update_dict))
                
            app_logger.info(f"Using seed: {seed}")
            process_and_send_updates()
                        
            app_logger.info("Starting diffusion process...")
            process_and_send_updates()
                        
            tracker = ProgressTracker(total=num_inference_steps, description="Diffusion Steps", rate_unit='it/s')
            
            def progress_callback(pipe_instance, step, timestep, callback_kwargs):
                process_and_send_updates(advance=1) 
                return callback_kwargs
                        
            images = pipe(
                prompt=prompt_style, negative_prompt=negative_prompt_style, width=1024, height=1024,
                guidance_scale=3.0, num_inference_steps=num_inference_steps,
                generator=generator, callback_on_step_end=progress_callback
            ).images
            
            app_logger.log(logging.INFO + 5, "Image generated successfully!")
            process_and_send_updates(status="success", final_image_payload=images)

        except Exception as e:
            app_logger.error(f"Error in diffusion thread: {e}, process aborted!", exc_info=True)                    
            process_and_send_updates(status="error")                                                        
        finally:
            update_queue.put(None)
            
            
@spaces.GPU(duration=60, enable_queue=True)
def generate(prompt):
    """This function starts the worker thread and yields updates from the queue."""   
    yield None, None, gr.update(interactive=False)    
    update_queue = queue.Queue()
    diffusion_thread = threading.Thread(target=run_diffusion_in_thread,  args=(prompt, False, update_queue))
    diffusion_thread.start()
    final_images = None
    log_update = None
    while True:
        update = update_queue.get()
        if update is None: 
            break
        
        images, log_update = update
        
        if images:
            final_images = images
      
        yield final_images, log_update, gr.skip()
    
    yield final_images, log_update, gr.update(interactive=True)

# --- 4. THE COMBINED GRADIO UI with TABS ---
with gr.Blocks(theme=gr.themes.Ocean()) as demo:
    gr.HTML("<h1><center>LiveLog Component Showcase</center></h1>")

    with gr.Tabs():
        with gr.TabItem("LiveLog Feature Demo"):            
            gr.Markdown("### Test all features of the LiveLog component interactively.")
            with gr.Row():
                with gr.Column(scale=3):
                    feature_logger = LiveLog(
                        label="Process Output", line_numbers=True, height=550,
                        background_color="#000000", display_mode="full"
                    )
                with gr.Column(scale=1):
                    with gr.Group():
                        gr.Markdown("### Component Properties")
                        display_mode_radio = gr.Radio(["full", "log", "progress"], label="Display Mode", value="full")
                        rate_unit = gr.Radio(["it/s","s/it"], label="Progress rate unit", value="it/s")
                        bg_color_picker = gr.ColorPicker(label="Background Color", value="#000000")
                        line_numbers_checkbox = gr.Checkbox(label="Show Line Numbers", value=True)
                        autoscroll_checkbox = gr.Checkbox(label="Autoscroll", value=True)
                        disable_console_checkbox = gr.Checkbox(label="Disable Python Console Output", value=False)
                    with gr.Group():
                        gr.Markdown("### Simulation Controls")
                        start_btn = gr.Button("Run Success Case", variant="primary")
                        error_btn = gr.Button("Run Error Case")
            
            start_btn.click(fn=run_success_case, inputs=[disable_console_checkbox, rate_unit], outputs=feature_logger)
            error_btn.click(fn=run_error_case, inputs=[disable_console_checkbox, rate_unit], outputs=feature_logger)
            feature_logger.clear(fn=clear_output, inputs=None, outputs=feature_logger)
            controls = [display_mode_radio, bg_color_picker, line_numbers_checkbox, autoscroll_checkbox]
            for control in controls:
                control.change(fn=update_livelog_properties, inputs=controls, outputs=feature_logger)
        
        with gr.TabItem("Diffusion Pipeline Integration"):               
            gr.Markdown("### Use `LiveLog` to monitor a real image generation process.")
            with gr.Row():
                with gr.Column(scale=3):
                    with gr.Group():
                        prompt = gr.Textbox(
                            label="Enter your prompt", show_label=False,
                            placeholder="A cinematic photo of a robot in a floral garden...",
                            scale=8, container=False
                        )
                        run_button = gr.Button("Generate", scale=1, variant="primary")
                    
                    livelog_viewer = LiveLog(
                        label="Process Monitor", height=250, display_mode="full", line_numbers=False
                    )
                
                with gr.Column(scale=2):
                    result_gallery = gr.Gallery(
                        label="Result", columns=1, show_label=False, height=500, min_width=768, preview=True, allow_preview=True
                    )
            
            run_button.click(fn=generate, inputs=[prompt], outputs=[result_gallery, livelog_viewer, run_button])
            prompt.submit(fn=generate, inputs=[prompt], outputs=[result_gallery, livelog_viewer, run_button])
            livelog_viewer.clear(fn=clear_output, inputs=None, outputs=[livelog_viewer])
            
    # This ensures the model is downloaded/loaded once when the app starts.
    #demo.load(load_pipeline, None, None)  #do not use this in spaces, it will cause an error
                
if __name__ == "__main__":
    demo.queue(max_size=50).launch(debug=True)

LiveLog

Initialization

name type default description
value
typing.Union[
    typing.List[typing.Dict[str, typing.Any]],
    typing.Callable,
    NoneType,
][
    typing.List[typing.Dict[str, typing.Any]][
        typing.Dict[str, typing.Any][str, Any]
    ],
    Callable,
    None,
]
None The initial value, a list of log/progress dictionaries. Can be a callable.
label
str | None
None The component label.
every
float | None
None If `value` is a callable, run the function 'every' seconds.
height
int | str
400 The height of the log panel in pixels or CSS units.
autoscroll
bool
True If True, the panel will automatically scroll to the bottom on new logs.
line_numbers
bool
False If True, shows line numbers for logs.
background_color
str
"#000000" The background color of the log panel as a CSS-valid string.
display_mode
"full" | "log" | "progress"
"full" "full" (logs and progress), "log" (only logs), or "progress" (only progress bar).
disable_console
bool
True If True, logs will not be propagated to the standard Python console.
show_download_button
bool
True If True, shows the download button in the header.
show_copy_button
bool
True If True, shows the copy button in the header.
show_clear_button
bool
True If True, shows the clear button in the header.
show_label
bool
True If True, will display label.
container
bool
True If True, will place the component in a container.
scale
int | None
None Relative size compared to adjacent Components.
min_width
int
160 Minimum pixel width, will wrap if not sufficient screen space.
visible
bool
True If False, the component will be hidden.
elem_id
str | None
None An optional string that is assigned as the id of this component in the HTML DOM.
elem_classes
list[str] | str | None
None An optional string or list of strings assigned as the class of this component.
render
bool
True If False, this component will not be rendered.
key
int | str | tuple[int | str, Ellipsis] | None
None A unique key for the component.

Events

name description
change Triggered when the value of the LiveLog changes either because of user input (e.g. a user types in a textbox) OR because of a function update (e.g. an image receives a value from the output of an event trigger). See .input() for a listener that is only triggered by user input.
clear This listener is triggered when the user clears the LiveLog using the clear button for the component.

User function

The impact on the users predict function varies depending on whether the component is used as an input or output for an event (or both).

  • When used as an Input, the component only impacts the input signature of the user function.
  • When used as an output, the component only impacts the return signature of the user function.

The code snippet below is accurate in cases where the component is used as both an input and an output.

  • As output: Is passed, the preprocessed input data sent to the user's function in the backend.
  • As input: Should return, the output data received by the component from the user's function in the backend.
def predict(
    value: typing.Optional[typing.List[typing.Dict[str, typing.Any]]][
   typing.List[typing.Dict[str, typing.Any]][
       typing.Dict[str, typing.Any][str, Any]
   ],
   None,
]
) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]][
   typing.List[typing.Dict[str, typing.Any]][
       typing.Dict[str, typing.Any][str, Any]
   ],
   None,
]:
    return value