这篇文章从 org.apache.flink.table.examples.java.StreamSQLExample 这个简单的例子分析 Flink SQL 的一个简单的执行流程,也算 Flink SQL 执行流程初步的入门,我们先从整体的执行框架了解一个整体流程,方便之后我们有机会对 Flink SQL 进行修改先有一个简单认识
Apache Calcite 是什么?
说到 Flink SQL 我们就不能不提 Apache Calcite 这个项目,它是一个通用的动态数据管理框架,可以以统一的 SQL 查询语言进行数据的管理,目前很多大数据软件例如:Hive,Drill等都基于 Calcite 做 SQL 解析等工作
和大部分SQL执行流程一样,主要可以分成四个阶段:
- Parse:语法解析阶段,将用户传入的 SQL 语句转化为 SqlNode
- Validate:语法校验阶段,通过类似Catelog等进行元数据的合法验证,此时还是 SqlNode
- Optimize:执行计划优化阶段,将 SqlNode 转化为 RelNode,并利用规则优化器进行等价转化等操作,最后得到优化过后的执行计划
- Execute:将逻辑查询计划,生成可执行的代码,提交运行
Flink SQL 的执行过程
通过上面,我们可以大致的有个概念,就是 SQL 的解析到最后的执行,可以大致的分为四个阶段,接下来我们这四个阶段是怎么和映射到Flink上面的,我们先看下简单的 StreamSQLExample
1 | // union the two tables |
对于 Flink(这篇文章基于1.10.0,BlinkPlanner)来说,目前分为两个 Planner,默认情况如果不显示设置默认使用的是 OldPlanner,如果需要使用BlinkPlanner,需要显示的调用 useBlinkPlanner,BlinkPlanner 里有很多优化的新特性,在将来社区将会把 BlinkPlanner 替换OldPlanner
语法解析
上述的 SQL 语句为:
1 | SELECT * FROM UnnamedTable$0 WHERE amount > 2 UNION ALL SELECT * FROM OrderB WHERE amount < 2 |
我们可以简单的先看下调用栈:
1 | tEnv.sqlQuery |
通过调用栈可以看出,CalciteParser 对我们输入的 statement 进行了解析,对于 CalciteParser 来说,它主要是 Calcite 的 SqlParser 的一个封装类,并且在创建的时候,指定 Calcite 的 ParserFactory 为 FlinkSqlParserImpl.FACTORY,告诉 Calcite 采用 FlinkSqlParserImpl 实现,而这个类是通过 JavaCC 自动生成,我们可以简单的浏览下 flink-sql-parser 这个模块,这里有实现 Flink 自定义解析的一些文件
我们可以在这里简单的说一下它的一个简单的过程,我们都知道,maven在执行的时候是分阶段的,我只说明下面几个:
- validate: 用于验证项目的有效性和其项目所需要的内容是否具备
- initialize:初始化操作,比如创建一些构建所需要的目录等。
- generate-sources:用于生成一些源代码,这些源代码在compile phase中需要使用到
- process-sources:对源代码进行一些操作,例如过滤一些源代码
- generate-resources:生成资源文件(这些文件将被包含在最后的输入文件中)
- process-resources:对资源文件进行处理
- compile:对源代码进行编译
- 等等
我们可以通过 flink-sql-parser 模块的maven文件,简单的可以总结一下流程:
- unpack-parser-template (Maven initialize 阶段):对 calcite-core 的jar文件进行解压,拷贝**/Parser.jj到
- copy-fmpp-resources (Maven initialize 阶段):拷贝src/main/codegen中的文件到target/codegen
- generate-fmpp-sources (Maven generate-sources 阶段):fmpp 插件生成代码
- javacc (Maven generate-sources 阶段):javacc 对整体代码进行生成,放置到target/generated-sources,之后继续打包
简单的说完它的如何生成之后,其实它最终会调用FlinkSqlParserImpl.parseSqlStmtEof做最终的 SQL 解析,将 SQL 转化为 SqlNode,SqlNode 是所有解析节点的父类,一个 SqlNode 就是一个 SQL 解析树,我们看下一下的类图关系
我们通过一个简单里的例子去理解,例如:
1 | select id, cast(score as int), 'hello' from T where id < ? |
在上面的SQL中,id、score、T 等为 SqlIdentifier,cast() 为 SqlCall,int 为SqlDataTypeSpec,’hello’ 为 SqlLiteral,’?’ 为SqlDynamicParam,所有的操作都是一个 SqlCall, 例如查询是一个 SqlSelect、删除是一个 SqlDelete 等等,下面对上述类型做简要的总结:
- SqlIdentifier:SQL中的Id标示符
- SqlCall:是对操作符的调用. 操作符可以用来描述任何语法结构,SQL解析树中的每个非叶节点都是某种类型的SqlCall
- SqlDataTypeSpec:SQL数据类型规范
- SqlLiteral:SQL中的常量, 表示输入的常量
- SqlDynamicParam:SQL语句中的动态参数标记
- SqlKind:SqlNode类型
- SqlOperator:SQL解析的节点类型,包括:函数,操作符(=),语法结构(case)等操作
了解完上述的基本知识之后,我们可以简单的看下上诉 SQL 转换为 SqlNode 的一个简单结构
元数据校验
经过上面的解析分析之后,需要通过catalogManager进行元数据上面的校验,这里通过FlinkCalciteSqlValidator.validate进行语法的合法解析,
1 | public static Optional<Operation> convert( |
经过合法性验证,补充了相应的元数据
1 | SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount` |
并调用 FlinkPlannerImpl.rel 将合法的 SqlNode 转变成 RelNode 同时,对于行表达式 (SqlOperator) 将转换为 RexNode
最后分装成 Operation,在这里是 PlannerQueryOperation 返回, 对于 Operation 来说,它涵盖所有种类的表操作,例如查询(DQL)、修改(DML)、定义(DDL)、或控制动作(DCL),我们可以简单的看下他的类图结构
接下来,我们将会看他是怎么优化的一个过程