Cool. Thanks again for your help and suggestions, Luke.

Regards
Mohil

On Tue, Jun 2, 2020 at 10:42 AM Luke Cwik <lc...@google.com> wrote:

> Using side inputs is fine and is a common pattern. You should take a look
> at "slowly changing side inputs"[1] as there is some example code there.
>
> 1:
> https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
>
> On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Thanks Luke for your reply.
>> I see. I am trying to recall why I added allowedLateness as 360 days.
>> Anyways I will try without that.
>>
>> But do you think the approach I am using, computing a running score
>> in a sliding window and then using it as a side input to decorate the main
>> log, is correct? Or can I achieve the same thing in a better, more
>> optimized way?
>>
>> Thanks again
>> Mohil
>>
>> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> Your allowed lateness is 360 days, and since your trigger doesn't emit
>>> speculative results, you'll have to wait until the watermark advances to
>>> the end-of-window timestamp + 360 days before anything is output from the
>>> grouping aggregation and becomes available at the side input.
>>>
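If output is indeed held until the watermark reaches the end of the window plus the allowed lateness, as the reply above describes, the wait is simple arithmetic. A minimal sketch in plain Java (no Beam dependency; the class name, method name, and sample window end are hypothetical and only illustrate the calculation):

```java
import java.time.Duration;
import java.time.Instant;

final class LatenessSketch {
    /** Watermark position described in the reply: end of window + allowed lateness. */
    static Instant watermarkNeeded(Instant windowEnd, Duration allowedLateness) {
        return windowEnd.plus(allowedLateness);
    }

    public static void main(String[] args) {
        // Hypothetical window end; 360 days is the allowed lateness from the thread.
        Instant windowEnd = Instant.parse("2020-06-01T10:00:00Z");
        Instant needed = watermarkNeeded(windowEnd, Duration.ofDays(360));
        System.out.println("Watermark must reach " + needed);
    }
}
```

With a 360-day allowed lateness, the result lands almost a year after the window closes, which is why dropping or shrinking the lateness is the first thing to try.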
>>>
>>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hello all,
>>>>
>>>> Any suggestions? Where am I going wrong, or is there a better way of
>>>> achieving this so that I can do replay as well?
>>>>
>>>> Thanks
>>>> Mohil
>>>>
>>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hi everyone,
>>>>> I need a suggestion regarding usage of the side input pattern and
>>>>> sliding windows, especially while replaying old Kafka logs/offsets.
>>>>>
>>>>> FYI: I am running Beam 2.19 on Google Dataflow.
>>>>>
>>>>> I have a use case where I read a continuous stream of data from Kafka
>>>>> and need to calculate one score (apart from other calculations) per key
>>>>> which is based on the number of such requests that are received per key in
>>>>> the last one hour.
>>>>>
>>>>> Roughly, my code looks like the following:
>>>>>
>>>>> PCollection<POJO> input = p
>>>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>>         .withTopic("app_access_stats")
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>>>         .commitOffsetsInFinalize())
>>>>>     .apply("Applying_Fixed_Window_Logs",
>>>>>         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
>>>>>                 .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>>             .withAllowedLateness(Duration.standardDays(380))
>>>>>             .discardingFiredPanes())
>>>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>>         ParDo.of(new ParseKafkaLogs()));
>>>>>
>>>>>
>>>>> /*** Class that handles incoming PCollection<POJO> and calculates score ***/
>>>>>
>>>>> /** Assume input = incoming PCollection<POJO> as created above **/
>>>>>
>>>>> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
>>>>>     input.apply("Calculate_Total_UserRequests_Past_1Hr",
>>>>>             new WindowedNumUserRequestsPerKey())
>>>>>         .apply(View.asMap());
>>>>>
>>>>> /**
>>>>>  * Calculate the running sum of the number of requests in a sliding window.
>>>>>  * A sliding window of duration 1 hr starts every 1 sec so that we get an
>>>>>  * accurate result for the past 1 hr.
>>>>>  **/
>>>>>
>>>>>
>>>>> private static class WindowedNumUserRequestsPerKey
>>>>>         extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>>>
>>>>>     @Override
>>>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>>>
>>>>>         return input
>>>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec",
>>>>>                 Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
>>>>>                         .every(Duration.standardSeconds(1)))
>>>>>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>>                     .withAllowedLateness(Duration.standardDays(360))
>>>>>                     .discardingFiredPanes())
>>>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>>>             .apply("Total_Requests_Per_Key",
>>>>>                 Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>>>     }
>>>>>
>>>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>>>         @ProcessElement
>>>>>         public void processElement(@Element POJO input,
>>>>>                 OutputReceiver<KV<KEY, POJO>> out) {
>>>>>             /** code that emits required KV ****/
>>>>>
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static class CalculateTotalUserRequestsPerKey
>>>>>             extends Combine.CombineFn<POJO,
>>>>>                 CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>>>             private long num_requests_running_sum = 0;
>>>>>
>>>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public boolean equals(Object o) {
>>>>>                 if (this == o) return true;
>>>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public int hashCode() {
>>>>>                 return Objects.hash(num_requests_running_sum);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TotalRequestsAccumulator createAccumulator() {
>>>>>             return new TotalRequestsAccumulator(0);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TotalRequestsAccumulator addInput(
>>>>>                 TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>>>             mutableAccumulator.num_requests_running_sum++;
>>>>>             return mutableAccumulator;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TotalRequestsAccumulator mergeAccumulators(
>>>>>                 Iterable<TotalRequestsAccumulator> accumulators) {
>>>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>>>             }
>>>>>             return merged;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>>>             return accumulator.num_requests_running_sum;
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
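To make the cost of the "1 hr every 1 sec" configuration above concrete, here is a minimal plain-Java sketch of sliding-window assignment (no Beam dependency). The class and method names are hypothetical, and the sketch assumes windows aligned to the epoch with no offset. It lists every window `[start, start + size)` that contains a given event timestamp:

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    /** Returns the start (ms) of every window [start, start + sizeMs) containing tsMs. */
    static List<Long> windowStarts(long tsMs, long sizeMs, long periodMs) {
        List<Long> starts = new ArrayList<>();
        // Latest period-aligned window start at or before the timestamp.
        long lastStart = tsMs - Math.floorMod(tsMs, periodMs);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > tsMs - sizeMs; start -= periodMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long oneHourMs = 3_600_000L;
        long oneSecondMs = 1_000L;
        List<Long> starts = windowStarts(1_590_000_000_500L, oneHourMs, oneSecondMs);
        System.out.println(starts.size() + " windows contain this element");
    }
}
```

With a one-hour size and a one-second period, each element falls into 3600 windows, so every log record is aggregated 3600 times per key. A coarser period (for example one minute) trades some accuracy for far fewer windows.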
>>>>> Now I calculate the score in the incoming POJO by using
>>>>> slidingWindowHourlyUserRequestsPerKeyView as side input.
>>>>>
>>>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>>>
>>>>>
>>>>> The above seems to be working fine, though I would like to know if there
>>>>> is a better way of achieving this.
>>>>>
>>>>> Also, I start running into problems when we have to stop the Beam
>>>>> pipeline for a couple of hours for maintenance or some other issue while
>>>>> data is continuously being pumped into Kafka.
>>>>>
>>>>> Problem:
>>>>> When the pipeline resumes after a couple of hours, the sliding window
>>>>> above suddenly gets bombarded with log messages, and instead of honoring
>>>>> each log's timestamp, it treats all the log messages as received in the
>>>>> current sliding window, thereby giving the wrong score. For example, if the
>>>>> pipeline was stopped between 9 am and 11 am, with 20 msgs between 9-10 am
>>>>> and 30 msgs between 10-11 am, then on resuming at 11 am it will use the
>>>>> total of 50 msgs as "received in the last one hour" in the side input while
>>>>> processing all log messages between 9 am and 11 am.
>>>>>
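The 20/30/50 example above can be reproduced with a small plain-Java sketch (no Beam dependency). The `Log` class, the method names, and the spacing of the sample events are hypothetical. The sketch only contrasts counting by event time with counting by arrival time when a backlog is read all at once at 11 am:

```java
import java.util.ArrayList;
import java.util.List;

final class ReplayCountSketch {
    /** A log record: when it happened and when the pipeline read it (minutes since midnight). */
    static final class Log {
        final int eventMinute;
        final int arrivalMinute;

        Log(int eventMinute, int arrivalMinute) {
            this.eventMinute = eventMinute;
            this.arrivalMinute = arrivalMinute;
        }
    }

    /** Counts logs whose event time falls in [from, to). */
    static long countByEventTime(List<Log> logs, int from, int to) {
        return logs.stream().filter(l -> l.eventMinute >= from && l.eventMinute < to).count();
    }

    /** Counts logs whose arrival time falls in [from, to). */
    static long countByArrivalTime(List<Log> logs, int from, int to) {
        return logs.stream().filter(l -> l.arrivalMinute >= from && l.arrivalMinute < to).count();
    }

    /** Backlog from the example: 20 logs from 9-10 am, 30 from 10-11 am, all read at 11 am. */
    static List<Log> backlog() {
        List<Log> logs = new ArrayList<>();
        int resumeMinute = 11 * 60;
        for (int i = 0; i < 20; i++) logs.add(new Log(9 * 60 + i * 3, resumeMinute));
        for (int i = 0; i < 30; i++) logs.add(new Log(10 * 60 + i * 2, resumeMinute));
        return logs;
    }

    public static void main(String[] args) {
        List<Log> logs = backlog();
        System.out.println("event time 9-10 am:    " + countByEventTime(logs, 540, 600));
        System.out.println("event time 10-11 am:   " + countByEventTime(logs, 600, 660));
        System.out.println("arrival time 11-12 pm: " + countByArrivalTime(logs, 660, 720));
    }
}
```

Counting by event time yields 20 and 30 for the two hours, while counting by arrival time lumps all 50 records into the hour starting at 11 am, which matches the wrong score described above.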
>>>>> To overcome this, I tried a few things, but every approach failed:
>>>>>
>>>>> *1. In the transform that reads messages from Kafka and creates the
>>>>> PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it
>>>>> didn't work, as I believe you can't output data with an older timestamp in
>>>>> a live window while reading a real-time data stream from Kafka.*
>>>>>
>>>>> *2. Since the stage that adds the score is not honoring the event's
>>>>> timestamp (as evident from printing window.startTime in the DoFn), I added
>>>>> a custom timestamp policy while reading logs from Kafka, i.e. something
>>>>> like this:*
>>>>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark)
>>>>> -> new CustomFieldTimePolicy(previousWatermark))*
>>>>>
>>>>> *where CustomFieldTimePolicy sets the record timestamp based on the
>>>>> received record's timestamp.*
>>>>> *  On doing this, though window.startTime printed a somewhat accurate
>>>>> time that was close to the event's timestamp, the
>>>>> "WindowedNumUserRequestsPerKey" transform didn't emit any output. It
>>>>> just stalled. My print statements were showing up in the aforementioned
>>>>> GroupByAggregationKey(), but then no output was emitted, as if the
>>>>> pipeline was stuck at that stage. I couldn't find any log in GCP's
>>>>> Stackdriver indicating the reason for the stalled pipeline.*
>>>>>
>>>>> *Any help or suggestions for solving this would be appreciated. This
>>>>> will be very useful in our replay jobs where, for some reason, a data sink
>>>>> such as Elasticsearch gets corrupted and we want to re-read all the old
>>>>> Kafka offsets and recreate the data in a new ES cluster.*
>>>>>
>>>>>
>>>>> Thanks and Regards
>>>>> Mohil
>>>>>
>>>>>
>>>>>
>>>>>
