It looks like org.apache.kafka.clients.producer.KafkaProducer is not available in the 0.8.1.1 client jar, so we'll stay with the kafka.javaapi.producer.Producer implementation.
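For anyone else staying on the old producer, here is a minimal sketch of the settings involved, assuming the 0.8.x property names (metadata.broker.list, serializer.class, topic.metadata.refresh.interval.ms). The class name, the broker host names, and the 60000 ms value are hypothetical placeholders, not a recommendation; as noted further down the thread, a lower refresh interval may add request overhead on the brokers.

```java
import java.util.Properties;

// Sketch only: builds the settings for the old (0.8.x) producer discussed in this thread.
// The class name and the values used in main() are illustrative assumptions.
class OldProducerSettings {

    // brokerList is a placeholder such as "broker1:9092,broker2:9092".
    static Properties build(String brokerList, long metadataRefreshMs) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // The default is 600000 (10 minutes). A smaller value makes the producer
        // refresh topic metadata more often, at the cost of extra broker requests.
        props.put("topic.metadata.refresh.interval.ms", Long.toString(metadataRefreshMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9092,broker2:9092", 60000L);
        // With the Kafka 0.8.1.1 jar on the classpath, these would be handed on as:
        //   new kafka.javaapi.producer.Producer<String, String>(
        //       new kafka.producer.ProducerConfig(props));
        props.list(System.out);
    }
}
```

The sketch deliberately stops at java.util.Properties so it runs without the Kafka jar; the commented lines show where the old producer would pick the settings up.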
Thanks,
Connie

On Fri, Jul 18, 2014 at 5:13 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> One option is to reduce the value of topic.metadata.refresh.interval.ms, but
> the concern is that it may end up sending too many requests to the brokers,
> causing overhead. I'd suggest you use the new producer under
> org.apache.kafka.clients.producer.KafkaProducer that does not have the
> problem. It is fairly new but has gone through some level of testing now
> and we will appreciate any feedback/bugs that you can report back.
>
> Thanks,
> Neha
>
>
> On Fri, Jul 18, 2014 at 4:23 PM, Connie Yang <cybercon...@gmail.com> wrote:
>
> > Sure, I will try to take a snapshot of the data distribution when it
> > happens next time.
> >
> > Assuming the topic.metadata.refresh.interval.ms is the concern, how should
> > we "unstick" our producers?
> >
> > The important note from the documentation seems to suggest that the
> > metadata refresh will only happen AFTER the message is sent.
> >
> > The producer generally refreshes the topic metadata from brokers when there
> > is a failure (partition missing, leader not available...). It will also
> > poll regularly (default: every 10min so 600000ms). If you set this to a
> > negative value, metadata will only get refreshed on failure. If you set
> > this to zero, the metadata will get refreshed after each message sent (not
> > recommended). Important note: the refresh happen only AFTER the message is
> > sent, so if the producer never sends a message the metadata is never
> > refreshed
> >
> > Thanks,
> > Connie
> >
> >
> > On Fri, Jul 18, 2014 at 3:58 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> > > Does this mean that we should set "auto.leader.rebalance.enable" to true?
> > >
> > > I wouldn't recommend that just yet since it is not known to be very
> > > stable.
> > > You mentioned that only 2 brokers ever took the traffic and the
> > > replication factor is 2, which makes me think that the producer stuck to
> > > 1 or a few partitions instead of distributing the data over all the
> > > partitions. This is a known problem in the old producer where the default
> > > value of a config (topic.metadata.refresh.interval.ms), that controls how
> > > long a producer sticks to certain partitions, is 10 mins. So it
> > > effectively does not distribute data evenly across all partitions.
> > >
> > > If you see the same behavior next time, try to take a snapshot of data
> > > distribution across all partitions to verify this theory.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Jul 17, 2014 at 5:43 PM, Connie Yang <cybercon...@gmail.com>
> > > wrote:
> > >
> > > > It might appear that the data is not balanced, but it could be a
> > > > result of the imbalanced leaders setting.
> > > >
> > > > Does this mean that we should set "auto.leader.rebalance.enable" to true?
> > > > Any other configuration we need to change as well? As I mentioned before,
> > > > we pretty much use the default setting.
> > > >
> > > > All of our topics have a replication factor of 2 (aka 2 copies per
> > > > message).
> > > >
> > > > We don't have the topic output from when we had the problem, but here's our
> > > > topic output after we ran the kafka-preferred-replica-election.sh tool as
> > > > suggested:
> > > >
> > > > $KAFKA_HOME/bin/kafka-topics.sh --zookeeper zkHost1:2181,zkHost2:2181,zkHost3:2181 --describe --topic=myKafkaTopic
> > > > Topic:myKafkaTopic PartitionCount:24 ReplicationFactor:2 Configs: retention.ms=43200000
> > > > Topic: myKafkTopic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
> > > > Topic: myKafkTopic Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
> > > > Topic: myKafkTopic Partition: 2 Leader: 4 Replicas: 4,3 Isr: 3,4
> > > > Topic: myKafkTopic Partition: 3 Leader: 5 Replicas: 5,4 Isr: 5,4
> > > > Topic: myKafkTopic Partition: 4 Leader: 6 Replicas: 6,5 Isr: 5,6
> > > > Topic: myKafkTopic Partition: 5 Leader: 7 Replicas: 7,6 Isr: 6,7
> > > > Topic: myKafkTopic Partition: 6 Leader: 8 Replicas: 8,7 Isr: 7,8
> > > > Topic: myKafkTopic Partition: 7 Leader: 9 Replicas: 9,8 Isr: 9,8
> > > > Topic: myKafkTopic Partition: 8 Leader: 10 Replicas: 10,9 Isr: 10,9
> > > > Topic: myKafkTopic Partition: 9 Leader: 11 Replicas: 11,10 Isr: 11,10
> > > > Topic: myKafkTopic Partition: 10 Leader: 12 Replicas: 12,11 Isr: 11,12
> > > > Topic: myKafkTopic Partition: 11 Leader: 13 Replicas: 13,12 Isr: 12,13
> > > > Topic: myKafkTopic Partition: 12 Leader: 14 Replicas: 14,13 Isr: 14,13
> > > > Topic: myKafkTopic Partition: 13 Leader: 15 Replicas: 15,14 Isr: 14,15
> > > > Topic: myKafkTopic Partition: 14 Leader: 16 Replicas: 16,15 Isr: 16,15
> > > > Topic: myKafkTopic Partition: 15 Leader: 17 Replicas: 17,16 Isr: 16,17
> > > > Topic: myKafkTopic Partition: 16 Leader: 18 Replicas: 18,17 Isr: 18,17
> > > > Topic: myKafkTopic Partition: 17 Leader: 19 Replicas: 19,18 Isr: 18,19
> > > > Topic: myKafkTopic Partition: 18 Leader: 20 Replicas: 20,19 Isr: 20,19
> > > > Topic: myKafkTopic Partition: 19 Leader: 21 Replicas: 21,20 Isr: 20,21
> > > > Topic: myKafkTopic Partition: 20 Leader: 22 Replicas: 22,21 Isr: 22,21
> > > > Topic: myKafkTopic Partition: 21 Leader: 23 Replicas: 23,22 Isr: 23,22
> > > > Topic: myKafkTopic Partition: 22 Leader: 24 Replicas: 24,23 Isr: 23,24
> > > > Topic: myKafkTopic Partition: 23 Leader: 1 Replicas: 1,24 Isr: 1,24
> > > >
> > > > Thanks,
> > > > Connie
> > > >
> > > >
> > > > On Thu, Jul 17, 2014 at 4:20 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > > > wrote:
> > > >
> > > > > Connie,
> > > > >
> > > > > After we freed up the
> > > > > cluster disk space and adjusted the broker data retention policy, we
> > > > > noticed that the cluster partition was not balanced based on the topic
> > > > > describe script that came with the Kafka 0.8.1.1 distribution.
> > > > >
> > > > > When you say the cluster was not balanced, did you mean the leaders or
> > > > > the data? The describe topic tool does not give information about data
> > > > > sizes, so I'm assuming you are referring to leader imbalance. If so, the
> > > > > right tool to run is kafka-preferred-replica-election.sh, not partition
> > > > > reassignment. In general, assuming the partitions were evenly distributed
> > > > > on your cluster before you ran out of disk space, the only thing you
> > > > > should need to do to recover is delete a few older segments and bounce
> > > > > each broker, one at a time. It is also preferable to run preferred
> > > > > replica election after a complete cluster bounce so the leaders are well
> > > > > distributed.
> > > > >
> > > > > Also, it will help if you can send around the output of the describe
> > > > > topic tool. I wonder if your topics have a replication factor of 1
> > > > > inadvertently?
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > >
> > > > > On Thu, Jul 17, 2014 at 11:57 AM, Connie Yang <cybercon...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Our Kafka cluster ran out of disk space yesterday. After we freed up
> > > > > > the cluster disk space and adjusted the broker data retention policy,
> > > > > > we noticed that the cluster partition was not balanced based on the
> > > > > > topic describe script that came with the Kafka 0.8.1.1 distribution.
> > > > > > So, we tried to rebalance the partitions using
> > > > > > kafka-reassign-partitions.sh. Some time later, we ran out of disk
> > > > > > space on 2 brokers in the cluster while the rest had plenty of disk
> > > > > > space left.
> > > > > >
> > > > > > This seems to suggest that only two brokers were receiving messages.
> > > > > > We have not changed the broker partition from our producer, which uses
> > > > > > a random partition key strategy.
> > > > > >
> > > > > > String uuid = UUID.randomUUID().toString();
> > > > > > KeyedMessage<String, String> data = new KeyedMessage<String, String>(
> > > > > >     "myKafkaTopic",
> > > > > >     uuid, msgBuilder.toString());
> > > > > >
> > > > > >
> > > > > > Questions
> > > > > > 1. Is partition reassignment required after disk full or when some of
> > > > > > the brokers are not healthy?
> > > > > > 2. Is there a broker config that we can use to auto rebalance the
> > > > > > broker partition? Should "auto.leader.rebalance.enable" be set to true?
> > > > > > 3. How do we recover from a situation like this?
> > > > > >
> > > > > > We pretty much use the default configuration on the broker.
> > > > > >
> > > > > > Thanks,
> > > > > > Connie
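
The leader-balance question raised in this thread can be checked mechanically rather than by eye. Below is a minimal sketch, assuming the describe output has the "Leader: <brokerId>" field shown in the listing above; the LeaderCount class and its method name are hypothetical, and the third sample line is made up to show an uneven case. It only counts leaders per broker and says nothing about data sizes, which the describe tool does not report.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: tallies partition leaders per broker from pasted
// `kafka-topics.sh --describe` output. Names here are illustrative.
class LeaderCount {

    // Matches the "Leader: <brokerId>" field on each partition line.
    private static final Pattern LEADER = Pattern.compile("Leader:\\s*(-?\\d+)");

    static Map<Integer, Integer> leadersPerBroker(String describeOutput) {
        Map<Integer, Integer> counts = new TreeMap<>();
        Matcher m = LEADER.matcher(describeOutput);
        while (m.find()) {
            // Increment the tally for this broker id.
            counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // First two lines are taken from the listing above; the third is invented.
        String sample =
              "Topic: myKafkTopic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2\n"
            + "Topic: myKafkTopic Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2\n"
            + "Topic: myKafkTopic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 3,2\n";
        System.out.println(leadersPerBroker(sample));
    }
}
```

For the 24-partition listing in this thread, every broker id from 1 to 24 leads exactly one partition, so an even spread would show a count of 1 per broker.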