Hello. I found the issue. The Ops team deployed Kafka 0.8.1, but all my code was written against 0.9.0. A simple mistake, and one that I should have thought of sooner. Once I had them bump up to the latest Kafka, all was well. Thank you for your help!
v

> On Apr 25, 2016, at 2:54 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>
> Hi,
>
> Your code looks good. I don't see any reason there for frequent metadata
> fetches, but I will run it to verify.
>
> I was able to replicate this issue with my consumer today: I killed 2
> brokers out of 3 and ran the consumer. I saw a lot of metadata requests
> while it waited for a new leader for the partitions that those 2 brokers
> had led. I see the 2 lines below frequently in the logs, as you mentioned:
>
> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
> metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
> partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
> replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =
> 2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
> partition = 0, leader = none, replicas = [], isr = []])
>
> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
> partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
> metadata refresh
>
> I also saw this behavior with the error below, where I may have killed the
> only ISR available for the topic at that time.
>
> "Error while fetching metadata with correlation id 242 :
> {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
>
> Do you see any such error in your logs?
>
> Regards,
> Vinay Sharma
>
>
> On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <vincent_f...@cable.comcast.com> wrote:
>
>> My code is very straightforward. I create a producer, and then call it to
>> send messages.
>> Here is the factory method::
>>
>>     public Producer<String, String> createProducer() {
>>
>>         Properties props = new Properties();
>>
>>         props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
>>         props.put("key.serializer",
>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>         props.put("value.serializer",
>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>         props.put("acks", "all");
>>         props.put("retries", "0");
>>         props.put("batch.size", "1638");
>>         props.put("linger.ms", "1");
>>         props.put("buffer.memory", "33554432");
>>         props.put("compression.type", "gzip");
>>         props.put("client.id", "sds-merdevl");
>>
>>         return new KafkaProducer<>(props);
>>     }
>>
>> That is injected into a single instance of KafkaNotifier::
>>
>>     public class KafkaNotifier implements MessageNotifier {
>>         private static Logger logger =
>>                 LoggerFactory.getLogger(KafkaNotifier.class);
>>
>>         private Producer<String, String> producer;
>>         private String topicName;
>>
>>         @Override
>>         public void sendNotificationMessage(DataObjectModificationMessage message) {
>>
>>             String json = message.asJSON();
>>             String key = message.getObject().getId().toString();
>>
>>             producer.send(new ProducerRecord<>(topicName, key, json));
>>
>>             if (logger.isDebugEnabled()) {
>>                 logger.debug("Sent Kafka message for object with id={}", key);
>>             }
>>         }
>>
>>         @Required
>>         public void setProducer(Producer<String, String> producer) {
>>             this.producer = producer;
>>         }
>>
>>         @Required
>>         public void setTopicName(String topicName) {
>>             this.topicName = topicName;
>>         }
>>     }
>>
>> Here is the topic config info:
>>
>>     ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
>>     Topic:s.notifications.dev  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=172800000,cleanup.policy=compact
>>         Topic: s.notifications.dev  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
>>
>>
>>> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com>
>>> wrote:
>>>
>>> 2 producers to the same topic should not be a problem. There can be
>>> multiple producers and consumers of the same Kafka topic.
>>>
>>> I am not sure what can be wrong here. I can run this at my end if you can
>>> share the producer code and any topic or broker config that you changed
>>> from the default.
>>>
>>> Please also check that you are not creating a producer every time you send
>>> a message, but reusing the producer once created to send multiple messages.
>>> After creation and before its first send, a producer sends this request to
>>> fetch metadata.
>>> I ran a test to publish messages from 2 producers to a topic with 3
>>> partitions on a 3 broker, 1 zookeeper Kafka setup. I ran the test for more
>>> than a minute and saw the request just once for each producer, before its
>>> 1st send.
>>>
>>> Regards,
>>> Vinay Sharma
>>> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com> wrote:
>>>
>>>> Hi. I've not set that value. My producer properties are as follows:
>>>>
>>>> acks=all
>>>> retries=0
>>>> batch.size=1638
>>>> linger.ms=1
>>>> buffer.memory=33554432
>>>> compression.type=gzip
>>>> client.id=sds-merdevl
>>>>
>>>> I have this running on two hosts with the same config. I thought that
>>>> having the same client.id on each would just consolidate the tracking
>>>> (same logical name). You don't think there is an issue with 2 producers
>>>> to the same topic?
>>>>
>>>>
>>>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>>>>>
>>>>> Generally a proactive metadata refresh request is sent by the producer
>>>>> and consumer every 5 minutes, but this interval can be overridden with
>>>>> the property "metadata.max.age.ms", which has a default value of 300000,
>>>>> i.e. 5 minutes. Check if you have set this property very low in your
>>>>> producer.
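
A minimal sketch of the check suggested above, assuming the producer settings live in a java.util.Properties object as in the factory method quoted earlier in this thread. The class and method names (MetadataAgeCheck, effectiveMetadataMaxAgeMs) are hypothetical; only the property name and its 300000 ms default are taken from the message above. It does not touch the Kafka client at all, it only reports which refresh interval a given set of properties would ask for.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
class MetadataAgeCheck {

    // Default quoted in the thread: 300000 ms, i.e. 5 minutes.
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    // Returns the configured metadata.max.age.ms, or the default when unset.
    static long effectiveMetadataMaxAgeMs(Properties props) {
        String raw = props.getProperty("metadata.max.age.ms");
        return raw == null ? DEFAULT_METADATA_MAX_AGE_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        // A few of the producer properties listed in the thread;
        // metadata.max.age.ms is deliberately left unset.
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("linger.ms", "1");
        props.put("client.id", "sds-merdevl");

        System.out.println("metadata.max.age.ms in effect: "
                + effectiveMetadataMaxAgeMs(props) + " ms");
    }
}
```

With the properties listed in the thread the property is unset, so the default applies and the configuration alone would not explain a refresh every few hundred milliseconds.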
>>>>>
>>>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <vincent_f...@cable.comcast.com> wrote:
>>>>>
>>>>>> I'm testing a kafka install and using the java client. I have a topic
>>>>>> set up and it appears to work great, but after a while I noticed my log
>>>>>> starting to fill up with what appears to be some kind of loop for
>>>>>> metadata updates.
>>>>>>
>>>>>> example::
>>>>>>
>>>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out env="md"
>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to
>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
>>>>>> = [0,], isr = [0,]])
>>>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out env="md"
>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to
>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
>>>>>> = [0,], isr = [0,]])
>>>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out env="md"
>>>>>> [kafka-producer-network-thread | sds-merdevl]
>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to
>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
>>>>>> = [0,], isr = [0,]])
>>>>>>
>>>>>> etc.
>>>>>>
>>>>>> It hasn't stopped..
>>>>>>
>>>>>> I'm curious about what's going on here. Can anyone help?
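
To put a number on the loop in the log excerpt above, here is a small sketch that measures the gap between consecutive "Updated cluster metadata" entries. The class and method names (MetadataLogGaps, gapsMillis) are hypothetical, and the timestamp pattern is an assumption read off the three sample lines rather than a documented log format.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for eyeballing how often metadata updates are logged.
class MetadataLogGaps {

    // Assumed layout of the leading timestamp, e.g. "2016-04-22 15:43:55,139".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Returns the gap in milliseconds between each pair of consecutive timestamps.
    static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TS);
            LocalDateTime curr = LocalDateTime.parse(timestamps.get(i), LOG_TS);
            gaps.add(Duration.between(prev, curr).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // The three timestamps from the log excerpt in the original message.
        List<String> stamps = Arrays.asList(
                "2016-04-22 15:43:55,139",
                "2016-04-22 15:43:55,240",
                "2016-04-22 15:43:55,341");
        System.out.println(gapsMillis(stamps)); // prints [101, 101]
    }
}
```

The sample entries are about 101 ms apart, far below the 300000 ms default refresh interval mentioned in the replies, which is why the log fills up so quickly.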