[ 
https://issues.apache.org/jira/browse/BEAM-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-14111:
---------------------------------
    Labels: stale-assigned  (was: )

> Legacy (SDF wrapper-based) KafkaIO reader degrades when Kafka poll latency is 
> high
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-14111
>                 URL: https://issues.apache.org/jira/browse/BEAM-14111
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Dmitry Orlovsky
>            Assignee: Dmitry Orlovsky
>            Priority: P2
>              Labels: stale-assigned
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Beam has two KafkaIO source implementations now:
>  * a modern one implemented as a Splittable DoFn (SDF), and
>  * a (deprecated) legacy one implemented as an SDF wrapper over an 
> UnboundedSource and KafkaUnboundedReader classes.
> We found that the legacy KafkaIO source cannot provide good throughput when 
> the latency of calls to Kafka 
> [Consumer.poll|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L523]
>  becomes high. The degradation is very sharp: a pipeline that dropped elements 
> immediately after reading them from the source was only able to read about 
> 100-1000 qps per Kafka partition. The Kafka cluster was overprovisioned but 
> sat in a remote network, with a poll latency of about 30ms.
> The first problem that could be addressed in the scope of this bug is that 
> there is very little visibility into the Kafka source today: we had to add 
> extra logging just to diagnose the pipeline above, or even to see the poll 
> latency.
> We believe the throughput degradation is caused by a poor choice of the 
> [RECORDS_DEQUEUE_POLL_TIMEOUT and 
> RECORDS_ENQUEUE_POLL_TIMEOUT|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L334-L335]
>  values, especially the former, which is currently 10ms.
> These are the timeouts for popping and pushing elements from/to the 
> [availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L343].
> This is a synchronous queue (i.e. blocking, with no buffering) used to hand 
> records fetched from Kafka between two loops:
> * The 
> [consumerPollLoop|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L515],
>  which polls data via a Kafka Consumer when there is no pending data, and 
> otherwise offers the pending data to the availableRecordsQueue. It also 
> checkpoints offsets, but that is irrelevant to our case.
> * The Beam UnboundedSourceAsSDFWrapperFn message processing loop. It is a bit 
> involved, but the important part is that it calls the [nextBatch 
> function|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L573]
>  repeatedly until an attempt to [fetch an element from the 
> availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L580]
>  times out. After the timeout, it returns control to the worker, and it may 
> take a relatively long time until the loop is scheduled again.
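The two-loop handoff described above can be sketched roughly as follows. This is a simplified illustration, not the actual Beam code: only the 10ms dequeue timeout comes from the issue text, while the enqueue timeout value, the String record type, and the method shapes are assumptions.

```java
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the handoff between consumerPollLoop and nextBatch().
// Not the actual Beam implementation; the enqueue timeout is a hypothetical value.
class HandoffSketch {
  static final long RECORDS_ENQUEUE_POLL_TIMEOUT_MS = 100; // assumed value
  static final long RECORDS_DEQUEUE_POLL_TIMEOUT_MS = 10;  // the 10ms the issue flags

  // SynchronousQueue has no buffer: offer() succeeds only if a taker is waiting.
  final SynchronousQueue<List<String>> availableRecordsQueue = new SynchronousQueue<>();

  // Producer side: stands in for consumerPollLoop.
  void consumerPollLoop() throws InterruptedException {
    while (true) {
      List<String> batch = pollKafka(); // may take ~30ms against a remote cluster
      // Keep offering until the processing loop takes the batch.
      while (!availableRecordsQueue.offer(
          batch, RECORDS_ENQUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
        // no taker within the timeout; retry
      }
    }
  }

  // Consumer side: stands in for nextBatch() in the SDF wrapper's loop.
  // Returning false means "no data", which makes the wrapper yield control
  // to the worker and wait to be rescheduled.
  boolean nextBatch() {
    try {
      List<String> batch =
          availableRecordsQueue.poll(RECORDS_DEQUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      if (batch == null) {
        return false; // dequeue timed out
      }
      process(batch);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  List<String> pollKafka() { return List.of("record"); }
  void process(List<String> batch) {}
}
```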
> This is what we think happens when the poll latency is high:
> * The consumerPollLoop fetches a data bundle from Kafka via poll() and 
> offers it to the availableRecordsQueue.
> * The message processing loop fetches the bundle from the 
> availableRecordsQueue, unblocking the consumerPollLoop.
> * The consumerPollLoop calls poll() again.
> * The message processing loop finishes processing the bundle BEFORE the 
> poll() call above completes, and tries to fetch the next bundle from the 
> availableRecordsQueue.
> * The fetch from the availableRecordsQueue has a very short timeout (10ms); 
> if it expires before the pending poll() in the consumerPollLoop completes, 
> the message processing loop concludes there is no fresh data in Kafka and 
> exits. All the time until the message processing loop is rescheduled is 
> wasted.
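The race above can be reproduced with a toy program. Again a sketch, not Beam code: the poll latency is simulated with sleep() and exaggerated to 100ms so the outcome is deterministic, and the class and method names are invented for illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Toy reproduction of the race: a producer thread plays consumerPollLoop,
// the calling thread plays the message processing loop. sleep() simulates
// the latency of poll() against a remote Kafka cluster.
public class PollTimeoutRace {
  static final long SIMULATED_POLL_LATENCY_MS = 100; // exaggerated remote poll()
  static final long DEQUEUE_TIMEOUT_MS = 10;         // RECORDS_DEQUEUE_POLL_TIMEOUT

  // Returns the result of the second dequeue attempt (null on timeout).
  static String demonstrateRace() throws InterruptedException {
    SynchronousQueue<String> availableRecordsQueue = new SynchronousQueue<>();

    Thread pollLoop = new Thread(() -> {
      try {
        while (true) {
          Thread.sleep(SIMULATED_POLL_LATENCY_MS); // poll() in flight
          availableRecordsQueue.put("bundle");     // blocks until taken
        }
      } catch (InterruptedException e) {
        // shutdown
      }
    });
    pollLoop.setDaemon(true);
    pollLoop.start();

    // First bundle: we wait generously, so the handoff succeeds.
    String first = availableRecordsQueue.poll(1, TimeUnit.SECONDS);
    System.out.println("first dequeue:  " + first);

    // Processing "finishes" instantly, so the second dequeue races the
    // pending poll(): a 10ms timeout against ~100ms of poll latency loses.
    String second = availableRecordsQueue.poll(DEQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    System.out.println("second dequeue: " + second); // null: loop exits, time is wasted

    pollLoop.interrupt();
    return second;
  }

  public static void main(String[] args) throws InterruptedException {
    demonstrateRace();
  }
}
```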



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
