Any advice for this problem? Thanks.


---Original---
From: "guoliubin85"<guoliubi...@foxmail.com&gt;
Date: Thu, May 15, 2025 16:18 PM
To: "user@flink.apache.org"<user@flink.apache.org&gt;;
Subject: Kafka Sink Timestamp Behavior Change From 1.14.0 to 1.20.1


  Hi,
  
 
  I have some jobs running on Flink 1.14.0. For security reasons, we have to upgrade to Flink 1.20.1.
  
 
  The problem is that when I sink records to Kafka, the record timestamp in Kafka is the log append time under 1.14.0, but under 1.20.1 it is the event time.
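
  For reference, my understanding is that whether the broker keeps the producer-supplied timestamp or overwrites it with the append time is controlled by the topic-level message.timestamp.type setting. If the broker side is the right place to fix this, something like the sketch below is what I would try with the Kafka AdminClient (broker address and topic name are placeholders), but I would prefer a Flink-side option if one exists:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch: ask the broker to stamp records with log append time, regardless of
// whatever timestamp the producer (Flink) sends.
public class ForceLogAppendTime {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
            AlterConfigOp op =
                    new AlterConfigOp(
                            new ConfigEntry("message.timestamp.type", "LogAppendTime"),
                            AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Collections.singletonMap(topic, Collections.singletonList(op));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}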
  
 
  I have checked the source of KafkaRecordSerializationSchemaBuilder. In 1.14.0 it does nothing with the timestamp, but in 1.20.1 the code changes to:
  
 
@Override
public ProducerRecord<byte[], byte[]> serialize(
        IN element, KafkaSinkContext context, Long timestamp) {
    final String targetTopic = topicSelector.apply(element);
    final byte[] value = valueSerializationSchema.serialize(element);
    byte[] key = null;
    if (keySerializationSchema != null) {
        key = keySerializationSchema.serialize(element);
    }
    final OptionalInt partition =
            partitioner != null
                    ? OptionalInt.of(
                            partitioner.partition(
                                    element,
                                    key,
                                    value,
                                    targetTopic,
                                    context.getPartitionsForTopic(targetTopic)))
                    : OptionalInt.empty();

    return new ProducerRecord<>(
            targetTopic,
            partition.isPresent() ? partition.getAsInt() : null,
            timestamp == null || timestamp < 0L ? null : timestamp,
            key,
            value,
            headerProvider != null ? headerProvider.getHeaders(element) : null);
}
  
 
  I think this means that if a record timestamp (event time) exists, Flink will now use it as the Kafka record timestamp when sinking records.
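
  One workaround I am considering (only a rough sketch; the class name, topic wiring, and serializer are placeholders, not what we actually run) is to implement KafkaRecordSerializationSchema myself and simply not set a timestamp on the ProducerRecord, so the timestamp is assigned again by the producer/broker as it effectively was under 1.14.0:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: serialize the value only and leave the timestamp unset, so the
// Flink-provided record timestamp is never propagated into the ProducerRecord.
public class DropTimestampSchema<T> implements KafkaRecordSerializationSchema<T> {

    private final String topic;                        // placeholder wiring
    private final SerializationSchema<T> valueSchema;  // the existing value serializer

    public DropTimestampSchema(String topic, SerializationSchema<T> valueSchema) {
        this.topic = topic;
        this.valueSchema = valueSchema;
    }

    @Override
    public void open(
            SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
            throws Exception {
        valueSchema.open(context);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            T element, KafkaSinkContext context, Long timestamp) {
        // Ignore the timestamp argument entirely; no key and no explicit partition either.
        return new ProducerRecord<>(topic, valueSchema.serialize(element));
    }
}

  Is a custom schema like this the recommended way, or is there an option on KafkaSink / the serialization schema builder that I am missing?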
  
 
  But I don't know how to change the behavior in 1.20.1 so that the log append time is used as the timestamp again. Any advice? Thanks.
  
 
  Regards
