Have you taken a look
at
streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
?
final Deserializer<JsonNode> jsonDeserializer = new
JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
jsonDeserializer);
final Consumed<String, JsonNode> consumed =
Consumed.with(Serdes.String(), jsonSerde);
FYI
On Wed, Oct 4, 2017 at 7:13 AM, Phani Kumar Yadavilli -X (phyadavi -
ALLEGIS SERVICES INDIA PRIVATE LIMITED at Cisco) <[email protected]> wrote:
> Hi Everyone,
>
> I am a newbie to kafka and kafka streams api.
>
>
> I am trying to consume a json message using kafka connect api in kafka
> streams. I tried searching in google but i could not find any substantial
> information on how to read json message in streams api.
>
> Therefore, with the limited knowledge i have tried the below method.
> Producer Class:
>
> package com.kafka.api.serializers.json;
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import java.util.Properties;
>
> public class ProducerUtilities {
>
> private static ObjectMapper om = new ObjectMapper();
>
>
> public static org.apache.kafka.clients.producer.Producer<String,
> JsonNode> getProducer() {
> Properties configProperties = new Properties();
> configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
> "kafka json producer");
> configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.
> ByteArraySerializer");
> configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.connect.json.JsonSerializer");
>
> org.apache.kafka.clients.producer.Producer<String, JsonNode>
> producer = new KafkaProducer<String, JsonNode>(
> configProperties);
> return producer;
> }
>
> public ProducerRecord<String,JsonNode> createRecord(Person person){
> JsonNode jsonNode = om.valueToTree(person);
> ProducerRecord<String,JsonNode> record = new
> ProducerRecord<String,JsonNode>("test",jsonNode);
> return record;
> }
>
> }
>
> Stream class:
>
> package com.kafka.api.serializers.json;
>
> import java.util.Properties;
>
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> public class ConsumerUtilities {
>
> private static ObjectMapper om = new ObjectMapper();
>
> public static Properties getProperties() {
>
> Properties configs = new Properties();
> configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "Kafka test application");
> configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.
> ByteArraySerializer");
> configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> "org.apache.kafka.connect.json.JsonDeserializer");
> return configs;
> }
>
> public static KStreamBuilder getStreamingConsumer() {
> KStreamBuilder builder = new KStreamBuilder();
> return builder;
> }
>
> public static void printStreamData() {
> KStreamBuilder builder = getStreamingConsumer();
> KStream<String, JsonNode> kStream = builder.stream("test");
> kStream.foreach(new ForeachAction<String, JsonNode>() {
> @Override
> public void apply(String key, JsonNode value) {
> try {
> System.out.println(key + " : " + om.treeToValue(value,
> Person.class));
> } catch (JsonProcessingException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> });
>
> KafkaStreams kafkaStreams = new KafkaStreams(builder,
> getProperties());
> kafkaStreams.start();
> }
>
> }
>
> When i execute the code i am getting exception as below
>
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
> User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group Kafka test application failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Failed to configure
> value serde class org.apache.kafka.connect.json.JsonDeserializer
> at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(
> StreamsConfig.java:770)
> at org.apache.kafka.streams.processor.internals.
> AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:138)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:1078)
> at org.apache.kafka.streams.processor.internals.
> StreamThread$TaskCreator.createTask(StreamThread.java:255)
> at org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.createTasks(StreamThread.java:245)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:1147)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.access$800(StreamThread.java:68)
> at org.apache.kafka.streams.processor.internals.StreamThread$
> RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:265)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:367)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:316)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1078)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.pollRequests(StreamThread.java:536)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:490)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:480)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.connect.json.JsonDeserializer is not an instance of
> org.apache.kafka.common.serialization.Serde
> at org.apache.kafka.common.config.AbstractConfig.
> getConfiguredInstance(AbstractConfig.java:248)
> at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(
> StreamsConfig.java:764)
> ... 19 more
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> INFO org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread [Kafka test application-d3a307c9-d998-
> 421f-829c-85532efc8b29-StreamThread-1] Shutting down
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> INFO org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread [Kafka test application-d3a307c9-d998-
> 421f-829c-85532efc8b29-StreamThread-1] State transition from
> PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> INFO org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread [Kafka test application-d3a307c9-d998-
> 421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> INFO org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread [Kafka test application-d3a307c9-d998-
> 421f-829c-85532efc8b29-StreamThread-1] State transition from
> PENDING_SHUTDOWN to DEAD.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test
> application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have
> died. The Kafka Streams instance will be in an error state and should be
> closed.
> [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1]
> INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test
> application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from
> REBALANCING to ERROR.
> Exception in thread "Kafka test application-d3a307c9-d998-
> 421f-829c-85532efc8b29-StreamThread-1"
> org.apache.kafka.streams.errors.StreamsException:
> stream-thread [Kafka test application-d3a307c9-d998-
> 421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance.
> at org.apache.kafka.streams.processor.internals.
> StreamThread.pollRequests(StreamThread.java:543)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:490)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:480)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:457)
> Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to
> configure value serde class org.apache.kafka.connect.json.JsonDeserializer
> at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(
> StreamsConfig.java:770)
> at org.apache.kafka.streams.processor.internals.
> AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:138)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:1078)
> at org.apache.kafka.streams.processor.internals.
> StreamThread$TaskCreator.createTask(StreamThread.java:255)
> at org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.createTasks(StreamThread.java:245)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:1147)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.access$800(StreamThread.java:68)
> at org.apache.kafka.streams.processor.internals.StreamThread$
> RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:265)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:367)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:316)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1078)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.pollRequests(StreamThread.java:536)
> ... 3 more
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.connect.json.JsonDeserializer is not an instance of
> org.apache.kafka.common.serialization.Serde
> at org.apache.kafka.common.config.AbstractConfig.
> getConfiguredInstance(AbstractConfig.java:248)
> at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(
> StreamsConfig.java:764)
> ... 19 more
>
>
> I am looking for some guidance to solve the issue.
>
>
> Regards
> Phani Kumar Yadavilli
>
>