Sorry to have bothered everyone. This is the obvious solution:
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))

Regards
Hans-Peter

On Tue, Feb 8, 2022 at 21:56, Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> setDeserializer() expects KafkaRecordDeserializationSchema;
> the JSONKeyValueDeserializationSchema you provided is not compatible
> with it.
> You can convert it using [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>
> Regards,
> Roman
>
> On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote:
> >
> > Hi all,
> >
> > When I build this code:
> >
> > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> >         .setProperties(kafkaProps)
> >         .setProperty("ssl.truststore.type", trustStoreType)
> >         .setProperty("ssl.truststore.password", trustStorePassword)
> >         .setProperty("ssl.truststore.location", trustStoreLocation)
> >         .setProperty("security.protocol", securityProtocol)
> >         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
> >         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
> >         .setGroupId(groupId)
> >         .setTopics(kafkaInputTopic)
> >         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> >         .build();
> >
> > I get this error:
> >
> > error: incompatible types:
> > org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
> > cannot be converted to
> > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
> >         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >
> > What am I doing wrong?
> > As per the documentation, JSONKeyValueDeserializationSchema returns an
> > ObjectNode.
> >
> > Regards
> > Hans
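
For readers who hit the same compile error, the mismatch discussed in this thread can be illustrated without Flink on the classpath. The sketch below is a simplified model, not the Flink API: RawRecord, ReturningSchema, CollectingSchema and readAll are hypothetical stand-ins invented for illustration. It assumes only what the thread states, namely that the builder accepts one schema type, that the older schema is a different type, and that a static of(...) factory converts one into the other.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model only: none of these types are the real Flink or Kafka classes.
public class DeserializerAdapterSketch {

    /** Hypothetical stand-in for a Kafka record: raw key and value bytes. */
    static final class RawRecord {
        final byte[] key;
        final byte[] value;

        RawRecord(String key, String value) {
            this.key = key.getBytes(StandardCharsets.UTF_8);
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical older-style schema: takes a record, returns one element. */
    interface ReturningSchema<T> {
        T deserialize(RawRecord record);
    }

    /** Hypothetical newer-style schema: takes a record, emits into a collector. */
    interface CollectingSchema<T> {
        void deserialize(RawRecord record, Consumer<T> out);

        /** Adapter factory: wraps a returning schema so it fits the collecting shape. */
        static <T> CollectingSchema<T> of(ReturningSchema<T> inner) {
            return (record, out) -> out.accept(inner.deserialize(record));
        }
    }

    /** Hypothetical consumer of schemas that accepts only the collecting shape. */
    static <T> List<T> readAll(CollectingSchema<T> schema, List<RawRecord> records) {
        List<T> result = new ArrayList<>();
        for (RawRecord record : records) {
            schema.deserialize(record, result::add);
        }
        return result;
    }

    static List<String> demo() {
        // An older-style schema that joins key and value into one string.
        ReturningSchema<String> keyValue = record ->
                new String(record.key, StandardCharsets.UTF_8)
                        + "="
                        + new String(record.value, StandardCharsets.UTF_8);

        List<RawRecord> records = new ArrayList<>();
        records.add(new RawRecord("k1", "v1"));
        records.add(new RawRecord("k2", "v2"));

        // readAll(keyValue, records) would be rejected by the compiler, because
        // a ReturningSchema is not a CollectingSchema. Wrapping it with of(...)
        // produces the expected type.
        return readAll(CollectingSchema.of(keyValue), records);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the wrapper adds no behavior of its own: it only forwards each deserialized element to the collector, which is why a one-line of(...) call at the call site is enough to satisfy the type check.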