Storm 支持通过 JoinBolt 来 join 多个 data streams 变成一个 stream. JoinBolt
是一个 Windowed bolt。JoinBolt
会等待配置的窗口时间来匹配被join 的streams的tuples。这有助于通过窗口边界生成streams.
JoinBolt
每个进来的 data streams 必须基于一个字段进行 Field Group。stream只能使用被 FieldsGrouped的字段 join 其他stream。
考虑下面的 SQL join,设计四张表:
select userId, key4, key2, key3
from table1
inner join table2 on table2.userId = table1.key1
inner join table3 on table3.key3 = table2.userId
left join table4 on table4.key4 = table3.key3
相同的可以使用JoinBolt
,join四个spouts,生成想要的tuples:
JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1
.join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1
.join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId
.leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3
.select ("userId, key4, key2, spout3:key3") // chose output fields
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("spout1", new Fields("key1") )
.fieldsGrouping("spout2", new Fields("userId") )
.fieldsGrouping("spout3", new Fields("key3") )
.fieldsGrouping("spout4", new Fields("key4") );
bolt 构造器需要两个参数.第一个参数介绍了第一个stream来自于 spout1
,并指定了通过key1来和其他 streams 连接.组件的名称必须根据直接连接 Join bolt的 spout
或者bolt来设置.这里,来自于spout1的数据必须根据 key1
来 field group。同样的,调用 leftJoin()
和 join()
方法的时候,也会通过这个字段来join.根据上面的例子,FieldGrouping 要求也适用于其他spout 的streams。第三个参数表示streams要和哪个spout的streams连接.
select()
方法用来指定 output fields。select
参数是逗号分隔的字段列表。单个字段可以通过 stream名称作为前缀,来区别不同streams中相同的字段,像这样:.select("spout3:key3, spout4:key3")
.嵌套的tuple 类型是支持的.例如,outer.inner.innermost
就是一个字段嵌套三层深度,outer
和 inner
是 Map
的类型.
join 参数中的字段不允许用 stream 名称作为前缀,但是支持嵌套字段.
上面调用 withTumblingWindow()
方法,将join 窗口配置成10分钟的翻滚窗口.由于 JoinBolt
是一个窗口 spout,我们还可以使用 withWindow
方法将其配置为滑动窗口(参考下面的提示部分).
new JoinBolt( "spout1", "key1")
.join ( "spout2", "userId", "spout3") //not allowed. spout3 not yet introduced
.join ( "spout3", "key3", "spout1")
为了简单起见,Storm topology(拓扑)经常使用 default
stream 。拓扑也可以使用命名的stream 而不是default
stream。为了支持这种 topology(拓扑),可以通过构造函数的第一个参数将 JoinBolt
配置为使用 stream name 而不是源组件(spout / bolt)名称:
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join("stream2", "key2")
...
第一个参数 JoinBolt.Selector.STREAM
通知 bolt stream1/2/3/4
引用 named stream (而不是上游 spouts/bolts的名称)。
以下示例从四个 spouts 连接两个命名流:
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join ("stream2", "userId", "stream1" )
.select ("userId, key1, key2")
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("bolt1", "stream1", new Fields("key1") )
.fieldsGrouping("bolt2", "stream1", new Fields("key1") )
.fieldsGrouping("bolt3", "stream2", new Fields("userId") )
.fieldsGrouping("bolt4", "stream1", new Fields("key1") );
在上述示例中,例如,spout1也可能发送其他 stream。但是连接 bolt 只是从不同的 bolts 订阅了stream1
&stream2。来自bolt1
,bolt2
和bolt4
的stream1
被视为单个 stream,并且与bolt3
相连接。
1.当前值支持 INNER 和LEFT join.
Join 可以是CPU和内存密集型.当前窗口中积累的数据越大(与窗口长度成正比),join所需要的时间就越长。滑动间隔很短(例如几秒钟)会触发频繁的连接.因此,如果使用大的窗口长度或者小的滑动间隔,则性能可能受损.
使用滑动窗口时,跨窗口重复 join 记录。这是因为使用滑动窗口时,tuples 在多个窗口中继续存在。
如果启用了消息超时,请确保超时设置(topology.message.timeout.secs)足够大以舒适地适应窗口大小,以及其他 spouts 和 bolts 的附加处理。
在最坏的情况下,连接一个具有M和N个元素的两个 streams 的窗口,可以产生每个输出元组的MxN元素,每个输出 tuple 从每个输入流锚定到一个 tuple 。这可能意味着来自JoinBolt的大量输出元组和甚至更多的ACK用于下游 spout 发出。这可能会对消息传递系统造成重大压力,如果不小心,则会大大减缓 topology(拓扑)结构。要管理消息传递子系统上的负载,建议: