Arvid Heise created FLINK-36434:
-----------------------------------

             Summary: Revise threading model of (KafkaPartition)SplitReader
                 Key: FLINK-36434
                 URL: https://issues.apache.org/jira/browse/FLINK-36434
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: kafka-3.2.0
            Reporter: Arvid Heise


The KafkaPartitionSplitReader is created in the source thread, where it 
initializes the consumer. However, it later accesses the consumer almost 
exclusively through the fetcher thread. Since the consumer is not thread-safe, 
this threading model looks broken.

However, I'd argue that the overall SplitReader design is already 
suboptimal, as the same issue probably occurs in other connectors. I'd 
probably create the fetch task first and then create the split reader within 
the fetch task.
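
To illustrate the ordering I have in mind, here is a minimal, self-contained sketch. It is not Flink or Kafka code: ThreadConfinedReader, FetchTask and runOnFetcherThread are hypothetical names, and the strict owner-thread check is only an illustrative device (a real consumer does not necessarily enforce thread ownership this strictly). The first call mimics the current order (create in the calling thread, use in the fetcher thread), the second mimics the proposed order (the fetch task creates the reader itself).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class SplitReaderThreadingSketch {

    /** Hypothetical stand-in for a non-thread-safe client; rejects use outside its creating thread. */
    static final class ThreadConfinedReader {
        private final Thread owner = Thread.currentThread();

        String fetch() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("reader created in another thread");
            }
            return "records";
        }
    }

    /** Hypothetical fetch task that creates its reader on first use, inside the fetcher thread. */
    static final class FetchTask implements Callable<String> {
        private final Supplier<ThreadConfinedReader> readerFactory;
        private ThreadConfinedReader reader; // only ever touched by the thread running call()

        FetchTask(Supplier<ThreadConfinedReader> readerFactory) {
            this.readerFactory = readerFactory;
        }

        @Override
        public String call() {
            if (reader == null) {
                reader = readerFactory.get(); // created in the same thread that uses it
            }
            return reader.fetch();
        }
    }

    /** Runs the task on one dedicated thread and returns its result or a failure description. */
    static String runOnFetcherThread(Callable<String> task) {
        ExecutorService fetcher = Executors.newSingleThreadExecutor();
        try {
            return fetcher.submit(task).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            fetcher.shutdown();
        }
    }

    public static void main(String[] args) {
        // Current order: reader is created here, then used from the fetcher thread.
        ThreadConfinedReader eager = new ThreadConfinedReader();
        System.out.println(runOnFetcherThread(eager::fetch));

        // Proposed order: only a factory crosses threads; the fetch task creates the reader.
        System.out.println(runOnFetcherThread(new FetchTask(ThreadConfinedReader::new)));
    }
}
```

With this arrangement only the factory is handed to the fetcher thread, so the reader and whatever client it wraps never leave the thread that created them.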

If left as-is, we can't upgrade the Kafka client anymore because we sporadically receive exceptions such as:


{code:java}
Caused by: org.apache.kafka.common.requests.CorrelationIdMismatchException: Correlation id for response (1179651) does not match request (0), request header: RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=kafka-source-external-context-6092797646400842179-3, correlationId=0, headerVersion=2)
        at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:106)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:740)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:913)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:580)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1728)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1686)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.lambda$removeEmptySplits$5(KafkaPartitionSplitReader.java:375)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.retryOnWakeup(KafkaPartitionSplitReader.java:481)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:374)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:224)
        at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
