FAQ

最佳实践

针对配置 Storm+Trident, 您可以给我哪些经验呢?

  • worker 的数量是机器数量的倍数; 并行度是 worker 数量的倍数; kafka partitions 的数量是 spout 并行度的倍数
  • 每个机器上的每个 topology 使用一个 worker
  • Start with fewer, larger aggregators, one per machine with workers on it
  • 使用 isolation scheduler(隔离调度器)
  • 每个 worker 使用一个 acker -- 0.9 版本默认是这样的, 但是更早的版本没有这样.
  • 启用 GC 日志记录; 在正常情况下, 你应该看到很少的 major GC.
  • set the trident batch millis to about 50% of your typical end-to-end latency.
  • Start with a max spout pending that is for sure too small -- one for trident, or the number of executors for storm -- and increase it until you stop seeing changes in the flow. You'll probably end up with something near 2*(throughput in recs/sec)*(end-to-end latency) (2x the Little's law capacity).

What are some of the best ways to get a worker to mysteriously and bafflingly die?

什么是获取

  • 您是否有对 log directory(日志目录)的写入权限
  • 您扩大过你的 heap 大小吗?
  • 是否所有的 workers 都安装了正确的 libraries(函数库)?
  • 是否 zookeeper 的 hostname(主机名)仍然设置为 localhost 了?
  • 您提供了一个正确, 唯一的 hostname(主机名) -- 它可以解析回机器上 -- 对于每个 worker, 并且将它们放入 storm conf 文件中?
  • 您是否双向开启了 firewall/securitygroup 的权限 a) 所有的 workers, b) storm master, c) zookeeper? 另外, 从 workers 到您的 topology 访问的任何 kafka/kestrel/database/etc ? 使用 netcat 来检测下对应的 ports(端口)并且确定一下.

Help! 我不能看到:

  • my logs 默认情况下, 日志为 $STORM_HOME/logs. 检查您是否具有该目录的写入权限. 他们配置在 log4j2/{cluster, worker}.xml 文件中.
  • final JVM settings 在 childopts 中添加 -XX+PrintFlagsFinal 命令行选项(请看配置文件)
  • final Java system properties 添加 Properties props = System.getProperties(); props.list(System.out); 靠近您构建 topology(拓扑)的地方.

我应该使用多少个 Workers?

worker 的数量是由 supervisors 来确定的 -- 每个 supervisor 将监督一些 JVM slots. 您在 topology(拓扑)上设置的事情是它将尝试声明多少个 worker slots.

每台机器每个 topology(拓扑)使用多个 worker 没有很好的理由。

一个 topology(拓扑)运行在三个 8 核心的节点上, 并行度是 24, 每台机器的每个 bolt 将得到 8 个 executor(执行器), 即每个核心一个. 与运行三个 worker(每个有 8 个指定的 executor)相比,有 24 个 worker(每个分配一个 executor)的运行有 3 个大的优势.

第一,对同一个 worker 的 executor 进行重新分区(shuffles 或 group-bys)的数据不必放入传输缓冲区. 相反, tuple 直接从发送到接收缓冲区存储. 这是一个很大的优势. 相反,如果目标 executor 在不同 worker 的同一台计算机上, 则必须执行 send - > worker transfer - > local socket - > worker recv - > exec recv buffer. 它不经过打网卡,但并不像 executor 在同一个 worker 那么大.

通常情况下,三个具有非常大的 backing cache(后备缓存)的 aggregator(聚合器)比拥有小的 backing caches(后台缓存)的二十四个 aggregators(聚合器)更好,因为这样减少了数据倾斜的影响,并提高了 LRU 效率.

最后,更少的 workers 降低了控制 flow 的难度.

Topology(拓扑)

一个 Trident topology 可以有多个 Streams 吗?

Trident topology 可以像带条件路径(if-else)的 workflow(工作流)一样工作吗? 例如. 一个 Spout(S1) 连接到 bolt(b0), 其基于进入 tuple(元组)中的某些值将它们引导到 blolt(B1)或 bolt(B2),而不是两者都有.

一个 Trident 的 "each" 操作返回一个 Stream 对象, 你可以在一个变量中存储它. 然后,您可以在同一个 Stream 上运行多个 each 进行 split 拆分, 例如:

 Stream s = topology.each(...).groupBy(...).aggregate(...) 
    Stream branch1 = s.each(..., FilterA) 
    Stream branch2 = s.each(..., FilterB) 

你可以使用 join, merge 或 multiReduce 来 join streams.

在写入操作时,您不能向 Trident 的 emit(发射)多个输出流 -- 请参阅 STORM-68

当我启动 topology 时, 为什么获得一个 NotSerializableException/IllegalStateException 异常?

在 Storm 的生命周期中,在执行 topology 之前,topology 被实例化,然后序列化为字节格式以存储在 ZooKeeper 中. 在此步骤中,如果 topology 中的 Spout 或 Bolt 具有初始化的不可序列化属性,序列化将会失败. 如果需要一个不序列化的字段,请在将 topology 传递给 worker 之后运行的 blot 或 spout 的 prepare 方法中进行初始化.

Spouts

coordinator 是什么, 为什么有几个?

