Hi all,

When I build this code:

    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

I get this error:

    error: incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))

What am I doing wrong? As per the documentation, JSONKeyValueDeserializationSchema returns an ObjectNode.

Regards,
Hans