Thanks for the quick response, Piotr. I feel like I have everything wired up correctly, but no files are getting written to disk. I've implemented my own BucketAssigner like so:
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] {
  // "yyyy-MM-dd--HH" matches DateTimeBucketAssigner's default format
  private val formatString = "yyyy-MM-dd--HH"

  // Buckets on the element's event timestamp rather than processing time
  override def getBucketId(element: IN, context: BucketAssigner.Context): String =
    DateTimeFormatter.ofPattern(formatString).withZone(ZoneId.systemDefault)
      .format(Instant.ofEpochMilli(context.timestamp()))

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

And plugged it into my sink:

val parquet = StreamingFileSink
  .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(ReflectData.get().getSchema(clazz)))
  .withBucketAssigner(new BackFillBucketAssigner[GenericRecord])
  .build

stream.addSink(parquet)

When I run locally I can see the temporary part files, but nothing ever gets rolled. I saw this once before, when I didn't have checkpointing enabled for my original streaming job, and this note tipped me off:

IMPORTANT: Bulk-encoding formats can only be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every checkpoint.

Is it possible that something similar is happening? I have enabled checkpointing in the job; however, since it is reading from flat files and assigning a timestamp, is it possible that checkpointing is not working as I expect? Nothing in my logs suggests an error, and the job runs to completion (in about 30 minutes).
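For what it's worth, here is roughly how checkpointing is enabled in the job (the interval and mode below are illustrative, not my exact configuration):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Bulk-encoded part files are only finalized when a checkpoint completes,
// so if checkpoints never fire the sink never rolls anything.
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)

One thing I can't rule out: since the backfill input is finite, could the job finish before the next checkpoint fires, leaving the last part files in-progress?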
Thanks again for your help!

Peter

On Fri, May 3, 2019 at 4:46 AM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Peter,
>
> It sounds like this should work; however, my question would be: do you want
> exactly-once processing? If yes, then you would have to somehow know which
> exact events need re-processing, or deduplicate them somehow. Keep in mind
> that in case of an outage in the original job, you will probably have some
> files already committed by the StreamingFileSink.
>
> Another approach might be to somehow overwrite the previous files (but then
> you would have to check whether the bucket assignment and file naming are
> completely deterministic), or to remove the dirty files from the crashed
> job before reprocessing from backup.
>
> Piotrek
>
> On 2 May 2019, at 23:10, Peter Groesbeck <peter.groesb...@gmail.com> wrote:
>
> Hi all,
>
> I have an application that reads from various Kafka topics and writes
> parquet files to corresponding buckets on S3 using StreamingFileSink with
> DateTimeBucketAssigner. The upstream application that writes to Kafka also
> writes records as gzipped json files to date-bucketed locations on S3 as
> backup.
>
> One requirement we have is to back fill missing data in the event that the
> application or Kafka experiences an outage. This can be accomplished by
> reading the backup files that were written to S3 by our upstream
> application instead of reading from Kafka. My current approach is to read
> the hourly backup buckets, transform the files into a DataStream, and
> assign each record a timestamp based on a datetime field on the json
> records using BoundedOutOfOrdernessTimestampExtractor. I was then hoping
> to connect the DataStream to the same StreamingFileSink, which ideally
> would write past records in the same manner as if they had been streamed
> from Kafka.
>
> Unfortunately for me, the bucket assigner works on system time:
>
> A BucketAssigner
> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html>
> that assigns to buckets based on current system time.
>
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>     if (dateTimeFormatter == null) {
>         dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>     }
>     return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
>
> No problem, I can extend DateTimeBucketAssigner and override the method to
> grab the element timestamp instead of currentProcessingTime, but I'm
> wondering if this is the right approach? And if so, would this behavior be
> useful outside the context of my application?
>
> Thanks in advance for your help and for this awesome framework!
>
> Peter
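P.S. For completeness, here is roughly how the backfill stream gets its timestamps before the sink (a sketch: the field name "eventTime", the epoch-millis encoding, and the five-minute bound are placeholders, not my actual values):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Pulls event time from a datetime field on each record, so that
// context.timestamp() in the bucket assigner sees the historical time.
class BackupTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.minutes(5)) {
  override def extractTimestamp(record: GenericRecord): Long =
    record.get("eventTime").asInstanceOf[Long]
}

val withTimestamps = stream.assignTimestampsAndWatermarks(new BackupTimestampExtractor)
withTimestamps.addSink(parquet)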