Hi! All properties you set by calling KafkaSource.builder().setProperty() are also passed on to the KafkaConsumer (see [1]). However, these two properties are specific to Flink and Kafka does not recognize them, so Kafka logs a warning for each of them. The warnings are harmless as long as the properties you set actually take effect.
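To illustrate why the warning appears, here is a simplified, standard-library-only model of the behaviour. It is not Flink's or Kafka's actual code: the class name, the method consumerWarnings, the abbreviated list of known keys, and the placeholder values are all assumptions made for illustration. Only the two property names and the warning text come from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Simplified model only: NOT Flink's or Kafka's actual implementation.
class ForwardedPropertiesSketch {

    // Hypothetical, heavily abbreviated stand-in for the keys KafkaConsumer recognizes.
    static final Set<String> KAFKA_KNOWN_KEYS =
            Set.of("bootstrap.servers", "group.id", "auto.offset.reset");

    // Builds the warnings the consumer side would log for the forwarded properties.
    static List<String> consumerWarnings(Properties forwarded) {
        List<String> warnings = new ArrayList<>();
        // A TreeSet gives a stable, alphabetical iteration order.
        for (String key : new TreeSet<>(forwarded.stringPropertyNames())) {
            if (!KAFKA_KNOWN_KEYS.contains(key)) {
                warnings.add("The configuration '" + key
                        + "' was supplied but isn't a known config.");
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group id
        props.setProperty("client.id.prefix", "example-prefix");  // read by Flink
        props.setProperty("partition.discovery.interval.ms", "10000"); // read by Flink

        // Flink reads its own keys, but the full set is still forwarded to the consumer,
        // which then complains about the keys it does not recognize.
        consumerWarnings(props).forEach(System.out::println);
    }
}
```

In this model the Flink-specific keys are still present and usable on the Flink side; the consumer merely reports that it has no use for them, which is why the warnings can be ignored.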
As for writing timestamps to Kafka: I'm not very familiar with Kafka, but from the code I guess that if you create the Kafka record serializer with KafkaRecordSerializationSchema.builder(), it writes the timestamp to Kafka by default. You can try out the example in [2] and see if it works.

[1] https://github.com/apache/flink/blob/e615106b38a289bc624a8554b86c83f9785352d3/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L85
[2] https://github.com/apache/flink/blob/0bc2234b60d1a0635e238d18990695943158123c/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

Mahima Agarwal <mahi.29agar...@gmail.com> wrote on Mon, Jan 24, 2022 at 15:35:

> Hi Team,
>
> I am trying to set the following properties in Kafka Source API in flink
> 1.14.3 version.
> -> client.id.prefix
> -> partition.discovery.interval.ms
>
> But I am getting the below mentioned warning in taskmanager logs:
>
> 1. WARN org.apache.kafka.clients.consumer.ConsumerConfig [] -
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> 2. WARN org.apache.kafka.clients.consumer.ConsumerConfig [] -
> The configuration 'partition.discovery.interval.ms' was supplied but
> isn't a known config.
>
> What could be the reason for this warning?
>
> Also, in flink version 1.13.2 we were able to write timestamp to kafka
> using setWriteTimestampToKafka(true) method of FlinkKafkaProducer class.
> Similar to this how can we write timestamp to kafka using KafkaSink API in
> flink 1.14.3?
>
> Any leads would be appreciated.
>
> Thanks and Regards
> Mahima Agarwal