Storm SQL 使用 Apache Calcite 来转换和评估 SQL 语句. Storm SQL 还采用了来自 Calcite 的 Rex 编译器,因此 Storm SQL 将处理由 Calcite 的默认 SQL 解析器识别的 SQL 方言。
本文基于 Calcite 官网的 SQL 参考手册, 移除了部分 Storm SQL 不支持的内容, 添加了一些 Storm SQL 支持的内容.
请先阅读 Storm SQL integration 页面, 了解 Storm SQL 支持哪些特性.
Calcite 提供丰富的 SQL 语法. 但是 Storm SQL 并不是一个数据库系统, 它用于处理流式数据, 因此只支持语法的子集. Storm SQL 不会重定义 SQL 语法, 只优化 Calcite 提供的转换器, 因此 SQL 语句仍然基于 Calcite 的 SQL 语法进行转换.
SQL 语法表现为 类BNF 的形式.
在 merge 中, 必须有 WHEN MATCH 或者 WHEN NOT MATCH 从句的其中之一.
在 orderItem 中, 如果 expression 是一个正整数 n, 他表示 SELECT 从句中的第 n 个项目.
聚合查询是包含 GROUP BY 或 HAVING 的查询, 子句或 SELECT 子句中的聚合函数。 在 SELECT 中,汇总查询的 HAVING 和 ORDER BY 子句,所有表达式必须在当前组内是恒定的(即分组常量由GROUP BY子句定义,或常量)或聚合函数或常量和聚合的组合功能。 聚合和分组功能只能出现在聚合查询,并且仅在 SELECT,HAVING 或 ORDER BY 子句中。
一个标量子查询是像表达式一样使用的子查询. 如果子查询没有返回行, 值为 NULL; 如果返回多行, 就是错误的.
IN, EXISTS 和 标量子查询可以在任何使用 expression 的地方使用(比如 SELECT 从句, WHERE 从句, JOIN 后面的 ON 从句, 或者作为一个聚合函数的参数).
一个 IN, EXISTS 或者标量子查询可能是相关的; 意思是, 可以引用封闭查询的 FROM 从句中的表。
selectWithoutFrom 等价于 VALUES, 但是并非标准 SQL, 只在允许在特定的conformance levels上.
下列是一个 SQL 的关键词列表. 这个列表页来自于 Calcite SQL 的参考手册.
A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE.
标识符是在 SQL 查询中使用的表名, 列, 和其他元数据元素.
未被引号括起来的标识符, 比如 emp, 必须以字母打头且只能包含字母, 数字, 下划线. 他们会隐式的转换为大写.
被引号引起来的标识符, 例如 "Employee Name"
, 以双引号开始和结束. 他们可以包含几乎任何字符, 包括空白和其他标点. 如果你想在标识符中包含一个双引号, 使用双引号进行转义, 像这样: "An employee called ""Fred""."
.
在 Calcite 中, 与引用的对象的名称匹配的标识符是大小写敏感的. 但是记住, 被引号括起来的标识符会在匹配前隐式转化为大写, 如果它引用的对象的名称是使用未被引号括起来的标识符创建的, 它的名称也会被转换成大写.
Data type | Description | Range and examples |
---|---|---|
BOOLEAN | Logical values | Values: TRUE, FALSE, UNKNOWN |
TINYINT | 1 byte signed integer | Range is -255 to 256 |
SMALLINT | 2 byte signed integer | Range is -32768 to 32767 |
INTEGER, INT | 4 byte signed integer | Range is -2147483648 to 2147483647 |
BIGINT | 8 byte signed integer | Range is -9223372036854775808 to 9223372036854775807 |
DECIMAL(p, s) | Fixed point | Example: 123.45 is a DECIMAL(5, 2) value. |
NUMERIC | Fixed point | |
REAL, FLOAT | 4 byte floating point | 6 decimal digits precision |
DOUBLE | 8 byte floating point | 15 decimal digits precision |
CHAR(n), CHARACTER(n) | Fixed-width character string | 'Hello', '' (empty string), _latin1'Hello', n'Hello', _UTF16'Hello', 'Hello' 'there' (literal split into multiple parts) |
VARCHAR(n), CHARACTER VARYING(n) | Variable-length character string | As CHAR(n) |
BINARY(n) | Fixed-width binary string | x'45F0AB', x'' (empty binary string), x'AB' 'CD' (multi-part binary string literal) |
VARBINARY(n), BINARY VARYING(n) | Variable-length binary string | As BINARY(n) |
DATE | Date | Example: DATE '1969-07-20' |
TIME | Time of day | Example: TIME '20:17:40' |
TIMESTAMP [ WITHOUT TIME ZONE ] | Date and time | Example: TIMESTAMP '1969-07-20 20:17:40' |
TIMESTAMP WITH TIME ZONE | Date and time with time zone | Example: TIMESTAMP '1969-07-20 20:17:40 America/Los Angeles' |
INTERVAL timeUnit [ TO timeUnit ] | Date time interval | Examples: INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY |
Anchored interval | Date time interval | Example: (DATE '1969-07-20', DATE '1972-08-29') |
Where:
注意:
类型 | 描述 |
---|---|
ANY | 一个类型未知的值 |
ROW | 一列或者多列组成的行 |
MAP | 键值映射的集合 |
MULTISET | 可能包含重复内容的未排序的集合 |
ARRAY | 有序的,可能包含重复的连续集合 |
CURSOR | 查询结果的游标 |
运算符优先级和结合性, 从高到低.
运算符 | 优先级 |
---|---|
. | 左 |
左 | |
+ - (unary plus, minus) | 右 |
* / | 左 |
+ - | 左 |
BETWEEN, IN, LIKE, SIMILAR | - |
< > = <= >= <> != | 左 |
IS NULL, IS FALSE, IS NOT TRUE etc. | - |
NOT | 右 |
AND | 左 |
OR | 左 |
语法 | 描述 |
---|---|
value1 = value2 | 相等 |
value1 <> value2 | 不等 |
value1 != value2 | 不等 (only available at some conformance levels) |
value1 > value2 | 大于 |
value1 >= value2 | 大于等于 |
value1 < value2 | 小于 |
value1 <= value2 | 小于等于 |
value IS NULL | value 是否为 null |
value IS NOT NULL | value 是否不为 null |
value1 IS DISTINCT FROM value2 | 两个值是否不等, null 值认为是相等 |
value1 IS NOT DISTINCT FROM value2 | 两个值是否相等, null 值认为是相等 |
value1 BETWEEN value2 AND value3 | Whether value1 is greater than or equal to value2 and less than or equal to value3 |
value1 NOT BETWEEN value2 AND value3 | value1 是否小于 value2 或大于 value3 |
string1 LIKE string2 [ ESCAPE string3 ] | string1 与模式 string2 是否匹配 |
string1 NOT LIKE string2 [ ESCAPE string3 ] | string1 与模式 string2 是否不匹配 |
string1 SIMILAR TO string2 [ ESCAPE string3 ] | string1 与正则 string2 是否匹配 |
string1 NOT SIMILAR TO string2 [ ESCAPE string3 ] | string1 与正则 string2 是否不匹配 |
value IN (value [, value]* ) | value 是否与列表中的每一个值都相等 |
value NOT IN (value [, value]* ) | value 是否与列表中的每一个值都不等 |
Not supported yet on Storm SQL: 目前 Storm SQL 中不支持的运算符:
语法 | 描述 |
---|---|
value IN (sub-query) | 是否 value 等于 sub-query 返回的行 |
value NOT IN (sub-query) | 是否 value 不等于 sub-query 返回的行 |
EXISTS (sub-query) | 是否 sub-query 返回至少一个行 |
Storm SQL 当前不支持子查询, 因此上面的操作不能正常工作. 这个问题会在不久的将来修复.
语法 | 描述 |
---|---|
boolean1 OR boolean2 | 或 |
boolean1 AND boolean2 | 且 |
NOT boolean | 是否为True ; boolean 为 UNKNOWN 则返回 UNKNOWN |
boolean IS FALSE | 是否为False; boolean 为 UNKNOWN 则返回 UNKNOWN |
boolean IS NOT FALSE | 是否不为False; boolean 为 UNKNOWN 则返回 UNKNOWN |
boolean IS TRUE | 是否为True; boolean 为 UNKNOWN 则返回 UNKNOWN |
boolean IS NOT TRUE | 是否不为True; boolean 为 UNKNOWN 则返回 UNKNOWN |
boolean IS UNKNOWN | 判断是否是 UNKNOWN |
boolean IS NOT UNKNOWN | 判断是否不是 UNKNOWN |
语法 | 描述 |
---|---|
+ numeric | 返回 numeric |
:- numeric | 返回负 numeric |
numeric1 + numeric2 | 返回 numeric1 加 numeric2 |
numeric1 - numeric2 | 返回 numeric1 减 numeric2 |
numeric1 * numeric2 | 返回 numeric1 乘以 numeric2 |
numeric1 / numeric2 | 返回 numeric1 除以 numeric2 |
POWER(numeric1, numeric2) | 返回 numeric1 的 numeric2 次方 |
ABS(numeric) | 返回绝对值 numeric |
MOD(numeric, numeric) | 取余 numeric1 除以 numeric2. 如果被除数为负, 则余数为负 |
SQRT(numeric) | 返回平方根 numeric |
LN(numeric) | 返回 numeric 的自然对数 (底数 e) |
LOG10(numeric) | 返回 numeric 的常用对数 (底数 10) |
EXP(numeric) | 返回 e 的 numeric 次方 |
CEIL(numeric) | 向上取整 numeric, 返回大于或者等于 numeric 的最小整数 |
FLOOR(numeric) | 向下取整 numeric, 返回小于或者等于 numeric 的最小整数 |
语法 | 描述 |
---|---|
string || string | 连接2个字符串 |
CHAR_LENGTH(string) | 返回字符串长度 |
CHARACTER_LENGTH(string) | 与 CHAR_LENGTH(string) 等价 |
UPPER(string) | 转换为大写 |
LOWER(string) | 转换为小写 |
POSITION(string1 IN string2) | 返回 string1 在 string2 中首次出现位置 |
TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2) | 从 string2 的 头/尾/两头 移除 string1 的最长匹配 |
OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ]) | 用 string2 替换 string1 |
SUBSTRING(string FROM integer) | 从给定的位置开始返回一个子串 |
SUBSTRING(string FROM integer FOR integer) | 从给定位置返回一个指定长度的子串 |
INITCAP(string) | 将 string 中每个单词首字母大写其他字母小写. 单词是由空白字符分隔开的字符序列 |
未实现的:
语法 | 描述 |
---|---|
binary || binary | 连接2个二进制字符串. |
POSITION(binary1 IN binary2) | 返回二进制串 binary1 在 binary2 中首次出现位置 |
OVERLAY(binary1 PLACING binary2 FROM integer [ FOR integer2 ]) | 替换 |
已知的 bug:
语法 | 描述 |
---|---|
SUBSTRING(binary FROM integer) | 从指定的位置截取 |
SUBSTRING(binary FROM integer FOR integer) | 从指定位置截取指定长度的串 |
Calcite 1.9.0 有bug, 当编译 SUBSTRING 函数的时候会抛出异常. 这个问题会在后续版本中修复.
语法 | 描述 |
---|---|
EXTRACT(timeUnit FROM datetime) | 返回时间中的指定字段 |
FLOOR(datetime TO timeUnit) | 根据 timeUnit 向下取整 |
CEIL(datetime TO timeUnit) | 根据 timeUnit 向上取整 |
未实现的:
Storm SQL 特有的:
语法 | 描述 |
---|---|
LOCALTIME | 以类型 TIME 返回当前会话时区的当前时间 |
LOCALTIME(precision) | 以类型 TIME 返回当前会话时区的当前时间, 精度precision |
LOCALTIMESTAMP | 以类型 TIMESTAMP 返回当前会话时区的当前时间 |
LOCALTIMESTAMP(precision) | 以类型 TIMESTAMP 返回当前会话时区的当前时间, 精度precision |
CURRENT_TIME | 以类型 TIMESTAMP WITH TIME ZONE 返回当前会话时区的当前时间 |
CURRENT_DATE | 以类型 DATE 返回当前会话时区的当前时间 |
CURRENT_TIMESTAMP | 以类型 TIMESTAMP WITH TIME ZONE 返回当前会话时区的当前时间 |
SQL标准规定,上述运算符在评估查询时应返回相同的值。 Storm SQL将每个查询转换为Trident拓扑并运行,因此在评估SQL语句时,技术上当前的日期/时间应该是固定的。 由于这个限制,当创建Trident拓扑时,当前日期/时间将被修复,并且这些运算符应该在拓扑生命周期中返回相同的值。
Storm SQL 当前不支持的函数:
语法 | 描述 |
---|---|
USER | 等价于CURRENT_USER |
CURRENT_USER | 当前执行上下文的用户名 |
SESSION_USER | 会话的用户名 |
SYSTEM_USER | 返回系统标识的当前数据存储的用户 user |
CURRENT_PATH | 返回表示当前查找范围的字符串,用于引用用户定义的例程和类型 |
CURRENT_ROLE | 返回当前活动 role |
这些操作符并不代表 Storm SQL 的运行时,所以除非我们找到正确的语义,否则这些操作可能永远不会被支持。
语法 | 描述 |
---|---|
CASE value | |
WHEN value1 [, value11 ]* THEN result1 | |
[ WHEN valueN [, valueN1 ] THEN resultN ] | |
[ ELSE resultZ ] | |
END | 简单 case 语句 |
CASE | |
WHEN condition1 THEN result1 | |
[ WHEN conditionN THEN resultN ]* | |
[ ELSE resultZ ] | |
END | 搜索 case 语句 |
NULLIF(value, value) | 值相同返回. |
例如, NULLIF(5, 5)
返回 NULL; NULLIF(5, 0)
返回 5. |
| COALESCE(value, value [, value ]* ) | 前一个值为 null 则返回后一个值.
例如, COALESCE(NULL, 5)
返回 5. |
语法 | 描述 |
---|---|
CAST(value AS type) | 把值转换为给定的类型. |
语法 | 描述 |
---|---|
ROW (value [, value]* ) | 从值列表中创建一个行. |
map '[' key ']' | 根据 key 返回 value. |
array '[' index ']' | 返回某个位置的 array 的元素值. |
ARRAY '[' value [, value ]* ']' | 从值列表中创建一个 array. |
MAP '[' key, value [, key, value ]* ']' | 从 key-value 列表中创建一个 map. |
语法 | 描述 |
---|---|
ELEMENT(value) | 返回 array 或 multiset 的唯一元素; 如果集合为空,则为null; 如果它有多个元素,则抛出异常。 |
CARDINALITY(value) | 返回 array 或 multiset 的元素数. |
另请参考: UNNEST关系运算符将集合转换为关系.
语法 | 描述 |
---|---|
{fn ABS(numeric)} | 返回 numeric 的绝对值 |
{fn EXP(numeric)} | 返回 e 的 numeric 次方 |
{fn LOG(numeric)} | 返回 numeric 的自然对数 (底数为 e) |
{fn LOG10(numeric)} | 返回 numeric 的以10为底的对数 |
{fn MOD(numeric1, numeric2)} | 返回 numeric1 被 numeric2 除的余数. 被除数 numeric1 为负数的时候余数为负 |
{fn POWER(numeric1, numeric2)} | 返回 numeric1 的 numeric2 次方 |
未实现的:
字符串 | 描述 |
---|---|
{fn CONCAT(character, character)} | 连接字符串 |
{fn LOCATE(string1, string2)} | 返回 string2 在 string1 中首次出现的位置. 如果指定了 integer , 则从 integer 为起点开搜索. |
{fn INSERT(string1, start, length, string2)} | 把 string2 插入 string1 |
{fn LCASE(string)} | 返回小写 |
{fn LENGTH(string)} | 返回字符数 |
{fn SUBSTRING(string, offset, length)} | 字符串截取, 从 offset 的位置开始截取,长度为 length |
{fn UCASE(string)} | 返回大写 |
已知的bug:
语法 | 描述 |
---|---|
{fn LOCATE(string1, string2 [, integer])} | 返回 string2 在 string1 中首次出现的位置. 如果指定了 integer , 则从 integer 为起点开搜索. |
{fn LTRIM(string)} | 移除 string 头部的空白字符 |
{fn RTRIM(string)} | 移除 string 尾部的空白字符 |
Calcite 1.9.0 在函数使用位置参数的时候会抛出异常, {fn LTRIM} 和 {fn RTRIM} 在编译 SQL 语句的时候. 这个能在将来的版本中修复.
未实现的:
语法 | 描述 |
---|---|
{fn CURDATE()} | 等价于 CURRENT_DATE |
{fn CURTIME()} | 等价于 LOCALTIME |
{fn NOW()} | 等价于 LOCALTIMESTAMP |
{fn QUARTER(date)} | 等价于 EXTRACT(QUARTER FROM date) . 返回 1 和 4 之间的整数. |
{fn TIMESTAMPADD(timeUnit, count, timestamp)} | 添加 count 个 timeUnit 间隔到 timestamp |
{fn TIMESTAMPDIFF(timeUnit, timestamp1, timestamp2)} | timestamp1 减去 timestamp2 结果存在 timeUnit 中 |
未实现的:
未实现的:
Storm SQL 当前不支持聚合函数.
Storm SQL 当前不支持窗口函数
Storm SQL 不支持分组函数
用户可以使用 CREATE FUNCTION
语句定义 user defined function(标量). 例如, 下面的语句使用 org.apache.storm.sql.TestUtils$MyPlus
类定义了一个 MYPLUS
函数.
CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
Storm SQL 通过检验定义的个方法来确定函数是标量还是聚合. 如果类中定义了一个 evaluate
方法, Storm SQL 把这个函数作为 标量
对待.
标量函数的类的例子:
public class MyPlus {
public static Integer evaluate(Integer x, Integer y) {
return x + y;
}
}
请注意, 用户在运行 Storm SQL runner 时候应当使用 --jars
或者 --artifacts
, 来确保 UDFs 在 classpath 路径下可见.
在 StormSQL 中数据表现为一个外部表. 用户可以使用 CREATE EXTERNAL TABLE
语句指定数据源. CREATE EXTERNAL TABLE
的语法与 Hive 数据定义语言 定义的语法紧密相关.
CREATE EXTERNAL TABLE table_name field_list
[ STORED AS
INPUTFORMAT input_format_classname
OUTPUTFORMAT output_format_classname
]
LOCATION location
[ TBLPROPERTIES tbl_properties ]
[ AS select_stmt ]
默认输入格式和输出格式都是 JSON. 我们会在将来的章节介绍 supported formats
.
例如, 下面的语句指定了一个 Kafka spout 和 sink:
CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
请注意, 用户在运行 Storm SQL runner 时候应当使用 --jars
或者 --artifacts
, 来确保 UDFs 在 classpath 可见.
用户通过实现 ISqlTridentDataSource
接口并使用 Java 的 service loader 机制进行注册,以植入外部数据源. 基于表的 URI 的模式来选择外部数据源. 更多细节请参考 storm-sql-kafka
的实现.
格式 | 输入格式类 | 输出格式类 | 需要属性 |
---|---|---|---|
JSON | org.apache.storm.sql.runtime.serde.json.JsonScheme | org.apache.storm.sql.runtime.serde.json.JsonSerializer | No |
Avro | org.apache.storm.sql.runtime.serde.avro.AvroScheme | org.apache.storm.sql.runtime.serde.avro.AvroSerializer | Yes |
CSV | org.apache.storm.sql.runtime.serde.csv.CsvScheme | org.apache.storm.sql.runtime.serde.csv.CsvSerializer | No |
TSV | org.apache.storm.sql.runtime.serde.tsv.TsvScheme | org.apache.storm.sql.runtime.serde.tsv.TsvSerializer | No |
Avro 需要用户描述记录的模式(输入和输出). 模式应该在 TBLPROPERTIES
上描述. 输入格式需要描述给 input.avro.schema
, 输出格式需要描述给 output.avro.schema
. 模式字符串应当是一个转义后的 JSON, 因此 TBLPROPERTIES
是有效的 JSON.
示例 Schema 定义:
"input.avro.schema": "{\"type\": \"record\", \"name\": \"large_orders\", \"fields\" : [ {\"name\": \"ID\", \"type\": \"int\"}, {\"name\": \"TOTAL\", \"type\": \"int\"} ]}"
"output.avro.schema": "{\"type\": \"record\", \"name\": \"large_orders\", \"fields\" : [ {\"name\": \"ID\", \"type\": \"int\"}, {\"name\": \"TOTAL\", \"type\": \"int\"} ]}"
使用 Standard RFC4180 CSV Parser, 不需要任何其他的属性.
默认情况下, TSV 使用 \t
作为分隔符, 但是用户可以通过 input.tsv.delimiter
或者 output.tsv.delimiter
设置其他的分隔符.
数据源 | Artifact Name | 位置前缀 | 支持输入数据源 | 支持输出数据源 | 需要属性 |
---|---|---|---|---|---|
Socket | socket://host:port |
Yes | Yes | No | |
Kafka | org.apache.storm:storm-sql-kafka | kafka://zkhost:port/broker_path?topic=topic |
Yes | Yes | Yes |
Redis | org.apache.storm:storm-sql-redis | redis://:[password]@host:port/[dbIdx] |
No | Yes | Yes |
MongoDB | org.apache.stormg:storm-sql-mongodb | mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] |
No | Yes | Yes |
HDFS | org.apache.storm:storm-sql-hdfs | hdfs://host:port/path-to-file |
No | Yes | Yes |
Socket 数据源是一个内置的特性, 因此用户无需在 --artifacts
选项中添加任何依赖.
请注意, Socket 数据源只是用于测试: 不能保证 exactly-once 和 at-least-once.
贴士: netcat
是一个 Socket 便捷工具: 用户可以使用 netcat 连接 Socket 数据源, 既可以作为输入也可以用作输出.
Kafka 仅当用作输出数据源的时候需要定义下列属性:
producer
: 指定 Kafka Producer 配置 - 更多细节请参考 Kafka producer configs.bootstrap.servers
必须在 producer
中定义这个值请注意, storm-sql-kafka
需要用户提供 storm-kafka
依赖, storm-kafka
又依赖于 kafka
, kafka-clients
. 你可以在 --artifacts
选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本
org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2
Redis 数据源需要设置下列属性:
data.type
: 用于存储的数据类型 - 仅支持 "STRING"
和 "HASH"
data.additional.key
: key, 当数据类型同时需要 key 和 field 时设置 (field 作为字段使用)redis.timeout
: 超时时间, 毫秒 (ex. "3000"
)use.redis.cluster
: 如果 Redis 是集群环境为 "true"
, 否则为 "false"
.请注意, storm-sql-redis
需要用户提供 storm-redis
依赖. 你可以在 --artifacts
选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本
org.apache.storm:storm-sql-redis:2.0.0-SNAPSHOT,org.apache.storm:storm-redis:2.0.0-SNAPSHOT
MongoDB 数据源需要设置以下属性
{"collection.name": "storm_sql_mongo", "trident.ser.field": "serfield"}
trident.ser.field
: 存储字段 - 记录会序列化并以 BSON 存储在字段中collection.name
: 集合名称请注意, storm-sql-mongodb
需要用户提供 storm-mongodb
依赖. 你可以在 --artifacts
选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本
org.apache.storm:storm-sql-mongodb:2.0.0-SNAPSHOT,org.apache.storm:storm-mongodb:2.0.0-SNAPSHOT
当前不支持使用保留字段存储.
HDFS 数据源需要设置下列属性
hdfs.file.path
: HDFS 文件路径hdfs.file.name
: HDFS 文件名 - 参考 SimpleFileNameFormathdfs.rotation.size.kb
: HDFS FileSizeRotationPolicy 单位 KBhdfs.rotation.time.seconds
: HDFS TimedRotationPolicy 单位 seconds请注意 hdfs.rotation.size.kb
和 hdfs.rotation.time.seconds
只能采用其中一种来实现文件滚动.
还要注意 storm-sql-hdfs
需要用户提供 storm-hdfs
依赖. 你可以在 --artifacts
选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本
org.apache.storm:storm-sql-hdfs:2.0.0-SNAPSHOT,org.apache.storm:storm-hdfs:2.0.0-SNAPSHOT
还有, 需要提供 hdfs 配置文件. 可以将 core-site.xml
和 hdfs-site.xml
文件放到 Storm 安装目录的 conf
目录下面.