[ 
https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lingxiao WANG updated KAFKA-6782:
---------------------------------
    Description: 
This is the same problem as https://issues.apache.org/jira/browse/KAFKA-6190.
The solution proposed there, shown below, works when the transactional messages
are committed successfully, but when there are aborted messages it loops
forever. Here is that proposal:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        offset = consumer.position(topicPartition);
    }
}
{code}
Concretely, when the consumer consumes a set of aborted messages, it polls 0
records, so the statement 'offset = consumer.position(topicPartition)' never
gets a chance to execute.

So I propose moving the statement 'offset = consumer.position(topicPartition)'
outside the for loop, to guarantee that even if no records are polled, the
offset is always updated.
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    offset = consumer.position(topicPartition);
}
{code}
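
The difference between the two loops can be reproduced without a broker. The
following is only a minimal sketch, not the Kafka Streams code: FakeConsumer,
restoreOriginal, restoreFixed and the maxPolls cap are hypothetical names
introduced for illustration. The fake assumes, as described above, that the
consumer returns no records for aborted messages or transaction markers while
its position still advances past them.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer reading one partition (not a Kafka class). */
    static final class FakeConsumer {
        // visible[i] == false models an aborted record or a transaction marker.
        private final boolean[] visible;
        private long position = 0;

        FakeConsumer(boolean[] visible) {
            this.visible = visible;
        }

        /** Returns up to maxRecords visible offsets; hidden offsets are skipped but still advance position. */
        List<Long> poll(int maxRecords) {
            List<Long> out = new ArrayList<>();
            while (position < visible.length && out.size() < maxRecords) {
                if (visible[(int) position]) {
                    out.add(position);
                }
                position++;
            }
            return out;
        }

        long position() {
            return position;
        }
    }

    /** Original loop: offset is refreshed only inside the per-record loop. */
    static boolean restoreOriginal(FakeConsumer consumer, long highWatermark, int maxPolls) {
        long offset = consumer.position();
        for (int polls = 0; offset < highWatermark; polls++) {
            if (polls == maxPolls) {
                return false; // cap added so the demonstration terminates
            }
            for (long record : consumer.poll(2)) {
                // the restore callback would be invoked with the record here
                offset = consumer.position();
            }
        }
        return true;
    }

    /** Proposed loop: offset is refreshed after every poll, even an empty one. */
    static boolean restoreFixed(FakeConsumer consumer, long highWatermark, int maxPolls) {
        long offset = consumer.position();
        for (int polls = 0; offset < highWatermark; polls++) {
            if (polls == maxPolls) {
                return false;
            }
            consumer.poll(2); // the restore callback would be invoked per record here
            offset = consumer.position();
        }
        return true;
    }

    public static void main(String[] args) {
        // Two committed records followed by three aborted ones; high watermark is 5.
        boolean[] log = {true, true, false, false, false};
        System.out.println("original finished: "
                + restoreOriginal(new FakeConsumer(log), log.length, 1000)); // false
        System.out.println("fixed finished: "
                + restoreFixed(new FakeConsumer(log), log.length, 1000));    // true
    }
}
```

In this sketch the original loop stops making progress at offset 2, because
every later poll is empty and the offset is never refreshed, while the proposed
loop reaches the high watermark after two polls.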
 

  was:
Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the fix
proposed there:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        offset = consumer.position(topicPartition);
    }
}
{code}
doesn't work for me. In my situation, several transaction markers may appear
in sequence in one partition. In that case the consumer is blocked and cannot
poll any records, so the statement 'offset = consumer.position(topicPartition)'
never gets a chance to execute.

So I propose moving the statement 'offset = consumer.position(topicPartition)'
outside the for loop, to guarantee that even if no records are polled, the
offset is always updated.
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    offset = consumer.position(topicPartition);
}
{code}
 


> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted 
> messages
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6782
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Lingxiao WANG
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
