Setting producerConfig properties for FlinkKafkaProducer

2021-11-19 Thread Darius Kasad
I've overridden the 'open' method of the v1.11.3 FlinkKafkaConsumer in order to set the TLS configuration for Kafka from the task manager node where the Kafka consumer is running (the TLS configuration differs between the job manager and each task manager in our environment, which is why we use 'open' vs. setting confi

Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-19 Thread Kamil ty
Hello all, I'm working on a PyFlink job that's supposed to consume JSON messages from Kafka and save them to a partitioned Avro file sink. I'm having difficulty finding a way to process the messages, because there is only one Kafka topic for multiple message schemas. As PyFlink's Flin
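One way to approach the single-topic, multiple-schema problem described above is to read each record as a raw string and pick a routing key from the message itself before any schema-specific handling. The sketch below is plain Python, not PyFlink API. It assumes a Debezium-style envelope in which the change event sits either at the top level or under "payload", and carries a "source" block with "db" and "table" fields. The function names `routing_key` and `group_by_schema` are hypothetical, and the field layout should be checked against the actual messages on the topic.

```python
import json
from collections import defaultdict


def routing_key(raw_message: str) -> str:
    """Return a "<db>.<table>" key for one Debezium-style JSON message.

    Assumed layout: the change event is either the whole message or nested
    under "payload", and names its origin in a "source" object.
    """
    envelope = json.loads(raw_message)
    # Fall back to the envelope itself when "payload" is missing or null.
    event = envelope.get("payload") or envelope
    source = event.get("source") or {}
    return f"{source.get('db', 'unknown')}.{source.get('table', 'unknown')}"


def group_by_schema(raw_messages):
    """Group parsed messages by routing key so each group shares one schema."""
    groups = defaultdict(list)
    for raw in raw_messages:
        groups[routing_key(raw)].append(json.loads(raw))
    return dict(groups)
```

In a streaming job the same key function could drive a split into per-table streams, each with its own Avro schema; how that is wired up depends on the PyFlink version in use.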

Re: How do I configure commit offsets on checkpoint in new KafkaSource api?

2021-11-19 Thread Mason Chen
Hi Marco, > https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#additional-properties In the new KafkaSource, you can configure it in your properties. You can take a look at `KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT` for the specific config, which is defau
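As a rough illustration of the reply above, the properties handed to the new KafkaSource might look like the map below. This is a sketch in plain Python rather than the Java builder API. The key "commit.offsets.on.checkpoint" is assumed to be the string behind `KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT`, the helper name `kafka_source_properties` is hypothetical, and the broker address and group id are placeholders. Verify the key against the linked documentation before relying on it.

```python
def kafka_source_properties(bootstrap_servers, group_id, commit_on_checkpoint=True):
    """Build the string-to-string property map for a Kafka source (sketch)."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # A consumer group id is needed for committed offsets to be meaningful.
        "group.id": group_id,
        # Assumed key for KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT.
        "commit.offsets.on.checkpoint": str(commit_on_checkpoint).lower(),
    }


# Example: turn offset committing on checkpoints off explicitly.
props = kafka_source_properties("broker:9092", "my-group", commit_on_checkpoint=False)
```

Offsets are only committed on checkpoints if checkpointing is enabled for the job.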

How do I configure commit offsets on checkpoint in new KafkaSource api?

2021-11-19 Thread Marco Villalobos
The FlinkKafkaConsumer, which will be deprecated, has the "setCommitOffsetsOnCheckpoints(boolean)" method. However, that functionality is not in the new KafkaSource class. How is this behavior / functionality configured in the new API? -Marco A. Villalobos

Replacing S3 Client in Hadoop plugin

2021-11-19 Thread Tamir Sagi
Hey Martijn, sorry for the late response. We wanted to replace the default client with our custom S3 client and not use the AmazonS3Client provided by the plugin. We used flink-s3-fs-hadoop v1.12.2 and for our needs we had to upgrade to v1.14.0 [1]. The AmazonS3 client factory is initialized [2] - if t