2*(throughput in recs/sec)*(end-to-end latency)
(2x the Little's law capacity).-XX+PrintFlagsFinal
命令行选项(请看配置文件)Properties props = System.getProperties(); props.list(System.out);
靠近您构建 topology(拓扑)的地方.worker 的数量是由 supervisors 来确定的 -- 每个 supervisor 将监督一些 JVM slots. 您在 topology(拓扑)上设置的事情是它将尝试声明多少个 worker slots.
每台机器每个 topology(拓扑)使用多个 worker 没有很好的理由。
一个 topology(拓扑)运行在三个 8 核心的节点上, 并行度是 24, 每台机器的每个 bolt 将得到 8 个 executor(执行器), 即每个核心一个. 与运行三个 worker(每个有 8 个指定的 executor)相比,有 24 个 worker(每个分配一个 executor)的运行有 3 个大的优势.
第一,对同一个 worker 的 executor 进行重新分区(shuffles 或 group-bys)的数据不必放入传输缓冲区. 相反, tuple 直接从发送到接收缓冲区存储. 这是一个很大的优势. 相反,如果目标 executor 在不同 worker 的同一台计算机上, 则必须执行 send - > worker transfer - > local socket - > worker recv - > exec recv buffer. 它不经过打网卡,但并不像 executor 在同一个 worker 那么大.
通常情况下,三个具有非常大的 backing cache(后备缓存)的 aggregator(聚合器)比拥有小的 backing caches(后台缓存)的二十四个 aggregators(聚合器)更好,因为这样减少了数据倾斜的影响,并提高了 LRU 效率.
最后,更少的 workers 降低了控制 flow 的难度.
Trident topology 可以像带条件路径(if-else)的 workflow(工作流)一样工作吗? 例如. 一个 Spout(S1) 连接到 bolt(b0), 其基于进入 tuple(元组)中的某些值将它们引导到 blolt(B1)或 bolt(B2),而不是两者都有.
一个 Trident 的 "each" 操作返回一个 Stream 对象, 你可以在一个变量中存储它. 然后,您可以在同一个 Stream 上运行多个 each 进行 split 拆分, 例如:
Stream s = topology.each(...).groupBy(...).aggregate(...)
Stream branch1 = s.each(..., FilterA)
Stream branch2 = s.each(..., FilterB)
你可以使用 join, merge 或 multiReduce 来 join streams.
在写入操作时,您不能向 Trident 的 emit(发射)多个输出流 -- 请参阅 STORM-68
在 Storm 的生命周期中,在执行 topology 之前,topology 被实例化,然后序列化为字节格式以存储在 ZooKeeper 中. 在此步骤中,如果 topology 中的 Spout 或 Bolt 具有初始化的不可序列化属性,序列化将会失败. 如果需要一个不序列化的字段,请在将 topology 传递给 worker 之后运行的 blot 或 spout 的 prepare 方法中进行初始化.
trident-spout 实际运行在 storm bolt 之内. trident topology 的 storm-spout 是 MasterBatchCoordinator -- 它协调了 trident batches,无论您使用什么 spout 都是一样的. 当 MBC 为每个 spout-coordinators 分配一个 seed tuple(种子元组)时,batch 就诞生了. spout-coordinator bolts 知道您特定的 spouts 应该如何配合 -- 所以在 kafka 的场景中, 这有助于找出每个 spout 应该从哪个 partition 和 offset 进行 pull 操作.
You should only store static data, and as little of it as possible, into the metadata record (note: maybe you can store more interesting things; you shouldn't, though)
Since the MBC is the actual spout, all the tuples in a batch are just members of its tupletree. That means storm's "max spout pending" config effectively defines the number of concurrent batches trident runs. The MBC emits a new batch if it has fewer than max-spending tuples pending and if at least one trident batch interval's worth of seconds has passed since the last batch.
由于 MBC 是实际的 spout,所以一个 batch 中的所有 tuple 只是它的 tupletree 的成员. 这意味着 storm 的 "max spout pending" 配置有效地定义了并发 batch trident 运行的次数.
Yes, there's a pluggable "spout wait strategy"; the default is to sleep for a configurable amount of time
你知道 486 时代的电脑有一个 turbo button 吗?
Actually, it has two practical uses. One is to throttle spouts that poll a remote source without throttling processing. For example, we have a spout that looks in a given S3 bucket for a new batch-uploaded file to read, linebreak and emit. We don't want it hitting S3 more than every few seconds: files don't show up more than once every few minutes, and a batch takes a few seconds to process.
The other is to limit overpressure on the internal queues during startup or under a heavy burst load -- if the spouts spring to life and suddenly jam ten batches' worth of records into the system, you could have a mass of less-urgent tuples from batch 7 clog up the transfer buffer and prevent the $commit tuple from batch 3 to get through (or even just the regular old tuples from batch 3). What we do is set the trident batch interval to about half the typical end-to-end processing latency -- if it takes 600ms to process a batch, it's OK to only kick off a batch every 300ms.
Note that this is a cap, not an additional delay -- with a period of 300ms, if your batch takes 258ms Trident will only delay an additional 42ms.
Trident 不对 batch 数量设置自己的限制. 在 Kafka spout 的场景中,最大抓取的字节大小初一平均的记录大小定义了每个子分区的有效记录.
trident batch 是一个有点过载的设施. 与 partition(分区)数量一起,batch 大小受限于或用于定义:
一旦生成,您将无法更改总体的 batch 大小,但您可以更改 partition 数量 - 执行 shuffle,然后更改 parallelism hint(并行度)
如果您的记录具有不可变的 timestamp(时间戳),并且您想 count,average 或以其他方式将它们聚合到离散时间段中,则 Trident 是一款出色且可扩展的解决方案。
编写一个将 timestamp 转换成 time bucket 的 Each
函数: 如果 bucket 的大小是 "by hour(按小时的)" , 则时间戳 2013-08-08 12:34:56
将被映射成 2013-08-08 12:00:00
time bucket, 十二点钟以后的其它时间也是这样. 然后在那个 timebucket 上的 group(组)并使用分组的 persistentAggregate 方法. persistentAggregate 使用由数据存储支持的本地 cacheMap. 具有许多记录的 Groups 需要从数据存储器读取很少的数据, 并使用高效的批量读写. 只要您的数据供给相对较快,Trident 就可以非常有效地利用内存和网络. 即使服务器脱机一天,然后在一瞬间提供全天的数据, 旧的结果将被安静地检索和更新 -- 并且不干扰计算当前结果.
你不能知道所有的 event(事件)都被收集 -- 这是一个 epistemological challenge(认识论的挑战),而不是分布式系统的挑战. 您可以: