OneFlow源码阅读7:静态图与运行时

1年前 (2022) 程序员胖胖胖虎阿
229 0 0

oneflow静态图的训练效率远高于动态图(eager模式)。本文试图通过一个简单例子,结合0.8.0的代码,看一下静态图和运行时的实现机制。
在开始之前,建议先读一下参考资料中《OneFlow框架的系统设计》等系列文章。对静态图、运行时的基本概念和设计理念有基本的了解,会更容易理解代码。

1 代码示例

下面的示例代码来自官方文档,是一个线性模型的前向计算。后续主要基于这段代码进行分析。

import oneflow as flow
import oneflow.nn as nn

class ModuleMyLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(flow.randn(in_features, out_features))
        self.bias = nn.Parameter(flow.randn(out_features))

    def forward(self, input):
        return flow.matmul(input, self.weight) + self.bias

linear_model = ModuleMyLinear(4, 3)

class GraphMyLinear(nn.Graph):
    def __init__(self):
        super().__init__()
        # ModuleBlock
        self.model = linear_model

    def build(self, input):
        # ModuleBlock.__call__
        return self.model(input)

graph_mylinear = GraphMyLinear()
input = flow.randn(1, 4)
out = graph_mylinear(input)
print(out)

2 oneflow包的初始化

import oneflow在初始化包时,与静态图相关的主要操作如下:

  • GetEnv

    • EnvGlobalObjectsScope::Init

      • 启动各个节点的控制面网络连接
      • 初始化VM
      • 启动各个节点的数据面网络连接
      • 初始化KernelObserver
  • NewDefaultSession

    • RegsiterSession 创建 Session,并注册为 default session
    • 创建 Python MultiClientSession 并保存到dict,但并不 TryInit

      • 创建 C++ MultiClientSessionContext 但并不 TryInit

EnvGlobalObjectsScope::Init中先创建一个全局的ProcessCtx对象。然后根据环境变量等配置,在各个进程间创建gRPC和CommNet的连接,分别负责控制面和数据面的数据传输。其中在Bootstrap过程中会初始化全局的ProcessCtx,给每个进程分配一个全局唯一的rank编号(machine_id)。
本文不涉及网络层面的操作,只讨论同一进程内各线程间的交互。

3 Module类

虽然可以直接用op和tensor构造模型,但是op的粒度太细了,直接用op构造模型会比较繁琐。
Module是由op和tensor构成的、可复用的子模块。利用Module可以更高效、更快捷的构建复杂模型。oneflow.nn模块导出了很多预定义的Module。

Module定义了自己的属性设置逻辑,核心逻辑是

  • 如果value是Parameter类型,就保存到Module._parameters
  • 如果value是Module类型,就保存到Module._modules
  • 如果value是Tensor类型,就保存到Module._buffers
  • 否则按常规属性处理

Module可以包含子Module,形成树结构。因为Module通过setattr将子Module和Parameter都保存到字典结构中,可以方便的遍历所有Module及其参数tensor。

4 Graph类

4.1 构造函数

Graph的构造函数中GetDefaultSession得到的session,就是导入oneflow包时NewDefaultSession构建的session。当时没有初始化,而是在Graph构造时进行初始化。对应的C++函数是MultiClientSessionContext::TryInit,执行时会创建各种全局的资源管理器,比如:

  • LazyJobBuildAndInferCtxMgr
  • BufferMgr
  • RegstMgr
  • ActorMsgBus
  • ThreadMgr

4.2 __setattr__: 将Module和Tensor封装为Block

Graph.__setattr__不允许将Tensor对象设置为属性。Tensor只能存到Module中。
setattr最重要的动作就是对_add_block的调用,_add_block中主要是调用get_block_cls并保存结果。
get_block_cls的作用是将Module及其所有Tensor属性都转为对应的Block对象。为什么要做这个动作呢?主要是静态图编译需要借助Block类型的一些功能,这些功能不适合直接加诸于Module和Tensor。
这个转换是在ModuleBlock构造时调用set_origin完成的。对于子Module,会递归调用get_block_cls函数,这样所有子Module及其Tensor属性都会被转换为对应的Block对象。
所以,上述示例代码中,GraphMyLinear实际存储的是ModuleBlock,Graph.build执行时获取的model属性也是ModuleBlock对象,ModuleBlock.origin才是ModuleMyLinear。

4.3 针对不同任务,定义不同的计算图

