Kafka partitions replication issue

2020-06-17 Thread Karnam, Sudheer
Team,
We are using Kafka version 2.3.0 and we are facing an issue with broker 
replication.

1. Kafka has 6 brokers.
2. Mainly 7 topics exist in the Kafka cluster and each topic has 128 partitions.
3. Each partition has 3 in-sync replicas, and these are distributed among the 6 
Kafka brokers.
4. All partitions have a preferred leader, and the "Auto Leader Rebalance Enable" 
configuration is enabled.
Issue:
We had a Kafka broker-3 failure because of hardware issues, and the partitions 
having broker-3 as leader were disrupted.
As per the Kafka official documentation, partitions should elect a new leader once 
the preferred leader fails.

[2020-06-01 14:02:25,029] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition object-xxx-xxx-xx-na4-93 
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync 
replicas for partition object-xxx-xxx-xx-na4-93 is [1], below required minimum 
[2]

The above error message was found in the Kafka logs.
The topic referenced in " object-xxx-xxx-xx-na4-93 " has 128 partitions, and 
partition 93 has 3 replicas, distributed among broker-3, broker-2, and broker-4.
Broker-3 is the preferred leader.
When broker-3 failed, leadership should have moved to one of broker-2 or 
broker-4, but it didn't happen.
As per the error message, whenever the leader fails it throws an error stating 
that only one in-sync replica is available.

Please help us find the root cause of the new leader not being elected.


Thanks,
Sudheer



Re: Kafka partitions replication issue

2020-06-17 Thread Ricardo Ferreira

Karnam,

I think the combination of the preferred leader setting and auto leader 
rebalance enable, along with the hardware issue on broker-3, might be 
giving you the opposite effect of what you are expecting. If broker-3 
happens to be the preferred leader for a given partition (because it 
happens to be the broker that hosted the original leader when the 
partition was created), then Kafka will keep trying to pin leadership for 
that partition to that broker -- but as you say the broker is 
having hardware failures, and thus this attempt will fail.


Here are things that you can try:

- Move the preferred leader to another broker using the 
`bin/kafka-preferred-replica-election` tool.

- Decrease `min.insync.replicas` from 2 to 1 to allow producers and 
replication to keep going.

- Enable unclean leader election, which allows non-ISR replicas to become 
leaders (but opens margin for data loss). A sketch of setting these last two 
per topic with the AdminClient follows this list.

- Solve the hardware issue in broker-3 =)
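
For reference, here is a minimal sketch, assuming the Java AdminClient (2.3+) 
and placeholder bootstrap/topic names, of how those two topic-level settings 
could be changed; treat it as an illustration rather than a definitive recipe:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RelaxTopicDurability {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder topic name -- substitute the affected topic.
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "object-xxx-xxx-xx-na4");

            Collection<AlterConfigOp> ops = List.of(
                // Let producers with acks=all keep writing with a single in-sync replica.
                new AlterConfigOp(new ConfigEntry("min.insync.replicas", "1"),
                                  AlterConfigOp.OpType.SET),
                // Allow a non-ISR replica to take over leadership (possible data loss).
                new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "true"),
                                  AlterConfigOp.OpType.SET));

            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```

Both settings trade durability for availability, so it is worth reverting them 
once broker-3 is healthy again.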

Nevertheless, it is never a good idea to keep preferred leader election 
enabled if the cluster health is not constantly monitored and you are 
not willing to keep moving leaders across the cluster from time to time. 
Keeping the cluster well balanced increases the Ops workload. 
This is the reason why Confluent created the feature called Auto Data 
Balancing, which keeps partition leaders automatically and constantly 
spread over the cluster for you.


Thanks,

-- Ricardo




NPE in kafka-streams with Avro input

2020-06-17 Thread Dumitru-Nicolae Marasoui
Hello kafka community,
When our kafka-streams application starts with input topic values in Avro
format, we get the NPE below. The input is a record, and one of its fields is
an array of other records. Reading the stack trace below, my understanding
is that at some point while deserializing a value structure it encounters an
unexpected null value, hence the NPE. Do you have any hints as to what
the problem may be? In this kafka-streams ETL job we emit multiple messages
from a single input message (flatMapping the array field to the output).
Thank you
Exception in thread “global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1" java.lang.NullPointerException
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
at com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.Processor

Re: NPE in kafka-streams with Avro input

2020-06-17 Thread Ricardo Ferreira

Hi Dumitru,

According to the stack trace that you've shared, the NPE is being thrown 
by the *Avro4S* framework that you're using. This is important for 
isolating the problem, because it means the problem is not in Kafka Streams 
but rather in your serialization framework.


Nevertheless, the Avro specification allows fields to be null if you 
explicitly specify this in the Avro schema. For instance:


```

{
  "type": "record",
  "name": "MyRecord",
  "fields" : [
    {"name": "userId", "type": "long"},  // mandatory field
    {"name": "userName", "type": ["null", "string"]} // optional field
  ]
}

```

The field *userName* above can have null values and be treated as 
optional. You may want to check whether you can make this change in the Avro 
schema or, if it is already in place, whether the serialization framework 
you're using has any problems handling situations like this.


Thanks,

-- Ricardo


Re: Kafka partitions replication issue

2020-06-17 Thread Peter Bukowinski



Hi Sudheer,

What do you have `replica.lag.time.max.ms` set to for your cluster? Also, are 
your producers using `acks=-1` or `acks=all`? If the replica lag time is too 
short or you are using `acks=1`, then it's likely that when broker 3 failed, 
both followers for the partition you mention had not yet caught up with the 
leader, so the cluster was unable to meet the min.insync.replicas count of 2.
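
On the producer side, here is a minimal sketch, assuming the plain Java 
producer with placeholder broker and topic names, of the `acks=all` setup 
being discussed; with `acks=1` only the leader acknowledges and followers can 
fall behind unnoticed:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for all in-sync replicas to acknowledge; the broker rejects the write
        // with NotEnoughReplicasException when fewer than min.insync.replicas are in sync.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Placeholder topic name.
            producer.send(new ProducerRecord<>("object-xxx-xxx-xx-na4", "some-key", "some-value"));
        }
    }
}
```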

You have a few choices. If you value topic availability over complete data 
integrity, then you can set `min.insync.replicas=1`, or set 
`unclean.leader.election.enable=true`. The former will keep a partition online 
with only one in-sync replica. The latter will allow a replica that hadn't 
fully caught up to the leader to become the leader.

I have both of these set in my environment since I have the luxury of not 
dealing with transactional data and “best effort” delivery is sufficient for my 
needs. In practice, the amount of loss we see is an extremely small fraction of 
the total data pushed through kafka and only occurs around broker failures.

—
Peter



kafka log compaction

2020-06-17 Thread Pushkar Deole
Hi All

I want some of my topics to retain data forever without any deletion, since
those topics hold static data that is always required by the application. Also,
for these topics I want to retain the latest value for each key.
I believe the cleanup policy of 'compact' would meet my needs (a topic-creation
sketch follows the questions). I have the following questions though:
1. Would setting the cleanup policy to compact (and no delete) always
retain the latest value for a key?
2. Do parameters like segment.bytes and retention.ms also play any role in
compaction?
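
To make the configuration concrete, here is a minimal sketch of creating such 
a topic with the Java AdminClient; the topic name, broker address, 
partition/replica counts, and segment size are all hypothetical placeholders, 
not recommendations:

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("static-reference-data", 6, (short) 3) // placeholders
                .configs(Map.of(
                    // Compact only: keep the latest value per key, no time-based deletion.
                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                    // Compaction only runs on rolled (non-active) segments, so smaller
                    // segments make old values eligible for cleanup sooner.
                    TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(64 * 1024 * 1024)));

            admin.createTopics(Set.of(topic)).all().get();
        }
    }
}
```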


Uneven distribution of messages in topic's partitions

2020-06-17 Thread Hemant Bairwa
Hello All

I have a single producer service which is queuing messages into a topic with,
let's say, 12 partitions. I want to distribute the messages evenly across all
the partitions in a round-robin fashion.
Even after using the default partitioner and keeping the key NULL, the messages
are not getting distributed evenly. Rather, some partitions are getting none
of the messages while others are getting multiple.
One reason I found for this behaviour, somewhere, is that if there are
fewer producers than partitions, the producer distributes the messages to
fewer partitions to limit the number of open sockets.
However, I have achieved even distribution in code by first getting the
total number of partitions and then passing the partition number, in
incremental order, along with the message into the producer record. Once the
partition number reaches the last partition, the next partition number is
reset to zero.
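
For reference, here is a rough sketch of that manual round-robin approach, 
assuming a plain Java producer and hypothetical broker/topic names; the 
explicit partition index simply cycles modulo the current partition count:

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManualRoundRobinSender {
    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final AtomicLong counter = new AtomicLong();

    public ManualRoundRobinSender(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void send(String value) {
        // Cycle through partitions 0..n-1 by setting the partition explicitly (key stays null).
        int partitionCount = producer.partitionsFor(topic).size();
        int partition = (int) (counter.getAndIncrement() % partitionCount);
        producer.send(new ProducerRecord<>(topic, partition, null, value));
    }

    public void close() {
        producer.close();
    }

    public static void main(String[] args) {
        ManualRoundRobinSender sender =
            new ManualRoundRobinSender("broker-1:9092", "twelve-partition-topic"); // placeholders
        for (int i = 0; i < 24; i++) {
            sender.send("message-" + i);
        }
        sender.close();
    }
}
```

Note that `partitionsFor` reads the producer's cached metadata after the first 
call, so the per-send lookup is cheap; the trade-off of pinning partitions by 
hand is that you take over responsibility for balancing from the producer's 
built-in partitioner.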

Query:
1. Are there any downsides to the above approach?
2. If yes, how can even distribution of messages be achieved in an optimized way?