Hi!

I've been testing Kafka and I've hit some problems I can't really make sense of, 
so I'd like to ask for your help.

The situation: we want to decide whether to go for many topics with a couple of 
partitions each, or the other way around, so I've been trying to benchmark both 
cases. During tests, when I overload the cluster, the number of under-replicated 
partitions spikes up. I'd expect it to go back down to 0 after the load lessens, 
but that's not always the case - either the lagging broker never catches up, or 
it takes significantly longer than the other brokers. Currently I run the 
benchmarks against a 3-node cluster, and sometimes one of the brokers doesn't 
seem to be able to catch up with replication. There are 3 cases I've experienced:

1. I see the following in the logs. It doesn't seem to be correlated with any 
problems with the network infrastructure, and once it appears it keeps recurring:
[2016-07-27 20:34:09,237] WARN [ReplicaFetcherThread-0-1511], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@25e2a1ac (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1511 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

2. During another test, instead of the above message, I sometimes see this:
[2016-07-26 15:26:30,334] INFO Partition [1806,0] on broker 1511: Expanding ISR for partition [1806,0] from 1511 to 1511,1509 (kafka.cluster.Partition)
[2016-07-26 15:26:30,344] INFO Partition [1806,0] on broker 1511: Cached zkVersion [1] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
At the same time, the broker can't catch up with replication.
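
For completeness, this is roughly how I read the under-replicated-partition 
count off a broker during the runs (a minimal sketch: the host and JMX port are 
placeholders, and the broker has to be started with JMX enabled, e.g. via 
JMX_PORT):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UrpCheck {
    public static void main(String[] args) throws Exception {
        // "broker-host:9999" is a placeholder - use whatever JMX_PORT the broker was started with
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbean = connector.getMBeanServerConnection();
            // Broker-side gauge: partitions led by this broker whose ISR is
            // smaller than the replica set
            ObjectName urp = new ObjectName(
                    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            System.out.println("UnderReplicatedPartitions = " + mbean.getAttribute(urp, "Value"));
        }
    }
}

(kafka-topics.sh --describe --under-replicated-partitions gives the per-partition 
view of the same thing.)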

I'm using version 0.10.0.0 on SCL6, running on 3 blades (32 cores / 64 GB RAM / 
8x 7200 RPM spindles each). I don't know if it's relevant, but I basically test 
two scenarios: 1 topic with 4k partitions, and 4k topics with 1 partition each 
(in the second scenario I just set auto.create.topics.enable=true and create the 
topics during warm-up by simply sending messages to them). For some reason the 
second scenario seems to be orders of magnitude slower - when I started looking 
at the JMX metrics of the producer, they revealed a huge difference in the 
average number of records per request. With 1 topic it oscillated around 100 
records/request (5KB records); in the 4k-topics scenario it was just 1 
record/request. Can you think of any explanation for that?
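
In case it helps narrow this down, here is a minimal sketch of the producer 
settings I understand control how many records end up in a single request. The 
values shown are just the documented defaults, not necessarily what my benchmark 
sets (the actual code is in the repo linked below):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class BatchingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // The producer batches per topic-partition: records are only grouped
        // together if they target the same partition, up to batch.size bytes,
        // and the sender waits at most linger.ms for more records before
        // shipping a batch.
        props.put("batch.size", "16384");       // default: 16 KB per partition batch
        props.put("linger.ms", "0");            // default: send as soon as possible
        props.put("buffer.memory", "33554432"); // default: 32 MB total buffering

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic-0", new byte[5 * 1024])); // ~5 KB record
        }
    }
}

My guess is that with 4k single-partition topics the same overall throughput is 
spread across 4k separate partition batches, so each batch fills far more slowly 
than in the 1-topic case, but I'd appreciate confirmation.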

Code I use for testing:
https://github.com/BlueEyedHush/kafka_perf/tree/itrac

Thank you,
Krzysztof Nawara
