Hi, Your code looks good. i don't see any reason there for frequent meta data fetch but i will run it to verify.
I was able to replicate this issue with my consumer today where I killed 2 brokers out or 3 and ran consumer. I saw a lot of meta data requests in wait for new leader for partitions for which those 2 brokers were leader. i see below 2 lines frequently as you mentioned in logs [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)], partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic, partition = 0, leader = none, replicas = [], isr = []]) [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for partition kafkaPOCTopic-0 unavailable for fetching offset, wait for metadata refresh I also saw this behavior due to below error where i might have killed the only ISR available for the topic at that time. "Error while fetching metadata with correlation id 242 : {kafkaPOCTopic=LEADER_NOT_AVAILABLE}" Do you see any such error in your logs? Regards, Vinay Sharma On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent < vincent_f...@cable.comcast.com> wrote: > > > My code is very straightforward. I create a producer, and then call it to > send messages. Here is the factory method:: > > public Producer<String, String> createProducer() { > > Properties props = new Properties(); > > props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092"); > props.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > props.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > props.put("acks", "all"); > props.put("retries", "0"); > props.put("batch.size", "1638"); > props.put("linger.ms", "1"); > props.put("buffer.memory", "33554432"); > props.put("compression.type", "gzip"); > props.put("client.id", "sds-merdevl"); > > return new KafkaProducer<>(props); > } > > That is injected into a single instance of KafkaNotifier:: > > public class KafkaNotifier implements MessageNotifier { > private static Logger logger = > LoggerFactory.getLogger(KafkaNotifier.class); > > private Producer<String, String> producer; > private String topicName; > > @Override > public void sendNotificationMessage(DataObjectModificationMessage > message) { > > String json = message.asJSON(); > String key = message.getObject().getId().toString(); > > producer.send(new ProducerRecord<>(topicName, key, json)); > > if (logger.isDebugEnabled()) { > logger.debug("Sent Kafka message for object with id={}", key); > } > } > > @Required > public void setProducer(Producer<String, String> producer) { > this.producer = producer; > } > > @Required > public void setTopicName(String topicName) { > this.topicName = topicName; > } > } > > Here is the topic config info : > > ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe > --topic s.notifications.dev > Topic:s.notifications.dev PartitionCount:1 > ReplicationFactor:1 Configs:retention.ms > =172800000,cleanup.policy=compact > Topic: s.notifications.dev Partition: 0 Leader: 0 > Replicas: 0 Isr: 0 > > > > > > > On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com> > wrote: > > > > 2 producer's to same topic should not be a problem. There can be multiple > > producers and consumers of same kafka topic. > > > > I am not sure what can be wrong here. I can this at my end If you can > share > > producer code and any config of topic ot broker that you changed and is > not > > default. > > > > Please also check that you are not creating producer every time you send > a > > message but reusing producer once created to send multiple messages. > Before > > a send and after creation a producers sends this request to fetch > metadata. > > I ran a test to publish messages from 2 producers to a topic with 3 > > partitions on a 3 broker 1 zookeeper kafka setup. I ran test for more > than > > a minute and saw just once for both producers before their 1st send. > > > > Regards, > > Vinay Sharma > > On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com > > > > wrote: > > > >> Hi. I've not set that value. My producer properties are as follows : > >> > >> acks=all > >> retries=0 > >> bath.size=1638 > >> linger.ms=1 > >> buffer.memory=33554432 > >> compression.type=gzip > >> client.id=sds-merdevl > >> > >> I have this running on two hosts with the same config. I thought that > >> having the same client.id on each would just consolidate the tracking > >> (same logical name). You don't think there is an issue with 2 producers > to > >> the same topic? > >> > >> > >> > >> > >>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com> > >> wrote: > >>> > >>> Generally a proactive metadata refresh request is sent by producer and > >>> consumer every 5 minutes but this interval can be overriden with > >> property " > >>> metadata.max.age.ms" which has default value 300000 i.e 5 minutes. > >> Check if > >>> you have set this property very low in your producer? > >>> > >>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent < > >>> vincent_f...@cable.comcast.com> wrote: > >>> > >>>> I'm testing a kafka install and using the java client. I have a topic > >> set > >>>> up and it appears to work great, but after a while I noticed my log > >>>> starting to fill up with what appears to be some kind of loop for > >> metadata > >>>> updates. > >>>> > >>>> example:: > >>>> > >>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out env="md" > >>>> [kafka-producer-network-thread | sds-merdevl] > >>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version > >> 6196 to > >>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], > partitions > >> = > >>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, > >> replicas > >>>> = [0,], isr = [0,]]) > >>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out env="md" > >>>> [kafka-producer-network-thread | sds-merdevl] > >>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version > >> 6197 to > >>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], > partitions > >> = > >>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, > >> replicas > >>>> = [0,], isr = [0,]]) > >>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out env="md" > >>>> [kafka-producer-network-thread | sds-merdevl] > >>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version > >> 6198 to > >>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], > partitions > >> = > >>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, > >> replicas > >>>> = [0,], isr = [0,]]) > >>>> > >>>> etc. > >>>> > >>>> It hasn't stopped.. > >>>> > >>>> I'm curious about what's going on here. Can anyone help? > >>>> > >>>> > >>>> > >>>> > >>>> > >> > >> > >