Cool. Thanks for cycling back. What you describe verifies my suspicion. Your patch makes sense.
Do you want to create a JIRA and provide a PR to fix this? We could include
this in 0.11.0.2, which we plan to release soon (so if you want to contribute
the patch, please do it right away -- we want to cut the first RC for
0.11.0.2 this Friday).

If you don't want to do it, that's also fine, and I (or someone else) can do
it. Please let us know.

For 0.11.0.1, the workaround would be to not use transactions when you write
to the topic that you read as global state/KTable.

Thanks a lot for reporting the issue and helping to nail it down!

-Matthias

On 11/8/17 3:13 AM, Alex Good wrote:
> Previously deleting and recreating the topic has solved the problem.
>
> Based on what you've said about the offset correction I did a quick test by
> building kafka streams myself with the following code in
> `GlobalStateManagerImpl#restoreState()`
>
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         offset = record.offset() + 1;
>         if (record.key() != null) {
>             restoreRecords.add(KeyValue.pair(record.key(), record.value()));
>         }
>     }
>     stateRestoreAdapter.restoreAll(restoreRecords);
>     stateRestoreListener.onBatchRestored(topicPartition, storeName,
>             offset, restoreRecords.size());
>     restoreCount += restoreRecords.size();
>     offset = consumer.position(topicPartition);
> }
>
> Note the recalculation of the offset using consumer position at the end of
> the loop. That fixed the issue so may serve as further verification of your
> hypothesis?
>
> In the meantime I suppose the workaround is to not produce transactional
> messages to topics backing a GlobalKTable?
>
> Thanks
> Alex
>
>
> On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Did you delete and recreate the topic of the GlobalStore?
>>
>> I did have a look into the code, too, and think there is a bug in
>> `GlobalStateManagerImpl#restoreState()`. I did some initial
>> investigation using an existing test, and the test passed without
>> transactional data but fails if the global store data is written using
>> transactions.
>>
>> Note: if transactions are available, commit markers will take "one spot"
>> in the partitions. Currently, we break the loop using consumer record
>> offset
>>
>>> offset = record.offset() + 1;
>>
>> but I think, if there is a commit marker, the highWatermark is one
>> offset larger and thus this loop never terminates. We would need to
>> update the offset using consumer position instead that should step over
>> the commit marker correctly.
>>
>> Will look into this in more detail tomorrow. Would still be valuable, if
>> you could verify my suspicion.
>>
>> Thanks!
>>
>> -Matthias
>>
>> On 11/7/17 7:01 PM, Alex Good wrote:
>>> Disabling transactions doesn't seem to have changed anything. I've had a
>>> read through the kafka streams source code, specifically the parts relating
>>> to the restoration of the global stores and I can't see anything obvious I
>>> should look at.
>>>
>>> @Ted will do, here's a pastebin of the most recent run
>>> https://pastebin.com/rw2WbFyt
>>>
>>> Thanks
>>> Alex
>>>
>>> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Alex:
>>>> In the future, please use pastebin if the log is not too large.
>>>>
>>>> When people find this thread in mailing list archive, the attachment
>>>> wouldn't be there.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Alex,
>>>>>
>>>>> I am not sure, but maybe it's a bug. I noticed that you read transaction
>>>>> data. Can you try to write to the topic without using transactions
>>>>> and/or set the consumer into READ_UNCOMMITTED mode to verify?
>>>>> It is only a guess that it might be related to transactions and it would
>>>>> be great to verify or rule it out.
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 11/7/17 3:15 PM, Alex Good wrote:
>>>>>> Hi All,
>>>>>>
>>>>>> I have a simple kafka streams app that seems to hang when restoring
>>>>>> state for a GlobalKTable. We're running in a test environment at the
>>>>>> moment and the topic it is loading from only has two messages in it, I
>>>>>> don't know if the very low volume of messages would affect the restore?
>>>>>>
>>>>>> I've attached a log, the topic in question is invoices-state. As you can
>>>>>> see the GlobalStreamThread appears to load the two messages in the topic
>>>>>> and then continues to send read requests to the topic despite having
>>>>>> caught up. Any tips on debugging this would be very welcome.
>>>>>>
>>>>>> Thanks
>>>>>> Alex
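
For readers following the thread, here is a minimal, self-contained sketch of the failure mode discussed above. It does not use the Kafka client: `FakeConsumer`, `restore`, and the boolean-array "log" are hypothetical stand-ins invented for illustration. It assumes, as hypothesized in the thread, that a commit marker occupies one offset, is never returned by a poll, and is stepped over by the consumer position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the restore loop; NOT the Kafka Streams implementation. */
public class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer reading a single partition. */
    static final class FakeConsumer {
        // isMarker[i] == true means offset i holds a commit marker (assumption).
        private final boolean[] isMarker;
        private long position = 0;

        FakeConsumer(boolean[] isMarker) {
            this.isMarker = isMarker;
        }

        /** Returns offsets of data records only; markers are skipped, never returned. */
        List<Long> poll() {
            List<Long> offsets = new ArrayList<>();
            while (position < isMarker.length) {
                if (!isMarker[(int) position]) {
                    offsets.add(position);
                }
                position++; // the position advances past markers as well
            }
            return offsets;
        }

        long position() {
            return position;
        }
    }

    /**
     * Runs the restore loop against the toy log.
     * Returns the number of polls needed to reach the high watermark,
     * or -1 if maxPolls was exhausted (the loop would otherwise spin forever).
     */
    static int restore(boolean[] log, boolean usePosition, int maxPolls) {
        FakeConsumer consumer = new FakeConsumer(log);
        long highWatermark = log.length; // markers count towards the end offset
        long offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1;
            }
            polls++;
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset + 1; // original tracking by record offset
            }
            if (usePosition) {
                offset = consumer.position(); // fix tested in the thread
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        boolean[] plain = {false, false};               // two records, no transactions
        boolean[] transactional = {false, false, true}; // two records, then a commit marker
        System.out.println(restore(plain, false, 10));         // prints 1
        System.out.println(restore(transactional, false, 10)); // prints -1
        System.out.println(restore(transactional, true, 10));  // prints 1
    }
}
```

With two records followed by one marker, tracking `record.offset() + 1` stalls at 2 while the high watermark is 3, so the loop never exits on its own. Reading the position after each poll reaches 3 and the loop terminates.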