Have you taken a look at
streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java?

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
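
For context on the exception quoted below: the default serde settings expect a class that implements Serde, which bundles a serializer and a deserializer, so a bare JsonDeserializer fails that type check. What follows is a minimal stand-alone sketch of that idea using only the JDK. The names ToySerializer, ToyDeserializer, ToySerde, serdeFrom and usableAsDefaultSerde are hypothetical stand-ins invented for illustration; they are not the Kafka classes, and the real check inside Kafka may differ in detail.

```java
import java.nio.charset.StandardCharsets;

// Stand-alone model. These interfaces are illustrative stand-ins,
// NOT the org.apache.kafka.common.serialization types.
public class SerdeSketch {
    interface ToySerializer<T> { byte[] serialize(T value); }
    interface ToyDeserializer<T> { T deserialize(byte[] bytes); }

    // A serde is simply the pair of both halves.
    interface ToySerde<T> {
        ToySerializer<T> serializer();
        ToyDeserializer<T> deserializer();
    }

    // Pairs the two halves, in the spirit of the Serdes.serdeFrom(...) call above.
    static <T> ToySerde<T> serdeFrom(final ToySerializer<T> ser,
                                     final ToyDeserializer<T> de) {
        return new ToySerde<T>() {
            @Override public ToySerializer<T> serializer() { return ser; }
            @Override public ToyDeserializer<T> deserializer() { return de; }
        };
    }

    // Assumed shape of the type check behind "is not an instance of ... Serde".
    static boolean usableAsDefaultSerde(Object candidate) {
        return candidate instanceof ToySerde;
    }

    public static void main(String[] args) {
        ToySerializer<String> ser = s -> s.getBytes(StandardCharsets.UTF_8);
        ToyDeserializer<String> de = b -> new String(b, StandardCharsets.UTF_8);
        ToySerde<String> serde = serdeFrom(ser, de);

        // A deserializer on its own is not a serde; the paired object is.
        System.out.println("deserializer alone accepted: " + usableAsDefaultSerde(de));
        System.out.println("paired serde accepted: " + usableAsDefaultSerde(serde));

        // Round trip through both halves of the pair.
        String json = "{\"name\":\"test\"}";
        byte[] bytes = serde.serializer().serialize(json);
        System.out.println("round trip ok: " + json.equals(serde.deserializer().deserialize(bytes)));
    }
}
```

In this model the deserializer alone is rejected while the paired object is accepted, which is the same reason the snippet above builds a Serde<JsonNode> from both a serializer and a deserializer instead of naming JsonDeserializer directly.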

FYI

On Wed, Oct 4, 2017 at 7:13 AM, Phani Kumar Yadavilli -X (phyadavi -
ALLEGIS SERVICES INDIA PRIVATE LIMITED at Cisco) <phyad...@cisco.com> wrote:

> Hi Everyone,
>
> I am a newbie to Kafka and the Kafka Streams API.
>
>
> I am trying to consume a JSON message using the Kafka Connect API in Kafka
> Streams. I searched on Google but could not find any substantial
> information on how to read a JSON message in the Streams API.
>
> Therefore, with my limited knowledge, I tried the approach below.
> Producer Class:
>
> package com.kafka.api.serializers.json;
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import java.util.Properties;
>
> public class ProducerUtilities {
>
>     private static ObjectMapper om = new ObjectMapper();
>
>
>     public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() {
>         Properties configProperties = new Properties();
>         configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
>                 "kafka json producer");
>         configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "localhost:9092");
>         configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>         configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.connect.json.JsonSerializer");
>
>         org.apache.kafka.clients.producer.Producer<String, JsonNode> producer =
>                 new KafkaProducer<String, JsonNode>(configProperties);
>         return producer;
>     }
>
>     public ProducerRecord<String,JsonNode> createRecord(Person person){
>         JsonNode jsonNode = om.valueToTree(person);
>         ProducerRecord<String,JsonNode> record =
>                 new ProducerRecord<String,JsonNode>("test",jsonNode);
>         return record;
>     }
>
> }
>
> Stream class:
>
> package com.kafka.api.serializers.json;
>
> import java.util.Properties;
>
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> public class ConsumerUtilities {
>
>     private static ObjectMapper om = new ObjectMapper();
>
>     public static Properties getProperties() {
>
>         Properties configs = new Properties();
>         configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
>                 "Kafka test application");
>         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>         configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>                 "org.apache.kafka.connect.json.JsonDeserializer");
>         return configs;
>     }
>
>     public static KStreamBuilder getStreamingConsumer() {
>         KStreamBuilder builder = new KStreamBuilder();
>         return builder;
>     }
>
>     public static void printStreamData() {
>         KStreamBuilder builder = getStreamingConsumer();
>         KStream<String, JsonNode> kStream = builder.stream("test");
>         kStream.foreach(new ForeachAction<String, JsonNode>() {
>             @Override
>             public void apply(String key, JsonNode value) {
>                 try {
>                     System.out.println(key + " : "
>                             + om.treeToValue(value, Person.class));
>                 } catch (JsonProcessingException e) {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>
>         });
>
>         KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
>         kafkaStreams.start();
>     }
>
> }
>
> When I execute the code, I get the exception below:
>
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
>     ... 19 more
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR.
> Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance.
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
>     ... 3 more
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>     at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
>     ... 19 more
>
>
> I am looking for some guidance to solve the issue.
>
>
> Regards
> Phani Kumar Yadavilli
>
>