Hi folks,

thanks for the lively discussion. Let me present my point of view on a
couple of items:

*Impact on checkpointing times*

Currently, we send the committables of the writer downstream before the
barrier is sent. That allows us to include all committables in the state of
the committers, such that the committer receives all committables before it
snapshots.

All proposals now add a processing step on the committables where certain
committables are virtually merged into larger committables. However, none
of the approaches require the physical merge to happen before the barrier
is sent downstream. In fact, we can even delay the virtual merge after the
checkpoint has been fully taken. We could even optimize the writer to only
emit the committables after notifyCheckpointComplete as long as we retain
them in the state of the writer. The important point is that the
committables become part of the checkpoint, but they are always committed
only after notifyCheckpointComplete. Especially the retry mechanism of 1.14
decouples the exact time of the commit from notifyCheckpointComplete. The
retry happens asynchronously, so there is no reason to believe that we
can't do the merging asynchronously with any option.

Naturally, all approaches delay the checkpoint barrier a bit by either
adding RPC calls or shuffles but the impact is rather minimal in a well
configured system (the number of committables is assumed to be tiny), so
I'm assuming a tad higher checkpointing time because the topology is more
complex (in all cases).

*Impact on latency*

All approaches will also delay the effective commit, since additionaly work
needs to be done but I'd argue that this is by design and should be clear
to everyone. Especially, when merging files across checkpoints, certain
results will not be visible until much later. Once, we settle for an
approach, we should think which options we give to sink developers and
end-users to impact that latency.

An important aspect here is that we also refine the contract on
GlobalCommitter. Currently, it's not clear when it is supposed to be
called; for example, to register files in a metastore. Naively, I would
have said that the GlobalCommitter is invoked when all committables of a
certain checkpoint have been committed.
a) But what happens in the case of a failure and retry? Do we delay until
the commit finally happens?
b) What do we do with committables that are held back for compaction? Do we
global commit when all committables of checkpoint A are committed ignoring
small files? Or do we wait until a later checkpoint, when all small files
of A have been merged such that indeed all data of A has been committed.

*Re: Developers understand that the cost is relatively high.*

Yes that is a valid concern that we already have with committer and global
committer (which none of the new users understand). I don't like that we
have so many Optional methods where it's not clear which methods to
implement to achieve certain functionality. Ideally, we split up the Sink
in many smaller components where you add certain traits. For example,
HiveSink implements Sink, HasState, HasCommitter, HasCompaction,
HasGlobalCommitter (there are probably better names for the traits)
We still can change that as everything is experimental (and annoy connector
devs) but it seems to be an orthogonal discussion.

If we compare Option 2 with Option 3, I'd argue that the mental model of
the connector dev is more stressed with option 3 than with 2. He needs to
map the high-level concepts of the current Sink to the low-level concepts
of DataStream. He needs to understand the data that is being sent between
writer and committer to be able to hook in.
Note that we need to move away from just sending CommT and wrap it with
some metadata, such as checkpoint id and subtask id.

The dev also needs to care about managing the state, which we may abstract
in Option 2 (not entirely sure). And the dev needs to understand the task
life-cycle to emit the remaining committables before the job shuts down (is
that even possible on DataStream level? Are we notified on EOI or do we
expect devs to use operator API?). Lastly, if things are done
asynchronously as you have championed, the user also needs to take care of
ensuring that all async tasks are done before shutdown.

*Re 2. Repeated implementation*

The newly introduced `aggregate` can set the parallelism, thus perhaps
> `setUid`, `slotSharingGroup (including resources)`, and `maxParallelism`
> also need to be supported? If they are supported, additional workloads are
> required, and at the same time I feel that these workloads are unnecessary;
> if not, it will also increase the developers’ understanding cost: for
> example, why these operators can not set these attributes?
>

I'm assuming that you mean that we are about to replicate API between
DataStream and Sink compactor? I wouldn't do that. I would actually also
fix the parallelism to the writer's parallelism. So we just have a pre-key,
aggregate, and post-key where we can have defaults for the keys. We should
provide access to metadata of CommT through a context. I'd always run
compaction in the same slot sharing group as the writer, similar to how the
committer is run. I don't see why we should take a different route with the
design of the compaction than with the committer where we abstract all
these things away.

Also for another
> example, `SinkWriter` has introduced many existing (or repeated)
> implementations in the DataStream API in order to support ‘State’, ‘Timer’,
> ‘asynchronization’, ‘subindex’, etc.
>

