Hi Peter,

It sounds like this should work. My question, however, would be: do you need exactly-once processing? If yes, then you would have to know which exact events need re-processing, or deduplicate them somehow. Keep in mind that in the case of an outage in the original job, you will probably have some files already committed by the StreamingFileSink.
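Overriding getBucketId() to use the element's timestamp should indeed do what you want. For reference, here is a rough, untested sketch against the 1.8 BucketAssigner interface (context.timestamp() returns the timestamp your BoundedOutOfOrdernessTimestampExtractor assigned, or null if none was assigned; the class name is of course just my example):

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Assigns buckets from the element's event timestamp instead of
// processing time, so backfilled records land in their original buckets.
public class EventTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private final String formatString;
    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    public EventTimeBucketAssigner(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
    }

    @Override
    public String getBucketId(IN element, Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        // Fall back to processing time if no event timestamp was assigned.
        Long eventTimestamp = context.timestamp();
        long millis = eventTimestamp != null ? eventTimestamp : context.currentProcessingTime();
        return dateTimeFormatter.format(Instant.ofEpochMilli(millis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

Note that even with identical bucket ids, the part file names include (if I remember correctly) the subtask index and a counter, so re-running the backfill will add new part files next to the existing ones rather than overwrite them.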
Another approach might be to somehow overwrite the previous files (but then you would have to check whether the bucket assignment and file naming are completely deterministic), or to remove the dirty files left behind by the crashed job before reprocessing from backup.

Piotrek

> On 2 May 2019, at 23:10, Peter Groesbeck <peter.groesb...@gmail.com> wrote:
>
> Hi all,
>
> I have an application that reads from various Kafka topics and writes parquet files to corresponding buckets on S3 using StreamingFileSink with DateTimeBucketAssigner. The upstream application that writes to Kafka also writes records as gzipped JSON files to date-bucketed locations on S3 as backup.
>
> One requirement we have is to backfill missing data in the event that the application or Kafka experiences an outage. This can be accomplished by reading the backup files that were written to S3 by our upstream application instead of reading from Kafka. My current approach is to read the hourly backup buckets, transform the files into a DataStream, and assign them a timestamp based on a datetime field on the JSON records using BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the DataStream to the same StreamingFileSink, which ideally would write past records in the same manner as if they had been streamed from Kafka.
>
> Unfortunately for me, the bucket assigner works on system time:
>
> A BucketAssigner <https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html> that assigns to buckets based on current system time.
>
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>     if (dateTimeFormatter == null) {
>         dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>     }
>     return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
>
> No problem, I can extend DateTimeBucketAssigner and override the method to grab the element timestamp instead of currentProcessingTime, but I'm wondering if this is the right approach? And if so, would this behavior be useful outside of the context of my application?
>
> Thanks in advance for your help and for this awesome framework!
>
> Peter