PipeDream-Partition-Algorithm

本文最后更新于:1 年前

PipeDream划分算法理论部分

PipeDream会在可用的机器间自动划分模型的层以最小化整体的训练时间

以下是PipeDream划分算法的工作流程图,不难看出PipeDream对输入的DNN采用PipeDream Profiler进行profile,会获得每一个层的相关数据(下面对profile过程会细讲),之后输入一定数目的机器给PipeDream optimizer,在其上面执行划分算法和确定重复因子。至此,便完成了PipeDream对DNN的划分。

PipeDream partition algorithm workflow

PipeDream划分算法的大致流程如下:对一个给定的N层DNN和M台可用的机器

  1. PipeDream首先在一个机器上对模型进行profile;
  2. 执行划分算法将层划分为stage;
  3. 同时确定每个stage的重复因子以最小化模型的整体训练时间。

Profile DNN模型

profile是用来记录每个层ll的一些指标的,这里记录的是:

  • TlT_l: 在ll层的前向和反向传递的总计算时间
  • ala_lll层在前向传递时的输出激活的大小/在反向传递时输入梯度的大小
  • wlw_lll层参数的大小

其中在PipeDream中TlT_l是通过多个mini-batch来统计得出的

而所有的通信都分三个步骤进行:

  1. 将数据从GPU移到发送方的CPU;
  2. 发送数据,从发送到发送到接收方;
  3. 将数据从CPU移到接收方的GPU。

显然2相较于1和3更耗时长,采用2来估计通信所需花费的时间,即是用该层的ala_l除于对应的带宽,即可以得到层ll到层l+1l+1通信所需花费的时间ClC_l

而每个worker(计算节点)在配置有mm台机子做数据并行的通信量是4(m1)wlm\frac{4(m-1)*|w_l|}{m},这个通信量是采用AllReduce计算而得的,则利用一个分布式参数服务器来为该层做权重同步所花费的时间为:WlmW_l^m

因为朴素的AllReduce和Ring AllReduce一类的通信量是一样的,只是解决了存在的带宽瓶颈问题。这里就以AllReduce的来进行通信量计算

某一个server(本身也是一个worker,比如是rank0)接收所有worker的参数:(m1)wl(m-1)*|w_l|,而后将更新后的参数返还给所有worker:(m1)wl(m-1)*|w_l|,其中该过程采用FP16来存储参数,故总通信量为:4(m1)wl4(m-1)*|w_l|,每个worker的通信量则为:4(m1)wlm\frac{4(m-1)*|w_l|}{m}

划分算法

划分算法是在profile完成后进行的,会用到profile的输出结果

我们的目标是最小化模型整体的训练时间即等价于最小化最慢stage的时间

这里介绍划分算法中涉及到的几个概念:

  • A(j,m)A(j,m)表示在1j1\rightarrow j层间使用mm个机子最优流水线的最慢stage所花费的时间;
  • T(ij,m)T(i\rightarrow j,m)表示一个单独的stage在iji\rightarrow j层,采用mm个机子做数据并行所花费的时间,其中该时间计算公式如下:(之所以采用二取一是因为通信和计算存在重叠,不用add起来)

此外,这里还涉及了一个让流水线处于繁忙状态的**NOAMNOAM,即最优的mini-batch数目**,通过如下公式计算而得:NOAM=MmachinesintheinputstageNOAM=\frac{M}{machines\quad in\quad the\quad input\quad stage}

