Writing tests for apache beam sliding window based streaming pipeline with late event triggers

I have been working with apache beam for a few years now, from developing the most basic batch pipelines, all the way to real-time, stateful streaming analytics pipelines. In this post, I will show you to control the watermarks and test your sliding window logic while also considering late events.

If you are new to windowing in apache beam, I suggest you quickly go through the concepts before continuing with this post.

Apache Beam Sliding Window

I have developed a simple pipeline, that consumes a generic sensor data through streams, computes average every few minutes through a sliding window and emits an aggregated event.

For this example, the following applies

  • RawEvent will be the input event. It contains id (sensor id), ts (timestamp of the sensor reading) and value (the sensor reading)
  • ComputedEvent will be the output event after aggregation. It contains id (sensor id), ts (timestamp of the last event that was part of aggregation) and value (the average)
  • I will run a 3 minute sliding window, while allowing another 2 minutes for the late data to arrive
  • The window frequency will be set to 1 minute, i.e, every minute a new 3 minute sliding window is opened
  • Event frequency will be 20 seconds, which implies
    • Every minute, 3 events are pushed into the stream
    • Every window will have 3 minutes * 3 events = 9 records to aggregate
  • I will push 15 events and simulate the pipeline as if it ran for 7 minutes
  • I will set the initial sensor reading to 1 and subsequent events will increment the reading by 1, thus the last event will send the reading as 15.
    • E1 will have value 1.0, E2 will have it 2.0 and so on.
  • For the sake of simplicity, I will push data for only a single key, so I have only one sensor data to simulate

Expected best case

Assuming E as events and W as window, starting sending the events a 9:00 upto 9:05, here is what the ideal scenario would look like,

Window Events Bounds Average
W1 E1, E2, E3 8:58 - 9:01 2.0
W2 E1, E2, E3
E4, E5, E6
8:59 - 9:02 3.5
W3 E1, E2, E3
E4, E5, E6
E7, E8, E9
9:00 - 9:03 5.0
W4 E4, E5, E6
E7, E8, E9
E10 E11 E12
9:01 - 9:04 8.0
W5 E7, E8, E9
E10, E11, E12
E13, E14, E15
9:02 - 9:05 11.0
W6 E10, E11, E12
E13, E14, E15
9:03 - 9:06 12.5
W7 E13, E14, E15 9:04 - 9:07 14

Late arriving events

My window configuration looks like this,

private Window<RawEvent> configureWindow() {
  return Window.<RawEvent>into(SlidingWindows
                    .of(Duration.standardSeconds(windowDurationSeconds))
                    .every(Duration.standardSeconds(windowFrequencySeconds)))
            .withAllowedLateness(Duration.standardSeconds(allowedLatenessSeconds))
            .accumulatingFiredPanes()
            .triggering(AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)));
}

As mentioned earlier, I allow 2 minutes for late data to arrive and we say fire for every element that arrives late. That means, we have the possibility that even though window W3 fired an event and closed at 9:03, it is still possible that it could fire more events, till 9:05, to correct the previously emitted event as the data. Beyond 2 minutes, the pane is discarded and any late data is ignored for that window.

Another important configuration to note is that we are accumulating fired panes, i.e., our window data will remain in memory till the end of allowed lateness so that the average can be recomputed. If we discard fired panes, then we would get incorrect value as average as there will be only one event in the late firing window.

Test streams with controlled watermark

Keeping all the above in mind, let's start the test case development. First, let's send events for 3 minutes, thus completing atleast one full window without any issues.

TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
    .addElements(testEvents.get(0), testEvents.get(1), testEvents.get(2))
    .addElements(testEvents.get(3), testEvents.get(4), testEvents.get(5))
    .addElements(testEvents.get(6), testEvents.get(7), testEvents.get(8))
    .advanceWatermarkTo(startTime.plusMinutes(3).toInstant())

There you go, I sent 9 events, i.e, for upto 3 minutes and then advanced the watermark by 3 minutes, Windows are end exclusive, so the event occuring exactly at 9:03 will not be part of a window that ends at 9:03.

With the above, we should already have W1, W2 and W3 from the table. We would also have W4 and W5 open, but not emitting yet.

Testing late data and late firing panes

Now, I'm going to simulate a late event. The event E11, which was supposed to arrive at 9:03:20, does not arrive.

.addElements(testEvents.get(9), testEvents.get(11))
.advanceWatermarkTo(startTime.plusMinutes(4).toInstant())
.addElements(testEvents.get(12), testEvents.get(13), testEvents.get(14))
.advanceWatermarkTo(startTime.plusMinutes(6).toInstant())

As you see, I did not send testEvents.get(10), which is our E11. I advanced the watermark to 9:04, which closes the window W4 with average value as 7.625 instead of 8.0, but it still has 2 minutes to correct this value.

I then send the remaining events E13-E15 and advance the watermark to 9:06. This implies, W4 will not correct its value, its discarded now. W5 and W6 have closed too, emitting 11.0 (88/8) and 12.8 instead of emitting 11.0 (99/9) and 12.5 respectively, but they still have their late arrival periods open.

So, now let me send the missing E11 event, with event timestamp 9:03:20, but arriving in the stream at processing time of 9:06:00.

.addElements(testEvents.get(10))
.advanceWatermarkToInfinity();

After sending this event, I advance the watermark to infinity, simulating a stream that ended at time infinity. Since E11, arrives during the late data period, W5 and W6 emit events again, this time with the correct values. W7 also emits because we advanced the watermark to infinity and that caused the window to close. Thus we have all the windows.

Below is the set of events, that was expected to originate from the above simulation, 7 on time firings and 2 late firings.

List<ComputedEvent> expectedEvents = List.of(
    new ComputedEvent(id, testEvents.get(2).getValue().data().ts(), 2.0), // W1
    new ComputedEvent(id, testEvents.get(5).getValue().data().ts(), 3.5), // W2
    new ComputedEvent(id, testEvents.get(8).getValue().data().ts(), 5.0), // W3
    new ComputedEvent(id, testEvents.get(11).getValue().data().ts(), 7.625), // W4 - without E11
    new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // W5 - without E11
    new ComputedEvent(id, testEvents.get(14).getValue().data().ts(),  11.0), // W5 - corrected after E11 arrived
    new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.8), // W6 - without E11
    new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.5), // W6 - corrected after E11 arrived
    new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 14.0)); // W7

Running the test and PAsserting

Now that the simulation is done and expectations set, let's run the test pipeline and check if our sliding window logic is working as expected,

ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
        
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
PCollection<ComputedEvent> computed = pipeline
        .apply("Create input stream", simulatedStream)
        .apply(average);

PAssert.that(computed)
        .containsInAnyOrder(expectedEvents);
pipeline.run();

Go ahead and run it, your next round's on me if the test doesn't pass.

The full source code is available at the git repository. Now that you know how to run these simulated tests, go ahead, change those window configurations and see how the emitted data change.