Storm SQL 内部实现

本页描述了 Storm SQL 的设计和实现.

概览

SQL是一个很好使用但又复杂的标准. 包括 Drill,Hive,Phoenix 和 Spark 在内的几个项目都在其 SQL 层面上投入了大量资金. StormSQL 的主要设计目标之一是利用这些项目的现有资源. StormSQL 利用Apache Calcite 来实现 SQL 标准. StormSQL 专注于将 SQL 语句编译成Storm / Trident 拓扑,以便它们可以在 Storm 集群中执行.

图1描述了在 StormSQL 中执行 SQL 查询的工作流程. 首先,用户提供了一系列 SQL 语句. StormSQL 解析 SQL 语句并将其转换为 Calcite 逻辑计划. 逻辑计划由一系列 SQL 逻辑运算符组成,描述如何执行查询而不考虑底层执行引擎. 逻辑运算符的一些示例包括 TableScan, Filter, ProjectionGroupBy.

图 1: StormSQL 工作流.

下一步是将逻辑执行计划编译为物理执行计划. 物理计划由物理运算符组成,描述如何在 StormSQL 中执行 SQL 查询. 物理运算符如Filter, Projection, 和 GroupBy 直接映射到 Trident 拓扑中的操作. StormSQL 还将 SQL 语句中的表达式编译为 Java 代码块, 并将它们插入 Trident 函数, 这些函数将在运行时被编译一次并执行.

最后,StormSQL 将创建的 Trident 拓扑与空的打包 JAR 提交到 Storm 集群. Storm 计划并以与执行其他 Storm 拓扑相同的方式执行 Trident 拓扑.

以下代码块显示了一个示例查询,用于过滤和投影来自 Kafka 流的结果.

CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' ...

CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' ...

INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50 

前两个 SQL 语句定义外部数据的输入和输出. 图2描述了StormSQL 如何获取最后一个 SELECT 查询并将其编译为 Trident 拓扑的过程.

图 2: 将示例查询编译为 Trident 拓扑.

查询 streaming tables 的限制

查询代表实时数据流的表时有几个局限:

  • ORDER BY 子句不能应用于流
  • GROUP BY 子句中至少有一个单调字段允许 StormSQL 限制缓冲区的大小

更多细节请参考 http://calcite.apache.org/docs/stream.html.

依赖

Storm 除了 EXTERNAL TABLE 使用的数据源 JAR 外,还要注意 Storm SQL 的必须的依赖. 您可以使用 storm sql--jars--artifacts,以便数据源 JAR 可以包含在 Storm SQL Runner 中,也可以包含在 Trident 拓扑运行时 classpath 中. (如果您的数据源 JAR 在 Maven 存储库中可用,则使用 --artifacts,因为它处理传递依赖关系)

请参考 Storm SQL集成.