Thank you for the details.

I assume that the StartBundle and FinishBundle methods are called in your DoFn 
(KafkaMessageExtractor). Since that DoFn runs before your Window transform, I 
don't think the windowing should affect those measurements.

Did you count how many records are processed by one bundle? 
What is actually happening inside your @ProcessElement method? 
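One way to answer both questions is to record, per bundle, the number of elements and the time spent inside @ProcessElement, separately from the bundle's wall-clock duration. If the wall time is much larger than the processing time, the DoFn is mostly idle between elements rather than doing work. Below is a plain-Java sketch of that bookkeeping under that assumption. It is not Beam code: the class BundleStats and its methods are hypothetical. In a real DoFn you would call them from your @StartBundle, @ProcessElement and @FinishBundle methods and report the numbers through your own metrics (for example, a Beam Metrics.distribution).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical helper (not part of Beam): tracks per-bundle element counts and
// splits bundle wall time into "inside processElement" vs. everything else.
class BundleStats {
    /** Summary of one finished bundle; all times are in clock units (e.g. nanos). */
    static final class Bundle {
        final long elements;
        final long wallNanos;       // startBundle() -> finishBundle()
        final long processingNanos; // sum of time spent in processElement work

        Bundle(long elements, long wallNanos, long processingNanos) {
            this.elements = elements;
            this.wallNanos = wallNanos;
            this.processingNanos = processingNanos;
        }

        /** Bundle time not spent processing elements (e.g. waiting for input). */
        long otherNanos() {
            return wallNanos - processingNanos;
        }
    }

    private final LongSupplier clock; // injectable, e.g. System::nanoTime
    private final List<Bundle> finished = new ArrayList<>();
    private long bundleStart;
    private long elements;
    private long processingNanos;

    BundleStats(LongSupplier clock) {
        this.clock = clock;
    }

    /** Call from the @StartBundle method. */
    void startBundle() {
        bundleStart = clock.getAsLong();
        elements = 0;
        processingNanos = 0;
    }

    /** Call from @ProcessElement, wrapping the actual per-element work. */
    void processElement(Runnable work) {
        long t0 = clock.getAsLong();
        work.run();
        processingNanos += clock.getAsLong() - t0;
        elements++;
    }

    /** Call from the @FinishBundle method; returns this bundle's summary. */
    Bundle finishBundle() {
        Bundle b = new Bundle(elements, clock.getAsLong() - bundleStart, processingNanos);
        finished.add(b);
        return b;
    }

    List<Bundle> finished() {
        return finished;
    }
}
```

With System::nanoTime as the clock, comparing elements, wallNanos and otherNanos() for the slow bundles would show whether the 18 seconds are spent processing records or waiting for them.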

If possible, it would be helpful if you could share the code of the class where 
you take your measurements.


> On 10 Jun 2020, at 18:39, Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
> 
> Hi,
> 
> I record the time when StartBundle is called, do the same for FinishBundle, 
> and then report the difference between the two as the bundle latency. I put 
> this metric on a step (KafkaMessageExtractor) which is right after the KafkaIO 
> step. I don't know if this is related, but my pipeline has a windowing function 
> and GroupIntoBatches. The window duration is 10 seconds and the batch size is 
> 400. My current traffic is 8kps. I changed the window duration to 5 seconds and 
> to 20 seconds, but it did not have much effect.
> 
> KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink
> .apply(Window.<KV<String, byte[]>>into(
>     FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
>     .triggering(Repeatedly.forever(AfterFirst.of(
>         AfterPane.elementCountAtLeast((int) batchSize),
>         AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(windowDurationSeconds))))
>     )
>     .withAllowedLateness(Duration.ZERO)
>     .discardingFiredPanes())
> .apply(GroupIntoBatches.ofSize(batchSize))
> 
> Thanks
> 
> On Wed, Jun 10, 2020 at 8:37 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hi Talat,
> 
> Could you elaborate on what you mean by “opening and closing bundle”?
> 
> Sometimes starting a KafkaReader can take time, since it seeks to a start 
> offset for each assigned partition, but this happens only once at pipeline 
> start-up and mostly depends on network conditions.
> 
>> On 9 Jun 2020, at 23:05, Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>> 
>> Hi,
>> I added some metrics on a step right after KafkaIO. When I compare the 
>> read-time difference between the producer and KafkaIO, it is 800 ms at P99. 
>> However, that step's bundle latency (the time between opening and closing a 
>> bundle) is 18 seconds at P99. The step itself does not do anything special. 
>> Do you have any idea why the bundle latency is so high? What should I check 
>> or tune in KafkaIO?
>> 
>> Additional information: I read from one topic, which has 15 partitions. 
>> The producer writes in a round-robin fashion.
>> 
>> Thanks
> 
