Your allowed lateness is 360 days, and since the trigger you have doesn't
emit speculative results, you'll have to wait until the watermark advances
past the end-of-window timestamp + 360 days before anything is output from
the grouping aggregation and becomes available at the side input.
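
If you want the hourly counts to show up at the side input sooner, one option
(just a sketch; tune the trigger and firing frequency to your needs) is to add
early, speculative firings to the sliding window and accumulate panes so each
firing reflects everything seen so far for that window:

Window.<POJO>into(
        SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))))
    .withAllowedLateness(Duration.standardDays(360))
    .accumulatingFiredPanes()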


On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hello all,
>
> Any suggestions? Where am I going wrong, or is there a better way of
> achieving this so that I can do replays as well?
>
> Thanks
> Mohil
>
> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi everyone,
>> I need a suggestion regarding usage of the side input pattern and sliding
>> window, especially while replaying old kafka logs/offsets.
>>
>> FYI: I am running Beam 2.19 on Google Dataflow.
>>
>> I have a use case where I read a continuous stream of data from Kafka and
>> need to calculate one score per key (apart from other calculations), based
>> on the number of requests received for that key in the last one hour.
>>
>> Roughly, my code looks like the following:
>>
>> PCollection<POJO> input = p
>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>         .withTopic("app_access_stats")
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>         .withConsumerFactoryFn(consumerFactoryObj)
>>         .commitOffsetsInFinalize())
>>     .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, 
>> byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>         
>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>         .withAllowedLateness(Duration.standardDays(380))
>>         .discardingFiredPanes())
>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>         ParDo.of(new ParseKafkaLogs()));
>>
>>
>> /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
>>
>> /** Assume input = the incoming PCollection<POJO> created above. */
>>
>> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
>>     input.apply("Calculate_Total_UserRequests_Past_1Hr",
>>             new WindowedNumUserRequestsPerKey())
>>          .apply(View.asMap());
>>
>> /** Calculate a running sum of the number of requests in a sliding window.
>>     The window is 1 hr long and starts every 1 sec so that we get an
>>     accurate result for the past 1 hr. **/
>>
>>
>> private static class WindowedNumUserRequestsPerKey
>>         extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>
>>     @Override
>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>         return input
>>             .apply("Applying_Sliding_Window_1Hr_Every1sec",
>>                 Window.<POJO>into(
>>                         SlidingWindows.of(Duration.standardHours(1))
>>                             .every(Duration.standardSeconds(1)))
>>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>                     .withAllowedLateness(Duration.standardDays(360))
>>                     .discardingFiredPanes())
>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>             .apply("Total_Requests_Per_Key",
>>                 Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>     }
>>
>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>         @ProcessElement
>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>             /** code that emits the required KV **/
>>         }
>>     }
>>
>>     private static class CalculateTotalUserRequestsPerKey
>>             extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>
>>         private static class TotalRequestsAccumulator implements Serializable {
>>             private long num_requests_running_sum = 0;
>>
>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>                 this.num_requests_running_sum = num_requests_running_sum;
>>             }
>>
>>             @Override
>>             public boolean equals(Object o) {
>>                 if (this == o) return true;
>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>             }
>>
>>             @Override
>>             public int hashCode() {
>>                 return Objects.hash(num_requests_running_sum);
>>             }
>>         }
>>
>>         @Override
>>         public TotalRequestsAccumulator createAccumulator() {
>>             return new TotalRequestsAccumulator(0);
>>         }
>>
>>         @Override
>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>             mutableAccumulator.num_requests_running_sum++;
>>             return mutableAccumulator;
>>         }
>>
>>         @Override
>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>             TotalRequestsAccumulator merged = createAccumulator();
>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>             }
>>             return merged;
>>         }
>>
>>         @Override
>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>             Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
>>             return totalUserRequestsPerKey;
>>         }
>>     }
>> }
>>
>> Now I calculate the score in the incoming POJO by using
>> slidingWindowHourlyUserRequestsPerKeyView as a side input.
>>
>> input.apply("Add_Score", ParDo.of(new AddScore())
>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
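>>
>> For reference, AddScore reads the map roughly like this (assuming the view is
>> in scope; getKey(), withScore() and computeScore() are placeholders for our
>> own POJO/scoring code):
>>
>> private static class AddScore extends DoFn<POJO, POJO> {
>>     @ProcessElement
>>     public void processElement(ProcessContext c) {
>>         // Look up the past-hour request count for this element's key.
>>         Map<KEY, Long> hourlyRequests = c.sideInput(slidingWindowHourlyUserRequestsPerKeyView);
>>         POJO element = c.element();
>>         long numRequestsLastHour = hourlyRequests.getOrDefault(element.getKey(), 0L);
>>         // Placeholder score formula; the real one lives in our code.
>>         c.output(element.withScore(computeScore(numRequestsLastHour)));
>>     }
>> }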
>>
>>
>> The above seems to be working fine, though I'd appreciate a suggestion if
>> there is a better way of achieving this.
>>
>> Also, I start getting problems when we have to stop the Beam job for a couple
>> of hours for maintenance or some other issue while data is continuously being
>> pumped into Kafka.
>>
>> Problem:
>> When the Beam job resumes after a couple of hours, the above sliding window
>> suddenly gets bombarded with log messages, and instead of honoring each log's
>> event timestamp, it treats all the log messages as having arrived in the
>> current sliding window, thereby giving the wrong score. For example, if the
>> job was stopped between 9 am and 11 am, and there were 20 msgs between 9-10 am
>> and 30 msgs between 10-11 am, then on resuming at 11 am it will consider all
>> 50 msgs as received in the last one hour as the side input while processing
>> the log messages between 9 am and 11 am.
>>
>> To overcome this, I tried a few things, but every approach failed:
>>
>> *1. In the transformation that reads messages from Kafka and creates the
>> PCollection<POJO>, I tried outputWithTimestamp(eventTimestamp), but it didn't
>> work, as I believe you can't output data with an older timestamp into a live
>> window while reading a real-time data stream from Kafka.*
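>>
>> Roughly what I tried inside the parsing DoFn (parse() and getEventTimestamp()
>> below are just placeholders for our own code):
>>
>> @ProcessElement
>> public void processElement(@Element KafkaRecord<String, byte[]> record,
>>                            OutputReceiver<POJO> out) {
>>     POJO pojo = parse(record.getKV().getValue());
>>     // Emit with the event time instead of the record's arrival time.
>>     out.outputWithTimestamp(pojo, pojo.getEventTimestamp());
>> }
>>
>> (As I understand it, a DoFn rejects output timestamps earlier than the input
>> element's timestamp minus getAllowedTimestampSkew(), which is why this fails
>> when replaying old offsets.)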
>>
>> *2. Since the stage that adds the score was not honoring the event's timestamp
>> (as evident from printing window.startTime in the DoFn), I added a custom
>> timestamp policy while reading logs from Kafka, i.e. something like this:*
>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>> new CustomFieldTimePolicy(previousWatermark))*
>>
>> *where CustomFieldTimePolicy sets the record timestamp based on the received
>> record's timestamp.*
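>>
>> A minimal sketch of that policy (extractEventTime() below is a placeholder for
>> pulling the event time out of our record payload):
>>
>> public class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
>>     private Instant currentWatermark;
>>
>>     public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
>>         this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>>     }
>>
>>     @Override
>>     public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
>>         // Use the event time embedded in the record as both the element
>>         // timestamp and the basis for the watermark.
>>         currentWatermark = extractEventTime(record.getKV().getValue());
>>         return currentWatermark;
>>     }
>>
>>     @Override
>>     public Instant getWatermark(PartitionContext ctx) {
>>         return currentWatermark;
>>     }
>> }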
>> *  On doing this, window.startTime printed a somewhat accurate time that was
>> close to the event's timestamp; however, the "WindowedNumUserRequestsPerKey"
>> transformation didn't emit any output. It just stalled. My print statements
>> were showing up in the aforementioned GroupByAggregationKey(), but then no
>> output was emitted, as if the pipeline were stuck at that stage. I couldn't
>> find any log in GCP's Stackdriver indicating the reason for the stalled
>> pipeline.*
>>
>> *Any help/suggestion for solving this case would be appreciated. This will be
>> very useful for our replay jobs, where for some reason our data sink (such as
>> Elasticsearch) gets corrupted and we want to re-read all the old Kafka offsets
>> and recreate the data in a new ES cluster.*
>>
>>
>> Thanks and Regards
>> Mohil
>>
>>
>>
>>
