Hi,

Your code looks good. I don't see any reason there for frequent metadata
fetches, but I will run it to verify.

I was able to replicate this issue with my consumer today: I killed 2 of
the 3 brokers and then ran the consumer. I saw a lot of metadata requests
while it waited for a new leader for the partitions those 2 brokers had
been leading. I frequently see the 2 lines below in the logs, as you
mentioned:

[Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =
2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
partition = 0, leader = none, replicas = [], isr = []])

[Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
metadata refresh

I also saw this behavior with the error below, where I may have killed the
only ISR available for the topic at that time.

"Error while fetching metadata with correlation id 242 :
{kafkaPOCTopic=LEADER_NOT_AVAILABLE}"

Do you see any such error in your logs?
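
If it helps to check, here is a minimal sketch in Java for scanning a client log. It is a hypothetical helper, not part of the Kafka client: the class and method names are made up for illustration, and it assumes each log entry sits on a single line that starts with a timestamp in the form `yyyy-MM-dd HH:mm:ss,SSS` (the pattern would need adjusting for other log layouts). It reports the gap between consecutive "Updated cluster metadata" entries and counts entries that mention a missing leader.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical log-scanning helper; not part of the Kafka client library. */
final class MetadataLogScan {

    // Assumption: one log entry per line, starting with "yyyy-MM-dd HH:mm:ss,SSS".
    private static final Pattern UPDATE = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}).*Updated cluster metadata version \\d+");

    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    private MetadataLogScan() {
    }

    /** Returns the milliseconds between consecutive "Updated cluster metadata" entries. */
    static List<Long> intervalsMillis(List<String> lines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : lines) {
            Matcher m = UPDATE.matcher(line);
            if (!m.find()) {
                continue; // not a metadata update entry
            }
            LocalDateTime current = LocalDateTime.parse(m.group(1), TIMESTAMP);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    /** Counts entries that show a partition without a leader or a LEADER_NOT_AVAILABLE error. */
    static long leaderlessEntries(List<String> lines) {
        return lines.stream()
                .filter(l -> l.contains("leader = none") || l.contains("LEADER_NOT_AVAILABLE"))
                .count();
    }
}
```

Calling `MetadataLogScan.intervalsMillis(Files.readAllLines(Paths.get("client.log")))` (the file name is a placeholder) gives the list of gaps. Gaps of roughly 100 ms would look more like the client retrying after `retry.backoff.ms` (default 100 ms) than the 5-minute `metadata.max.age.ms` refresh, though that is only a hint and not a diagnosis.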

Regards,
Vinay Sharma


On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
vincent_f...@cable.comcast.com> wrote:

>
>
> My code is very straightforward. I create a producer, and then call it to
> send messages. Here is the factory method:
>
> public Producer<String, String> createProducer() {
>
>     Properties props = new Properties();
>
>     props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("acks", "all");
>     props.put("retries", "0");
>     props.put("batch.size", "1638");
>     props.put("linger.ms", "1");
>     props.put("buffer.memory", "33554432");
>     props.put("compression.type", "gzip");
>     props.put("client.id", "sds-merdevl");
>
>     return new KafkaProducer<>(props);
> }
>
> That is injected into a single instance of KafkaNotifier:
>
> public class KafkaNotifier implements MessageNotifier {
>     private static Logger logger = LoggerFactory.getLogger(KafkaNotifier.class);
>
>     private Producer<String, String> producer;
>     private String topicName;
>
>     @Override
>     public void sendNotificationMessage(DataObjectModificationMessage message) {
>
>         String json = message.asJSON();
>         String key = message.getObject().getId().toString();
>
>         producer.send(new ProducerRecord<>(topicName, key, json));
>
>         if (logger.isDebugEnabled()) {
>             logger.debug("Sent Kafka message for object with id={}", key);
>         }
>     }
>
>     @Required
>     public void setProducer(Producer<String, String> producer) {
>         this.producer = producer;
>     }
>
>     @Required
>     public void setTopicName(String topicName) {
>         this.topicName = topicName;
>     }
> }
>
> Here is the topic config info:
>
> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
> Topic:s.notifications.dev       PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=172800000,cleanup.policy=compact
>         Topic: s.notifications.dev      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>
>
>
>
>
> > On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> >
> > Two producers writing to the same topic should not be a problem. There can
> > be multiple producers and consumers of the same Kafka topic.
> >
> > I am not sure what can be wrong here. I can try this at my end if you can
> > share the producer code and any topic or broker config that you changed
> > from the default.
> >
> > Please also check that you are not creating a producer every time you send
> > a message, but are reusing the producer once created to send multiple
> > messages. After creation and before a send, a producer sends this request
> > to fetch metadata. I ran a test publishing messages from 2 producers to a
> > topic with 3 partitions on a 3-broker, 1-ZooKeeper Kafka setup. I ran the
> > test for more than a minute and saw the request just once for each
> > producer, before its first send.
> >
> > Regards,
> > Vinay Sharma
> > On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com> wrote:
> >
> >> Hi. I've not set that value. My producer properties are as follows:
> >>
> >> acks=all
> >> retries=0
> >> batch.size=1638
> >> linger.ms=1
> >> buffer.memory=33554432
> >> compression.type=gzip
> >> client.id=sds-merdevl
> >>
> >> I have this running on two hosts with the same config. I thought that
> >> having the same client.id on each would just consolidate the tracking
> >> (same logical name). You don't think there is an issue with 2 producers
> >> to the same topic?
> >>
> >>
> >>
> >>
> >>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> >>>
> >>> Generally, a proactive metadata refresh request is sent by the producer
> >>> and consumer every 5 minutes, but this interval can be overridden with
> >>> the property "metadata.max.age.ms", which has a default value of 300000,
> >>> i.e. 5 minutes. Check whether you have set this property very low in
> >>> your producer.
> >>>
> >>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
> >>> vincent_f...@cable.comcast.com> wrote:
> >>>
> >>>> I'm testing a Kafka install and using the Java client. I have a topic set
> >>>> up and it appears to work great, but after a while I noticed my log
> >>>> starting to fill up with what appears to be some kind of loop for
> >>>> metadata updates.
> >>>>
> >>>> example:
> >>>>
> >>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out  env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
> >>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out  env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
> >>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out  env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
> >>>>
> >>>> etc.
> >>>>
> >>>> It hasn't stopped.
> >>>>
> >>>> I'm curious about what's going on here. Can anyone help?
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>
> >>
>
>