I agree that the repeated implementations for Timer is weird but that could
have been solved similar to how we solved it with asynchronization where we
did not replicate interfaces but rather pulled up the concept of
MailboxExecutor from datastream to core.
Having specific Context objects is imho actually a good design decision -
RuntimeContext is heavily overloaded and shouldn't be emulated or used.


> In addition, if a new feature is introduced on the DataStream in the
> future, I think it may be necessary to consider the special operators of
> the sinks separately, which may also be a burden.
>

Yes, option 2 has a higher maintenance cost on Flink since Flink need to
provide these operators but this gives us also more control about changing
things in the future. If we expose the elements that we exchange between
the different sink operators, we cannot change that in the future. Now we
can easily abstract a change in the elements away.
Regarding new features in DataStream: I think this is actually an
advantage. We can evolve DataStream API without thinking about how that
could screw up potential sinks.

*Re 2.Aggregator(I think 1 parallelism is OK, why is it multiple
parallelism?)*

If a user wants to aggregate with parallelism 1, he can use a constant key
(which we should describe in the javadoc) and that should be default.
I think that a higher parallelism makes sense if you want to compact across
checkpoints where you probably want to aggregate the files in different
buckets and you can process buckets in parallel.



I hope to have cleared things up a bit. I guess it became obvious that I
prefer Option 2 because it gives us more means to provide a good
implementation and help the sink developer while also giving us flexibility
in the future. Happy to go more into details on certain points.



