Hi Everyone,

I am a newbie to kafka and kafka streams api.


I am trying to consume a json message using kafka connect api in kafka streams. 
I tried searching in google but i could not find any substantial information on 
how to read json message in streams api.

Therefore, with the limited knowledge i have tried the below method.
Producer Class:

package com.kafka.api.serializers.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProducerUtilities {

    private static ObjectMapper om = new ObjectMapper();


    public static org.apache.kafka.clients.producer.Producer<String, JsonNode> 
getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
                "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonSerializer");

        org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = 
new KafkaProducer<String, JsonNode>(
                configProperties);
        return producer;
    }

    public ProducerRecord<String,JsonNode> createRecord(Person person){
        JsonNode jsonNode = om.valueToTree(person);
        ProducerRecord<String,JsonNode> record = new 
ProducerRecord<String,JsonNode>("test",jsonNode);
        return record;
    }

}

Stream class:

package com.kafka.api.serializers.json;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConsumerUtilities {

    private static ObjectMapper om = new ObjectMapper();

    public static Properties getProperties() {

        Properties configs = new Properties();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonDeserializer");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        KStreamBuilder builder = new KStreamBuilder();
        return builder;
    }

    public static void printStreamData() {
        KStreamBuilder builder = getStreamingConsumer();
        KStream<String, JsonNode> kStream = builder.stream("test");
        kStream.foreach(new ForeachAction<String, JsonNode>() {
            @Override
            public void apply(String key, JsonNode value) {
                try {
                    System.out.println(key + " : " + om.treeToValue(value, 
Person.class));
                } catch (JsonProcessingException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }

}

When i execute the code i am getting exception as below

[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User 
provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for 
group Kafka test application failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Failed to configure value 
serde class org.apache.kafka.connect.json.JsonDeserializer
    at 
org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.KafkaException: 
org.apache.kafka.connect.json.JsonDeserializer is not an instance of 
org.apache.kafka.common.serialization.Serde
    at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at 
org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
Shutting down
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
producer with timeoutMillis = 9223372036854775807 ms.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
Stream thread shutdown complete
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
State transition from PENDING_SHUTDOWN to DEAD.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test 
application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. 
The Kafka Streams instance will be in an error state and should be closed.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] 
INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test 
application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from 
REBALANCING to ERROR.
Exception in thread "Kafka test 
application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test 
application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to 
rebalance.
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to 
configure value serde class org.apache.kafka.connect.json.JsonDeserializer
    at 
org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    ... 3 more
Caused by: org.apache.kafka.common.KafkaException: 
org.apache.kafka.connect.json.JsonDeserializer is not an instance of 
org.apache.kafka.common.serialization.Serde
    at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at 
org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more


I am looking for some guidance to solve the issue.


Regards
Phani Kumar Yadavilli

  • ... Phani Kumar Yadavilli -X (phyadavi - ALLEGIS SERVICES INDIA PRIVATE LIMITED at Cisco)
    • ... Ted Yu
      • ... Valentin Forst
        • ... Ted Yu
          • ... Sachin Mittal
          • ... Valentin Forst
            • ... Sachin Mittal

Reply via email to