My code is very straightforward. I create a producer, and then call it to send
messages. Here is the factory method::
    public Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "1638");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("compression.type", "gzip");
        props.put("client.id", "sds-merdevl");
        return new KafkaProducer<>(props);
    }
That is injected into a single instance of KafkaNotifier::
    public class KafkaNotifier implements MessageNotifier {

        private static Logger logger = LoggerFactory.getLogger(KafkaNotifier.class);

        private Producer<String, String> producer;
        private String topicName;

        @Override
        public void sendNotificationMessage(DataObjectModificationMessage message) {
            String json = message.asJSON();
            String key = message.getObject().getId().toString();
            producer.send(new ProducerRecord<>(topicName, key, json));
            if (logger.isDebugEnabled()) {
                logger.debug("Sent Kafka message for object with id={}", key);
            }
        }

        @Required
        public void setProducer(Producer<String, String> producer) {
            this.producer = producer;
        }

        @Required
        public void setTopicName(String topicName) {
            this.topicName = topicName;
        }
    }
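To double-check the question about metadata.max.age.ms, here is a minimal sketch of how the effective value could be read back from the same kind of Properties object the factory method builds. The class and method names (MetadataAgeCheck, effectiveMetadataMaxAgeMs) are made up for illustration and are not part of the Kafka client; the 300000 ms fallback is the default quoted further down in this thread. It only inspects the properties and does not touch a broker.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
class MetadataAgeCheck {

    // Default quoted in this thread: 300000 ms, i.e. 5 minutes.
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    // Returns the configured metadata.max.age.ms, or the default when it is unset.
    static long effectiveMetadataMaxAgeMs(Properties props) {
        String raw = props.getProperty("metadata.max.age.ms");
        return raw == null ? DEFAULT_METADATA_MAX_AGE_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        // A subset of the producer properties from the factory method above.
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("linger.ms", "1");
        props.put("client.id", "sds-merdevl");

        // metadata.max.age.ms is not set, so the default applies.
        System.out.println("metadata.max.age.ms = " + effectiveMetadataMaxAgeMs(props));
    }
}
```

With the properties I posted, nothing overrides the setting, so a check like this reports the 5 minute default.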
Here is the topic config info::
    ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
    Topic:s.notifications.dev  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=172800000,cleanup.policy=compact
        Topic: s.notifications.dev  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
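
For reference, here is a rough sketch that measures how often the metadata updates in my original log excerpt (quoted at the bottom) are happening. It only parses the three timestamps from that excerpt; the class and method names (MetadataLogIntervals, deltasMillis) are made up for illustration, and the timestamp pattern is an assumption based on how those log lines look.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for eyeballing the gap between consecutive log entries.
class MetadataLogIntervals {

    // Assumed log timestamp layout, e.g. "2016-04-22 15:43:55,139".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns the gap in milliseconds between each consecutive pair of timestamps.
    static long[] deltasMillis(List<String> timestamps) {
        long[] deltas = new long[Math.max(0, timestamps.size() - 1)];
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), TS);
            LocalDateTime cur = LocalDateTime.parse(timestamps.get(i), TS);
            deltas[i - 1] = Duration.between(prev, cur).toMillis();
        }
        return deltas;
    }

    public static void main(String[] args) {
        // Timestamps of metadata versions 6196, 6197 and 6198 from the excerpt.
        List<String> stamps = Arrays.asList(
                "2016-04-22 15:43:55,139",
                "2016-04-22 15:43:55,240",
                "2016-04-22 15:43:55,341");
        System.out.println(Arrays.toString(deltasMillis(stamps))); // prints [101, 101]
    }
}
```

So the updates arrive roughly every 100 ms, nowhere near the 5 minute refresh interval. That happens to be close to the producer's default retry.backoff.ms of 100 ms, though I don't know whether the two are actually related.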
> On Apr 22, 2016, at 6:27 PM, vinay sharma <[email protected]> wrote:
>
> Two producers writing to the same topic should not be a problem. There can be
> multiple producers and consumers of the same Kafka topic.
>
> I am not sure what can be wrong here. I can try this at my end if you can share
> your producer code and any topic or broker config that you changed from the
> defaults.
>
> Please also check that you are not creating a producer every time you send a
> message, but are reusing the producer once created to send multiple messages.
> After creation and before its first send, a producer sends this request to
> fetch metadata. I ran a test publishing messages from 2 producers to a topic
> with 3 partitions on a 3-broker, 1-ZooKeeper Kafka setup. I ran the test for
> more than a minute and saw the request just once for each producer, before
> its first send.
>
> Regards,
> Vinay Sharma
> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <[email protected]> wrote:
>
>> Hi. I've not set that value. My producer properties are as follows :
>>
>> acks=all
>> retries=0
>> batch.size=1638
>> linger.ms=1
>> buffer.memory=33554432
>> compression.type=gzip
>> client.id=sds-merdevl
>>
>> I have this running on two hosts with the same config. I thought that
>> having the same client.id on each would just consolidate the tracking
>> (same logical name). You don't think there is an issue with 2 producers to
>> the same topic?
>>
>>
>>
>>
>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <[email protected]> wrote:
>>>
>>> Generally a proactive metadata refresh request is sent by the producer and
>>> consumer every 5 minutes, but this interval can be overridden with the
>>> property "metadata.max.age.ms", which has a default value of 300000, i.e.
>>> 5 minutes. Check whether you have set this property very low in your
>>> producer.
>>>
>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <[email protected]> wrote:
>>>
>>>> I'm testing a kafka install and using the java client. I have a topic set
>>>> up and it appears to work great, but after a while I noticed my log
>>>> starting to fill up with what appears to be some kind of loop for metadata
>>>> updates.
>>>>
>>>> example::
>>>>
>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
>>>>
>>>> etc.
>>>>
>>>> It hasn't stopped..
>>>>
>>>> I'm curious about what's going on here. Can anyone help?