[ https://issues.apache.org/jira/browse/BEAM-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-14111:
---------------------------------
    Labels: stale-assigned  (was: )

Legacy (SDF wrapper-based) KafkaIO reader degrades when Kafka poll latency is high
----------------------------------------------------------------------------------

                Key: BEAM-14111
                URL: https://issues.apache.org/jira/browse/BEAM-14111
            Project: Beam
         Issue Type: Improvement
         Components: io-java-kafka
           Reporter: Dmitry Orlovsky
           Assignee: Dmitry Orlovsky
           Priority: P2
             Labels: stale-assigned
         Time Spent: 40m
 Remaining Estimate: 0h

Beam currently has two KafkaIO source implementations:
* a modern one implemented as a Splittable DoFn (SDF), and
* a (deprecated) legacy one implemented as an SDF wrapper over the UnboundedSource and KafkaUnboundedReader classes.

We found that the legacy KafkaIO source cannot provide good throughput when the latency of calls to Kafka [Consumer.poll|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L523] becomes high. The degradation is very sharp: a pipeline that drops elements immediately after reading them from the source was only able to read about 100-1000 qps per Kafka partition. The Kafka cluster was overprovisioned, but it was in a remote network and had a poll latency of about 30 ms.

The first problem that could be addressed in the scope of this bug is that there is very little visibility into the Kafka source today. We had to add extra logging just to understand the issue with the pipeline above, or even to see the poll latency.

We believe the cause of the throughput degradation is a poor choice of the [RECORDS_DEQUEUE_POLL_TIMEOUT and RECORDS_ENQUEUE_POLL_TIMEOUT|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L334-L335] values, especially the former, which is currently 10 ms.

These are the timeouts for popping and pushing elements from/to the [availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L343]. This is a synchronous queue (i.e. blocking, without buffering) used to hand records fetched from Kafka between two loops (see the sketch after this list):
* The [consumerPollLoop|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L515], which polls data via a Kafka Consumer when there is no pending data already, and offers the pending data to the availableRecordsQueue otherwise. It also checkpoints offsets, but that is irrelevant to our case.
* The Beam UnboundedSourceAsSDFWrapperFn message processing loop. It is a bit involved, but the important part is that it calls the [nextBatch function|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L573] repeatedly until an attempt to [fetch an element from the availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L580] times out. After the timeout it returns control to the worker, and it may take a relatively long time until the loop is scheduled again.
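For illustration, here is a minimal, self-contained sketch of that handoff pattern. It is not the actual KafkaUnboundedReader code: the class name and record type are simplified stand-ins, only the 10 ms dequeue timeout is taken from the report, and the 100 ms enqueue timeout and 30 ms poll latency are assumptions chosen for the example.

{code:java}
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for the handoff inside KafkaUnboundedReader.
class HandoffSketch {
  static final long DEQUEUE_TIMEOUT_MS = 10;  // RECORDS_DEQUEUE_POLL_TIMEOUT (from the report)
  static final long ENQUEUE_TIMEOUT_MS = 100; // RECORDS_ENQUEUE_POLL_TIMEOUT (assumed value)
  static final long POLL_LATENCY_MS = 30;     // remote-cluster poll latency from the report

  // SynchronousQueue has no buffer: offer() only succeeds while the other
  // side is already blocked in poll(), which couples the two loops tightly.
  final SynchronousQueue<List<String>> availableRecordsQueue = new SynchronousQueue<>();

  // Analogue of consumerPollLoop: poll Kafka when nothing is pending,
  // otherwise keep offering the pending batch to the queue.
  void consumerPollLoop() throws InterruptedException {
    List<String> pending = null;
    while (!Thread.currentThread().isInterrupted()) {
      if (pending == null) {
        pending = pollKafka();
      }
      if (availableRecordsQueue.offer(pending, ENQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
        pending = null; // handed off; fetch the next batch
      }
    }
  }

  // Analogue of nextBatch(): a null result here is interpreted as
  // "no fresh data in Kafka", even if a poll() is still in flight.
  boolean nextBatch() throws InterruptedException {
    List<String> records =
        availableRecordsQueue.poll(DEQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    if (records == null) {
      return false; // in the real reader this ends the bundle
    }
    // Processing is effectively instantaneous in the repro pipeline above.
    return true;
  }

  // Stand-in for Consumer.poll() against a remote cluster.
  List<String> pollKafka() throws InterruptedException {
    Thread.sleep(POLL_LATENCY_MS);
    return List.of("record");
  }
}
{code}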
This is what we think is happening when the poll latency is high (reproduced in the simulation sketch below):
* consumerPollLoop fetches a data bundle from Kafka via poll() and offers it to the availableRecordsQueue.
* The message processing loop fetches the bundle from the availableRecordsQueue, unblocking the consumerPollLoop.
* consumerPollLoop calls poll() again.
* The message processing loop finishes processing the bundle BEFORE the poll() call above completes, and tries to fetch the next bundle from the availableRecordsQueue.
* The fetch from the availableRecordsQueue has a very short timeout (10 ms); if it expires before the pending poll() in the consumerPollLoop completes, the message processing loop concludes there is no fresh data in Kafka and exits. All the time until the message processing loop is rescheduled is wasted.
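Under the assumptions in the sketch above, this race is easy to observe: because processing is instantaneous, nearly every successful handoff is followed by one or more 10 ms dequeue timeouts while the 30 ms poll() is still in flight. A hypothetical driver (names are ours, not Beam's):

{code:java}
// Hypothetical driver for the HandoffSketch above. In the real reader a
// single dequeue timeout ends the bundle, so every timeout counted here
// stands for wasted time until the processing loop is rescheduled.
public class RaceSimulation {
  public static void main(String[] args) throws Exception {
    HandoffSketch sketch = new HandoffSketch();
    Thread producer = new Thread(() -> {
      try {
        sketch.consumerPollLoop();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.setDaemon(true);
    producer.start();

    int batches = 0;
    int timeouts = 0;
    for (int i = 0; i < 100; i++) {
      if (sketch.nextBatch()) {
        batches++;
      } else {
        timeouts++;
      }
    }
    // With a 30 ms poll latency and a 10 ms dequeue timeout, expect
    // roughly two timeouts per delivered batch.
    System.out.printf("batches=%d timeouts=%d%n", batches, timeouts);
  }
}
{code}

As the report implies, a dequeue timeout above the observed poll latency (or an adaptive one) would avoid these spurious "no fresh data" verdicts, at the cost of the processing loop blocking slightly longer when Kafka is genuinely idle.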