My code is very straightforward. I create a producer, and then call it to send messages. Here is the factory method::

    public Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "1638");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("compression.type", "gzip");
        props.put("client.id", "sds-merdevl");
        return new KafkaProducer<>(props);
    }

That is injected into a single instance of KafkaNotifier::

    public class KafkaNotifier implements MessageNotifier {
        private static Logger logger = LoggerFactory.getLogger(KafkaNotifier.class);

        private Producer<String, String> producer;
        private String topicName;

        @Override
        public void sendNotificationMessage(DataObjectModificationMessage message) {
            String json = message.asJSON();
            String key = message.getObject().getId().toString();

            producer.send(new ProducerRecord<>(topicName, key, json));

            if (logger.isDebugEnabled()) {
                logger.debug("Sent Kafka message for object with id={}", key);
            }
        }

        @Required
        public void setProducer(Producer<String, String> producer) {
            this.producer = producer;
        }

        @Required
        public void setTopicName(String topicName) {
            this.topicName = topicName;
        }
    }

Here is the topic config info:

    ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
    Topic:s.notifications.dev  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=172800000,cleanup.policy=compact
        Topic: s.notifications.dev  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>
> 2 producers to the same topic should not be a problem. There can be multiple
> producers and consumers of the same kafka topic.
>
> I am not sure what can be wrong here.
> I can try this at my end if you can share
> producer code and any config of topic or broker that you changed and is not
> default.
>
> Please also check that you are not creating a producer every time you send a
> message but reusing the producer once created to send multiple messages. Before
> a send and after creation, a producer sends this request to fetch metadata.
> I ran a test to publish messages from 2 producers to a topic with 3
> partitions on a 3 broker, 1 zookeeper kafka setup. I ran the test for more than
> a minute and saw it just once for both producers before their 1st send.
>
> Regards,
> Vinay Sharma
> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com>
> wrote:
>
>> Hi. I've not set that value. My producer properties are as follows:
>>
>> acks=all
>> retries=0
>> batch.size=1638
>> linger.ms=1
>> buffer.memory=33554432
>> compression.type=gzip
>> client.id=sds-merdevl
>>
>> I have this running on two hosts with the same config. I thought that
>> having the same client.id on each would just consolidate the tracking
>> (same logical name). You don't think there is an issue with 2 producers to
>> the same topic?
>>
>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>>>
>>> Generally a proactive metadata refresh request is sent by producer and
>>> consumer every 5 minutes, but this interval can be overridden with the
>>> property "metadata.max.age.ms", which has default value 300000, i.e. 5
>>> minutes. Check if you have set this property very low in your producer.
>>>
>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
>>> vincent_f...@cable.comcast.com> wrote:
>>>
>>>> I'm testing a kafka install and using the java client. I have a topic set
>>>> up and it appears to work great, but after a while I noticed my log
>>>> starting to fill up with what appears to be some kind of loop for metadata
>>>> updates.
>>>>
>>>> example::
>>>>
>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out env="md"
>>>> [kafka-producer-network-thread | sds-merdevl]
>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to
>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
>>>> = [0,], isr = [0,]])
>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out env="md"
>>>> [kafka-producer-network-thread | sds-merdevl]
>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to
>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
>>>> = [0,], isr = [0,]])
>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out env="md"
>>>> [kafka-producer-network-thread | sds-merdevl]
>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to
>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
>>>> = [0,], isr = [0,]])
>>>>
>>>> etc.
>>>>
>>>> It hasn't stopped.
>>>>
>>>> I'm curious about what's going on here. Can anyone help?
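
A rough way to quantify the loop in the quoted log is to compare the gaps
between consecutive "Updated cluster metadata" timestamps with the
metadata.max.age.ms default of 300000 ms mentioned earlier in the thread. The
sketch below is only an illustration: the class name MetadataRefreshIntervals
and its helper methods are hypothetical and not part of the Kafka client. It
uses only the JDK and the three timestamps from the log excerpt.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for this thread; not part of the Kafka client API.
class MetadataRefreshIntervals {
    // Default of metadata.max.age.ms as quoted in the thread (5 minutes).
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300000L;

    // Timestamp layout of the log lines in the excerpt, e.g. "2016-04-22 15:43:55,139".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns the gap in milliseconds between each pair of consecutive timestamps.
    static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TIME);
            LocalDateTime curr = LocalDateTime.parse(timestamps.get(i), LOG_TIME);
            gaps.add(Duration.between(prev, curr).toMillis());
        }
        return gaps;
    }

    // True when there is at least one gap and every gap is shorter than maxAgeMs,
    // i.e. the updates arrive faster than the periodic refresh alone would explain.
    static boolean allBelowMaxAge(List<Long> gaps, long maxAgeMs) {
        if (gaps.isEmpty()) {
            return false;
        }
        for (long gap : gaps) {
            if (gap >= maxAgeMs) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Timestamps copied from the three log entries quoted above.
        List<String> timestamps = Arrays.asList(
                "2016-04-22 15:43:55,139",
                "2016-04-22 15:43:55,240",
                "2016-04-22 15:43:55,341");
        List<Long> gaps = gapsMillis(timestamps);
        System.out.println("gaps (ms): " + gaps);
        System.out.println("all below metadata.max.age.ms default: "
                + allBelowMaxAge(gaps, DEFAULT_METADATA_MAX_AGE_MS));
    }
}
```

With the three timestamps from the excerpt, both gaps come out at 101 ms, far
below the five-minute default, which is consistent with the description of a
refresh loop rather than the periodic refresh.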