What you said is absolutely right, and sorry I missed that part in the previous email.
I think for now it is OK to tune offsets.retention.minutes. As for the
long-term fix, there are some discussions on this: the retention of offsets
today is not tied to whether the group has active members, and it might be
more intuitive to change the behavior to "only consider expiring the offsets
once the group has become empty and hence removed".

Guozhang

On Wed, Feb 22, 2017 at 2:01 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Thanks Guozhang, that clarifies the Streams behavior.
>
> I'm imagining that a Streams application might only commit partition
> offsets that have changed, and therefore a partition that is idle for
> longer than offsets.retention.minutes might lose its offsets when the app
> restarts. Does that seem plausible?
>
> That theory seems to be supported by a brief look at the code;
> StreamTask#commitOffsets() sends only the consumed offsets since the
> last commitOffsets call.
>
> This would definitely match behavior I've been puzzled about for a while.
> I deploy my Streams app, shove a lot of data at it to see how it is
> performing and outputting, then go and do some more development work.
> After a day or two (or maybe a weekend), I redeploy the app, and it pops
> back to the beginning of all the topics, surprising the heck out of me and
> making me think I broke something. :-)
>
> Increasing offsets.retention.minutes seems like the easy immediate fix. It
> might be ideal if a Streams app kept idle offsets refreshed occasionally,
> but it's not too likely to impact more realistic use-cases.
>
> Mathieu
>
> On Wed, Feb 22, 2017 at 2:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Mathieu,
> >
> > In Streams the consumer config "enable.auto.commit" is always forced to
> > false, and a separate "commit.interval.ms" is set. With that, even if you
> > do not have any data processed, the commit operation will be triggered
> > after that configured period of time.
> >
> > Guozhang
> >
> > On Wed, Feb 22, 2017 at 8:41 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > > Hi Eno,
> > >
> > > Thanks for the quick reply. I think that probably does match the data I'm
> > > seeing. This surprises me a bit because my streams app was only offline
> > > for a few minutes, but ended up losing its offset.
> > >
> > > My interpretation is that the source partition had been idle for 24 hours,
> > > streams doesn't commit offsets for idle partitions, and so the
> > > default/unconfigured offset retention of 24 hours had expired.
> > >
> > > I'll work around this by bumping up my offset retention. Thanks!
> > >
> > > Mathieu
> > >
> > > On Wed, Feb 22, 2017 at 9:22 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> > >
> > > > Hi Mathieu,
> > > >
> > > > It could be that the offset retention period has expired. See this:
> > > > http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group
> > > >
> > > > Thanks
> > > > Eno
> > > >
> > > > > On 22 Feb 2017, at 16:08, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> > > > >
> > > > > Hey users,
> > > > >
> > > > > What causes delete tombstones (value=null) to be sent to the
> > > > > __consumer_offsets topic?
> > > > >
> > > > > I'm observing that a Kafka Streams application that is restarted after a
> > > > > crash appears to be reprocessing messages from the beginning of a topic.
> > > > > I've dumped the __consumer_offsets topic and found that after the restart,
> > > > > messages with a null value are being sent to __consumer_offsets.
> > > > >
> > > > > I do see that the ConsumerConfig for my StreamThread consumer has
> > > > > auto.offset.reset=earliest.
> > > > > But my understanding of this configuration is
> > > > > that it only applies when the offset isn't available, but there are
> > > > > definitely offsets for this consumer group stored in __consumer_offsets.
> > > > >
> > > > > Here's the consumer config for the streams app:
> > > > >
> > > > > ConsumerConfig values:
> > > > >     auto.commit.interval.ms = 5000
> > > > >     auto.offset.reset = earliest
> > > > >     bootstrap.servers = [10.10.59.184:9092]
> > > > >     check.crcs = true
> > > > >     client.id = timesheet-list-2d7a7f37-f41a-46b0-a1bb-d47f773012f6-StreamThread-1-consumer
> > > > >     connections.max.idle.ms = 540000
> > > > >     enable.auto.commit = false
> > > > >     exclude.internal.topics = true
> > > > >     fetch.max.bytes = 52428800
> > > > >     fetch.max.wait.ms = 500
> > > > >     fetch.min.bytes = 1
> > > > >     group.id = timesheet-list
> > > > >     heartbeat.interval.ms = 3000
> > > > >     interceptor.classes = null
> > > > >     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > > > >     max.partition.fetch.bytes = 1048576
> > > > >     max.poll.interval.ms = 1800000
> > > > >     max.poll.records = 1000
> > > > >     metadata.max.age.ms = 300000
> > > > >     metric.reporters = []
> > > > >     metrics.num.samples = 2
> > > > >     metrics.recording.level = INFO
> > > > >     metrics.sample.window.ms = 30000
> > > > >     partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> > > > >     receive.buffer.bytes = 65536
> > > > >     reconnect.backoff.ms = 50
> > > > >     request.timeout.ms = 1801000
> > > > >     retry.backoff.ms = 100
> > > > >     sasl.jaas.config = null
> > > > >     sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > > >     sasl.kerberos.min.time.before.relogin = 60000
> > > > >     sasl.kerberos.service.name = null
> > > > >     sasl.kerberos.ticket.renew.jitter = 0.05
> > > > >     sasl.kerberos.ticket.renew.window.factor = 0.8
> > > > >     sasl.mechanism = GSSAPI
> > > > >     security.protocol = PLAINTEXT
> > > > >     send.buffer.bytes = 131072
> > > > >     session.timeout.ms = 10000
> > > > >     ssl.cipher.suites = null
> > > > >     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > > >     ssl.endpoint.identification.algorithm = null
> > > > >     ssl.key.password = null
> > > > >     ssl.keymanager.algorithm = SunX509
> > > > >     ssl.keystore.location = null
> > > > >     ssl.keystore.password = null
> > > > >     ssl.keystore.type = JKS
> > > > >     ssl.protocol = TLS
> > > > >     ssl.provider = null
> > > > >     ssl.secure.random.implementation = null
> > > > >     ssl.trustmanager.algorithm = PKIX
> > > > >     ssl.truststore.location = null
> > > > >     ssl.truststore.password = null
> > > > >     ssl.truststore.type = JKS
> > > > >     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> >
> > --
> > -- Guozhang
>

--
-- Guozhang
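
For anyone reading this thread later, here is a rough sketch of the expiry behavior discussed above. It is not Kafka's broker code. It is a toy model that assumes what the thread describes: each partition's committed offset is dropped once its last commit is older than offsets.retention.minutes, whether or not the group is still active. The class name, method name, and the partition names "busy-0" and "idle-0" are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: the real expiry logic lives in the broker's group coordinator.
public class OffsetRetentionSketch {

    /** Returns the partitions whose last offset commit is older than the retention window. */
    public static Set<String> expiredPartitions(Map<String, Instant> lastCommitByPartition,
                                                Duration retention,
                                                Instant now) {
        Set<String> expired = new TreeSet<>();
        for (Map.Entry<String, Instant> entry : lastCommitByPartition.entrySet()) {
            Duration sinceLastCommit = Duration.between(entry.getValue(), now);
            if (sinceLastCommit.compareTo(retention) > 0) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2017-02-22T16:00:00Z");

        // Hypothetical partitions: one consumed recently, one idle for 25 hours.
        // An app that only commits offsets that changed never refreshes the idle one.
        Map<String, Instant> lastCommit = new LinkedHashMap<>();
        lastCommit.put("busy-0", now.minus(Duration.ofMinutes(5)));
        lastCommit.put("idle-0", now.minus(Duration.ofHours(25)));

        // 1440 minutes = the 24-hour default retention mentioned in the thread.
        Duration retention = Duration.ofMinutes(1440);

        System.out.println("Expired: " + expiredPartitions(lastCommit, retention, now));
        // prints "Expired: [idle-0]"
    }
}
```

With the retention raised to, say, seven days, the same input yields no expired partitions, which is the workaround suggested in the thread.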