stzhao's picture
Update app.py
be577b4 verified
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
basic_texts = "对1D/2D/3D/4D并行来说,每一种单独的进程组的GPU上的颜色都是相同的。具体点:" \
"对于数据并行来说,每种相同的颜色表示当前进程组的这些rank上的模型权重也是相同的。" \
"对于张量模型并行来说,每种相同的颜色表示当前进程组的这些rank维护了一部分模型权重,做allreduce才能拿到完整的模型权重" \
"对于流水并行来说,每种相同的颜色表示当前进程组的这些rank维护的层都是串行的,需要通信传输数据," \
"而如果横着来看进程组我们可以发现每一行都对应了流水线并行的一个Stage,这样才能达到处理不同的micro batch流水起来的目的" \
"对于Context Parallel来说,每种相同的颜色表示当前进程组维护的是一部分token的Q,K,V,然后通过在这个进程组里面循环跨卡传递K,V来计算完整的结果。\n"
descriptions = {
'Data Parallel': '对于不考虑Zero的数据并行,在一个数据并行组中,我们需要在Backward阶段对该组中所有rank的权重梯度做allreduce通信。推荐阅读以下paper或文章了解数据并行的原理:\n https://www.cs.cmu.edu/~muli/file/ps.pdf \n https://zhuanlan.zhihu.com/p/485208899 \n https://zhuanlan.zhihu.com/p/617133971 \n https://zhuanlan.zhihu.com/p/618865052',
'Tensor Model Parallel': '在张量模型并行中,模型的不同部分(例如,不同的中间张量或权重张量)被切分到不同的GPU上,我们在Forward和Backward阶段都要做allreduce来同步激活值和参数梯度,Tensor Model Parallel是Megatron-LM的核心并行方式之一。推荐阅读以下Paper或文章了解张量模型并行的原理:\n https://arxiv.org/pdf/1909.08053.pdf \n https://strint.notion.site/Megatron-LM-86381cfe51184b9c888be10ee82f3812\n https://zhuanlan.zhihu.com/p/622212228 ',
'Pipeline Parallel': '在流水并行中,模型的不同层被切分到不同的GPU上,并将输入数据切分为多个小批量(micro-batches),来实现在多个设备上并行处理这些批量数据,以此来加速训练过程。对于流水并行的一个组来说,每组内的GPU维护的层都是串行的,使用点对点的Send/Receive来发送接受数据,在文本框里的分组每一列都是一个完整的流水线Stage,这种组织方式才能达到处理不同的micro batch流水起来的目的。推荐阅读以下Paper或文章了解流水线并行的原理:\n https://arxiv.org/pdf/2104.04473.pdf \n https://zhuanlan.zhihu.com/p/678724323 \n https://zhuanlan.zhihu.com/p/613196255 \n https://juejin.cn/post/7063030243224879140 \n https://mp.weixin.qq.com/s/PXjYm9dN8C9B8svMQ7nOvw',
'Context Parallel': '在长文本训练场景中,随着序列长度的增加,每个微批处理的tokens数量增多,导致训练中激活值(Activations)的显著增加,这主要与序列长度成正比。最佳解决方案是采用Context Parallel(CP),它通过沿序列维度切分并通过循环通信完成self-attention计算。但是,当CP与Tensor Parallel(TP)的乘积超过8时,Nvlink的优势可能会消失,使得循环通信效率不高。尽管如此,Context Parallel的优点在于它仅影响数据并行(DP)组的大小,允许使用CP和DP结合的全部节点进行分布式存储,这对于ZERO系列优化器尤其有利,Megatron-LM在安培架构之后的GPU上支持了使用CP进行训练,包含FP16/BF16/FP8的支持,需要安装Transformer Engine库这个依赖。推荐阅读以下Paper或文章了解Context Paralle的原理:\n https://docs.nvidia.com/megatron-core/developer-guide/latest/api-guide/context_parallel.html \n https://zhuanlan.zhihu.com/p/689067888 \n https://zhuanlan.zhihu.com/p/683714620 \n https://www.zhihu.com/question/637961859/answer/3349322621 \n https://mp.weixin.qq.com/s/u4gG1WZ73mgH9mEKQQCRww ',
'DP+TP': '对于一个Batch的数据来说,TP需要在每一个TP Group里对Transformer层的Forward和Backward的Activation都做AllReduce,而DP只需要在网络的Backward阶段对所有参数做一次AllReduce,数据并行的通信量一般会显著少于模型并行。因此,在使用DP+TP组合时,我们应当在保证模型不OOM的情况下尽量提升数据并行的并行度。推荐阅读以下paper或文章了解混合并行的实践经验:\n https://mp.weixin.qq.com/s/D-14J482SFQf-zh-EFa-1w ',
'DP+PP': '我们应当优先考虑数据并行和流水线并行的组合,而不是数据并行和模型并行,特别是在节点间带宽比较低并且模型参数很大的情况下特别有用。DP+PP的组合相比于DP+TP一般具有更小的通信量,不过PP会存在流水线气泡的问题,不过最新的一些Infra研究比如ZB-H1/H2等等可以将流水线的气泡占比降到非常低,所以流水线并行是非常有用的扩大模型规模的并行方式。此外流水线并行还常和Gradient CheckPointing技术混合使用以继续扩大模型规模,同时缓解流水线不同Stage可能存在的负载均衡问题。推荐阅读以下paper或者文章了解混合并行的实践经验:\n https://mp.weixin.qq.com/s/D-14J482SFQf-zh-EFa-1w \n https://mp.weixin.qq.com/s/PXjYm9dN8C9B8svMQ7nOvw \n https://zhuanlan.zhihu.com/p/678724323 ',
'DP+TP+PP': '对于单一的PP或者TP(TP一般控制在机器内,不超过8)无法满足需求的模型规模例如34b以上LLM Pretrain,我们需要使用到DP+TP+PP的混合并行。推荐阅读以下paper或文章了解混合并行的实践经验:\n https://juejin.cn/post/7063030243224879140 \n https://zhuanlan.zhihu.com/p/629121480 \n https://zhuanlan.zhihu.com/p/670374745',
'CP+DP': '请查看Context Parallel的介绍,对于单独的CP或者CP+DP容易出现Transformer Layer里的FFN重复计算问题,推荐阅读以下Paper或文章了解Context Paralle的原理:\n https://docs.nvidia.com/megatron-core/developer-guide/latest/api-guide/context_parallel.html \n https://zhuanlan.zhihu.com/p/689067888 \n https://zhuanlan.zhihu.com/p/683714620 \n https://www.zhihu.com/question/637961859/answer/3349322621 \n https://mp.weixin.qq.com/s/u4gG1WZ73mgH9mEKQQCRww ',
'CP+DP+TP': '请查看Context Parallel的介绍,推荐阅读以下Paper或文章了解Context Paralle的原理:\n https://docs.nvidia.com/megatron-core/developer-guide/latest/api-guide/context_parallel.html \n https://zhuanlan.zhihu.com/p/689067888 \n https://zhuanlan.zhihu.com/p/683714620 \n https://www.zhihu.com/question/637961859/answer/3349322621 \n https://mp.weixin.qq.com/s/u4gG1WZ73mgH9mEKQQCRww ',
'CP+DP+PP': '请查看Context Parallel的介绍,推荐阅读以下Paper或文章了解Context Paralle的原理:\n https://docs.nvidia.com/megatron-core/developer-guide/latest/api-guide/context_parallel.html \n https://zhuanlan.zhihu.com/p/689067888 \n https://zhuanlan.zhihu.com/p/683714620 \n https://www.zhihu.com/question/637961859/answer/3349322621 \n https://mp.weixin.qq.com/s/u4gG1WZ73mgH9mEKQQCRww ',
'CP+DP+TP+PP': '请查看Context Parallel的介绍,推荐阅读以下Paper或文章了解Context Paralle的原理:\n https://docs.nvidia.com/megatron-core/developer-guide/latest/api-guide/context_parallel.html \n https://zhuanlan.zhihu.com/p/689067888 \n https://zhuanlan.zhihu.com/p/683714620 \n https://www.zhihu.com/question/637961859/answer/3349322621 \n https://mp.weixin.qq.com/s/u4gG1WZ73mgH9mEKQQCRww ',
}
def generate_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size):
"""
Generate data parallel groups based on the provided parallelism parameters.
"""
assert world_size % (pipeline_model_parallel_size * tensor_model_parallel_size * context_parallel_size) == 0, "world_size must be divisible by the product of pipeline_model_parallel_size, tensor_model_parallel_size, and context_parallel_size"
data_parallel_group_ranks = []
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(context_parallel_size * tensor_model_parallel_size):
ranks = range(
start_rank + j, end_rank, context_parallel_size * tensor_model_parallel_size
)
data_parallel_group_ranks.append(list(ranks))
return data_parallel_group_ranks
def generate_context_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size):
"""
Generate data parallel groups considering context parallelism.
"""
assert world_size % (pipeline_model_parallel_size * tensor_model_parallel_size * context_parallel_size) == 0, "world_size must be divisible by the product of pipeline_model_parallel_size, tensor_model_parallel_size, and context_parallel_size"
all_data_parallel_group_ranks_with_cp = []
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(tensor_model_parallel_size):
ranks_with_cp = range(start_rank + j, end_rank, tensor_model_parallel_size)
all_data_parallel_group_ranks_with_cp.append(list(ranks_with_cp))
return all_data_parallel_group_ranks_with_cp
def generate_tensor_model_parallel_groups(world_size, tensor_model_parallel_size):
"""
Generate model parallel groups based on tensor model parallel size.
"""
assert world_size % tensor_model_parallel_size == 0, "world_size must be divisible by tensor_model_parallel_size"
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
tensor_model_parallel_group_ranks = []
for i in range(num_tensor_model_parallel_groups):
ranks = range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size)
tensor_model_parallel_group_ranks.append(list(ranks))
return tensor_model_parallel_group_ranks
def generate_pipeline_parallel_groups(world_size, pipeline_model_parallel_size):
"""
Generate pipeline parallel groups based on pipeline model parallel size.
"""
assert world_size % pipeline_model_parallel_size == 0, "world_size must be divisible by pipeline_model_parallel_size"
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
pipline_parallel_group_ranks = []
for i in range(num_pipeline_model_parallel_groups):
ranks = range(i, world_size, num_pipeline_model_parallel_groups)
pipline_parallel_group_ranks.append(list(ranks))
return pipline_parallel_group_ranks
def generate_context_parallel_groups(world_size, context_parallel_size, tensor_model_parallel_size, pipeline_model_parallel_size):
"""
Generate context parallel groups based on context parallel size, considering tensor and pipeline model parallel sizes.
"""
assert world_size % (context_parallel_size * tensor_model_parallel_size * pipeline_model_parallel_size) == 0, "world_size must be divisible by the product of context_parallel_size, tensor_model_parallel_size, and pipeline_model_parallel_size"
data_parallel_size = world_size // (tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size)
context_parallel_group_ranks = []
num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size
for i in range(pipeline_model_parallel_size):
for j in range(data_parallel_size):
start_rank = (
i * num_pipeline_model_parallel_groups
+ j * tensor_model_parallel_size * context_parallel_size
)
end_rank = (
i * num_pipeline_model_parallel_groups
+ (j + 1) * tensor_model_parallel_size * context_parallel_size
)
for k in range(tensor_model_parallel_size):
ranks = range(start_rank + k, end_rank, tensor_model_parallel_size)
context_parallel_group_ranks.append(list(ranks))
return context_parallel_group_ranks
def plot_parallel_groups(title="Parallel Groups", dp_groups=None, tp_groups=None, pp_groups=None, cp_groups=None):
# Initialize a figure
fig, ax = plt.subplots(figsize=(8, 6))
# Define the spacing between blocks and their size
block_size = 700 # Size of the blocks in the scatter plot
spacing = 1.5 # Spacing multiplier between blocks
if cp_groups is None:
cp_offset_x = 0
cp_offset_y = 0
tp_offset_x = 0.2
tp_offset_y = -0.2
if tp_groups:
pp_offset_x = 0.4
pp_offset_y = -0.4
else:
pp_offset_x = 0.2
pp_offset_y = -0.2
else:
cp_offset_x = 0.2
cp_offset_y = -0.2
tp_offset_x = 0.4
tp_offset_y = -0.4
if tp_groups:
pp_offset_x = 0.6
pp_offset_y = -0.6
else:
pp_offset_x = 0.4
pp_offset_y = -0.4
# Adjust the grid layout to map GPU ranks from top-left to bottom-right
num_cols = 4 # Number of columns in the grid
x_positions = np.tile(np.arange(num_cols), num_cols) * spacing
y_positions = np.repeat(np.arange(num_cols), num_cols)[::-1] * spacing # Reverse to start from top
dp_colors = plt.cm.tab20(np.linspace(0, 1, len(dp_groups)))
# 使用tab20b提高颜色区分度
if tp_groups is not None:
tp_colors = plt.cm.tab20b(np.linspace(0, 1, len(tp_groups)))
# 如果需要更多颜色,可以考虑结合使用tab20b和tab20c
if pp_groups is not None:
pp_colors = plt.cm.tab20c(np.linspace(0, 1, len(pp_groups)))
if cp_groups is not None:
cp_colors = plt.cm.tab20c(np.linspace(0, 1, len(cp_groups)))
if cp_groups is not None:
for group_idx, group in enumerate(cp_groups):
for rank in group:
x = x_positions[rank % (num_cols*num_cols)] + cp_offset_x
y = y_positions[rank % (num_cols*num_cols)] + cp_offset_y
ax.scatter(x, y, s=block_size, color=cp_colors[group_idx], edgecolor='black', zorder=5, marker='s')
ax.text(x, y, f'CP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
for group_idx, group in enumerate(dp_groups):
for rank in group:
x = x_positions[rank % (num_cols*num_cols)]
y = y_positions[rank % (num_cols*num_cols)]
ax.scatter(x, y, s=block_size, color=dp_colors[group_idx], edgecolor='black', zorder=5, marker='>')
ax.text(x, y, f'DP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
if tp_groups is not None:
for group_idx, group in enumerate(tp_groups):
for rank in group:
x = x_positions[rank % (num_cols*num_cols)] + tp_offset_x
y = y_positions[rank % (num_cols*num_cols)] + tp_offset_y
ax.scatter(x, y, s=block_size, color=tp_colors[group_idx], edgecolor='black', zorder=5, marker='p')
ax.text(x, y, f'TP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
if pp_groups is not None:
for group_idx, group in enumerate(pp_groups):
for rank in group:
x = x_positions[rank % (num_cols*num_cols)] + pp_offset_x
y = y_positions[rank % (num_cols*num_cols)] + pp_offset_y
ax.scatter(x, y, s=block_size, color=pp_colors[group_idx], edgecolor='black', zorder=5, marker='h')
ax.text(x, y, f'PP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
# Draw a separating line between Node0 and Node1
mid_y_position = np.max(y_positions) / 2
ax.axhline(y=mid_y_position, color='black', linestyle='-', linewidth=2, zorder=0)
# Add Node labels
ax.text(-spacing, max(y_positions)/4, 'Node1', verticalalignment='center', fontsize=12)
ax.text(-spacing, 3*max(y_positions)/4, 'Node0', verticalalignment='center', fontsize=12)
# Adjusting the appearance
ax.set_aspect('equal') # Keep the aspect ratio square
ax.axis('off') # Turn off the axis
plt.title(title, pad=30)
return fig
def create_empty_plot():
"""Create an empty plot for initialization"""
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, 'Please select parallel configuration and click "Generate Visualization"',
ha='center', va='center', transform=ax.transAxes, fontsize=14)
ax.axis('off')
plt.title("Megatron-LM Parallel Group Visualization", pad=30)
return fig
def clear_all():
"""Clear all inputs and outputs"""
return (
None, # parallel_group_type
1, # tensor_model_parallel_size
1, # pipeline_model_parallel_size
1, # context_parallel_size
create_empty_plot(), # plot
"请选择并行配置,然后点击'生成可视化'按钮。" # description text
)
def update_plot(parallel_group_type, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size):
if parallel_group_type is None:
return create_empty_plot(), "请先选择并行分组类型。"
world_size = 16 # Fixed world size for 2 machines with 8 GPUs each
try:
description = descriptions.get(parallel_group_type, "Invalid parallel group type")
# Initialize groups to None
data_groups = tp_groups = pp_groups = cp_groups = None
if "CP" in parallel_group_type or parallel_group_type == 'Context Parallel':
cp_groups = generate_context_parallel_groups(world_size, context_parallel_size, tensor_model_parallel_size, pipeline_model_parallel_size)
if "DP" in parallel_group_type:
data_groups = generate_context_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size)
else:
if "DP" in parallel_group_type or parallel_group_type == 'Data Parallel':
data_groups = generate_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size)
if parallel_group_type in ['Tensor Model Parallel', 'DP+TP', 'DP+TP+PP', 'CP+DP+TP', 'CP+DP+TP+PP']:
tp_groups = generate_tensor_model_parallel_groups(world_size, tensor_model_parallel_size)
if parallel_group_type in ['Pipeline Parallel', 'DP+PP', 'DP+TP+PP', 'CP+DP+PP', 'CP+DP+TP+PP']:
pp_groups = generate_pipeline_parallel_groups(world_size, pipeline_model_parallel_size)
# Prepare text description for display
groups_list_str = ""
if data_groups:
groups_list_str += "Data Parallel Groups:\n"
groups_list_str += "\n".join([f"Data Group {idx + 1}: {group}" for idx, group in enumerate(data_groups)])
groups_list_str += "\n--------------------------------------\n"
if tp_groups:
groups_list_str += "Tensor Model Parallel Groups:\n"
groups_list_str += "\n".join([f"Tensor Group {idx + 1}: {group}" for idx, group in enumerate(tp_groups)])
groups_list_str += "\n--------------------------------------\n"
if pp_groups:
groups_list_str += "Pipeline Model Parallel Groups:\n"
groups_list_str += "\n".join([f"Pipeline Group {idx + 1}: {group}" for idx, group in enumerate(pp_groups)])
groups_list_str += "\n--------------------------------------\n"
if cp_groups:
groups_list_str += "Context Parallel Groups:\n"
groups_list_str += "\n".join([f"Context Group {idx + 1}: {group}" for idx, group in enumerate(cp_groups)])
groups_list_str += "\n--------------------------------------\n"
text_to_display = f"==========Parallel Groups Display==========\n\n{groups_list_str}\n\n{description}"
# Generate the figure with the parallel groups
fig = plot_parallel_groups(f"{parallel_group_type} Groups", data_groups if data_groups else [], tp_groups=tp_groups, pp_groups=pp_groups, cp_groups=cp_groups)
return fig, text_to_display
except Exception as e:
error_fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, f'Error: {str(e)}', ha='center', va='center', transform=ax.transAxes, fontsize=12, color='red')
ax.axis('off')
plt.title("Error", pad=30)
return error_fig, f"错误: {str(e)}"
# Create interface with improved layout
def create_interface():
with gr.Blocks(title="Megatron-LM Parallel Group Visualization") as iface:
gr.Markdown("# Megatron-LM Parallel Group Visualization")
gr.Markdown("Select parallel sizes and types to visualize different parallel groups with distinct colors. This includes combinations of Data Parallel (DP), Tensor Model Parallel (TP), Pipeline Parallel (PP), and Context Parallel (CP).")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 配置参数")
parallel_group_type = gr.Dropdown(
choices=['Data Parallel', 'Tensor Model Parallel', 'Pipeline Parallel', 'Context Parallel',
'DP+TP', 'DP+PP', 'DP+TP+PP',
'CP+DP', 'CP+DP+TP', 'CP+DP+PP', 'CP+DP+TP+PP'],
label="Parallel Group Type",
value=None
)
tensor_model_parallel_size = gr.Slider(1, 8, step=1, label="Tensor Model Parallel Size", value=1)
pipeline_model_parallel_size = gr.Slider(1, 8, step=1, label="Pipeline Model Parallel Size", value=1)
context_parallel_size = gr.Slider(1, 8, step=1, label="Context Parallel Size", value=1)
with gr.Row():
generate_btn = gr.Button("🔄 生成可视化", variant="primary")
clear_btn = gr.Button("🗑️ 清除所有", variant="secondary")
gr.Textbox(
value=basic_texts,
label="基础说明",
interactive=False,
lines=6
)
with gr.Column(scale=2):
plot_output = gr.Plot(value=create_empty_plot(), label="并行组可视化")
with gr.Row():
description_output = gr.Textbox(
value="请选择并行配置,然后点击'生成可视化'按钮。",
label="详细说明",
lines=15,
max_lines=20
)
# Event handlers
generate_btn.click(
fn=update_plot,
inputs=[parallel_group_type, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size],
outputs=[plot_output, description_output]
)
clear_btn.click(
fn=clear_all,
outputs=[parallel_group_type, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size, plot_output, description_output]
)
return iface
# Create and launch the interface
iface = create_interface()
iface.launch(share=True)