Hi all,

I have an application that reads from various Kafka topics and writes
parquet files to corresponding buckets on S3 using StreamingFileSink with
DateTimeBucketAssigner. The upstream application that writes to Kafka also
writes records as gzipped json files to date bucketed locations on S3 as
backup.

One requirement we have is to backfill missing data in the event that the
application or Kafka experiences an outage. This can be accomplished by
reading the backup files that were written to S3 by our upstream
application instead of reading from Kafka. My current approach is to read
the hourly backup buckets, transform the files into a DataStream, and assign
each record a timestamp based on a datetime field in the json records using
BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the
DataStream to the same StreamingFileSink, which ideally would write past
records in the same manner as if they had been streamed from Kafka.

Unfortunately for me, the bucket assigner works on system time:

A BucketAssigner
<https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html>
that assigns to buckets based on current system time.

@Override
public String getBucketId(IN element, BucketAssigner.Context context) {
    if (dateTimeFormatter == null) {
        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }
    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}


No problem, I can extend DateTimeBucketAssigner and override the
method to use the element timestamp (context.timestamp()) instead of
context.currentProcessingTime(), but I'm wondering if this is the
right approach? And if so, would this behavior be useful outside of
the context of my application?
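To make the idea concrete, here is a self-contained sketch of the bucket-id logic with the element timestamp swapped in. Only java.time is used so it stands alone; the pattern "yyyy-MM-dd--HH" and UTC zone mirror DateTimeBucketAssigner's defaults, and the class/method names are hypothetical (in a real override this would be called from getBucketId with context.timestamp()):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EventTimeBucketing {

    // Same default pattern and zone as DateTimeBucketAssigner.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));

    // Hypothetical: in a custom assigner extending DateTimeBucketAssigner,
    // getBucketId(element, context) would pass context.timestamp() (the
    // element's event timestamp) here instead of currentProcessingTime().
    public static String bucketIdFor(long eventTimestampMillis) {
        return FORMATTER.format(Instant.ofEpochMilli(eventTimestampMillis));
    }

    public static void main(String[] args) {
        // 1560594600000L is 2019-06-15T10:30:00Z
        System.out.println(bucketIdFor(1560594600000L)); // 2019-06-15--10
    }
}
```

With event-time bucketing, backfilled records land in the same date buckets they would have if they had arrived live, which is exactly the behavior wanted here.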

Thanks in advance for your help and for this awesome framework!

Peter
