Thank you for the details. I assume that the StartBundle and FinishBundle methods are called in your DoFn (KafkaMessageExtractor). Since that DoFn runs before your Window transform, I don’t think the windowing should affect it.
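For reference, the measurement under discussion (the time between StartBundle and FinishBundle, plus the number of records seen in between) can be sketched without any Beam dependency. This is only an illustration under stated assumptions: `BundleStats` and its method names are hypothetical, not Beam APIs and not the code of KafkaMessageExtractor. In a real DoFn the three methods would be called from the @StartBundle, @ProcessElement, and @FinishBundle methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical helper: records how long each bundle stays open and how many elements it holds. */
final class BundleStats {
    private final LongSupplier clockMillis;               // injectable clock, e.g. System::currentTimeMillis
    private final List<Long> latenciesMillis = new ArrayList<>();
    private final List<Long> bundleSizes = new ArrayList<>();
    private long bundleStartMillis;
    private long elementsInBundle;

    BundleStats(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Would be called from the DoFn's @StartBundle method. */
    void startBundle() {
        bundleStartMillis = clockMillis.getAsLong();
        elementsInBundle = 0;
    }

    /** Would be called once per record from the DoFn's @ProcessElement method. */
    void processElement() {
        elementsInBundle++;
    }

    /** Would be called from the DoFn's @FinishBundle method. */
    void finishBundle() {
        latenciesMillis.add(clockMillis.getAsLong() - bundleStartMillis);
        bundleSizes.add(elementsInBundle);
    }

    /** Open-to-close duration of each finished bundle, in milliseconds. */
    List<Long> latenciesMillis() {
        return Collections.unmodifiableList(latenciesMillis);
    }

    /** Number of elements seen in each finished bundle. */
    List<Long> bundleSizes() {
        return Collections.unmodifiableList(bundleSizes);
    }
}
```

Reporting the bundle size next to the bundle latency may help tell apart a bundle that carried many records from one that simply stayed open for a long time.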
Did you count how many records are processed by one bundle? What is actually happening inside your @ProcessElement method? It would be helpful if you could share the code of the class where you do your measurement, if possible.

> On 10 Jun 2020, at 18:39, Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>
> Hi,
>
> I check the time when StartBundle is called and do the same thing for
> FinishBundle, then take the difference between the Start and Finish Bundle
> times and report it as bundle latency. I put this metric on a step
> (KafkaMessageExtractor) which is right after the KafkaIO step. I don't know
> if this is related, but my pipeline has a Windowing function and
> GroupIntoBatches. The windowing duration is 10 seconds and the batch size is
> 400. My current traffic is 8kps. I changed the window duration to 5 seconds
> and to 20 seconds, but it does not change much.
>
> KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink
>
> .apply(Window.<KV<String, byte[]>>into(
>     FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
>   .triggering(Repeatedly.forever(AfterFirst.of(
>     AfterPane.elementCountAtLeast((int) batchSize),
>     AfterProcessingTime.pastFirstElementInPane()
>       .plusDelayOf(Duration.standardSeconds(windowDurationSeconds))))
>   )
>   .withAllowedLateness(Duration.ZERO)
>   .discardingFiredPanes())
> .apply(GroupIntoBatches.ofSize(batchSize))
>
> Thanks
>
> On Wed, Jun 10, 2020 at 8:37 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hi Talat,
>
> Could you elaborate on what you mean by “opening and closing bundle”?
>
> Sometimes, starting a KafkaReader can take time, since it will seek a start
> offset for each assigned partition, but that happens only once at pipeline
> start-up and mostly depends on network conditions.
>
>> On 9 Jun 2020, at 23:05, Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>
>> Hi,
>> I added some metrics on a step right after KafkaIO. When I compare the read
>> time difference between the producer and KafkaIO, it is 800ms for P99.
>> However, somehow that step's opening and closing bundle difference is 18
>> seconds for P99. The step itself does not do anything specific. Do you have
>> any idea why the bundle latency is so high? Where should I check or tune on
>> KafkaIO?
>>
>> Additional information: I read from one topic. That topic has 15 partitions.
>> The producer writes in a round-robin fashion.
>>
>> Thanks
>