Windowing Support in Core Storm

Storm core 支持处理落在窗口内的一组元组。窗口操作指定了一下两个参数

1.窗口的长度 - 窗口的长度或持续时间

2.滑动间隔 - 窗口滑动的时间间隔 

滑动窗口

元组被分组在窗口和每个滑动间隔窗口中。 一个元组可以属于多个窗口。

例如一个持续时间长度为 10 秒和滑动间隔 5 秒的滑动窗口。 ........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -> time |<------- w1 -->| |<---------- w2 ----->| |<-------------- w3 ---->| 窗口每5秒进行一次评估,第一个窗口中的某些元组与第二个窗口重叠。

注意:窗口第一次滑动在 t = 5s,并且将包含在前 5 秒钟内收到的事件。

Tumbling Window

元组根据时间或数量被分组在一个窗口中。任何元组只属于其中一个窗口。

例如一个持续时间长度为 5s 的 tumbling window。

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3 

窗口每五秒进行一次评估,并且没有窗口重叠。

Storm 支持指定窗口长度和滑动间隔作为元组数的计数或持续时间。 bolt 接口 IWindowedBolt 需要由窗口支持的bolts来实现。

public interface IWindowedBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    /**
     * Process tuples falling within the window and optionally emit 
     * new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);
    void cleanup();
} 

每次窗口激活时,都会调用 execute 方法。TupleWindow 的参数允许访问窗口中的当前元组,过期的元组以及自上次窗口计算后添加的新元组,这对于高效的窗口计算将是有用的。

需要窗口支持的 Bolts 一般会扩展为 BaseWindowedBolt,它有用来指定窗口长度和滑动间隔的apis.

例如

public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
      for(Tuple tuple: inputWindow.get()) {
        // do the windowing computation
        ...
      }
      // emit the results
      collector.emit(new Values(computedValue));
    }
}

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
     builder.setSpout("spout", new RandomSentenceSpout(), 1);
     builder.setBolt("slidingwindowbolt", 
                     new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
                     1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(1);

    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

} 

支持以下窗口配置

withWindow(Count windowLength, Count slidingInterval)
基于元组计数的滑动窗口,在多个tuples进行 `slidingInterval`滑动之后。

withWindow(Count windowLength)
基于元组计数的窗口,它与每个传入的元组一起滑动。

withWindow(Count windowLength, Duration slidingInterval)
基于元组计数的滑动窗口,在`slidingInterval`持续时间滑动之后。

withWindow(Duration windowLength, Duration slidingInterval)
基于持续时间的滑动窗口,在`slidingInterval`持续时间滑动之后。

withWindow(Duration windowLength)
基于持续时间的窗口,它与每个传入的元组一起滑动。

withWindow(Duration windowLength, Count slidingInterval)
基于时间的滑动窗口配置在`slidingInterval`多个元组之后滑动。

withTumblingWindow(BaseWindowedBolt.Count count)
计数的tumbling窗口在指定的元组数之后tumbles.

withTumblingWindow(BaseWindowedBolt.Duration duration)
基于持续时间的tumbling窗口在指定的持续时间后tumbles。 

元组时间戳和乱序元组

默认情况下,在窗口中追踪的时间戳是 bolt 处理元组的时间。窗口计算是根据正在处理的时间戳进行的。 Storm 支持基于源生成的时间戳的追踪窗口。

/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName) 

上述fieldName的值将从传入的元组中查找并考虑进行窗口计算。如果该元组中不存在该字段,将抛出异常。或者,TimestampExtractor可以用于从元组导出时间戳值(例如,从元组中的嵌套字段提取时间戳)。

/**
* Specify the timestamp extractor implementation.
*
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) 

与时间戳字段 name/extractor 一起,可以指定一个时间滞后参数,它指示具有无序时间戳的元组的最大时间限制。

/**
* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration) 

例如:如果滞后是5秒,并且元组t1到达时间戳为06:00:05没有元组可能会在早于06:00:00的元组时间戳到达。 如果一个元组在t1之后到达时间戳05:59:59,并且窗口已经移动过t1了,它将被视为迟到的元组。 默认情况下不处理迟到的元组,只需在INFO级别打印到工作日志文件。 ```java / * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. * It must be defined on a per-component basis, and in conjunction with the * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * @param streamId the name of the stream used to emit late tuples on / public BaseWindowedBolt withLateTupleStream(String streamId)

 通过指定上述 `streamId` 来更改此行为。 在这种情况下,迟到的元组将在指定的流中发出并可通过`WindowedBoltExecutor.LATE_TUPLE_FIELD` 访问
字段。

### Watermarks
为了处理具有时间戳字段的元组,storm 根据传入的元组时间戳内部计算 watermarks。Watermark 是所有输入流中最新的元组时间戳(减去滞后)的最小值。在较高级别,watermark类似于 Flink 和 Google 的 MillWheel 用于跟踪基于事件的时间戳的概念。

定期的(默认每秒),watermark时间戳被发出,如果基于元组的时间戳被使用,这被认为是窗口计算的  clock tick(时钟勾)。可以用下面的api来改变发出 watermarks 的时间间隔。

```java
/**
* Specify the watermark event generation interval. For tuple based timestamps, watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval) 

当接收到watermark时,将对所有时间戳记进行评估。

例如,考虑具有以下窗口参数基于元组的时间戳处理,

Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

|-----|-----|-----|-----|-----|-----|-----|
0     10    20    30    40    50    60    70 

当前 ts = 09:00:00

9:00:009:00:01收到的元组e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)

在time t = 09:00:01, watermark w1 = 6:00:31被发出,没有早于6:00:31的元组可以到达。

三个窗口将被评估。通过采取最早的事件时间戳(06:00:03)并基于滑动间隔(10s)计算上限来计算第一个窗口结束在 ts(06:00:10)。

  1. 5:59:50 - 06:00:10 有元组 e1, e2, e3
  2. 6:00:00 - 06:00:20 有元组 e1, e2, e3, e4
  3. 6:00:10 - 06:00:30 有元组 e4, e5

e6未被评估,因为 watermark 时间戳6:00:31比元组 ts6:00:36更旧。

9:00:019:00:02之间,接收到的元组e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)

在 time t = 09:00:02另一个 watermark w2 = 08:00:34被发出,没有元组比8:00:34更早到达。

三个窗口将被评估

  1. 6:00:20 - 06:00:40 有元组 e5, e6 (从早期批次)
  2. 6:00:30 - 06:00:50 有元组 e6 (从早期批次)
  3. 8:00:10 - 08:00:30 有元组 e7, e8, e9

e10 不被评估,因为元组 ts 8:00:39超出了watermark time 8:00:34.

窗口计算考虑时间间隔,并基于元组时间戳计算窗口。

Guarantees

storm core的窗口功能目前提供一致性保证。执行(TupleWindow inputWindow)方法发出的值将自动锁定到 inputWindow 中的所有元组。预计下游 bolts 将确认接收的元组(即从窗口 bolt 发出的元组)以完成元组树。如果不是,元组将重播,并且重新评估窗口计算。

窗口中的元组会在过期后被自动确认,即当它们在windowLength + slidingInterval之后从窗口中滑落出来。请注意,配置topology.message.timeout.secs应该远远超过基于时间窗口的windowLength + slidingInterval; 否则元组将超时并重播,并可能导致重复的评估。对于基于计数的窗口,应该调整配置,使得在超时时间段内可以接收到windowLength + slidingInterval元组。

拓扑示例

示例拓扑滑动窗口拓扑显示了如何使用apis来计算滑动窗口总和和滚动窗口平均值。