Cool. Thanks for cycling back. What you describe verifies my suspicion.
Your patch makes sense.

Do you want to create a JIRA and provide a PR to fix this? We could
include it in 0.11.0.2, which we plan to release soon (so if you want
to contribute the patch, please do it right away: we want to cut the
first RC for 0.11.0.2 this Friday). If you don't want to do it, that's
also fine, and I (or someone else) can do it. Please let us know.

For 0.11.0.1, the workaround is to not use transactions when writing to
the topic that you read as global state (GlobalKTable).
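Why the workaround helps can be shown with a toy, stdlib-only Java model of the restore loop. Everything in it is hypothetical (`RestoreLoopSketch`, `ToyConsumer` and its methods are not the Kafka API); it only mimics the two behaviours discussed in this thread: `poll()` never returns transaction commit markers, while `position()` moves past them. If each record is written in its own transaction, the log is record, marker, record, marker, so the high watermark is 4 while the last record's offset + 1 is only 3.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreLoopSketch {

    // Hypothetical stand-in for a consumer on one partition; not the Kafka API.
    static final class ToyConsumer {
        private final boolean[] isCommitMarker; // index = offset; true = commit marker
        private long position = 0;

        ToyConsumer(boolean[] isCommitMarker) {
            this.isCommitMarker = isCommitMarker;
        }

        // End of the log; commit markers occupy offsets, so they count here.
        long highWatermark() {
            return isCommitMarker.length;
        }

        // Returns the offsets of data records from the current position to the end
        // of the log. Markers are never returned, but the position moves past them.
        List<Long> poll() {
            List<Long> dataOffsets = new ArrayList<>();
            for (long o = position; o < isCommitMarker.length; o++) {
                if (!isCommitMarker[(int) o]) {
                    dataOffsets.add(o);
                }
            }
            position = isCommitMarker.length;
            return dataOffsets;
        }

        long position() {
            return position;
        }
    }

    // Original loop shape: offset advances only via record offset + 1.
    // Returns the number of polls, or -1 if it did not finish within maxPolls.
    public static int restoreUsingRecordOffsets(boolean[] log, int maxPolls) {
        ToyConsumer consumer = new ToyConsumer(log);
        long offset = 0;
        long highWatermark = consumer.highWatermark();
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1; // the real loop has no cap and would spin forever here
            }
            polls++;
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset + 1;
            }
        }
        return polls;
    }

    // Patched loop shape: offset is refreshed from position() after each poll.
    public static int restoreUsingPosition(boolean[] log, int maxPolls) {
        ToyConsumer consumer = new ToyConsumer(log);
        long offset = 0;
        long highWatermark = consumer.highWatermark();
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1;
            }
            polls++;
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset + 1;
            }
            offset = consumer.position(); // steps over trailing commit markers
        }
        return polls;
    }

    public static void main(String[] args) {
        boolean[] plain = {false, false};                      // two records, no transactions
        boolean[] transactional = {false, true, false, true};  // record, commit, record, commit
        System.out.println(restoreUsingRecordOffsets(plain, 10));         // 1
        System.out.println(restoreUsingRecordOffsets(transactional, 10)); // -1
        System.out.println(restoreUsingPosition(transactional, 10));      // 1
    }
}
```

The toy consumer reads the whole log in one poll and the loops are capped at `maxPolls` so the record-offset variant returns -1 instead of hanging; the uncapped loop in the real code is what keeps polling after it has caught up.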

Thanks a lot for reporting the issue and helping to nail it down!


-Matthias

On 11/8/17 3:13 AM, Alex Good wrote:
> Previously, deleting and recreating the topic solved the problem.
> 
> Based on what you said about the offset correction, I did a quick test by
> building Kafka Streams myself with the following code in
> `GlobalStateManagerImpl#restoreState()`:
> 
>     while (offset < highWatermark) {
>         final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>             offset = record.offset() + 1;
>             if (record.key() != null) {
>                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
>             }
>         }
>         stateRestoreAdapter.restoreAll(restoreRecords);
>         stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
>         restoreCount += restoreRecords.size();
>         // Re-read the position: unlike record.offset() + 1, it also steps over
>         // transaction commit markers, so offset can reach highWatermark.
>         offset = consumer.position(topicPartition);
>     }
> 
> Note the recalculation of the offset using the consumer position at the end
> of the loop. That fixed the issue, so it may serve as further verification
> of your hypothesis.
> 
> In the meantime, I suppose the workaround is to not produce transactional
> messages to topics backing a GlobalKTable?
> 
> Thanks
> Alex
> 
> 
> On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> Did you delete and recreate the topic of the GlobalStore?
>>
>> I had a look at the code, too, and think there is a bug in
>> `GlobalStateManagerImpl#restoreState()`. I did some initial
>> investigation using an existing test: the test passes without
>> transactional data but fails if the global store data is written using
>> transactions.
>>
>> Note: if transactions are used, commit markers take up "one spot" (one
>> offset) in the partition. Currently, we exit the loop based on the
>> consumer record offset:
>>
>>> offset = record.offset() + 1;
>>
>> but I think if there is a commit marker, the highWatermark is one offset
>> larger than the last record's offset + 1, and thus this loop never
>> terminates. We would need to update the offset using the consumer
>> position instead, which should step over the commit marker correctly.
>>
>> I will look into this in more detail tomorrow. It would still be valuable
>> if you could verify my suspicion.
>>
>> Thanks!
>>
>> -Matthias
>>
>> On 11/7/17 7:01 PM, Alex Good wrote:
>>> Disabling transactions doesn't seem to have changed anything. I've read
>>> through the Kafka Streams source code, specifically the parts relating
>>> to the restoration of the global stores, and I can't see anything
>>> obvious I should look at.
>>>
>>> @Ted: will do. Here's a pastebin of the most recent run:
>>> https://pastebin.com/rw2WbFyt
>>>
>>> Thanks
>>> Alex
>>>
>>> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Alex:
>>>> In the future, please use pastebin if the log is not too large.
>>>>
>>>> When people find this thread in the mailing list archive, the
>>>> attachment won't be there.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Alex,
>>>>>
>>>>> I am not sure, but maybe it's a bug. I noticed that you read
>>>>> transactional data. Can you try writing to the topic without using
>>>>> transactions and/or setting the consumer to READ_UNCOMMITTED mode to
>>>>> verify? It's only a guess that it might be related to transactions, and
>>>>> it would be great to verify or rule it out.
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 11/7/17 3:15 PM, Alex Good wrote:
>>>>>> Hi All,
>>>>>>
>>>>>> I have a simple Kafka Streams app that seems to hang when restoring
>>>>>> state for a GlobalKTable. We're running in a test environment at the
>>>>>> moment, and the topic it is loading from has only two messages in it.
>>>>>> I don't know whether the very low volume of messages could affect the
>>>>>> restore.
>>>>>>
>>>>>> I've attached a log; the topic in question is invoices-state. As you
>>>>>> can see, the GlobalStreamThread appears to load the two messages in the
>>>>>> topic and then continues to send read requests to the topic despite
>>>>>> having caught up. Any tips on debugging this would be very welcome.
>>>>>>
>>>>>> Thanks
>>>>>> Alex
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 
