Hi there, 

In my pipeline, I write data into a partitioned parquet directory via 
StreamingFileSink and a partitioner like: 
@Override 
public String getBucketId( V element, Context context) { 
return "partitionkey=" + element.getPartitionkey(); 
} 
That works well so far. 

Now I need to know when all sink instances are fully "done" with a partition in 
order to trigger some export jobs (for machine learning/model training) and 
also notify Impala about the new (final) partition. 

In my case, I can well define "fully done". The partitionkey is directly 
deduced from event time and my watermarks guarantee no late arrivals. So once a 
watermark passes a certain event time, I know that the prior partition is 
completed and can trigger my stuff. Well not directly: Once the watermark 
passes, I need to wait for the next checkpoint to be completed because only 
then, the parquet files are committed and the partition is fully written to. 

The question is: How do I implement my "partition-completed"-condition check in 
Flink? It pretty much comes down to that I want to do some processing _after_ a 
Sink based on the sinks progress. (Watermark+checkpoints) 

The only idea I got up with so far is: Make the sink a process-function which 
also emits elements. Only on a completed checkpoint, emit an element with the 
current watermark downstream. In the next step, assign event timestamps based 
on these events and merge the parallel subtasks into one, thus keeping track of 
the global watermark. In the task with parallelism 1, I could then issue my 
impala queries and export jobs. (Which should not be called by multiple 
parallel instances simultaneously). 

Do you have any better ideas for implementation or is this the best way to go? 
I thought about just building a custom sink inheriting from StreamingFileSink, 
but I don't know how to trigger my jobs then only once . I _could_ check for my 
sink parallel subtask index to be something like 0 and only in that case 
trigger the subtasks, but I have heavy skew in my parallel instances: Some 
process millions of elements, whereas other process just 10 events a day. If my 
"notification-sink-subtask" would end up on a partition with those few events, 
I would get way too seldom new triggers. And I further wouldn't know if the 
other instances also had already committed there parquet files. 

What kind of problems do I need to expect when making a sink a 
process-function? 

Best regards 
Theo 

Reply via email to