Bumping this up with a new update:

I've investigated another occurrence of this exception.

For the analysis, I used:
1) a memory dump taken from the broker
2) the Kafka log file
3) the Kafka state-change log
4) the log, index and time-index files of a failed segment
5) the Kafka source code, versions 2.3.1 and 1.1.0

Here's what the exception looks like in the Kafka log:

2019/11/19 16:03:00 INFO [ProducerStateManager partition=ad_group_metrics-62] Writing producer snapshot at offset 13886052 (kafka.log.ProducerStateManager)
2019/11/19 16:03:00 INFO [Log partition=ad_group_metrics-62, dir=/mnt/kafka] Rolled new log segment at offset 13886052 in 1 ms. (kafka.log.Log)
2019/11/19 16:03:00 ERROR [ReplicaManager broker=17] Error processing append operation on partition ad_group_metrics-62 (kafka.server.ReplicaManager)
2019/11/19 16:03:00 java.nio.BufferOverflowException
2019/11/19 16:03:00     at java.nio.Buffer.nextPutIndex(Buffer.java:527)
2019/11/19 16:03:00     at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
2019/11/19 16:03:00     at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
2019/11/19 16:03:00     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2019/11/19 16:03:00     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
2019/11/19 16:03:00     at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
2019/11/19 16:03:00     at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
2019/11/19 16:03:00     at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
2019/11/19 16:03:00     at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
2019/11/19 16:03:00     at scala.Option.foreach(Option.scala:407)
2019/11/19 16:03:00     at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
2019/11/19 16:03:00     at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
2019/11/19 16:03:00     at kafka.log.Log.roll(Log.scala:1654)
2019/11/19 16:03:00     at kafka.log.Log.maybeRoll(Log.scala:1639)
2019/11/19 16:03:00     at kafka.log.Log.$anonfun$append$2(Log.scala:966)
2019/11/19 16:03:00     at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
2019/11/19 16:03:00     at kafka.log.Log.append(Log.scala:850)
2019/11/19 16:03:00     at kafka.log.Log.appendAsLeader(Log.scala:819)
2019/11/19 16:03:00     at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
2019/11/19 16:03:00     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
2019/11/19 16:03:00     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
2019/11/19 16:03:00     at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
2019/11/19 16:03:00     at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
2019/11/19 16:03:00     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
2019/11/19 16:03:00     at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
2019/11/19 16:03:00     at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
2019/11/19 16:03:00     at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
2019/11/19 16:03:00     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
2019/11/19 16:03:00     at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
2019/11/19 16:03:00     at scala.collection.TraversableLike.map(TraversableLike.scala:238)
2019/11/19 16:03:00     at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
2019/11/19 16:03:00     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
2019/11/19 16:03:00     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
2019/11/19 16:03:00     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
2019/11/19 16:03:00     at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
2019/11/19 16:03:00     at kafka.server.KafkaApis.handle(KafkaApis.scala:113)
2019/11/19 16:03:00     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
2019/11/19 16:03:00     at java.lang.Thread.run(Thread.java:748)
...


What we see here is that a new segment was rolled at offset 13886052, and
then an exception was thrown while marking *some* segment as inactive
(`onBecomeInactiveSegment`) during an append of new messages to the Log.
The time between rolling the new segment and the failing append doesn't
seem to matter: there are many other similar exceptions where the failure
occurs a few seconds after a new segment is rolled.

I managed to find the `LogSegment` object for offset 13886052 in the
memory dump. Following the source code logic and checking the LogSegment
state against the Kafka logs, I found that the `TimeIndex` object had
somehow ended up in a state with 0 entries and 0 max possible entries (and
an empty memory map). Having 0 entries is normal for a TimeIndex or an
OffsetIndex, even when there are records in the Log, until the size of
those records passes some threshold. But 0 max possible entries together
with 0 entries meant the TimeIndex was considered full (0 entries == 0 max
entries), which triggered the roll of a new segment. During that roll the
Log tried to append a final timestamp to the TimeIndex and failed because
of the empty memory map (BufferOverflowException). Neither the OffsetIndex
nor the Log messages were in a similar state.
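
To make the failure mode above concrete, here is a minimal standalone
sketch. It is not Kafka code: the class and method names are made up for
illustration, and it only assumes that a time index entry is 12 bytes (an
8-byte timestamp plus a 4-byte relative offset) and that "full" means
entries >= max entries. It shows that a zero-capacity buffer is "full" with
0 entries and that `putLong` on it throws `BufferOverflowException`.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Hypothetical stand-in for the index state seen in the memory dump.
final class EmptyTimeIndexSketch {
    // Assumption: one entry = 8-byte timestamp + 4-byte relative offset.
    static final int ENTRY_SIZE = 12;

    // Max entries are derived from the size of the backing buffer.
    static int maxEntries(ByteBuffer buf) {
        return buf.limit() / ENTRY_SIZE;
    }

    // With 0 entries and 0 max entries this is already true.
    static boolean isFull(int entries, int maxEntries) {
        return entries >= maxEntries;
    }

    // Tries to write one entry; returns true if the buffer overflowed.
    static boolean appendOverflows(ByteBuffer buf, long timestamp, int relativeOffset) {
        try {
            buf.putLong(timestamp);
            buf.putInt(relativeOffset);
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A direct buffer of capacity 0 plays the role of the empty memory map.
        ByteBuffer empty = ByteBuffer.allocateDirect(0);
        int max = maxEntries(empty);
        System.out.println("maxEntries = " + max);
        System.out.println("isFull     = " + isFull(0, max));
        System.out.println("overflow   = " + appendOverflows(empty, System.currentTimeMillis(), 0));
    }
}
```

Since nothing can ever be written to such a buffer, every later append that
triggers a roll would fail the same way, which matches the error persisting
until a restart or a leadership move.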

According to the source code, the only way for a TimeIndex to end up with
0 entries and 0 max possible entries is a call to `resize(0)` or
`resize(entrySize * _entries)`. Unfortunately, I couldn't find a code path
that calls them in a way that leads to this state. These calls are made
either for both the OffsetIndex and the TimeIndex at the same time (but we
don't observe a similar state for the OffsetIndex in the memory dump) or
while loading a segment on Kafka server startup (which cannot apply to a
newly rolled segment).
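
The effect of those two calls can be modelled with a small sketch. Again,
this is not the Kafka implementation: the class is hypothetical, a heap
`ByteBuffer` stands in for the memory-mapped file, existing entries are not
copied, and the 12-byte entry size is an assumption. It only illustrates
that resizing to 0 bytes, or trimming to `entrySize * entries` while there
are 0 entries, leaves 0 max entries.

```java
import java.nio.ByteBuffer;

// Hypothetical model of an index whose capacity is recomputed on resize.
final class IndexResizeSketch {
    // Assumption: one entry = 8-byte timestamp + 4-byte relative offset.
    static final int ENTRY_SIZE = 12;

    private ByteBuffer buffer;
    private int entries = 0;
    private int maxEntries;

    IndexResizeSketch(int maxIndexSizeBytes) {
        resize(maxIndexSizeBytes);
    }

    // Rounds down so the buffer always holds a whole number of entries.
    static int roundDownToExactMultiple(int number, int factor) {
        return factor * (number / factor);
    }

    // Replaces the backing buffer and recomputes the capacity in entries.
    // Simplification: a real index would keep the already written entries.
    void resize(int newSizeBytes) {
        int rounded = roundDownToExactMultiple(newSizeBytes, ENTRY_SIZE);
        buffer = ByteBuffer.allocate(rounded);
        maxEntries = buffer.limit() / ENTRY_SIZE;
    }

    // Shrinks the buffer to exactly the entries written so far.
    void trimToValidSize() {
        resize(ENTRY_SIZE * entries);
    }

    int maxEntries() { return maxEntries; }

    boolean isFull() { return entries >= maxEntries; }

    public static void main(String[] args) {
        IndexResizeSketch index = new IndexResizeSketch(10 * 1024 * 1024);
        System.out.println("before trim: maxEntries=" + index.maxEntries() + " full=" + index.isFull());
        index.trimToValidSize(); // 0 entries, so this is resize(0)
        System.out.println("after trim:  maxEntries=" + index.maxEntries() + " full=" + index.isFull());
    }
}
```

With 0 entries the two calls are equivalent, so either one is enough to
produce the state seen in the memory dump.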

The `state-change.log` was not useful, since it didn't contain any
state-change events around that time. So we can say that the Kafka cluster
and that particular broker considered themselves stable and healthy.

To summarize:
1) Some event, possibly a regularly scheduled one, coincidentally happens
after a new segment is rolled.
2) That event resizes the TimeIndex to size 0 with 0 max possible entries
and reinitializes the memory map, making it size 0 as well.
3) After that, the TimeIndex is considered full, but there is no room to
add a final timestamp to the empty memory map.



I've updated the JIRA bug
<https://issues.apache.org/jira/browse/KAFKA-9213> with
that information.

On Tue, Nov 19, 2019 at 3:36 PM Daniyar Kulakhmetov <dkulakhme...@liftoff.io>
wrote:

> Filed JIRA bug:
> https://issues.apache.org/jira/browse/KAFKA-9213
>
>
>
> On Tue, Nov 19, 2019 at 2:58 PM Ismael Juma <ism...@juma.me.uk> wrote:
>
>> Can you please file a JIRA?
>>
>> Ismael
>>
>> On Tue, Nov 19, 2019 at 11:52 AM Daniyar Kulakhmetov <
>> dkulakhme...@liftoff.io> wrote:
>>
>> > Hi Kafka users,
>> >
>> > We updated our Kafka cluster from 1.1.0 version to 2.3.1.
>> > Message format and inter-broker protocol versions left the same:
>> >
>> > inter.broker.protocol.version=1.1
>> > log.message.format.version=1.1
>> >
>> > After upgrading, we started to get some occasional exceptions:
>> >
>> > 2019/11/19 05:30:53 INFO [ProducerStateManager
>> > partition=matchmaker_retry_clicks_15m-2] Writing producer snapshot at
>> > offset 788532 (kafka.log.ProducerStateManager)
>> > 2019/11/19 05:30:53 INFO [Log partition=matchmaker_retry_clicks_15m-2,
>> > dir=/mnt/kafka] Rolled new log segment at offset 788532 in 1 ms.
>> > (kafka.log.Log)
>> > 2019/11/19 05:31:01 ERROR [ReplicaManager broker=0] Error processing
>> > append operation on partition matchmaker_retry_clicks_15m-2
>> > (kafka.server.ReplicaManager)
>> > 2019/11/19 05:31:01 java.nio.BufferOverflowException
>> > 2019/11/19 05:31:01     at java.nio.Buffer.nextPutIndex(Buffer.java:527)
>> > 2019/11/19 05:31:01     at
>> > java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
>> > 2019/11/19 05:31:01     at
>> > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> > 2019/11/19 05:31:01     at
>> > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
>> > 2019/11/19 05:31:01     at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
>> > 2019/11/19 05:31:01     at scala.Option.foreach(Option.scala:407)
>> > 2019/11/19 05:31:01     at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.Log.maybeHandleIOException(Log.scala:2085)
>> > 2019/11/19 05:31:01     at kafka.log.Log.roll(Log.scala:1654)
>> > 2019/11/19 05:31:01     at kafka.log.Log.maybeRoll(Log.scala:1639)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.Log.$anonfun$append$2(Log.scala:966)
>> > 2019/11/19 05:31:01     at
>> > kafka.log.Log.maybeHandleIOException(Log.scala:2085)
>> > 2019/11/19 05:31:01     at kafka.log.Log.append(Log.scala:850)
>> > 2019/11/19 05:31:01     at kafka.log.Log.appendAsLeader(Log.scala:819)
>> > 2019/11/19 05:31:01     at
>> > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
>> > 2019/11/19 05:31:01     at
>> > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
>> > 2019/11/19 05:31:01     at
>> > kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
>> > 2019/11/19 05:31:01     at
>> > kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
>> > 2019/11/19 05:31:01     at
>> > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.TraversableLike.map(TraversableLike.scala:238)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>> > 2019/11/19 05:31:01     at
>> > scala.collection.AbstractTraversable.map(Traversable.scala:108)
>> > 2019/11/19 05:31:01     at
>> > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
>> > 2019/11/19 05:31:01     at
>> > kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
>> > 2019/11/19 05:31:01     at
>> > kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
>> > 2019/11/19 05:31:01     at
>> > kafka.server.KafkaApis.handle(KafkaApis.scala:113)
>> > 2019/11/19 05:31:01     at
>> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>> > 2019/11/19 05:31:01     at java.lang.Thread.run(Thread.java:748)
>> >
>> >
>> > This error persists until the broker gets restarted (or leadership gets
>> > moved to another broker).
>> >
>> > What could be the issue and how we can solve it?
>> >
>> > Thank you!
>> >
>> > Best regards,
>> > Daniyar.
>> >
>>
>
