Arvid Heise created FLINK-36434:
-----------------------------------

             Summary: Revise threading model of (KafkaPartition)SplitReader
                 Key: FLINK-36434
                 URL: https://issues.apache.org/jira/browse/FLINK-36434
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: kafka-3.2.0
            Reporter: Arvid Heise
The KafkaPartitionSplitReader is created in the source thread, where it also initializes the consumer. Afterwards, however, the consumer is accessed almost exclusively from the fetcher thread. Since the consumer is not thread-safe, this threading model looks broken.

Beyond that, I'd argue that the overall SplitReader implementation is already suboptimal, as the same issue probably occurs in other connectors. I'd probably create the fetch task first and then create the split reader within the fetch task.

If this is left as-is, we can no longer upgrade the Kafka client, because we receive sporadic exceptions such as:

{code:java}
Caused by: org.apache.kafka.common.requests.CorrelationIdMismatchException: Correlation id for response (1179651) does not match request (0), request header: RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=kafka-source-external-context-6092797646400842179-3, correlationId=0, headerVersion=2)
	at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:106)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:740)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:913)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:580)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1728)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1686)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.lambda$removeEmptySplits$5(KafkaPartitionSplitReader.java:375)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.retryOnWakeup(KafkaPartitionSplitReader.java:481)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:374)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:224)
	at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	... 6 more
{code}
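The change proposed in the description, creating the split reader (and with it the consumer) inside the fetch task, boils down to confining the non-thread-safe client to the one thread that actually uses it. The following is a minimal sketch of that idea in plain Java and assumes nothing about Flink's real SplitFetcher/SplitReader API: `ThreadConfined`, `FakeConsumer`, and the executor thread named "fetcher" are hypothetical names chosen for illustration, and `FakeConsumer` merely stands in for `KafkaConsumer`.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class ThreadConfinementSketch {

    /**
     * Hypothetical helper: creates the wrapped resource lazily on first access and
     * remembers the creating thread; later access from any other thread is rejected.
     */
    static final class ThreadConfined<T> {
        private final Supplier<T> factory;
        private Thread owner;
        private T resource;

        ThreadConfined(Supplier<T> factory) {
            this.factory = factory;
        }

        // synchronized only so the ownership check itself is reliable; the
        // resource is still meant to be used from a single thread.
        synchronized T get() {
            Thread current = Thread.currentThread();
            if (resource == null) {
                resource = factory.get(); // created in the calling (fetcher) thread
                owner = current;
            } else if (owner != current) {
                throw new IllegalStateException(
                        "accessed from " + current.getName() + " but owned by " + owner.getName());
            }
            return resource;
        }
    }

    /** Hypothetical stand-in for a non-thread-safe client such as KafkaConsumer. */
    static final class FakeConsumer {
        final String createdOn = Thread.currentThread().getName();

        String poll() {
            return "polled on " + Thread.currentThread().getName();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Source thread": only builds the wrapper, no client is created here.
        ThreadConfined<FakeConsumer> consumer = new ThreadConfined<>(FakeConsumer::new);

        // "Fetcher thread": a single-threaded executor that runs every fetch task.
        ExecutorService fetcher = Executors.newSingleThreadExecutor(r -> new Thread(r, "fetcher"));
        try {
            Future<String> polled = fetcher.submit(() -> consumer.get().poll());
            Future<String> createdOn = fetcher.submit(() -> consumer.get().createdOn);
            System.out.println(polled.get());    // polled on fetcher
            System.out.println(createdOn.get()); // fetcher
        } finally {
            fetcher.shutdown();
        }

        // Touching the client from the source thread afterwards fails fast.
        try {
            consumer.get();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
{code}

The design choice here is to defer construction instead of handing a ready-made client from one thread to another: the source thread only holds a cheap factory, and the ownership check turns accidental cross-thread access into an immediate exception rather than a sporadic failure deep inside the client. Whether Flink's fetcher could adopt this pattern without further changes is left open.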