Hi Anton,

First of all, there is this PR that may be interesting to you: https://github.com/apache/flink/pull/9581
Second, I think you have to keep in mind that the hourly bucket reporting will be per-subtask. So if you have a parallelism of 4, each of the 4 subtasks will report individually that it is done with, e.g., hour 10, and it is up to the receiving end to know whether it should wait for more reports or not. This may be a problem for your stateful-assigner approach, because the assigner cannot know by default which subtask it belongs to. If, for example, you have a parallelism of 1, then your stateful assigner could work, although it suffers from the problems you also mentioned: it is not integrated with checkpointing (so a part file may be "reverted"), and the fact that a file has rolled does not mean that the previous one has already been written to the FS.

Third, instead of having the job itself push notifications that a part file has rolled (which suffers from the problem that a part file may roll while the FS still takes some time to write everything to disk), you could simply monitor the FS directory where you are writing your buckets and parse the part-file names in order to know that all subtasks have finished with hour X. This can be done by another job, which would also put the notifications on SQS. I think this would also resolve your concern: "I’m also thinking on how I should couple this with checkpointing mechanism as ideally I’d like to not invoke this callback before checkpoint is written."

Cheers,
Kostas

On Mon, Sep 9, 2019 at 12:40 PM Anton Parkhomenko <mail...@chuwy.me> wrote:
>
> Hello,
>
> I’m writing a Flink job that reads heterogeneous data (one row contains
> several types that need to be partitioned downstream) from AWS Kinesis and
> writes it to an S3 directory structure like s3://bucket/year/month/day/hour/type.
> This all works great with StreamingFileSink in Flink 1.9, but the problem is
> that I need to immediately (or rather "as soon as possible") let another
> application know when an "hour" bucket has rolled (i.e. we’re 100% sure it
> won’t write any more data for this hour). Another problem is that the data can
> be very skewed across types, e.g. one hour can contain 90% of rows with typeA,
> 30% of rows with typeB and 1% of rows with typeC.
>
> My current plan is to:
>
> 1. Split the stream into windows using TumblingProcessingTimeWindows (I don’t
> care about event time at all)
> 2. Assign every row its bucket in a windowing function
> 3. Write a stateful BucketAssigner that:
> 3.1. Keeps its last window in a mutable variable
> 3.2. Once it receives a row with a newer window, sends a message to SQS and
> advances the window
>
> My biggest concern is the 3rd point. To me, BucketAssigner looks like a pure
> function (Row, Time) -> Bucket, and I’m not sure that introducing state and a
> side effect there would be reasonable. Is there any other way to do it? I’m
> also thinking about how I should couple this with the checkpointing mechanism,
> as ideally I’d like not to invoke this callback before the checkpoint is
> written.
>
> StreamingFileSink does not provide many ways to extend it. I tried to
> re-implement it for my purposes, but stumbled upon many private methods and
> classes, so even though it looks possible, the end result would probably be
> too ugly.
>
> To make things a little easier, I don’t care too much about the delivery
> semantics of those final SQS messages: if I get only ~99% of them, that’s
> fine, and if some of them are duplicated, that’s also fine.
>
> Regards,
> Anton
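[Editor's note] The stateful-assigner plan from step 3 of the quoted message can be sketched without any Flink dependencies. This is only an illustration of the logic, not a real BucketAssigner implementation: the class name `HourlyBucketLogic`, the `onHourRolled` callback, and the bucket-path layout are all made up here, and the sketch inherits the caveats Kostas raises (per-subtask state, no checkpoint integration).

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.function.Consumer;

// Flink-free sketch: maps (type, processing time) to a bucket path and
// fires a callback the first time a row from a newer hour shows up.
public class HourlyBucketLogic {
    private long lastHour = -1; // hour of the newest row seen so far
    private final Consumer<Long> onHourRolled; // e.g. would send an SQS message

    public HourlyBucketLogic(Consumer<Long> onHourRolled) {
        this.onHourRolled = onHourRolled;
    }

    /** Returns the bucket path "year/month/day/hour/type" for one row. */
    public String assignBucket(String type, long epochMillis) {
        long hour = epochMillis / 3_600_000L;
        if (lastHour != -1 && hour > lastHour) {
            // First row of a newer hour: the previous hour has "rolled".
            onHourRolled.accept(lastHour);
        }
        if (hour > lastHour) {
            lastHour = hour;
        }
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format("%04d/%02d/%02d/%02d/%s",
                t.getYear(), t.getMonthValue(), t.getDayOfMonth(), t.getHour(), type);
    }

    public static void main(String[] args) {
        HourlyBucketLogic logic =
                new HourlyBucketLogic(h -> System.out.println("hour rolled: " + h));
        System.out.println(logic.assignBucket("typeA", 0L));          // 1970/01/01/00/typeA
        System.out.println(logic.assignBucket("typeB", 3_600_000L));  // callback fires first
    }
}
```

As discussed above, this only behaves as intended with a parallelism of 1, and the callback can fire before the rolled part file is durably on the FS or covered by a checkpoint.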
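[Editor's note] Kostas's directory-monitoring suggestion could look roughly like the sketch below: list the part-file paths, parse out the hour bucket and the subtask index, and report an hour once every subtask has written to it. It assumes the default StreamingFileSink part-file naming scheme `part-<subtaskIndex>-<counter>`; the class name `BucketWatcher` and the exact path pattern are hypothetical.

```java
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of a watcher that parses part-file paths written by a
// StreamingFileSink with buckets ".../year/month/day/hour/type" and
// default "part-<subtaskIndex>-<counter>" file names.
public class BucketWatcher {
    private static final Pattern PART = Pattern.compile(
            ".*/(\\d{4})/(\\d{2})/(\\d{2})/(\\d{2})/[^/]+/part-(\\d+)-\\d+$");

    /** Hours for which every one of the {@code parallelism} subtasks wrote a file. */
    public static Set<String> completedHours(List<String> paths, int parallelism) {
        Map<String, Set<Integer>> subtasksPerHour = new HashMap<>();
        for (String p : paths) {
            Matcher m = PART.matcher(p);
            if (!m.matches()) continue;
            String hour = m.group(1) + "/" + m.group(2) + "/"
                    + m.group(3) + "/" + m.group(4);
            subtasksPerHour.computeIfAbsent(hour, k -> new HashSet<>())
                    .add(Integer.parseInt(m.group(5)));
        }
        Set<String> done = new TreeSet<>();
        for (Map.Entry<String, Set<Integer>> e : subtasksPerHour.entrySet()) {
            if (e.getValue().size() == parallelism) done.add(e.getKey());
        }
        return done;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "s3://bucket/2019/09/09/10/typeA/part-0-0",
                "s3://bucket/2019/09/09/10/typeB/part-1-0",
                "s3://bucket/2019/09/09/11/typeA/part-0-0");
        // Subtasks 0 and 1 both wrote files for hour 2019/09/09/10.
        System.out.println(completedHours(paths, 2)); // [2019/09/09/10]
    }
}
```

One caveat with the skew Anton describes: a subtask that receives no rows for an hour writes no part file at all, so counting subtasks may never reach the parallelism. A variant would instead declare hour X done once files for a newer hour appear, which also sidesteps the "file rolled but not yet fully on the FS" issue from the reply above.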