Cool. Thanks again for your help/suggestions, Luke.

Regards,
Mohil
On Tue, Jun 2, 2020 at 10:42 AM Luke Cwik <lc...@google.com> wrote:

Using side inputs is fine and is a common pattern. You should take a look at "slowly changing side inputs"[1] as there is some example code there.

1: https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare <mo...@prosimo.io> wrote:

Thanks Luke for your reply. I see. I am trying to recall why I added allowedLateness as 360 days. Anyway, I will try without that.

But do you think the approach I am using, keeping a running score in a sliding window and then using it as a side input to decorate the main log, is correct? Or can I achieve the same thing in a much better and more optimized way?

Thanks again
Mohil

On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:

Your allowed lateness is 360 days, and since the trigger you have doesn't emit speculative results, you'll have to wait until the watermark advances to the end-of-window timestamp + 360 days before anything is output from the grouping aggregation and becomes available at the side input.

On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:

Hello all,

Any suggestions? Where am I going wrong, or is there a better way of achieving this so that I can do replay as well?

Thanks
Mohil

On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi everyone,

I need a suggestion regarding usage of the side input pattern and sliding windows, especially while replaying old kafka logs/offsets.

FYI: I am running Beam 2.19 on Google Dataflow.

I have a use case where I read a continuous stream of data from Kafka and need to calculate one score (apart from other calculations) per key, based on the number of such requests received for that key in the last one hour.

Roughly my code looks like the following:

PCollection<POJO> input = p
    .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
        .withTopic("app_access_stats")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Logs",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
            .withAllowedLateness(Duration.standardDays(380))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseKafkaLogs()));
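ParseKafkaLogs is referenced above but not shown in the thread. A minimal sketch of what it might look like, assuming the Kafka value bytes carry a JSON-encoded POJO and Jackson is used to deserialize it (both are assumptions, not part of the original code):

private static class ParseKafkaLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
    // Assumption: JSON payload, parsed with com.fasterxml.jackson.databind.ObjectMapper.
    private transient ObjectMapper mapper;

    @Setup
    public void setup() {
        mapper = new ObjectMapper();
    }

    @ProcessElement
    public void processElement(@Element KafkaRecord<String, byte[]> record,
                               OutputReceiver<POJO> out) throws IOException {
        // Deserialize the record value into the POJO and emit it.
        POJO parsed = mapper.readValue(record.getKV().getValue(), POJO.class);
        out.output(parsed);
    }
}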
/*** Class that handles the incoming PCollection<POJO> and calculates the score ***/

// Assume input = the incoming PCollection<POJO> created above.

PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
    input.apply("Calculate_Total_UserRequests_Past_1Hr", new WindowedNumUserRequestsPerKey())
         .apply(View.asMap());

/**
 * Calculates a running sum of the number of requests per key in a sliding window.
 * The sliding window is 1 hour long and starts every 1 second, so that we get an
 * accurate result for the past 1 hour.
 */
private static class WindowedNumUserRequestsPerKey
        extends PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {

    @Override
    public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
        return input
            .apply("Applying_Sliding_Window_1Hr_Every1sec",
                Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.standardDays(360))
                    .discardingFiredPanes())
            .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
            .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
    }

    private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
        @ProcessElement
        public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
            /** code that emits the required KV ***/
        }
    }

    private static class CalculateTotalUserRequestsPerKey
            extends Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {

        private static class TotalRequestsAccumulator implements Serializable {
            private long num_requests_running_sum = 0;

            TotalRequestsAccumulator(long num_requests_running_sum) {
                this.num_requests_running_sum = num_requests_running_sum;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof TotalRequestsAccumulator)) return false;
                TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
                return num_requests_running_sum == that.num_requests_running_sum;
            }

            @Override
            public int hashCode() {
                return Objects.hash(num_requests_running_sum);
            }
        }

        @Override
        public TotalRequestsAccumulator createAccumulator() {
            return new TotalRequestsAccumulator(0);
        }

        @Override
        public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
            mutableAccumulator.num_requests_running_sum++;
            return mutableAccumulator;
        }

        @Override
        public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
            TotalRequestsAccumulator merged = createAccumulator();
            for (TotalRequestsAccumulator accumulator : accumulators) {
                merged.num_requests_running_sum += accumulator.num_requests_running_sum;
            }
            return merged;
        }

        @Override
        public Long extractOutput(TotalRequestsAccumulator accumulator) {
            return accumulator.num_requests_running_sum;
        }
    }
}
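A side note, not from the thread: the CalculateTotalUserRequestsPerKey combiner above appears to do nothing beyond counting elements per key. If that is really all it needs to do, Beam's built-in Count.perKey() should be an equivalent and shorter way to express the last step:

// Assuming the combiner only counts the POJOs per key, the Combine step above could
// likely be replaced with the built-in count transform (org.apache.beam.sdk.transforms.Count):
.apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
.apply("Total_Requests_Per_Key", Count.perKey());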
Now I calculate the score in the incoming POJO by using slidingWindowHourlyUserRequestsPerKeyView as a side input:

input.apply("Add_Score", ParDo.of(new AddScore())
    .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));

The above seems to be working fine, though I would like a suggestion on whether there is a better way of achieving this.

Also, I start getting problems when we have to stop the Beam job for a couple of hours for maintenance or some other issue while data is continuously being pumped into kafka.

Problem:
When the job resumes after a couple of hours, the above sliding window suddenly gets bombarded with log messages, and instead of honoring each log's timestamp it treats all of them as if they arrived in the current sliding window, thereby giving the wrong score. For example, if the job was stopped between 9 am and 11 am, and there were 20 msgs between 9-10 am and 30 msgs between 10-11 am, then on resuming at 11 the pipeline will consider the total 50 msgs as "received in the last one hour" for the side input while processing all log messages between 9 am and 11 am.

To overcome this, I tried a few things, but every approach failed:

1. In the transformation which reads messages from kafka and creates the PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it didn't work; I believe you can't output data with an older timestamp into a live window while reading a real-time data stream from kafka.

2. Since the stage that adds the score is not honoring the event's timestamp (as evident by printing window.startTime in the DoFn), I added a custom timestamp policy while reading logs from kafka, i.e. something like this:

    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))

where CustomFieldTimePolicy sets the record timestamp based on the received record's timestamp (see the sketch after this email). On doing this, window.startTime printed a somewhat accurate time which was close to the event's timestamp; however, the "WindowedNumUserRequestsPerKey" transformation didn't emit any output. It just stalled. My print statements were showing up in the aforementioned GroupByAggregationKey(), but then no output was emitted, as if the pipeline was stuck at that stage. I couldn't find any log in GCP's Stackdriver indicating the reason for the stalled pipeline.

Any help/suggestion for solving this case would be appreciated. This will be very useful in our replay jobs, where for some reason our data sink such as Elasticsearch gets corrupted and we want to re-read all the old kafka offsets and recreate the data in the new ES cluster.

Thanks and Regards
Mohil
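For reference, a custom timestamp policy along the lines described in attempt 2 might look roughly like the following. This is only a sketch: the thread does not show CustomFieldTimePolicy's internals, and taking the event time from the Kafka record timestamp (rather than from a field inside the payload) is an assumption.

private static class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
    // Resume from the previous watermark if the reader restarts; otherwise start at the minimum.
    private Instant currentWatermark;

    CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
        // Assumption: the producer stamps each record with the event time, so the Kafka record
        // timestamp is used as the event timestamp. Alternatively, parse the payload here and
        // read the POJO's own timestamp field.
        Instant eventTime = new Instant(record.getTimestamp());
        if (eventTime.isAfter(currentWatermark)) {
            currentWatermark = eventTime;
        }
        return eventTime;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        // The watermark tracks the largest event time seen so far on this partition.
        return currentWatermark;
    }
}

If the producer already sets the Kafka record timestamp to the event time, TimestampPolicyFactory.withCreateTime(maxDelay) provides roughly this behavior out of the box.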