Hi!

All properties you set by calling KafkaSource.builder().setProperty() will
also be given to KafkaConsumer (see [1]). However these two properties are
specific to Flink and Kafka does not know them, so Kafka will produce a
warning message. These messages are harmless as long as the properties you
set are actually effective.

About writing timestamp to Kafka, I'm not familiar with Kafka but from the
code I guess if you create a Kafka record serializer
with KafkaRecordSerializationSchema.builder() then by default it will write
timestamp to Kafka. You can try out the example in [2] and see if it works.

[1]
https://github.com/apache/flink/blob/e615106b38a289bc624a8554b86c83f9785352d3/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L85
[2]
https://github.com/apache/flink/blob/0bc2234b60d1a0635e238d18990695943158123c/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

Mahima Agarwal <mahi.29agar...@gmail.com> 于2022年1月24日周一 15:35写道:

> Hi Team,
>
> I am trying to set the following properties in Kafka Source API in flink
> 1.14.3 version.
> -> client.id.prefix
> -> partition.discovery.interval.ms
>
> But I am getting the below mentioned warning in taskmanager logs:
>
> 1. WARN  org.apache.kafka.clients.consumer.ConsumerConfig             [] -
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> 2. WARN  org.apache.kafka.clients.consumer.ConsumerConfig             [] -
> The configuration 'partition.discovery.interval.ms' was supplied but
> isn't a known config.
>
> What could be the reason for this warning?
>
> Also, in flink version 1.13.2 we were able write timestamp to kafka
> using setWriteTimestampToKafka(true) method of FlinkKafkaProducer class.
> Similar to this how can we write timestamp to kafka using KafkaSink API in
> flink 1.14.3?
>
> Any leads would be appreciated.
>
> Thanks and Regards
> Mahima Agarwal
>

Reply via email to