What versions of Kafka and the client API are you using now?

On Apr 26, 2016 3:35 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com> wrote:
> I spoke too soon. It's back doing the same thing, which is really odd.
>
> > On Apr 26, 2016, at 12:24 PM, Fumo, Vincent <vincent_f...@cable.comcast.com> wrote:
> >
> > Hello. I found the issue. The Ops team deployed Kafka 0.8.1 and all my
> > code was 0.9.0. A simple mistake, and one that I should have thought of
> > sooner. Once I had them bump up to the latest Kafka all was well. Thank
> > you for your help!
> >
> > v
> >
> >> On Apr 25, 2016, at 2:54 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> Your code looks good. I don't see any reason there for frequent metadata
> >> fetches, but I will run it to verify.
> >>
> >> I was able to replicate this issue with my consumer today, when I killed
> >> 2 brokers out of 3 and ran the consumer. I saw a lot of metadata requests
> >> while it waited for a new leader for the partitions those 2 brokers had
> >> been leading. I see the 2 lines below frequently in the logs, as you
> >> mentioned:
> >>
> >> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated cluster
> >> metadata version 275 to Cluster(nodes = [Node(1, node1.example.com, 9093)],
> >> partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1,
> >> replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2,
> >> leader = none, replicas = [], isr = [], Partition(topic = kafkaPOCTopic,
> >> partition = 0, leader = none, replicas = [], isr = []])
> >>
> >> [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
> >> partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
> >> metadata refresh
> >>
> >> I also saw this behavior with the error below, where I might have killed
> >> the only ISR available for the topic at that time:
> >>
> >> "Error while fetching metadata with correlation id 242 :
> >> {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
> >>
> >> Do you see any such error in your logs?
> >>
> >> Regards,
> >> Vinay Sharma
> >>
> >> On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <vincent_f...@cable.comcast.com> wrote:
> >>
> >>> My code is very straightforward. I create a producer, and then call it
> >>> to send messages. Here is the factory method::
> >>>
> >>>     public Producer<String, String> createProducer() {
> >>>
> >>>         Properties props = new Properties();
> >>>
> >>>         props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
> >>>         props.put("key.serializer",
> >>>                 "org.apache.kafka.common.serialization.StringSerializer");
> >>>         props.put("value.serializer",
> >>>                 "org.apache.kafka.common.serialization.StringSerializer");
> >>>         props.put("acks", "all");
> >>>         props.put("retries", "0");
> >>>         props.put("batch.size", "1638");
> >>>         props.put("linger.ms", "1");
> >>>         props.put("buffer.memory", "33554432");
> >>>         props.put("compression.type", "gzip");
> >>>         props.put("client.id", "sds-merdevl");
> >>>
> >>>         return new KafkaProducer<>(props);
> >>>     }
> >>>
> >>> That is injected into a single instance of KafkaNotifier::
> >>>
> >>>     public class KafkaNotifier implements MessageNotifier {
> >>>         private static Logger logger =
> >>>                 LoggerFactory.getLogger(KafkaNotifier.class);
> >>>
> >>>         private Producer<String, String> producer;
> >>>         private String topicName;
> >>>
> >>>         @Override
> >>>         public void sendNotificationMessage(DataObjectModificationMessage message) {
> >>>
> >>>             String json = message.asJSON();
> >>>             String key = message.getObject().getId().toString();
> >>>
> >>>             producer.send(new ProducerRecord<>(topicName, key, json));
> >>>
> >>>             if (logger.isDebugEnabled()) {
> >>>                 logger.debug("Sent Kafka message for object with id={}", key);
> >>>             }
> >>>         }
> >>>
> >>>         @Required
> >>>         public void setProducer(Producer<String, String> producer) {
> >>>             this.producer = producer;
> >>>         }
> >>>
> >>>         @Required
> >>>         public void setTopicName(String topicName) {
> >>>             this.topicName = topicName;
> >>>         }
> >>>     }
> >>>
> >>> Here is the topic config info:
> >>>
> >>> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe --topic s.notifications.dev
> >>> Topic:s.notifications.dev  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=172800000,cleanup.policy=compact
> >>>     Topic: s.notifications.dev  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> >>>
> >>>> On Apr 22, 2016, at 6:27 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> >>>>
> >>>> Two producers writing to the same topic should not be a problem. A
> >>>> Kafka topic can have multiple producers and consumers.
> >>>>
> >>>> I am not sure what is wrong here. I can try this at my end if you can
> >>>> share the producer code and any topic or broker config that you changed
> >>>> from the defaults.
> >>>>
> >>>> Please also check that you are not creating a producer every time you
> >>>> send a message, but reusing one producer to send multiple messages.
> >>>> After creation and before its first send, a producer sends this request
> >>>> to fetch metadata. I ran a test publishing messages from 2 producers to
> >>>> a topic with 3 partitions on a 3-broker, 1-ZooKeeper Kafka setup. I ran
> >>>> the test for more than a minute and saw the request just once per
> >>>> producer, before its first send.
> >>>>
> >>>> Regards,
> >>>> Vinay Sharma
> >>>>
> >>>> On Apr 22, 2016 3:15 PM, "Fumo, Vincent" <vincent_f...@cable.comcast.com> wrote:
> >>>>
> >>>>> Hi. I've not set that value. My producer properties are as follows:
> >>>>>
> >>>>> acks=all
> >>>>> retries=0
> >>>>> batch.size=1638
> >>>>> linger.ms=1
> >>>>> buffer.memory=33554432
> >>>>> compression.type=gzip
> >>>>> client.id=sds-merdevl
> >>>>>
> >>>>> I have this running on two hosts with the same config. I thought that
> >>>>> having the same client.id on each would just consolidate the tracking
> >>>>> (same logical name). You don't think there is an issue with 2
> >>>>> producers to the same topic?
> >>>>>
> >>>>>> On Apr 22, 2016, at 3:05 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> >>>>>>
> >>>>>> Generally, producers and consumers send a proactive metadata refresh
> >>>>>> request every 5 minutes, but this interval can be overridden with the
> >>>>>> property "metadata.max.age.ms", which defaults to 300000, i.e. 5
> >>>>>> minutes. Can you check whether you have set this property very low in
> >>>>>> your producer?
> >>>>>>
> >>>>>> On Fri, Apr 22, 2016 at 11:46 AM, Fumo, Vincent <vincent_f...@cable.comcast.com> wrote:
> >>>>>>
> >>>>>>> I'm testing a Kafka install and using the Java client. I have a
> >>>>>>> topic set up and it appears to work great, but after a while I
> >>>>>>> noticed my log starting to fill up with what appears to be some kind
> >>>>>>> of loop for metadata updates.
> >>>>>>>
> >>>>>>> example::
> >>>>>>>
> >>>>>>> 2016-04-22 15:43:55,139 DEBUG s=s-root_out env="md"
> >>>>>>> [kafka-producer-network-thread | sds-merdevl]
> >>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6196 to
> >>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
> >>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
> >>>>>>> replicas = [0,], isr = [0,]])
> >>>>>>> 2016-04-22 15:43:55,240 DEBUG s=s-root_out env="md"
> >>>>>>> [kafka-producer-network-thread | sds-merdevl]
> >>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6197 to
> >>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
> >>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
> >>>>>>> replicas = [0,], isr = [0,]])
> >>>>>>> 2016-04-22 15:43:55,341 DEBUG s=s-root_out env="md"
> >>>>>>> [kafka-producer-network-thread | sds-merdevl]
> >>>>>>> org.apache.kafka.clients.Metadata: Updated cluster metadata version 6198 to
> >>>>>>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.cc.com, 9092)], partitions =
> >>>>>>> [Partition(topic = s.notifications.dev, partition = 0, leader = 0,
> >>>>>>> replicas = [0,], isr = [0,]])
> >>>>>>>
> >>>>>>> etc.
> >>>>>>>
> >>>>>>> It hasn't stopped.
> >>>>>>>
> >>>>>>> I'm curious about what's going on here. Can anyone help?
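
One way to quantify the loop in the log excerpt above is to measure the gap between consecutive "Updated cluster metadata version" entries. The sketch below is only an illustration. The class and method names (MetadataIntervals, intervalsMillis) are made up for this note, the sample lines are abbreviated from the excerpt, and it assumes every entry starts with a timestamp in the yyyy-MM-dd HH:mm:ss,SSS layout shown there. For the three entries quoted, the gaps come out at about 100 ms, far below the 5-minute metadata.max.age.ms default mentioned earlier in the thread. That spacing happens to match the Java producer's default retry.backoff.ms of 100 ms, which may mean the client was re-requesting metadata as fast as its backoff allowed, but nothing in the thread confirms that.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of any Kafka API.
final class MetadataIntervals {

    // Assumed timestamp layout, taken from the excerpt: "2016-04-22 15:43:55,139"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
    private static final int TIMESTAMP_LENGTH = 23;
    private static final String MARKER = "Updated cluster metadata version";

    // Returns the gap in milliseconds between consecutive metadata-update entries.
    static List<Long> intervalsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            // Skip lines that are not metadata updates or are too short to hold a timestamp.
            if (!line.contains(MARKER) || line.length() < TIMESTAMP_LENGTH) {
                continue;
            }
            LocalDateTime current =
                    LocalDateTime.parse(line.substring(0, TIMESTAMP_LENGTH), FORMAT);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Abbreviated copies of the three entries from the excerpt.
        List<String> sample = Arrays.asList(
                "2016-04-22 15:43:55,139 DEBUG Updated cluster metadata version 6196",
                "2016-04-22 15:43:55,240 DEBUG Updated cluster metadata version 6197",
                "2016-04-22 15:43:55,341 DEBUG Updated cluster metadata version 6198");
        System.out.println(intervalsMillis(sample)); // prints [101, 101]
    }
}
```

Run against a full log file (one entry per line), a steady stream of gaps near 100 ms would point at a retry loop rather than the periodic refresh, which would show gaps near 300000 ms.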