PipeDream-Partition-Algorithm
本文最后更新于:1 年前
PipeDream划分算法理论部分
PipeDream会在可用的机器间自动划分模型的层以最小化整体的训练时间
以下是PipeDream划分算法的工作流程图,不难看出PipeDream对输入的DNN采用PipeDream Profiler进行profile,会获得每一个层的相关数据(下面对profile过程会细讲),之后输入一定数目的机器给PipeDream optimizer,在其上面执行划分算法和确定重复因子。至此,便完成了PipeDream对DNN的划分。
PipeDream划分算法的大致流程如下:对一个给定的N层DNN和M台可用的机器
- PipeDream首先在一个机器上对模型进行profile;
- 执行划分算法将层划分为stage;
- 同时确定每个stage的重复因子以最小化模型的整体训练时间。
Profile DNN模型
profile是用来记录每个层的一些指标的,这里记录的是:
- : 在层的前向和反向传递的总计算时间
- : 层在前向传递时的输出激活的大小/在反向传递时输入梯度的大小
- : 层参数的大小
其中在PipeDream中是通过多个mini-batch来统计得出的
而所有的通信都分三个步骤进行:
- 将数据从GPU移到发送方的CPU;
- 发送数据,从发送到发送到接收方;
- 将数据从CPU移到接收方的GPU。
显然2相较于1和3更耗时长,采用2来估计通信所需花费的时间,即是用该层的除于对应的带宽,即可以得到层到层通信所需花费的时间
而每个worker(计算节点)在配置有台机子做数据并行的通信量是,这个通信量是采用AllReduce计算而得的,则利用一个分布式参数服务器来为该层做权重同步所花费的时间为:
因为朴素的AllReduce和Ring AllReduce一类的通信量是一样的,只是解决了存在的带宽瓶颈问题。这里就以AllReduce的来进行通信量计算
某一个server(本身也是一个worker,比如是rank0)接收所有worker的参数:,而后将更新后的参数返还给所有worker:,其中该过程采用FP16来存储参数,故总通信量为:,每个worker的通信量则为:
划分算法
划分算法是在profile完成后进行的,会用到profile的输出结果
我们的目标是最小化模型整体的训练时间即等价于最小化最慢stage的时间
这里介绍划分算法中涉及到的几个概念:
- 表示在层间使用个机子最优流水线的最慢stage所花费的时间;
- 表示一个单独的stage在层,采用个机子做数据并行所花费的时间,其中该时间计算公式如下:(之所以采用二取一是因为通信和计算存在重叠,不用add起来)
此外,这里还涉及了一个让流水线处于繁忙状态的**,即最优的mini-batch数目**,通过如下公式计算而得:
其中动规的公式分为只有一个stage和多个stage的情况:
-
一个stage
-
多个stage
\begin{align*} \begin{split} A(j,m) = \min_{1\le i\lt j} \min_{1\le m` \lt m} \max \left \{ \begin{array}{} A(i,m-m`) &①\\ 2*C_i &②\\ T(i+1 \rightarrow j,m`) &③\\ \end{array} \right . \end{split} \end{align*}
显然我们是在①、②、③这三个公式,找到此时的最慢stage,而后改变行列(行对应层数,列对应数据并行的机子数),找到最优流水线的最慢stage。因为是采用动态规划处理,而动态规划具有最优子结构、无后效性和重叠子问题三大性质,因此即使是子问题,所求得的结果也是最优的(最优子结构),最后的结果是在最优的子问题上进行叠加的,而后续问题的发展又不会影响到子问题的解(无后效性),而采用表格存储可以避免重复计算(重叠子问题)。当然后续一些多维并行的也会用到动态规划,但会采取融合算子,剪枝等方式减少搜索空间,因为PipeDream的是二维的,且空间很小,所以没用到相应技巧。
以下是动规的初始化部分,当完成了初始化后,便可以代入上式的公式进行计算,计算到终止条件时则会停止
- ,其中
- ,其中
PipeDream划分算法代码部分
🔷profile<span class=“hint–top hint–rounded” aria-label="PipeDream-Profile
">[1]
profile的目的是获得每个层的,主要的函数调用步骤如下:
- 在训练的时候先获取
summary
,这个函数(torchsummary.summary()
)可以计算网络的参数信息,然后调用create_graph()
,这个函数的目的在于通过内部的调用,生成模型的DAG图; create_graph()
内部会调用torchgraph.GraphCreator()
,构造一个GraphCreator
实例,这个实例的类内部定义了相关的钩子函数hook_modules()
,可以为模型的子模块设置钩子,以追踪其内部的关联信息,然后利用模型运行,获得该信息,最后unhook_modules()
把给子模块设置的钩子撤掉,并将profile的结果通过该类内部的persist_graph()
方法给持久化成.dot,.txt,.pdf等类型,便于后续使用;GraphCreator
的实例记作graph_creator
,在初始化的时候,会把model
,module_whitelist
,summary
这些作为实例变量,同时增加了forward_original_methods
这个字典用来存储子模块和子模块的原生forward
方法,还增加了graph
类的实例。还有个inputs
字典,用来存储好像是子模块和子模块经由TensorWrapper
包装后的tensor;graph_creator
实例通过调用hook_modules()
这个方法,该方法内部通过调用forward_wrapper()
和forward_wrapper_root()
方法,来将子模块和模型的forward
进行替换:sub_module.forward = forward_wrapper().__get__(sub_module,sub_module.__class__)
。而这俩方法内部,分别将子模块和模型设置为Tensorwrapper
(这一块的代码看不大懂,输入的是sub_module
,但好像用作tensor?(之后debug看下这部分内在怎么运行的)),并且为图构造边;- 上面说到的
TensorWrapper
,它内部根据我们输入的node_desc
,就是对节点的形容,来判断并从summary
中获取对应元素,获取它的forward pass和backward pass的时间,激活大小,参数大小等,包装为_node
这个成员变量; - 当模型运行后,获得了profile信息(整个模型的内部DAG图的构造是在
forward_wrapper()
这个function里完成的),采用persist_graph()给持久化成.dot文件(可视化一下),也持久化成.txt,便于后续使用。
🔷compute partition<span class=“hint–top hint–rounded” aria-label="PipeDream计算分区
">[2]
在计算分区的部分有两个很重要的概念,一个是增强反链,一个是后继反链
计算增强反链的目的是,更好的计算节点的时间,即将一个节点的增强反链内的节点视作一个大节点(或者说视作统一的一个状态),它们的输出激活也统一看待.
**怎么样去找到一个节点的增强反链呢?**首先我们是将本节点纳入到它对应的增强反链的list中,然后在它这个节点的全部前序节点中,找到有分叉的节点,且分叉节点的出边目的节点不在全部前序节点中且也不是本节点,则这个分叉节点就可以加入到该节点的增强反链中去.
**为什么要找这样的分叉点呢?**我们的模型是可以流水线并行的,也就是说对于节点而言,它的运行时间实际上是在这三个时间中找到最大值,那么对于一个节点的输入,它如果前序节点中存在分叉节点,这个分叉节点要输出数据给至少两个节点,那么这其中是否存在依赖关系是不可而知的,就比如另一个传完了,我才可以将结果向本节点输出,那因为未知,所以把它当作一个整体来考虑则更为妥当.计算出来节点的时间会更加精准合适.
计算后继反链的目的是找到下一个图分割点
**怎么去找到一个节点的后继反链呢?**首先,找到该节点的增强反链,获取增强反链中节点的出边的目的节点(后继嘛,就是后面的节点,且这个后继反链是相对于增强反链而言的,所以用增强反链中的去找),根据输入的节点的antichain
过滤一波,剩下的预备役后继节点,一个个抓出来,跟增强反链一起丢进一个函数判断是否能成为后继节点,即判断预备役后继节点的successor与增强反链是否存在交集,不存在则说明这个预备役后继节点后面走的路不会又绕回去,则它是一个真正的后继节点.如此判断,将它们加入到对应的dict中.
而把这两个概念使用在一起的时候是当我们**建立antichain_dag
**的时候
如何建立antichain_dag
:
在graph里有个antichain_dag()
,通过这个建立,主要的逻辑是:
- 获取sources中的第一个源点,构造其的
{tuple(sorted(antichain)) : AntichainNode}
(这里的antichain
就是[node8]这样的东东,然后将这个antichain去实例化AntichainNode类,就得到了antichain_mapping
中的一个item); - 然后把这个用AntichainNode实例后的
source_node
加入到antichain_dag.source
中 - 把antichain置入
antichain_queue
中,后期做类似bfs的操作用到它,把1中的item加入antichain_mapping
中 - 在类bfs的循环中,通过pop出首个antichain,然后获取它的后继反链,遍历后继反链中的节点,如果没有在
antichain_mapping
中的话就给它实例化为反链节点(AntichainNode),然后加入mapping中,并将其与antichain构造边(antichain->next_antichain,反链DAG中的边,注意它的这个节点是antichain和next_antichain对应的AntichainNode实例),然后把这个后继节点加入队列中.
如此循环,这个反链DAG就完成了,有以下需要注意的:
- 因为构建顺序是先增强反链(AntichainNode)后后继反链,所以如果节点antichain在后继反链dict中说明已经被记录过了,则直接continue就好;
- 如果后继节点在
antichain_mapping
里面,说明前面给它构建过反链节点(AntichainNode)了(因为一个节点有可能是多个节点的后继节点),但依旧需要构建新的边,就是你新的大节点/状态,它们对这个后继节点都有指向,这也对应了前面说的找后继节点,就是找图分割点;
总的来说antichain_dag
中的节点就是原生节点对应反链节点(增强反链形成的大节点/状态),而关系,即边的连接则是原dag中的antichain和next_antichain,就是原DAG中的原节点和后继反链节点形成的关系,不过是用反链节点link起来的(感觉就是原DAG去了环(其实也不是,因为DAG本来就没环…,只是一种形状上的描述,找不到好的词语形容,自己画图感受),所谓的环通过增强反链替代成大节点,后继反链则是找到大节点的出边节点)
计算分区初步做的事情分为以下四步:
- 去掉输入的source.因为输入就在第一层,之后convert model再加上,不用让optimizer决定放哪;
- 去掉无用的输出,就是sink中
node_desc
为(__getitem__
)这种的; - 构建
antichain_dag
; - 对
antichain_dag
进行拓扑排序
在经过对antichain_dag
进行拓扑排序后,可以得到顺序执行的线性序列,由states
承载.
此处对上面的过程做一个总结:
在antichain_dag()
中,我们通过对profile生成的DAG(注意此时已对作为输入的source和无用的sink进行删除)获取它结点的增强反链,并构成AntichainNode
,然后找到它的后继反链,对后继反链的结点生成AntichainNode
,然后构造AntichainNode[antichain]=>AntichainNode[next_antichain]
的边,循环迭代,构造完反链DAG
据上可知增强反链是反链DAG节点的一个重要属性,而原DAG是依据后继反链来划分的反链节点,而后对反链DAG做拓扑排序(实际就是对增强反链做拓扑排序),最终得到states,states是反链节点的一个list,拓扑排序好了的.
获得了states,就到了自动分区的部分
在开始正式自动分区的介绍前,就是进入compute_partitioning()
之前,有如下举动:
- 计算了每个state的输出激活值大小,就是根据它们的增强反链的节点的输出激活值大小,来累加到
states[i].output_activation_size
.这里的输出激活值大小是state的必要前序节点给自己的输出; - 计算了每个state的计算时间,激活值大小,参数值大小,通过遍历原dag每个state的前置节点以进行累加;
上面这些都是作为state的属性/成员变量而存在的.比如:
state.output_activation_size
state.compute_time
state.activation_size
state.parameter_size
需要注意:output_activation_size
和activation_size
是不一样的
- 计算了个总输出激活值大小
output_activation_sizes
; - 计算了每个state在反链dag的前序节点
all_predecessor_ids
需要注意:这里的前序节点是反链DAG的
- 计算了states之间的计算时间,激活值大小,参数值大小,这是一个二维数组,表示的是
i->j
的时间,激活值,参数值,大致形式如下:
比如第一行,表示的是state0->state[0,...,n-1]
所花费的计算时间/激活值大小/参数值大小;
第二行则代表的是state1->state[1,...,n-1]
所花费的计算时间/激活值大小/参数值大小;就是要减去state0
的值.以此类推~
在开始进入到compute_partitioning()
这个函数前,有两个变量不是很懂是干嘛的,希望看完之后可以弄懂==counter
num_machines_in_machine
==,通过遍历机器集和带宽,进行多次计算分区
🔷 计算分区(根据dp找最优化结果)compute_partitioning
🔷 分析分区(根据最优化结果做分区)analyze_partitioning
🔷convert models<span class=“hint–top hint–rounded” aria-label="PipeDream转换模型
">[3]
🔷runtime<span class=“hint–top hint–rounded” aria-label="PipeDream运行时引擎
">[4]
PipeDream划分算法代码中使用到的自定义类解析
Graph类
PipeDream中的Graph类有的属性和方法大致介绍:
涉及到的一些名字介绍:
- source -> 源点,入度为0的点
- sink -> 汇点,出度为为0的点
- 链是指节点集合,且里面的节点间是存在单向可达的关系,即要么x到y是可达的,或者y到x是可达的
- 反链是指节点集合,且里面的节点间是不可达的,即x到y是不可达的且y到x是不可达的
属性部分:
-
edges 边dict(出边,key对应的是箭尾的点,value是个list,对应的是箭头的点)
-
in_edges 边dict(入边,key对应的是箭头的点,value是个list,对应的是箭尾的点)
-
nodes 节点dict
-
_predecessors 前序节点dict
-
_successors 后继节点dict
-
_augmented_antichains 增强反链dict
{Tuple(Node.node_id) : List[Node.node_id]}
增强反链 = 当前节点 + 部分前序节点
找增强反链的部分前序节点的方法:
- 获取当前节点的全部前序节点
- 若前序节点的出边目的节点不在当前节点的全部前序节点中,也不是当前节点,则将其加入增强反链中
总之,增强反链中的部分前序节点,是存在分叉的节点,且其分叉的去向与当前节点的已走过的路径无关
-
_next_antichains 后继反链dict
后继反链的查找方式如下:
- 对输入的反链寻找增强反链;
- 对其增强反链进行遍历,获取增强反链节点,获取其出边对应的箭头节点
- 遍历出边的箭头节点们,已在反链中则不予理睬,而后进入
is_next_antichain()
进行判断 - 若箭头节点的后续节点不在增强反链中,则证明其为后继反链节点
- 将其加入输入反链作key的后继反链中
-
_antichain_dag 反链DAG
与其说_antichain_dag是反链DAG,其内部的节点是AntichainNode类的实例,它内部的antichain代表的该节点的增强反链,而图的结构关系体现的是后继反链的关系,我们保存了它的source,根据source即可遍历出每一个节点对应的后继反链,可以依次整理出
antichain_mapping
的关系,这个mapping即是{Tuple(Node.node_id) : AntichainNode}
的关系
方法部分:
- copy() # 复制一张新图
- sources() # 返回一个源点列表
- sinks() # 返回一个汇点列表
- add_node()
- remove_node()
- reset() # 重置前序节点集和后继节点集
- add_edge()
- remove_edge()
- to_dot() # 生成dot文件,即是训练模型的dag图
- topological_sort() # 拓扑排序,下面那个helper也是,主要是helper递归,对antichain_dag进行拓扑排序,生成线性序列,利用
states
存储 - topological_sort_helper()
- augment_antichain # 根据输入的反链,寻找增强反链
- deaugment_augmented_antichain() # 这个没搞懂
- next_antichains() # 根据输入的反链,找到其后继反链
- is_next_antichain() # 判断是否符合后继反链,通过获取输入的
new_node
的后续节点,来判断该节点的后续节点是否在输入给定的增强反链augmented_antichain
中,若不在,则返回True,即是新的后继反链的节点 - construct_antichain() # 构造新的反链,里面调用的是deaugment那个函数,然后这个方法是在后继反链节点被is_next_antichain()判断为True后调用的
- antichain_dag() # 构造反链DAG,实际DAG结构体现后继反链关系,节点内部体现增强反链关系
Node类
属性部分:
- node_id 节点id号,比如
node58
- node_desc 形容节点作用,比如
Add(inplace)
这样的 - forward_compute_time
- backward_compute_time
- activation_size
- parameter_size
- stage_id
- depth
- height
方法部分:
-
__str__()
1
2
3return "%s -- %s -- forward_compute_time=%.3f, backward_compute_time=%.3f, activation_size=%s, parameter_size=%.3f%s" % (
self.node_id, node_desc, self.forward_compute_time, self.backward_compute_time,
activation_size, self.parameter_size, stage_id_str)给出的一个例子:
node58 -- Add(inplace) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=102760448.000, parameter_size=0.000
-
from_str() # 静态方法,
AntichainNode类
该类继承了Node
类
该类在antichain_dag
此方法内被频繁实例化,以构建后继反链节点对应的增强反链,因为后继反链节点它是要被用来作分割的,那么它如果有一些前序节点是存在分叉的,即可能存在执行的依赖关系,因此需要把这些点看作一个整体,把输出激活统一统计,存放于AntichainNode这个节点中,并且利用相关属性把增强反链涉及到的节点进行存储。
属性部分:
- antichain 反链,实际上这里是增强反链
- output_activation_size 统计增强反链的输出激活大小
方法部分:
-
__str__() # 就把打印的格式给重构了一下
return "%s -- %s" % (self.node_id, self.antichain)
GraphCreator类
属性部分:
- model 模型,用于后面给子模块
- module_whitelist 白名单,处于子模块白名单
- summary 由torchsummary这个包的summary函数获得
- forward_original_methods 以{sub_module: sub_module.forward}形式存储了原始的子模块的forward函数
- graph Graph类的实例对象
- inputs 这个inputs,它的kv大概是这样的:{sub_module: sub_module TensorWrapper类的实例}
方法部分:
- hook_modules() # 给子模块加钩子函数,具体做法是通过遍历子模块,为无后继部分的子模块或是白名单中的子模块更换forward函数,通过
sub_module.forward = forward_wrapper.__get__(sub_module,sub_module.__class__)
方法来更换,此种即是使得forward_wrapper(sub_module)
延迟触发.内部还定义了两个方法:- forward_wrapper() # 把sub_module给包装成TensorWrapper
- forward_wrapper_root()
- unhook_modules() # 利用
forward_original_methods
把forward
函数还原 - persist_graph() # 持久化图,生成
graph.dot
(可视化的dag图),生成graph.txt,保存为条状图和对应的pdf文件
TensorWrapper类
属性部分:
- tensor
- object_id 这是一个全局参数
- node_desc
- graph_creator
- activation_size
- _node 这个是包装好的node
torchsummary库下的summary方法
这个方法是真正进行forward_compute_time等的部分,与pip的torchsummary库不一样,经由TensorWrapper整理(实际上它是把统计好的信息给包装起来,通过Node类包装)
参考文章:
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!