Hi,

I have some jobs running under Flink 1.14.0. For security reasons, we have to 
upgrade Flink to 1.20.1.

The problem is that when I sink records to Kafka, the record timestamp under 
1.14.0 is the log append time, whereas under 1.20.1 it is the event time.

I have checked the source of KafkaRecordSerializationSchemaBuilder: in 1.14.0 
it does nothing with the timestamp, while in 1.20.1 the generated serializer 
has changed to:

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                IN element, KafkaSinkContext context, Long timestamp) {
            final String targetTopic = topicSelector.apply(element);
            final byte[] value = valueSerializationSchema.serialize(element);
            byte[] key = null;
            if (keySerializationSchema != null) {
                key = keySerializationSchema.serialize(element);
            }
            final OptionalInt partition =
                    partitioner != null
                            ? OptionalInt.of(
                                    partitioner.partition(
                                            element,
                                            key,
                                            value,
                                            targetTopic,
                                            context.getPartitionsForTopic(targetTopic)))
                            : OptionalInt.empty();

            return new ProducerRecord<>(
                    targetTopic,
                    partition.isPresent() ? partition.getAsInt() : null,
                    timestamp == null || timestamp < 0L ? null : timestamp,
                    key,
                    value,
                    headerProvider != null ? headerProvider.getHeaders(element) : null);
        }

I think this means that if a record carries an event-time timestamp, Flink now 
uses it as the Kafka record timestamp when sinking.
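
For reference, the sink is wired up roughly like this (the broker address, 
topic name, and the String schema are simplified stand-ins for my actual job). 
The Long passed to serialize() above is the element's stream-record timestamp, 
i.e. the event time set upstream:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    KafkaSink<String> sink =
            KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092") // placeholder
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("my-topic") // placeholder
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .build();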

But I don't know how to change the behavior in 1.20.1 so that the log append 
time is used again. The only workaround I can think of is to wrap the 
serialization schema and null out the timestamp, as sketched below, but I am 
not sure it is the intended way. Any advice? Thanks.
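
A rough sketch of the wrapper (the class name is mine; it just rebuilds the 
ProducerRecord with a null timestamp so that Kafka assigns one, as in 1.14.0):

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Delegates to another schema but drops the timestamp so Kafka assigns it. */
    public class NullTimestampSchema<IN> implements KafkaRecordSerializationSchema<IN> {

        private final KafkaRecordSerializationSchema<IN> inner;

        public NullTimestampSchema(KafkaRecordSerializationSchema<IN> inner) {
            this.inner = inner;
        }

        @Override
        public void open(
                SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
                throws Exception {
            // Make sure the wrapped schema is still initialized.
            inner.open(context, sinkContext);
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                IN element, KafkaSinkContext context, Long timestamp) {
            // Rebuild the record with a null timestamp, so the producer/broker
            // assigns one, as the 1.14.0 sink effectively did.
            ProducerRecord<byte[], byte[]> record = inner.serialize(element, context, timestamp);
            return new ProducerRecord<>(
                    record.topic(),
                    record.partition(),
                    (Long) null,
                    record.key(),
                    record.value(),
                    record.headers());
        }
    }

I would then pass it as .setRecordSerializer(new NullTimestampSchema<>(innerSchema)), 
but if there is a built-in option I am missing, I would prefer that over the wrapper.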

Regards
