Hi Eno,

Thanks for the quick reply.  I think that probably does match the data I'm
seeing.  It surprises me a bit, though: my streams app was only offline for
a few minutes, yet it still lost its offsets.

My interpretation is that the source partition had been idle for 24 hours,
streams doesn't re-commit offsets for idle partitions, and so the committed
offsets aged out under the default/unconfigured 24-hour offset retention.
(That would also explain the tombstones I saw: the broker deletes expired
offsets by writing null values to __consumer_offsets.)
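
For the archives, this is roughly how I dumped the topic to spot the
tombstones (the formatter class name matches the 0.10.x broker source, so
it may need adjusting for other versions):

  kafka-console-consumer.sh --bootstrap-server 10.10.59.184:9092 \
    --topic __consumer_offsets --from-beginning \
    --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
    --consumer-property exclude.internal.topics=false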

I'll work around this by bumping up the offset retention on the brokers.
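
Concretely, I'm planning something along these lines in server.properties
(the seven-day value is just what I've picked for us, not a recommendation):

  # raise committed-offset retention from the 24-hour default
  # (offsets.retention.minutes = 1440) to seven days
  offsets.retention.minutes=10080

Thanks again for the pointer!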

Mathieu


On Wed, Feb 22, 2017 at 9:22 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Mathieu,
>
> It could be that the offset retention period has expired. See this:
> http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group
>
> Thanks
> Eno
>
> > On 22 Feb 2017, at 16:08, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > Hey users,
> >
> > What causes delete tombstones (value=null) to be sent to the
> > __consumer_offsets topic?
> >
> > I'm observing that a Kafka Streams application that is restarted after a
> > crash appears to be reprocessing messages from the beginning of a topic.
> > I've dumped the __consumer_offsets topic and found that after the
> > restart, messages with a null value are being sent to __consumer_offsets.
> >
> > I do see that the ConsumerConfig for my StreamThread consumer has
> > auto.offset.reset=earliest.  But my understanding of this configuration
> > is that it only applies when no committed offset is available, and there
> > are definitely offsets for this consumer group stored in __consumer_offsets.
> >
> > Here's the consumer config for the streams app:
> >
> > ConsumerConfig values:
> >  auto.commit.interval.ms = 5000
> >  auto.offset.reset = earliest
> >  bootstrap.servers = [10.10.59.184:9092]
> >  check.crcs = true
> >  client.id = timesheet-list-2d7a7f37-f41a-46b0-a1bb-d47f773012f6-StreamThread-1-consumer
> >  connections.max.idle.ms = 540000
> >  enable.auto.commit = false
> >  exclude.internal.topics = true
> >  fetch.max.bytes = 52428800
> >  fetch.max.wait.ms = 500
> >  fetch.min.bytes = 1
> >  group.id = timesheet-list
> >  heartbeat.interval.ms = 3000
> >  interceptor.classes = null
> >  key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> >  max.partition.fetch.bytes = 1048576
> >  max.poll.interval.ms = 1800000
> >  max.poll.records = 1000
> >  metadata.max.age.ms = 300000
> >  metric.reporters = []
> >  metrics.num.samples = 2
> >  metrics.recording.level = INFO
> >  metrics.sample.window.ms = 30000
> >  partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> >  receive.buffer.bytes = 65536
> >  reconnect.backoff.ms = 50
> >  request.timeout.ms = 1801000
> >  retry.backoff.ms = 100
> >  sasl.jaas.config = null
> >  sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >  sasl.kerberos.min.time.before.relogin = 60000
> >  sasl.kerberos.service.name = null
> >  sasl.kerberos.ticket.renew.jitter = 0.05
> >  sasl.kerberos.ticket.renew.window.factor = 0.8
> >  sasl.mechanism = GSSAPI
> >  security.protocol = PLAINTEXT
> >  send.buffer.bytes = 131072
> >  session.timeout.ms = 10000
> >  ssl.cipher.suites = null
> >  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >  ssl.endpoint.identification.algorithm = null
> >  ssl.key.password = null
> >  ssl.keymanager.algorithm = SunX509
> >  ssl.keystore.location = null
> >  ssl.keystore.password = null
> >  ssl.keystore.type = JKS
> >  ssl.protocol = TLS
> >  ssl.provider = null
> >  ssl.secure.random.implementation = null
> >  ssl.trustmanager.algorithm = PKIX
> >  ssl.truststore.location = null
> >  ssl.truststore.password = null
> >  ssl.truststore.type = JKS
> >  value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
>
