Hi Everyone, I am a newbie to kafka and kafka streams api.
I am trying to consume a json message using kafka connect api in kafka streams. I tried searching in google but i could not find any substantial information on how to read json message in streams api. Therefore, with the limited knowledge i have tried the below method. Producer Class: package com.kafka.api.serializers.json; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerUtilities { private static ObjectMapper om = new ObjectMapper(); public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() { Properties configProperties = new Properties(); configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka json producer"); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer"); org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>( configProperties); return producer; } public ProducerRecord<String,JsonNode> createRecord(Person person){ JsonNode jsonNode = om.valueToTree(person); ProducerRecord<String,JsonNode> record = new ProducerRecord<String,JsonNode>("test",jsonNode); return record; } } Stream class: package com.kafka.api.serializers.json; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; public class ConsumerUtilities { private static ObjectMapper om = new ObjectMapper(); public static Properties getProperties() { Properties configs = new Properties(); configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka test application"); configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonDeserializer"); return configs; } public static KStreamBuilder getStreamingConsumer() { KStreamBuilder builder = new KStreamBuilder(); return builder; } public static void printStreamData() { KStreamBuilder builder = getStreamingConsumer(); KStream<String, JsonNode> kStream = builder.stream("test"); kStream.foreach(new ForeachAction<String, JsonNode>() { @Override public void apply(String key, JsonNode value) { try { System.out.println(key + " : " + om.treeToValue(value, Person.class)); } catch (JsonProcessingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); kafkaStreams.start(); } } When i execute the code i am getting exception as below [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) ... 19 more [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN. [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD. [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed. [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR. Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance. at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) ... 3 more Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) ... 19 more I am looking for some guidance to solve the issue. Regards Phani Kumar Yadavilli