trident-spout 实际运行在 storm bolt 之内. trident topology 的 storm-spout 是 MasterBatchCoordinator -- 它协调了 trident batches,无论您使用什么 spout 都是一样的. 当 MBC 为每个 spout-coordinators 分配一个 seed tuple(种子元组)时,batch 就诞生了. spout-coordinator bolts 知道您特定的 spouts 应该如何配合 -- 所以在 kafka 的场景中, 这有助于找出每个 spout 应该从哪个 partition 和 offset 进行 pull 操作.

What can I store into the spout's metadata record?

You should only store static data, and as little of it as possible, into the metadata record (note: maybe you can store more interesting things; you shouldn't, though)

'emitPartitionBatchNew' 函数多久被调用一次?

Since the MBC is the actual spout, all the tuples in a batch are just members of its tupletree. That means storm's "max spout pending" config effectively defines the number of concurrent batches trident runs. The MBC emits a new batch if it has fewer than max-spending tuples pending and if at least one trident batch interval's worth of seconds has passed since the last batch.

由于 MBC 是实际的 spout,所以一个 batch 中的所有 tuple 只是它的 tupletree 的成员. 这意味着 storm 的 "max spout pending" 配置有效地定义了并发 batch trident 运行的次数.

If nothing was emitted does Trident slow down the calls?

Yes, there's a pluggable "spout wait strategy"; the default is to sleep for a configurable amount of time

OK, 那么 trident batch 间隔是多少?

你知道 486 时代的电脑有一个 turbo button 吗?

Actually, it has two practical uses. One is to throttle spouts that poll a remote source without throttling processing. For example, we have a spout that looks in a given S3 bucket for a new batch-uploaded file to read, linebreak and emit. We don't want it hitting S3 more than every few seconds: files don't show up more than once every few minutes, and a batch takes a few seconds to process.

The other is to limit overpressure on the internal queues during startup or under a heavy burst load -- if the spouts spring to life and suddenly jam ten batches' worth of records into the system, you could have a mass of less-urgent tuples from batch 7 clog up the transfer buffer and prevent the $commit tuple from batch 3 to get through (or even just the regular old tuples from batch 3). What we do is set the trident batch interval to about half the typical end-to-end processing latency -- if it takes 600ms to process a batch, it's OK to only kick off a batch every 300ms.

Note that this is a cap, not an additional delay -- with a period of 300ms, if your batch takes 258ms Trident will only delay an additional 42ms.

你是怎样设置 batch 大小的?

Trident 不对 batch 数量设置自己的限制. 在 Kafka spout 的场景中,最大抓取的字节大小初一平均的记录大小定义了每个子分区的有效记录.

如何调整 batch 的大小?

trident batch 是一个有点过载的设施. 与 partition(分区)数量一起,batch 大小受限于或用于定义:

  1. the unit of transactional safety (tuples at risk vs time)
  2. per partition, an effective windowing mechanism for windowed stream analytics
  3. per partition, the number of simultaneous queries that will be made by a partitionQuery, partitionPersist, etc;
  4. per partition, the number of records convenient for the spout to dispatch at the same time;

一旦生成,您将无法更改总体的 batch 大小,但您可以更改 partition 数量 - 执行 shuffle,然后更改 parallelism hint(并行度)

Time Series(时间序列)

如何按时间聚合事件?

如果您的记录具有不可变的 timestamp(时间戳),并且您想 count,average 或以其他方式将它们聚合到离散时间段中,则 Trident 是一款出色且可扩展的解决方案。

编写一个将 timestamp 转换成 time bucket 的 Each 函数: 如果 bucket 的大小是 "by hour(按小时的)" , 则时间戳 2013-08-08 12:34:56 将被映射成 2013-08-08 12:00:00 time bucket, 十二点钟以后的其它时间也是这样. 然后在那个 timebucket 上的 group(组)并使用分组的 persistentAggregate 方法. persistentAggregate 使用由数据存储支持的本地 cacheMap. 具有许多记录的 Groups 需要从数据存储器读取很少的数据, 并使用高效的批量读写. 只要您的数据供给相对较快,Trident 就可以非常有效地利用内存和网络. 即使服务器脱机一天,然后在一瞬间提供全天的数据, 旧的结果将被安静地检索和更新 -- 并且不干扰计算当前结果.

我怎么知道一个时间内的 bucket 的所有记录已经被收到?

你不能知道所有的 event(事件)都被收集 -- 这是一个 epistemological challenge(认识论的挑战),而不是分布式系统的挑战. 您可以:

  • 使用 domain knowledge 设置时间限制
  • 引入 punctuation: 一个 record 知道紧跟特定时间 bucket 内所有记录之后而来. Trident 使用此方案知道 batch 何时完成. 例如,如果您从一组传感器接收记录,则每个传感器都将按照传感器的顺序发送,所有传感器都会向您发送 3:02:xx 或更后版本的时间戳,以让您知道可以 commit(提交).
  • 在可能的情况下, 使您的进程增加: 进来的每个 value 会让答案越来越正确. Trident ReducerAggregator 是一个 operator, 它采取先前的结果和一组新的记录,并返回一个新的结果. 这样可以将结果缓存并序列化到数据存储; 如果一台服务器脱机一天,然后在一天内回来一整天的数据,旧的结果将被平静地检索和更新.
  • Lambda 架构: 在接收时将所有 event(时间)记录到 archival store(S3, HBase, HDFS). 在快速处理的层面上, 一旦时间窗口被 clear(清楚), 处理 bucket 以获得可行的答案, 并忽略比时间窗口更旧的一切. 定期运行全局聚合以计算 "正确的" 答案。