Storm core 支持处理落在窗口内的一组元组。窗口操作指定了一下两个参数
1.窗口的长度 - 窗口的长度或持续时间
2.滑动间隔 - 窗口滑动的时间间隔
元组被分组在窗口和每个滑动间隔窗口中。 一个元组可以属于多个窗口。
例如一个持续时间长度为 10 秒和滑动间隔 5 秒的滑动窗口。 ........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -> time |<------- w1 -->| |<---------- w2 ----->| |<-------------- w3 ---->|
窗口每5秒进行一次评估,第一个窗口中的某些元组与第二个窗口重叠。
注意:窗口第一次滑动在 t = 5s,并且将包含在前 5 秒钟内收到的事件。
元组根据时间或数量被分组在一个窗口中。任何元组只属于其中一个窗口。
例如一个持续时间长度为 5s 的 tumbling window。
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
w1 w2 w3
窗口每五秒进行一次评估,并且没有窗口重叠。
Storm 支持指定窗口长度和滑动间隔作为元组数的计数或持续时间。 bolt 接口 IWindowedBolt
需要由窗口支持的bolts来实现。
public interface IWindowedBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
* Process tuples falling within the window and optionally emit
* new tuples based on the tuples in the input window.
*/
void execute(TupleWindow inputWindow);
void cleanup();
}
每次窗口激活时,都会调用 execute
方法。TupleWindow 的参数允许访问窗口中的当前元组,过期的元组以及自上次窗口计算后添加的新元组,这对于高效的窗口计算将是有用的。
需要窗口支持的 Bolts 一般会扩展为 BaseWindowedBolt
,它有用来指定窗口长度和滑动间隔的apis.
例如
public class SlidingWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation
...
}
// emit the results
collector.emit(new Values(computedValue));
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("slidingwindowbolt",
new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
支持以下窗口配置
withWindow(Count windowLength, Count slidingInterval)
基于元组计数的滑动窗口,在多个tuples进行 `slidingInterval`滑动之后。
withWindow(Count windowLength)
基于元组计数的窗口,它与每个传入的元组一起滑动。
withWindow(Count windowLength, Duration slidingInterval)
基于元组计数的滑动窗口,在`slidingInterval`持续时间滑动之后。
withWindow(Duration windowLength, Duration slidingInterval)
基于持续时间的滑动窗口,在`slidingInterval`持续时间滑动之后。
withWindow(Duration windowLength)
基于持续时间的窗口,它与每个传入的元组一起滑动。
withWindow(Duration windowLength, Count slidingInterval)
基于时间的滑动窗口配置在`slidingInterval`多个元组之后滑动。
withTumblingWindow(BaseWindowedBolt.Count count)
计数的tumbling窗口在指定的元组数之后tumbles.
withTumblingWindow(BaseWindowedBolt.Duration duration)
基于持续时间的tumbling窗口在指定的持续时间后tumbles。
默认情况下,在窗口中追踪的时间戳是 bolt 处理元组的时间。窗口计算是根据正在处理的时间戳进行的。 Storm 支持基于源生成的时间戳的追踪窗口。
/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)
上述fieldName
的值将从传入的元组中查找并考虑进行窗口计算。如果该元组中不存在该字段,将抛出异常。或者,TimestampExtractor可以用于从元组导出时间戳值(例如,从元组中的嵌套字段提取时间戳)。
/**
* Specify the timestamp extractor implementation.
*
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
与时间戳字段 name/extractor 一起,可以指定一个时间滞后参数,它指示具有无序时间戳的元组的最大时间限制。
/**
* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)
例如:如果滞后是5秒,并且元组t1
到达时间戳为06:00:05
没有元组可能会在早于06:00:00
的元组时间戳到达。 如果一个元组在t1
之后到达时间戳05:59:59
,并且窗口已经移动过t1
了,它将被视为迟到的元组。 默认情况下不处理迟到的元组,只需在INFO级别打印到工作日志文件。 ```java / * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. * It must be defined on a per-component basis, and in conjunction with the * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * @param streamId the name of the stream used to emit late tuples on / public BaseWindowedBolt withLateTupleStream(String streamId)
通过指定上述 `streamId` 来更改此行为。 在这种情况下,迟到的元组将在指定的流中发出并可通过`WindowedBoltExecutor.LATE_TUPLE_FIELD` 访问
字段。
### Watermarks
为了处理具有时间戳字段的元组,storm 根据传入的元组时间戳内部计算 watermarks。Watermark 是所有输入流中最新的元组时间戳(减去滞后)的最小值。在较高级别,watermark类似于 Flink 和 Google 的 MillWheel 用于跟踪基于事件的时间戳的概念。
定期的(默认每秒),watermark时间戳被发出,如果基于元组的时间戳被使用,这被认为是窗口计算的 clock tick(时钟勾)。可以用下面的api来改变发出 watermarks 的时间间隔。
```java
/**
* Specify the watermark event generation interval. For tuple based timestamps, watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)
当接收到watermark时,将对所有时间戳记进行评估。
例如,考虑具有以下窗口参数基于元组的时间戳处理,
Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s
|-----|-----|-----|-----|-----|-----|-----|
0 10 20 30 40 50 60 70
当前 ts = 09:00:00
在9:00:00
到9:00:01
收到的元组e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)
在time t = 09:00:01
, watermark w1 = 6:00:31
被发出,没有早于6:00:31
的元组可以到达。
三个窗口将被评估。通过采取最早的事件时间戳(06:00:03)并基于滑动间隔(10s)计算上限来计算第一个窗口结束在 ts(06:00:10)。
5:59:50 - 06:00:10
有元组 e1, e2, e36:00:00 - 06:00:20
有元组 e1, e2, e3, e46:00:10 - 06:00:30
有元组 e4, e5e6未被评估,因为 watermark 时间戳6:00:31
比元组 ts6:00:36
更旧。
在9:00:01
和 9:00:02
之间,接收到的元组e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)
在 time t = 09:00:02
另一个 watermark w2 = 08:00:34
被发出,没有元组比8:00:34
更早到达。
三个窗口将被评估
6:00:20 - 06:00:40
有元组 e5, e6 (从早期批次)6:00:30 - 06:00:50
有元组 e6 (从早期批次)8:00:10 - 08:00:30
有元组 e7, e8, e9e10 不被评估,因为元组 ts 8:00:39
超出了watermark time 8:00:34
.
窗口计算考虑时间间隔,并基于元组时间戳计算窗口。
storm core的窗口功能目前提供一致性保证。执行(TupleWindow inputWindow)
方法发出的值将自动锁定到 inputWindow 中的所有元组。预计下游 bolts 将确认接收的元组(即从窗口 bolt 发出的元组)以完成元组树。如果不是,元组将重播,并且重新评估窗口计算。
窗口中的元组会在过期后被自动确认,即当它们在windowLength + slidingInterval
之后从窗口中滑落出来。请注意,配置topology.message.timeout.secs
应该远远超过基于时间窗口的windowLength + slidingInterval
; 否则元组将超时并重播,并可能导致重复的评估。对于基于计数的窗口,应该调整配置,使得在超时时间段内可以接收到windowLength + slidingInterval
元组。
示例拓扑滑动窗口拓扑
显示了如何使用apis来计算滑动窗口总和和滚动窗口平均值。