Issue created https://issues.apache.org/jira/browse/KAFKA-6190

I've not contributed anything to Kafka before. I've followed the guidelines
in the contributing wiki, except that I opened the PR against 0.11.0 rather
than trunk, as that seemed to be what you were suggesting. Let me know if
that's incorrect.

Alex

On Wed, Nov 8, 2017 at 3:54 PM Alex Good <a...@makerlabs.co.uk> wrote:

> No problem, it was an interesting little bughunt. I'll create a JIRA and
> PR this afternoon/evening.
>
> Thanks
> Alex
>
> On Wed, Nov 8, 2017 at 3:05 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Cool. Thanks for cycling back. What you describe verifies my suspicion.
>> Your patch makes sense.
>>
>> Do you want to create a JIRA and provide a PR to fix this? We could
>> include this into 0.11.0.2 that we plan to release soon (so if you want
>> to contribute the patch, please do it right away -- we want to cut the
>> first RC for 0.11.0.2 this Friday). If you don't want to do it, it's
>> also fine, and I (or someone else) can do it. Please let us know.
>>
>> For 0.11.0.1, the workaround would be to not use transactions when you
>> write the topic that you read as global state/KTable.
>>
>> Thanks a lot for reporting the issue and helping to nail it down!
>>
>>
>> -Matthias
>>
>> On 11/8/17 3:13 AM, Alex Good wrote:
>> > Previously deleting and recreating the topic has solved the problem.
>> >
>> > Based on what you've said about the offset correction I did a quick test
>> > by building kafka streams myself with the following code in
>> > `GlobalStateManagerImpl#restoreState()`
>> >
>> >     while (offset < highWatermark) {
>> >         final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>> >         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
>> >         for (ConsumerRecord<byte[], byte[]> record : records) {
>> >             offset = record.offset() + 1;
>> >             if (record.key() != null) {
>> >                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
>> >             }
>> >         }
>> >         stateRestoreAdapter.restoreAll(restoreRecords);
>> >         stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
>> >         restoreCount += restoreRecords.size();
>> >         offset = consumer.position(topicPartition);
>> >     }
>> >
>> > Note the recalculation of the offset using consumer position at the end
>> > of the loop. That fixed the issue so may serve as further verification of
>> > your hypothesis?
>> >
>> > In the meantime I suppose the workaround is to not produce transactional
>> > messages to topics backing a GlobalKTable?
>> >
>> > Thanks
>> > Alex
>> >
>> >
>> > On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >> Did you delete and recreate the topic of the GlobalStore?
>> >>
>> >> I did have a look into the code, too, and think there is a bug in
>> >> `GlobalStateManagerImpl#restoreState()`. I did some initial
>> >> investigation using an existing test, and the test passed without
>> >> transactional data but fails if the global store data is written using
>> >> transactions.
>> >>
>> >> Note: if transactions are available, commit markers will take "one spot"
>> >> in the partitions. Currently, we break the loop using consumer record
>> >> offset
>> >>
>> >>> offset = record.offset() + 1;
>> >>
>> >> but I think, if there is a commit marker, the highWatermark is one
>> >> offset larger and thus this loop never terminates. We would need to
>> >> update the offset using the consumer position instead, which should step
>> >> over the commit marker correctly.
>> >>
>> >> Will look into this in more detail tomorrow. Would still be valuable
>> >> if you could verify my suspicion.
>> >>
>> >> Thanks!
>> >>
>> >> -Matthias
>> >>
>> >> On 11/7/17 7:01 PM, Alex Good wrote:
>> >>> Disabling transactions doesn't seem to have changed anything. I've had a
>> >>> read through the kafka streams source code, specifically the parts
>> >>> relating to the restoration of the global stores and I can't see
>> >>> anything obvious I should look at.
>> >>>
>> >>> @Ted will do, here's a pastebin of the most recent run
>> >>> https://pastebin.com/rw2WbFyt
>> >>>
>> >>> Thanks
>> >>> Alex
>> >>>
>> >>> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> >>>
>> >>>> Alex:
>> >>>> In the future, please use pastebin if the log is not too large.
>> >>>>
>> >>>> When people find this thread in mailing list archive, the attachment
>> >>>> wouldn't be there.
>> >>>>
>> >>>> Thanks
>> >>>>
>> >>>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax <matth...@confluent.io>
>> >>>> wrote:
>> >>>>
>> >>>>> Alex,
>> >>>>>
>> >>>>> I am not sure, but maybe it's a bug. I noticed that you read
>> >>>>> transactional data. Can you try to write to the topic without using
>> >>>>> transactions and/or set the consumer into READ_UNCOMMITTED mode to
>> >>>>> verify? It's only a guess that it might be related to transactions, and
>> >>>>> it would be great to verify or rule it out.
>> >>>>>
>> >>>>> Thanks a lot!
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>>
>> >>>>> On 11/7/17 3:15 PM, Alex Good wrote:
>> >>>>>> Hi All,
>> >>>>>>
>> >>>>>> I have a simple kafka streams app that seems to hang when restoring
>> >>>>>> state for a GlobalKTable. We're running in a test environment at the
>> >>>>>> moment and the topic it is loading from only has two messages in it.
>> >>>>>> I don't know if the very low volume of messages would affect the
>> >>>>>> restore?
>> >>>>>>
>> >>>>>> I've attached a log; the topic in question is invoices-state. As you
>> >>>>>> can see the GlobalStreamThread appears to load the two messages in the
>> >>>>>> topic and then continues to send read requests to the topic despite
>> >>>>>> having caught up. Any tips on debugging this would be very welcome.
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>> Alex
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>

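The offset arithmetic discussed in this thread can be reproduced without a
broker. The sketch below is only a simplified model, not Kafka Streams code:
`RestoreLoopSketch`, `FakeConsumer` and the `maxPolls` guard are hypothetical
names invented for illustration. It assumes, as suggested in the thread, that
a commit marker occupies one offset, that poll never hands the marker to the
application, and that the consumer position still advances past it.

```java
import java.util.ArrayList;
import java.util.List;

final class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer reading a single partition. */
    static final class FakeConsumer {
        // true = data record, false = transaction commit marker
        private final boolean[] log;
        private long position = 0;

        FakeConsumer(boolean... log) {
            this.log = log;
        }

        /** End of the log, counting the slot taken by the commit marker. */
        long endOffset() {
            return log.length;
        }

        long position() {
            return position;
        }

        /** Returns offsets of data records only; the position also moves past markers. */
        List<Long> poll() {
            final List<Long> offsets = new ArrayList<>();
            while (position < log.length) {
                if (log[(int) position]) {
                    offsets.add(position);
                }
                position++;
            }
            return offsets;
        }
    }

    /** Returns the final offset, or -1 if the loop had not finished after maxPolls polls. */
    static long restore(boolean usePosition, int maxPolls) {
        // Two data records followed by one commit marker, as in the reported case.
        final FakeConsumer consumer = new FakeConsumer(true, true, false);
        final long highWatermark = consumer.endOffset();
        long offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls++ >= maxPolls) {
                return -1; // guard so the looping variant cannot hang this sketch
            }
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset + 1; // original bookkeeping
            }
            if (usePosition) {
                offset = consumer.position(); // patched bookkeeping
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        System.out.println("record.offset() + 1 only: " + restore(false, 10));
        System.out.println("with consumer.position(): " + restore(true, 10));
    }
}
```

In this model the first variant stalls at offset 2 while the end offset is 3,
so it gives up and returns -1. The second variant reaches 3 after a single
poll and the loop exits.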