根据Oneflow Model Zoo的模型示例,train/eval/infer等阶段可以创建不同的Graph子类。在这些不同阶段,Graph构造函数的行为、build函数的输入输出都有各自特点。了解这些,看后续代码时会更容易理解各个参数的具体含义。

  • 构造函数

    • train阶段,构造函数会引入Module、损失函数、优化器和data_loader
    • eval阶段,只需要引入Module和data_loader
    • infer阶段,只需要引入Module
  • build函数

    • train

      • 导入样本和label
      • 调用Module得到前向计算结果
      • 计算损失
      • 计算梯度
      • 返回loss
    • eval

      • 导入样本和label
      • 调用Module得到预估结果
      • 返回预估结果和label
    • infer

      • 传入的参数包括样本
      • 调用Module得到预估结果
      • 返回预估结果

4.4 小结

上述几个类型的关系如下:
OneFlow源码阅读7:静态图与运行时

下面描述了GraphMyLinear的构造流程

* `__init__`
  * `Graph.__init__`
  * self.model = linear_model
    * `Graph.__setattr__`
      * _add_block
        * get_block_cls: 递归地把Module转为ModuleBlock
        * `ModuleBlock.__init__`
          * ModuleBlock.set_origin
            * `ModuleBlock._origin = origin` (Module)
            * 对origin的sub modules, parameters, buffers递归调用get_block_cls
            * `ModuleBlock.__setattr__`

5 逻辑图的编译

计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算任务的编译,是将用户的特定语句操作转换为DAG图。oneflow中用Job描述逻辑的计算图。
不同于eager模式的动态图,静态图在开始执行前可以得到整个计算任务的所有信息,可以对DAG进行多轮优化。每轮优化都是输入一个Job、得到一个新Job。
最后,根据分布式环境配置,将逻辑图Job转换为物理执行的计算图Plan。在物理图中,一个op可能分布在多个节点/进程。

启动DAG计算需要调用Graph.__call__,这个函数的执行主要分以下几个步骤:

  • __call__

    • _compile if not _is_compiled

      • build_graph

        • __build_graph
      • finish_complie_and_init_runtime
    • __run

逻辑图编译主要在__build_graph中进行。finish_complie_and_init_runtime会继续做一些优化pass,然后构建物理图、初始化运行时Actor系统。__run会启动一次DAG的运算。

5.1 graph_build_context: 为逻辑图编译设置基本环境

__build_graph中的graph_build_context虽然只有一行代码,但却做了几件非常重要的事情。

首先在context作用域内设置全局的lazy_mode为True。在这个context作用域内,所有op都由LazyInterpreter解释执行。

其次,在JobBuildAndInferCtx作用域内,JobBuildAndInferCtx_Open调用类似如下C++代码

// oneflow/api/python/job_build/job_build_and_infer.h
// oneflow/core/job/job_build_and_infer_ctx_mgr.cpp

// 如前所述,LazyJobBuildAndInferCtxMgr 在 MultiClientSessionContext::TryInit 执行时初始化。
// LazyJobBuildAndInferCtxMgr mgr;
mgr.OpenJobBuildAndInferCtx(job_name);

OpenJobBuildAndInferCtx会新建一个Job对象、一个LazyJobBuildAndInferCtx对象。LazyJobBuildAndInferCtx负责根据用户定制的op等操作,修改Job。

5.2 __build_io:为计算图添加input和output Op

self.__build_io("input", graph_build_util.build_graph_input_arg, *args, **kwargs)

上面这行代码的作用是,对于用户传递给graph_mylinear(input)的input参数,针对其中的每个tensor都在逻辑计算图中插入一个FeedInputOp节点。也就是说,model的输入(比如样本tensor,具体参考4.3节),在静态图中也视为一个op操作。

__build_io内会用args(即input)和kwargs构造一个ArgsTree。示例代码中kwargs是空的。
然后遍历ArgsTree,对args和kwargs的每个tensor都调用传入的build_func,对于input来说,就是build_graph_input_arg。后面会看到,model的output也会调用__build_io,所以这个函数名的意思应该就是对model的输入、输出进行静态图的构图工作。

build_graph_input_arg内部会构造一个FeedInputOpExpr,提交给解释器执行。因为是在lazy作用域内,由LazyInterpreter解释执行,LazyInterpreter会将对应的op插入静态图。

附: build input时ArgsTree的内部结构
OneFlow源码阅读7:静态图与运行时

