That sounds like expected behavior. Note that if you use exactly-once, on
commit/abort of a transaction, the broker writes a commit/abort marker into
the partitions. Each of those markers occupies one message/record offset.

Thus, you have "gaps" in the message offsets, and in your case, I assume
that you have 5 commit markers in the corresponding partition (ending
offset 17 minus 12 messages).
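
To make the arithmetic concrete, here is a minimal sketch in plain Java. It
does not use the Kafka client at all; it only models a partition as a range
of offsets in which some slots are taken by transaction markers. The class
name OffsetGaps, the method dataOffsets, and the marker positions are
assumptions for illustration: offsets 2 and 5 are the ones skipped in your
SimpleConsumerShell output, and the remaining three are a guess that the
same pattern continues.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OffsetGaps {
    /** Returns the offsets in [0, endOffset) that hold data records rather than markers. */
    static List<Long> dataOffsets(long endOffset, List<Long> markerOffsets) {
        List<Long> result = new ArrayList<>();
        for (long offset = 0; offset < endOffset; offset++) {
            // A marker occupies an offset but is never handed to the application.
            if (!markerOffsets.contains(offset)) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long endOffset = 17L; // ending offset reported for the partition
        // Hypothetical marker positions (only 2 and 5 are visible in the quoted output).
        List<Long> markers = Arrays.asList(2L, 5L, 8L, 11L, 14L);

        List<Long> data = dataOffsets(endOffset, markers);
        System.out.println("data offsets:  " + data);
        System.out.println("data records:  " + data.size());
        System.out.println("marker slots:  " + (endOffset - data.size()));
    }
}
```

The point is only that the ending offset counts every slot in the log, so it
cannot be used as a message count once transactions are enabled.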


On 1/26/18 12:34 AM, Tomáš Lach wrote:
> Hello, I have the following problem with a kafka-streams Scala app and the
> exactly-once delivery guarantee:
> A topic filled by the kafka-streams app (exactly-once enabled) has the wrong
> ending offset. Broker and Streams API version 0.11. When I run
> **, it gives ending offset 17, but in topic there
> are just 12 messages (retention is disabled). When the exactly-once guarantee
> is disabled, these offsets match. I tried to reset kafka-streams
> according to this
> <>,
> but the problem still remains.
> When I run *SimpleConsumerShell* with the *--print-offsets* option, the
> output is as follows:
> next offset = 1
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 2
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 4
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 5
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 7
> {"timestamp": 149583551238149, "data": {...}}
> next offset = 8
> {"timestamp": 149583551238149, "data": {...}}
> ...
> Some offsets are apparently skipped when the exactly-once delivery guarantee
> is enabled. I'm reading from this topic with Spark, and it is causing these
> errors: "assertion failed: Ran out of messages before reaching ending
> offset 17 for topic offset-test partition 0 start 0. This should not
> happen, and indicates that messages may have been lost" or "Got wrong
> record for spark-executor-< <>> <topic> 0 even after
> seeking to offset", depending on the version of the spark-streaming-kafka
> lib. We're running the Cloudera distribution of Kafka.
> My Kafka Streams app config is as follows:
> = MultipleTopicsAppTest
>         application.server =
>         bootstrap.servers = [host:9092]
>         buffered.records.per.partition = 1000
>         cache.max.bytes.buffering = 10485760
> =
> = 100
> = 540000
>         default.deserialization.exception.handler = class
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
>         default.key.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
>         default.timestamp.extractor = class
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
>         default.value.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
>         key.serde = null
> = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
> = 30000
>         num.standby.replicas = 0
> = 1
>         partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> = 100
>         processing.guarantee = exactly_once
>         receive.buffer.bytes = 32768
> = 1000
> = 50
>         replication.factor = 1
> = 40000
> = 100
>         rocksdb.config.setter = null
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
> = 600000
>         state.dir = /tmp/kafka-streams
>         timestamp.extractor = null
>         value.serde = null
> = 86400000
>         zookeeper.connect =
> What can cause this problem? Thanks!
