Hi,
Your code looks good. I don't see any reason there for frequent metadata
fetches, but I will run it to verify.
I was able to replicate this issue with my consumer today: I killed 2 out
of 3 brokers and then ran the consumer. I saw a lot of metadata requests
while it waited for a new leader for the partitions those 2 brokers had
been leading. I see the 2 lines below frequently in the logs, as you mentioned:
[Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =
2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
partition = 0, leader = none, replicas = [], isr = []])
[Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
metadata refresh
I also saw this behavior together with the error below, where I may have
killed the only ISR available for the topic at that time.
"Error while fetching metadata with correlation id 242 :
{kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
Do you see any such error in your logs?
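
In case it helps with checking your logs, here is a minimal sketch in plain
Java (no Kafka dependency) that pulls the metadata version numbers and any
leaderless partitions out of DEBUG output like the lines above. The class and
method names are made up for illustration, and the regexes assume the exact
"Updated cluster metadata version N" and "partition = N, leader = none" text
shown in my log; adjust them if your client prints something different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: scans Kafka client DEBUG log text for metadata refreshes. */
final class MetadataLogScan {

    // Assumed format: "Updated cluster metadata version 275 to Cluster(...)".
    // \s+ also matches the line wraps that mail clients insert.
    private static final Pattern VERSION =
            Pattern.compile("Updated\\s+cluster\\s+metadata\\s+version\\s+(\\d+)");

    // Assumed format inside the Cluster(...) dump: "partition = 2, leader = none".
    private static final Pattern LEADERLESS =
            Pattern.compile("partition\\s*=\\s*(\\d+),\\s*leader\\s*=\\s*none");

    /** Returns every metadata version number that appears in the log text, in order. */
    static List<Integer> metadataVersions(String log) {
        return firstGroupAsInts(VERSION, log);
    }

    /** Returns the partition numbers reported with "leader = none", in order. */
    static List<Integer> leaderlessPartitions(String log) {
        return firstGroupAsInts(LEADERLESS, log);
    }

    // Collects capture group 1 of each match as an integer.
    private static List<Integer> firstGroupAsInts(Pattern pattern, String text) {
        List<Integer> result = new ArrayList<>();
        Matcher m = pattern.matcher(text);
        while (m.find()) {
            result.add(Integer.parseInt(m.group(1)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample taken from the log lines quoted in this message.
        String log =
                "DEBUG Metadata:172 - Updated cluster\n"
              + "metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],\n"
              + "partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,\n"
              + "replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =\n"
              + "2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,\n"
              + "partition = 0, leader = none, replicas = [], isr = []])\n";

        System.out.println("versions:   " + metadataVersions(log));
        System.out.println("leaderless: " + leaderlessPartitions(log));
    }
}
```

If the version numbers climb quickly while the leaderless list stays non-empty,
the refreshes are most likely the client waiting for a leader rather than
anything in your producer code.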
Regards,
Vinay Sharma
On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
[email protected]> wrote:
>
>
> My code is very straightforward. I create a producer, and then call it to
> send messages. Here is the factory method:
>
> public Producer<String, String> createProducer() {
>
> Properties props = new Properties();
>
> props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("acks", "all");
> props.put("retries", "0");
> props.put("batch.size", "1638");
> props.put("linger.ms", "1");
> props.put("buffer.memory", "33554432");
> props.put("compression.type", "gzip");
> props.put("client.id", "sds-merdevl");
>
> return new KafkaProducer<>(props);
> }
>
> That is injected into a single instance of KafkaNotifier:
>
> public class KafkaNotifier implements MessageNotifier {
> private static Logger logger =
> LoggerFactory.getLogger(KafkaNotifier.class);
>
> private Producer<String, String> producer;
> private String topicName;
>
> @Override
> public void sendNotificationMessage(DataObjectModificationMessage
> message) {
>
> String json = message.asJSON();
> String key = message.getObject().getId().toString();
>
> producer.send(new ProducerRecord<>(topicName, key, json));
>
> if (logger.isDebugEnabled()) {
> logger.debug("Sent Kafka message for object with id={}", key);
> }
> }
>
> @Required
> public void setProducer(Producer<String, String> producer) {
> this.producer = producer;
> }
>
> @Required
> public void setTopicName(String topicName) {
> this.topicName = topicName;
> }
> }
>
> Here is the topic config info :
>
> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe
> --topic s.notifications.dev
> Topic:s.notifications.dev PartitionCount:1 ReplicationFactor:1 Configs:retention.ms=172800000,cleanup.policy=compact
> Topic: s.notifications.dev Partition: 0 Leader: 0 Replicas: 0 Isr: 0
>
>
>
>
>
> > On Apr 22, 2016, at 6:27 PM, vinay sharma <[email protected]>
> wrote:
> >
> > 2 producers to the same topic should not be a problem. There can be
> > multiple producers and consumers of the same Kafka topic.
> >
> > I am not sure what can be wrong here. I can try this at my end if you can
> > share the producer code and any topic or broker config that you changed
> > from the default.
> >
> > Please also check that you are not creating a producer every time you send
> > a message, but are reusing the producer once created to send multiple
> > messages. After creation and before its first send, a producer sends this
> > request to fetch metadata. I ran a test publishing messages from 2
> > producers to a topic with 3 partitions on a 3-broker, 1-ZooKeeper Kafka
> > setup. I ran the test for more than a minute and saw the request just once
> > for each producer, before its first send.
> >
> > Regards,
> > Vinay Sharma
> > On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <[email protected]
> >
> > wrote:
> >
> >> Hi. I've not set that value. My producer properties are as follows :
> >>
> >> acks=all
> >> retries=0
> > >> batch.size=1638
> >> linger.ms=1
> >> buffer.memory=33554432
> >> compression.type=gzip
> >> client.id=sds-merdevl
> >>
> >> I have this running on two hosts with the same config. I thought that
> >> having the same client.id on each would just consolidate the tracking
> >> (same logical name). You don't think there is an issue with 2 producers
> to
> >> the same topic?
> >>
> >>
> >>
> >>
> >>> On Apr 22, 2016, at 3:05 PM, vinay sharma <[email protected]>
> >> wrote:
> >>>
> > >>> Generally a proactive metadata refresh request is sent by the producer
> > >>> and consumer every 5 minutes, but this interval can be overridden with
> > >>> the property "metadata.max.age.ms", which has a default value of 300000,
> > >>> i.e. 5 minutes. Check whether you have set this property very low in
> > >>> your producer.
> >>>
> >>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
> >>> [email protected]> wrote:
> >>>
> >>>> I'm testing a kafka install and using the java client. I have a topic
> >> set
> >>>> up and it appears to work great, but after a while I noticed my log
> >>>> starting to fill up with what appears to be some kind of loop for
> >> metadata
> >>>> updates.
> >>>>
> > >>>> example:
> >>>>
> >>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out env="md"
> >>>> [kafka-producer-network-thread | sds-merdevl]
> >>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
> >> 6196 to
> >>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
> partitions
> >> =
> >>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
> >> replicas
> >>>> = [0,], isr = [0,]])
> >>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out env="md"
> >>>> [kafka-producer-network-thread | sds-merdevl]
> >>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
> >> 6197 to
> >>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
> partitions
> >> =
> >>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
> >> replicas
> >>>> = [0,], isr = [0,]])
> >>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out env="md"
> >>>> [kafka-producer-network-thread | sds-merdevl]
> >>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version
> >> 6198 to
> >>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)],
> partitions
> >> =
> >>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
> >> replicas
> >>>> = [0,], isr = [0,]])
> >>>>
> >>>> etc.
> >>>>
> >>>> It hasn't stopped..
> >>>>
> >>>> I'm curious about what's going on here. Can anyone help?
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>
> >>
>
>