__build_io(input) 中 ArgsTree 的内部数据组织示意

  • _named_io_args: NamedArg

    • _value: tuple

      • [0]: NamedArg

        • _value: tuple of NamedArg

          • [0]: NamedArg

            • _value: args tensor from Graph.__call__
      • [1]: NamedArg

        • _value: empty kwargs from Graph.__call__

通过pdb命令可以查看变量: p args_tree._named_io_args._value[0]._value[0]._value.to_numpy()

5.2.1 将op添加到逻辑图

LazyInterpreter::ApplyImpl在执行时,GetCurInferCtx()返回的就是graph_build_context中OpenJobBuildAndInferCtx创建的那个LazyJobBuildAndInferCtx对象,这个对象负责逻辑图的构建。添加op的主要调用流程如下:

  • infer_ctx->AddAndInferConsistentOp
  • AddAndInferOp
  • ConstructOp
  • CheckAndConstructOp
  • NewObj

OperatorConf中,多种op配置共享op_type字段,protobuf oneof的op_type_case常量作为注册NewObj的key。
系统预定义的op在oneflow/core/operator下,例如UserOp。

AddAndInferOp将返回的Operator保存到LazyJobBuildAndInferCtx的字典中。后续的函数调用,主要是进行推导并修改静态图Job,使得各个节点构成一个DAG。

JobBuildAndInferCtx相关的类关系如下:
OneFlow源码阅读7:静态图与运行时

5.2.2 lazy tensor 和 eager tensor 的区别

LazyInterpreter::ApplyImpl的最后,会调用BuildTensor构造一个lazy tensor,作为build_graph_input_arg的返回值。所以__build_io返回的lazy_args是lazy tensor,它将替代eager的args(也就是用户输入的input)参与后续的计算图构建。

那么lazy tensor和eager tensor的区别是什么呢?我的理解是,eager tensor是要即时计算的,所以需要携带数据;而lazy tensor仅在静态图编译阶段用于推导,只需要描述性质的元信息,不需要类似样本那样的数据。同时,静态图编译是在lazy模式下运行,使用lazy tensor在各种检查校验时应该会更顺畅(?)。
后面会看到,静态图的运行期已经没有tensor的概念。运行期看到的只是更广义的Regst存储,可能代表tensor/blob,也可能是其它控制信息。静态图运行时的输入,应该是通过op直接读取tensor的blob(或者复用地址?)到regst;输出应该是op写到regst,通过blob构造eager tensor。

5.3 build: 将UserOp和FeedVariableOp添加到逻辑图

__build_graph中的self.build()会调用GraphMyLinear.build(),以及ModuleMyLinear.forward()。因为是在lazy模式下运行,matmul和add都会调用UserOpExpr重载版本的LazyInterpreter::ApplyImpl,进而调用AddAndInferConsistentOp进行构图操作。

需要说明的是,在引用Module的Parameter属性时(如weight/bias),会触发FeedVariableOp的构图操作、调用对应版本的LazyInterpreter::ApplyImpl。这个是怎么执行的呢?

__build_graph中,在进入lazy模式之前,先调用了_create_states_builder。其中self._state()返回所有Module的所有Parameter(包括子Module)。

state_block的类型是TensorBlock。所有的state_block的lazy_origin_builder().method都被设置为调用build_graph_state。

给build_graph_state设置个断点能让整个调用过程显形,主要的调用栈如下:

