>
> > One way to avoid write-read-merge is by wrapping SinkWriter with
> > another one, which would buffer input elements in a temporary storage
> > (e.g. local file) until a threshold is reached; after that, it would
> > invoke the original SinkWriter. And if a checkpoint barrier comes in
> > earlier, it would send written data to some aggregator.
>
> I think perhaps this seems to be a kind of WAL method? Namely we first
> write the elements to some WAL logs and persist them on checkpoint
> (in snapshot or remote FS), or we directly write WAL logs to the remote
> FS eagerly.
>
At some point, Flink will definitely have some WAL adapter that can turn
any sink into an exactly-once sink (with some caveats). For now, we keep
that as an orthogonal solution as it has a rather high price (bursty
workload with high latency). Ideally, we can keep the compaction
asynchronous...
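
To make the quoted wrapping idea concrete, here is a minimal sketch, not
Flink code. The class name `BufferingWriter`, the two callbacks and the
element-count threshold are all assumptions made for illustration, and the
in-memory list stands in for the temporary storage (e.g. a local file)
mentioned in the quote.

```python
from typing import Callable, List


class BufferingWriter:
    """Hypothetical wrapper around an existing writer; not a Flink class."""

    def __init__(
        self,
        write_file: Callable[[List[str]], None],
        send_to_aggregator: Callable[[List[str]], None],
        threshold: int,
    ) -> None:
        self._write_file = write_file                  # stands in for the original writer
        self._send_to_aggregator = send_to_aggregator  # stands in for the aggregator
        self._threshold = threshold                    # assumed: number of buffered elements
        self._buffer: List[str] = []                   # stands in for a local temporary file

    def write(self, element: str) -> None:
        """Buffer the element; invoke the original writer once the threshold is reached."""
        self._buffer.append(element)
        if len(self._buffer) >= self._threshold:
            self._write_file(self._buffer)
            self._buffer = []

    def on_checkpoint_barrier(self) -> None:
        """A barrier arrived before the threshold: forward what is buffered."""
        if self._buffer:
            self._send_to_aggregator(self._buffer)
            self._buffer = []
```

With a threshold of 3, writing a, b, c, d and then receiving a barrier
hands [a, b, c] to the wrapped writer and forwards [d] to the aggregator.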

On Mon, Nov 29, 2021 at 8:52 AM Yun Gao <yungao...@aliyun.com.invalid>
wrote:

> Hi,
>
> @Roman, very sorry for the very late response.
>
> > Merging artifacts from multiple checkpoints would apparently
> > require multiple concurrent checkpoints
>
> I think it might not need concurrent checkpoints: suppose some
> operator (like the committer aggregator in option 2) maintains
> the list of files to merge. It could store that list in its state,
> and after several checkpoints are done and we have enough files,
> we could merge all the files in the list.
>
> > Asynchronous merging in an aggregator would require some resolution
> > logic on recovery, so that a merged artifact can be used if the
> > original one was deleted. Otherwise, wouldn't recovery fail because
> > some artifacts are missing?
> > We could also defer deletion until the "compacted" checkpoint is
> > subsumed - but isn't it too late, as it will be deleted anyways once
> > subsumed?
>
> I think logically we could delete the original files once the "compacted"
> checkpoint (which finishes merging the files and records the result in
> the checkpoint) is completed, in all the options. If there is a failover
> before it completes, we can restart the merging; if there is a failover
> after it, the merged files have already been recorded in the checkpoint.
>
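The ordering described above can be sketched as follows. This is a
simplified model under stated assumptions, not Flink API: a snapshot is a
plain dict, storage is a set of file names, and `make_snapshot`,
`on_checkpoint_complete` and `recover` are hypothetical helpers.

```python
from typing import List, Optional, Set


def make_snapshot(originals: List[str], merged: Optional[str]) -> dict:
    """State written into a checkpoint: the files to merge and, if done, the merged file."""
    return {"originals": list(originals), "merged": merged}


def on_checkpoint_complete(storage: Set[str], snapshot: dict) -> None:
    """Delete the originals only once the checkpoint recording the merge has completed."""
    if snapshot["merged"] is not None:
        storage.difference_update(snapshot["originals"])


def recover(storage: Set[str], snapshot: dict) -> List[str]:
    """Return the files to continue with after restoring from `snapshot`."""
    if snapshot["merged"] is not None:
        # Failover after the "compacted" checkpoint: the merged file is recorded.
        return [snapshot["merged"]]
    # Failover before it: the originals were never deleted, so merging restarts.
    missing = [f for f in snapshot["originals"] if f not in storage]
    if missing:
        raise RuntimeError(f"missing artifacts: {missing}")
    return list(snapshot["originals"])
```

In this model recovery never sees a missing artifact, because deletion is
tied to completion of the checkpoint that already names the merged file.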
> > One way to avoid write-read-merge is by wrapping SinkWriter with
> > another one, which would buffer input elements in a temporary storage
> > (e.g. local file) until a threshold is reached; after that, it would
> > invoke the original SinkWriter. And if a checkpoint barrier comes in
> > earlier, it would send written data to some aggregator.
>
> I think perhaps this seems to be a kind of WAL method? Namely we first
> write the elements to some WAL logs and persist them on checkpoint
> (in snapshot or remote FS), or we directly write WAL logs to the remote
> FS eagerly.
>
> Sorry if I have misunderstood anything.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Roman Khachatryan <ro...@apache.org>
> Send Time:2021 Nov. 9 (Tue.) 22:03
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support
> small file compaction
>
> Hi everyone,
>
> Thanks for the proposal and the discussion, I have some remarks:
> (I'm not very familiar with the new Sink API but I thought about the
> same problem in context of the changelog state backend)
>
> 1. Merging artifacts from multiple checkpoints would apparently
> require multiple concurrent checkpoints (otherwise, a new checkpoint
> won't be started before completing the previous one; and the previous
> one can't be completed before durably storing the artifacts). However,
> concurrent checkpoints are currently not supported with Unaligned
> checkpoints (this is besides increasing e2e-latency).
>
> 2. Asynchronous merging in an aggregator would require some resolution
> logic on recovery, so that a merged artifact can be used if the
> original one was deleted. Otherwise, wouldn't recovery fail because
> some artifacts are missing?
> We could also defer deletion until the "compacted" checkpoint is
> subsumed - but isn't it too late, as it will be deleted anyways once
> subsumed?
>
> 3. Writing small files, then reading and merging them for *every*
> checkpoint seems worse than only reading them on recovery. I guess I'm
> missing some cases of reading, so to me it would make sense to mention
> these cases explicitly in the FLIP motivation section.
>
> 4. One way to avoid write-read-merge is by wrapping SinkWriter with
> another one, which would buffer input elements in a temporary storage
> (e.g. local file) until a threshold is reached; after that, it would
> invoke the original SinkWriter. And if a checkpoint barrier comes in
> earlier, it would send written data to some aggregator. It will
> increase checkpoint delay (async phase) compared to the current Flink;
> but not compared to the write-read-merge solution, IIUC.
> Then such "BufferingSinkWriters" could aggregate input elements from
> each other, potentially recursively (I mean something like
>
> https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png
> )
>
> 5. Reducing the number of files by reducing aggregator parallelism as
> opposed to merging on reaching size threshold will likely be less
> optimal and more difficult to configure. OTOH, thresholds might be more
> difficult to implement and (with recursive merging) would incur higher
> latency. Maybe that's also something to decide explicitly or at least
> mention in the FLIP.
>
>
>
> Regards,
> Roman
>
>
> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei <leinuo...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Thanks for drafting the FLIP and trying to support small file
> > compaction. I think this feature is very urgent and valuable for
> > users (at least for me).
> >
> > Currently I am trying to support streaming rewrite (compaction) for
> > Iceberg in PR#3323 <https://github.com/apache/iceberg/pull/3323>. As
> > Steven mentioned, the Iceberg sink writes and compacts data through the
> > following steps:
> > Step-1: Several parallel data writers (sinks) write streaming data as
> > files.
> > Step-2: A single-parallelism data file committer commits the completed
> > files as soon as possible to make them available.
> > Step-3: Several parallel file rewriters (compactors) collect committed
> > files from multiple checkpoints, and rewrite (compact) them together
> > once the total file size or number of files reaches the threshold.
> > Step-4: A single-parallelism rewrite (compaction) result committer
> > commits the rewritten (compacted) files to replace the old files and
> > make them available.
> >
> >
> > If Flink wants to support small file compaction, I think some key
> > points are necessary:
> >
> > 1, Compact files from multiple checkpoints.
> > I totally agree with Jingsong, because the completed file size usually
> > cannot reach the threshold in a single checkpoint. Especially for
> > partitioned tables, we need to compact the files of each partition, but
> > usually the file size of each partition will be different and may not
> > reach the merge threshold. If we compact these files within a single
> > checkpoint, regardless of whether the total file size reaches the
> > threshold, then the value of compacting will be diminished and we will
> > still get small files because the compacted files do not reach the
> > target size. So we need the compactor to collect committed files from
> > multiple checkpoints and compact them once they reach the threshold.
> >
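The cross-checkpoint collection in point 1 could look roughly like this.
It is a sketch only: `ThresholdCompactionPlanner` and its method are
hypothetical names, it tracks only the total size per partition (a
file-count threshold would work the same way), and it keeps the pending
list in memory where a real compactor would keep it in checkpointed state.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class ThresholdCompactionPlanner:
    """Hypothetical planner: collects committed files per partition across checkpoints."""

    def __init__(self, target_bytes: int) -> None:
        self._target_bytes = target_bytes
        # partition -> list of (path, size) still waiting to be compacted
        self._pending: Dict[str, List[Tuple[str, int]]] = defaultdict(list)

    def add_committed(self, partition: str, path: str, size: int) -> List[str]:
        """Register a committed file.

        Returns the paths to compact together once the partition's pending
        files reach the target size, or an empty list while below it.
        """
        files = self._pending[partition]
        files.append((path, size))
        if sum(s for _, s in files) < self._target_bytes:
            return []
        del self._pending[partition]  # these files are now handed to a compaction
        return [p for p, _ in files]
```

Files from different checkpoints simply accumulate in the same pending
list, so a partition that receives little data per checkpoint is compacted
only once enough of its files have been committed.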
> > 2, Separate write phase and compact phase.
> > Users usually hope the data becomes available as soon as possible, and
> > the end-to-end latency is very important. I think we need to separate
> > the write and compact phases. The write phase includes Step-1 and
> > Step-2: we sink data as files and commit them per checkpoint,
> > regardless of the file size. That ensures the data will be available
> > ASAP. The compact phase includes Step-3 and Step-4: the compactor
> > should collect committed files from multiple checkpoints and compact
> > them asynchronously once they reach the threshold, and the compact
> > committer will commit the compaction result in the next checkpoint. We
> > compact the committed files asynchronously because we don't want the
> > compaction to affect the data sink or the whole pipeline.
> >
> > 3, Exactly once guarantee between write and compact phase.
> > Once we separate the write phase and the compact phase, we need to
> > consider how to guarantee exactly-once semantics between the two
> > phases. We should not lose any data or files in the compactor (Step-3)
> > in any case, which would make the compaction result inconsistent with
> > the data before compaction. I think Flink should provide an
> > easy-to-use interface to make that easier.
> >
> > 4, Metadata operation and compaction result validation.
> > In the compact phase, there may be not only file compaction but also a
> > lot of metadata operations; for example, Iceberg needs to read/write
> > manifests and do MOR. We also need some interface that lets users
> > validate the compaction result. I think these points should be
> > considered when we design the compaction API.
> >
> >
> > Back to FLIP-191, option 1 looks very complicated while option 2 is
> > relatively simple, but neither of these two solutions separates the write
> > phase from the compact phase. So I think we should consider the points I
> > mentioned above. And if you have any other questions you can always feel
> > free to reach out to me!
> >
> > BR,
> > Reo
> >
> > On Mon, Nov 8, 2021 at 7:59 PM Fabian Paul <fabianp...@ververica.com> wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the lively discussions. I am really excited to see so many
> > > people participating in this thread. It also underlines that many
> > > people would like to see a solution soon.
> > >
> > > I have updated the FLIP and removed the parallelism configuration
> > > because it is unnecessary since users can configure a constant
> > > exchange key to send all committables to only one committable
> > > aggregator.
> > >
> > >
> > > 1. Burden for developers w.r.t batch stream unification.
> > >
> > > @yun @guowei, from a theoretical point of view you are right that by
> > > exposing the DataStream API in the sink, users have the full power to
> > > write correct batch and streaming sinks. I think in reality a lot of
> > > users still struggle to build pipelines, e.g. an operator pipeline,
> > > that work correctly in both streaming and batch mode. Another problem
> > > I see with exposing deeper concepts is that we cannot do any
> > > optimization because we cannot reason about how sinks are built in
> > > the future.
> > >
> > > We should also try to steer users towards using only `Functions` to
> > > give us more flexibility to swap the internal operator
> > > representation. I agree with @yun that we should try to make the
> > > `ProcessFunction` more versatile to work towards that goal, but I see
> > > this as unrelated to the FLIP.
> > >
> > >
> > > 2. Regarding Commit / Global commit
> > >
> > > I envision the global committer to be specific to the data lake
> > > solution you want to write to. However, it is entirely orthogonal to
> > > the compaction. Currently, I do not expect any changes w.r.t. the
> > > global commit to be introduced by this FLIP.
> > >
> > >
> > > 3. Regarding the case of trans-checkpoints merging
> > >
> > > @yun, as a user, I would expect that if the committer receives files
> > > to merge/commit in a checkpoint, these are also finished when the
> > > checkpoint finishes. I think all sinks rely on this principle
> > > currently, e.g., KafkaSink needs to commit all open transactions
> > > before the next checkpoint can happen.
> > >
> > > Maybe in the future, we can somehow move the Committer#commit call to
> > > an asynchronous execution, but we should discuss it in a separate
> > > thread.
> > >
> > > > We probably should first describe the different causes of small
> > > > files and what problems this proposal is trying to solve. I wrote a
> > > > data shuffling proposal [1] for Flink Iceberg sink (shared with
> > > > Iceberg community [2]). It can address small files problems due to
> > > > skewed data distribution across Iceberg table partitions. Streaming
> > > > shuffling before writers (to files) is typically more efficient
> > > > than post-write file compaction (which involves read-merge-write).
> > > > It is usually cheaper to prevent a problem (small files) than
> > > > fixing it.
> > >
> > >
> > > @steven you are raising a good point, although I think only using a
> > > customizable shuffle won't address the generation of small files. One
> > > assumption is that the sink generates at least one file per subtask,
> > > which can already be too many. Another problem is that with low
> > > checkpointing intervals, the files do not meet the required size. The
> > > latter point is probably addressable by changing the checkpoint
> > > interval, which might be inconvenient for some users.
> > >
> > > > The sink coordinator checkpoint problem (mentioned in option 1)
> > > > would be great if Flink can address it. In the spirit of source
> > > > (enumerator-reader) and sink (writer-coordinator) duality, sink
> > > > coordinator checkpoint should happen after the writer operator.
> > > > This would be a natural fit to support global committer in
> > > > FLIP-143. It is probably an orthogonal matter to this proposal.
> > >
> > >
> > > To me the question here is what are the benefits of having a
> > > coordinator in comparison to a global committer/aggregator operator.
> > >
> > > > Personally, I am usually in favor of keeping streaming ingestion
> > > > (to data lake) relatively simple and stable. Also sometimes
> > > > compaction and sorting are performed together in data rewrite
> > > > maintenance jobs to improve read performance. In that case, the
> > > > value of compacting (in Flink streaming ingestion) diminishes.
> > >
> > >
> > > I agree it is always possible to have scheduled maintenance jobs
> > > taking care of your data, e.g., doing compaction. Unfortunately, the
> > > downside is that you have to rewrite your data after it is already
> > > available for other downstream consumers. I guess this can lead to
> > > all kinds of visibility problems. I am also surprised that you
> > > personally are a fan of this approach and, on the other hand, are
> > > developing the Iceberg sink, which goes somewhat against your
> > > mentioned principle of keeping the sink simple.
> > >
> > > > Currently, it is unclear from the doc and this thread where the
> > > > compaction is actually happening. Jingsong's reply described one
> > > > model
> > > > writer (parallel) -> aggregator (single-parallelism compaction
> > > > planner) -> compactor (parallel) -> global committer
> > > > (single-parallelism)
> > >
> > >
> > > My idea of the topology is very similar to the one outlined by
> > > Jingsong. The compaction will happen in the committer operator.
> > >
> > > >
> > > > In the Iceberg community, the following model has been discussed. It
> > > > is better for Iceberg because it won't delay the data availability.
> > > > writer (parallel) -> global committer for append (single
> > > > parallelism) -> compactor (parallel) -> global committer for rewrite
> > > > commit (single parallelism)
> > >
> > >
> > > From a quick glimpse, it seems that the exact same topology is
> > > possible to express with the committable aggregator, but this
> > > definitely depends on the exact setup.
> > >
> > > Best,
> > > Fabian
>
>
