Hi there,

In my pipeline, I write data into a partitioned parquet directory via StreamingFileSink and a partitioner (bucket assigner) like:

    @Override
    public String getBucketId(V element, Context context) {
        return "partitionkey=" + element.getPartitionkey();
    }

That works well so far.
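For context, the sink itself is wired up roughly like this (simplified: ParquetAvroWriters from flink-parquet just stands in for my actual writer factory, MyBucketAssigner is the class containing the method above, the path is shortened, and `stream` is the DataStream<V> feeding the sink):

    // simplified wiring; the real writer factory is more elaborate
    StreamingFileSink<V> sink = StreamingFileSink
            .forBulkFormat(new Path("hdfs:///data/events"),
                    ParquetAvroWriters.forReflectRecord(V.class))
            .withBucketAssigner(new MyBucketAssigner())
            .build();

    stream.addSink(sink);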
Now I need to know when all sink instances are fully "done" with a partition, in order to trigger some export jobs (for machine learning / model training) and to notify Impala about the new, final partition.

In my case, "fully done" is well defined: the partition key is derived directly from event time, and my watermarks guarantee that there are no late arrivals. So once a watermark passes a certain event time, I know that the prior partition is complete and I can trigger my follow-up work. Well, not quite directly: once the watermark has passed, I still need to wait for the next checkpoint to complete, because only then are the parquet files committed and the partition fully written.

The question is: how do I implement this "partition completed" check in Flink? It pretty much comes down to wanting to do some processing _after_ a sink, based on the sink's progress (watermark + checkpoints).

The only idea I have come up with so far: make the sink a process function that also emits elements. Only on a completed checkpoint, emit an element carrying the current watermark downstream. In the next step, assign event timestamps based on these elements and merge the parallel subtasks into one, thus keeping track of the global watermark. In the task with parallelism 1, I could then issue my Impala queries and export jobs, which must not be called by multiple parallel instances simultaneously. (A rough sketch follows below as a P.S.)

Do you have any better ideas for the implementation, or is this the best way to go?

I also thought about building a custom sink inheriting from StreamingFileSink, but then I don't know how to trigger my jobs only once. I _could_ check whether my sink's parallel subtask index is, say, 0 and only trigger in that case, but I have heavy skew across my parallel instances: some process millions of elements, whereas others process just 10 events a day. If my "notification" subtask ended up on a partition with that few events, I would get new triggers far too rarely, and I still wouldn't know whether the other instances had already committed their parquet files.

What kind of problems should I expect when making a sink a process function?

Best regards
Theo
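P.S.: To make the process-function idea more concrete, here is a rough sketch of what I have in mind. I wrote it as a custom operator rather than a plain ProcessFunction, because a ProcessFunction only gets a Collector in processElement/onTimer and therefore cannot emit anything from notifyCheckpointComplete. All names are mine, and I am assuming that emitting under notifyCheckpointComplete is safe because it runs under the checkpoint lock; that assumption is part of my question.

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Sketch: forwards no elements, but emits its local watermark every time a
    // checkpoint completes, i.e. every time the pending parquet files are
    // committed. (Strictly, there is no ordering guarantee between this
    // operator's notifyCheckpointComplete() and the sink's.)
    public class CommittedWatermarkEmitter<IN>
            extends AbstractStreamOperator<Long>
            implements OneInputStreamOperator<IN, Long> {

        private long currentWatermark = Long.MIN_VALUE;

        @Override
        public void processElement(StreamRecord<IN> element) {
            // elements are not forwarded; the real sink consumes the same stream
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            currentWatermark = mark.getTimestamp();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            // only now are all files below currentWatermark committed
            if (currentWatermark > Long.MIN_VALUE) {
                output.collect(new StreamRecord<>(currentWatermark));
            }
        }
    }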
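Downstream, I would then merge the per-subtask signals into the global committed watermark and trigger from a single instance. Again just a sketch: the constant key forces everything through one task, and endOfPartition()/partitionKeyFor() are placeholders for my event-time-to-partition mapping (e.g. one partition per day):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.util.Collector;

    DataStream<Long> committed = stream
            .transform("committed-watermarks", TypeInformation.of(Long.class),
                    new CommittedWatermarkEmitter<>());

    committed
            // the emitted values are event timestamps themselves, ascending per subtask
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Long>() {
                @Override
                public long extractAscendingTimestamp(Long committedWatermark) {
                    return committedWatermark;
                }
            })
            // constant key => one task sees the min (= global) committed watermark
            .keyBy(new KeySelector<Long, Integer>() {
                @Override
                public Integer getKey(Long value) {
                    return 0;
                }
            })
            .process(new ProcessFunction<Long, String>() {
                @Override
                public void processElement(Long wm, Context ctx, Collector<String> out) {
                    // placeholder: end of the partition that wm falls into
                    ctx.timerService().registerEventTimeTimer(endOfPartition(wm));
                }

                @Override
                public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) {
                    // all subtasks have committed files beyond ts:
                    // notify Impala and kick off the export jobs here
                    out.collect("partitionkey=" + partitionKeyFor(ts));
                }
            });

The constant key obviously serializes all triggers through one task, but since it fires at most once per partition, that should not be a throughput problem, and it guarantees the jobs are not started by multiple instances at once.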