Hi everyone, I need a suggestion regarding the usage of the side input pattern and sliding windows, especially while replaying old Kafka logs/offsets.
FYI: I am running Beam 2.19 on Google Dataflow.

I have a use case where I read a continuous stream of data from Kafka and need to calculate one score (apart from other calculations) per key, based on the number of such requests received for that key in the last one hour. Roughly, my code looks like the following:

    PCollection<POJO> input = p
        .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
            .withTopic("app_access_stats")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window_Logs",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(
                    AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                .withAllowedLateness(Duration.standardDays(380))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseKafkaLogs()));

The class below handles the incoming PCollection<POJO> and calculates the score (assume input = the incoming PCollection<POJO> created above):

    PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
        input
            .apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
            .apply(View.asMap());

    // Calculates a running sum of the number of requests per key in a sliding window of 1 hour,
    // starting every 1 second, so that we get an accurate result for the past hour.
    private static class WindowedNumUserRequestsPerKey
            extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {

        @Override
        public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
            return input
                .apply("Applying_Sliding_Window_1Hr_Every1sec",
                    Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
                        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                        .withAllowedLateness(Duration.standardDays(360))
                        .discardingFiredPanes())
                .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
                .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
        }

        private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
            @ProcessElement
            public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
                // code that emits the required KV
            }
        }

        private static class CalculateTotalUserRequestsPerKey
                extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {

            private static class TotalRequestsAccumulator implements Serializable {
                private long num_requests_running_sum = 0;

                TotalRequestsAccumulator(long num_requests_running_sum) {
                    this.num_requests_running_sum = num_requests_running_sum;
                }

                @Override
                public boolean equals(Object o) {
                    if (this == o) return true;
                    if (!(o instanceof TotalRequestsAccumulator)) return false;
                    TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
                    return num_requests_running_sum == that.num_requests_running_sum;
                }

                @Override
                public int hashCode() {
                    return Objects.hash(num_requests_running_sum);
                }
            }

            @Override
            public TotalRequestsAccumulator createAccumulator() {
                return new TotalRequestsAccumulator(0);
            }

            @Override
            public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
                mutableAccumulator.num_requests_running_sum++;
                return mutableAccumulator;
            }

            @Override
            public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
                TotalRequestsAccumulator merged = createAccumulator();
                for (TotalRequestsAccumulator accumulator : accumulators) {
                    merged.num_requests_running_sum += accumulator.num_requests_running_sum;
                }
                return merged;
            }

            @Override
            public Long extractOutput(TotalRequestsAccumulator accumulator) {
                return accumulator.num_requests_running_sum;
            }
        }
    }

Now I calculate the score in the incoming POJO by using slidingWindowHourlyUserRequestsPerKeyView as a side input:

    input.apply("Add_Score", ParDo.of(new AddScore())
        .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));

The above seems to be working fine, though I would appreciate a suggestion if there is a better way of achieving this.

Also, I start getting problems when we have to stop the pipeline for a couple of hours (for maintenance or some other issue) while data is continuously being pumped into Kafka.

Problem: When the pipeline resumes after a couple of hours, the sliding window suddenly gets bombarded with log messages and, instead of honoring each log's event timestamp, it treats all of them as having been received within the pipeline's current sliding window, thereby giving a wrong score. For example, if the pipeline was stopped between 9 am and 11 am, and there were 20 messages between 9-10 am and 30 messages between 10-11 am, then on resuming at 11 am the pipeline considers all 50 messages as received in the last one hour and uses that as the side input while processing all log messages between 9 am and 11 am.

To overcome this, I tried a few things, but every approach failed:

1. In the transformation that reads messages from Kafka and creates the PCollection<POJO>, I tried outputWithTimestamp(eventTimestamp), but it didn't work, as I believe you can't output data with an older timestamp in a live window while reading a real-time data stream from Kafka.

2. Since the stage that adds the score was not honoring the event's timestamp (as evident from printing window.startTime in the DoFn), I added a custom timestamp policy while reading logs from Kafka, i.e. something like this:

       KafkaIO.read().withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))

   where CustomFieldTimePolicy sets the record timestamp based on the received record's timestamp. After doing this, window.startTime printed a fairly accurate time that was close to the event's timestamp; however, the "WindowedNumUserRequestsPerKey" transformation didn't emit any output. It just stalled. My print statements were showing up in the aforementioned GroupByAggregationKey(), but then no output was emitted, as if the pipeline was stuck at that stage. I couldn't find any log in GCP's Stackdriver indicating the reason for the stalled pipeline.
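For reference, CustomFieldTimePolicy follows the usual custom TimestampPolicy shape; here is a simplified sketch of it (extractEventTime() below is just a placeholder for how we pull the event time out of the record):

    import java.util.Optional;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.io.kafka.TimestampPolicy;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    public class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {

        private Instant currentWatermark;

        public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
            // Resume from the previous watermark if the reader restarts, otherwise start at the minimum timestamp.
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
            // Use the record's own timestamp as the element's event time and advance the watermark to it.
            Instant eventTimestamp = extractEventTime(record);
            currentWatermark = eventTimestamp;
            return eventTimestamp;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            return currentWatermark;
        }

        // Placeholder: here it just takes the Kafka record timestamp; parsing the event time out of
        // the record value would plug in the same way.
        private Instant extractEventTime(KafkaRecord<String, byte[]> record) {
            return new Instant(record.getTimestamp());
        }
    }

In this sketch the watermark simply tracks the event time of the last record seen on each partition.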
Any help/suggestion for solving this case would be much appreciated. This will be very useful for our replay jobs, where for some reason our data sink (such as Elasticsearch) gets corrupted and we want to re-read all the old Kafka offsets and recreate the data in a new ES cluster.

Thanks and Regards,
Mohil