Hi,

I record the time when StartBundle is called and again when FinishBundle is called, then report the difference between the two as the bundle latency. I put this metric on a step (KafkaMessageExtractor) that comes right after the KafkaIO step.

I don't know if this is related, but my pipeline has a windowing function and GroupIntoBatches. The window duration is 10 seconds and the batch size is 400. My current traffic is 8kps. I also tried window durations of 5 seconds and 20 seconds, but it did not make much difference.
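To make the measurement concrete, here is a minimal sketch of the timing logic, written without Beam dependencies so it runs standalone. The class name BundleLatencyTimer, the injectable clock, and the nearest-rank percentile are assumptions for illustration, not my actual code; in the pipeline the two methods would be called from the DoFn methods annotated with @StartBundle and @FinishBundle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Beam): records time between bundle start and finish. */
class BundleLatencyTimer {
    private final LongSupplier clockMillis;  // injectable clock so the logic can be tested
    private final List<Long> latenciesMillis = new ArrayList<>();
    private long bundleStartMillis = -1;

    BundleLatencyTimer(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Would be called from the DoFn method annotated with @StartBundle. */
    void startBundle() {
        bundleStartMillis = clockMillis.getAsLong();
    }

    /** Would be called from the DoFn method annotated with @FinishBundle. */
    long finishBundle() {
        if (bundleStartMillis < 0) {
            throw new IllegalStateException("finishBundle called without startBundle");
        }
        long latency = clockMillis.getAsLong() - bundleStartMillis;
        latenciesMillis.add(latency);
        bundleStartMillis = -1;
        return latency;
    }

    /** Nearest-rank percentile (0 < p <= 100) over all recorded bundle latencies. */
    long percentile(double p) {
        if (latenciesMillis.isEmpty()) {
            throw new IllegalStateException("no bundles recorded");
        }
        List<Long> sorted = new ArrayList<>(latenciesMillis);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }

    public static void main(String[] args) {
        BundleLatencyTimer timer = new BundleLatencyTimer(System::currentTimeMillis);
        timer.startBundle();
        timer.finishBundle();
        System.out.println("p99 bundle latency (ms): " + timer.percentile(99));
    }
}
```

A value measured this way covers the whole time the runner keeps the bundle open, which the step itself does not control.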
KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink

    .apply(Window.<KV<String, byte[]>>into(
            FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast((int) batchSize),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(windowDurationSeconds)))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(GroupIntoBatches.ofSize(batchSize))

Thanks

On Wed, Jun 10, 2020 at 8:37 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Hi Talat,
>
> Could you elaborate what do you mean by “*opening and closing bundle*”?
>
> Sometimes, starting a KafkaReader can take time since it will seek for a
> start offset for each assigned partition but it happens only once at
> pipeline start-up and mostly depends on network conditions.
>
> On 9 Jun 2020, at 23:05, Talat Uyarer <tuya...@paloaltonetworks.com>
> wrote:
>
> Hi,
> I added some metrics on a step right after KafkaIO. When I compare the
> read time difference between producer and KafkaIO it is 800ms for P99.
> However somehow that step's opening and closing bundle difference is 18
> seconds for p99. The step itself does not do any specific thing. Do you
> have any idea why bundle latency is very high ? Where should I check or
> tune on KafkaIO ?
>
> Additional information I read from one topic. That topic has 15
> partitions. Producer write in a round robin fashion.
>
> Thanks