Saw it, thank you.
Best regards
Patrik

> Am 04.10.2018 um 17:11 schrieb Matthias J. Sax <matth...@confluent.io>:
> 
> Thanks for the details.
> 
> Seems like a fair assumption. I created a jira to track it:
> https://issues.apache.org/jira/browse/KAFKA-7480
> 
> For now, there is not much you can do, because Streams hard codes to set
> the policy to "none". Thus, a manual restart (that is gladly working as
> you confirmed) it currently the way to go.
> 
> Thanks for reporting this issue.
> 
> 
> -Matthias
> 
>> On 10/4/18 3:23 AM, Patrik Kleindl wrote:
>> Hello Matthias
>> Thanks for looking into this.
>> A restart has worked, I can confirm that.
>> Before this problem happened we had some cluster issues which are still
>> being looked into, there were some leader changes and some offset commit
>> failures.
>> The consumer should not have lagged that much behind, but I can only check
>> that at the next occurrence.
>> 
>> Does the user have any other solution available than to restart?
>> I understand the intention to "notify" the user of a potential problem, but
>> if nothing can be changed about the data loss then a warning message and
>> automatic recovery should not make things worse.
>> This would make sense as an improvement, as I understand this is not a bug
>> the case is closed for me at the moment.
>> 
>> Thanks again and best regards
>> Patrik
>> 
>>> On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax <matth...@confluent.io> wrote:
>>> 
>>> I double checked the code and discussed with a colleague.
>>> 
>>> There are two places when we call `globalConsumer.poll()`
>>> 
>>> 1. On startup, when we need to bootstrap the store. In this case, we
>>> catch the exception and handle it.
>>> 2. During regular processing. In this case, we don't catch the exception.
>>> 
>>> The reasoning is the following: For case (1) the exception should only
>>> happen if you start a new application of if an application was offline
>>> for a long time. This is fine and we just make sure to bootstrap
>>> correctly. For case (2) the consumer is at the end of the log and thus,
>>> an InvalidOffsetException should never occur but indicate an issue the
>>> user should be notified about.
>>> 
>>> Does this reasoning make sense?
>>> 
>>> Question: if you restart your application, does it fail again? Or does
>>> it resume processing?
>>> 
>>> It would be good to understand the root cause. It seems, you
>>> globalConsumer is lagging behind? Can you verify this? If yes, it seems
>>> to make sense to stop processing to inform the user about this issue.
>>> Would you rather prefer the application to just move on implying silent
>>> data loss??
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>>> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
>>>> Hello Matthias
>>>> Thank you for the explanation.
>>>> 
>>>> Version used is 2.0.0-cp1
>>>> 
>>>> The stacktrace:
>>>> 2018-10-02 10:51:52,575 ERROR
>>>> 
>>> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
>>>> (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
>>>> [short-component-name:; transaction-id:; user-id:; creation-time:]
>>>> global-stream-thread
>>>> [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
>>>> global state failed. You can restart KafkaStreams to recover from this
>>>> error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>> Offsets out of range with no configured reset policy for partitions:
>>>> {...=51247974}
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>>>> at
>>>> 
>>> org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
>>>> at
>>>> 
>>> org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
>>>> 
>>>> Fetcher.parseCompletedFetch:
>>>> 
>>>> else if (error == Errors.OFFSET_OUT_OF_RANGE) {
>>>>                if (fetchOffset != subscriptions.position(tp)) {
>>>>                    log.debug("Discarding stale fetch response for
>>>> partition {} since the fetched offset {} " +
>>>>                            "does not match the current offset {}", tp,
>>>> fetchOffset, subscriptions.position(tp));
>>>>                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
>>>>                    log.info("Fetch offset {} is out of range for
>>> partition
>>>> {}, resetting offset", fetchOffset, tp);
>>>>                    subscriptions.requestOffsetReset(tp);
>>>>                } else {
>>>>                    throw new
>>>> OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
>>>>                }
>>>> 
>>>> So this means that for global/restore the exception will always be thrown
>>>> without some special handling?
>>>> 
>>>> best regards
>>>> 
>>>> Patrik
>>>> 
>>>> On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>> 
>>>>> It is by design to set the reset policy to "none"
>>>>> (https://issues.apache.org/jira/browse/KAFKA-6121), and not allowed by
>>>>> design to overwrite this (there might be a workaround for you though).
>>>>> However, Streams should not die but catch the exception and recover from
>>>>> it automatically.
>>>>> 
>>>>> What version do you use? Can you share the full stack trace to see why
>>>>> Streams failed to recover from this exception?
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>>> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
>>>>>> Hi
>>>>>> 
>>>>>> We had several incidents where a streams application crashed while
>>>>>> maintaining a global state store.
>>>>>> Updating global state failed. You can restart KafkaStreams to recover
>>>>> from
>>>>>> this error.:
>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>>>> Offsets out of range with no configured reset policy for partitions:
>>> ...
>>>>>> 
>>>>>> As we never set this to none I checked the code and found that
>>>>>> StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs
>>> both
>>>>>> set this explicitely:
>>>>>> baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>>>> 
>>>>>> The logs confirms this:
>>>>>> 2018-10-02 11:07:06,057 INFO
>>>>> [org.apache.kafka.common.utils.AppInfoParser]
>>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>>> transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
>>>>>> 2018-10-02 11:07:06,057 INFO
>>>>> [org.apache.kafka.common.utils.AppInfoParser]
>>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>>> transaction-id:; user-id:; creation-time:]  Kafka commitId :
>>>>>> a8c648ff08b9235d
>>>>>> 2018-10-02 11:07:06,104 INFO
>>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService
>>> Thread
>>>>>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>>> creation-time:]  ConsumerConfig values:
>>>>>> auto.commit.interval.ms = 5000
>>>>>> auto.offset.reset = none
>>>>>> bootstrap.servers = [...]
>>>>>> check.crcs = true
>>>>>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
>>>>>> 
>>>>>> ...
>>>>>> 
>>>>>> 2018-10-02 11:07:06,418 INFO
>>>>>> [org.apache.kafka.streams.processor.internals.StreamThread]
>>>>> (ServerService
>>>>>> Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>>> creation-time:]  stream-thread
>>>>>> [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating
>>>>> restore
>>>>>> consumer client
>>>>>> 2018-10-02 11:07:06,419 INFO
>>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService
>>> Thread
>>>>>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>>> creation-time:]  ConsumerConfig values:
>>>>>> auto.commit.interval.ms = 5000
>>>>>> auto.offset.reset = none
>>>>>> bootstrap.servers = [...]
>>>>>> check.crcs = true
>>>>>> client.id =
>>>>>> 
>>> ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
>>>>>> 
>>>>>> Is this intentional and if yes, why can this not use the default policy
>>>>> and
>>>>>> recover?
>>>>>> 
>>>>>> best regards
>>>>>> 
>>>>>> Patrik
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 

Reply via email to