[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437018#comment-16437018 ]
Lingxiao WANG commented on KAFKA-6782: -------------------------------------- Yes, I think GlobalKTables will have the same problem. I'm trying to add PR and test for it. > GlobalStateStore never finishes restoring when consuming transactional > messages > ------------------------------------------------------------------------------- > > Key: KAFKA-6782 > URL: https://issues.apache.org/jira/browse/KAFKA-6782 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0, 1.0.1 > Reporter: Lingxiao WANG > Priority: Major > > Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his > proposition : > {code:java} > while (offset < highWatermark) { > final ConsumerRecords<byte[], byte[]> records = consumer.poll(100); > for (ConsumerRecord<byte[], byte[]> record : records) { > if (record.key() != null) { > stateRestoreCallback.restore(record.key(), record.value()); > } > offset = consumer.position(topicPartition); > } > }{code} > doesn't work for me. In my situation, there is chance to have several > transaction markers appear in sequence in one partition. In this case, the > consumer is blocked and can't poll any records, and the code 'offset = > consumer.position(topicPartition)' doesn't have any opportunity to execute. > So I propose to move the code 'offset = consumer.position(topicPartition)' > outside of the cycle to guarantee that event if no records are polled, the > offset can always be updated. > {code:java} > while (offset < highWatermark) { > final ConsumerRecords<byte[], byte[]> records = consumer.poll(100); > for (ConsumerRecord<byte[], byte[]> record : records) { > if (record.key() != null) { > stateRestoreCallback.restore(record.key(), record.value()); > } > } > offset = consumer.position(topicPartition); > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)