Hi, I have some jobs running on Flink 1.14.0. For security reasons, we have to upgrade Flink to 1.20.1.
The problem is that when I sink records to Kafka in 1.14.0, the record timestamp in Kafka is the log append time, whereas in 1.20.1 it is the event time. I checked the source of KafkaRecordSerializationSchemaBuilder: in 1.14.0 it does nothing with the timestamp, but in 1.20.1 the code has changed to:

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            IN element, KafkaSinkContext context, Long timestamp) {
        final String targetTopic = topicSelector.apply(element);
        final byte[] value = valueSerializationSchema.serialize(element);
        byte[] key = null;
        if (keySerializationSchema != null) {
            key = keySerializationSchema.serialize(element);
        }
        final OptionalInt partition =
                partitioner != null
                        ? OptionalInt.of(
                                partitioner.partition(
                                        element,
                                        key,
                                        value,
                                        targetTopic,
                                        context.getPartitionsForTopic(targetTopic)))
                        : OptionalInt.empty();

        return new ProducerRecord<>(
                targetTopic,
                partition.isPresent() ? partition.getAsInt() : null,
                timestamp == null || timestamp < 0L ? null : timestamp,
                key,
                value,
                headerProvider != null ? headerProvider.getHeaders(element) : null);
    }

I think this means that if an event time exists, Flink uses it as the record timestamp when sinking. But I don't know how to change this behavior in 1.20.1 so that the log append time is used instead. Any advice?

Thanks.

Regards
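
P.S. One workaround I am considering is to wrap the serialization schema so that it always hands a null timestamp to the delegate. The quoted code would then build the ProducerRecord without a timestamp. Below is a minimal sketch of the idea in plain Java. Rec, Schema, builderLike and dropTimestamp are hypothetical stand-ins I made up so that the snippet runs without Flink or Kafka on the classpath; they are not Flink classes. In a real job I assume the wrapper would implement KafkaRecordSerializationSchema and be passed to the sink via setRecordSerializer. I have not verified that this restores log append time; I assume that also depends on the topic's message.timestamp.type setting.

```java
final class DropTimestampSketch {

    /** Hypothetical stand-in for Kafka's ProducerRecord, reduced to the relevant fields. */
    static final class Rec {
        final String topic;
        final Long timestamp; // null means no timestamp was set on the record
        final String value;

        Rec(String topic, Long timestamp, String value) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Hypothetical stand-in shaped like serialize(element, context, timestamp). */
    interface Schema<T> {
        Rec serialize(T element, Long timestamp);
    }

    /** Mirrors the rule in the quoted 1.20.1 code: null or negative timestamps are dropped. */
    static Schema<String> builderLike(String topic) {
        return (element, ts) -> new Rec(topic, ts == null || ts < 0L ? null : ts, element);
    }

    /** Wrapper that ignores the incoming event time and always passes null to the delegate. */
    static <T> Schema<T> dropTimestamp(Schema<T> delegate) {
        return (element, ignoredTimestamp) -> delegate.serialize(element, null);
    }

    public static void main(String[] args) {
        Schema<String> plain = builderLike("demo-topic");
        Schema<String> wrapped = dropTimestamp(plain);

        // Without the wrapper, the event time is carried into the record.
        System.out.println(plain.serialize("a", 1700000000000L).timestamp);   // 1700000000000
        // With the wrapper, the record is built without a timestamp.
        System.out.println(wrapped.serialize("a", 1700000000000L).timestamp); // null
    }
}
```

If this is the wrong approach, or there is a built-in option for it, please let me know.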