Thanks Luke for your reply.
I see. I am trying to recall why I set allowedLateness to 360 days.
Anyway, I will try without it.
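
For reference, the wait described in the quoted reply can be written out as simple arithmetic. This is only a sketch in plain Java with no Beam dependency: the class name, method name, and sample window end are made up for illustration, and the rule itself (output appears once the watermark reaches end of window + allowed lateness) is just the reading given in that reply, not something verified against the runner.

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessSketch {

    // Watermark position at which the first result would appear, assuming
    // the rule "end of window + allowed lateness" from the quoted reply.
    static Instant earliestOutput(Instant windowEnd, Duration allowedLateness) {
        return windowEnd.plus(allowedLateness);
    }

    public static void main(String[] args) {
        // Hypothetical window end, chosen only for illustration.
        Instant windowEnd = Instant.parse("2020-06-01T10:00:00Z");

        // With 360 days of allowed lateness the watermark has to move almost a year.
        System.out.println(earliestOutput(windowEnd, Duration.ofDays(360)));

        // With zero allowed lateness the result is due at the end of the window.
        System.out.println(earliestOutput(windowEnd, Duration.ZERO));
    }
}
```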

But do you think the approach I am using, computing a running score in a
sliding window and then using it as a side input to decorate the main
log, is correct? Or can I achieve the same thing in a better, more
optimized way?
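
To make the question concrete, here is a minimal sketch of the behavior being asked for, in plain Java without Beam. The class, the method names, and the sample date are made up for illustration; the 20 and 30 message counts follow the replay example in the original post quoted below. The point is that the count attached to a log line should cover the hour ending at that line's own event timestamp, not the hour ending when the pipeline happens to process it.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EventTimeCountSketch {

    // Counts timestamps that fall in the interval (asOf - window, asOf].
    static long countInWindow(List<Instant> times, Instant asOf, Duration window) {
        Instant start = asOf.minus(window);
        return times.stream()
            .filter(t -> t.isAfter(start) && !t.isAfter(asOf))
            .count();
    }

    // Hypothetical replay data: 20 events between 9 and 10, 30 between 10 and 11.
    static List<Instant> sampleEvents() {
        List<Instant> events = new ArrayList<>();
        Instant nine = Instant.parse("2020-05-27T09:00:00Z");
        for (int i = 0; i < 20; i++) {
            events.add(nine.plus(Duration.ofMinutes(3L * i))); // 09:00 .. 09:57
        }
        Instant ten = nine.plus(Duration.ofHours(1));
        for (int i = 0; i < 30; i++) {
            events.add(ten.plus(Duration.ofMinutes(2L * i))); // 10:00 .. 10:58
        }
        return events;
    }

    public static void main(String[] args) {
        Duration hour = Duration.ofHours(1);
        List<Instant> events = sampleEvents();

        // Event-time view: each hour sees only its own messages.
        System.out.println(countInWindow(events, Instant.parse("2020-05-27T09:59:59Z"), hour)); // 20
        System.out.println(countInWindow(events, Instant.parse("2020-05-27T10:59:59Z"), hour)); // 30

        // Arrival-time view: all 50 messages are stamped with the 11:00 restart.
        Instant eleven = Instant.parse("2020-05-27T11:00:00Z");
        List<Instant> arrivals = Collections.nCopies(events.size(), eleven);
        System.out.println(countInWindow(arrivals, eleven, hour)); // 50
    }
}
```

The last call reproduces the wrong score from the replay scenario: once every record carries its arrival time instead of its event time, the whole backlog lands in one hourly window.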

Thanks again
Mohil

On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:

> Your allowed lateness is 360 days, and since your trigger doesn't emit
> speculative results, you'll have to wait until the watermark advances to
> the end-of-window timestamp + 360 days before anything is output from the
> grouping aggregation and becomes available at the side input.
>
>
> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hello all,
>>
>> Any suggestions? Where am I going wrong, or is there a better way of
>> achieving this so that I can do replays as well?
>>
>> Thanks
>> Mohil
>>
>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi everyone,
>>> I need a suggestion regarding usage of the side input pattern and
>>> sliding windows, especially while replaying old Kafka logs/offsets.
>>>
>>> FYI: I am running Beam 2.19 on Google Dataflow.
>>>
>>> I have a use case where I read a continuous stream of data from Kafka
>>> and need to calculate one score per key (apart from other calculations),
>>> based on the number of requests received for that key in the last
>>> hour.
>>>
>>> Roughly, my code looks like the following:
>>>
>>> PCollection<POJO> input = p
>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>         .withTopic("app_access_stats")
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>         .commitOffsetsInFinalize())
>>>     .apply("Applying_Fixed_Window_Logs",
>>>         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
>>>                 .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>             .withAllowedLateness(Duration.standardDays(380))
>>>             .discardingFiredPanes())
>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>         ParDo.of(new ParseKafkaLogs()));
>>>
>>>
>>> /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
>>>
>>> /** Assume input = the incoming PCollection<POJO> created above. **/
>>>
>>> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView
>>>     = input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
>>>         .apply(View.asMap());
>>>
>>> /**
>>>  * Calculate the running sum of the number of requests in a sliding window.
>>>  * A 1-hour sliding window starts every 1 second so that we get an accurate
>>>  * result for the past hour.
>>>  **/
>>>
>>>
>>> private static class WindowedNumUserRequestsPerKey
>>>         extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>
>>>     @Override
>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>
>>>         return input
>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec",
>>>                 Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
>>>                         .every(Duration.standardSeconds(1)))
>>>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>                     .withAllowedLateness(Duration.standardDays(360))
>>>                     .discardingFiredPanes())
>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>             .apply("Total_Requests_Per_Key",
>>>                 Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>     }
>>>
>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>         @ProcessElement
>>>         public void processElement(@Element POJO input,
>>>                 OutputReceiver<KV<KEY, POJO>> out) {
>>>             /** code that emits required KV ****/
>>>
>>>         }
>>>     }
>>>
>>>     private static class CalculateTotalUserRequestsPerKey extends
>>>             Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>             private long num_requests_running_sum = 0;
>>>
>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>             }
>>>
>>>             @Override
>>>             public boolean equals(Object o) {
>>>                 if (this == o) return true;
>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>             }
>>>
>>>             @Override
>>>             public int hashCode() {
>>>                 return Objects.hash(num_requests_running_sum);
>>>             }
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator createAccumulator() {
>>>             return new TotalRequestsAccumulator(0);
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator addInput(
>>>                 TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>             mutableAccumulator.num_requests_running_sum++;
>>>             return mutableAccumulator;
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator mergeAccumulators(
>>>                 Iterable<TotalRequestsAccumulator> accumulators) {
>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>             }
>>>             return merged;
>>>         }
>>>
>>>         @Override
>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>             return accumulator.num_requests_running_sum;
>>>         }
>>>     }
>>> }
>>>
>>> Now I calculate the score for each incoming POJO by using
>>> slidingWindowHourlyUserRequestsPerKeyView as a side input.
>>>
>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>
>>>
>>> The above seems to work fine, but I would welcome suggestions if there
>>> is a better way of achieving this.
>>>
>>> Also, I run into problems when we have to stop the Beam job for a
>>> couple of hours, for maintenance or some other issue, while data is
>>> continuously being pumped into Kafka.
>>>
>>> Problem:
>>> When the Beam job resumes after a couple of hours, the above sliding
>>> window is suddenly bombarded with log messages, and instead of honoring
>>> each log's timestamp, it treats all the log messages as if they had been
>>> received within the current sliding window, thereby giving the wrong
>>> score. For example, if the job was stopped between 9 am and 11 am, and
>>> there were 20 msgs between 9-10 am and 30 msgs between 10-11 am, then on
>>> resuming at 11 am the job will use the total of 50 msgs received in the
>>> last one hour as the side input while processing all log messages from
>>> between 9 am and 11 am.
>>>
>>> To overcome this, I tried a few things, but every approach failed:
>>>
>>> *1. In the transformation that reads messages from Kafka and creates the
>>> PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it
>>> didn't work, as I believe you can't output data with an older timestamp
>>> into a live window while reading a real-time data stream from Kafka.*
>>>
>>> *2. Since the stage that adds the score is not honoring the event's
>>> timestamp (as evident from printing window.startTime in the DoFn), I added
>>> a custom timestamp policy while reading logs from Kafka, i.e. something
>>> like this:*
>>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>>> new CustomFieldTimePolicy(previousWatermark))*
>>>
>>> *where CustomFieldTimePolicy sets the record timestamp based on the
>>> received record's timestamp.*
>>> *  On doing this, window.startTime printed a reasonably accurate time
>>> that was close to the event's timestamp; however, the
>>> "WindowedNumUserRequestsPerKey" transformation didn't emit any output. It
>>> just stalled. My print statements were showing up in the
>>> aforementioned GroupByAggregationKey(), but then no output was emitted, as
>>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>>> Stackdriver indicating the reason for the stalled pipeline.*
>>>
>>> *Any help or suggestions for solving this case would be appreciated. This
>>> will be very useful in our replay jobs, where for some reason our data
>>> sink, such as Elasticsearch, gets corrupted and we want to re-read all the
>>> old Kafka offsets and recreate the data in the new ES cluster.*
>>>
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>>
>>>
>>>
