Have you taken a look at streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ?
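The "is not an instance of org.apache.kafka.common.serialization.Serde" error below comes from setting a Deserializer class where Streams expects a Serde. The demo avoids that by wrapping the Connect JSON serializer/deserializer into a Serde and handing it to the topology explicitly instead of relying on default.key/value.serde. As a rough illustration, here is a minimal, self-contained sketch of that pattern (untested; the topic name "test", the application id, and the StreamsBuilder-based wiring are my own assumptions, not code from the demo):

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import com.fasterxml.jackson.databind.JsonNode;

import java.util.Properties;

public class JsonStreamSketch {

    public static void main(String[] args) {
        // Build a JsonNode Serde out of the Connect JSON serializer/deserializer.
        Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-stream-sketch");  // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // No default.value.serde needed: the serde is passed per-stream below.

        StreamsBuilder builder = new StreamsBuilder();
        // Pass key/value serdes explicitly instead of relying on default.key/value.serde.
        KStream<String, JsonNode> records =
                builder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
        records.foreach((key, value) -> System.out.println(key + " : " + value));

        new KafkaStreams(builder.build(), props).start();
    }
}

Which package Consumed lives in depends on the Streams version, so the imports may need adjusting. The essential lines from the demo are the ones quoted just below.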
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

FYI

On Wed, Oct 4, 2017 at 7:13 AM, Phani Kumar Yadavilli -X (phyadavi -
ALLEGIS SERVICES INDIA PRIVATE LIMITED at Cisco) <phyad...@cisco.com> wrote:

> Hi Everyone,
>
> I am a newbie to Kafka and the Kafka Streams API.
>
> I am trying to consume a JSON message in Kafka Streams using the Kafka
> Connect JSON serializer/deserializer. I searched on Google but could not
> find any substantial information on how to read a JSON message with the
> Streams API.
>
> Therefore, with the limited knowledge I have, I tried the approach below.
>
> Producer class:
>
> package com.kafka.api.serializers.json;
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.util.Properties;
>
> public class ProducerUtilities {
>
>     private static ObjectMapper om = new ObjectMapper();
>
>     public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() {
>         Properties configProperties = new Properties();
>         configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
>                 "kafka json producer");
>         configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "localhost:9092");
>         configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>         configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.connect.json.JsonSerializer");
>
>         org.apache.kafka.clients.producer.Producer<String, JsonNode> producer =
>                 new KafkaProducer<String, JsonNode>(configProperties);
>         return producer;
>     }
>
>     public ProducerRecord<String, JsonNode> createRecord(Person person) {
>         JsonNode jsonNode = om.valueToTree(person);
>         ProducerRecord<String, JsonNode> record =
>                 new ProducerRecord<String, JsonNode>("test", jsonNode);
>         return record;
>     }
>
> }
>
> Stream class:
>
> package com.kafka.api.serializers.json;
>
> import java.util.Properties;
>
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> public class ConsumerUtilities {
>
>     private static ObjectMapper om = new ObjectMapper();
>
>     public static Properties getProperties() {
>         Properties configs = new Properties();
>         configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
>                 "Kafka test application");
>         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "localhost:9092");
>         configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> ByteArraySerializer"); > configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > "org.apache.kafka.connect.json.JsonDeserializer"); > return configs; > } > > public static KStreamBuilder getStreamingConsumer() { > KStreamBuilder builder = new KStreamBuilder(); > return builder; > } > > public static void printStreamData() { > KStreamBuilder builder = getStreamingConsumer(); > KStream<String, JsonNode> kStream = builder.stream("test"); > kStream.foreach(new ForeachAction<String, JsonNode>() { > @Override > public void apply(String key, JsonNode value) { > try { > System.out.println(key + " : " + om.treeToValue(value, > Person.class)); > } catch (JsonProcessingException e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > > }); > > KafkaStreams kafkaStreams = new KafkaStreams(builder, > getProperties()); > kafkaStreams.start(); > } > > } > > When i execute the code i am getting exception as below > > [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] > ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - > User provided listener > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener > for group Kafka test application failed on partition assignment > org.apache.kafka.streams.errors.StreamsException: Failed to configure > value serde class org.apache.kafka.connect.json.JsonDeserializer > at org.apache.kafka.streams.StreamsConfig.defaultValueSerde( > StreamsConfig.java:770) > at org.apache.kafka.streams.processor.internals. > AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) > at org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) > at org.apache.kafka.streams.processor.internals. > StreamTask.<init>(StreamTask.java:138) > at org.apache.kafka.streams.processor.internals. > StreamThread.createStreamTask(StreamThread.java:1078) > at org.apache.kafka.streams.processor.internals. > StreamThread$TaskCreator.createTask(StreamThread.java:255) > at org.apache.kafka.streams.processor.internals.StreamThread$ > AbstractTaskCreator.createTasks(StreamThread.java:245) > at org.apache.kafka.streams.processor.internals. > StreamThread.addStreamTasks(StreamThread.java:1147) > at org.apache.kafka.streams.processor.internals. > StreamThread.access$800(StreamThread.java:68) > at org.apache.kafka.streams.processor.internals.StreamThread$ > RebalanceListener.onPartitionsAssigned(StreamThread.java:184) > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. > onJoinComplete(ConsumerCoordinator.java:265) > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > joinGroupIfNeeded(AbstractCoordinator.java:367) > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > ensureActiveGroup(AbstractCoordinator.java:316) > at org.apache.kafka.clients.consumer.internals. > ConsumerCoordinator.poll(ConsumerCoordinator.java:297) > at org.apache.kafka.clients.consumer.KafkaConsumer. > pollOnce(KafkaConsumer.java:1078) > at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:1043) > at org.apache.kafka.streams.processor.internals. > StreamThread.pollRequests(StreamThread.java:536) > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce( > StreamThread.java:490) > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:480) > at org.apache.kafka.streams.processor.internals. 
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
>     ... 19 more
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR.
> Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance.
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
>     ... 3 more
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
>     ... 19 more
>
> I am looking for some guidance to solve the issue.
>
> Regards,
> Phani Kumar Yadavilli
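For the ConsumerUtilities class quoted above specifically, the smallest change I can see (an untested sketch, staying on the old KStreamBuilder API that the code already uses; the class name, application id, and topic name are placeholders of mine) would be to drop the default serde settings and pass a JsonNode Serde explicitly to stream():

package com.kafka.api.serializers.json;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.fasterxml.jackson.databind.JsonNode;

public class ConsumerUtilitiesSketch {

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-test-application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Note: no DEFAULT_KEY/VALUE_SERDE_CLASS_CONFIG here. If you do set them,
        // they must name Serde implementations, not Serializer/Deserializer classes.

        // Wrap the Connect JSON (de)serializer into a Serde, as in the demo snippet above.
        Serde<JsonNode> jsonSerde =
                Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

        KStreamBuilder builder = new KStreamBuilder();
        // Old (pre-1.0) API: pass key/value serdes explicitly on stream().
        KStream<String, JsonNode> kStream =
                builder.stream(Serdes.String(), jsonSerde, "test");
        kStream.foreach((key, value) -> System.out.println(key + " : " + value));

        KafkaStreams kafkaStreams = new KafkaStreams(builder, configs);
        kafkaStreams.start();
    }
}

Mapping the JsonNode back to a Person with ObjectMapper.treeToValue(...) can stay exactly as in the quoted foreach.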