I double-checked the code and discussed it with a colleague.

There are two places where we call `globalConsumer.poll()`:

1. On startup, when we need to bootstrap the store. In this case, we
catch the exception and handle it.
2. During regular processing. In this case, we don't catch the exception.

The reasoning is the following: For case (1), the exception should only
happen if you start a new application or if an application was offline
for a long time. This is fine, and we just make sure to bootstrap
correctly (see the sketch below). For case (2), the consumer is at the
end of the log, so an InvalidOffsetException should never occur; if it
does, it indicates an issue the user should be notified about.
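
To illustrate the two cases, here is a minimal sketch (hypothetical
helper names and class; not the actual Streams internals):

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;

class GlobalStoreUpdater {

    private final Consumer<byte[], byte[]> globalConsumer;

    GlobalStoreUpdater(final Consumer<byte[], byte[]> globalConsumer) {
        this.globalConsumer = globalConsumer;
    }

    // Case (1): bootstrap -- catch the exception and handle it.
    void bootstrap() {
        try {
            for (final ConsumerRecord<byte[], byte[]> record :
                    globalConsumer.poll(Duration.ofMillis(100))) {
                applyToStore(record);
            }
        } catch (final InvalidOffsetException e) {
            // The requested offsets no longer exist (new application,
            // or offline for a long time): wipe the local store and
            // re-bootstrap from the earliest available offset.
            wipeStore();
            globalConsumer.seekToBeginning(e.partitions());
        }
    }

    // Case (2): regular processing -- no catch block; the consumer is
    // at the end of the log, so the exception indicates a real issue
    // and should surface to the user instead of being swallowed.
    void pollAndUpdate() {
        for (final ConsumerRecord<byte[], byte[]> record :
                globalConsumer.poll(Duration.ofMillis(100))) {
            applyToStore(record);
        }
    }

    private void applyToStore(final ConsumerRecord<byte[], byte[]> record) { /* hypothetical */ }

    private void wipeStore() { /* hypothetical */ }
}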

Does this reasoning make sense?

Question: if you restart your application, does it fail again? Or does
it resume processing?

It would be good to understand the root cause. It seems your
globalConsumer is lagging behind? Can you verify this (see the sketch
below)? If so, it seems to make sense to stop processing and inform the
user about the issue. Would you rather have the application just move
on, implying silent data loss?
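
One way to verify this (just a sketch; the bootstrap server, topic
name, and partition below are placeholders you would need to fill in):
compare the failed fetch offset from the error against the topic's
current log-start offset. If the log-start offset is already larger,
the records the globalConsumer wanted were deleted before it could
read them, i.e., it fell behind the topic's retention/cleanup.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagCheck {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            final TopicPartition tp = new TopicPartition("global-topic", 0); // placeholder
            final long failedFetchOffset = 51247974L; // offset from the error message
            final Map<TopicPartition, Long> start = consumer.beginningOffsets(Collections.singleton(tp));
            final Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
            System.out.println("log start = " + start.get(tp) + ", log end = " + end.get(tp));
            if (start.get(tp) > failedFetchOffset) {
                System.out.println("globalConsumer fell behind retention/cleanup");
            }
        }
    }
}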


-Matthias


On 10/3/18 12:20 AM, Patrik Kleindl wrote:
> Hello Matthias
> Thank you for the explanation.
> 
> Version used is 2.0.0-cp1
> 
> The stacktrace:
> 2018-10-02 10:51:52,575 ERROR
> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
> (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
> [short-component-name:; transaction-id:; user-id:; creation-time:]
> global-stream-thread
> [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
> global state failed. You can restart KafkaStreams to recover from this
> error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets out of range with no configured reset policy for partitions:
> {...=51247974}
> at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
> at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
> 
> Fetcher.parseCompletedFetch:
> 
> else if (error == Errors.OFFSET_OUT_OF_RANGE) {
>     if (fetchOffset != subscriptions.position(tp)) {
>         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
>                 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
>     } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
>         log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
>         subscriptions.requestOffsetReset(tp);
>     } else {
>         throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
>     }
> 
> So this means that for the global/restore consumers the exception will
> always be thrown unless there is some special handling?
> 
> best regards
> 
> Patrik
> 
> On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> The reset policy is set to "none" by design
>> (https://issues.apache.org/jira/browse/KAFKA-6121), and it is, also by
>> design, not allowed to overwrite this (there might be a workaround for
>> you, though). However, Streams should not die; it should catch the
>> exception and recover from it automatically.
>>
>> What version do you use? Can you share the full stack trace to see why
>> Streams failed to recover from this exception?
>>
>>
>> -Matthias
>>
>> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
>>> Hi
>>>
>>> We had several incidents where a streams application crashed while
>>> maintaining a global state store.
>>> Updating global state failed. You can restart KafkaStreams to recover from
>>> this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>> Offsets out of range with no configured reset policy for partitions: ...
>>>
>>> As we never set this to "none" I checked the code and found that
>>> StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
>>> set this explicitly:
>>> baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>
>>> The logs confirm this:
>>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>> transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
>>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>> transaction-id:; user-id:; creation-time:]  Kafka commitId : a8c648ff08b9235d
>>> 2018-10-02 11:07:06,104 INFO [org.apache.kafka.clients.consumer.ConsumerConfig]
>>> (ServerService Thread Pool -- 72) - [short-component-name:;
>>> transaction-id:; user-id:; creation-time:]  ConsumerConfig values:
>>> auto.commit.interval.ms = 5000
>>> auto.offset.reset = none
>>> bootstrap.servers = [...]
>>> check.crcs = true
>>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
>>>
>>> ...
>>>
>>> 2018-10-02 11:07:06,418 INFO [org.apache.kafka.streams.processor.internals.StreamThread]
>>> (ServerService Thread Pool -- 72) - [short-component-name:;
>>> transaction-id:; user-id:; creation-time:]  stream-thread
>>> [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating restore
>>> consumer client
>>> 2018-10-02 11:07:06,419 INFO [org.apache.kafka.clients.consumer.ConsumerConfig]
>>> (ServerService Thread Pool -- 72) - [short-component-name:;
>>> transaction-id:; user-id:; creation-time:]  ConsumerConfig values:
>>> auto.commit.interval.ms = 5000
>>> auto.offset.reset = none
>>> bootstrap.servers = [...]
>>> check.crcs = true
>>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
>>>
>>> Is this intentional, and if yes, why can this not use the default policy
>>> and recover?
>>>
>>> best regards
>>>
>>> Patrik
>>>
>>
>>
> 
