Thanks for clarifying (I was initially confused by merging state files
rather than output files).

> At some point, Flink will definitely have some WAL adapter that can turn any 
> sink into an exactly-once sink (with some caveats). For now, we keep that as 
> an orthogonal solution as it has a rather high price (bursty workload with 
> high latency). Ideally, we can keep the compaction asynchronous...

Yes, that would be something like a WAL. I agree that it would have a
different set of trade-offs.


Regards,
Roman

On Mon, Nov 29, 2021 at 3:33 PM Arvid Heise <ar...@apache.org> wrote:
>>
>> > One way to avoid write-read-merge is by wrapping SinkWriter with
>> > another one, which would buffer input elements in a temporary storage
>> > (e.g. local file) until a threshold is reached; after that, it would
>> > invoke the original SinkWriter. And if a checkpoint barrier comes in
>> > earlier, it would send written data to some aggregator.
>>
>> I think perhaps this is a kind of WAL method? Namely, we first
>> write the elements to some WAL logs and persist them on checkpoint
>> (in the snapshot or remote FS), or we directly write the WAL logs to
>> the remote FS eagerly.
>>
> At some point, Flink will definitely have some WAL adapter that can turn any 
> sink into an exactly-once sink (with some caveats). For now, we keep that as 
> an orthogonal solution as it has a rather high price (bursty workload with 
> high latency). Ideally, we can keep the compaction asynchronous...
>
> On Mon, Nov 29, 2021 at 8:52 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>>
>> Hi,
>>
>> @Roman, very sorry for the late response,
>>
>> > Merging artifacts from multiple checkpoints would apparently
>> require multiple concurrent checkpoints
>>
>> I think it might not need concurrent checkpoints: suppose some
>> operator (like the committer aggregator in option 2) maintains the
>> list of files to merge and stores that list in its state; then, after
>> several checkpoints are done and we have enough files, we could merge
>> all the files in the list.
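The bookkeeping described above could be sketched roughly like this. All names here are hypothetical, not actual Flink APIs; in a real operator the pending list would live in checkpointed state (e.g. `ListState`) so it survives failover:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an aggregator keeps the list of pending files
// across checkpoints and only triggers a merge once enough bytes have
// accumulated. No concurrent checkpoints are needed because the list
// simply carries over from one completed checkpoint to the next.
class PendingFileAggregator {
    private final long mergeThresholdBytes;
    // Plain field for illustration; in a real operator this would be
    // operator state restored on recovery.
    private final List<String> pendingFiles = new ArrayList<>();
    private long pendingBytes = 0;

    PendingFileAggregator(long mergeThresholdBytes) {
        this.mergeThresholdBytes = mergeThresholdBytes;
    }

    /** Called once per completed checkpoint with that checkpoint's files. */
    List<String> onCheckpointComplete(List<String> files, long totalBytes) {
        pendingFiles.addAll(files);
        pendingBytes += totalBytes;
        if (pendingBytes < mergeThresholdBytes) {
            return List.of(); // not enough data yet; wait for more checkpoints
        }
        List<String> toMerge = new ArrayList<>(pendingFiles);
        pendingFiles.clear();
        pendingBytes = 0;
        return toMerge; // caller merges these into one large file
    }
}
```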
>>
>> > Asynchronous merging in an aggregator would require some resolution
>> > logic on recovery, so that a merged artifact can be used if the
>> > original one was deleted. Otherwise, wouldn't recovery fail because
>> > some artifacts are missing?
>> > We could also defer deletion until the "compacted" checkpoint is
>> > subsumed - but isn't it too late, as it will be deleted anyways once
>> > subsumed?
>>
>> I think logically, in all the options, we could delete the original
>> files once the "compacted" checkpoint (the one that finishes merging
>> the files and records the result) is completed. If there is a failover
>> before that, we could restart the merging; if there is a failover after
>> it, the merged files are already recorded in the checkpoint.
>>
>> > One way to avoid write-read-merge is by wrapping SinkWriter with
>> > another one, which would buffer input elements in a temporary storage
>> > (e.g. local file) until a threshold is reached; after that, it would
>> > invoke the original SinkWriter. And if a checkpoint barrier comes in
>> > earlier, it would send written data to some aggregator.
>>
>> I think perhaps this is a kind of WAL method? Namely, we first
>> write the elements to some WAL logs and persist them on checkpoint
>> (in the snapshot or remote FS), or we directly write the WAL logs to
>> the remote FS eagerly.
>>
>> Sorry if I have misunderstood something.
>>
>> Best,
>> Yun
>>
>>
>> ------------------------------------------------------------------
>> From:Roman Khachatryan <ro...@apache.org>
>> Send Time:2021 Nov. 9 (Tue.) 22:03
>> To:dev <dev@flink.apache.org>
>> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support 
>> small file compaction
>>
>> Hi everyone,
>>
>> Thanks for the proposal and the discussion, I have some remarks:
>> (I'm not very familiar with the new Sink API but I thought about the
>> same problem in context of the changelog state backend)
>>
>> 1. Merging artifacts from multiple checkpoints would apparently
>> require multiple concurrent checkpoints (otherwise, a new checkpoint
>> won't be started before completing the previous one; and the previous
>> one can't be completed before durably storing the artifacts). However,
>> concurrent checkpoints are currently not supported with Unaligned
>> checkpoints (this is besides increasing e2e-latency).
>>
>> 2. Asynchronous merging in an aggregator would require some resolution
>> logic on recovery, so that a merged artifact can be used if the
>> original one was deleted. Otherwise, wouldn't recovery fail because
>> some artifacts are missing?
>> We could also defer deletion until the "compacted" checkpoint is
>> subsumed - but isn't it too late, as it will be deleted anyways once
>> subsumed?
>>
>> 3. Writing small files, then reading and merging them for *every*
>> checkpoint seems worse than only reading them on recovery. I guess I'm
>> missing some cases of reading, so to me it would make sense to mention
>> these cases explicitly in the FLIP motivation section.
>>
>> 4. One way to avoid write-read-merge is by wrapping SinkWriter with
>> another one, which would buffer input elements in a temporary storage
>> (e.g. local file) until a threshold is reached; after that, it would
>> invoke the original SinkWriter. And if a checkpoint barrier comes in
>> earlier, it would send written data to some aggregator. It will
>> increase checkpoint delay (async phase) compared to the current Flink;
>> but not compared to the write-read-merge solution, IIUC.
>> Then such "BufferingSinkWriters" could aggregate input elements from
>> each other, potentially recursively (I mean something like
>> https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png
>> )
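A rough sketch of the wrapping idea in point 4, with every interface and class name hypothetical (this is not the real Flink `SinkWriter` API): elements are buffered until a threshold, then handed to the wrapped writer; if a barrier arrives first, the small remainder goes to an aggregator instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal writer interface for illustration only.
interface SimpleWriter {
    void write(String element);
}

// Sketch of a "BufferingSinkWriter": wraps the original writer and only
// invokes it once enough elements have been buffered.
class BufferingWriter implements SimpleWriter {
    private final SimpleWriter inner;   // the original sink writer
    private final int threshold;        // flush-to-inner threshold
    private final List<String> buffer = new ArrayList<>();
    final List<String> sentToAggregator = new ArrayList<>();

    BufferingWriter(SimpleWriter inner, int threshold) {
        this.inner = inner;
        this.threshold = threshold;
    }

    @Override
    public void write(String element) {
        buffer.add(element);
        if (buffer.size() >= threshold) {
            buffer.forEach(inner::write); // enough data: use the real writer
            buffer.clear();
        }
    }

    /** Checkpoint barrier arrived before the threshold was reached. */
    void onCheckpointBarrier() {
        sentToAggregator.addAll(buffer);  // small remainder goes to aggregator
        buffer.clear();
    }
}
```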
>>
>> 5. Reducing the number of files by reducing aggregator parallelism, as
>> opposed to merging on reaching a size threshold, will likely be less
>> optimal and more difficult to configure. OTOH, thresholds might be more
>> difficult to implement and (with recursive merging) would incur higher
>> latency. Maybe that's also something to decide explicitly or at least
>> mention in the FLIP.
>>
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei <leinuo...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Thanks for drafting the FLIP and trying to support small file compaction. I
>> > think this feature is very urgent and valuable for users (at least for me).
>> >
>> > Currently I am trying to support streaming rewrite (compaction) for Iceberg
>> > in PR#3323 <https://github.com/apache/iceberg/pull/3323>. As Steven mentioned,
>> > the Iceberg sink writes and compacts data through the following steps:
>> > Step-1: Some parallel data writers (sinks) write streaming data as files.
>> > Step-2: A single-parallelism data file committer commits the completed
>> > files as soon as possible to make them available.
>> > Step-3: Some parallel file rewriters (compactors) collect committed files
>> > from multiple checkpoints and rewrite (compact) them together once the
>> > total file size or number of files reaches the threshold.
>> > Step-4: A single-parallelism rewrite (compaction) result committer commits
>> > the rewritten (compacted) files to replace the old files and make them
>> > available.
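The four steps above could be sketched as plain functions over file names, purely to illustrate the dataflow. None of these are real Iceberg or Flink APIs; the names and the string-based "files" are illustrative only:

```java
import java.util.List;

// Hypothetical end-to-end sketch of the four-step topology:
// write/commit immediately for availability, compact and swap later.
class CompactionPipelineSketch {
    // Step-1/2: parallel writers produce files; a single committer
    // publishes them right away, with no size requirement.
    static List<String> commitDataFiles(List<String> writtenFiles) {
        return writtenFiles; // committed as-is, data available ASAP
    }

    // Step-3: parallel rewriters collect committed files from multiple
    // checkpoints and compact a group into one larger file.
    static String compact(List<String> committedFiles) {
        return "compacted(" + String.join("+", committedFiles) + ")";
    }

    // Step-4: a single committer atomically replaces the old files
    // (the first argument) with the compacted result.
    static List<String> commitRewrite(List<String> oldFiles, String compacted) {
        return List.of(compacted); // oldFiles are superseded by the rewrite
    }
}
```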
>> >
>> >
>> > If Flink wants to support small file compaction, I think some key points
>> > are necessary:
>> >
>> > 1, Compact files from multiple checkpoints.
>> > I totally agree with Jingsong, because the completed file size usually
>> > cannot reach the threshold in a single checkpoint. This is especially true
>> > for partitioned tables: we need to compact the files of each partition, but
>> > the file size of each partition will usually differ and may not reach the
>> > merge threshold. If we compact these files within a single checkpoint,
>> > regardless of whether the total file size reaches the threshold, then the
>> > value of compacting is diminished and we will still get small files,
>> > because the compacted files do not reach the target size. So we need the
>> > compactor to collect committed files from multiple checkpoints and compact
>> > them once they reach the threshold.
>> >
>> > 2, Separate write phase and compact phase.
>> > Users usually hope the data becomes available as soon as possible, and the
>> > end-to-end latency is very important. I think we need to separate the
>> > write and compact phases. The write phase includes Step-1 and Step-2: we
>> > sink data as files and commit them per checkpoint, regardless of the file
>> > size. That ensures the data will be available ASAP. The compact phase
>> > includes Step-3 and Step-4: the compactor should collect committed files
>> > from multiple checkpoints and compact them asynchronously once they reach
>> > the threshold, and the compact committer will commit the compaction result
>> > in the next checkpoint. We compact the committed files asynchronously
>> > because we don't want the compaction to affect the data sink or the whole
>> > pipeline.
>> >
>> > 3, Exactly-once guarantee between write and compact phase.
>> > Once we separate the write and compact phases, we need to consider how to
>> > guarantee exactly-once semantics between the two phases. We should not
>> > lose any data or files in the compactor (Step-3) in any case, which would
>> > cause the compaction result to be inconsistent with the input. I think
>> > Flink should provide an easy-to-use interface to make that easier.
>> >
>> > 4, Metadata operations and compaction result validation.
>> > In the compact phase, there is not only file compaction but also a lot of
>> > metadata operations, such as Iceberg needing to read/write manifests and
>> > do MOR. And we need some interface that lets users validate the compaction
>> > result. I think these points should be considered when we design the
>> > compaction API.
>> >
>> >
>> > Back to FLIP-191: option 1 looks very complicated while option 2 is
>> > relatively simple, but neither of these two solutions separates the write
>> > phase from the compact phase. So I think we should consider the points I
>> > mentioned above. And if you have any other questions, feel free to reach
>> > out to me!
>> >
>> > BR,
>> > Reo
>> >
>> > Fabian Paul <fabianp...@ververica.com> wrote on Mon, Nov 8, 2021, 7:59 PM:
>> >
>> > > Hi all,
>> > >
>> > > Thanks for the lively discussions. I am really excited to see so many
>> > > people participating in this thread. It also underlines that many people
>> > > would like to see a solution soon.
>> > >
>> > > I have updated the FLIP and removed the parallelism configuration because
>> > > it is unnecessary: users can configure a constant exchange key to send all
>> > > committables to only one committable aggregator.
>> > >
>> > >
>> > > 1. Burden for developers w.r.t. batch/stream unification.
>> > >
>> > > @yun @guowei, from a theoretical point of view you are right: by exposing
>> > > the DataStream API in the sink, users have the full power to write correct
>> > > batch and streaming sinks. I think in reality a lot of users still struggle
>> > > to build pipelines with, e.g., operator pipelines that work correctly in
>> > > both streaming and batch mode. Another problem I see with exposing deeper
>> > > concepts is that we cannot do any optimization because we cannot reason
>> > > about how sinks are built in the future.
>> > >
>> > > We should also try to steer users towards using only `Functions` to give
>> > > us more flexibility to swap the internal operator representation. I agree
>> > > with @yun that we should try to make the `ProcessFunction` more versatile
>> > > to work towards that goal, but I see this as unrelated to the FLIP.
>> > >
>> > >
>> > > 2. Regarding Commit / Global commit
>> > >
>> > > I envision the global committer to be specific to the data lake solution
>> > > you want to write to. However, it is entirely orthogonal to the
>> > > compaction. Currently, I do not expect any changes w.r.t. the global
>> > > commit introduced by this FLIP.
>> > >
>> > >
>> > > 3. Regarding the case of cross-checkpoint merging
>> > >
>> > > @yun, as a user, I would expect that if the committer receives files to
>> > > merge/commit in a checkpoint, these are also finished when the checkpoint
>> > > finishes. I think all sinks rely on this principle currently, i.e., the
>> > > KafkaSink needs to commit all open transactions before the next checkpoint
>> > > can happen.
>> > >
>> > > Maybe in the future, we can somehow move the Committer#commit call to an
>> > > asynchronous execution, but we should discuss it as a separate thread.
>> > >
>> > > > We probably should first describe the different causes of small files
>> > > > and what problems this proposal is trying to solve. I wrote a data
>> > > > shuffling proposal [1] for the Flink Iceberg sink (shared with the
>> > > > Iceberg community [2]). It can address small-file problems due to
>> > > > skewed data distribution across Iceberg table partitions. Streaming
>> > > > shuffling before the writers (to files) is typically more efficient
>> > > > than post-write file compaction (which involves read-merge-write). It
>> > > > is usually cheaper to prevent a problem (small files) than to fix it.
>> > >
>> > >
>> > > @steven you are raising a good point, although I think a customizable
>> > > shuffle alone won't address the generation of small files. One assumption
>> > > is that the sink generates at least one file per subtask, which can
>> > > already be too many. Another problem is that with low checkpointing
>> > > intervals, the files do not meet the required size. The latter point is
>> > > probably addressable by changing the checkpoint interval, which might be
>> > > inconvenient for some users.
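To make the interval/size relation concrete with illustrative numbers (not taken from the thread): per checkpoint, a subtask's file size is roughly its throughput times the checkpoint interval, so a short interval directly caps file size no matter how the data is shuffled.

```java
// Back-of-the-envelope sketch; all numbers and names are illustrative.
class SmallFileEstimate {
    // File size a subtask can roll per checkpoint:
    // throughput (bytes/s) * checkpoint interval (s).
    static long fileSizeBytes(long bytesPerSecPerSubtask, long checkpointIntervalSec) {
        return bytesPerSecPerSubtask * checkpointIntervalSec;
    }

    // Files produced per hour if every subtask rolls one file per checkpoint.
    static long filesPerHour(int parallelism, long checkpointIntervalSec) {
        return parallelism * (3600 / checkpointIntervalSec);
    }
}
```

For example, a subtask writing 1 MB/s with a 10 s checkpoint interval rolls roughly 10 MB files, far below a typical target such as 128 MB, and at parallelism 100 that is 36,000 files per hour.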
>> > >
>> > > > The sink coordinator checkpoint problem (mentioned in option 1) would
>> > > > be great if Flink can address it. In the spirit of source
>> > > > (enumerator-reader) and sink (writer-coordinator) duality, the sink
>> > > > coordinator checkpoint should happen after the writer operator. This
>> > > > would be a natural fit to support the global committer in FLIP-143. It
>> > > > is probably an orthogonal matter to this proposal.
>> > >
>> > >
>> > > To me the question here is what are the benefits of having a coordinator 
>> > > in
>> > > comparison to a global committer/aggregator operator.
>> > >
>> > > > Personally, I am usually in favor of keeping streaming ingestion (to
>> > > > data lake) relatively simple and stable. Also, sometimes compaction and
>> > > > sorting are performed together in data rewrite maintenance jobs to
>> > > > improve read performance. In that case, the value of compacting (in
>> > > > Flink streaming ingestion) diminishes.
>> > >
>> > >
>> > > I agree it is always possible to have scheduled maintenance jobs taking
>> > > care of your data, i.e., doing compaction. Unfortunately, the downside is
>> > > that you have to rewrite your data after it is already available for
>> > > other downstream consumers. I guess this can lead to all kinds of
>> > > visibility problems. I am also surprised that you personally are a fan of
>> > > this approach and, on the other hand, are developing the Iceberg sink,
>> > > which goes somewhat against your mentioned principle of keeping the sink
>> > > simple.
>> > >
>> > > > Currently, it is unclear from the doc and this thread where the
>> > > > compaction is actually happening. Jingsong's reply described one model:
>> > > > writer (parallel) -> aggregator (single-parallelism compaction planner)
>> > > > -> compactor (parallel) -> global committer (single-parallelism)
>> > >
>> > >
>> > > My idea of the topology is very similar to the one outlined by Jingsong.
>> > > The compaction will happen in the committer operator.
>> > >
>> > > >
>> > > > In the Iceberg community, the following model has been discussed. It is
>> > > > better for Iceberg because it won't delay the data availability:
>> > > > writer (parallel) -> global committer for append (single parallelism) ->
>> > > > compactor (parallel) -> global committer for rewrite commit (single
>> > > > parallelism)
>> > >
>> > >
>> > > From a quick glance, it seems that the exact same topology can be
>> > > expressed with the committable aggregator, but this definitely depends
>> > > on the exact setup.
>> > >
>> > > Best,
>> > > Fabian
>>
