Thanks for the details.

Seems like a fair assumption. I created a jira to track it:
https://issues.apache.org/jira/browse/KAFKA-7480

For now, there is not much you can do, because Streams hard-codes the
reset policy to "none". Thus, a manual restart (which fortunately works,
as you confirmed) is currently the way to go.
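
If you want to automate that restart in the meantime, a minimal sketch
could look like the following. It is plain Java and only illustrates the
restart loop: `StreamsRun` and `RestartSupervisor` are hypothetical names,
not Kafka APIs. In a real application each run would build and start a new
KafkaStreams instance (a closed instance cannot be started again), and how
the failure is detected (for example via a state listener) is left out here.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for one run of a Streams application; not a Kafka API.
interface StreamsRun {
    // Blocks until the run ends; throws a RuntimeException if the run failed.
    void runUntilStopped();
}

final class RestartSupervisor {
    private RestartSupervisor() { }

    // Starts a fresh run after each failure, allowing at most maxRestarts restarts.
    // Returns the number of restarts that were needed before a run ended cleanly.
    static int supervise(int maxRestarts, Supplier<StreamsRun> factory) {
        int restarts = 0;
        while (true) {
            // Always ask the factory for a new instance; a failed run is not reused.
            StreamsRun run = factory.get();
            try {
                run.runUntilStopped();
                return restarts;
            } catch (RuntimeException e) {
                if (restarts >= maxRestarts) {
                    throw e; // give up and surface the last failure
                }
                restarts++;
            }
        }
    }
}
```

The restart limit is there so that a persistent problem still surfaces
instead of looping forever.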

Thanks for reporting this issue.


-Matthias

On 10/4/18 3:23 AM, Patrik Kleindl wrote:
> Hello Matthias
> Thanks for looking into this.
> A restart has worked, I can confirm that.
> Before this problem happened we had some cluster issues which are still
> being looked into, there were some leader changes and some offset commit
> failures.
> The consumer should not have lagged that much behind, but I can only check
> that at the next occurrence.
> 
> Does the user have any other solution available than to restart?
> I understand the intention to "notify" the user of a potential problem, but
> if nothing can be changed about the data loss then a warning message and
> automatic recovery should not make things worse.
> This would make sense as an improvement. As I understand it, this is not a
> bug, so the case is closed for me at the moment.
> 
> Thanks again and best regards
> Patrik
> 
> On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> I double checked the code and discussed with a colleague.
>>
>> There are two places where we call `globalConsumer.poll()`:
>>
>> 1. On startup, when we need to bootstrap the store. In this case, we
>> catch the exception and handle it.
>> 2. During regular processing. In this case, we don't catch the exception.
>>
>> The reasoning is the following: For case (1) the exception should only
>> happen if you start a new application or if an application was offline
>> for a long time. This is fine and we just make sure to bootstrap
>> correctly. For case (2) the consumer is at the end of the log and thus,
>> an InvalidOffsetException should never occur; if it does, it indicates an
>> issue the user should be notified about.
>>
>> Does this reasoning make sense?
>>
>> Question: if you restart your application, does it fail again? Or does
>> it resume processing?
>>
>> It would be good to understand the root cause. It seems your
>> globalConsumer is lagging behind? Can you verify this? If yes, it seems
>> to make sense to stop processing to inform the user about this issue.
>> Would you rather prefer the application to just move on, implying silent
>> data loss?
>>
>>
>> -Matthias
>>
>>
>> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
>>> Hello Matthias
>>> Thank you for the explanation.
>>>
>>> Version used is 2.0.0-cp1
>>>
>>> The stacktrace:
>>> 2018-10-02 10:51:52,575 ERROR
>>> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
>>> (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
>>> [short-component-name:; transaction-id:; user-id:; creation-time:]
>>> global-stream-thread
>>> [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
>>> global state failed. You can restart KafkaStreams to recover from this
>>> error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>> Offsets out of range with no configured reset policy for partitions:
>>> {...=51247974}
>>> at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
>>> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>>> at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
>>> at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
>>>
>>> Fetcher.parseCompletedFetch:
>>>
>>> else if (error == Errors.OFFSET_OUT_OF_RANGE) {
>>>     if (fetchOffset != subscriptions.position(tp)) {
>>>         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
>>>                 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
>>>     } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
>>>         log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
>>>         subscriptions.requestOffsetReset(tp);
>>>     } else {
>>>         throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
>>>     }
>>> }
>>>
>>> So this means that for global/restore the exception will always be thrown
>>> without some special handling?
>>>
>>> best regards
>>>
>>> Patrik
>>>
>>> On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Setting the reset policy to "none" is by design
>>>> (https://issues.apache.org/jira/browse/KAFKA-6121), and overriding it is
>>>> intentionally not allowed (there might be a workaround for you though).
>>>> However, Streams should not die but catch the exception and recover from
>>>> it automatically.
>>>>
>>>> What version do you use? Can you share the full stack trace to see why
>>>> Streams failed to recover from this exception?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
>>>>> Hi
>>>>>
>>>>> We had several incidents where a streams application crashed while
>>>>> maintaining a global state store.
>>>>> Updating global state failed. You can restart KafkaStreams to recover from
>>>>> this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>>> Offsets out of range with no configured reset policy for partitions: ...
>>>>>
>>>>> As we never set this to "none", I checked the code and found that
>>>>> StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
>>>>> set this explicitly:
>>>>> baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>>>
>>>>> The logs confirm this:
>>>>> 2018-10-02 11:07:06,057 INFO
>>>>> [org.apache.kafka.common.utils.AppInfoParser]
>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>> transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
>>>>> 2018-10-02 11:07:06,057 INFO
>>>>> [org.apache.kafka.common.utils.AppInfoParser]
>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>> transaction-id:; user-id:; creation-time:]  Kafka commitId :
>>>>> a8c648ff08b9235d
>>>>> 2018-10-02 11:07:06,104 INFO
>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
>>>>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>> creation-time:]  ConsumerConfig values:
>>>>> auto.commit.interval.ms = 5000
>>>>> auto.offset.reset = none
>>>>> bootstrap.servers = [...]
>>>>> check.crcs = true
>>>>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
>>>>>
>>>>> ...
>>>>>
>>>>> 2018-10-02 11:07:06,418 INFO
>>>>> [org.apache.kafka.streams.processor.internals.StreamThread] (ServerService
>>>>> Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>> creation-time:]  stream-thread
>>>>> [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating restore
>>>>> consumer client
>>>>> 2018-10-02 11:07:06,419 INFO
>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
>>>>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>> creation-time:]  ConsumerConfig values:
>>>>> auto.commit.interval.ms = 5000
>>>>> auto.offset.reset = none
>>>>> bootstrap.servers = [...]
>>>>> check.crcs = true
>>>>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
>>>>>
>>>>> Is this intentional and if yes, why can this not use the default policy
>>>>> and recover?
>>>>>
>>>>> best regards
>>>>>
>>>>> Patrik
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
