Multiple Leaders on single Partition in Kafka Cluster
Hi, We recently saw split-brain behavior in production. There was a controller switch, followed by unclean leader elections, and as a result 24 partitions (out of 70) had 2 leaders for 2 days until I restarted the broker. Producers and consumers were talking to different "leaders", so records were produced to a broker from which no consumer was consuming. The total controller count was always 1 during the event. I did a very rough search of Kafka's JIRA and it seems there is no similar report. I want to know if this is a known bug. If it is a new one, although I cannot reproduce it, I'd like to provide more details (logs, Datadog dashboard screenshots) for the Kafka team to investigate.

[env]
Kafka version: 0.11.0.2; ZooKeeper version: 3.4.12; OS: CentOS 7.
We have 3 servers, each hosting 1 Kafka broker + 1 ZooKeeper. Unclean leader election is enabled.
Topics: there were 2 topics: __consumer_offsets => 50 partitions; postfix => 20 partitions.

[details]
The split brain was triggered by a server reboot. We were doing system maintenance, so we rebooted the 3 nodes one by one (the Kafka/ZooKeeper stop scripts are hooked to OS shutdown).
1. When rebooting server #3, a controller election happened and the controller switched from #1 to #2.
2. After Kafka #3 restarted, unclean leader elections were seen in the JMX metrics:
   - The controller changed from #1 to #2.
   - The leader count changed from (23, 24, 23) to (43, 24, 27).
   - The under-replicated partitions changed from (0, 0, 0) to (20, 0, 4).
   - The ISR shrink rate on #2 grew and stayed at about 4.8.
3. message.in for each broker was the same as pre-reboot.
4. bytes.out was 0 for broker #2.
The cluster kept the same weird status for 30 hours. Can anyone explain how this could happen? I'd like to provide logs and metrics if needed.
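For triage, a minimal sketch of how one might dump the leader each broker reports for every partition, to confirm the divergent leadership (Java, AdminClient API available since 0.11; the broker addresses are placeholders for the three servers described above). Note that once the client has bootstrapped, metadata requests may be served by any broker it has learned about, so this is only a rough check, not a guaranteed per-broker view.

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class LeaderViewCheck {
    public static void main(String[] args) throws Exception {
        // Point at one broker at a time so each broker's metadata view can be compared.
        for (String bootstrap : new String[]{"broker1:9092", "broker2:9092", "broker3:9092"}) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            try (AdminClient admin = AdminClient.create(props)) {
                Map<String, TopicDescription> topics = admin
                        .describeTopics(Arrays.asList("__consumer_offsets", "postfix"))
                        .all().get();
                for (TopicDescription td : topics.values()) {
                    for (TopicPartitionInfo p : td.partitions()) {
                        // Print the leader and ISR reported for each partition.
                        System.out.printf("via %s: %s-%d leader=%s isr=%s%n",
                                bootstrap, td.name(), p.partition(), p.leader(), p.isr());
                    }
                }
            }
        }
    }
}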
Get count of messages
Hi All, Could you please help me get the count of all messages stored in Kafka from a particular offset onwards? I have tried the GetOffsetShell command, but it does not give me that. Kind Regards, Sachit Murarka
Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table
Hi John and Matthias, thanks for the questions, maybe explaining our use case helps a bit: We are receiving CDC records (row-level insert/update/delete) in one topic per table. The key is derived from the DB records, the value is null in case of deletes. Those would be the immutable facts, I guess. These topics are first streamed through a deduplication Transformer to drop changes on irrelevant fields. The results are translated to KTables and joined to each other to represent the same result as the SQLs on the database, but faster. At this stage the delete/null records matter, because if a record gets deleted then we want it to drop out of the join too. -> Our reduce-approach produced unexpected results here. We took the deduplication step separately because in some cases we only need the KStream for processing. If you see a simpler/cleaner approach here I'm open to suggestions, of course.

Regarding the overhead:
1) Named topics create management/maintenance overhead because they have to be created/treated separately (auto-create is not an option) and be considered in future changes, topology changes/resets and so on. The internal topic removes most of those issues.
2) One of our developers came up with the question whether the traffic to/from the broker is actually the same in both scenarios. We expect that the same data is written to the broker for the named topic as well as in the reduce case, but if the KTable is maintained inside a Streams topology, does it have to read back everything it sends to the broker, or can it keep the table internally?

I hope it is understandable what I mean, otherwise I can try to explain it more clearly. best regards Patrik

On Sat, 27 Oct 2018 at 23:50, John Roesler wrote: > Hi again Patrik, > > Actually, this is a good question... Can you share some context about why > you need to convert a stream to a table (including nulls as retractions)? > > Thanks, > -John > > On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax > wrote: > > > I don't know your overall application setup. However, a KStream > > semantically models immutable facts and there is not update semantic. > > Thus, it seems semantically questionable, to allow changing the > > semantics from facts to updates (the other way is easier IMHO, and thus > > supported via KTable#toStream()). > > > > Does this make sense? > > > > Having said this: you _can_ write a KStream into a topic an read it back > > as KTable. But it's semantically questionable to do so, IMHO. Maybe it > > makes sense for your specific application, but in general I don't think > > it does make sense. > > > > > > -Matthias > > > > On 10/26/18 9:30 AM, John Roesler wrote: > > > Hi Patrik, > > > > > > Just to drop one observation in... Streaming to a topic and then > > consuming > > > it as a table does create overhead, but so does reducing a stream to a > > > table, and I think it's actually the same in either case. > > > > > > They both require a store to collect the table state, and in both > cases, > > > the stores need to have a changelog topic. For the "reduce" version, > it's > > > an internal changelog topic, and for the "topic-to-table" version, the > > > store can use the intermediate topic as its changelog. > > > > > > This doesn't address your ergonomic concern, but it seemed worth > pointing > > > out that (as far as I can tell), there doesn't seem to be a difference > in > > > overhead. > > > > > > Hope this helps!
> > > -John > > > > > > On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl > > wrote: > > > > > >> Hello Matthias, > > >> thank you for the explanation. > > >> Streaming back to a topic and consuming this as a KTable does respect > > the > > >> null values as deletes, correct? But at the price of some overhead. > > >> Is there any (historical, technical or emotional;-)) reason that no > > simple > > >> one-step stream-to-table operation exists? > > >> Best regards > > >> Patrik > > >> > > >>> Am 26.10.2018 um 00:07 schrieb Matthias J. Sax < > matth...@confluent.io > > >: > > >>> > > >>> Patrik, > > >>> > > >>> `null` values in a KStream don't have delete semantics (it's not a > > >>> changelog stream). That's why we drop them in the KStream#reduce > > >>> implemenation. > > >>> > > >>> If you want to explicitly remove results for a key from the result > > >>> KTable, your `Reducer#apply()` implementation must return `null` -- > the > > >>> result of #apply() has changelog/KTable semantics and `null` is > > >>> interpreted as delete for this case. > > >>> > > >>> If you want to use `null` from your KStream to trigger reduce() to > > >>> delete, you will need to use a surrogate value for this, ie, do a > > >>> mapValues() before the groupByKey() call, an replace `null` values > with > > >>> the surrogate-delete-marker that you can evaluate in > `Reducer#apply()` > > >>> to return `null` for this case. > > >>> > > >>> Hope this helps. > > >>> > > >>> -Matthias > > >>> > > On 10/25/18 10:36 AM, Patrik Kleindl wrote: > > Hello >
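For readers following the thread above, a minimal sketch of the two variants being compared (Java, 2.0-era Streams API; the topic names and the surrogate tombstone marker are made up for illustration). Note that the Reducer is only invoked once a previous value already exists for a key, so the very first record per key bypasses it.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;

public class StreamToTableVariants {
    static final String TOMBSTONE = "__delete__"; // surrogate delete marker, as suggested above

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> cdc = builder.stream("cdc-topic",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Variant 1: reduce(). KStream nulls are dropped, so map them to a surrogate
        // value first; returning null from the Reducer emits a delete into the KTable.
        KTable<String, String> viaReduce = cdc
                .mapValues(v -> v == null ? TOMBSTONE : v)
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .reduce((oldValue, newValue) -> TOMBSTONE.equals(newValue) ? null : newValue);

        // Variant 2: write to a named topic and read it back as a table; nulls are
        // treated as deletes, at the cost of managing the extra topic.
        cdc.to("cdc-table-topic", Produced.with(Serdes.String(), Serdes.String()));
        KTable<String, String> viaTopic = builder.table("cdc-table-topic",
                Consumed.with(Serdes.String(), Serdes.String()));
    }
}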
How to create kafka producer by .Net framework in Kerberos-enabled Kafka cluster?
Hi everyone, As the subject says, I'm dealing with the problem "How to create a Kafka producer with the .NET Framework in a Kerberos-enabled Kafka cluster?" and I need some help. Does anyone have similar experience and could you kindly provide some suggestions? Best, *Yvonne Chang 張雅涵* *亦思科技專業資訊服務團隊* 看見新世代資料庫---*HareDB* Tel:03-5630345 Ext.18 Mobile: 0963-756811 Fax:03-5631345 新竹科學園區展業二路4號3樓 www.is-land.com.tw www.haredb.com
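I can't speak to the .NET client specifically, but as a reference point, here is a hedged sketch of the equivalent Java producer configuration for a Kerberized cluster (the broker address, keytab path and principal are placeholders). The Confluent .NET client is librdkafka-based and needs analogous settings: security protocol, SASL mechanism GSSAPI and the Kerberos service name.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class KerberosProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Kerberos (GSSAPI) settings for the Java client.
        props.put("security.protocol", "SASL_PLAINTEXT");   // or SASL_SSL
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true "
                + "keyTab=\"/etc/security/keytabs/client.keytab\" "   // placeholder keytab
                + "principal=\"client@EXAMPLE.COM\";");               // placeholder principal

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send records as usual
        }
    }
}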
Having issue with Kafka partition
Hi All, Recently I faced the below issue:

WARN Fetcher:679 - Received unknown topic or partition error in ListOffset request for partition messages-2-1. The topic/partition may not exist or the user may not have Describe access to it

My consumer is not working for that particular topic. When I checked the partition, it looks fine:

Topic: messages-2  PartitionCount: 3  ReplicationFactor: 3  Configs:
    Topic: messages-2  Partition: 0  Leader: 3  Replicas: 3,2,1  Isr: 3,2
    Topic: messages-2  Partition: 1  Leader: 1  Replicas: 1,3,2  Isr: 1,3,2
    Topic: messages-2  Partition: 2  Leader: 2  Replicas: 2,1,3  Isr: 2,3

Does anyone have an idea about this? -- With Regards, Karthick.K
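One thing that may help narrow it down (a minimal sketch, not a fix): run a metadata lookup under the same credentials the failing consumer uses, so you can tell whether that principal can actually see the topic. The bootstrap address below is a placeholder; add whatever security settings your consumer has.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DescribeAccessCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "describe-check");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Add the same SASL/SSL settings as the failing consumer so this runs as the same principal.

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // If the principal lacks Describe on the topic, this typically fails with an
            // authorization error instead of listing the three partitions.
            List<PartitionInfo> partitions = consumer.partitionsFor("messages-2");
            System.out.println(partitions);
        }
    }
}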
Problem with kafka-streams aggregate windowedBy
Hi everyone! I use kafka-streams, and I have a problem when I use windowedBy. Everything works well until I restart the application. After restarting, my aggregation starts from the beginning. Code below:

> StreamsBuilder builder = new StreamsBuilder()
> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
>
> KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>     .aggregate(
>         { new AggregatorModel() },
>         { key, value, aggregate ->
>             return aggregate.add(value)
>         }
>     )
>     .toStream()
>     .map({ k, v ->
>         new KeyValue<>(k.window().end(), v)
>     })
>     .to('output')
>
> def config = new Properties()
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
>
> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> kafkaStreams.start()

I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config, set to 'latest' and 'earliest', but it didn't help. Can you help me understand what I'm doing wrong? Thank you.
Kubernetes: Using Load Balancers
Hi all! I was trying to deploy a little Kafka cluster on Kubernetes (k8s), and noticed these wonderful helm charts here: https://github.com/confluentinc/cp-helm-charts The Kafka chart included there will try to expose the brokers so they can be accessible not only from within the k8s cluster, but from outside too. Great!

[diagram: Node 1-4 (k8s nodes) and "a nice VM" inside a VPC; "the internetz" outside the VPC]

Legend:
*Node X*: K8s nodes. They live inside a VPC. Pods live in the nodes.
*A nice VM*: A VM that lives in the same VPC as the k8s nodes, but is not part of it.
*The internetz*: Things outside the VPC.

Let's separate this into 3 parts:

1) Connectivity from within the cluster. Easy, they communicate via the pods' IPs. K8s is in charge of routing the packets around.

2) Connectivity from a nice VM. When enabled, the Kafka chart will expose the brokers using NodePort (A). By default it uses the IP of the host, on ports 31090, 31091 and 31092 (B). So clients that sit on this VM just need to point to one of these and start doing their job.

3) Connectivity from the internet. Not possible. :(

Clouds: nodes typically have 2 IPs, one local to the VPC and one externally routable, which can be volatile or permanent/elastic. The variable hostIP used by the chart (C) will use the IP local to the VPC, which makes it possible to connect from a VM in the same VPC subnet to Kafka, but not from other places. On k8s I'm not sure it makes too much sense to have elastic IPs on the nodes, but anyway they would be assigned after the node is created, and could be manually changed by the operator. Probably it would be a better idea to use LoadBalancers instead.

If we wanted to implement LoadBalancer support in the chart, how do you think would be a good way to do it? What would be better?
- To make the Kafka chart wait until the load balancers are created, get their URLs, set up advertised.listeners based on that and then start the broker? (maybe using an initContainer?)
- Or maybe just listen for when it's ready, set up the advertised.listeners, and then make the brokers restart with the new values?

Thanks!
(A) https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-kafka/templates/nodeport-service.yaml#L21
(B) https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-kafka/values.yaml#L106
(C) https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-kafka/templates/statefulset.yaml#L94
Re: Problem with kafka-streams aggregate windowedBy
Hi Does your applicationId change? Best regards Patrik > Am 29.10.2018 um 13:28 schrieb Pavel Koroliov : > > Hi everyone! I use kafka-streams, and i have a problem when i use > windowedBy. Everything works well until I restart the application. After > restarting my aggregation starts from beginning. > Code bellow: >> >>StreamsBuilder builder = new StreamsBuilder() >>KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), >> Serdes.String())) >> >>KTable table = >> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))) >>.aggregate( >>{ new AggregatorModel() }, >>{ key, value, aggregate -> >>return aggregate.add(value) >>} >>) >>.toStream() >>.map({ k, v -> >>new KeyValue<>(k.window().end(), v) >>}) >>.to('output') >> >>def config = new Properties() >>config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) >>config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092') >>config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, >> TimeUnit.SECONDS.toMillis(60)) >> >>KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config) >>kafkaStreams.start() >> >> > I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to > 'latest' and 'earliest' but it didn't help. > Can you help me understand what I'm doing wrong? > Thank you.
Re: Problem with kafka-streams aggregate windowedBy
Hi No, my application id doesn't change пн, 29 окт. 2018 г. в 19:11, Patrik Kleindl : > Hi > Does your applicationId change? > Best regards > Patrik > > > Am 29.10.2018 um 13:28 schrieb Pavel Koroliov : > > > > Hi everyone! I use kafka-streams, and i have a problem when i use > > windowedBy. Everything works well until I restart the application. After > > restarting my aggregation starts from beginning. > > Code bellow: > >> > >>StreamsBuilder builder = new StreamsBuilder() > >>KStream stream = builder.stream(topic, > Consumed.with(Serdes.String(), Serdes.String())) > >> > >>KTable table = > stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))) > >>.aggregate( > >>{ new AggregatorModel() }, > >>{ key, value, aggregate -> > >>return aggregate.add(value) > >>} > >>) > >>.toStream() > >>.map({ k, v -> > >>new KeyValue<>(k.window().end(), v) > >>}) > >>.to('output') > >> > >>def config = new Properties() > >>config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) > >>config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092') > >>config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > TimeUnit.SECONDS.toMillis(60)) > >> > >>KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config) > >>kafkaStreams.start() > >> > >> > > I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to > > 'latest' and 'earliest' but it didn't help. > > Can you help me understand what I'm doing wrong? > > Thank you. >
Re: Problem with kafka-streams aggregate windowedBy
Hi How long does your application run? More than the 60 seconds you set for commit interval? Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+ and check if your offsets are really comitted Best regards Patrik > Am 29.10.2018 um 18:20 schrieb Pavel Koroliov : > > Hi > No, my application id doesn't change > > пн, 29 окт. 2018 г. в 19:11, Patrik Kleindl : > >> Hi >> Does your applicationId change? >> Best regards >> Patrik >> >>> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov : >>> >>> Hi everyone! I use kafka-streams, and i have a problem when i use >>> windowedBy. Everything works well until I restart the application. After >>> restarting my aggregation starts from beginning. >>> Code bellow: StreamsBuilder builder = new StreamsBuilder() KStream stream = builder.stream(topic, >> Consumed.with(Serdes.String(), Serdes.String())) KTable table = >> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))) .aggregate( { new AggregatorModel() }, { key, value, aggregate -> return aggregate.add(value) } ) .toStream() .map({ k, v -> new KeyValue<>(k.window().end(), v) }) .to('output') def config = new Properties() config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092') config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, >> TimeUnit.SECONDS.toMillis(60)) KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config) kafkaStreams.start() >>> I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to >>> 'latest' and 'earliest' but it didn't help. >>> Can you help me understand what I'm doing wrong? >>> Thank you. >>
Error when using mockSchemaregistry
Hey folks, I am getting the below exception when using a MockSchemaRegistry in a JUnit test. I appreciate your help. I am using Confluent 4.0.

private SchemaRegistryClient schemaRegistry;
private KafkaAvroSerializer avroSerializer;

Properties defaultConfig = new Properties();
defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://fake-url");
defaultConfig.put("auto.register.schemas", false);
schemaRegistry = new MockSchemaRegistryClient();
avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));
Map senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, avroSerializer.getClass());
KafkaProducer producer = new KafkaProducer(senderProps);
ProducerRecord record = new ProducerRecord("test2", "1", log);
try {
    producer.send(record).get();
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

Error
==
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:268)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
... 28 more
Re: Get count of messages
you can use kafkacat starting at that offset to head and pipe the output to "wc -l" (word count). -BW On Mon, Oct 29, 2018 at 3:39 AM Sachit Murarka wrote: > Hi All, > > Could you please help me in getting count of all messages stored in kafka > from a particular offset? > I have tried GetOffsetShell command, it is not giving me. > > > Kind Regards, > Sachit Murarka >
Re: Problem with kafka-streams aggregate windowedBy
Make sure to call `KafkaStreams#close()` to get the latest offsets committed. Beside this, you can check the consumer and Streams logs in DEBUG mode, to see what offset is picked up (or not). -Matthias On 10/29/18 11:43 AM, Patrik Kleindl wrote: > Hi > How long does your application run? More than the 60 seconds you set for > commit interval? > Have a look at > https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+ > > and check if your offsets are really comitted > Best regards > Patrik > >> Am 29.10.2018 um 18:20 schrieb Pavel Koroliov : >> >> Hi >> No, my application id doesn't change >> >> пн, 29 окт. 2018 г. в 19:11, Patrik Kleindl : >> >>> Hi >>> Does your applicationId change? >>> Best regards >>> Patrik >>> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov : Hi everyone! I use kafka-streams, and i have a problem when i use windowedBy. Everything works well until I restart the application. After restarting my aggregation starts from beginning. Code bellow: > > StreamsBuilder builder = new StreamsBuilder() > KStream stream = builder.stream(topic, >>> Consumed.with(Serdes.String(), Serdes.String())) > > KTable table = >>> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15))) > .aggregate( > { new AggregatorModel() }, > { key, value, aggregate -> > return aggregate.add(value) > } > ) > .toStream() > .map({ k, v -> > new KeyValue<>(k.window().end(), v) > }) > .to('output') > > def config = new Properties() > config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092') > config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, >>> TimeUnit.SECONDS.toMillis(60)) > > KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config) > kafkaStreams.start() > > I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to 'latest' and 'earliest' but it didn't help. Can you help me understand what I'm doing wrong? Thank you. >>> > signature.asc Description: OpenPGP digital signature
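For what it's worth, a minimal sketch of that shutdown handling (Java; the application id, topics and pass-through topology are placeholders standing in for the original windowed aggregation):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class GracefulShutdownExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Placeholder topology; the windowed aggregation from the original mail goes here.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id"); // keep this stable across restarts
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        // close() on shutdown commits the latest offsets; killing the JVM before the
        // commit interval elapses loses that progress, and the windowed aggregation
        // appears to restart from the beginning.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}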
Re: Error when using mockSchemaregistry
You set: > senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass()); This will tell the Producer to create a new AvroSerailizer object, and this object expects "schema.registry.url" to be set during initialization, ie, you need to add the config to `senderProps`. However, this will overall not give you want to what IMHO, as the Producer won't used MockSchemaRegistry for this case. You would need to use > avroSerializer = new KafkaAvroSerializer(schemaRegistry, new > HashMap(defaultConfig)); outside of the producer in your tests (ie, serialize the data "manually"), and pass `byte[]/byte[]` type into the producer itself. -Matthias On 10/29/18 2:56 PM, chinchu chinchu wrote: > Hey folks, > I am getting the below exception when using a mockSchemaRegsitry in a > junit test . I appreciate your help.I am using confluent 4.0 > > > private SchemaRegistryClient schemaRegistry; > private KafkaAvroSerializer avroSerializer; > > Properties defaultConfig = new Properties(); > defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, " > http://fake-url";); > defaultConfig.put("auto.register.schemas", false); > schemaRegistry = new MockSchemaRegistryClient(); > avroSerializer = new KafkaAvroSerializer(schemaRegistry, new > HashMap(defaultConfig)); > Map senderProps = > KafkaTestUtils.producerProps(embeddedKafka); > senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass()); > KafkaProducer producer = new > KafkaProducer(senderProps); > ProducerRecord record =new > ProducerRecord("test2","1",log); > try { > producer.send(record).get(); > } catch (InterruptedException e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > > Error > == > org.apache.kafka.common.KafkaException: Failed to construct kafka producer > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:441) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:268) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) > at > org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206) > Caused by: io.confluent.common.config.ConfigException: Missing required > configuration "schema.registry.url" which has no default value. > at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243) > at io.confluent.common.config.AbstractConfig.(AbstractConfig.java:78) > at > io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.(AbstractKafkaAvroSerDeConfig.java:61) > at > io.confluent.kafka.serializers.KafkaAvroSerializerConfig.(KafkaAvroSerializerConfig.java:32) > at > io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48) > at > org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:360) > ... 28 more > si
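To make the "serialize manually" suggestion above concrete, here is a rough sketch (Java, Confluent 4.0-era APIs; the bootstrap address is a placeholder for the embedded broker, and the Avro record itself is elided). auto.register.schemas is set to true here so the mock registry accepts the schema without pre-registering it, unlike the original test.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManualAvroSerializeTest {
    public static void main(String[] args) throws Exception {
        // Serializer backed by the mock registry; the URL only has to satisfy config validation.
        SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "http://fake-url");
        serdeConfig.put("auto.register.schemas", true);
        KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(schemaRegistry, serdeConfig);

        // The producer itself only handles bytes, so it never builds its own Avro serializer
        // and never asks for schema.registry.url.
        Properties senderProps = new Properties();
        senderProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // embedded broker address
        senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        Object log = null; // placeholder: the Avro record from the original test goes here
        byte[] value = avroSerializer.serialize("test2", log);

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(senderProps)) {
            producer.send(new ProducerRecord<>("test2", "1", value)).get();
        }
    }
}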
Re: Get count of messages
That is quite expensive to do... Might be best to write a short Java program that uses Consumer#endOffsets() and Consumer#beginningOffsets() -Matthias On 10/29/18 3:02 PM, Burton Williams wrote: > you can use kafkacat starting at that offset to head and pipe the output > to "wc -l" (word count). > > -BW > > On Mon, Oct 29, 2018 at 3:39 AM Sachit Murarka > wrote: > >> Hi All, >> >> Could you please help me in getting count of all messages stored in kafka >> from a particular offset? >> I have tried GetOffsetShell command, it is not giving me. >> >> >> Kind Regards, >> Sachit Murarka >>
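A minimal sketch of that approach (Java; the topic name and bootstrap address are placeholders). To count from a particular offset onwards, substitute that offset for the beginning offset of the relevant partition.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class MessageCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "message-count");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("my-topic")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

            long total = 0;
            for (TopicPartition tp : partitions) {
                total += end.get(tp) - begin.get(tp);
            }
            // This counts offsets, not necessarily records: with compaction or
            // transactions some offsets in the range have no message behind them.
            System.out.println("approximate message count: " + total);
        }
    }
}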
Re: Error when using mockSchemaregistry
Thanks Mathias . How do I deal with this scenario in the case of a consumer that expects a specific record type ?.I am trying to write an integration test for a scenario for the below scenairo.All of these uses an avro object as the value for producer record .I am kind of able to write this for simple data types but have no luck with avro. Produces record > topic > consumer > doWork() > produce . On Mon, Oct 29, 2018 at 4:50 PM Matthias J. Sax wrote: > You set: > > > > senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass()); > > This will tell the Producer to create a new AvroSerailizer object, and > this object expects "schema.registry.url" to be set during > initialization, ie, you need to add the config to `senderProps`. > > However, this will overall not give you want to what IMHO, as the > Producer won't used MockSchemaRegistry for this case. > > You would need to use > > > avroSerializer = new KafkaAvroSerializer(schemaRegistry, new > HashMap(defaultConfig)); > > outside of the producer in your tests (ie, serialize the data > "manually"), and pass `byte[]/byte[]` type into the producer itself. > > > -Matthias > > On 10/29/18 2:56 PM, chinchu chinchu wrote: > > Hey folks, > > I am getting the below exception when using a mockSchemaRegsitry in a > > junit test . I appreciate your help.I am using confluent 4.0 > > > > > > private SchemaRegistryClient schemaRegistry; > > private KafkaAvroSerializer avroSerializer; > > > > Properties defaultConfig = new Properties(); > > > defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, " > > http://fake-url";); > > defaultConfig.put("auto.register.schemas", false); > > schemaRegistry = new MockSchemaRegistryClient(); > > avroSerializer = new KafkaAvroSerializer(schemaRegistry, new > > HashMap(defaultConfig)); > > Map senderProps = > > KafkaTestUtils.producerProps(embeddedKafka); > > senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > > StringSerializer.class); > > > senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass()); > > KafkaProducer producer = new > > KafkaProducer(senderProps); > > ProducerRecord record =new > > ProducerRecord("test2","1",log); > > try { > > producer.send(record).get(); > > } catch (InterruptedException e) { > > // TODO Auto-generated catch block > > e.printStackTrace(); > > } > > > > Error > > == > > org.apache.kafka.common.KafkaException: Failed to construct kafka > producer > > at > > > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:441) > > at > > > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:268) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > > > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > > at > > > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > > at > > > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > > at > > > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > > at > > > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > > at > > > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > > at > > > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > > at > > > org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) > > at > > > org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) > > at > > > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538) > > at > > > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760) > > at > > > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460) > > at > > > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206) > > Caused by: io.confluent.common.config.ConfigException: Missing required > > configuration "s
Re: Error when using mockSchemaregistry
You can use `ByteArrayDeserializer` to get `byte[]` key and value from the consumer and deserialize both "manually" using AvorDeserializer that is created with MockSchemaRegistry. `ConsumerRecord` also give you the topic name for each record via #topic(). -Matthias On 10/29/18 4:03 PM, chinchu chinchu wrote: > Thanks Mathias . How do I deal with this scenario in the case of a > consumer that expects a specific record type ?.I am trying to write an > integration test for a scenario for the below scenairo.All of these uses > an avro object as the value for producer record .I am kind of able to > write this for simple data types but have no luck with avro. > > Produces record > topic > consumer > doWork() > produce . > > On Mon, Oct 29, 2018 at 4:50 PM Matthias J. Sax > wrote: > >> You set: >> >>> >> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass()); >> >> This will tell the Producer to create a new AvroSerailizer object, and >> this object expects "schema.registry.url" to be set during >> initialization, ie, you need to add the config to `senderProps`. >> >> However, this will overall not give you want to what IMHO, as the >> Producer won't used MockSchemaRegistry for this case. >> >> You would need to use >> >>> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new >> HashMap(defaultConfig)); >> >> outside of the producer in your tests (ie, serialize the data >> "manually"), and pass `byte[]/byte[]` type into the producer itself. >> >> >> -Matthias >> >> On 10/29/18 2:56 PM, chinchu chinchu wrote: >>> Hey folks, >>> I am getting the below exception when using a mockSchemaRegsitry in a >>> junit test . I appreciate your help.I am using confluent 4.0 >>> >>> >>> private SchemaRegistryClient schemaRegistry; >>> private KafkaAvroSerializer avroSerializer; >>> >>> Properties defaultConfig = new Properties(); >>> >> defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, " >>> http://fake-url";); >>> defaultConfig.put("auto.register.schemas", false); >>> schemaRegistry = new MockSchemaRegistryClient(); >>> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new >>> HashMap(defaultConfig)); >>> Map senderProps = >>> KafkaTestUtils.producerProps(embeddedKafka); >>> senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, >>> StringSerializer.class); >>> >> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass()); >>> KafkaProducer producer = new >>> KafkaProducer(senderProps); >>> ProducerRecord record =new >>> ProducerRecord("test2","1",log); >>> try { >>> producer.send(record).get(); >>> } catch (InterruptedException e) { >>> // TODO Auto-generated catch block >>> e.printStackTrace(); >>> } >>> >>> Error >>> == >>> org.apache.kafka.common.KafkaException: Failed to construct kafka >> producer >>> at >>> >> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:441) >>> at >>> >> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:268) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at >>> >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) >>> at >>> >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >>> at >>> >> 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) >>> at >>> >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >>> at >>> >> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) >>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) >>> at >>> >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) >>> at >>> >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) >>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) >>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) >>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) >>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) >>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) >>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) >>> at org.junit.rules.RunRules.evaluate(RunRules.java:20) >>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363) >>> at >>> >> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) >>> at >>> >> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) >>> at >>> >> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538) >>> at >>> >> org.eclipse.
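As a rough sketch of the consumer side of that suggestion (Java, Confluent 4.0-era APIs; the bootstrap address, group id and topic are placeholders), using ByteArrayDeserializer and deserializing "manually". The important detail is that the deserializer must be built from the same MockSchemaRegistryClient instance the serializer used, so the schema ids written into the records resolve.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAvroConsume {
    // Pass in the same MockSchemaRegistryClient that was used for serializing in the test.
    static void consumeAndPrint(SchemaRegistryClient schemaRegistry, String bootstrapServers) {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "http://fake-url");
        serdeConfig.put("specific.avro.reader", true); // deserialize into the specific record type
        KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer(schemaRegistry, serdeConfig);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-it");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test2"));
            ConsumerRecords<String, byte[]> records = consumer.poll(5000);
            for (ConsumerRecord<String, byte[]> record : records) {
                // Pass the topic from record.topic(), as suggested above; the Avro bytes
                // carry the schema id used for the registry lookup.
                Object value = avroDeserializer.deserialize(record.topic(), record.value());
                System.out.println(record.key() + " -> " + value);
            }
        }
    }
}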