Storm SQL 示例

本文通过处理 Apache 日志的例子来展示如何使用 strom SQL. 本文使用 "how-to" 风格书写, 因此可以根据步骤, 一步一步的学习如何使用 Storm SQL.

准备

本文假定 Apache Zookeeper、Apache Storm、Apache Kafka 都本地安装并且正确配置运行. 方便起见, 本文假设 Apache Kafka 0.10.0 是通过 brew 安装.

我们将使用下列工具为输入数据源生成 JSON 数据. 因为他们都是 Python 工程, 本页假设已经安装了 Python 2.7、pipvirtualenv. 如果你使用 Python 3, 需要在生成数据的时候修改一些与 Python 3 不兼容的代码.

创建 topic

在本页, 我们会使用3个 topic, apache-logs, apache-errorlogs, apache-slowlogs. 请根据你的环境来创建 topic.

对于使用 brew 安装的 Apache Kafka 0.10.0,

kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 

灌入数据

让我们提供数据给输入 topic. 在本页中我们将生成假的 Apache 日志, 转换为 JSON 格式, 并把 JSON 灌入 Kafka topic.

开始创建你的工作目录, 用于克隆项目并且设置 virtualenv.

在你的工作目录, 命令 virtualenv env 把 evn 目录设置为 virtualenv 目录, 然后激活虚拟环境.

$ virtualenv env
$ source env/bin/activate 

完成例子以后, 可以随时 deactivate, 退出 python 虚拟环境.

安装和修改 Fake-Apache-Log-Generator

Fake-Apache-Log-Generator 对包不可见, 我们还需要修改一下脚本.

$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$ cd Fake-Apache-Log-Generator 

打开 apache-fake-log-gen.py, 将 while (flag): 语句替换成下面的语句:

 elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec
        seconds=random.randint(30,300)
        increment = datetime.timedelta(seconds=seconds)
        otime += increment

        ip = faker.ipv4()
        dt = otime.strftime('%d/%b/%Y:%H:%M:%S')
        tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z')
        vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2])

        uri = random.choice(resources)
        if uri.find("apps")>0:
                uri += `random.randint(1000,10000)`

        resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04])
        byt = int(random.gauss(5000,50))
        referer = faker.uri()
        useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )()
        f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent))

        log_lines = log_lines - 1
        flag = False if log_lines == 0 else True 

要确保 elapsed_us 包含在假日志中.

为了方便, 你可以跳过克隆项目这一步, 直接从这里下载修改过的文件: apache-fake-log-gen.py (gist)

安装 apache-log-parser 并编写转换脚本

apache-log-parser 模块可以通过 pip 命令安装.

$ pip install apache-log-parser 

因为 apache-log-parser 是一个 python 库, 为了转换日志我们需要写编写一个小脚本. 我们创建一个文件 parse-fake-log-gen-to-json-with-incrementing-id.py 包含以下内容:

import sys
import apache_log_parser
import json

auto_incr_id = 1
parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(parser_format)
while True:
  # we'll use pipe
  line = sys.stdin.readline()
  if not line:
    break
  parsed_dict = line_parser(line)
  parsed_dict['id'] = auto_incr_id
  auto_incr_id += 1

  # works only python 2, but I don't care cause it's just a test module :)
  parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')}
  print json.dumps(parsed_dict) 

将转换后的 JSON Apache Log 灌入 Kafka

好了! 我们已经准备好将数据写入 Kafka topic. 下面使用 kafka-console-producer 来灌入 JSON.

$ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs 

打开另一个终端执行下面的命令, 确认数据已经进入 topic.

$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs 

如果看到如下的 json, 就说明搞定了:

{"TIME_US":  "757467",  "REQUEST_FIRST_LINE":  "GET /wp-content HTTP/1.0",  "REQUEST_METHOD":  "GET",  "RESPONSE_BYTES_CLF":  "4988",  "TIME_RECEIVED_ISOFORMAT":  "2021-06-30T22:02:53",  "TIME_RECEIVED_TZ_ISOFORMAT":  "2021-06-30T22:02:53-07:00",  "REQUEST_HTTP_VER":  "1.0",  "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY":  "Firefox",  "REQUEST_HEADER_USER_AGENT__IS_MOBILE":  false,  "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING":  "3.6.13",  "REQUEST_URL_FRAGMENT":  "",  "REQUEST_HEADER_USER_AGENT":  "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13",  "REQUEST_URL_SCHEME":  "",  "REQUEST_URL_PATH":  "/wp-content",  "REQUEST_URL_QUERY_SIMPLE_DICT":  {},  "TIME_RECEIVED_UTC_ISOFORMAT":  "2021-07-01T05:02:53+00:00",  "REQUEST_URL_QUERY_DICT":  {},  "STATUS":  "200",  "REQUEST_URL_NETLOC":  "",  "REQUEST_URL_QUERY_LIST":  [],  "REQUEST_URL_QUERY":  "",  "REQUEST_URL_USERNAME":  null,  "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING":  "",  "REQUEST_URL_HOSTNAME":  null,  "REQUEST_HEADER_USER_AGENT__OS__FAMILY":  "Linux",  "REQUEST_URL":  "/wp-content",  "ID":  904128,  "REQUEST_HEADER_REFERER":  "http://white.com/terms/",  "REQUEST_URL_PORT":  null,  "REQUEST_URL_PASSWORD":  null,  "TIME_RECEIVED":  "[30/Jun/2021:22:02:53 -0700]",  "REMOTE_IP":  "88.203.90.62"}  

