Your allowed lateness is 360 days, and since the trigger you have doesn't emit speculative results, you'll have to wait until the watermark advances to the end-of-window timestamp + 360 days before anything is output from the grouping aggregation and becomes available at the side input.
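If you want the side input populated sooner, you could add early firings so that the sliding-window aggregation emits speculative panes. A minimal sketch (the 30-second processing-time delay is an arbitrary choice, and accumulating mode is used so each pane carries the full running count):

    .apply("Applying_Sliding_Window_1Hr_Every1sec",
        Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
            .withAllowedLateness(Duration.standardDays(360))
            // accumulating instead of discarding, so early panes contain the
            // complete count so far rather than only the delta since the last pane
            .accumulatingFiredPanes())

Note that with discardingFiredPanes(), each firing only contains the elements that arrived since the previous firing, so a side-input map built from discarded early panes would reflect partial counts.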
On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:

Hello all,

Any suggestions? Where am I going wrong, or is there a better way of achieving this so that I can do a replay as well?

Thanks
Mohil

On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi everyone,

I need a suggestion regarding usage of the side input pattern and a sliding window, especially while replaying old Kafka logs/offsets.

FYI: I am running Beam 2.19 on Google Dataflow.

I have a use case where I read a continuous stream of data from Kafka and need to calculate one score (apart from other calculations) per key, based on the number of such requests received for that key in the last one hour.

Roughly, my code looks like the following:

    PCollection<POJO> input = p
        .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
            .withTopic("app_access_stats")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window_Logs",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                .withAllowedLateness(Duration.standardDays(380))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseKafkaLogs()));

    /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
    /** Assume input = the incoming PCollection<POJO> created above. **/

    PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
        input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
             .apply(View.asMap());

    /** Calculate a running sum of the number of requests in a sliding window.
        The sliding window is 1 hr long and starts every 1 sec, so that we get an
        accurate result for the past 1 hr. **/

    private static class WindowedNumUserRequestsPerKey
            extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {

        @Override
        public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
            return input
                .apply("Applying_Sliding_Window_1Hr_Every1sec",
                    Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
                        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                        .withAllowedLateness(Duration.standardDays(360))
                        .discardingFiredPanes())
                .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
                .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
        }

        private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
            @ProcessElement
            public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
                /** code that emits the required KV **/
            }
        }

        private static class CalculateTotalUserRequestsPerKey
                extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {

            private static class TotalRequestsAccumulator implements Serializable {
                private long num_requests_running_sum = 0;

                TotalRequestsAccumulator(long num_requests_running_sum) {
                    this.num_requests_running_sum = num_requests_running_sum;
                }

                @Override
                public boolean equals(Object o) {
                    if (this == o) return true;
                    if (!(o instanceof TotalRequestsAccumulator)) return false;
                    TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
                    return num_requests_running_sum == that.num_requests_running_sum;
                }

                @Override
                public int hashCode() {
                    return Objects.hash(num_requests_running_sum);
                }
            }

            @Override
            public TotalRequestsAccumulator createAccumulator() {
                return new TotalRequestsAccumulator(0);
            }

            @Override
            public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
                mutableAccumulator.num_requests_running_sum++;
                return mutableAccumulator;
            }

            @Override
            public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
                TotalRequestsAccumulator merged = createAccumulator();
                for (TotalRequestsAccumulator accumulator : accumulators) {
                    merged.num_requests_running_sum += accumulator.num_requests_running_sum;
                }
                return merged;
            }

            @Override
            public Long extractOutput(TotalRequestsAccumulator accumulator) {
                return accumulator.num_requests_running_sum;
            }
        }
    }

Now I calculate the score in the incoming POJO by using slidingWindowHourlyUserRequestsPerKeyView as a side input:

    input.apply("Add_Score", ParDo.of(new AddScore())
        .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
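Inside AddScore, the side-input map is read roughly like this (a simplified sketch only; the constructor-injected view, the POJO accessors getKey()/withScore(), and computeScore() are illustrative placeholders, not the actual code):

    private static class AddScore extends DoFn<POJO, POJO> {
        private final PCollectionView<Map<KEY, Long>> requestsPerKeyView;

        AddScore(PCollectionView<Map<KEY, Long>> requestsPerKeyView) {
            this.requestsPerKeyView = requestsPerKeyView;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Side-input map of per-key request counts over the last hour.
            Map<KEY, Long> requestsLastHour = c.sideInput(requestsPerKeyView);
            POJO element = c.element();
            long numRequests = requestsLastHour.getOrDefault(element.getKey(), 0L);
            // Placeholder for the real score computation.
            c.output(element.withScore(computeScore(numRequests)));
        }
    }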
The above seems to be working fine, though I would appreciate a suggestion if there is a better way of achieving this.

Also, I start getting problems when we have to stop the Beam job for a couple of hours for maintenance or some other issue while data is continuously being pumped into Kafka.

Problem:
When the Beam job resumes after a couple of hours, the sliding window above suddenly gets bombarded with log messages, and instead of honoring each log's event timestamp, it treats them all as received within the current sliding window, thereby giving the wrong score. For example, if the job was stopped between 9 am and 11 am, and there were 20 msgs between 9-10 am and 30 msgs between 10-11 am, then on resuming at 11 am the pipeline will use the total of 50 msgs as the "last one hour" side input while processing all the log messages from between 9 am and 11 am.

To overcome this, I tried a few things, but every approach failed:

1. In the transformation that reads messages from Kafka and creates the PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it didn't work, as I believe you can't output data with an older timestamp into a live window while reading a real-time data stream from Kafka.

2. Since the stage that adds the score was not honoring the event's timestamp (as evident by printing window.startTime in the DoFn), I added a custom timestamp policy while reading logs from Kafka, i.e. something like this:

        KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
            new CustomFieldTimePolicy(previousWatermark))

   where CustomFieldTimePolicy sets the record timestamp based on the received record's timestamp (see the sketch after this list). On doing this, window.startTime printed a fairly accurate time that was close to the event's timestamp; however, the "WindowedNumUserRequestsPerKey" transformation didn't emit any output. It just stalled. My print statements were showing up in the aforementioned GroupByAggregationKey(), but then no output was emitted, as if the pipeline was stuck at that stage. I couldn't find any log in GCP's Stackdriver indicating the reason for the stalled pipeline.
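For reference, a timestamp policy along those lines typically looks something like the following (a sketch only; the watermark strategy used here, the max event timestamp seen so far, is an assumption rather than the actual CustomFieldTimePolicy):

    // Sketch of a custom TimestampPolicy that uses the Kafka record's timestamp
    // as the element's event time. The watermark simply tracks the max event
    // time seen, which is an assumption, not necessarily the original class.
    static class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
        private Instant currentWatermark;

        CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
            // Use the timestamp carried on the Kafka record as the event time.
            Instant eventTime = new Instant(record.getTimestamp());
            if (eventTime.isAfter(currentWatermark)) {
                currentWatermark = eventTime;
            }
            return eventTime;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            return currentWatermark;
        }
    }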
Any help or suggestions for solving this case would be appreciated. This will be very useful in our replay jobs, where for some reason our data sink (such as Elasticsearch) gets corrupted and we want to re-read all the old Kafka offsets and recreate the data in a new ES cluster.

Thanks and Regards
Mohil