Thanks for the details. Seems like a fair assumption. I created a Jira to track it: https://issues.apache.org/jira/browse/KAFKA-7480
For now, there is not much you can do, because Streams hard-codes the reset policy to "none". Thus, a manual restart (which, as you confirmed, gladly works) is currently the way to go. Thanks for reporting this issue.

-Matthias

On 10/4/18 3:23 AM, Patrik Kleindl wrote:
> Hello Matthias
> Thanks for looking into this.
> A restart has worked, I can confirm that.
> Before this problem happened we had some cluster issues which are still
> being looked into; there were some leader changes and some offset commit
> failures.
> The consumer should not have lagged that far behind, but I can only check
> that at the next occurrence.
>
> Does the user have any other solution available than to restart?
> I understand the intention to "notify" the user of a potential problem, but
> if nothing can be changed about the data loss, then a warning message and
> automatic recovery should not make things worse.
> This would make sense as an improvement; as I understand this is not a bug,
> the case is closed for me at the moment.
>
> Thanks again and best regards
> Patrik
>
> On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I double-checked the code and discussed with a colleague.
>>
>> There are two places where we call `globalConsumer.poll()`:
>>
>> 1. On startup, when we need to bootstrap the store. In this case, we
>>    catch the exception and handle it.
>> 2. During regular processing. In this case, we don't catch the exception.
>>
>> The reasoning is the following: For case (1), the exception should only
>> happen if you start a new application or if an application was offline
>> for a long time. This is fine, and we just make sure to bootstrap
>> correctly. For case (2), the consumer is at the end of the log and thus
>> an InvalidOffsetException should never occur, but rather indicates an
>> issue the user should be notified about.
>>
>> Does this reasoning make sense?
>>
>> Question: if you restart your application, does it fail again?
Or does
>> it resume processing?
>>
>> It would be good to understand the root cause. It seems your
>> globalConsumer is lagging behind? Can you verify this? If yes, it seems
>> to make sense to stop processing to inform the user about this issue.
>> Would you rather prefer the application to just move on, implying silent
>> data loss?
>>
>>
>> -Matthias
>>
>>
>> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
>>> Hello Matthias
>>> Thank you for the explanation.
>>>
>>> Version used is 2.0.0-cp1
>>>
>>> The stack trace:
>>>
>>> 2018-10-02 10:51:52,575 ERROR
>>> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
>>> (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
>>> [short-component-name:; transaction-id:; user-id:; creation-time:]
>>> global-stream-thread
>>> [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
>>> global state failed. You can restart KafkaStreams to recover from this
>>> error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>> Offsets out of range with no configured reset policy for partitions:
>>> {...=51247974}
>>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
>>>   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>>>   at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
>>>   at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
>>>
>>> Fetcher.parseCompletedFetch:
>>>
>>> else if (error == Errors.OFFSET_OUT_OF_RANGE) {
>>>     if (fetchOffset != subscriptions.position(tp)) {
>>>         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
>>>                 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
>>>     } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
>>>         log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
>>>         subscriptions.requestOffsetReset(tp);
>>>     } else {
>>>         throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
>>>     }
>>> }
>>>
>>> So this means that for the global/restore consumers the exception will
>>> always be thrown, without any special handling?
>>>
>>> best regards
>>>
>>> Patrik
>>>
>>> On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> The reset policy is set to "none" by design
>>>> (https://issues.apache.org/jira/browse/KAFKA-6121), and overwriting it
>>>> is not allowed, also by design (there might be a workaround for you,
>>>> though). However, Streams should not die, but catch the exception and
>>>> recover from it automatically.
>>>>
>>>> What version do you use? Can you share the full stack trace to see why
>>>> Streams failed to recover from this exception?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
>>>>> Hi
>>>>>
>>>>> We had several incidents where a Streams application crashed while
>>>>> maintaining a global state store:
>>>>>
>>>>> Updating global state failed. You can restart KafkaStreams to recover from
>>>>> this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>>> Offsets out of range with no configured reset policy for partitions: ...
>>>>>
>>>>> As we never set this to "none", I checked the code and found that
>>>>> StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
>>>>> set this explicitly:
>>>>>
>>>>> baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>>>
>>>>> The logs confirm this:
>>>>>
>>>>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>> transaction-id:; user-id:; creation-time:] Kafka version : 2.0.0-cp1
>>>>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>> transaction-id:; user-id:; creation-time:] Kafka commitId : a8c648ff08b9235d
>>>>> 2018-10-02 11:07:06,104 INFO
>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
>>>>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>> creation-time:] ConsumerConfig values:
>>>>>     auto.commit.interval.ms = 5000
>>>>>     auto.offset.reset = none
>>>>>     bootstrap.servers = [...]
>>>>>     check.crcs = true
>>>>>     client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
>>>>>
>>>>> ...
>>>>>
>>>>> 2018-10-02 11:07:06,418 INFO
>>>>> [org.apache.kafka.streams.processor.internals.StreamThread] (ServerService
>>>>> Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>> creation-time:] stream-thread
>>>>> [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating restore
>>>>> consumer client
>>>>> 2018-10-02 11:07:06,419 INFO
>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
>>>>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
>>>>> creation-time:] ConsumerConfig values:
>>>>>     auto.commit.interval.ms = 5000
>>>>>     auto.offset.reset = none
>>>>>     bootstrap.servers = [...]
>>>>>     check.crcs = true
>>>>>     client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
>>>>>
>>>>> Is this intentional, and if yes, why can this not use the default policy
>>>>> and recover?
>>>>>
>>>>> best regards
>>>>>
>>>>> Patrik
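To summarize the branch quoted from Fetcher.parseCompletedFetch above, the following is a standalone, simplified model of the decision the consumer makes when the broker reports OFFSET_OUT_OF_RANGE. The names and types here are illustrative stand-ins, not the real Kafka internals; the point is only the three-way branch that explains why the global/restore consumer (with its hard-coded "none" policy) surfaces the exception instead of resetting.

```java
public class OffsetOutOfRangeDecision {

    enum Action { DISCARD_STALE, RESET_OFFSET, THROW }

    // Simplified stand-in for the decision made when a fetch response
    // comes back with OFFSET_OUT_OF_RANGE for a partition.
    static Action onOffsetOutOfRange(long fetchOffset,
                                     long currentPosition,
                                     boolean hasDefaultResetPolicy) {
        if (fetchOffset != currentPosition) {
            // Stale response for an old fetch position: ignore it.
            return Action.DISCARD_STALE;
        } else if (hasDefaultResetPolicy) {
            // auto.offset.reset is "earliest" or "latest": reset and move on.
            return Action.RESET_OFFSET;
        } else {
            // auto.offset.reset is "none" (what Streams hard-codes for the
            // global and restore consumers): the exception propagates to poll().
            return Action.THROW;
        }
    }

    public static void main(String[] args) {
        // A consumer with a default reset policy recovers on its own:
        System.out.println(onOffsetOutOfRange(51247974L, 51247974L, true));
        // The global/restore consumer has no reset policy, so poll() throws:
        System.out.println(onOffsetOutOfRange(51247974L, 51247974L, false));
        // A stale fetch response is simply discarded:
        System.out.println(onOffsetOutOfRange(100L, 200L, true));
    }
}
```

With a matching fetch offset and no default reset policy, the only remaining branch is the throw, which is exactly the OffsetOutOfRangeException in the stack trace above.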
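Since the thread concludes that a manual restart is the interim fix, that recovery can be automated at the application level. Below is a minimal, library-free sketch of such a supervisor loop; the Kafka-specific parts are deliberately left as comments, because a real implementation would close the dead KafkaStreams instance and build a fresh one from the topology, which cannot be shown self-contained here.

```java
import java.util.function.Supplier;

public class RestartSupervisor {

    /**
     * Runs the task produced by taskFactory, restarting it when it dies
     * with a RuntimeException (e.g. the global stream thread failing with
     * OffsetOutOfRangeException). Gives up after maxRestarts restarts and
     * rethrows. Returns the number of attempts used on success.
     */
    static int runWithRestarts(Supplier<Runnable> taskFactory, int maxRestarts) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                // In a real application this would build a new KafkaStreams
                // instance, start() it, and block until it shuts down.
                taskFactory.get().run();
                return attempts; // clean shutdown
            } catch (RuntimeException e) {
                if (attempts > maxRestarts) {
                    throw e; // give up and surface the error
                }
                // Otherwise log the failure and loop: this is the
                // programmatic equivalent of the manual restart.
            }
        }
    }

    public static void main(String[] args) {
        // Simulate a task that fails once (a single offset-out-of-range
        // crash) and then runs cleanly after the "restart".
        int[] calls = {0};
        int attempts = runWithRestarts(() -> () -> {
            if (calls[0]++ == 0) {
                throw new RuntimeException(
                        "Offsets out of range with no configured reset policy");
            }
        }, 3);
        System.out.println("attempts = " + attempts);
    }
}
```

Note the trade-off Matthias raises still applies: automating the restart hides the failure, so the supervisor should at least log loudly, since the out-of-range condition may imply the global consumer silently skipped data.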