例子: 过滤错误日志

在这个例子中, 我们将从所有的日志中过滤出 error 日志并且存储到另一个 topic 中. 将会用到 projectfilter 特性.

脚本文件的内容如下:

CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs'
CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4 

把文件保存为 apache_log_error_filtering.sql.

让我们过一遍这个脚本.

第一个语句定义了一个表 APACHE_LOGS 代表输入流. LOCATION 从句指定了 ZkHost (localhost:2181), brokers路径(/brokers) 和 topic (apache-logs). 注意 Kafka 数据源必须定义一个主键. 这就是为什么我们为 JSON 数据设置了一个整数 id.

同样, 第二个语句指定了表 APACHE_ERROR_LOGS 代表输出流. TBLPROPERTIES 从句指定了KafkaProducer的配置, 从句对于 Kafka sink表 是必须的.

最后的语句定义了一个 topology. Storm SQL 只会在 DML 语句上定义和运行 topology. DDL 语句定义输入数据源、输出数据源、以及可以被 DML 语句引用的用户定义函数(user defined function).

L我们先看 where 语句. 由于我们想过滤 error 日志, 我们使用状态码除以 100, 验证得到的商是否等于或者大于4.(简单的说就是 statu_code >= 400) 由于JSON中的状态码是字符串格式(因此在 APACHE_LOGS 表中是 VARCHAR 格式). 我们在应用除法之前, 使用 CAST(STATUS AS INT) 先把状态码转换为整数. 现在我们只有 error 日志了.

我们转换一些列以和输出流想匹配. 在这个语句中, 我们使用 CAST(STATUS AS INT) 转化为整数类型, 然后使用 1000 除 TIME_US 将毫秒转换成秒.

最后, insert 语句将过滤和转换后的行(tuples)存入输出流.

要运行这个例子, 用户需要包含数据源(本例中是 storm-sql-kafka) 和的所有依赖到 class path 中. 当用户运行 storm sql 命令的时候 Storm SQL 的依赖会被自动处理. 用户可以在提交阶段包含数据源依赖, 如下:

$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" 

上面的命令提交 SQL 语句到 StormSQL. storm sql 命令的选项是 storm sql [script file] [topology name]. 如果用户使用了不同版本的 Storm 或者 Kafka ,需要修改每个 artifacts 的版本号与之对应.

如果你的语句通过了验证阶段, 会在 Storm UI 页面上显示 topology.

你可以在控制台上看到下面的输出:

$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs 

输出类似下面的内容:

{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222}  {"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851}  ...  

你可以运行 Storm SQL runner, 将 topology 名称替换为 --explain 来查看逻辑执行计划.

$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" 

输入类似下面的内容:

LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
  LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7
    LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6
      EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5 

如果 Storm SQL 应用了查询优化, 你可能看到的输出会和上面的不一样.

我们正在执行第一个 Storm SQL topology! 如果你看到了足够多的输出和日志, 请杀掉 topology.

为了简洁, 我们不再解释我们已经看到的东西.

例子: 过滤访问慢的日志

在这个例子中我们将过滤访问慢的日志, 把他们存储到另一个 topic. 用到的特性有 projectfilterUser Defined Function (UDF). 这个例子与上一个例子 filtering error logs 非常相似, 我们主要看如何定义 User Defined Function (UDF).

脚本文件的内容如下:

CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apacheslowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2'
INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100 

内容保存为文件 apache_log_slow_filtering.sql.

由于前两个语句和上一个例子相似, 我们直接跳过.

第三个语句定义了一个 User defined function. 我们使用 org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2 定义了一个 GET_TIME.

GetTime2 函数的实现如下:

package org.apache.storm.sql.runtime.functions.scalar.datetime;

import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class GetTime2 {
    public static Long evaluate(String dateString, String dateFormat) {
        try {
            DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC();
            return df.parseDateTime(dateString).getMillis();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
} 

由于这个类定义了静态方法 evaluate 可以用于 UDF. SQL 的参数和返回的类型取决于 Storm SQL 依赖哪种风格.

注意, 这个类应该放在 classpath 路径下, 因此为了定义 UDF, 你需要创建一个包含 UDF 类的 jar 文件, 并在执行 storm sql 命令的时候使用 -- jar 选项.

最后一个语句和过滤错误日志的例子相似. 唯一新鲜的东西就是我们调用了 GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') 将字符串格式的时间转换为 unix timestamp (BIGINT).

执行:

$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" 

可以在控制台看到下面的输出:

$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs

输出类似下面的内容: 

{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579} {"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957} ... ```

好了! 假设我们有通过远程 IP 查询 geo 信息的 UDF, 我们可以通过 geo 位置做过滤, 或者将 geo 位置添加到转换结果中.

Summary

我们通读了几个 Storm SQL 的简单的用例来学习 Storm SQL 的特性. 如果还没有看过Storm SQL integrationStorm SQL language, 你需要阅读这些章节来查看所有支持的特性.

注意, Storm SQL 运行在小批量和非强类型的 Trident 库之上. Sink 实际并不检查类型. (你可能注意到一些输出字段的类型与输出表模式的定义不同).

当 Storm SQL 的后端 API 修改为核心(tuple by tuple, low-level, high-level) 时, Storm SQL 的行为会相应改变.