Hi Tim,

As I said:

> Do you have any better ideas for implementation or is this the best way to go?
> I thought about just building a custom sink inheriting from StreamingFileSink,
> but I don't know how to trigger my jobs only once then.
> I _could_ check for my sink's parallel subtask index to be something like 0 and
> only in that case trigger the export jobs, but I have heavy skew in my parallel
> instances: Some process millions of elements, whereas others process just 10
> events a day. If my "notification-sink-subtask" ended up on a partition with
> those few events, I would get new triggers far too seldom. And I further
> wouldn't know whether the other instances had already committed their parquet
> files.

I don't know how to do that within a sink function. I essentially need a
"synchronisation barrier" after all "notify-checkpoint-complete" calls to all
sink instances. Can you tell me how to do that in my own sink function?

Best regards
Theo

From: "Timothy Victor" <vict...@gmail.com>
To: "Theo Diefenthal" <theo.diefent...@scoop-software.de>
CC: "user" <user@flink.apache.org>
Sent: Saturday, 14 December 2019 16:27:31
Subject: Re: Processing post to sink?

Why not implement your own SinkFunction, or maybe inherit from the one you are
using now?

Tim

On Sat, Dec 14, 2019, 8:52 AM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:

Hi there,

In my pipeline, I write data into a partitioned parquet directory via
StreamingFileSink and a partitioner like:

@Override
public String getBucketId(V element, Context context) {
    return "partitionkey=" + element.getPartitionkey();
}

That works well so far. Now I need to know when all sink instances are fully
"done" with a partition, in order to trigger some export jobs (for machine
learning/model training) and also to notify Impala about the new (final)
partition. In my case, I can define "fully done" precisely.
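For illustration, the bucket id above could be derived from event time roughly like this (a self-contained sketch; the daily `yyyy-MM-dd` key format and the `bucketId` helper are assumptions, since the real `getPartitionkey()` isn't shown):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionKeys {
    // Hypothetical: derive a daily partition key from an event-time timestamp
    // (millis since epoch), matching the "partitionkey=..." bucket layout above.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    public static String bucketId(long eventTimeMillis) {
        return "partitionkey=" + DAY.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        // 2019-12-14T00:00:00Z
        System.out.println(bucketId(1576281600000L)); // prints "partitionkey=2019-12-14"
    }
}
```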
The partitionkey is directly deduced from event time, and my watermarks guarantee no late arrivals. So once a watermark passes a certain event time, I know that the prior partition is complete and can trigger my stuff. Well, not directly: Once the watermark passes, I need to wait for the next checkpoint to complete, because only then are the parquet files committed and the partition fully written.

The question is: How do I implement my "partition-completed" condition check in Flink? It pretty much comes down to this: I want to do some processing _after_ a sink, based on the sink's progress (watermarks + checkpoints).

The only idea I have come up with so far: Make the sink a process function which also emits elements. Only on a completed checkpoint, emit an element carrying the current watermark downstream. In the next step, assign event timestamps based on these elements and merge the parallel subtasks into one, thus keeping track of the global watermark. In the task with parallelism 1, I could then issue my Impala queries and export jobs (which must not be called by multiple parallel instances simultaneously).

Do you have any better ideas for implementation, or is this the best way to go? I thought about just building a custom sink inheriting from StreamingFileSink, but I don't know how to trigger my jobs only once then. I _could_ check for my sink's parallel subtask index to be something like 0 and only in that case trigger the export jobs, but I have heavy skew in my parallel instances: Some process millions of elements, whereas others process just 10 events a day. If my "notification-sink-subtask" ended up on a partition with those few events, I would get new triggers far too seldom. And I further wouldn't know whether the other instances had already committed their parquet files.

What kind of problems should I expect when making a sink a process function?
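The bookkeeping the parallelism-1 task would need can be sketched in plain Java, without any Flink dependencies (the class name, the per-subtask reporting model, and the "minimum across subtasks" rule are my assumptions about how such a tracker could look): each sink subtask reports the watermark it had reached when its checkpoint completed, the global committed watermark is the minimum over all subtasks, and a partition counts as done once that minimum passes the partition's end.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: tracks per-subtask "committed watermarks" (the watermark each sink
// subtask had reached when notifyCheckpointComplete fired) and computes the
// global minimum. A partition is safe to export/announce to Impala once the
// global minimum passes the partition's end timestamp.
public class CommittedWatermarkTracker {
    private final int parallelism;
    private final Map<Integer, Long> perSubtask = new HashMap<>();

    public CommittedWatermarkTracker(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Called for each marker element a sink subtask emits after a completed checkpoint. */
    public void report(int subtaskIndex, long committedWatermark) {
        // Keep the highest watermark seen per subtask (markers may arrive out of order).
        perSubtask.merge(subtaskIndex, committedWatermark, Math::max);
    }

    /** Global committed watermark: minimum across all subtasks; MIN_VALUE until all have reported. */
    public long globalCommittedWatermark() {
        if (perSubtask.size() < parallelism) {
            return Long.MIN_VALUE;
        }
        return perSubtask.values().stream().mapToLong(Long::longValue).min().getAsLong();
    }

    /** True once every subtask has committed files past the partition's end timestamp. */
    public boolean isPartitionComplete(long partitionEndMillis) {
        return globalCommittedWatermark() >= partitionEndMillis;
    }
}
```

Taking the minimum (not, say, the value of subtask 0) is what sidesteps the skew problem: the slow subtask with 10 events a day holds the global watermark back until it, too, has committed its files.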
Best regards
Theo

--
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
Theo Diefenthal
T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
theo.diefent...@scoop-software.de - www.scoop-software.de
Registered office: Köln, Commercial register: Köln, registration number HRB 36625
Managing directors: Dr. Oleg Balovnev, Frank Heinen, Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel