Hi,
I have some jobs running on Flink 1.14.0. For security reasons, we have to
upgrade Flink to 1.20.1.
The problem is: when I sink records to Kafka, in 1.14.0 the record timestamp
in Kafka is the log append time, but in 1.20.1 the timestamp in Kafka is the
Flink event time.
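For context, the sink is created with the standard KafkaSink builder, roughly
like this (broker address, topic name, and the String payload type are
simplified placeholders, not my real job):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Simplified version of how the sink is wired up in my jobs.
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("my-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();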
I have checked the source of KafkaRecordSerializationSchemaBuilder. In 1.14.0,
the generated serializer does nothing with the timestamp. In 1.20.1, the code
changed to:
@Override
public ProducerRecord<byte[], byte[]> serialize(
        IN element, KafkaSinkContext context, Long timestamp) {
    final String targetTopic = topicSelector.apply(element);
    final byte[] value = valueSerializationSchema.serialize(element);
    byte[] key = null;
    if (keySerializationSchema != null) {
        key = keySerializationSchema.serialize(element);
    }
    final OptionalInt partition =
            partitioner != null
                    ? OptionalInt.of(
                            partitioner.partition(
                                    element,
                                    key,
                                    value,
                                    targetTopic,
                                    context.getPartitionsForTopic(targetTopic)))
                    : OptionalInt.empty();
    return new ProducerRecord<>(
            targetTopic,
            partition.isPresent() ? partition.getAsInt() : null,
            timestamp == null || timestamp < 0L ? null : timestamp,
            key,
            value,
            headerProvider != null ? headerProvider.getHeaders(element) : null);
}
I think this means that if a record carries an event-time timestamp, Flink now
forwards it into the ProducerRecord, so Kafka stores it as the record
timestamp.
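The only workaround I can think of is to implement
KafkaRecordSerializationSchema myself and simply never forward Flink's
timestamp into the ProducerRecord. A minimal sketch of the idea (class name,
topic, and the String element type are placeholders, not my real code):

import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: a serializer that deliberately drops Flink's timestamp so it is
// never written into the ProducerRecord.
public class IgnoreTimestampSerializationSchema
        implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        return new ProducerRecord<>(
                "my-topic", // placeholder topic name
                null,       // partition: let the producer's partitioner decide
                null,       // timestamp: do not forward Flink's event time
                null,       // key: none in this sketch
                element.getBytes(StandardCharsets.UTF_8));
    }
}

If I understand the Kafka docs correctly, setting
message.timestamp.type=LogAppendTime on the topic would also make the broker
overwrite whatever timestamp the producer sends, independent of Flink.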
But I don't know whether that is the intended way, or whether there is a
supported way in 1.20.1 to use the log append time as the timestamp again.
Any advice? Thanks.
Regards