Saw it, thank you.

Best regards,
Patrik
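PS, for anyone finding this thread in the archives: until KAFKA-7480 is
resolved, the manual restart mentioned below can also be automated from the
application side. This is a rough, untested sketch and not an official
pattern; it assumes the KafkaStreams instance transitions to ERROR when the
global thread dies, and `topology`/`props` stand for your own values:

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class SelfRestartingStreams {

    private final Topology topology;   // your own topology
    private final Properties props;    // your own streams configuration
    private final AtomicReference<KafkaStreams> current = new AtomicReference<>();
    private final ScheduledExecutorService watchdog =
            Executors.newSingleThreadScheduledExecutor();

    public SelfRestartingStreams(final Topology topology, final Properties props) {
        this.topology = topology;
        this.props = props;
    }

    public void start() {
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.setStateListener((newState, oldState) -> {
            // A closed instance cannot be started again, so on ERROR we
            // schedule a replacement from a separate thread (closing from
            // inside the callback itself could block).
            if (newState == KafkaStreams.State.ERROR) {
                watchdog.schedule(this::restart, 5, TimeUnit.SECONDS);
            }
        });
        current.set(streams);
        streams.start();
    }

    private void restart() {
        current.get().close();
        start();  // build and start a brand-new instance
    }
}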
> On 04.10.2018 at 17:11, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for the details.
>
> Seems like a fair assumption. I created a Jira to track it:
> https://issues.apache.org/jira/browse/KAFKA-7480
>
> For now, there is not much you can do, because Streams hard-codes the
> policy to "none". Thus, a manual restart (which, as you confirmed, does
> work) is currently the way to go.
>
> Thanks for reporting this issue.
>
>
> -Matthias
>
>> On 10/4/18 3:23 AM, Patrik Kleindl wrote:
>> Hello Matthias,
>> Thanks for looking into this.
>> A restart has worked, I can confirm that.
>> Before this problem happened we had some cluster issues which are still
>> being looked into; there were some leader changes and some offset
>> commit failures.
>> The consumer should not have lagged that far behind, but I can only
>> check that at the next occurrence.
>>
>> Does the user have any other solution available than to restart?
>> I understand the intention to "notify" the user of a potential problem,
>> but if nothing can be done about the data loss, then a warning message
>> and automatic recovery should not make things worse.
>> This would make sense as an improvement; as I understand this is not a
>> bug, the case is closed for me at the moment.
>>
>> Thanks again and best regards
>> Patrik
>>
>>> On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> I double-checked the code and discussed it with a colleague.
>>>
>>> There are two places where we call `globalConsumer.poll()`:
>>>
>>> 1. On startup, when we need to bootstrap the store. In this case, we
>>> catch the exception and handle it.
>>> 2. During regular processing. In this case, we don't catch the
>>> exception.
>>>
>>> The reasoning is the following: For case (1), the exception should
>>> only happen if you start a new application or if an application was
>>> offline for a long time. This is fine, and we just make sure to
>>> bootstrap correctly. For case (2), the consumer is at the end of the
>>> log and thus an InvalidOffsetException should never occur; it would
>>> indicate an issue the user should be notified about.
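>>>
>>> In code, the difference is roughly the following. This is only an
>>> illustrative sketch, not the actual Streams source; `bootstrapping`
>>> and `pollTime` are placeholders:
>>>
>>> import java.time.Duration;
>>> import org.apache.kafka.clients.consumer.Consumer;
>>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>>> import org.apache.kafka.clients.consumer.InvalidOffsetException;
>>>
>>> ConsumerRecords<byte[], byte[]> pollGlobal(final Consumer<byte[], byte[]> globalConsumer,
>>>                                            final Duration pollTime,
>>>                                            final boolean bootstrapping) {
>>>     try {
>>>         return globalConsumer.poll(pollTime);
>>>     } catch (final InvalidOffsetException e) {
>>>         if (bootstrapping) {
>>>             // case (1): a new or long-offline application; re-seek and
>>>             // rebuild the global store from scratch
>>>             globalConsumer.seekToBeginning(e.partitions());
>>>             return globalConsumer.poll(pollTime);
>>>         }
>>>         // case (2): unexpected at the end of the log; surface it
>>>         throw e;
>>>     }
>>> }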
>>>
>>> Does this reasoning make sense?
>>>
>>> Question: if you restart your application, does it fail again? Or does
>>> it resume processing?
>>>
>>> It would be good to understand the root cause. It seems your
>>> globalConsumer is lagging behind? Can you verify this? If yes, it
>>> seems to make sense to stop processing to inform the user about this
>>> issue. Would you rather prefer the application to just move on,
>>> implying silent data loss?
>>>
>>>
>>> -Matthias
>>>
>>>> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
>>>> Hello Matthias,
>>>> Thank you for the explanation.
>>>>
>>>> The version used is 2.0.0-cp1.
>>>>
>>>> The stacktrace:
>>>>
>>>> 2018-10-02 10:51:52,575 ERROR
>>>> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
>>>> (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
>>>> [short-component-name:; transaction-id:; user-id:; creation-time:]
>>>> global-stream-thread
>>>> [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
>>>> global state failed. You can restart KafkaStreams to recover from this
>>>> error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>> Offsets out of range with no configured reset policy for partitions:
>>>> {...=51247974}
>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>>>>     at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
>>>>     at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
>>>>
>>>> Fetcher.parseCompletedFetch:
>>>>
>>>> else if (error == Errors.OFFSET_OUT_OF_RANGE) {
>>>>     if (fetchOffset != subscriptions.position(tp)) {
>>>>         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
>>>>                 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
>>>>     } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
>>>>         log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
>>>>         subscriptions.requestOffsetReset(tp);
>>>>     } else {
>>>>         throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
>>>>     }
>>>> }
>>>>
>>>> So this means that for the global/restore consumers the exception will
>>>> always be thrown, without any special handling?
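>>>>
>>>> For comparison, an application using a plain consumer could at least
>>>> handle this itself, along these lines (only a sketch; jumping to the
>>>> earliest offset is just one possible choice):
>>>>
>>>> import java.time.Duration;
>>>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>> import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
>>>>
>>>> // assumes a consumer running with auto.offset.reset = none
>>>> void pollWithRecovery(final KafkaConsumer<String, String> consumer) {
>>>>     try {
>>>>         final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
>>>>         // ... process records ...
>>>>     } catch (final OffsetOutOfRangeException e) {
>>>>         // With reset policy "none" the consumer refuses to pick a new
>>>>         // position on its own; the application decides. Here we accept
>>>>         // the data loss explicitly and continue from the earliest offset.
>>>>         consumer.seekToBeginning(e.partitions());
>>>>     }
>>>> }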
>>>>
>>>> best regards
>>>>
>>>> Patrik
>>>>
>>>> On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> It is by design that the reset policy is set to "none"
>>>>> (https://issues.apache.org/jira/browse/KAFKA-6121), and it is also by
>>>>> design that this cannot be overwritten (there might be a workaround
>>>>> for you, though). However, Streams should not die but catch the
>>>>> exception and recover from it automatically.
>>>>>
>>>>> What version do you use? Can you share the full stack trace to see
>>>>> why Streams failed to recover from this exception?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
>>>>>> Hi
>>>>>>
>>>>>> We had several incidents where a streams application crashed while
>>>>>> maintaining a global state store:
>>>>>>
>>>>>> Updating global state failed. You can restart KafkaStreams to
>>>>>> recover from this error.:
>>>>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>>>> Offsets out of range with no configured reset policy for partitions: ...
>>>>>>
>>>>>> As we never set this to "none" ourselves, I checked the code and
>>>>>> found that StreamsConfig getGlobalConsumerConfigs and
>>>>>> getRestoreConsumerConfigs both set it explicitly:
>>>>>>
>>>>>> baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>>>>
>>>>>> The logs confirm this:
>>>>>>
>>>>>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
>>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>>> transaction-id:; user-id:; creation-time:] Kafka version : 2.0.0-cp1
>>>>>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
>>>>>> (ServerService Thread Pool -- 70) - [short-component-name:;
>>>>>> transaction-id:; user-id:; creation-time:] Kafka commitId : a8c648ff08b9235d
>>>>>> 2018-10-02 11:07:06,104 INFO [org.apache.kafka.clients.consumer.ConsumerConfig]
>>>>>> (ServerService Thread Pool -- 72) - [short-component-name:;
>>>>>> transaction-id:; user-id:; creation-time:] ConsumerConfig values:
>>>>>>     auto.commit.interval.ms = 5000
>>>>>>     auto.offset.reset = none
>>>>>>     bootstrap.servers = [...]
>>>>>>     check.crcs = true
>>>>>>     client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> 2018-10-02 11:07:06,418 INFO
>>>>>> [org.apache.kafka.streams.processor.internals.StreamThread]
>>>>>> (ServerService Thread Pool -- 72) - [short-component-name:;
>>>>>> transaction-id:; user-id:; creation-time:] stream-thread
>>>>>> [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating
>>>>>> restore consumer client
>>>>>> 2018-10-02 11:07:06,419 INFO
>>>>>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService
>>>>>> Thread Pool -- 72) - [short-component-name:; transaction-id:;
>>>>>> user-id:; creation-time:] ConsumerConfig values:
>>>>>>     auto.commit.interval.ms = 5000
>>>>>>     auto.offset.reset = none
>>>>>>     bootstrap.servers = [...]
>>>>>>     check.crcs = true
>>>>>>     client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
>>>>>>
>>>>>> Is this intentional, and if yes, why can this not use the default
>>>>>> policy and recover?
>>>>>>
>>>>>> best regards
>>>>>>
>>>>>> Patrik