-> out = graph_mylinear(input)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(221)__call__()
-> self._compile(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(741)_compile()
-> _, eager_outputs = self.build_graph(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(759)build_graph()
-> outputs = self.__build_graph(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(864)__build_graph()
-> outputs = self.build(*lazy_args, **lazy_kwargs)
  /mnt/project/machine-learning/oneflow/oneflow/test.py(21)build()
-> return self.model(input)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(234)__call__()
-> result = self.__block_forward(*args, **kwargs)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(266)__block_forward()
-> result = self._origin.__class__.forward(self, *args, **kwargs)
  /mnt/project/machine-learning/oneflow/oneflow/test.py(11)forward()
-> return flow.matmul(input, self.weight) + self.bias
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(483)__getattr__()
-> p_state = self._get_from_states(name, "_parameters")
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(521)_get_from_states()
-> _s_block.try_build()
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(679)try_build()
-> self._lazy_origin_builder.try_build(self)
  /usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(627)try_build()
-> self.result = self.method()
> /usr/local/lib64/python3.6/site-packages/oneflow/framework/graph_build_util.py(227)build_graph_state()
-> op_name, var_conf_str, ["in_0"], ["out_0"]

这个调用过程比较容易困扰的是,执行对象会在Grpah、GraphMyLinear、ModuleMyLinear、ModuleBlock之间切换。
前面在讨论Graph的构造时已经提过,执行self.model(input)时,Graph.__getattr__返回的属性model是ModuleBlock对象,所以实际调用的是ModuleBlock.__call__。在这个函数内调用__block_forward,其中的_origin是ModuleMyLinear,进入到它的forward方法,执行到flow.matmul(input, self.weight) + self.bias时,会触发调用ModuleBlock.__getattr__,进而调用_get_from_states,调用TensorBlock.try_build()。这里执行的就是进入lazy模式之前设置的build_graph_state。从而增加一个FeedVariableOp到计算图。

为什么设置和调用会距离这么远呢?可能是因为,如果在引用weights等Parameter时再try_build,会不方便处理多个block共享一个tensor的情况。
再后面的步骤就是调用__build_io插入FetchOutputOp。也就是说,获取model的output也是一个op。

到目前为止,前向计算图就构建完成了。它的json表示可以参考附录。net.op是计算图的节点,通过input等属性可以看出节点之间的连接关系。
示例代码的前向计算图如下。从这个图可以看到,input、output、weights等都是op。
OneFlow源码阅读7:静态图与运行时

5.4 逻辑图优化

__build_graph中会调用CurJobBuildAndInferCtx_Complete对静态图进行多轮优化,对应的C++函数是LazyJobBuildAndInferCtx::Complete()。

这之后生成的Job是full_job。本文的示例代码比较简单,并不是典型的计算场景,其forwar和ful计算图l的拓扑是一样的。
到这里,逻辑图构建的主体部分就结束了。

随后会构建一个CNNGraph对象,对应的C++类型是NNGraph。这个对象将负责构建物理计算图Plan。它也是整个运行时的拥有者和维护者。这个对象析构时,整个运行时也会有序终止并释放资源。

5.5 物理图的编译

接下来就是执行finish_complie_and_init_runtime,其中的核心调用是self._c_nn_graph.complie_and_init_runtime(),对应的C++函数是NNGraph::CompileAndInitRuntime。
在这个函数中,JobCompleter().Complete()会继续对逻辑图做几轮修改优化,Compiler().Compile()将逻辑图转为物理图,并继续对Plan进行修改优化。

Plan的编译是在master节点进行的。master节点会将Plan通过gRPC推送给各个worker节点,worker节点从master拉取物理计算图。

之后调用NewRuntimeBuffers创建Buffer对象,Buffer应该是主要用于进程内的信息同步。

然后就准备初始化运行时了。

示例代码生成的compiled_job和物理图Plan的json参见附录。
最终生成的compiled逻辑图如下。框架自动插入了很多系统控制节点。
OneFlow源码阅读7:静态图与运行时

5.6 Plan的结构

示例代码输出的Plan json数据见附录。

Plan在逻辑上和compiled_job是等价的。这里主要关注task/op之间的关系。
Plan.task中的每个元素是一个task,其中的exec_sequence.exec_node对应job中的op,通常只有一个op(数组可以支持sub graph)。
exec_node.kernel_conf.op_attribute描述了op信息。其中op_conf包含op name信息。
kernel_conf.op_attribute.op_conf就是Job中的OperatorConf。

kernel_conf.op_attribute.arg_signature.bn_in_op2lbi体现了task/op之间的连接关系。
bn_in_op就是blob name in op,即op输入的blob name。

以System-AutoTick-DstSubsetTick_21为例

{
  "out": {
    "op_name": "System-AutoTick-DstSubsetTick_21",
    "blob_name": "out"
  },
  "in_0": {
    "op_name": "System-EagerCriticalSection-Interface-End-Tick-19",
    "blob_name": "out"
  },
  "in_1": {
    "op_name": "System-AutoTick-SrcSubsetTick_20",
    "blob_name": "out"
  }
}

exec_node.bn_in_op2regst_desc_id在task层面体现了连接关系。这个map中的key表示输入输出,value是register id。

{
  "out": "29",
  "in_0": "27",
  "in_1": "28"
}

task.produced_regst_desc描述了对应task生产的register,consumer_task_id是消费者,
produced_regst_desc.out.regst_desc_type.data_regst_desc.lbi2blob_desc.lbi就是这个register的logic blob id。
task.consumed_regst_desc_id描述了对应task消费的register信息

6 运行时的初始化

NNGraph::CompileAndInitRuntime中,new Runtime这行代码会初始化运行时。主要做的事情包括:

  • 创建Thread
  • 通知Thread创建Actor,Actor会创建Regst和Kernel
  • 给没有输入的source_tasks发送启动信号kStart

6.1 Runtime创建Thread

在Runtime的构造函数中,DumpThreadIdsFromPlan会将Plan中属于当前进程的task的thread id存入thread_ids_变量。AddThreads创建这些Thread对象。
Thread在构造时会创建一个物理线程,线程执行的是PollMsgChannel方法,Thread就是在这里持续等待需要处理的新消息。
Thread只处理两类命令消息:线程终止消息,创建Actor的消息。其它消息交给Actor::ProcessMsg处理。

6.2 Runtime通知Thread创建Actor

在Runtime的构造函数中,tasks被分为两类:source_tasks和other_tasks。在示例代码中,source_tasks是没有输入边的task。
从代码逻辑看,在Plan proto中,task的consumed_regst_desc_id字段是一个map。如果这个map的所有key都是in_ctrl,这个task就是source_tasks。

示例代码的source_tasks列表如下:

  • System-Src-WaitAndSendIds_16
  • System-AutoTick-AppendDeviceTick_9
  • System-EagerCriticalSection-Interface-End-Tick-19
  • System-EagerCriticalSection-Interface-End-Tick-25

Runtime调用HandoutTasks函数会给ActorMsgBus发送构建Actor的kConstructActor消息。

6.3 ActorMsgBus和Thread的消息处理

从接口看,ActorMsgBus负责消息的发送(Actor通过ActorMsgBus发送消息),Thread负责消息的接收(普通消息转给Actor处理)。

相关实体的协作关系如下

  • Actor是调度的基本单元。

    • actor_id就是task_id,是在编译Plan时就确定的。task是编译器概念,actor是对等的运行时概念。
    • task_id有特定的编码格式,从中可以解析出machine_id和thread_id。
    • 在跨网络的整个物理图Plan中,actor id相当于地址,通过它可以定位唯一的actor实体。
  • Actor之间通过ActorMsgBus::SendMsg进行消息通讯。

    • ActorMsg包含源和目的actor id。
    • 如果是进程内通讯,ActorMsgBus将消息发给Thread,Thread转给Actor处理消息。
    • 如果是跨进程消息,ActorMsgBus通过CommNet发送消息,接收方的CommNet应该会根据actor id获得线程id,从ThreadMgr查到Thread,将消息交给Thread处理。
  • Thread通过EnqueueActorMsg接收发给自己管理的Actor的消息。

    • 如果是本线程内的actor之间的消息,直接将消息放到local队列。否则放到Channel的异步消息队列。
    • Thread::PollMsgChannel会在构造Thread时启动的物理线程中接收Channel的消息,普通消息转给Actor处理。
  • Actor收到消息后,负责LaunchKernel执行计算,计算结束后通过ActorMsgBus通知上下游Actor。

这些对象之间的消息传递关系如下图所示
OneFlow源码阅读7:静态图与运行时

6.4 Runtime激活Actor系统

到这里,Actor之间的协作关系基本清楚了。但是整个Actor系统还处于静止待命的状态。Actor是消息驱动的,总要有消息触发才能让它们转起来。

Runtime在构造函数中,给没有输入依赖的source_tasks发送kStart消息,让Actor系统处于可运行状态。

从DAG的角度看,只要激活source_tasks就行,这些节点给下游发送消息,自然会触发所有Actor的执行。

但这个kStart消息并没有启动计算图的一轮计算。因为这是在Runtime的构造函数中,还处于运行时初始化阶段。发送kStart更像是让Actor系统处于激活状态。我们在讨论完Actor之后,再看看计算图的每一轮计算是怎么触发的。

7 Actor

7.1 Actor的创建

Thread在创建Actor时,会先尝试创建为LightActor,如果不成功,再尝试用预先注册的工厂创建Actor。

有几种TaskType可以用于LightActor:

  • kNormalForward,比如matmul、add等user op。
  • kCopyHd
  • kTick
  • kCollectiveBoxingGeneric

目前大约有20多种Actor的子类型。其它Actor类型根据TaskType预先注册。例如WaitAndSendIdsActor。

示例代码的各个节点对应的actor类型参见附录。

Actor相关的类关系如下(包含关系只是表示可以访问到相关信息,并不意味着创建或着拥有该类型对象)
OneFlow源码阅读7:静态图与运行时

7.2 Actor的初始化

Actor的构造函数一般都是空的,构建之后需要执行Init函数进行初始化。

LightActor继承自ActorBase,不是Actor的子类,有自己的Init函数实现。这里只讨论Actor的初始化。

在Actor::Init中,首先调用ConstructKernel创建kernel实例。和Operator类似,kernel也是以OpTypeCase作为注册的key,例如WaitAndSendIdsKernel。一个Actor通常只有一个kernel。

之后调用NewRegsts创建Regst。Tensor是用户侧的概念。对应的运行时概念是Regst,它持有Kernel需要读写的内存。Regst的概念比Tensor更宽泛,比如框架自动添加的控制Op也会用到Regst。
Actor将自己创建的Regst保存到produced_regsts_。

TakeOverNaiveConsumed只记录需要消费的regst id,但并不push到consumed_regsts_

TakeOverNaiveProduced既记录生产的regst id,也push到naive_produced_rs_。这种区别是为了首次执行计算时,actor能顺利执行。后面分析Actor的消息处理时会再回过头来讨论一下。

调用InitBnInOp2BlobInfo会初始化BlobInfo。
之后就是调用VirtualActorInit,这里允许各个Actor子类定制自己的初始化逻辑。通常会调用OF_SET_MSG_HANDLER宏设置Actor的消息处理函数。

7.3 Actor的消息处理

我们以WaitAndSendIds为例,观察一下Actor的消息处理机制。其Actor是WaitAndSendIdsActor,kernel是WaitAndSendIdsKernel。

之所以选择这个例子,一是这个Actor比较简单;二是这是一个典型的source task,想看一下计算图是怎么被触发启动计算的。

Thread收到的消息如果不是kStopThread或kConstructActor,就调用Actor::ProcessMsg,将消息转给Actor处理。
ProcessMsg函数只是简单的将消息转给handler处理。

WaitAndSendIdsActor::VirtualActorInit中,handler被设置为HandlerWaitToStart。
Runtime的构造函数中,发送的第一批消息是给source_tasks的kStart消息,这个消息就由HandlerWaitToStart函数处理。

HandlerWaitToStart校验消息类型后,将handler设置为HandlerNormal(这也是大部分Actor的默认handler),然后调用ProcessMsg,实际就是调用新设置的handler HandlerNormal。

HandlerNormal中,如果是kCmdMsg,只允许是kStart。通过消息类型校验后,会直接调用ActUntilFail。

7.4 Act执行的条件

ActUntilFail中,Act方法是各个子类自己实现的,一般主要是启动kernel计算。
但是在执行Act之前,需要先确认:

  • Act执行依赖的数据是否都已经就绪?(IsReadReady)
  • Act生产出来的数据,消费方是否已经用完、并收到ack消息确认?(IsWriteReady)

Actor有4个与此相关的成员变量

  • RegstSlot naive_produced_rs_;
  • RegstSlot inplace_produced_rs_;
  • RegstSlot naive_consumed_rs_;
  • RegstSlot inplace_consumed_rs_;

xx_produced_rs_存储的是当前Actor的下游consumer返回的、已经使用完毕的ack regst信息。(当前Actor生产的Regst存储在produced_regsts_中。)

运行时在初始化的过程中,所有Actor都没有运行过,任何Actor都不可能收到ack消息,所以在Actor初始化时,要预先填充xx_produced_rs_,这样才能保证Actor在首次运行前是WriteReady的,才能顺利启动执行。

xx_consumed_rs_存储的是上游依赖发来的数据。它不需要预先填充。因为source_tasks没有输入依赖,自然就是ReadReady的;而xx_produced_rs_在初始化时的预先填充又保证它是WriteReady的,所以source_tasks可以直接运行。source_tasks的输出消息发给下游,下游也会变为ReadReady,而下游在初始化后也保证是WriteReady的。整个Actor系统就可以这样运转起来了。

7.5 Actor上下游之间的通知机制

Act执行完毕后,需要将结果数据发给下游consumer。以 WaitAndSendIds 的 Naive Produced 为例,ActUntilFail中的调用流程如下:

  • AsyncSendNaiveProducedRegstMsgToConsumer

    • VirtualAsyncSendNaiveProducedRegstMsgToConsumer

      • HandleProducedNaiveDataRegstToConsumer

        • HandleRegstToConsumer

          • EnqueueAsyncMsg

            • 如果目标线程是当前线程,ActorMsgBus::SendMsg
            • 否则,将消息加入async_msg_queue_
          • 增加 total_reading_cnt_(这个变量表示已经发消息给下游、但未收到的ack数量)
        • naive_produced_rs_.PopFrontRegsts
    • AsyncSendProducedCtrlRegstMsgToConsumer

注意naive_produced_rs_.PopFrontRegsts会将Regst指针从队列中删掉,相应的计数减1。
而在Actor::HandlerNormal中处理收到的kRegstMsg消息时,如果是consumer发来的ack消息,会调用TryUpdtStateAsProducedRegst,将Regst再添加到 naive_produced_rs_ 中,以保证当前Actor在收到所有ack后是WriteReady的;同时递减total_reading_cnt_。

Actor对依赖的上游消息的处理是类似的。通过以下函数调用给上游发送ack消息、通知数据已经用完,可以继续更新了:

  • AsyncSendNaiveConsumedRegstMsgToProducer
  • AsyncRetInplaceConsumedRegstIfNoConsumer
    在Actor::HandlerNormal中收到kRegstMsg消息后,将消息添加到consumed_rs_,以保证当前Actor在收到所有依赖数据后是ReadReady的。

LightActor有自己的消息处理机制,大致原理应该是差不多的。

7.6 Act执行的动作

根据上述讨论,Actor收到kRegstMsg后也会进入ActUntilFail执行。如果读写都是Ready,就执行Act。以WaitAndSendIdsActor为例,主要调用链路如下:

  • AsyncLaunchKernel
  • ek.kernel->Launch,启动Kernel计算
  • Forward
  • ForwardDataContent
  • buffer->Pull
  • 给regst的存储地址mut_dptr赋值

buffer->Pull会等待条件变量的通知。现在,看上去所有Actor都已准备就绪,只等发令枪一响就开跑了。

8 启动静态图的计算

Graph.__run会扣动发令枪的板机,启动计算图的一轮计算。
主要调用流程如下:

  • RunLazyNNGraph
  • builder->LaunchLazyJob
  • LaunchLazyJobInstructionType
  • Buffer::Push

这里的Buffer::Push就是WaitAndSendIdsKernel在等待的起跑信号。

9 运行时的退出机制

整个运行时包含很多对象和资源,安全有序的退出是庞杂而又细致的工作。这里仅以WaitAndSendIds为例,从一个侧面观察一下运行时的退出机制。

运行时的退出始于NNGraph对象的析构。

9.1 Actor的退出

  • NNGraph在析构时,会关闭所有的Buffer对象。
  • Buffer在关闭时,会设置is_closed_ = true并通知所有监听者。但是Pull会继续处理完已经提交的计算。

    • 所以,Buffer应该是主要用于进程内的通信和异步协调的一个类。
  • WaitAndSendIdsKernel这时候正在等待新一轮计算开始,结果收到Pull返回的kBufferStatusErrorClosed。
  • WaitAndSendIdsActor::IsCustomizedReadReady以后就一直返回false,IsReadReady也返回false。

    • 这之后,ActUntilFail只会执行异步消息发送(不再进入while循环)
  • WaitAndSendIdsActor::HandlerNormal仍然会处理其它Actor发来的消息。但因为IsCustomizedReadReady返回false,会进入AsyncSendEORDMsgForAllProducedRegstDesc执行。它会给每个下游发送kEordMsg消息。
  • Actor在收到上游发来的kEordMsg消息后,递减remaining_eord_cnt_。

    • remaining_eord_cnt_被初始化为Actor的输入regst的数量。
  • total_reading_cnt_是当前Actor生产的、已经发给consumer、但尚未收到ack的消息数量。

    • Actor目前仍可以正常接收consumer发来的ack消息。
  • 当上述2个变量都为0时,意味着所有上游都发出了kEordMsg消息,也收到了所有下游的ack消息。Actor就给Thread返回1。

    • 如果上述两个变量有不为0的,就修改handler,由HandlerZombie处理后续收到的消息。
  • Thread收到Actor返回的1后,将它从自己的存储中删除,并递减运行Actor的数量。

9.2 Thread的退出

  • NNGraph重置runtime_导致运行时对象被析构。
  • Runtime删除所有Thread。
  • ThreadMgr给所有Thread发送kStopThread消息。同时,重置指针导致Thread析构。
  • Thread的物理线程退出PollMsgChannel循环。
  • Thread等待物理线程结束,关闭channel。

10 分布式场景的静态图

分布式的compile_job、物理图Plan和单机场景有明显变化。
比如,每个进程都有一套WaitAndSendIds等控制节点。这也容易理解,因为每个节点都要执行__runBuffer::Push/Pull,都要启动本进程的Actors执行计算。
matmul和broadcast_add等user op也会在两个节点进行计算。
OneFlow源码阅读7:静态图与运行时

10.1 示例代码

启动方式参考Global Tensor的官方文档。

import oneflow as flow
import oneflow.nn as nn

P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)

class ModuleMyLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(flow.randn(in_features, out_features,
            placement=P0, sbp=flow.sbp.broadcast))
        self.bias = nn.Parameter(flow.randn(1, out_features,
            placement=P0, sbp=flow.sbp.broadcast))

    def forward(self, input):
        return flow.matmul(input, self.weight) + self.bias

linear_model = ModuleMyLinear(4, 3)

class GraphMyLinear(nn.Graph):
    def __init__(self):
        super().__init__()
        # ModuleBlock
        self.model = linear_model

    def build(self, input):
        # ModuleBlock.__call__
        return self.model(input)

graph_mylinear = GraphMyLinear()
input = flow.randn(5, 4, placement=P0, sbp=flow.sbp.split(1))
out = graph_mylinear(input)
print(out)

11 附录

11.1 断点

11.1.1 Python断点示例

# python3 -m pdb test.py
break test.py:25
break oneflow/nn/graph/graph.py:221
break oneflow/nn/graph/graph.py:741
break oneflow/nn/graph/graph.py:745
break oneflow/nn/graph/graph.py:759
break oneflow/nn/graph/graph.py:828
break oneflow/nn/graph/graph.py:777
break oneflow/nn/graph/graph.py:1066
break oneflow/nn/graph/graph.py:1133
break oneflow/framework/graph_build_util.py:227

11.1.2 C++断点示例

启动命令

source /mnt/oneflow/build/source.sh
gdb --args python3 /mnt/oneflow/test.py
# set breakpoints
# run

断点示例

set breakpoint pending on
break oneflow::ActorMsg::BuildEordMsg
break oneflow/core/common/buffer.h:80
break oneflow::(anonymous namespace)::CheckAndConstructOp
break oneflow::WaitAndSendIdsActor::Act
break oneflow::WaitAndSendIdsActor::HandlerWaitToStart
break oneflow/core/lazy/actor/light_actor.cpp:452
break oneflow/core/lazy/actor/light_actor.cpp:485
break oneflow::ForeignInputKernel::ForwardDataContent
break oneflow::vm::LaunchLazyJobInstructionType::Compute

11.2 静态图的json表示

  • forward
  • full
  • compiled
  • plan

11.3 actor type

naive_actor

System-AutoTick-AppendDeviceTick_9
System-AutoTick-DstSubsetTick_12
System-AutoTick-DstSubsetTick_21
System-AutoTick-DstSubsetTick_27
System-AutoTick-Prepend-DeviceTick_7
System-AutoTick-SrcSubsetTick_20
System-AutoTick-SrcSubsetTick_26
System-AutoTick-SrcSubsetTick_8
System-AutoTick-Tick_11
System-AutoTick-Tick_13
System-EagerCriticalSection-Callback-23
System-EagerCriticalSection-Callback-29
System-EagerCriticalSection-Interface-Begin-Tick-18
System-EagerCriticalSection-Interface-Begin-Tick-24
System-EagerCriticalSection-Interface-End-Tick-19
System-EagerCriticalSection-Interface-End-Tick-25
System-EagerCriticalSection-Wait-22
System-EagerCriticalSection-Wait-28

light_actor

_GraphMyLinear_0_input.0.0_2
_GraphMyLinear_0_output.0.0_2
model.bias
model-broadcast_add-1
model-matmul-0
model.weight
System-AutoTick-SinkTick_15
System-SyncAllRanksSinkTick_14

wait_and_send_ids_actor

System-Src-WaitAndSendIds_16

call_back_notify_actor

System-Sink-CallbackNotify_17

12 参考资料

  • oneflow v0.8.0
  • OneFlow框架的系统设计(上篇)
  • OneFlow框架的系统设计(中篇)
  • OneFlow框架的系统设计(下篇)
  • 一个Job在OneFlow中的执行过程—上篇
  • 一个Job在OneFlow中的执行过程—中篇
  • 一个Job在OneFlow中的执行过程—下篇
  • 静态图模块 nn.Graph
  • OneFlow系统设计
  • torch.nn.Module
版权声明:程序员胖胖胖虎阿 发表于 2022年11月25日 上午6:00。
转载请注明:OneFlow源码阅读7:静态图与运行时 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...