>
> Emitting records for downstream operators in or after
> notifyCheckpointComplete no longer works after FLIP-147 when executing the
> final checkpoint. The problem is that the final checkpoint happens after
> the EOI and we would like to keep the property that you can terminate the
> whole topology with a single checkpoint, if possible.
>

Sorry for the confusion, I meant to say emit them after the barrier (e.g.
in snapshotState).

On Thu, Nov 4, 2021 at 9:49 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for the detailed description Arvid.
>
> I might misunderstand things but one comment concerning:
>
> | We could even optimize the writer to only emit the committables after
> notifyCheckpointComplete as long as we retain them in the state of the
> writer.
>
> Emitting records for downstream operators in or after
> notifyCheckpointComplete no longer works after FLIP-147 when executing the
> final checkpoint. The problem is that the final checkpoint happens after
> the EOI and we would like to keep the property that you can terminate the
> whole topology with a single checkpoint, if possible.
>
> Cheers,
> Till
>
> On Thu, Nov 4, 2021 at 9:05 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi folks,
>>
>> thanks for the lively discussion. Let me present my point of view on a
>> couple of items:
>>
>> *Impact on checkpointing times*
>>
>> Currently, we send the committables of the writer downstream before the
>> barrier is sent. That allows us to include all committables in the state of
>> the committers, such that the committer receives all committables before it
>> snapshots.
>>
>> All proposals now add a processing step on the committables where certain
>> committables are virtually merged into larger committables. However, none
>> of the approaches require the physical merge to happen before the barrier
>> is sent downstream. In fact, we can even delay the virtual merge until after
>> the checkpoint has been fully taken. We could even optimize the writer to only
>> emit the committables after notifyCheckpointComplete, as long as we
>> retain them in the state of the writer. The important point is that the
>> committables become part of the checkpoint, but they are always committed
>> only after notifyCheckpointComplete. Especially the retry mechanism of
>> 1.14 decouples the exact time of the commit from notifyCheckpointComplete.
>> The retry happens asynchronously, so there is no reason to believe that we
>> can't do the merging asynchronously with any option.
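>>
>> To make the "retain in writer state" idea concrete, here is a minimal,
>> purely hypothetical sketch (the names are made up and this is not the
>> actual Sink API): the committables of a checkpoint become part of the
>> writer's state and are only forwarded once that checkpoint is confirmed.
>>
>>   import java.util.*;
>>   import java.util.function.Consumer;
>>
>>   // Hypothetical writer: committables are part of the checkpoint, but are
>>   // only handed to the committer after notifyCheckpointComplete.
>>   class DeferredEmittingWriter<CommT> {
>>     private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();
>>
>>     void snapshotState(long checkpointId, List<CommT> committables) {
>>       pending.put(checkpointId, committables); // retained in writer state
>>     }
>>
>>     void notifyCheckpointComplete(long checkpointId, Consumer<CommT> toCommitter) {
>>       // forward everything up to and including the confirmed checkpoint
>>       Map<Long, List<CommT>> confirmed = pending.headMap(checkpointId, true);
>>       confirmed.values().forEach(list -> list.forEach(toCommitter));
>>       confirmed.clear();
>>     }
>>   }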
>>
>> Naturally, all approaches delay the checkpoint barrier a bit by either
>> adding RPC calls or shuffles, but the impact is rather minimal in a
>> well-configured system (the number of committables is assumed to be tiny).
>> So I'm assuming a slightly higher checkpointing time in all cases because
>> the topology is more complex.
>>
>> *Impact on latency*
>>
>> All approaches will also delay the effective commit, since additional
>> work needs to be done, but I'd argue that this is by design and should be
>> clear to everyone. Especially when merging files across checkpoints,
>> certain results will not be visible until much later. Once we settle on
>> an approach, we should think about which options we give to sink developers
>> and end-users to influence that latency.
>>
>> An important aspect here is that we also refine the contract on
>> GlobalCommitter. Currently, it's not clear when it is supposed to be
>> called; for example, to register files in a metastore. Naively, I would
>> have said that the GlobalCommitter is invoked when all committables of a
>> certain checkpoint have been committed.
>> a) But what happens in the case of a failure and retry? Do we delay until
>> the commit finally happens?
>> b) What do we do with committables that are held back for compaction? Do
>> we global-commit when all committables of checkpoint A are committed,
>> ignoring small files? Or do we wait until a later checkpoint, when all
>> small files of A have been merged, such that indeed all data of A has been
>> committed?
>>
>> *Re: The understanding cost for developers is relatively high.*
>>
>> Yes, that is a valid concern that we already have with the committer and
>> global committer (which none of the new users understand). I don't like
>> that we have so many Optional methods where it's not clear which methods to
>> implement to achieve certain functionality. Ideally, we split the Sink up
>> into many smaller components where you add certain traits. For example,
>> HiveSink implements Sink, HasState, HasCommitter, HasCompaction,
>> HasGlobalCommitter (there are probably better names for the traits).
>> We can still change that since everything is experimental (and annoy
>> connector devs), but it seems to be an orthogonal discussion.
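>>
>> To illustrate the trait idea, a rough sketch (all interfaces here are
>> hypothetical placeholders, not a proposal for the final API):
>>
>>   import java.util.List;
>>
>>   interface Sink<InputT> { /* create the writer etc. */ }
>>   interface HasState { /* writer keeps checkpointed state */ }
>>   interface HasCommitter<CommT> { void commit(List<CommT> committables); }
>>   interface HasCompaction<CommT> { List<CommT> compact(List<CommT> committables); }
>>   interface HasGlobalCommitter<CommT> { void globalCommit(List<CommT> committables); }
>>
>>   // A connector implements only the traits it needs, e.g. a Hive-like sink:
>>   class HiveLikeSink implements Sink<String>, HasState, HasCommitter<String>,
>>       HasCompaction<String>, HasGlobalCommitter<String> {
>>     public List<String> compact(List<String> files) { return files; } // no-op placeholder
>>     public void commit(List<String> files) { /* move files to their final location */ }
>>     public void globalCommit(List<String> files) { /* register files in the metastore */ }
>>   }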
>>
>> If we compare Option 2 with Option 3, I'd argue that the mental model of
>> the connector dev is more stressed with option 3 than with 2. He needs to
>> map the high-level concepts of the current Sink to the low-level concepts
>> of DataStream. He needs to understand the data that is being sent between
>> writer and committer to be able to hook in.
>> Note that we need to move away from just sending CommT and wrap it with
>> some metadata, such as checkpoint id and subtask id.
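>>
>> As a hedged illustration of "CommT plus metadata" (the exact field set is
>> just a guess):
>>
>>   // Wrapper the writer would emit instead of a bare CommT.
>>   final class CommittableWithMetadata<CommT> {
>>     final CommT committable;
>>     final long checkpointId; // checkpoint that produced the committable
>>     final int subtaskId;     // writer subtask that produced it
>>
>>     CommittableWithMetadata(CommT committable, long checkpointId, int subtaskId) {
>>       this.committable = committable;
>>       this.checkpointId = checkpointId;
>>       this.subtaskId = subtaskId;
>>     }
>>   }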
>>
>> The dev also needs to take care of managing the state, which we may
>> abstract away in Option 2 (not entirely sure). And the dev needs to
>> understand the task life-cycle to emit the remaining committables before
>> the job shuts down (is that even possible on the DataStream level? Are we
>> notified on EOI or do we expect devs to use the operator API?). Lastly, if
>> things are done asynchronously as you have championed, the user also needs
>> to ensure that all async tasks are done before shutdown.
>>
>> *Re 2. Repeated implementation*
>>
>> The newly introduced `aggregate` can set the parallelism, so perhaps
>>> `setUid`, `slotSharingGroup` (including resources), and `maxParallelism`
>>> also need to be supported? If they are supported, additional work is
>>> required, and at the same time I feel that this work is unnecessary;
>>> if not, it will also increase the developers’ understanding cost: for
>>> example, why can these operators not set these attributes?
>>>
>>
>> I'm assuming that you mean that we are about to replicate the API between
>> DataStream and the Sink compactor? I wouldn't do that. I would actually also
>> fix the parallelism to the writer's parallelism. So we just have a pre-key,
>> aggregate, and post-key, where we can have defaults for the keys. We should
>> provide access to the metadata of CommT through a context. I'd always run
>> compaction in the same slot sharing group as the writer, similar to how the
>> committer is run. I don't see why we should take a different route with the
>> design of the compaction than with the committer, where we abstract all
>> these things away.
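>>
>> A minimal sketch of what that could look like (interface and context are
>> hypothetical, only meant to show the pre-key/aggregate shape):
>>
>>   import java.util.List;
>>
>>   interface CompactorContext {
>>     long checkpointId();           // metadata about the committables
>>     int numberOfWriterSubtasks();
>>   }
>>
>>   interface Compaction<CommT> {
>>     // default pre-key: a constant, i.e. all committables form one group
>>     default String groupKey(CommT committable, CompactorContext ctx) {
>>       return "all";
>>     }
>>
>>     // merge one group of committables into (hopefully fewer) committables
>>     List<CommT> aggregate(List<CommT> group, CompactorContext ctx);
>>   }
>>
>> Parallelism, uid, and the slot sharing group would then be fixed by the
>> framework, exactly as for the committer.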
>>
>> Also, for another
>>> example, `SinkWriter` has introduced many implementations that
>>> already exist (or are repeated) in the DataStream API in order to support
>>> ‘State’, ‘Timer’, ‘asynchronization’, ‘subindex’, etc.
>>>
>>
>> I agree that the repeated implementation for Timer is weird, but that
>> could have been solved similarly to how we solved it with asynchronization,
>> where we did not replicate interfaces but rather pulled up the concept of
>> MailboxExecutor from DataStream to core.
>> Having specific Context objects is imho actually a good design decision -
>> RuntimeContext is heavily overloaded and shouldn't be emulated or used.
>>
>>
>>> In addition, if a new feature is introduced on the DataStream in the
>>> future, I think it may be necessary to consider the special operators of
>>> the sinks separately, which may also be a burden.
>>>
>>
>> Yes, option 2 has a higher maintenance cost for Flink since Flink needs to
>> provide these operators, but this also gives us more control over changing
>> things in the future. If we expose the elements that we exchange between
>> the different sink operators, we cannot change them later. As it is now, we
>> can easily abstract a change in the elements away.
>> Regarding new features in DataStream: I think this is actually an
>> advantage. We can evolve the DataStream API without thinking about how that
>> could screw up potential sinks.
>>
>> *Re 2. Aggregator (I think parallelism 1 is OK, why is it multiple
>> parallelism?)*
>>
>> If a user wants to aggregate with parallelism 1, he can use a constant
>> key (which we should describe in the javadoc), and that should be the
>> default. I think that a higher parallelism makes sense if you want to
>> compact across checkpoints, where you probably want to aggregate the files
>> into different buckets and can process the buckets in parallel.
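>>
>> Continuing the hypothetical sketch from above: the default constant key
>> means everything is aggregated by a single subtask, while a bucket-based
>> key lets different buckets be compacted in parallel.
>>
>>   import java.util.List;
>>
>>   // Assumes the Compaction/CompactorContext interfaces sketched earlier and
>>   // that the bucket is encoded in the path, e.g. ".../bucket-17/part-0-3".
>>   class PerBucketCompaction implements Compaction<String> {
>>     @Override
>>     public String groupKey(String path, CompactorContext ctx) {
>>       return path.split("/bucket-")[1].split("/")[0]; // bucket id as key
>>     }
>>
>>     @Override
>>     public List<String> aggregate(List<String> files, CompactorContext ctx) {
>>       return List.of(String.join("+", files)); // stand-in for a real file merge
>>     }
>>   }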
>>
>>
>>
>> I hope to have cleared things up a bit. I guess it became obvious that I
>> prefer Option 2 because it gives us more means to provide a good
>> implementation and help the sink developer while also giving us flexibility
>> in the future. Happy to go more into details on certain points.
>>
>>
>>
>> On Thu, Nov 4, 2021 at 7:20 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Many thanks Fabian for drafting this FLIP! It looks very good to me. I
>>> see that currently most of us agree with option 2, but I personally feel
>>> that option 3 may be better :-)
>>> I have some small concerns about option 2:
>>>
>>> 1. The understanding cost for developers is relatively high.
>>> The merging of small files is very important, but from the perspective of
>>> the sink as a whole, this requirement is only a special case. Exposing
>>> this requirement directly in the Sink API means that all developers
>>> may need to understand this "special case". From my personal point of
>>> view, a developer who has no compaction requirements may find this API
>>> more complicated than necessary. In addition, at a higher level, the first
>>> level of the Sink API should be a relatively general abstraction; the
>>> second level might offer different abstractions for specific use cases.
>>>
>>> 2. Repeated implementation
>>> The newly introduced `aggregate` can set the parallelism, so perhaps
>>> `setUid`, `slotSharingGroup` (including resources), and `maxParallelism`
>>> also need to be supported? If they are supported, additional work is
>>> required, and at the same time I feel that this work is unnecessary;
>>> if not, it will also increase the developers’ understanding cost: for
>>> example, why can these operators not set these attributes? Also, for
>>> another example, `SinkWriter` has introduced many implementations that
>>> already exist (or are repeated) in the DataStream API in order to support
>>> ‘State’, ‘Timer’, ‘asynchronization’, ‘subindex’, etc.
>>> In addition, if a new feature is introduced on the DataStream in the
>>> future, I think it may be necessary to consider the special operators of
>>> the sinks separately, which may also be a burden.
>>>
>>> 3. Might block checkpointing
>>> For option 2, we would need to actually merge files in the committer.
>>> However, this might cause some problems regarding whether compaction blocks
>>> the checkpoint: suppose our policy is to merge all the files produced within
>>> 10 checkpoints. If we allow users to construct the topology, they could
>>> merge the files asynchronously in a dedicated executor, the merge could span
>>> multiple checkpoints (as long as it does not exceed 10), and then we emit
>>> the files to be renamed to the committer. But if we merge in a committer, it
>>> has to be done before the next checkpoint, since we do not support
>>> asynchronous commits now. With option 3, however, this is already possible
>>> today.
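>>>
>>> A rough sketch of that asynchronous merge (all names are made up, just to
>>> make the idea concrete): files are handed to a background executor, and
>>> only finished merge results are forwarded to the committer at some later
>>> checkpoint.
>>>
>>>   import java.util.*;
>>>   import java.util.concurrent.*;
>>>   import java.util.function.Consumer;
>>>
>>>   class AsyncMerger {
>>>     private final ExecutorService mergePool = Executors.newSingleThreadExecutor();
>>>     private final Queue<Future<String>> inFlight = new ArrayDeque<>();
>>>
>>>     void onCheckpoint(List<String> newFiles, Consumer<String> toCommitter)
>>>         throws Exception {
>>>       // kick off the merge asynchronously; it may span several checkpoints
>>>       inFlight.add(mergePool.submit(() -> String.join("+", newFiles)));
>>>       // forward only merges that have already finished
>>>       while (!inFlight.isEmpty() && inFlight.peek().isDone()) {
>>>         toCommitter.accept(inFlight.poll().get());
>>>       }
>>>     }
>>>   }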
>>>
>>>
>>> In addition, the FLIP mentioned that option 3 puts a certain burden on
>>> developers from the perspective of stream-batch unification. Could you
>>> enlighten me a little about what the burden is? I think that the current
>>> DataStream APIs are all stream-batch unified, and the topology in
>>> option 3 is also built with `DataStream`, so I understand that developers
>>> will not have any additional burden. Or what is missing?
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Thu, Nov 4, 2021 at 11:16 AM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>> > Hi Fabian,
>>> >
>>> > Thanks for drafting the FLIP!
>>> >
>>> > ## A few thoughts on user requirements
>>> >
>>> > 1. Compact files from multiple checkpoints
>>> >
>>> > This is what users need very much.
>>> >
>>> > 2. Whether compaction blocks checkpointing
>>> >
>>> > - In some scenarios this is required. For example, the user expects the
>>> > output data to be consistent with the input data: if the checkpoint
>>> > succeeds, they need to see exactly how much data was output. Otherwise,
>>> > when the user restarts a job to consume from the same source offset,
>>> > they may lose data.
>>> > - But I think in most cases users are not that concerned about this,
>>> > and we can delay making the data visible.
>>> >
>>> > 3. The end-to-end latency of data
>>> >
>>> > This also depends on the situation.
>>> > - For some users, latency is very important. We'd better compact the
>>> > data within the current checkpoint, even if it affects the checkpoint
>>> > duration.
>>> > - Other users think that the delay doesn't matter (the delay is at the
>>> > minute level): as long as the files are compacted, small files won't
>>> > cause them trouble.
>>> >
>>> > So I think flexibility is important.
>>> >
>>> > ## A few thoughts on option 2
>>> >
>>> > Option 1 and option 2 seem to differ only in the aggregator in the
>>> > middle: whether it is a separate operator or a coordinator.
>>> >
>>> > I would prefer option 2.
>>> >
>>> > If I understand correctly, the whole process should be as follows:
>>> >
>>> > 1. SinkWriter ->
>>> > 2. Aggregator (I think parallelism 1 is OK, why is it multiple
>>> > parallelism?) ->
>>> > 3. LocalCommitter (does the compaction work) ->
>>> > 4. GlobalCommitter
>>> >
>>> > The Aggregator can control single-checkpoint or cross-checkpoint
>>> > compaction, whether or not to block the current checkpoint, and the
>>> > end-to-end latency of the data, i.e. how many checkpoints to wait for.
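>>> >
>>> > Just to make those knobs concrete, a hypothetical configuration object
>>> > (not an actual API):
>>> >
>>> >   class AggregatorPolicy {
>>> >     final boolean crossCheckpoint;  // merge within one checkpoint or across
>>> >     final boolean blockCheckpoint;  // must the merge finish before the barrier?
>>> >     final int maxCheckpointsToWait; // bound on the added end-to-end latency
>>> >
>>> >     AggregatorPolicy(boolean crossCheckpoint, boolean blockCheckpoint,
>>> >         int maxCheckpointsToWait) {
>>> >       this.crossCheckpoint = crossCheckpoint;
>>> >       this.blockCheckpoint = blockCheckpoint;
>>> >       this.maxCheckpointsToWait = maxCheckpointsToWait;
>>> >     }
>>> >   }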
>>> >
>>> > Best,
>>> > Jingsong
>>> >
>>> > On Wed, Nov 3, 2021 at 11:01 PM Fabian Paul <fabianp...@ververica.com>
>>> > wrote:
>>> > >
>>> > >
>>> > > Hi David and Till,
>>> > >
>>> > > Thanks for your great feedback. One definitely confusing point in the
>>> > > FLIP is who is doing the actual compaction. The compaction will not be
>>> > > done by the CommittableAggregator operator but by the committers, so it
>>> > > should also not affect the checkpointing duration or become a
>>> > > significant performance bottleneck, because the committers are executed
>>> > > in parallel (also in batch mode [1]).
>>> > >
>>> > > I will update the FLIP to clarify it.
>>> > >
>>> > > > From your description I would be in favour of option 2 for the
>>> > > > following reasons: Assuming that option 2 solves all our current
>>> > > > problems, it seems like the least invasive change and smallest in
>>> > > > scope. Your main concern is that it might not cover future use cases.
>>> > > > Do you have some specific use cases in mind?
>>> > >
>>> > > No, I do not have anything specific in mind. I just wanted to raise the
>>> > > point that adding more and more operators to the sink might complicate
>>> > > future development of ensuring that they can all be used together.
>>> > >
>>> > > > What I am missing a bit from the description is how option 2 will
>>> > > > behave wrt checkpoints and the batch execution mode.
>>> > >
>>> > > My idea was to always invoke CommittableAggregator#aggregate on each
>>> > > checkpoint and on endOfInput. In the batch case the aggregation is only
>>> > > done once, on all committables.
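>>> > >
>>> > > A rough sketch of that behaviour (a hypothetical class, not the actual
>>> > > operator): buffer committables and aggregate on every checkpoint and on
>>> > > endOfInput; in batch mode there are no checkpoints, so the single
>>> > > endOfInput() call aggregates everything at once.
>>> > >
>>> > >   import java.util.*;
>>> > >   import java.util.function.*;
>>> > >
>>> > >   class CommittableAggregatorSketch<CommT> {
>>> > >     private final List<CommT> buffer = new ArrayList<>();
>>> > >     private final UnaryOperator<List<CommT>> aggregate;
>>> > >     private final Consumer<CommT> output;
>>> > >
>>> > >     CommittableAggregatorSketch(UnaryOperator<List<CommT>> aggregate,
>>> > >         Consumer<CommT> output) {
>>> > >       this.aggregate = aggregate;
>>> > >       this.output = output;
>>> > >     }
>>> > >
>>> > >     void processElement(CommT committable) { buffer.add(committable); }
>>> > >
>>> > >     void onCheckpoint(long checkpointId) { flush(); } // streaming
>>> > >
>>> > >     void endOfInput() { flush(); } // batch: once, over everything
>>> > >
>>> > >     private void flush() {
>>> > >       aggregate.apply(new ArrayList<>(buffer)).forEach(output);
>>> > >       buffer.clear();
>>> > >     }
>>> > >   }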
>>> > >
>>> > >
>>> > > > A few thoughts on option 2:
>>> > > >
>>> > > > The file compaction is by definition a quite costly IO-bound
>>> > > > operation. If I understand the proposal correctly, the aggregation
>>> > > > itself would run during the operator (aggregator) checkpoint. Would
>>> > > > this significantly increase the checkpoint duration?
>>> > > >
>>> > > > Compaction between different sub-tasks incurs additional network IO
>>> > > > (to fetch the raw non-compacted files from the remote filesystem), so
>>> > > > this could quickly become a bottleneck. Basically we're decreasing
>>> > > > the sink parallelism (possible throughput) to the parallelism of the
>>> > > > aggregator.
>>> > >
>>> > > Hopefully these concerns are covered by the explanation at the
>>> > > beginning.
>>> > >
>>> > > > To be really effective here, compaction would ideally be able to
>>> > > > compact files from multiple checkpoints. However, there is a huge
>>> > > > tradeoff between latency and efficiency (especially with exactly
>>> > > > once). Is this something worth exploring?
>>> > >
>>> > > I agree with you that by enabling compaction across checkpoints the
>>> > > latency can increase, because files might be committed several
>>> > > checkpoints later. I guess the best we can do is to let the user
>>> > > configure the behaviour. By configuring the checkpointing interval and
>>> > > the desired file size, the user can already affect the latency.
>>> > > Does this answer your questions? I am not fully sure what you are
>>> > > referring to with efficiency. @dvmk
>>> > >
>>> > > > I hope that with option 2, we can support both use cases: single-task
>>> > > > compaction as well as cross-task compaction if needed. Similarly for
>>> > > > single-checkpoint compaction as well as cross-checkpoint compaction.
>>> > >
>>> > > Compaction across subtasks should be controllable by the parallelism
>>> > > of the CommittableAggregator operator, i.e. a parallelism of 2 can
>>> > > reduce the computational complexity but might not compute the best
>>> > > compaction.
>>> > >
>>> > > Best,
>>> > > Fabian
>>> > >
>>> > > [1] https://github.com/apache/flink/pull/17536
>>> >
>>> >
>>> >
>>> > --
>>> > Best, Jingsong Lee
>>> >
>>>
>>
