Thanks Luke for your reply. I see. I am trying to recall why I added allowedLateness of 360 days. Anyway, I will try without it.
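For concreteness, dropping the large allowed lateness from the sliding-window step quoted below would look roughly like this (whether to keep a small non-zero lateness instead of zero is still an open question):

    input.apply("Applying_Sliding_Window_1Hr_Every1sec",
        Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
                .every(Duration.standardSeconds(1)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            // Duration.ZERO instead of Duration.standardDays(360); late records are dropped.
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())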
But do you think the approach I am using, keeping a running score in a sliding window and then using it as a side input to decorate the main log, is correct? Or can I achieve the same thing in a much better and more optimized way?

Thanks again
Mohil

On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:

> Your allowed lateness is 360 days, and since the trigger you have doesn't
> emit speculative results, you'll have to wait till the watermark advances
> to the end-of-window timestamp + 360 days before something is output from
> the grouping aggregation / available at the side input.
>
> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hello all,
>>
>> Any suggestions? Where am I going wrong, or is there a better way of
>> achieving this so that I can do replays as well?
>>
>> Thanks
>> Mohil
>>
>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi everyone,
>>>
>>> I need a suggestion regarding usage of the side input pattern and a
>>> sliding window, especially while replaying old Kafka logs/offsets.
>>>
>>> FYI: I am running Beam 2.19 on Google Dataflow.
>>>
>>> I have a use case where I read a continuous stream of data from Kafka
>>> and need to calculate one score (apart from other calculations) per key,
>>> based on the number of such requests received per key in the last one hour.
>>>
>>> Roughly, my code looks like the following:
>>>
>>> PCollection<POJO> input = p
>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>         .withTopic("app_access_stats")
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>         .commitOffsetsInFinalize())
>>>     .apply("Applying_Fixed_Window_Logs",
>>>         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>             .triggering(Repeatedly.forever(
>>>                 AfterWatermark.pastEndOfWindow()
>>>                     .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>             .withAllowedLateness(Duration.standardDays(380))
>>>             .discardingFiredPanes())
>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>         ParDo.of(new ParseKafkaLogs()));
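The ParseKafkaLogs DoFn referenced above is not shown in the thread; purely as a hypothetical sketch, a conversion step of this shape might look like the following, where deserializePayload() is a placeholder for whatever decoding (Avro/JSON/protobuf) the real pipeline uses:

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.DoFn;

    class ParseKafkaLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
        @ProcessElement
        public void processElement(@Element KafkaRecord<String, byte[]> record,
                                   OutputReceiver<POJO> out) {
            // The Kafka value payload carries the serialized event.
            byte[] payload = record.getKV().getValue();
            POJO parsed = deserializePayload(payload);
            out.output(parsed);
        }

        private static POJO deserializePayload(byte[] payload) {
            // Hypothetical helper; the real decoding logic is not shown in the thread.
            throw new UnsupportedOperationException("application-specific decoding");
        }
    }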
>>> /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
>>>
>>> /** Assume input = the incoming PCollection<POJO> created above. */
>>>
>>> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
>>>     input
>>>         .apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
>>>         .apply(View.asMap());
>>>
>>> /**
>>>  * Calculate a running sum of the number of requests in a sliding window.
>>>  * The 1-hour sliding window starts every 1 second so that we get an
>>>  * accurate result for the past 1 hour.
>>>  */
>>> private static class WindowedNumUserRequestsPerKey
>>>         extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>
>>>     @Override
>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>         return input
>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec",
>>>                 Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
>>>                         .every(Duration.standardSeconds(1)))
>>>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>                     .withAllowedLateness(Duration.standardDays(360))
>>>                     .discardingFiredPanes())
>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>     }
>>>
>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>         @ProcessElement
>>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>>             /** code that emits the required KV **/
>>>         }
>>>     }
>>>
>>>     private static class CalculateTotalUserRequestsPerKey extends
>>>             Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>
>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>             private long num_requests_running_sum = 0;
>>>
>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>             }
>>>
>>>             @Override
>>>             public boolean equals(Object o) {
>>>                 if (this == o) return true;
>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>             }
>>>
>>>             @Override
>>>             public int hashCode() {
>>>                 return Objects.hash(num_requests_running_sum);
>>>             }
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator createAccumulator() {
>>>             return new TotalRequestsAccumulator(0);
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>             mutableAccumulator.num_requests_running_sum++;
>>>             return mutableAccumulator;
>>>         }
>>>
>>>         @Override
>>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>             }
>>>             return merged;
>>>         }
>>>
>>>         @Override
>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>             return accumulator.num_requests_running_sum;
>>>         }
>>>     }
>>> }
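As a side note, since the custom CombineFn above only counts elements per key, Beam's built-in Count transform could express the same aggregation more compactly. A rough equivalent of the expand() body, keeping the same window and trigger settings as above:

    return input
        .apply("Applying_Sliding_Window_1Hr_Every1sec",
            Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
                    .every(Duration.standardSeconds(1)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardDays(360))  // same lateness as the original; see discussion above
                .discardingFiredPanes())
        .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
        // Count.perKey() emits KV<KEY, Long>, matching the CombineFn's output type.
        .apply("Total_Requests_Per_Key", Count.perKey());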
>>> Now I calculate the score in the incoming POJO by using
>>> slidingWindowHourlyUserRequestsPerKeyView as a side input:
>>>
>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>
>>> The above seems to be working fine, though I would like a suggestion on
>>> whether there is a better way of achieving this.
>>>
>>> Also, I start running into problems when we have to stop the pipeline for
>>> a couple of hours for maintenance or some other issue while data is
>>> continuously being pumped into Kafka.
>>>
>>> Problem:
>>> When the pipeline resumes after a couple of hours, the above sliding
>>> window suddenly gets bombarded with log messages, and instead of honoring
>>> each log's event timestamp, it treats everything as having arrived in the
>>> current sliding window, thereby giving the wrong score. For example, if
>>> the pipeline was stopped between 9 am and 11 am, and there were 20
>>> messages between 9 and 10 am and 30 messages between 10 and 11 am, then on
>>> resuming at 11 am the pipeline considers all 50 messages as "received in
>>> the last one hour" in the side input while processing the log messages
>>> from 9 am to 11 am.
>>>
>>> To overcome this, I tried a few things, but every approach failed:
>>>
>>> 1. In the transformation that reads messages from Kafka and creates the
>>> PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it
>>> didn't work; I believe you can't output data with an older timestamp into
>>> a live window while reading a real-time data stream from Kafka.
>>>
>>> 2. Since the stage that adds the score was not honoring the event's
>>> timestamp (as evident from printing window.startTime in the DoFn), I added
>>> a custom timestamp policy while reading logs from Kafka, i.e. something
>>> like this:
>>>
>>>     KafkaIO.read.withTimestampPolicyFactory(
>>>         (tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
>>>
>>> where CustomFieldTimePolicy sets the record timestamp based on the
>>> received record's timestamp. On doing this, window.startTime printed a
>>> somewhat accurate time that was close to the event's timestamp; however,
>>> the "WindowedNumUserRequestsPerKey" transformation didn't emit any output.
>>> It just stalled. My print statements were showing up in the
>>> aforementioned GroupByAggregationKey(), but then no output was emitted, as
>>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>>> Stackdriver indicating the reason for the stalled pipeline.
>>>
>>> Any help/suggestions for solving this case would be appreciated. This will
>>> be very useful in our replay jobs where, for some reason, our data sink
>>> such as Elasticsearch gets corrupted and we want to re-read all the old
>>> Kafka offsets and recreate the data in a new ES cluster.
>>>
>>> Thanks and Regards
>>> Mohil
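For reference, a custom timestamp policy of the kind described in attempt 2 is usually structured roughly like the sketch below. The extractEventTime() helper is hypothetical (it stands in for whatever reads the event time out of the record payload), and advancing the watermark to the last record's event time is just one simple choice, not necessarily what the actual CustomFieldTimePolicy does:

    import java.util.Optional;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.io.kafka.TimestampPolicy;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
        private Instant currentWatermark;

        CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(PartitionContext ctx,
                                             KafkaRecord<String, byte[]> record) {
            // Hypothetical helper: pulls the event time out of the record payload.
            Instant eventTime = extractEventTime(record.getKV().getValue());
            // Simple choice: let the partition watermark track the last event time seen.
            currentWatermark = eventTime;
            return eventTime;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            return currentWatermark;
        }

        private static Instant extractEventTime(byte[] payload) {
            // Application-specific parsing goes here.
            throw new UnsupportedOperationException("application-specific parsing");
        }
    }

One thing that might be worth checking with a policy like this during replay: the source watermark is effectively the minimum across partitions, so a partition that stays idle (or a policy that never advances past its previous watermark) can hold the whole pipeline's watermark back, which would keep AfterWatermark-based triggers from firing and could look exactly like the stall described above.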