其中动规的公式分为只有一个stage和多个stage的情况:

  • 一个stage

    A(j,m)=T(1j,m)A(j,m) = T(1\rightarrow j,m)

  • 多个stage

    \begin{align*} \begin{split} A(j,m) = \min_{1\le i\lt j} \min_{1\le m` \lt m} \max \left \{ \begin{array}{} A(i,m-m`) &①\\ 2*C_i &②\\ T(i+1 \rightarrow j,m`) &③\\ \end{array} \right . \end{split} \end{align*}

显然我们是在①、②、③这三个公式,找到此时的最慢stage,而后改变行列(行对应层数,列对应数据并行的机子数),找到最优流水线的最慢stage。因为是采用动态规划处理,而动态规划具有最优子结构、无后效性和重叠子问题三大性质,因此即使是子问题,所求得的结果也是最优的(最优子结构),最后的结果是在最优的子问题上进行叠加的,而后续问题的发展又不会影响到子问题的解(无后效性),而采用表格存储可以避免重复计算(重叠子问题)。当然后续一些多维并行的也会用到动态规划,但会采取融合算子,剪枝等方式减少搜索空间,因为PipeDream的是二维的,且空间很小,所以没用到相应技巧。

以下是动规的初始化部分,当完成了初始化后,便可以代入上式的公式进行计算,计算到终止条件A(N,M)A(N,M)时则会停止

  • A(1,m)=T(11,m)A(1,m) = T(1\rightarrow 1,m),其中m[1,M]m \in [1,M]
  • A(j,1)=T(1j,1)A(j,1) =T(1\rightarrow j,1),其中j[1,N]j \in [1,N]

PipeDream划分算法代码部分

🔷profile<span class=“hint–top hint–rounded” aria-label="PipeDream-Profile

">[1]

profile的目的是获得每个层的wl,Tl,alw_l,T_l,a_l,主要的函数调用步骤如下:

  1. 训练的时候先获取summary,这个函数(torchsummary.summary())可以计算网络的参数信息,然后调用create_graph(),这个函数的目的在于通过内部的调用,生成模型的DAG图;
  2. create_graph()内部会调用torchgraph.GraphCreator(),构造一个GraphCreator实例,这个实例的类内部定义了相关的钩子函数hook_modules(),可以为模型的子模块设置钩子,以追踪其内部的关联信息,然后利用模型运行,获得该信息,最后unhook_modules()把给子模块设置的钩子撤掉,并将profile的结果通过该类内部的persist_graph()方法给持久化成.dot,.txt,.pdf等类型,便于后续使用;
  3. GraphCreator的实例记作graph_creator,在初始化的时候,会把model,module_whitelist,summary这些作为实例变量,同时增加了forward_original_methods这个字典用来存储子模块和子模块的原生forward方法,还增加了graph类的实例。还有个inputs字典,用来存储好像是子模块和子模块经由TensorWrapper包装后的tensor;
  4. graph_creator实例通过调用hook_modules()这个方法,该方法内部通过调用forward_wrapper()forward_wrapper_root()方法,来将子模块和模型的forward进行替换:sub_module.forward = forward_wrapper().__get__(sub_module,sub_module.__class__)。而这俩方法内部,分别将子模块和模型设置为Tensorwrapper这一块的代码看不大懂,输入的是sub_module,但好像用作tensor?(之后debug看下这部分内在怎么运行的)),并且为图构造边;
  5. 上面说到的TensorWrapper,它内部根据我们输入的node_desc,就是对节点的形容,来判断并从summary中获取对应元素,获取它的forward pass和backward pass的时间,激活大小,参数大小等,包装为_node这个成员变量;
  6. 当模型运行后,获得了profile信息(整个模型的内部DAG图的构造是在forward_wrapper()这个function里完成的),采用persist_graph()给持久化成.dot文件(可视化一下),也持久化成.txt,便于后续使用。

🔷compute partition<span class=“hint–top hint–rounded” aria-label="PipeDream计算分区

">[2]

在计算分区的部分有两个很重要的概念,一个是增强反链,一个是后继反链

计算增强反链的目的是,更好的计算节点的时间,即将一个节点的增强反链内的节点视作一个大节点(或者说视作统一的一个状态),它们的输出激活也统一看待.

**怎么样去找到一个节点的增强反链呢?**首先我们是将本节点纳入到它对应的增强反链的list中,然后在它这个节点的全部前序节点中,找到有分叉的节点,且分叉节点的出边目的节点不在全部前序节点中且也不是本节点,则这个分叉节点就可以加入到该节点的增强反链中去.

**为什么要找这样的分叉点呢?**我们的模型是可以流水线并行的,也就是说对于节点而言,它的运行时间实际上是在max{输入时间,执行时间,输出时间}max{\{输入时间,执行时间,输出时间\}}这三个时间中找到最大值,那么对于一个节点的输入,它如果前序节点中存在分叉节点,这个分叉节点要输出数据给至少两个节点,那么这其中是否存在依赖关系是不可而知的,就比如另一个传完了,我才可以将结果向本节点输出,那因为未知,所以把它当作一个整体来考虑则更为妥当.计算出来节点的时间会更加精准合适.

计算后继反链的目的是找到下一个图分割点

**怎么去找到一个节点的后继反链呢?**首先,找到该节点的增强反链,获取增强反链中节点的出边的目的节点(后继嘛,就是后面的节点,且这个后继反链是相对于增强反链而言的,所以用增强反链中的去找),根据输入的节点的antichain过滤一波,剩下的预备役后继节点,一个个抓出来,跟增强反链一起丢进一个函数判断是否能成为后继节点,即判断预备役后继节点的successor与增强反链是否存在交集,不存在则说明这个预备役后继节点后面走的路不会又绕回去,则它是一个真正的后继节点.如此判断,将它们加入到对应的dict中.

而把这两个概念使用在一起的时候是当我们**建立antichain_dag**的时候

如何建立antichain_dag:

在graph里有个antichain_dag(),通过这个建立,主要的逻辑是:

  1. 获取sources中的第一个源点,构造其的{tuple(sorted(antichain)) : AntichainNode}(这里的antichain就是[node8]这样的东东,然后将这个antichain去实例化AntichainNode类,就得到了antichain_mapping中的一个item);
  2. 然后把这个用AntichainNode实例后的source_node加入到antichain_dag.source
  3. 把antichain置入antichain_queue中,后期做类似bfs的操作用到它,把1中的item加入antichain_mapping
  4. 在类bfs的循环中,通过pop出首个antichain,然后获取它的后继反链,遍历后继反链中的节点,如果没有在antichain_mapping中的话就给它实例化为反链节点(AntichainNode),然后加入mapping中,并将其与antichain构造边(antichain->next_antichain,反链DAG中的边,注意它的这个节点是antichain和next_antichain对应的AntichainNode实例),然后把这个后继节点加入队列中.

如此循环,这个反链DAG就完成了,有以下需要注意的:

  • 因为构建顺序是先增强反链(AntichainNode)后后继反链,所以如果节点antichain在后继反链dict中说明已经被记录过了,则直接continue就好;
  • 如果后继节点在antichain_mapping里面,说明前面给它构建过反链节点(AntichainNode)了(因为一个节点有可能是多个节点的后继节点),但依旧需要构建新的边,就是你新的大节点/状态,它们对这个后继节点都有指向,这也对应了前面说的找后继节点,就是找图分割点;

总的来说antichain_dag中的节点就是原生节点对应反链节点(增强反链形成的大节点/状态),而关系,即边的连接则是原dag中的antichain和next_antichain,就是原DAG中的原节点和后继反链节点形成的关系,不过是用反链节点link起来的(感觉就是原DAG去了环(其实也不是,因为DAG本来就没环…,只是一种形状上的描述,找不到好的词语形容,自己画图感受),所谓的环通过增强反链替代成大节点,后继反链则是找到大节点的出边节点)

计算分区初步做的事情分为以下四步:

  1. 去掉输入的source.因为输入就在第一层,之后convert model再加上,不用让optimizer决定放哪;
  2. 去掉无用的输出,就是sink中node_desc为(__getitem__)这种的;
  3. 构建antichain_dag;
  4. antichain_dag进行拓扑排序

在经过对antichain_dag进行拓扑排序后,可以得到顺序执行的线性序列,由states承载.

此处对上面的过程做一个总结:

antichain_dag()中,我们通过对profile生成的DAG(注意此时已对作为输入的source和无用的sink进行删除)获取它结点的增强反链,并构成AntichainNode,然后找到它的后继反链,对后继反链的结点生成AntichainNode,然后构造AntichainNode[antichain]=>AntichainNode[next_antichain]的边,循环迭代,构造完反链DAG

据上可知增强反链是反链DAG节点的一个重要属性,而原DAG是依据后继反链来划分的反链节点,而后对反链DAG做拓扑排序(实际就是对增强反链做拓扑排序),最终得到states,states是反链节点的一个list,拓扑排序好了的.

获得了states,就到了自动分区的部分

在开始正式自动分区的介绍前,就是进入compute_partitioning()之前,有如下举动:

  • 计算了每个state的输出激活值大小,就是根据它们的增强反链的节点的输出激活值大小,来累加到states[i].output_activation_size.这里的输出激活值大小是state的必要前序节点给自己的输出;
  • 计算了每个state的计算时间,激活值大小,参数值大小,通过遍历原dag每个state的前置节点以进行累加;

上面这些都是作为state的属性/成员变量而存在的.比如:

state.output_activation_size state.compute_time state.activation_size state.parameter_size

需要注意:output_activation_sizeactivation_size是不一样的

  • 计算了个总输出激活值大小output_activation_sizes;
  • 计算了每个state在反链dag的前序节点all_predecessor_ids

需要注意:这里的前序节点是反链DAG的

  • 计算了states之间的计算时间,激活值大小,参数值大小,这是一个二维数组,表示的是i->j的时间,激活值,参数值,大致形式如下:

states之间的差

比如第一行,表示的是state0->state[0,...,n-1]所花费的计算时间/激活值大小/参数值大小;

第二行则代表的是state1->state[1,...,n-1]所花费的计算时间/激活值大小/参数值大小;就是要减去state0的值.以此类推~


在开始进入到compute_partitioning()这个函数前,有两个变量不是很懂是干嘛的,希望看完之后可以弄懂==counter num_machines_in_machine==,通过遍历机器集和带宽,进行多次计算分区

🔷 计算分区(根据dp找最优化结果)compute_partitioning

🔷 分析分区(根据最优化结果做分区)analyze_partitioning

🔷convert models<span class=“hint–top hint–rounded” aria-label="PipeDream转换模型

">[3]

🔷runtime<span class=“hint–top hint–rounded” aria-label="PipeDream运行时引擎

">[4]

PipeDream划分算法代码中使用到的自定义类解析

Graph类

PipeDream中的Graph类有的属性和方法大致介绍:
涉及到的一些名字介绍:

  • source -> 源点,入度为0的点
  • sink -> 汇点,出度为为0的点
  • 链是指节点集合,且里面的节点间是存在单向可达的关系,即要么x到y是可达的,或者y到x是可达的
  • 反链是指节点集合,且里面的节点间是不可达的,即x到y是不可达的且y到x是不可达的

属性部分:

  • edges 边dict(出边,key对应的是箭尾的点,value是个list,对应的是箭头的点)

  • in_edges 边dict(入边,key对应的是箭头的点,value是个list,对应的是箭尾的点)

  • nodes 节点dict

  • _predecessors 前序节点dict

  • _successors 后继节点dict

  • _augmented_antichains 增强反链dict {Tuple(Node.node_id) : List[Node.node_id]}

    增强反链 = 当前节点 + 部分前序节点

    找增强反链的部分前序节点的方法:

    1. 获取当前节点的全部前序节点
    2. 若前序节点的出边目的节点不在当前节点的全部前序节点中,也不是当前节点,则将其加入增强反链中

    总之,增强反链中的部分前序节点,是存在分叉的节点,且其分叉的去向与当前节点的已走过的路径无关

  • _next_antichains 后继反链dict

    后继反链的查找方式如下:

    1. 对输入的反链寻找增强反链;
    2. 对其增强反链进行遍历,获取增强反链节点,获取其出边对应的箭头节点
    3. 遍历出边的箭头节点们,已在反链中则不予理睬,而后进入is_next_antichain()进行判断
    4. 若箭头节点的后续节点不在增强反链中,则证明其为后继反链节点
    5. 将其加入输入反链作key的后继反链中
  • _antichain_dag 反链DAG

    与其说_antichain_dag是反链DAG,其内部的节点是AntichainNode类的实例,它内部的antichain代表的该节点的增强反链,而图的结构关系体现的是后继反链的关系,我们保存了它的source,根据source即可遍历出每一个节点对应的后继反链,可以依次整理出antichain_mapping的关系,这个mapping即是{Tuple(Node.node_id) : AntichainNode}的关系

方法部分:

  • copy() # 复制一张新图
  • sources() # 返回一个源点列表
  • sinks() # 返回一个汇点列表
  • add_node()
  • remove_node()
  • reset() # 重置前序节点集和后继节点集
  • add_edge()
  • remove_edge()
  • to_dot() # 生成dot文件,即是训练模型的dag图
  • topological_sort() # 拓扑排序,下面那个helper也是,主要是helper递归,对antichain_dag进行拓扑排序,生成线性序列,利用states存储
  • topological_sort_helper()
  • augment_antichain # 根据输入的反链,寻找增强反链
  • deaugment_augmented_antichain() # 这个没搞懂
  • next_antichains() # 根据输入的反链,找到其后继反链
  • is_next_antichain() # 判断是否符合后继反链,通过获取输入的new_node的后续节点,来判断该节点的后续节点是否在输入给定的增强反链augmented_antichain中,若不在,则返回True,即是新的后继反链的节点
  • construct_antichain() # 构造新的反链,里面调用的是deaugment那个函数,然后这个方法是在后继反链节点被is_next_antichain()判断为True后调用的
  • antichain_dag() # 构造反链DAG,实际DAG结构体现后继反链关系,节点内部体现增强反链关系

Node类

属性部分:

  • node_id 节点id号,比如node58
  • node_desc 形容节点作用,比如Add(inplace)这样的
  • forward_compute_time
  • backward_compute_time
  • activation_size
  • parameter_size
  • stage_id
  • depth
  • height

方法部分:

  • __str__()

    1
    2
    3
    return "%s -- %s -- forward_compute_time=%.3f, backward_compute_time=%.3f, activation_size=%s, parameter_size=%.3f%s" % (
    self.node_id, node_desc, self.forward_compute_time, self.backward_compute_time,
    activation_size, self.parameter_size, stage_id_str)

    给出的一个例子:node58 -- Add(inplace) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=102760448.000, parameter_size=0.000

  • from_str() # 静态方法,

AntichainNode类

该类继承了Node

该类在antichain_dag此方法内被频繁实例化,以构建后继反链节点对应的增强反链,因为后继反链节点它是要被用来作分割的,那么它如果有一些前序节点是存在分叉的,即可能存在执行的依赖关系,因此需要把这些点看作一个整体,把输出激活统一统计,存放于AntichainNode这个节点中,并且利用相关属性把增强反链涉及到的节点进行存储

属性部分:

  • antichain 反链,实际上这里是增强反链
  • output_activation_size 统计增强反链的输出激活大小

方法部分:

  • __str__() # 就把打印的格式给重构了一下

    return "%s -- %s" % (self.node_id, self.antichain)

GraphCreator类

属性部分:

  • model 模型,用于后面给子模块
  • module_whitelist 白名单,处于子模块白名单
  • summary 由torchsummary这个包的summary函数获得
  • forward_original_methods 以{sub_module: sub_module.forward}形式存储了原始的子模块的forward函数
  • graph Graph类的实例对象
  • inputs 这个inputs,它的kv大概是这样的:{sub_module: sub_module TensorWrapper类的实例}

方法部分:

  • hook_modules() # 给子模块加钩子函数,具体做法是通过遍历子模块,为无后继部分的子模块或是白名单中的子模块更换forward函数,通过sub_module.forward = forward_wrapper.__get__(sub_module,sub_module.__class__)方法来更换,此种即是使得forward_wrapper(sub_module)延迟触发.内部还定义了两个方法:
    • forward_wrapper() # 把sub_module给包装成TensorWrapper
    • forward_wrapper_root()
  • unhook_modules() # 利用forward_original_methodsforward函数还原
  • persist_graph() # 持久化图,生成graph.dot(可视化的dag图),生成graph.txt,保存为条状图和对应的pdf文件

TensorWrapper类

属性部分:

  • tensor
  • object_id 这是一个全局参数
  • node_desc
  • graph_creator
  • activation_size
  • _node 这个是包装好的node

torchsummary库下的summary方法

这个方法是真正进行forward_compute_time等的部分,与pip的torchsummary库不一样,经由TensorWrapper整理(实际上它是把统计好的Tl,al,wlT_l,a_l,w_l信息给包装起来,通过Node类包装)

参考文章:


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!