Hey Theo,

Your solution of turning the sink into a process function should work; I'm
just not sure how easy it is to re-use the StreamingFileSink inside it.
Have you considered sending all the records to a parallelism=1 process
function sitting "next to" the StreamingFileSink instead? You could track
the watermarks and partitions in there and listen to the
notifyCheckpointComplete() calls. Since that ProcessFunction receives data
at the same rate as the sink, it should stay aligned with the sink's
progress. I'm not 100% sure this solution really works, but I wanted to
bring it up to see if you've considered it.
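To make the idea concrete, here is a plain-Java sketch (deliberately not the Flink API) of the state such a side function could keep. All class and method names are invented for this example; onCheckpoint() and onCheckpointComplete() stand in for what snapshotState() and notifyCheckpointComplete() would drive in a real Flink function:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a parallelism=1 tracker sitting "next to" the sink: it records
// the watermark it had seen when each checkpoint was taken, and promotes it
// to "committed" once the matching checkpoint-complete notification arrives.
// Only then are the sink's parquet files below that watermark committed too.
public class CommittedWatermarkTracker {
    private long currentWatermark = Long.MIN_VALUE;
    private long committedWatermark = Long.MIN_VALUE;
    // Pairs of {checkpointId, watermark at checkpoint time}, oldest first.
    private final Deque<long[]> pending = new ArrayDeque<>();

    public void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    // Called when a checkpoint is taken (cf. snapshotState()).
    public void onCheckpoint(long checkpointId) {
        pending.addLast(new long[] {checkpointId, currentWatermark});
    }

    // Called when the checkpoint is acknowledged (cf. notifyCheckpointComplete()).
    public void onCheckpointComplete(long checkpointId) {
        while (!pending.isEmpty() && pending.peekFirst()[0] <= checkpointId) {
            committedWatermark = Math.max(committedWatermark, pending.pollFirst()[1]);
        }
    }

    // Partitions whose end timestamp lies below this value are fully committed.
    public long committedWatermark() {
        return committedWatermark;
    }
}
```

A partition could then be reported to Impala or exported as soon as committedWatermark() passes the partition's end timestamp.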

Best,
Robert



On Sat, Dec 14, 2019 at 7:08 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Tim,
>
> As I said:
>
> > Do you have any better ideas for implementation or is this the best way
> > to go? I thought about just building a custom sink inheriting from
> > StreamingFileSink, but I don't know how to trigger my jobs then only
> > once. I _could_ check for my sink parallel subtask index to be something
> > like 0 and only in that case trigger the jobs, but I have heavy skew in
> > my parallel instances: Some process millions of elements, whereas others
> > process just 10 events a day. If my "notification-sink-subtask" would
> > end up on a partition with those few events, I would get new triggers
> > far too seldom. And I further wouldn't know whether the other instances
> > had already committed their parquet files.
>
> I don't know how to do that within a sink function. I kind of need a
> "synchronisation barrier" after the "notify-checkpoint-complete" calls to
> all sink instances. Can you tell me how to do that in my own sink
> function?
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Timothy Victor" <vict...@gmail.com>
> *To: *"Theo Diefenthal" <theo.diefent...@scoop-software.de>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Saturday, December 14, 2019 16:27:31
> *Subject: *Re: Processing post to sink?
>
> Why not implement your own SinkFunction, or maybe inherit from the one you
> are using now?
>
> Tim
>
> On Sat, Dec 14, 2019, 8:52 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi there,
>>
>> In my pipeline, I write data into a partitioned parquet directory via
>> StreamingFileSink and a partitioner like:
>>
>> @Override
>> public String getBucketId(V element, Context context) {
>>     return "partitionkey=" + element.getPartitionkey();
>> }
>>
>> That works well so far.
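For illustration, here is a self-contained sketch of deriving such a bucket id directly from event time, as described below (assuming daily partitions in UTC; the class and method names are made up for this example and are not StreamingFileSink API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionKeys {
    // Assumption: one partition per event-time day, rendered in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Hypothetical helper: build the Hive-style partition key that
    // getBucketId() would return, directly from the event timestamp.
    public static String bucketIdFor(long eventTimeMillis) {
        return "partitionkey=" + DAY.format(Instant.ofEpochMilli(eventTimeMillis));
    }
}
```

With this scheme, `bucketIdFor(0L)` yields `partitionkey=1970-01-01`, so each event-time day maps to exactly one parquet directory.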
>>
>> Now I need to know when all sink instances are fully "done" with a
>> partition in order to trigger some export jobs (for machine learning/model
>> training) and also notify Impala about the new (final) partition.
>>
>> In my case, I can well define "fully done". The partitionkey is directly
>> deduced from event time and my watermarks guarantee no late arrivals. So
>> once a watermark passes a certain event time, I know that the prior
>> partition is complete and can trigger my stuff. Well, not directly: once
>> the watermark passes, I need to wait for the next checkpoint to complete,
>> because only then are the parquet files committed and the partition fully
>> written.
>>
>> The question is: How do I implement my "partition-completed"-condition
>> check in Flink? It pretty much comes down to that I want to do some
>> processing _after_ a Sink based on the sinks progress.
>> (Watermark+checkpoints)
>>
>> The only idea I have come up with so far is: Make the sink a
>> process-function which also emits elements. Only on a completed
>> checkpoint, emit an element with the current watermark downstream. In the
>> next step, assign event timestamps based on these events and merge the
>> parallel subtasks into one, thus keeping track of the global watermark.
>> In the task with parallelism 1, I could then issue my Impala queries and
>> export jobs (which should not be called by multiple parallel instances
>> simultaneously).
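The bookkeeping that the parallelism-1 task would do can be sketched in plain Java (not Flink API), under the assumption that every sink subtask reports, after each completed checkpoint, the watermark it had seen at checkpoint time; all names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "partition completed" check: a partition is safe to export
// once the minimum checkpointed watermark across ALL sink subtasks has
// passed the partition's end timestamp, i.e. every subtask has committed
// its parquet files for that partition.
public class PartitionCompletionTracker {
    private final int parallelism;
    private final Map<Integer, Long> checkpointedWatermarks = new HashMap<>();

    public PartitionCompletionTracker(int parallelism) {
        this.parallelism = parallelism;
    }

    // Called with the watermark a subtask had seen when its checkpoint
    // completed; keeps the maximum seen per subtask.
    public void reportCheckpointedWatermark(int subtaskIndex, long watermark) {
        checkpointedWatermarks.merge(subtaskIndex, watermark, Math::max);
    }

    // True once every subtask has checkpointed past partitionEndMillis.
    public boolean isPartitionComplete(long partitionEndMillis) {
        if (checkpointedWatermarks.size() < parallelism) {
            return false; // some subtask has not reported yet
        }
        long globalMin = checkpointedWatermarks.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
        return globalMin >= partitionEndMillis;
    }
}
```

Note the minimum over all subtasks: this is exactly why a single skewed subtask cannot decide on its own, since the slow instances hold the global watermark back until their files are committed too.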
>>
>> Do you have any better ideas for implementation or is this the best way
>> to go? I thought about just building a custom sink inheriting from
>> StreamingFileSink, but I don't know how to trigger my jobs then only
>> once. I _could_ check for my sink parallel subtask index to be something
>> like 0 and only in that case trigger the jobs, but I have heavy skew in
>> my parallel instances: Some process millions of elements, whereas others
>> process just 10 events a day. If my "notification-sink-subtask" would end
>> up on a partition with those few events, I would get new triggers far too
>> seldom. And I further wouldn't know whether the other instances had
>> already committed their parquet files.
>>
>> What kinds of problems should I expect when turning a sink into a
>> process-function?
>>
>> Best regards
>> Theo
>>
>
>
> --
> SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
> Theo Diefenthal
>
> T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
> theo.diefent...@scoop-software.de - www.scoop-software.de
> Sitz der Gesellschaft: Köln, Handelsregister: Köln,
> Handelsregisternummer: HRB 36625
> Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen,
> Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
>
