Hi Matthias,

It should probably be like this:

Properties SinkkafkaProps = new Properties();
SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
SinkkafkaProps.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs);
SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint);

KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setKafkaProducerConfig(SinkkafkaProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

On Thu, 17 Mar 2022 at 17:29, Matthias Pohl <matth...@ververica.com> wrote:

> Could you share more details on what's not working? Is the
> ssl.truststore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hi all,
>> I am probably not the smartest but I cannot find how to set
>> ssl-properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>         .setProperties(kafkaProps)
>>         .setProperty("ssl.truststore.type", trustStoreType)
>>         .setProperty("ssl.truststore.password", trustStorePassword)
>>         .setProperty("ssl.truststore.location", trustStoreLocation)
>>         .setProperty("security.protocol", securityProtocol)
>>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>>         .setGroupId(inputGroupId)
>>         .setClientIdPrefix(clientId)
>>         .setTopics(kafkaInputTopic)
>>         .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .build();
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
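
For reference, a minimal sketch of collecting the four SSL settings from the original question into one Properties object, which could then be handed to setKafkaProducerConfig. The class SslProducerProps and its build method are hypothetical names used only for illustration, and whether a given cluster needs all four settings is an assumption. It uses plain java.util.Properties and the standard Kafka client property keys, nothing Flink-specific.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): gathers the SSL client
// settings mentioned in the thread into a single Properties object.
final class SslProducerProps {

    private SslProducerProps() {
    }

    static Properties build(String bootstrapServers,
                            String securityProtocol,
                            String trustStoreType,
                            String trustStoreLocation,
                            String trustStorePassword) {
        Properties props = new Properties();
        // Standard Kafka client configuration keys, written as plain strings.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", securityProtocol);
        props.setProperty("ssl.truststore.type", trustStoreType);
        props.setProperty("ssl.truststore.location", trustStoreLocation);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        return props;
    }
}
```

The returned object would take the place of SinkkafkaProps in the sink builder call; the truststore path still has to be readable on every Flink node.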