Hi Matthias,

It should probably be like this:

Properties SinkkafkaProps = new Properties();
SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);

SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
SinkkafkaProps.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs);
SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint);


KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setKafkaProducerConfig(SinkkafkaProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
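
Note that the properties above leave out security.protocol and
ssl.truststore.password, both of which the KafkaSource in the original
question sets. As far as I know, partition.discovery.interval.ms and
commit.offsets.on.checkpoint are KafkaSource options and have no meaning for
a producer. A minimal sketch of collecting just the SSL settings in one
place, assuming the same four values as in the original question; the class
name SinkSslProps and the method build are hypothetical names chosen for
illustration:

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): gathers the SSL-related
// client settings that would be handed to the sink's producer configuration.
final class SinkSslProps {

    private SinkSslProps() {
    }

    // The keys are standard Kafka client configuration names.
    // The values are assumed to come from the job's own configuration.
    static Properties build(String securityProtocol,
                            String trustStoreType,
                            String trustStoreLocation,
                            String trustStorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", securityProtocol);
        props.setProperty("ssl.truststore.type", trustStoreType);
        props.setProperty("ssl.truststore.location", trustStoreLocation);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        return props;
    }
}
```

The result could then be passed to setKafkaProducerConfig(...) in place of
SinkkafkaProps, with the bootstrap servers still set through
setBootstrapServers(outputBrokers).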


On Thu, 17 Mar 2022 at 17:29, Matthias Pohl <matth...@ververica.com> wrote:

> Could you share more details on what's not working? Is the
> ssl.truststore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hi all,
>> I am probably not the smartest, but I cannot find how to set
>> SSL properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer:
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>         .setProperties(kafkaProps)
>>         .setProperty("ssl.truststore.type", trustStoreType)
>>         .setProperty("ssl.truststore.password", trustStorePassword)
>>         .setProperty("ssl.truststore.location", trustStoreLocation)
>>         .setProperty("security.protocol", securityProtocol)
>>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>>         .setGroupId(inputGroupId)
>>         .setClientIdPrefix(clientId)
>>         .setTopics(kafkaInputTopic)
>>         .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>
