Hi everyone,
I need a suggestion regarding the use of the side input pattern with a sliding
window, especially while replaying old Kafka logs/offsets.

FYI: I am running Beam 2.19 on Google Dataflow.

I have a use case where I read a continuous stream of data from Kafka and
need to calculate one score (apart from other calculations) per key, based on
the number of requests received for that key in the last one hour.

Roughly, my code looks like the following:

PCollection<POJO> input = p
    .apply("Read_Logs_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
        .withTopic("app_access_stats")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Logs",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
            .withAllowedLateness(Duration.standardDays(380))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection<POJO>",
        ParDo.of(new ParseKafkaLogs()));
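
Here ParseKafkaLogs is essentially just a deserializing DoFn, roughly like
this (a sketch; POJO.parseFrom is a stand-in for our actual deserialization
logic):

private static class ParseKafkaLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
    @ProcessElement
    public void processElement(@Element KafkaRecord<String, byte[]> record,
                               OutputReceiver<POJO> out) {
        // Elements keep the timestamp assigned by KafkaIO (processing time by default).
        out.output(POJO.parseFrom(record.getKV().getValue()));
    }
}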


/*** Class that handles the incoming PCollection<POJO> and calculates the score ***/

/** Assume input = the incoming PCollection<POJO> created above.
    Calculate a running sum of the number of requests per key in a sliding
    window of 1 hr starting every 1 sec, so that we get an accurate count
    for the past hour. **/

PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
    input
        .apply("Calculate_Total_UserRequests_Past_1Hr",
            new WindowedNumUserRequestsPerKey())
        .apply(View.asMap());


private static class WindowedNumUserRequestsPerKey
        extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {

    @Override
    public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
        return input
            .apply("Applying_Sliding_Window_1Hr_Every1sec",
                Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
                        .every(Duration.standardSeconds(1)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.standardDays(360))
                    .discardingFiredPanes())
            .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
            .apply("Total_Requests_Per_Key",
                Combine.perKey(new CalculateTotalUserRequestsPerKey()));
    }

    private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
        @ProcessElement
        public void processElement(@Element POJO input,
                                   OutputReceiver<KV<KEY, POJO>> out) {
            /** code that emits the required KV ****/
        }
    }

    private static class CalculateTotalUserRequestsPerKey
            extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
        private static class TotalRequestsAccumulator implements Serializable {
            private long num_requests_running_sum = 0;

            TotalRequestsAccumulator(long num_requests_running_sum) {
                this.num_requests_running_sum = num_requests_running_sum;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof TotalRequestsAccumulator)) return false;
                TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
                return num_requests_running_sum == that.num_requests_running_sum;
            }

            @Override
            public int hashCode() {
                return Objects.hash(num_requests_running_sum);
            }
        }

        @Override
        public TotalRequestsAccumulator createAccumulator() {
            return new TotalRequestsAccumulator(0);
        }

        @Override
        public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
            mutableAccumulator.num_requests_running_sum++;
            return mutableAccumulator;
        }

        @Override
        public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
            TotalRequestsAccumulator merged = createAccumulator();
            for (TotalRequestsAccumulator accumulator : accumulators) {
                merged.num_requests_running_sum += accumulator.num_requests_running_sum;
            }
            return merged;
        }

        @Override
        public Long extractOutput(TotalRequestsAccumulator accumulator) {
            Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
            return totalUserRequestsPerKey;
        }
    }
}

Now I calculate the score in the incoming POJO by using
slidingWindowHourlyUserRequestsPerKeyView as a side input:

input.apply("Add_Score", ParDo.of(new AddScore())
    .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
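
For reference, AddScore reads the map side input roughly like this (a sketch:
here the view is handed to the DoFn via its constructor, and getKey(),
withScore() and computeScore() are stand-ins for my actual POJO accessors and
scoring logic):

private static class AddScore extends DoFn<POJO, POJO> {
    private final PCollectionView<Map<KEY, Long>> requestsView;

    AddScore(PCollectionView<Map<KEY, Long>> requestsView) {
        this.requestsView = requestsView;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Look up the hourly request count for this element's key; default to 0
        // if no requests were seen for the key in the past hour.
        Map<KEY, Long> hourlyRequestsPerKey = c.sideInput(requestsView);
        long numRequests = hourlyRequestsPerKey.getOrDefault(c.element().getKey(), 0L);
        c.output(c.element().withScore(computeScore(numRequests)));
    }
}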


The above seems to be working fine, though I'd appreciate a suggestion if
there is a better way of achieving this.

Also, I start running into problems when we have to stop the pipeline for a
couple of hours for maintenance or some other issue while data is continuously
being pumped into Kafka.

Problem:
When the pipeline resumes after a couple of hours, the sliding window suddenly
gets bombarded with log messages, and instead of honoring each log's event
timestamp, it treats all of them as having arrived in the current sliding
window, thereby producing the wrong score. For example, if the pipeline was
stopped between 9 am and 11 am, and there were 20 msgs between 9-10 am and 30
msgs between 10-11 am, then on resuming at 11 it will consider all 50 msgs as
received in the last one hour and use that count as the side input while
processing all log messages between 9 am and 11 am.

To overcome this, I tried a few things, but every approach failed:

1. In the transformation that reads messages from Kafka and creates the
PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it didn't
work, as I believe you can't output data with an older timestamp into a live
window while reading a real-time data stream from Kafka.
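
Concretely, what I tried looked roughly like this (a sketch; getEventTimeMillis()
is a stand-in for however the POJO exposes its event time, and my understanding
is that shifting timestamps backwards like this also requires overriding the
deprecated getAllowedTimestampSkew()):

private static class ParseKafkaLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
    @ProcessElement
    public void processElement(@Element KafkaRecord<String, byte[]> record,
                               OutputReceiver<POJO> out) {
        POJO pojo = POJO.parseFrom(record.getKV().getValue());
        // Emit with the event timestamp instead of the default (processing-time) timestamp.
        out.outputWithTimestamp(pojo, new Instant(pojo.getEventTimeMillis()));
    }

    // Without this override, outputting with a timestamp older than the input
    // element's timestamp throws an IllegalArgumentException.
    @Override
    public Duration getAllowedTimestampSkew() {
        return Duration.standardDays(380);
    }
}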

2. Since the stage that adds the score was not honoring the event's timestamp
(as evident from printing window.startTime in the DoFn), I added a custom
timestamp policy while reading logs from Kafka, i.e. something like this:

    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
        new CustomFieldTimePolicy(previousWatermark))

where CustomFieldTimePolicy sets each record's timestamp based on the
timestamp carried on the received record.
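
The policy itself follows the usual pattern, roughly (a sketch; it simply
advances the watermark to the last record's Kafka timestamp):

public class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
    private Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx,
                                         KafkaRecord<String, byte[]> record) {
        // Use the timestamp stamped on the Kafka record as the event time.
        currentWatermark = new Instant(record.getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}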
On doing this, window.startTime printed a fairly accurate time, close to the
event's timestamp; however, the WindowedNumUserRequestsPerKey transformation
didn't emit any output. It just stalled. My print statements were showing up
in the aforementioned GroupByAggregationKey(), but then no output was emitted,
as if the pipeline was stuck at that stage. I couldn't find any log in GCP's
Stackdriver indicating the reason for the stalled pipeline.

Any help/suggestion for solving this case would be appreciated. This will be
very useful in our replay jobs, where for some reason our data sink (e.g.
Elasticsearch) gets corrupted and we want to read all the old Kafka offsets
again and recreate the data in a new ES cluster.


Thanks and Regards
Mohil
