Re: DateTimeBucketAssigner using Element Timestamp

2019-05-06, Rafi Aroch
Hi Peter, I also encountered this issue. As far as I know, it is not currently possible to stream from files (or any bounded stream) into a *StreamingFileSink*. This is because files are rolled over only on checkpoints and NOT when the stream closes. This is due to the fact that at the function le…
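To make the checkpoint dependency concrete, here is a toy model in Scala. It is not Flink code, and the class and state names are hypothetical. It assumes a simplified lifecycle: the in-progress part file becomes pending when a checkpoint is taken and becomes finished only when that checkpoint completes. Under that assumption, a bounded input that ends before any checkpoint leaves its file unfinished.

```scala
// Toy model (not Flink code): a part file is only committed when a checkpoint completes.
sealed trait PartState
case object InProgress extends PartState
case object Pending extends PartState
case object Finished extends PartState

final class ToyPartFile {
  var state: PartState = InProgress

  // Taking a checkpoint closes the in-progress file and marks it pending.
  def onCheckpoint(): Unit = if (state == InProgress) state = Pending

  // Only when the checkpoint is confirmed complete is the pending file committed.
  def onCheckpointComplete(): Unit = if (state == Pending) state = Finished

  // End of a bounded input: in this model nothing is rolled or committed.
  def onEndOfInput(): Unit = ()
}

object RollOnCheckpointDemo extends App {
  // Input ends before any checkpoint: the file never reaches Finished.
  val noCheckpoint = new ToyPartFile
  noCheckpoint.onEndOfInput()
  println(s"without checkpoint: ${noCheckpoint.state}") // prints "without checkpoint: InProgress"

  // Same input, but a checkpoint is taken and completes before the input ends.
  val withCheckpoint = new ToyPartFile
  withCheckpoint.onCheckpoint()
  withCheckpoint.onCheckpointComplete()
  withCheckpoint.onEndOfInput()
  println(s"with checkpoint: ${withCheckpoint.state}") // prints "with checkpoint: Finished"
}
```

In the model, ending the input does nothing on its own. Only the checkpoint-complete step commits data, which mirrors the behaviour described above.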

Re: DateTimeBucketAssigner using Element Timestamp

2019-05-03, Peter Groesbeck
Thanks for the quick response, Piotr. I feel like I have everything working, but no files are getting written to disk. I've implemented my own BucketAssigner like so: class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] { override def getBucketId(element: IN, context: BucketAssigne…
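Below is a minimal sketch of the bucketing logic such an assigner might use. It assumes the element's own timestamp, rather than processing time, should pick the bucket. `ElementContext` is a hypothetical stand-in for the context Flink passes to `getBucketId`, which lets the example run without Flink. The `yyyy-MM-dd--HH` pattern, the UTC zone, and the fallback to processing time when an element carries no timestamp are all assumptions.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical stand-in for the context handed to a bucket assigner.
trait ElementContext {
  def timestamp(): java.lang.Long // element (event-time) timestamp in epoch millis; may be null
  def currentProcessingTime(): Long
}

// Derives a date-time bucket ID from the element timestamp instead of the wall clock.
final class EventTimeBucketIds(pattern: String = "yyyy-MM-dd--HH", zone: ZoneId = ZoneId.of("UTC")) {
  private val formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone)

  def bucketId(context: ElementContext): String = {
    val ts = context.timestamp()
    // Assumption: fall back to processing time when the element has no timestamp.
    val millis: Long = if (ts != null) ts.longValue() else context.currentProcessingTime()
    formatter.format(Instant.ofEpochMilli(millis))
  }
}

object EventTimeBucketDemo extends App {
  val ctx = new ElementContext {
    def timestamp(): java.lang.Long = java.lang.Long.valueOf(1556888400000L) // 2019-05-03T13:00:00Z
    def currentProcessingTime(): Long = System.currentTimeMillis()
  }
  println(new EventTimeBucketIds().bucketId(ctx)) // prints "2019-05-03--13"
}
```

Because the bucket is computed from the element timestamp, replaying old data places each record in the bucket for the hour it originally occurred, not the hour it was re-processed.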

Re: DateTimeBucketAssigner using Element Timestamp

2019-05-03, Piotr Nowojski
Hi Peter, It sounds like this should work; however, my question would be: do you want exactly-once processing? If yes, then you would have to somehow know which exact events need re-processing, or deduplicate them somehow. Keep in mind that in case of an outage in the original job, you probably w…
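If re-processed events can overlap with ones the original job already wrote, one option is to drop duplicates by a unique event ID. The sketch below assumes each event carries such an ID. The `Event` and `Deduplicator` classes are hypothetical. Seen IDs are kept in an in-memory set, so unlike Flink state this is not fault tolerant and grows without bound.

```scala
import scala.collection.mutable

// Hypothetical event shape: each event carries a unique ID.
final case class Event(id: String, timestampMillis: Long, payload: String)

// Keeps only the first occurrence of each event ID (in-memory only, not fault tolerant).
final class Deduplicator {
  private val seen = mutable.HashSet.empty[String]

  // HashSet.add returns true only if the ID was not already present.
  def firstOccurrences(events: Iterator[Event]): Iterator[Event] =
    events.filter(e => seen.add(e.id))
}

object DedupDemo extends App {
  val dedup = new Deduplicator
  val replayed = Iterator(Event("a", 1L, "x"), Event("b", 2L, "y"), Event("a", 1L, "x"))
  println(dedup.firstOccurrences(replayed).map(_.id).toList) // prints "List(a, b)"
}
```

In a real job the set of seen IDs would need to survive restarts and be bounded, for example by expiring IDs older than the backfill window.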