I spoke too soon. It's back doing the same thing, which is really odd.

> On Apr 26, 2016, at 12:24 PM, Fumo, Vincent <vincent_f...@cable.comcast.com> 
> wrote:
> 
> Hello. I found the issue. The Ops team deployed kafka 0.8.1 and all my code 
> was 0.9.0. Simple mistake and one that I should have thought of sooner. Once 
> I had them bump up to the latest kafka all was well. Thank you for your help!
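> 
> For anyone who hits the same mismatch: a quick way to confirm which client
> jar is actually on the classpath (a minimal sketch, assuming the
> AppInfoParser utility the Java client uses for its own startup log line) is:
> 
> import org.apache.kafka.common.utils.AppInfoParser;
> 
> public class ClientVersionCheck {
>     public static void main(String[] args) {
>         // Prints the kafka-clients version on the classpath, e.g. "0.9.0.0",
>         // so it can be compared against whatever the Ops team deployed.
>         System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
>     }
> }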
> 
> v
> 
> 
>> On Apr 25, 2016, at 2:54 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> Your code looks good. I don't see any reason there for frequent metadata
>> fetches, but I will run it to verify.
>> 
>> I was able to replicate this issue with my consumer today: I killed 2
>> brokers out of 3 and ran the consumer. I saw a lot of metadata requests
>> waiting for a new leader for the partitions for which those 2 brokers were
>> the leaders. I frequently see the 2 lines below in the logs, as you mentioned:
>> 
>> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
>> metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
>> partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
>> replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition =
>> 2, leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
>> partition = 0, leader = none, replicas = [], isr = []])
>> 
>> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
>> partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
>> metadata refresh
>> 
>> I also saw this behavior due to the error below, where I might have killed
>> the only ISR available for the topic at that time:
>> 
>> "Error while fetching metadata with correlation id 242 :
>> {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
>> 
>> Do you see any such error in your logs?
>> 
>> Regards,
>> Vinay Sharma
>> 
>> 
>> On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
>> vincent_f...@cable.comcast.com> wrote:
>> 
>>> 
>>> 
>>> My code is very straightforward. I create a producer, and then call it to
>>> send messages. Here is the factory method:
>>> 
>>> public Producer<String, String> createProducer() {
>>> 
>>>    Properties props = new Properties();
>>> 
>>>    props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
>>>    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>    props.put("acks", "all");
>>>    props.put("retries", "0");
>>>    props.put("batch.size", "1638");
>>>    props.put("linger.ms", "1");
>>>    props.put("buffer.memory", "33554432");
>>>    props.put("compression.type", "gzip");
>>>    props.put("client.id", "sds-merdevl");
>>> 
>>>    return new KafkaProducer<>(props);
>>> }
>>> 
>>> That is injected into a single instance of KafkaNotifier:
>>> 
>>> public class KafkaNotifier implements MessageNotifier {
>>>   private static Logger logger = LoggerFactory.getLogger(KafkaNotifier.class);
>>> 
>>>   private Producer<String, String> producer;
>>>   private String topicName;
>>> 
>>>   @Override
>>>   public void sendNotificationMessage(DataObjectModificationMessage message) {
>>> 
>>>       String json = message.asJSON();
>>>       String key = message.getObject().getId().toString();
>>> 
>>>       producer.send(new ProducerRecord<>(topicName, key, json));
>>> 
>>>       if (logger.isDebugEnabled()) {
>>>           logger.debug("Sent Kafka message for object with id={}", key);
>>>       }
>>>   }
>>> 
>>>   @Required
>>>   public void setProducer(Producer<String, String> producer) {
>>>       this.producer = producer;
>>>   }
>>> 
>>>   @Required
>>>   public void setTopicName(String topicName) {
>>>       this.topicName = topicName;
>>>   }
>>> }
>>> 
>>> Here is the topic config info:
>>> 
>>> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
>>> Topic:s.notifications.dev      PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=172800000,cleanup.policy=compact
>>>        Topic: s.notifications.dev      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>>> 
>>> 
>>> 
>>> 
>>> 
>>>> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>>>> 
>>>> 2 producers to the same topic should not be a problem. There can be multiple
>>>> producers and consumers of the same Kafka topic.
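>>>> 
>>>> For example (a minimal sketch with made-up broker and topic names), two
>>>> independent producers can publish to one topic side by side:
>>>> 
>>>> import java.util.Properties;
>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>> import org.apache.kafka.clients.producer.Producer;
>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>> 
>>>> public class TwoProducersDemo {
>>>>     private static Producer<String, String> newProducer(String clientId) {
>>>>         Properties props = new Properties();
>>>>         props.put("bootstrap.servers", "localhost:9092"); // made-up broker
>>>>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>>         props.put("client.id", clientId);
>>>>         return new KafkaProducer<>(props);
>>>>     }
>>>> 
>>>>     public static void main(String[] args) {
>>>>         // Two producers (think one per host) publishing to the same topic.
>>>>         Producer<String, String> p1 = newProducer("host-1");
>>>>         Producer<String, String> p2 = newProducer("host-2");
>>>>         p1.send(new ProducerRecord<>("demo-topic", "k1", "from producer 1"));
>>>>         p2.send(new ProducerRecord<>("demo-topic", "k2", "from producer 2"));
>>>>         p1.close();
>>>>         p2.close();
>>>>     }
>>>> }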
>>>> 
>>>> I am not sure what can be wrong here. I can try this at my end if you can
>>>> share your producer code and any topic or broker config that you changed
>>>> from the default.
>>>> 
>>>> Please also check that you are not creating a producer every time you send a
>>>> message, but reusing the producer once created to send multiple messages.
>>>> After creation and before its first send, a producer sends this request to
>>>> fetch metadata.
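>>>> 
>>>> As a sketch of what I mean (class and names made up), create the producer
>>>> once and keep reusing it for every send:
>>>> 
>>>> import java.util.Properties;
>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>> import org.apache.kafka.clients.producer.Producer;
>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>> 
>>>> public class ReusedProducerSender {
>>>>     // One long-lived producer for the whole application, not one per message.
>>>>     private final Producer<String, String> producer;
>>>>     private final String topic;
>>>> 
>>>>     public ReusedProducerSender(Properties props, String topic) {
>>>>         // The initial metadata fetch happens around creation / first send.
>>>>         this.producer = new KafkaProducer<>(props);
>>>>         this.topic = topic;
>>>>     }
>>>> 
>>>>     public void send(String key, String value) {
>>>>         // Reusing the same producer avoids a fresh metadata fetch per message.
>>>>         producer.send(new ProducerRecord<>(topic, key, value));
>>>>     }
>>>> 
>>>>     public void close() {
>>>>         producer.close();
>>>>     }
>>>> }
>>>> 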
>>>> I ran a test publishing messages from 2 producers to a topic with 3
>>>> partitions on a 3-broker, 1-ZooKeeper Kafka setup. I ran the test for more
>>>> than a minute and saw the metadata request just once for each producer,
>>>> before its first send.
>>>> 
>>>> Regards,
>>>> Vinay Sharma
>>>> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com> wrote:
>>>> 
>>>>> Hi. I've not set that value. My producer properties are as follows:
>>>>> 
>>>>> acks=all
>>>>> retries=0
>>>>> batch.size=1638
>>>>> linger.ms=1
>>>>> buffer.memory=33554432
>>>>> compression.type=gzip
>>>>> client.id=sds-merdevl
>>>>> 
>>>>> I have this running on two hosts with the same config. I thought that
>>>>> having the same client.id on each would just consolidate the tracking
>>>>> (same logical name). You don't think there is an issue with 2 producers to
>>>>> the same topic?
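>>>>> 
>>>>> If distinct ids per host turn out to be safer, I could derive the client.id
>>>>> from the hostname, e.g. (just a sketch):
>>>>> 
>>>>> import java.net.InetAddress;
>>>>> import java.net.UnknownHostException;
>>>>> import java.util.Properties;
>>>>> 
>>>>> public class ClientIdPerHost {
>>>>>     // Keep the shared logical name but suffix it with the local hostname
>>>>>     // so each host reports a distinct client.id.
>>>>>     public static Properties withHostClientId(Properties props) {
>>>>>         String host;
>>>>>         try {
>>>>>             host = InetAddress.getLocalHost().getHostName();
>>>>>         } catch (UnknownHostException e) {
>>>>>             host = "unknown-host";
>>>>>         }
>>>>>         props.put("client.id", "sds-merdevl-" + host);
>>>>>         return props;
>>>>>     }
>>>>> }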
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>>>>>> 
>>>>>> Generally, a proactive metadata refresh request is sent by the producer and
>>>>>> consumer every 5 minutes, but this interval can be overridden with the
>>>>>> property "metadata.max.age.ms", which has a default value of 300000 ms,
>>>>>> i.e. 5 minutes. Check whether you have set this property very low in your
>>>>>> producer.
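>>>>>> 
>>>>>> For illustration (a sketch with a placeholder broker address), this is the
>>>>>> kind of setting that would cause constant refreshes:
>>>>>> 
>>>>>> import java.util.Properties;
>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>> import org.apache.kafka.clients.producer.Producer;
>>>>>> 
>>>>>> public class MetadataAgeExample {
>>>>>>     public static Producer<String, String> buildProducer() {
>>>>>>         Properties props = new Properties();
>>>>>>         props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
>>>>>>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>>>>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>>>>         // Default is 300000 (5 minutes). A very low value like this forces a
>>>>>>         // metadata refresh roughly every 100 ms and fills the DEBUG log.
>>>>>>         props.put("metadata.max.age.ms", "100");
>>>>>>         return new KafkaProducer<>(props);
>>>>>>     }
>>>>>> }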
>>>>>> 
>>>>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <
>>>>>> vincent_f...@cable.comcast.com> wrote:
>>>>>> 
>>>>>>> I'm testing a kafka install and using the java client. I have a topic set
>>>>>>> up and it appears to work great, but after a while I noticed my log
>>>>>>> starting to fill up with what appears to be some kind of loop for metadata
>>>>>>> updates.
>>>>>>> 
>>>>>>> example:
>>>>>>> 
>>>>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out  env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
>>>>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out  env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
>>>>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out  env="md" [kafka-producer-network-thread | sds-merdevl] org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions = [Partition(topic = s.notifications.dev, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
>>>>>>> 
>>>>>>> etc.
>>>>>>> 
>>>>>>> It hasn't stopped.
>>>>>>> 
>>>>>>> I'm curious about what's going on here. Can anyone help?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 
> 
