What versions of kafka and client API are you using now?
On Apr 26, 2016 3:35 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com>
wrote:

> I spoke too soon. It's back doing the same thing.. which is really odd.
>
> > On Apr 26, 2016, at 12:24 PM, Fumo, Vincent <
> vincent_f...@cable.comcast.com> wrote:
> >
> > Hello. I found the issue. The Ops team deployed kafka 0.8.1 and all my
> > code was 0.9.0. Simple mistake and one that I should have thought of
> > sooner. Once I had them bump up to the latest kafka all was well. Thank you
> > for your help!
> >
> > v
> >
> >
> >> On Apr 25, 2016, at 2:54 PM, vinay sharma <vinsharma.t...@gmail.com>
> wrote:
> >>
> >> Hi,
> >>
> >> Your code looks good. I don't see any reason there for frequent metadata
> >> fetches, but I will run it to verify.
> >>
> >> I was able to replicate this issue with my consumer today: I killed 2
> >> of the 3 brokers and ran the consumer. I saw a lot of metadata requests
> >> while it waited for a new leader for the partitions those 2 brokers had
> >> led. I frequently see the 2 lines below in the logs, as you mentioned:
> >>
> >> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
> >> metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
> >> partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
> >> replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =
> >> 2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
> >> partition = 0, leader = none, replicas = [], isr = []])
> >>
> >> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
> >> partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
> >> metadata refresh
> >>
> >> I also saw this behavior with the error below, where I might have killed
> >> the only ISR available for the topic at that time.
> >>
> >> "Error while fetching metadata with correlation id 242 :
> >> {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
> >>
> >> Do you see any such error in your logs?
> >>
> >> Regards,
> >> Vinay Sharma
> >>
> >>
> >> On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
> >> vincent_f...@cable.comcast.com> wrote:
> >>
> >>>
> >>>
> >>> My code is very straightforward. I create a producer, and then call it to
> >>> send messages. Here is the factory method::
> >>>
> >>> public Producer<String, String> createProducer() {
> >>>
> >>>   Properties props = new Properties();
> >>>
> >>>   props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
> >>>   props.put("key.serializer",
> >>>       "org.apache.kafka.common.serialization.StringSerializer");
> >>>   props.put("value.serializer",
> >>>       "org.apache.kafka.common.serialization.StringSerializer");
> >>>   props.put("acks", "all");
> >>>   props.put("retries", "0");
> >>>   props.put("batch.size", "1638");
> >>>   props.put("linger.ms", "1");
> >>>   props.put("buffer.memory", "33554432");
> >>>   props.put("compression.type", "gzip");
> >>>   props.put("client.id", "sds-merdevl");
> >>>
> >>>   return new KafkaProducer<>(props);
> >>> }
> >>>
> >>> That is injected into a single instance of KafkaNotifier::
> >>>
> >>> public class KafkaNotifier implements MessageNotifier {
> >>>   private static Logger logger =
> >>> LoggerFactory.getLogger(KafkaNotifier.class);
> >>>
> >>>   private Producer<String, String> producer;
> >>>   private String topicName;
> >>>
> >>>   @Override
> >>>   public void sendNotificationMessage(DataObjectModificationMessage
> >>> message) {
> >>>
> >>>       String json = message.asJSON();
> >>>       String key = message.getObject().getId().toString();
> >>>
> >>>       producer.send(new ProducerRecord<>(topicName, key, json));
> >>>
> >>>       if (logger.isDebugEnabled()) {
> >>>           logger.debug("Sent Kafka message for object with id={}", key);
> >>>       }
> >>>   }
> >>>
> >>>   @Required
> >>>   public void setProducer(Producer<String, String> producer) {
> >>>       this.producer = producer;
> >>>   }
> >>>
> >>>   @Required
> >>>   public void setTopicName(String topicName) {
> >>>       this.topicName = topicName;
> >>>   }
> >>> }
> >>>
> >>> Here is the topic config info :
> >>>
> >>> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe
> >>> --topic s.notifications.dev
> >>> Topic:s.notifications.dev       PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=172800000,cleanup.policy=compact
> >>>       Topic: s.notifications.dev      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com>
> >>> wrote:
> >>>>
> >>>> 2 producers to the same topic should not be a problem. There can be
> >>>> multiple producers and consumers of the same kafka topic.
> >>>>
> >>>> I am not sure what can be wrong here. I can run this at my end if you can
> >>>> share the producer code and any topic or broker config that you changed
> >>>> from the default.
> >>>>
> >>>> Please also check that you are not creating a producer every time you send
> >>>> a message, but are reusing the producer once created to send multiple
> >>>> messages. After creation and before a send, a producer sends this request
> >>>> to fetch metadata. I ran a test to publish messages from 2 producers to a
> >>>> topic with 3 partitions on a 3 broker, 1 zookeeper kafka setup. I ran the
> >>>> test for more than a minute and saw the request just once for each
> >>>> producer, before its 1st send.
> >>>>
> >>>> Regards,
> >>>> Vinay Sharma
> >>>> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <
> vincent_f...@cable.comcast.com
> >>>>
> >>>> wrote:
> >>>>
> >>>>> Hi. I've not set that value. My producer properties are as follows :
> >>>>>
> >>>>> acks=all
> >>>>> retries=0
> >>>>> batch.size=1638
> >>>>> linger.ms=1
> >>>>> buffer.memory=33554432
> >>>>> compression.type=gzip
> >>>>> client.id=sds-merdevl
> >>>>>
> >>>>> I have this running on two hosts with the same config. I thought that
> >>>>> having the same client.id on each would just consolidate the tracking
> >>>>> (same logical name). You don't think there is an issue with 2 producers
> >>>>> to the same topic?
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com
> >
> >>>>> wrote:
> >>>>>>
> >>>>>> Generally a proactive metadata refresh request is sent by the producer and
> >>>>>> consumer every 5 minutes, but this interval can be overridden with the
> >>>>>> property "metadata.max.age.ms", which has a default value of 300000, i.e.
> >>>>>> 5 minutes. Check whether you have set this property very low in your
> >>>>>> producer.
> >>>>>>
> >>>>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
> >>>>>> vincent_f...@cable.comcast.com> wrote:
> >>>>>>
> >>>>>>> I'm testing a kafka install and using the java client. I have a topic
> >>>>>>> set up and it appears to work great, but after a while I noticed my log
> >>>>>>> starting to fill up with what appears to be some kind of loop for
> >>>>>>> metadata updates.
> >>>>>>>
> >>>>>>> example::
> >>>>>>>
> >>>>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out  env="md"
> >>>>>>> [kafka-producer-network-thread | sds-merdevl]
> >>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to
> >>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
> >>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
> >>>>>>> = [0,], isr = [0,]])
> >>>>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out  env="md"
> >>>>>>> [kafka-producer-network-thread | sds-merdevl]
> >>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to
> >>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
> >>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
> >>>>>>> = [0,], isr = [0,]])
> >>>>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out  env="md"
> >>>>>>> [kafka-producer-network-thread | sds-merdevl]
> >>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to
> >>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
> >>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas
> >>>>>>> = [0,], isr = [0,]])
> >>>>>>>
> >>>>>>> etc.
> >>>>>>>
> >>>>>>> It hasn't stopped..
> >>>>>>>
> >>>>>>> I'm curious about what's going on here. Can anyone help?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >
> >
>
>
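
As a rough way to quantify the loop in the DEBUG excerpt quoted at the bottom of this thread, the sketch below parses the three log timestamps and compares the gaps between them with the 300000 ms default for metadata.max.age.ms mentioned earlier. The class and method names (MetadataIntervalSketch, gapsMillis) are made up for illustration and are not part of the Kafka client; only the JDK is used, and the timestamp pattern is an assumption based on the quoted log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of kafka-clients.
public class MetadataIntervalSketch {

    // Assumed layout of the timestamp at the start of each quoted log line.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Default for metadata.max.age.ms as cited in the thread (5 minutes).
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    // Returns the gap in milliseconds between each pair of consecutive timestamps.
    static List<Long> gapsMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TS);
            LocalDateTime curr = LocalDateTime.parse(timestamps.get(i), LOG_TS);
            gaps.add(Duration.between(prev, curr).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Timestamps copied from the "Updated cluster metadata version" lines.
        List<String> seen = Arrays.asList(
                "2016-04-22 15:43:55,139",
                "2016-04-22 15:43:55,240",
                "2016-04-22 15:43:55,341");

        for (long gap : gapsMillis(seen)) {
            System.out.println("gap between metadata updates: " + gap + " ms"
                    + " (default max age: " + DEFAULT_METADATA_MAX_AGE_MS + " ms)");
        }
    }
}
```

For the three timestamps above the gaps come out at 101 ms each, far shorter than the 5 minute default, which suggests these are probably not the periodic proactive refreshes.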