On Thu, Nov 4, 2021 at 7:20 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
>
> Very thanks Fabian for drafting this FLIP! It looks very good to me. I see
> currently most of us agree with option 2, but I personally feel that option
> 3 may be better :-)
> I have some small concerns for option 2
>
> 1. Developers understand that the cost is relatively high.
> The merging of small files is very important, but from the perspective of
> sink as a whole, this requirement is only a special case of sink. We expose
> this requirement directly in the Sink API, which means that all developers
> may need to understand this "special case". From my personal point of view,
> for a developer who has no compaction requirements, seeing this API may
> make him feel more complicated. In addition, from a higher level, the first
> level of the Sink API should have a relatively general abstraction; the
> second level might be different abstractions according to specific types.
>
> 2. Repeated implementation
> The newly introduced `aggregate` can set the parallelism, thus perhaps
> `setUid`, `slotSharingGroup (including resources)`, and `maxParallelism`
> also need to be supported? If they are supported, additional workloads are
> required, and at the same time I feel that these workloads are unnecessary;
> if not, it will also increase the developers’ understanding cost: for
> example, why these operators can not set these attributes? Also for another
> example, `SinkWriter` has introduced many existing (or repeated)
> implementations in the DataStream API in order to support ‘State’, ‘Timer’,
> ‘asynchronization’, ‘subindex’, etc.
> In addition, if a new feature is introduced on the DataStream in the
> future, I think it may be necessary to consider the special operators of
> the sinks separately, which may also be a burden.
>
> 3. Might Blocking checkpoint
> For option 2, we would need to actually merge files in the committer.
> However, this might cause some problems in whether compaction blocks
> checkpoint: suppose now our policy is to merge all the files produced in 10
> checkpoints, if we allow users to construct the topology, they could merge
> the file asynchronously in a dedicated executor, the merge could span
> multiple checkpoints (as long as not exceed 10), and then we emit the files
> to be renamed to the committer. But if we merge in a committer, it has to
> be done before the next checkpoint since we do not support asynchronous
> commits now. But with option3, this can be done at present.
>
>
> In addition, The FLIP mentioned that option 3 has a certain burden on
> developers from the perspective of stream batch unification. Could you
> enlighten me a little about what the burden is? I think that the current
> APIs of DataStream are all stream-batch unification, and the topology in
> Opiont3 is also done with `DataStream`, so I understand that developers
> will not have any additional burdens. Or what is missing
>
> Best,
> Guowei
>
>
> On Thu, Nov 4, 2021 at 11:16 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi Fabian,
> >
> > Thanks for drafting the FLIP!
> >
> > ## Few thoughts of user requirements
> >
> > 1.compact files from multiple checkpoints
> >
> > This is what users need very much.
> >
> > 2.The compaction block the checkpointing
> >
> > - Some scenarios are required. For example, the user expects the
> > output data to be consistent with the input data. If the checkpoint
> > succeeds, it needs to see how much data is output. Otherwise, the user
> > restarts a job to consume the same source offset, and he may lose
> > data.
> > - But I think in most cases, users are concerned about this, and we
> > can delay the data to be visible.
> >
> > 3.The end-to-end latency of data
> >
> > This also depends on the situation.
> > - Some user delays are very important. We'd better compact the data at
> > the current checkpoint, even if it affects the checkpoint delay.
> > - Some users think that the delay doesn't matter (the delay is at the
> > minute level). As long as you compact the file, it won't bring me
> > trouble with small files.
> >
> > So I think flexibility is important.
> >
> > ## Few thoughts on the option 2
> >
> > Option1 and option2 seem to be just the difference between the
> > aggregator in the middle, whether it is a separate operator or a
> > coordinator.
> >
> > I would be prefer to option 2.
> >
> > If I understand correctly, the whole process should be as follows:
> >
> > 1.SinkWriter ->
> > 2.Aggregator(I think 1 parallelism is OK, why is it multiple
> parallelism?)
> > ->
> > 3.LocalCommitter (Do compaction works) ->
> > 4.GlobalCommitter
> >
> > The Aggregator can control single checkpoint compaction or cross
> > checkpoint compaction. Controlling block or not block the current
> > checkpoint. Controlling the end-to-end latency of data is how many
> > checkpoints to wait for.
> >
> > Best,
> > Jingsong
> >
> > On Wed, Nov 3, 2021 at 11:01 PM Fabian Paul <fabianp...@ververica.com>
> > wrote:
> > >
> > >
> > > Hi David and Till,
> > >
> > > Thanks for your great feedback. One definitely confusing point in the
> > FLIP is who is doing the actual compaction. The compaction will not be
> done
> > > by the CommittableAggregator operator but the committers so it should
> > also not affect the checkpointing duration or have a significant
> performance
> > > bottleneck because the committers are executed in parallel (also in
> > batch mode [1]).
> > >
> > > I will update the FLIP to clarify it.
> > >
> > > > From your description I would be in favour of option 2 for the
> > following
> > > > reasons: Assuming that option 2 solves all our current problems, it
> > seems
> > > > like the least invasive change and smallest in scope. Your main
> > concern is
> > > > that it might not cover future use cases. Do you have some specific
> use
> > > > cases in mind?
> > >
> > > No, I do not have anything specific in mind I just wanted to raise the
> > point that adding more and more operators to the sink might complicate
> the
> > > development in the future that they can all be used together.
> > >
> > > > What I am missing a bit
> > > > from the description is how option 2 will behave wrt checkpoints and
> > the
> > > > batch execution mode.
> > >
> > > My idea was to always invoke CommittableAggregate#aggregate on a
> > checkpoint and endOfInput. In the batch case the aggregation is only done
> > > once on all committables.
> > >
> > >
> > > > Few thoughts on the option 2)
> > > >
> > > > The file compaction is by definition quite costly IO bound operation.
> > If I
> > > > understand the proposal correctly, the aggregation itself would run
> > during
> > > > operator (aggregator) checkpoint. Would this significantly increase
> the
> > > > checkpoint duration?
> > > >
> > > > Compaction between different sub-tasks incur additional network IO
> (to
> > > > fetch the raw non-compacted files from the remote filesystem), so
> this
> > > > could quickly become a bottleneck. Basically we're decreasing the
> sink
> > > > parallelism (possible throughput) to parallelism of the aggregator.
> > >
> > > Hopefully these concerns are covered by the explanation at the
> beginning.
> > >
> > > > To be really effective here, compaction would ideally be able to
> > compact
> > > > files from multiple checkpoints. However there is a huge tradeoff
> > between
> > > > latency a efficiency (especially with exactly once). Is this
> something
> > > > worth exploring?
> > >
> > > I agree with you by enabling the compaction across checkpoint the
> > latency can increase because files might be committed several checkpoints
> > > later. I guess the best we can do is to let the user configure the
> > behaviour. By configuring the checkpointing interval and the wanted file
> > size the
> > > user can already affect the latency.
> > > Is this answering you questions? I am not fully sure what you are
> > referring to with efficiency. @dvmk
> > >
> > > > I hope that with option 2, we can support both use cases: single task
> > > compaction as well as cross task compaction if needed. Similarly for
> > single
> > > checkpoint compaction as well as cross checkpoint compaction.
> > >
> > > Compaction across subtasks should be controllable by the parallelism of
> > the commttableAggregator operator i.e. a parallelism of 2 can reduce
> > > the computational complexity but might not compute the best compaction.
> > >
> > > Best,
> > > Fabian
> > >
> > > [1] https://github.com/apache/flink/pull/17536 <
> > https://github.com/apache/flink/pull/17536>)
> >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>

Reply via email to