Issue created https://issues.apache.org/jira/browse/KAFKA-6190
I've not contributed anything to Kafka before. I've followed the guidelines in the
contributing wiki, except that I opened the PR against 0.11.0 rather than trunk, as
that seemed to be what you were suggesting. Let me know if that's incorrect.

Alex

On Wed, Nov 8, 2017 at 3:54 PM Alex Good <a...@makerlabs.co.uk> wrote:

> No problem, it was an interesting little bug hunt. I'll create a JIRA and
> PR this afternoon/evening.
>
> Thanks
> Alex
>
> On Wed, Nov 8, 2017 at 3:05 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Cool. Thanks for cycling back. What you describe verifies my suspicion.
>> Your patch makes sense.
>>
>> Do you want to create a JIRA and provide a PR to fix this? We could
>> include this in 0.11.0.2, which we plan to release soon (so if you want
>> to contribute the patch, please do it right away -- we want to cut the
>> first RC for 0.11.0.2 this Friday). If you don't want to do it, that's
>> also fine, and I (or someone else) can do it. Please let us know.
>>
>> For 0.11.0.1, the workaround would be to not use transactions when you
>> write to the topic that you read as global state/KTable.
>>
>> Thanks a lot for reporting the issue and helping to nail it down!
>>
>> -Matthias
>>
>> On 11/8/17 3:13 AM, Alex Good wrote:
>>> Previously, deleting and recreating the topic has solved the problem.
>>>
>>> Based on what you've said about the offset correction, I did a quick
>>> test by building Kafka Streams myself with the following code in
>>> `GlobalStateManagerImpl#restoreState()`:
>>>
>>>     while (offset < highWatermark) {
>>>         final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>>>         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
>>>         for (ConsumerRecord<byte[], byte[]> record : records) {
>>>             offset = record.offset() + 1;
>>>             if (record.key() != null) {
>>>                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
>>>             }
>>>         }
>>>         stateRestoreAdapter.restoreAll(restoreRecords);
>>>         stateRestoreListener.onBatchRestored(topicPartition, storeName,
>>>                 offset, restoreRecords.size());
>>>         restoreCount += restoreRecords.size();
>>>         offset = consumer.position(topicPartition);
>>>     }
>>>
>>> Note the recalculation of the offset using the consumer position at the
>>> end of the loop. That fixed the issue, so it may serve as further
>>> verification of your hypothesis?
>>>
>>> In the meantime, I suppose the workaround is to not produce transactional
>>> messages to topics backing a GlobalKTable?
>>>
>>> Thanks
>>> Alex
>>>
>>> On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Did you delete and recreate the topic of the GlobalStore?
>>>>
>>>> I did have a look into the code, too, and think there is a bug in
>>>> `GlobalStateManagerImpl#restoreState()`. I did some initial
>>>> investigation using an existing test; the test passes without
>>>> transactional data but fails if the global store data is written
>>>> using transactions.
>>>>
>>>> Note: if transactions are used, commit markers take "one spot"
>>>> in the partition. Currently, we terminate the loop based on the
>>>> consumer record offset:
>>>>
>>>>     offset = record.offset() + 1;
>>>>
>>>> but I think, if there is a commit marker, the highWatermark is one
>>>> offset larger and thus the loop never terminates. We would need to
>>>> update the offset using the consumer position instead, which should
>>>> step over the commit marker correctly.
>>>>
>>>> Will look into this in more detail tomorrow. It would still be valuable
>>>> if you could verify my suspicion.
>>>>
>>>> Thanks!
>>>>
>>>> -Matthias
>>>>
>>>> On 11/7/17 7:01 PM, Alex Good wrote:
>>>>> Disabling transactions doesn't seem to have changed anything. I've had
>>>>> a read through the Kafka Streams source code, specifically the parts
>>>>> relating to the restoration of the global stores, and I can't see
>>>>> anything obvious I should look at.
>>>>>
>>>>> @Ted will do, here's a pastebin of the most recent run:
>>>>> https://pastebin.com/rw2WbFyt
>>>>>
>>>>> Thanks
>>>>> Alex
>>>>>
>>>>> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Alex:
>>>>>> In the future, please use pastebin if the log is not too large.
>>>>>>
>>>>>> When people find this thread in the mailing list archive, the
>>>>>> attachment won't be there.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Alex,
>>>>>>>
>>>>>>> I am not sure, but maybe it's a bug. I noticed that you read
>>>>>>> transactional data. Can you try writing to the topic without using
>>>>>>> transactions and/or setting the consumer to READ_UNCOMMITTED mode to
>>>>>>> verify? It's only a guess that it might be related to transactions,
>>>>>>> and it would be great to verify or rule it out.
>>>>>>>
>>>>>>> Thanks a lot!
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 11/7/17 3:15 PM, Alex Good wrote:
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have a simple Kafka Streams app that seems to hang when restoring
>>>>>>>> state for a GlobalKTable. We're running in a test environment at the
>>>>>>>> moment, and the topic it is loading from only has two messages in
>>>>>>>> it. I don't know if the very low volume of messages would affect the
>>>>>>>> restore?
>>>>>>>>
>>>>>>>> I've attached a log; the topic in question is invoices-state. As you
>>>>>>>> can see, the GlobalStreamThread appears to load the two messages in
>>>>>>>> the topic and then continues to send read requests to the topic
>>>>>>>> despite having caught up. Any tips on debugging this would be very
>>>>>>>> welcome.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Alex
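For anyone reading the archive who wants to see the mechanism Matthias describes,
below is a small standalone sketch (not part of the original thread) of how a
transaction's commit marker consumes an offset. It writes two records in one
transaction, then shows that the end offset of the partition is 3 while poll()
only ever returns offsets 0 and 1, so a restore loop that advances with
record.offset() + 1 stops short of the high watermark, whereas consumer.position()
steps past the marker. The broker address, topic name, and transactional id are
illustrative assumptions; the topic is assumed to be freshly created with a single
partition, and the code uses the 0.11-era Java clients.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class CommitMarkerOffsetDemo {

        public static void main(String[] args) {
            // Assumed broker address and topic name (illustrative only); the
            // topic is assumed to exist, freshly created, with one partition.
            final String bootstrap = "localhost:9092";
            final String topic = "tx-marker-demo";

            // Write two records inside one transaction. Committing the
            // transaction appends a commit marker, which takes one more offset.
            final Properties pp = new Properties();
            pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-marker-demo-producer");
            try (KafkaProducer<String, String> producer =
                         new KafkaProducer<>(pp, new StringSerializer(), new StringSerializer())) {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>(topic, "k1", "v1")); // offset 0
                producer.send(new ProducerRecord<>(topic, "k2", "v2")); // offset 1
                producer.commitTransaction();                           // marker at offset 2
            }

            final Properties cp = new Properties();
            cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            cp.put(ConsumerConfig.GROUP_ID_CONFIG, "tx-marker-demo-group");
            cp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(cp, new StringDeserializer(), new StringDeserializer())) {
                final TopicPartition tp = new TopicPartition(topic, 0);
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToBeginning(Collections.singletonList(tp));

                // The end offset (what the restore loop compares against) is 3,
                // but poll() never hands back the marker, so record.offset() + 1
                // tops out at 2 and a loop bounded by "offset < endOffset" spins.
                final long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
                long offsetFromRecords = 0L;
                for (final ConsumerRecord<String, String> record : consumer.poll(5000)) {
                    offsetFromRecords = record.offset() + 1;
                }

                // consumer.position() advances past the marker once the fetch
                // has moved over it, which is why the patched loop terminates.
                System.out.printf("endOffset=%d, record.offset()+1=%d, position()=%d%n",
                        endOffset, offsetFromRecords, consumer.position(tp));
            }
        }
    }

Run against a broker, this should print something like endOffset=3,
record.offset()+1=2, position()=3, which is the off-by-one that keeps
GlobalStateManagerImpl#restoreState() polling forever on transactional topics.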