开了掘金的专栏: https://juejin.im/post/5c2dd18af265da61285a3e08
因为掘金的 Markdown 编辑器还是很好用的。后面团队里面别的小伙伴和老司机们的文章也会在上面陆续发布。欢迎关注。知乎的专栏得公司申请认证,还在走流程。
《 Scala 实用指南》快要第二次印刷了,勘误还是太少,希望 V 友们如果买了,多多反馈。另外,我在知乎的 Scala 专栏已经写了很久了,作为图书的补充,欢迎大家阅读。Enzyme SQL 就是我翻译完《 Scala 实用指南》用 Scala 编写的,在我的序(序和前面的章节在异步社区和微信阅读都是免费的)里面,大家可以看到相关信息。
Enzyme 是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于 Spark Catalyst 实现,Enzyme SQL 在查询层面 和 Spark SQL 完全兼容。至于 Dataframe,在 Enzyme 中有对应的 Protein。在 API 的层次上,Protein 和 Spark Dataframe 几乎完全一致。
Enzyme SQL 目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用 SQL 作为一种加工变量的 DSL,提供在离线和实时两个平台上的一致语义。
为什么要使用 SQL 呢?首先,自研 DSL 需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的 DSL 最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL 作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。
Enzyme SQL 引擎极致的性能表现和非常低的 CPU 占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。
Enzyme 设计之初就是以兼容 Spark SQL 为目标的,故而在使用上,和 Spark SQL 的 API 大体是一致的。EnzymeSession 即 SparkSession,Protein 即 Dataframe。
我们从构建一个 Protein 数据集开始:
// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)
// construct a protein from rows and schemas
val schema = StructType(Seq(
StructField("x", LongType),
StructField("y", StringType),
StructField("z", DoubleType),
StructField("in", IntegerType)
))
val rows = Seq(Row(1L, "234L", 1.1, 12),
Row(2L, "23L", 23.4, 4245),
Row(2L, "65L", 5244.2, 234),
Row(null, "7L", 245.234, 5245),
Row(4L, "7L", 245.234, 5245))
val df = new Protein(session, rows, schema)
这样的一个数据集可以直接展示:
> df.show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
|null| 7L|245.234|5245|
| 4| 7L|245.234|5245|
+----+----+-------+----+
如果要使用 SQL,首先我们要把这个数据集和一个表名关联起来:
> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
|null| 7L|245.234|5245|
| 4| 7L|245.234|5245|
+----+----+-------+----+
上面的代码中session.sql()
的结果还是一个 Protein。除了使用 SQL,我们还可以使用 Protein 里面丰富的 API:
> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
|null| 7L|245.234|5245|
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
| 4| 7L|245.234|5245|
+----+----+-------+----+
> df.sort("x").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
|null| 7L|245.234|5245|
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
| 4| 7L|245.234|5245|
+----+----+-------+----+
更多用法的细节可以查看 Spark SQL 的文档,也可以查看 Enzyme 的文档。
Enzyme 基于 Spark Catalyst 实现,而 Catalyst 对标的开源项目是 Apache Calcite。Apache Phoenix 和 Apache Hive 等众多项目都在使用 Calcite。因为我们的目标是兼容 Spark SQL,自然而然选择了 Catalyst,作为 SQL 的解析器、逻辑计划的执行器和优化器。
上面的层次结构简明地概括了一个 SQL 从最原始的 SQL 文本,到最后执行的各个阶段。其中加粗的部分是 Enzyme 中所实现的,未加粗的部分是 Catalyst 所提供的功能。
解析,就是用 Antlr4 将 SQL 文本变成一棵 AST 树,这个 AST 树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道*
所表示的字段究竟是哪些。
分析,就是结合 Catalog 中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如*
)和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出 AnalysisException。
优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst 中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过 Catalyst 提供的接口,将自己研发的优化规则加入其中。这里的优化就是 RBO,基于规则的优化。
最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的 CBO,基于代价的优化。
SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID
我们用上面这个 SQL 来详细了解一下上述各个阶段。
Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
:- 'UnresolvedRelation `employee`
+- 'UnresolvedRelation `department`
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- SubqueryAlias employee
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- SubqueryAlias department
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
我们看到*
已经被展开成了四个明确的字段,而且每个字段都有明确的 ID 标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对 Spark SQL 做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- SubqueryAlias employee
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- SubqueryAlias department
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
因为这是一个 inner join,所以这里的一个优化点其实是在做 join 之前,把 join key 为 null 的行过滤掉。
我们模仿 Spark SQL 中 SparkPlan 的实现,提供了简化的 EnzymePlan:
abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
def iterator: Iterator[InternalRow]
override def output: Seq[Attribute]
...
}
trait LeafExecNode extends EnzymePlan {
override final def children: Seq[EnzymePlan] = Nil
}
trait UnaryExecNode extends EnzymePlan {
def child: EnzymePlan
override final def children: Seq[EnzymePlan] = child :: Nil
}
trait BinaryExecNode extends EnzymePlan {
def left: EnzymePlan
def right: EnzymePlan
override final def children: Seq[EnzymePlan] = Seq(left, right)
}
在这个代码片段中,EnzymePlan 是核心,其中 output 表示一个物理计划的节点上结果集的元数据信息,而 iterator 则是调用这个物理计划节点的入口。我们看到有三类物理计划:
Enzyme 中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是 Union 或者 Join,而只有一个输入源的就是 Projection,Filter,Sort 等算子。
上一节中优化之后的逻辑计划可以生成这样的物理计划:
HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
, BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
: +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
employee, catalog@60dcf9ec
+- FilterExec isnotnull(DepartmentID#4L)
+- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
department, catalog@60dcf9ec
计算通过在根节点调用 iterator 方法,层层回溯:
HashJoinExec.iterator
+ FilterExec.iterator
+ LazyLocalTableScan(employee).iterator
+ FilterExec.iterator
+ LazyLocalTableScan(department).iterator
首先,我们需要定位性能瓶颈。JVM 生态中有很多做 Profiling 的工具。Enzyme 在优化过程中,使用的是 JDK 中自带的 jmc 命令和 FlightRecord。通过 jmc 的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。
Spark 的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料 2 ):
对于 Enzyme 的使用场景,动态代码生成并不一定有性能优化的效果,我们使用 JMH 做基准测试,将一部分使得性能变差的代码生成关闭掉。
数以千计的 SQL 会生成大量 Java 类,在引擎中编译并缓存,会带来一些内存上的占用和 CPU 的消耗,也是我们做取舍的其中一个原因。
我们做的最主要的缓存就是从 Unresolve Logical Plan 到 Physical Plan 的生成。为什么不是直接从 SQL 到 Physical Plan 呢?因为 SQL 解析的开销实际上很小,而且略有差异的 SQL 所生成的 Unresolved Logical Plan 可能是一模一样的。
在物理计划的缓存中,还有两点需要注意:
只有这样,我们的缓存才是有效的、正确无误的。另外,在表的 schema 发生改变的时候,我们还需要让所缓存的相关物理计划失效。
Catalyst 中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks 在 Spark Summit 上做过一个题为 A Deep Dive into Spark SQL's Catalyst Optimizer 的讲座,其中有细节的介绍。
具体的接口如下:
spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。
对于第三点,我们想做的实际上是让 Enzyme 和其他生态更好地结合。比如如何将 Enzyme 运用到 Spark Streaming 或者 Flink Streaming 中,如何在 Spring Boot 中更加方便地使用 Enzyme,如何在机器学习中使用 Enzyme。
忍冬,挖财数据研发工程师,负责 Spark SQL 在挖财的落地,自研了兼容 Spark SQL 适用于单机小数据集的 Enzyme SQL 引擎。译有《 Scala 实用指南》,业余时间是 GNU TeXmacs 项目的维护者之一。
1
sadhen OP 忘记贴我的知乎专栏了: https://zhuanlan.zhihu.com/p/50189343
这篇是索引,主要是学习 R 大: https://zhuanlan.zhihu.com/p/25042028 我的索引都是 Scala 相关的,大部分都比较浅显易懂,只学 R 大的形式,学不了内涵,欢迎正在或者希望从事大数据研发的朋友们阅读,巩固好扎实的 Scala 基础。 对于 Scala 有任何问题,也欢迎在评论去提问。 |
2
miaoever 2019-01-05 05:47:59 +08:00
有点不太明白的是,在 Enzyme 生成 Physical plan 后的 runtime,是直接在本地 code gen 然后运行生成的代码?
|
3
sadhen OP @miaoever 大佬,你司的 Presto 在 Codegen 这块做得更加成熟。原理都是一样的,可以看这里 https://zhuanlan.zhihu.com/p/53469238
|