taskflow 是 openstack 未确保冗长的系列操作能正确执行的工作流框架
主要应用场景有如 Cinder 的 create volume 这般复杂、冗长、容易失败, 却又要求保持数据与环境一致的业务逻辑.
由于适用场景非常多,例如游戏行业的常见操作
停服 备份数据库 升级数据库 升级应用 启动服务器
非常适合用工作流引擎来确保执行
这个由 openstack 编写的 taskflow 国内相关介绍非常少,官方用户指南内容太多,导致学习这个框架花费需要花费非常多的时间
这里将会较为详细的介绍 taskflow 的代码和使用
百度有一篇中文的相关介绍,作者对 taskflow 还是很熟的,但是写得太简要,直接看感觉明白了实际还是一头雾水
要看懂 taskflow,需要先熟悉两个最基本的概念
工作流 这玩意最直观的一个应用就是 oa 里审批,上面我们说的游戏业的常见操作也可以作为例子
停服 备份数据库 升级数据库 升级应用 启动服务器
上述操作也是一个工作流, 停服成功才能备份,备份成功才能升级 升级应用和备份可以同时进行,升级数据库和升级应用都成功了才能启动服务器 升级失败还要回滚
有限状态机 推举导读. 工作流就是用状态机来实现流程控制和执行任务的
上面两条光看懂了还不行,还要写过一轮深入了解,taskflow 使用了 openstack 的通用状态机项目,所以要能看 taskflow 先要熟悉 openstack 的状态机项目 Automaton
https://pypi.python.org/pypi/automaton/
为了熟悉状态机项目,我用状态机写了一个tailfile项目,作用就是实现 tail -f 的效果,倒读 N 行和判断文件是否变化都用到了状态机,
因为状态和事件都比较少,用状态机写这些功能有点杀鸡用牛刀,主要是为了熟悉状态机才专门用 automaton 来写
熟悉玩 automaton 以后,可以开始来看 taskflow(基于最新版本 2.14)了,
首先,因为 taskflow 作为通用项目而不是 openstack 内部项目,所以 taskflow 没有使用 oslo 中的一些通用工具
而且里面同样为了兼容写了一些复杂的接口还用了 futurist 实现异步,我为了和自己的其他几个项目统一起来,基于 taskflow2.14 生成了一个简化过的项目
https://github.com/lolizeppelin/simpleflow
代码变化了几个如下部分
因为极大的简化了 futurist 和删掉了 backends 层所以比较好读一些,读代码的时候建议看我简化后的项目
我们先来看 taskflow 的几个主要单位
Engine
Engine 是 taskflow 是启动口, 主要工作 创建状态机, 循环状态机, 在指定状态下通过 Executor 执行任务
Engine 分为好几种 work based 的 Engine 比较特殊我们不看,直接看 action engine
几种 action engine 其实没有什么区别,通过 Executor 分为
并行引擎的优势是,当个任务没有顺序关联的情况下可以同时执行多个任务 当然,引擎不影响任务之间的顺序关系,除非你想强制一个一个任务执行,否则都应该使用并行引擎
Executor 的实现可以参考我简化过的futurist 因为是用 eventlet 实现的,所以需要熟悉 eventlet
Atom 明天继续
1
qq583708076 2017-09-06 20:15:52 +08:00
赞
|
2
lolizeppelin OP 原来超过一定时间不能编辑的....
2. Executor 前面说了,Engine 会通过 Executor 执行任务,因为如果 Engine 直接执行任务的话,整个状态机的循环会受到正在执行的任务的影响,所以包了一层 Executor 来执行具体的任务(当然具体代码里对 Executor 的应用会更复杂一点,为了扩展和异常处理包了 3 层) 在 taskflow 的源代码中 Executor 是通过 futurist 库来实现的,而 futurist 又是基于 futures 的,这个库内部实现还是比较复杂的,如果没用过对应库的,建议直接参考我简化的[futurist]( https://github.com/lolizeppelin/simpleutil/blob/master/simpleutil/utils/futurist.py),因为是用 eventlet 实现的,所以需要熟悉 eventlet. 具体的任务代码(比如备份数据库什么的)在一般情况下可以不处理异常,因为执行任务的代码通过 except Exception 捕获了任务的所有异常. 特殊异常就是 CancelledError,这个异常是调度到已经取消任务时由 futurist 抛出,在读代码的时候需注意下这个的特殊处理 3. Scheduler 这个没什么好说的,Executor 的封装的最上层,最后执行会落实到具体的 Executor 上 4. storage 这个是存储接口,后面说到 flow 的时候会详细讲到,storage 的初始化在 Engine 中,一个功能是数据存储的接口,一个功能是作为 flow 的外层封装 4. Runtime 与 machine 在看这个之前,如果你还不熟悉状态机,建议先拿前面说的 automaton 练练手,如果已经熟悉状态机但是还没看过 automaton 代码的,建议去看看 automaton 的代码 machine 就是 Engine 中循环的(automaton)状态机了,一个 engine 只运行一个状态机,初始化代码在 builder.MachineBuilder,MachineBuilder 又是在 Runtime 中调用生成 machine 的,我们先别管 Runtime,先理解一下 taskflow 的状态机 taskflow 状态机并不复杂,但还不熟悉 taskflow 的时候很容易被高懵.因为 taskflow 用到 networkx 这个图库,而状态机其实就是一个有向图,所以一开始看的时候,很容易以为 taskflow 的状态机会非常复杂要看懂图的相关代码才能搞明白,但实际情况是 taskflow 的状态机和图无关!因为 taskflow 状态机的状态很少不需要用图来解决状态循环 那么 taskflow 为什么要用到图库呢,在解决这个疑问前我们先抛开 taskflow,自己用状态机设计一个解决前面——"停服 备份数据库 升级数据库 升级应用 启动服务器" 的工作流 1. 首先定义停服状态和对应停服状态执行的代码 2. 定义停服成功的返回,失败的返回,定义进入停服状态的 event (这个是起始时间,event 就是 start ) 3. 定义备份数据库状态对应备份执行代码 4. 定义进入备份状态的 event (前面的停服成功) 5. 定义备份成功和备份失败的返回,到目前还简单,备份失败大不了多备份几次直到成功,失败了整个状态机终止都可以影响不大 6. 定义升级数据库状态对应备份执行代码 7. 定义进入升级状态的 event (前面的备份成功) 8. 定义升级成功和备份失败的返回,这里开始坑了,升级失败要回滚了 9. 发现少了回滚升级失败的状态定义.....增加升级失败回滚失败的定义 ...... 回滚升级数据库失败....升级应用是失败...回滚升级应用是失败.....启动失败 设计下来你发现没几个步骤。要定义的状态就越来越多...这也就是状态机复杂以后和图有关的原因了 taskflow 非常巧妙的避免了复杂化状态机,taskflow 的设计的状态机可以简单的理解只处理 2 个状态就好 开始....找到任务-执行任务-找到任务....执行任务...终止 执行任务就是调用 Executor, 至于找下一个任务的工作,就是封装了图库的 flow 的工作了.这样设计状态机状态就很少,具体的状态可以看 MachineBuilder 的注释中有对应表格,对应状态目前粗看一下即可,知道哪个状态是找任务、哪个状态执行任务就可,有些状态涉要看了后面的 retry 相关才比较好理解,至于 flow,这个我们在后面说明 回头来看 Runtime,MachineBuilder 是由 Runtime 生成的,状态机的有些 callback 最终执行的 Runtime 中的函数,里面会有一些嵌套和封装, Scheduler 的封装就在 Runtime 中,Runtime 可以简单理解为状态机调用其它注入 Scheduler、storage 接口调用的中间件,Runtime 在整体理解 taskflow 的的时候可以不用细看 第一篇完...请看下一篇介绍 flow atom task retry |
3
lolizeppelin OP 好别扭....有点文字问题都不能修正....不想发了....
|
4
revol 2017-09-07 14:19:29 +08:00
赞,可以发到自己的博客
|
5
lolizeppelin OP |
6
zhujinhe 2017-09-29 17:52:43 +08:00
class CallJoe(task.Task):
def execute(self, joe_number, *args, **kwargs): print("CallJoe args", args) print("CallJoe kwargs", kwargs) print("Calling joe %s." % joe_number) 这种定义的类, 想实现类似: t = CallJoe() t.execute('13911111111', "foo", "bar", baz="value_baz", qux="value_qux") 找了好久也没摸索出怎么把这个 非关键字变长参数传进去. 还请楼主指教. |
7
lolizeppelin OP 不行
为什么非要变长呢 打包到 list 或者 dict 里不好嘛 |
8
lolizeppelin OP 具体你看反射出参数的部分怎么实现的就知道为什么不行了
|