That sounds like expected behavior. Note that with exactly-once enabled, on commit/abort of a transaction the broker writes a commit/abort marker into the partitions. Each marker takes up one message/record slot. Thus, you get "gaps" in the message offsets, and in your case I assume there are 5 commit markers in the corresponding partition.
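As a quick check, here is a minimal sketch (assuming a topic "offset-test", partition 0, and a broker at host:9092, as in your config) that consumes the partition with isolation.level=read_committed and compares the log-end offset against the number of records actually returned. With exactly-once producers the end offset is typically larger, because the commit/abort markers occupy offset slots but are never handed back to the application:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetGapCheck extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-gap-check")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // read_committed skips aborted records; commit/abort markers are never returned,
  // but they still occupy offsets in the log
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val tp = new TopicPartition("offset-test", 0)
  val consumer = new KafkaConsumer[String, String](props)
  consumer.assign(Collections.singletonList(tp))
  consumer.seekToBeginning(Collections.singletonList(tp))

  // log-end offset as reported by the broker (same number GetOffsetShell prints)
  val endOffset: Long = consumer.endOffsets(Collections.singletonList(tp)).get(tp)

  var recordCount = 0L
  var lastOffset = -1L
  var reachedEnd = false
  while (!reachedEnd) {
    val records = consumer.poll(1000).records(tp).asScala
    records.foreach { r =>
      recordCount += 1
      lastOffset = r.offset()
    }
    // position() advances past transaction markers even though no record is returned for them
    reachedEnd = consumer.position(tp) >= endOffset
  }

  println(s"end offset (log end) = $endOffset")
  println(s"records returned     = $recordCount")
  println(s"last record offset   = $lastOffset")

  consumer.close()
}

So the skipped offsets are not lost records; a client that assumes offsets are contiguous will report exactly the kind of "ran out of messages before reaching ending offset" error you are seeing.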
-Matthias

On 1/26/18 12:34 AM, Tomáš Lach wrote:
> Hello, I have the following problem with a kafka-streams Scala app and the
> exactly-once delivery guarantee:
>
> A topic filled by a kafka-streams app (exactly-once enabled) has a wrong ending
> offset. Broker and Streams API version 0.11. When I run
> *kafka.tools.GetOffsetShell*, it gives ending offset 17, but the topic contains
> only 12 messages (retention is disabled). When the exactly-once guarantee
> is disabled, these offsets match. I tried to reset the kafka-streams app
> according to this
> <https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>,
> but the problem remains.
>
> When I run *SimpleConsumerShell* with the *--print-offsets* option, the output
> is the following:
>
> next offset = 1
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 2
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 4
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 5
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 7
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 8
> {"timestamp": 149583551238149, "data": {...}}
> ...
>
> Some offsets are apparently skipped when the exactly-once delivery guarantee
> is enabled. I'm reading from this topic with Spark, and it is causing these
> errors: "*assertion failed*: *Ran out of messages before reaching ending
> offset 17 for topic offset-test partition 0 start 0. This should not
> happen, and indicates that messages may have been lost*" or "*Got wrong
> record for spark-executor-<group.id> <topic> 0 even after seeking to offset*",
> depending on the version of the spark-streaming-kafka lib. We're running the
> Cloudera distribution of Kafka.
>
> My kafka-streams app config is the following:
>
> application.id = MultipleTopicsAppTest
> application.server =
> bootstrap.servers = [host:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 10485760
> client.id =
> commit.interval.ms = 100
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> key.serde = null
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 1
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = exactly_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /tmp/kafka-streams
> timestamp.extractor = null
> value.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
> What can cause this problem? Thanks!