Hi all,

When I build this code:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type",trustStoreType)
        .setProperty("ssl.truststore.password",trustStorePassword)
        .setProperty("ssl.truststore.location",trustStoreLocation)
        .setProperty("security.protocol",securityProtocol)
        .setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))
        
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();


I get:
This error:

error: incompatible types:
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
cannot be converted to
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
                .setDeserializer(new
JSONKeyValueDeserializationSchema(false))


What am I doing wrong?
As per the documentation JSONKeyValueDeserializationSchema returns an
ObjectNode.

Regards Hans

Reply via email to