Hi Peter,
I also encountered this issue. As far as I know, it is not currently
possible to stream from files (or any bounded stream) into a
*StreamingFileSink*.
This is because part files are rolled over only on checkpoints and NOT when the
stream closes. This is due to the fact that at the function le
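
Since part files only move from in-progress to finished when a checkpoint completes, the job must have checkpointing enabled for the sink to publish anything. A minimal job-configuration sketch (paths, interval, and job name are made-up examples, not from this thread):

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy

object BackfillJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Without checkpointing, StreamingFileSink never rolls its in-progress files.
    env.enableCheckpointing(60000L) // example interval: one checkpoint per minute

    val sink = StreamingFileSink
      .forRowFormat(new Path("/tmp/backfill-out"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(OnCheckpointRollingPolicy.build())
      .build()

    env.readTextFile("/tmp/backfill-in").addSink(sink)
    env.execute("backfill")
  }
}
```

Note that even with this in place, a bounded input hits the problem described above: the stream can close before the final checkpoint, leaving the last part files in-progress.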
Thanks for the quick response, Piotr,
I think I have everything working, but no files are getting written to
disk. I've implemented my own BucketAssigner like so:
class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] {
  override def getBucketId(element: IN, context: BucketAssigner.Context): String =
    ??? // bucket-id logic truncated in the original message

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}
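
The bucket id returned by a BucketAssigner is just a string that becomes the output subdirectory. As a self-contained illustration of the kind of logic such an assigner typically holds (the `dt=` partition layout and the helper name are assumptions, not from this thread), here is a pure function deriving a date bucket from an event timestamp:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper: maps an epoch-millis timestamp to a Hive-style
// daily partition name such as "dt=2019-08-01".
object BucketIds {
  private val fmt =
    DateTimeFormatter.ofPattern("'dt='yyyy-MM-dd").withZone(ZoneOffset.UTC)

  def forTimestamp(epochMillis: Long): String =
    fmt.format(Instant.ofEpochMilli(epochMillis))
}
```

A real getBucketId implementation would extract the timestamp from the element and delegate to something like this.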
Hi Peter,
It sounds like this should work, but my question would be: do you want
exactly-once processing? If yes, then you would have to know exactly which
events need re-processing, or deduplicate them somehow. Keep in mind that
in case of an outage in the original job, you probably w
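
When a backfill replays events that the original job already processed, downstream duplicates are likely. One common mitigation is to key events by a stable id and drop anything already seen; a minimal sketch (the Event shape and field names are assumptions, and a real Flink job would keep the seen-set in keyed state with a TTL rather than in memory):

```scala
import scala.collection.mutable

// Hypothetical event carrying a stable, unique identifier.
final case class Event(id: String, payload: Int)

// Keep only the first occurrence of each id, preserving order.
def dedupe(events: Seq[Event]): Seq[Event] = {
  val seen = mutable.HashSet.empty[String]
  events.filter(e => seen.add(e.id)) // add returns false if id was present
}
```

This only gives exactly-once semantics if the id really is unique per logical event across both the original and the backfill runs.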