Hi,

Very thanks Fabian for drafting this FLIP! It looks very good to me. I see
currently most of us agree with option 2, but I personally feel that option
3 may be better :-)
I have some small concerns for option 2

1. Developers understand that the cost is relatively high.
The merging of small files is very important, but from the perspective of
sink as a whole, this requirement is only a special case of sink. We expose
this requirement directly in the Sink API, which means that all developers
may need to understand this "special case". From my personal point of view,
for a developer who has no compaction requirements, seeing this API may
make him feel more complicated. In addition, from a higher level, the first
level of the Sink API should have a relatively general abstraction; the
second level might be different abstractions according to specific types.

2. Repeated implementation
The newly introduced `aggregate` can set the parallelism, thus perhaps
`setUid`, `slotSharingGroup (including resources)`, and `maxParallelism`
also need to be supported? If they are supported, additional workloads are
required, and at the same time I feel that these workloads are unnecessary;
if not, it will also increase the developers’ understanding cost: for
example, why these operators can not set these attributes? Also for another
example, `SinkWriter` has introduced many existing (or repeated)
implementations in the DataStream API in order to support ‘State’, ‘Timer’,
‘asynchronization’, ‘subindex’, etc.
In addition, if a new feature is introduced on the DataStream in the
future, I think it may be necessary to consider the special operators of
the sinks separately, which may also be a burden.

3. Might Blocking checkpoint
For option 2, we would need to actually merge files in the committer.
However, this might cause some problems in whether compaction blocks
checkpoint: suppose now our policy is to merge all the files produced in 10
checkpoints, if we allow users to construct the topology, they could merge
the file asynchronously in a dedicated executor, the merge could span
multiple checkpoints (as long as not exceed 10), and then we emit the files
to be renamed to the committer. But if we merge in a committer, it has to
be done before the next checkpoint since we do not support asynchronous
commits now. But with option3, this can be done at present.


In addition, The FLIP mentioned that option 3 has a certain burden on
developers from the perspective of stream batch unification. Could you
enlighten me a little about what the burden is? I think that the current
APIs of DataStream are all stream-batch unification, and the topology in
Opiont3 is also done with `DataStream`, so I understand that developers
will not have any additional burdens. Or what is missing

Best,
Guowei


On Thu, Nov 4, 2021 at 11:16 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for drafting the FLIP!
>
> ## Few thoughts of user requirements
>
> 1.compact files from multiple checkpoints
>
> This is what users need very much.
>
> 2.The compaction block the checkpointing
>
> - Some scenarios are required. For example, the user expects the
> output data to be consistent with the input data. If the checkpoint
> succeeds, it needs to see how much data is output. Otherwise, the user
> restarts a job to consume the same source offset, and he may lose
> data.
> - But I think in most cases, users are concerned about this, and we
> can delay the data to be visible.
>
> 3.The end-to-end latency of data
>
> This also depends on the situation.
> - Some user delays are very important. We'd better compact the data at
> the current checkpoint, even if it affects the checkpoint delay.
> - Some users think that the delay doesn't matter (the delay is at the
> minute level). As long as you compact the file, it won't bring me
> trouble with small files.
>
> So I think flexibility is important.
>
> ## Few thoughts on the option 2
>
> Option1 and option2 seem to be just the difference between the
> aggregator in the middle, whether it is a separate operator or a
> coordinator.
>
> I would be prefer to option 2.
>
> If I understand correctly, the whole process should be as follows:
>
> 1.SinkWriter ->
> 2.Aggregator(I think 1 parallelism is OK, why is it multiple parallelism?)
> ->
> 3.LocalCommitter (Do compaction works) ->
> 4.GlobalCommitter
>
> The Aggregator can control single checkpoint compaction or cross
> checkpoint compaction. Controlling block or not block the current
> checkpoint. Controlling the end-to-end latency of data is how many
> checkpoints to wait for.
>
> Best,
> Jingsong
>
> On Wed, Nov 3, 2021 at 11:01 PM Fabian Paul <fabianp...@ververica.com>
> wrote:
> >
> >
> > Hi David and Till,
> >
> > Thanks for your great feedback. One definitely confusing point in the
> FLIP is who is doing the actual compaction. The compaction will not be done
> > by the CommittableAggregator operator but the committers so it should
> also not affect the checkpointing duration or have a significant performance
> > bottleneck because the committers are executed in parallel (also in
> batch mode [1]).
> >
> > I will update the FLIP to clarify it.
> >
> > > From your description I would be in favour of option 2 for the
> following
> > > reasons: Assuming that option 2 solves all our current problems, it
> seems
> > > like the least invasive change and smallest in scope. Your main
> concern is
> > > that it might not cover future use cases. Do you have some specific use
> > > cases in mind?
> >
> > No, I do not have anything specific in mind I just wanted to raise the
> point that adding more and more operators to the sink might complicate the
> > development in the future that they can all be used together.
> >
> > > What I am missing a bit
> > > from the description is how option 2 will behave wrt checkpoints and
> the
> > > batch execution mode.
> >
> > My idea was to always invoke CommittableAggregate#aggregate on a
> checkpoint and endOfInput. In the batch case the aggregation is only done
> > once on all committables.
> >
> >
> > > Few thoughts on the option 2)
> > >
> > > The file compaction is by definition quite costly IO bound operation.
> If I
> > > understand the proposal correctly, the aggregation itself would run
> during
> > > operator (aggregator) checkpoint. Would this significantly increase the
> > > checkpoint duration?
> > >
> > > Compaction between different sub-tasks incur additional network IO (to
> > > fetch the raw non-compacted files from the remote filesystem), so this
> > > could quickly become a bottleneck. Basically we're decreasing the sink
> > > parallelism (possible throughput) to parallelism of the aggregator.
> >
> > Hopefully these concerns are covered by the explanation at the beginning.
> >
> > > To be really effective here, compaction would ideally be able to
> compact
> > > files from multiple checkpoints. However there is a huge tradeoff
> between
> > > latency a efficiency (especially with exactly once). Is this something
> > > worth exploring?
> >
> > I agree with you by enabling the compaction across checkpoint the
> latency can increase because files might be committed several checkpoints
> > later. I guess the best we can do is to let the user configure the
> behaviour. By configuring the checkpointing interval and the wanted file
> size the
> > user can already affect the latency.
> > Is this answering you questions? I am not fully sure what you are
> referring to with efficiency. @dvmk
> >
> > > I hope that with option 2, we can support both use cases: single task
> > compaction as well as cross task compaction if needed. Similarly for
> single
> > checkpoint compaction as well as cross checkpoint compaction.
> >
> > Compaction across subtasks should be controllable by the parallelism of
> the commttableAggregator operator i.e. a parallelism of 2 can reduce
> > the computational complexity but might not compute the best compaction.
> >
> > Best,
> > Fabian
> >
> > [1] https://github.com/apache/flink/pull/17536 <
> https://github.com/apache/flink/pull/17536>)
>
>
>
> --
> Best, Jingsong Lee
>

Reply via email to