Hi Fabian,

Quick question on your comment 3: if there is a pipelined data exchange
with a keyBy between the writers/committers and the component that does the
global commit, then there will only be a single failover region. So is it
correct that you assumed blocking data exchanges for the scenario you
described?
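
To make the question concrete, here is a minimal Python sketch (not Flink
code; the task names and both helper functions are made up for
illustration). It assumes that tasks connected by a pipelined exchange fall
into the same failover region, while a blocking exchange separates regions:

```python
from collections import defaultdict


def failover_regions(tasks, edges):
    """Group tasks into regions: tasks joined by a pipelined edge share a region.

    edges is a list of (upstream, downstream, mode) tuples,
    with mode being either "pipelined" or "blocking".
    """
    parent = {t: t for t in tasks}

    def find(t):
        # Union-find lookup with path halving.
        while parent[t] != t:
            parent[t] = parent[parent[t]]
            t = parent[t]
        return t

    for up, down, mode in edges:
        if mode == "pipelined":
            parent[find(up)] = find(down)

    regions = defaultdict(set)
    for t in tasks:
        regions[find(t)].add(t)
    return sorted(sorted(r) for r in regions.values())


def sink_topology(num_writers, mode):
    # Hypothetical topology: N writers/committers feeding one global committer.
    writers = [f"writer-{i}" for i in range(num_writers)]
    tasks = writers + ["global-committer"]
    edges = [(w, "global-committer", mode) for w in writers]
    return tasks, edges


pipelined = failover_regions(*sink_topology(3, "pipelined"))
blocking = failover_regions(*sink_topology(3, "blocking"))
print(len(pipelined), len(blocking))  # 1 4
```

With pipelined exchanges, every writer ends up in the same region as the
global committer; only the blocking variant keeps the writers in separate
regions.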

Cheers,
Till

On Thu, Dec 16, 2021 at 9:23 AM Fabian Paul <fp...@apache.org> wrote:

> Hi Yun,
>
> Thanks for your fast feedback. Let me clarify your points.
>
> 1. We solve it by using StreamExchangeMode.BATCH before any exchange.
> That obviously doesn’t help with a lost TM; we would need to employ
> HA storage for that. It is the same issue as now and orthogonal.
>
> 2. Extending V1 with V2 or vice versa would require renaming methods
> (since return types are non-optional) and would mean API changes. Even
> though the API is Experimental, we want to give connector developers the
> opportunity to provide one implementation for all of Flink 1.X. We will
> offer an internal adapter from V1 to V2 and two sinkTo methods, and
> internally have just one code path.
>
> 3. DataStreamSink would act as a unified view on all the operators and
> update them all at once when using setParallelism and so on (setName
> and setUid will receive suffixes per operator).
> Iceberg actually has a different requirement: They want to have a
> committer with parallelism 1 but as a coordinator such that
> embarrassingly parallel pipelines have different fail-over regions. I
> was thinking that in this case, they need to implement a no-op
> committer (that just forwards the committables) and use a post-commit
> topology that achieves that.
> Another option is that they use the preCommit topology and insert a
> constant key-by that forwards all committables to a single committer.
> We are planning to provide building blocks for such pipelines as we
> go.
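
As a rough illustration of the constant key-by option, here is a minimal
Python sketch (not Flink code). The `subtask_for` and `route` helpers and the
file names are hypothetical, and the CRC32-modulo routing is a simplified
stand-in for Flink's actual key-group assignment:

```python
import zlib


def subtask_for(key: str, parallelism: int) -> int:
    # Simplified routing (assumption): CRC32 of the key modulo the parallelism.
    return zlib.crc32(key.encode("utf-8")) % parallelism


def route(committables, key_selector, parallelism):
    """Return a mapping of subtask index -> committables routed to that subtask."""
    buckets = {}
    for c in committables:
        index = subtask_for(key_selector(c), parallelism)
        buckets.setdefault(index, []).append(c)
    return buckets


committables = [f"file-{i}.parquet" for i in range(8)]

# Keying by the committable itself spreads the work over several subtasks.
by_name = route(committables, lambda c: c, 4)

# Keying by a constant sends every committable to the same subtask.
by_constant = route(committables, lambda c: "all", 4)
```

Whatever the downstream parallelism is, `by_constant` contains exactly one
bucket, which is the effect the constant key-by is meant to achieve.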
>
> Best,
> Fabian
>
> On Thu, Dec 16, 2021 at 5:50 AM Yun Gao <yungao...@aliyun.com> wrote:
> >
> > Hi Fabian,
> >
> > Thanks a lot for the update! In general, the latest version looks good
> > to me, and I think using separate feature interfaces would be much easier
> > to understand and extend in the future. I have some open questions on the
> > details, though:
> >
> > 1. The first question is whether we could support end-to-end exactly-once
> > with a post-commit topology in batch mode. In batch mode, we can
> > currently only commit all the transactions after the whole job has
> > finished. Otherwise, if there is a JM failover, or the writer / committer
> > gets restarted due to non-determinism (A -> [B1, B2], where A and B1 have
> > finished and B2 failed; if -> is rebalance / random / rescale, all of A,
> > B1, and B2 would be restarted), there might be duplicate records.
> > Previously, one possible idea was to move the committer and global
> > committer to the operator coordinator, but if it is a topology, we might
> > need some other kind of solution?
> >
> > 2. I also want to double-check the compatibility: since the old sink is
> > also named Sink, do we want to put Sink V2 in a new package? Besides,
> > since we might want to keep only one `sinkTo(Sink<?> sink)`, perhaps we
> > also need to make Sink V1 a subclass of Sink V2 that extends the stateful
> > and two-phase-commit sinks, right?
> >
> > 3. I'd also like to confirm our thoughts on the `DataStreamSink` returned
> > by the sinkTo method. The main issue is how we implement methods like
> > `setParallelism` or `setMaxParallelism`, since the sink would now be
> > translated into multiple transformations. Perhaps we could make them the
> > default values for all the transformations of the sink? A related issue
> > concerns the Iceberg sink: I think it needs to have only one committer to
> > avoid contention on the optimistic locks (which would cause performance
> > degradation), so it might need N writers with 1 committer. To build such
> > a topology, perhaps we need to add new methods to specify the
> > parallelism of the writers and committers separately?
> >
> > Best,
> > Yun
> >
> >
> > ------------------Original Mail ------------------
> > Sender:Fabian Paul <fp...@apache.org>
> > Send Date:Mon Dec 13 23:59:43 2021
> > Recipients:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support
> small file compaction
> >>
> >> Hi all,
> >>
> >>
> >>
> >> After a lot of discussions with different people, we received very fruitful
> >>
> >> feedback and reworked the ideas behind this FLIP. Initially, we had
> >>
> >> the impression that the compaction problem is solvable by a single
> >>
> >> topology that we can reuse across different sinks. We now have a
> >>
> >> better understanding that different external systems require different
> >>
> >> compaction mechanisms, e.g., Hive requires compaction before finally
> >>
> >> registering the file in the metastore or Iceberg compacts the files
> >>
> >> after they have been registered and just lazily compacts them.
> >>
> >>
> >>
> >> Considering all these different views we came up with a design that
> >>
> >> builds upon what @guowei....@gmail.com and @yungao...@aliyun.com have
> >>
> >> proposed at the beginning. We allow inserting custom topologies before
> >>
> >> and after the SinkWriters and Committers. Furthermore, we do not see
> >>
> >> it as a downside. The Sink interfaces that will expose the DataStream
> >>
> >> to the user reside in flink-streaming-java in contrast to the basic
> >>
> >> Sink interfaces that reside in flink-core, so we deem it to be only used by
> >>
> >> expert users.
> >>
> >>
> >>
> >> Moreover, we also wanted to remove the global committer from the
> >>
> >> unified Sink interfaces and replace it with a custom post-commit
> >>
> >> topology. Unfortunately, we cannot do it without breaking the Sink
> >>
> >> interface since the GlobalCommittables are part of the parameterized
> >>
> >> Sink interface. Thus, we propose building a new Sink V2 interface
> >>
> >> consisting of composable interfaces that do not offer the
> >>
> >> GlobalCommitter anymore. We will implement a utility to extend a Sink
> >>
> >> with post topology that mimics the behavior of the GlobalCommitter.
> >>
> >> The new Sink V2 provides the same sort of methods as the Sink V1
> >>
> >> interface, so a migration of sinks that do not use the GlobalCommitter
> >>
> >> should be very easy.
> >>
> >> We plan to keep the existing Sink V1 interfaces to not break
> >>
> >> externally built sinks. As part of this FLIP, we migrate all the
> >>
> >> connectors inside of the main repository to the new Sink V2 API.
> >>
> >>
> >>
> >> The FLIP document is also updated and includes the proposed changes.
> >>
> >>
> >>
> >> Looking forward to your feedback,
> >>
> >> Fabian
> >>
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Dec 2, 2021 at 10:15 AM Roman Khachatryan wrote:
> >>
> >> >
> >>
> >> > Thanks for clarifying (I was initially confused by merging state files
> >>
> >> > rather than output files).
> >>
> >> >
> >>
> >> > > At some point, Flink will definitely have some WAL adapter that can
> turn any sink into an exactly-once sink (with some caveats). For now, we
> keep that as an orthogonal solution as it has a rather high price (bursty
> workload with high latency). Ideally, we can keep the compaction
> asynchronously...
> >>
> >> >
> >>
> >> > Yes, that would be something like a WAL. I agree that it would have a
> >>
> >> > different set of trade-offs.
> >>
> >> >
> >>
> >> >
> >>
> >> > Regards,
> >>
> >> > Roman
> >>
> >> >
> >>
> >> > On Mon, Nov 29, 2021 at 3:33 PM Arvid Heise wrote:
> >>
> >> > >>
> >>
> >> > >> > One way to avoid write-read-merge is by wrapping SinkWriter with
> >>
> >> > >> > another one, which would buffer input elements in a temporary
> storage
> >>
> >> > >> > (e.g. local file) until a threshold is reached; after that, it
> would
> >>
> >> > >> > invoke the original SinkWriter. And if a checkpoint barrier
> comes in
> >>
> >> > >> > earlier, it would send written data to some aggregator.
> >>
> >> > >>
> >>
> >> > >> I think perhaps this seems to be a kind of WAL method? Namely we
> first
> >>
> >> > >> write the elements to some WAL logs and persist them on checkpoint
> >>
> >> > >> (in snapshot or remote FS), or we directly write WAL logs to the
> remote
> >>
> >> > >> FS eagerly.
> >>
> >> > >>
> >>
> >> > > At some point, Flink will definitely have some WAL adapter that can
> turn any sink into an exactly-once sink (with some caveats). For now, we
> keep that as an orthogonal solution as it has a rather high price (bursty
> workload with high latency). Ideally, we can keep the compaction
> asynchronously...
> >>
> >> > >
> >>
> >> > > On Mon, Nov 29, 2021 at 8:52 AM Yun Gao wrote:
> >>
> >> > >>
> >>
> >> > >> Hi,
> >>
> >> > >>
> >>
> >> > >> @Roman, very sorry for the very late response,
> >>
> >> > >>
> >>
> >> > >> > Merging artifacts from multiple checkpoints would apparently
> >>
> >> > >> require multiple concurrent checkpoints
> >>
> >> > >>
> >>
> >> > >> I think it might not need concurrent checkpoints: suppose some
> >>
> >> > >> operator (like the committer aggregator in option 2) maintains
> >>
> >> > >> the list of files to merge; it could store the list of files to
> merge
> >>
> >> > >> in the states, then after several checkpoints are done and we have
> >>
> >> > >> enough files, we could merge all the files in the list.
> >>
> >> > >>
> >>
> >> > >> > Asynchronous merging in an aggregator would require some
> resolution
> >>
> >> > >> > logic on recovery, so that a merged artifact can be used if the
> >>
> >> > >> > original one was deleted. Otherwise, wouldn't recovery fail
> because
> >>
> >> > >> > some artifacts are missing?
> >>
> >> > >> > We could also defer deletion until the "compacted" checkpoint is
> >>
> >> > >> > subsumed - but isn't it too late, as it will be deleted anyways
> once
> >>
> >> > >> > subsumed?
> >>
> >> > >>
> >>
> >> > >> I think logically we could delete the original files once the
> "compacted" checkpoint
> >>
> >> > >> (which finishes merging the compacted files and records them in the
> checkpoint) is completed
> >>
> >> > >> in all the options. If there is a failover before it, we could
> restart the merging and if
> >>
> >> > >> there is a failover after it, we would have already recorded the
> files in the checkpoint.
> >>
> >> > >>
> >>
> >> > >> > One way to avoid write-read-merge is by wrapping SinkWriter with
> >>
> >> > >> > another one, which would buffer input elements in a temporary
> storage
> >>
> >> > >> > (e.g. local file) until a threshold is reached; after that, it
> would
> >>
> >> > >> > invoke the original SinkWriter. And if a checkpoint barrier
> comes in
> >>
> >> > >> > earlier, it would send written data to some aggregator.
> >>
> >> > >>
> >>
> >> > >> I think perhaps this seems to be a kind of WAL method? Namely we
> first
> >>
> >> > >> write the elements to some WAL logs and persist them on checkpoint
> >>
> >> > >> (in snapshot or remote FS), or we directly write WAL logs to the
> remote
> >>
> >> > >> FS eagerly.
> >>
> >> > >>
> >>
> >> > >> Sorry if I do not understand correctly somewhere.
> >>
> >> > >>
> >>
> >> > >> Best,
> >>
> >> > >> Yun
> >>
> >> > >>
> >>
> >> > >>
> >>
> >> > >> ------------------------------------------------------------------
> >>
> >> > >> From:Roman Khachatryan
> >>
> >> > >> Send Time:2021 Nov. 9 (Tue.) 22:03
> >>
> >> > >> To:dev
> >>
> >> > >> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to
> support small file compaction
> >>
> >> > >>
> >>
> >> > >> Hi everyone,
> >>
> >> > >>
> >>
> >> > >> Thanks for the proposal and the discussion, I have some remarks:
> >>
> >> > >> (I'm not very familiar with the new Sink API but I thought about
> the
> >>
> >> > >> same problem in context of the changelog state backend)
> >>
> >> > >>
> >>
> >> > >> 1. Merging artifacts from multiple checkpoints would apparently
> >>
> >> > >> require multiple concurrent checkpoints (otherwise, a new
> checkpoint
> >>
> >> > >> won't be started before completing the previous one; and the
> previous
> >>
> >> > >> one can't be completed before durably storing the artifacts).
> However,
> >>
> >> > >> concurrent checkpoints are currently not supported with Unaligned
> >>
> >> > >> checkpoints (this is besides increasing e2e-latency).
> >>
> >> > >>
> >>
> >> > >> 2. Asynchronous merging in an aggregator would require some
> resolution
> >>
> >> > >> logic on recovery, so that a merged artifact can be used if the
> >>
> >> > >> original one was deleted. Otherwise, wouldn't recovery fail because
> >>
> >> > >> some artifacts are missing?
> >>
> >> > >> We could also defer deletion until the "compacted" checkpoint is
> >>
> >> > >> subsumed - but isn't it too late, as it will be deleted anyways
> once
> >>
> >> > >> subsumed?
> >>
> >> > >>
> >>
> >> > >> 3. Writing small files, then reading and merging them for *every*
> >>
> >> > >> checkpoint seems worse than only reading them on recovery. I guess
> I'm
> >>
> >> > >> missing some cases of reading, so to me it would make sense to
> mention
> >>
> >> > >> these cases explicitly in the FLIP motivation section.
> >>
> >> > >>
> >>
> >> > >> 4. One way to avoid write-read-merge is by wrapping SinkWriter with
> >>
> >> > >> another one, which would buffer input elements in a temporary
> storage
> >>
> >> > >> (e.g. local file) until a threshold is reached; after that, it
> would
> >>
> >> > >> invoke the original SinkWriter. And if a checkpoint barrier comes
> in
> >>
> >> > >> earlier, it would send written data to some aggregator. It will
> >>
> >> > >> increase checkpoint delay (async phase) compared to the current
> Flink;
> >>
> >> > >> but not compared to the write-read-merge solution, IIUC.
> >>
> >> > >> Then such "BufferingSinkWriters" could aggregate input elements
> from
> >>
> >> > >> each other, potentially recursively (I mean something like
> >>
> >> > >>
> https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png
> >>
> >> > >> )
> >>
> >> > >>
> >>
> >> > >> 5. Reducing the number of files by reducing aggregator parallelism
> as
> >>
> >> > >> opposed to merging on reaching size threshold will likely be less
> >>
> >> > >> optimal and more difficult to configure. OTOH, thresholds might be
> more
> >>
> >> > >> difficult to implement and (with recursive merging) would incur
> higher
> >>
> >> > >> latency. Maybe that's also something to decide explicitly or at
> least
> >>
> >> > >> mention in the FLIP.
> >>
> >> > >>
> >>
> >> > >>
> >>
> >> > >>
> >>
> >> > >> Regards,
> >>
> >> > >> Roman
> >>
> >> > >>
> >>
> >> > >>
> >>
> >> > >> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei wrote:
> >>
> >> > >> >
> >>
> >> > >> > Hi Fabian,
> >>
> >> > >> >
> >>
> >> > >> > Thanks for drafting the FLIP and trying to support small file
> compaction. I
> >>
> >> > >> > think this feature is very urgent and valuable for users (at
> least for me).
> >>
> >> > >> >
> >>
> >> > >> > Currently I am trying to support streaming rewrite(compact) for
> Iceberg on
> >>
> >> > >> > PR#3323. As Steven mentioned, the
> >>
> >> > >> > Iceberg sink writes and compacts data through the following steps:
> >>
> >> > >> > Step-1: Several parallel data writers (sinkers) write streaming
> data as files.
> >>
> >> > >> > Step-2: A single-parallelism data file committer commits the
> completed
> >>
> >> > >> > files as soon as possible to make them available.
> >>
> >> > >> > Step-3: Several parallel file rewriters (compactors) collect
> committed files
> >>
> >> > >> > from multiple checkpoints and rewrite (compact) them together
> once the
> >>
> >> > >> > total file size or number of files reaches the threshold.
> >>
> >> > >> > Step-4: A single-parallelism rewrite (compact) result committer
> commits
> >>
> >> > >> > the rewritten (compacted) files to replace the old files and make
> them
> >>
> >> > >> > available.
> >>
> >> > >> >
> >>
> >> > >> >
> >>
> >> > >> > If Flink wants to support small file compaction, some key points I
> think are
> >>
> >> > >> > necessary:
> >>
> >> > >> >
> >>
> >> > >> > 1, Compact files from multiple checkpoints.
> >>
> >> > >> > I totally agree with Jingsong, because completed file size
> usually could
> >>
> >> > >> > not reach the threshold in a single checkpoint. Especially for
> partitioned
> >>
> >> > >> > table, we need to compact the files of each partition, but
> usually the file
> >>
> >> > >> > size of each partition will be different and may not reach the
> merge
> >>
> >> > >> > threshold. If we compact these files, in a single checkpoint,
> regardless of
> >>
> >> > >> > whether the total file size reaches the threshold, then the
> value of
> >>
> >> > >> > compacting will be diminished and we will still get small files
> because
> >>
> >> > >> > these compacted files do not reach the target size. So we need
> the
> >>
> >> > >> > compactor to collect committed files from multiple checkpoints
> and compact
> >>
> >> > >> > them until they reach the threshold.
> >>
> >> > >> >
> >>
> >> > >> > 2, Separate write phase and compact phase.
> >>
> >> > >> > Users usually hope the data becomes available as soon as
> possible, and the
> >>
> >> > >> > end-to-end latency is very important. I think we need to
> separate the
> >>
> >> > >> > write and compact phases. The write phase includes
> Step-1
> >>
> >> > >> > and Step-2: we sink data as files and commit them per checkpoint,
> regardless
> >>
> >> > >> > of the file size. That ensures the data will
> be
> >>
> >> > >> > available ASAP. The compact phase includes Step-3
> >>
> >> > >> > and Step-4: the compactor should collect committed files from
> multiple
> >>
> >> > >> > checkpoints and compact them asynchronously once they reach the
> threshold,
> >>
> >> > >> > and the compact committer will commit the compaction result in
> the next
> >>
> >> > >> > checkpoint. We compact the committed files asynchronously
> because we don't
> >>
> >> > >> > want the compaction to affect the data sink or the whole
> pipeline.
> >>
> >> > >> >
> >>
> >> > >> > 3, Exactly once guarantee between write and compact phase.
> >>
> >> > >> > Once we separate write phase and compact phase, we need to
> consider
> >>
> >> > >> > how to guarantee
> >>
> >> > >> > the exactly-once semantics between the two phases. We should not lose
> any data or
> >>
> >> > >> > files on the compactor(Step-3) in any case and cause the
> compaction result
> >>
> >> > >> > to be inconsistent with before. I think Flink should provide an
> easy-to-use
> >>
> >> > >> > interface to make that easier.
> >>
> >> > >> >
> >>
> >> > >> > 4, Metadata operation and compaction result validation.
> >>
> >> > >> > In the compact phase, there may be not only file compaction, but
> also a lot
> >>
> >> > >> > of metadata operations, such as Iceberg needing to
> read/write manifest
> >>
> >> > >> > and do MOR. And we need some interface to support users to do
> some
> >>
> >> > >> > validation of the compaction result. I think these points should
> be
> >>
> >> > >> > considered when we design the compaction API.
> >>
> >> > >> >
> >>
> >> > >> >
> >>
> >> > >> > Back to FLIP-191, option 1 looks very complicated while option 2
> is
> >>
> >> > >> > relatively simple, but neither of these two solutions separates
> the write
> >>
> >> > >> > phase from the compact phase. So I think we should consider the
> points I
> >>
> >> > >> > mentioned above. And if you have any other questions you can
> always feel
> >>
> >> > >> > free to reach out to me!
> >>
> >> > >> >
> >>
> >> > >> > BR,
> >>
> >> > >> > Reo
> >>
> >> > >> >
> >>
> >> > >> > On Mon, Nov 8, 2021 at 7:59 PM Fabian Paul wrote:
> >>
> >> > >> >
> >>
> >> > >> > > Hi all,
> >>
> >> > >> > >
> >>
> >> > >> > > Thanks for the lively discussions. I am really excited to see
> so many
> >>
> >> > >> > > people
> >>
> >> > >> > > participating in this thread. It also underlines the need that
> many people
> >>
> >> > >> > > would
> >>
> >> > >> > > like to see a solution soon.
> >>
> >> > >> > >
> >>
> >> > >> > > I have updated the FLIP and removed the parallelism
> configuration because
> >>
> >> > >> > > it is
> >>
> >> > >> > > unnecessary since users can configure a constant exchange key
> to send all
> >>
> >> > >> > > committables to only one committable aggregator.
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > 1. Burden for developers w.r.t batch stream unification.
> >>
> >> > >> > >
> >>
> >> > >> > > @yun @guowei, from a theoretical point of view, you are right that by
> exposing the
> >>
> >> > >> > > DataStream
> >>
> >> > >> > > API in the sink, users have the full power to write correct
> batch and
> >>
> >> > >> > > streaming
> >>
> >> > >> > > sinks. I think in reality a lot of users still struggle to
> build pipelines
> >>
> >> > >> > > with
> >>
> >> > >> > > e.g. an operator pipeline that works correctly in streaming
> and batch mode.
> >>
> >> > >> > > Another problem I see with exposing deeper concepts is
> that we
> >>
> >> > >> > > cannot do
> >>
> >> > >> > > any optimization because we cannot reason about how sinks are
> built in the
> >>
> >> > >> > > future.
> >>
> >> > >> > >
> >>
> >> > >> > > We should also try to steer users towards using only
> `Functions` to give
> >>
> >> > >> > > us more
> >>
> >> > >> > > flexibility to swap the internal operator representation. I
> agree with
> >>
> >> > >> > > @yun we
> >>
> >> > >> > > should try to make the `ProcessFunction` more versatile to
> work on that
> >>
> >> > >> > > goal but
> >>
> >> > >> > > I see this as unrelated to the FLIP.
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > 2. Regarding Commit / Global commit
> >>
> >> > >> > >
> >>
> >> > >> > > I envision the global committer to be specific depending on
> the data lake
> >>
> >> > >> > > solution you want to write to. However, it is entirely
> orthogonal to the
> >>
> >> > >> > > compaction.
> >>
> >> > >> > > Currently, I do not expect any changes w.r.t. the global commit
> introduced
> >>
> >> > >> > > by
> >>
> >> > >> > > this FLIP.
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > 3. Regarding the case of trans-checkpoints merging
> >>
> >> > >> > >
> >>
> >> > >> > > @yun, as a user, I would expect that if the committer receives
> in a
> >>
> >> > >> > > checkpoint files
> >>
> >> > >> > > to merge/commit that these are also finished when the
> checkpoint finishes.
> >>
> >> > >> > > I think all sinks rely on this principle currently, e.g.,
> KafkaSink needs to
> >>
> >> > >> > > commit all open transactions until the next checkpoint can
> happen.
> >>
> >> > >> > >
> >>
> >> > >> > > Maybe in the future, we can somehow move the Committer#commit
> call to an
> >>
> >> > >> > > asynchronous execution, but we should discuss it as a separate
> thread.
> >>
> >> > >> > >
> >>
> >> > >> > > > We probably should first describe the different causes of
> small files and
> >>
> >> > >> > > > what problems this proposal is trying to solve. I wrote a
> data shuffling
> >>
> >> > >> > > > proposal [1] for Flink Iceberg sink (shared with Iceberg
> community [2]).
> >>
> >> > >> > > It
> >>
> >> > >> > > > can address small files problems due to skewed data
> distribution across
> >>
> >> > >> > > > Iceberg table partitions. Streaming shuffling before writers
> (to files)
> >>
> >> > >> > > is
> >>
> >> > >> > > > typically more efficient than post-write file compaction
> (which involves
> >>
> >> > >> > > > read-merge-write). It is usually cheaper to prevent a
> problem (small
> >>
> >> > >> > > files)
> >>
> >> > >> > > > than fixing it.
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > @steven you are raising a good point, although I think only
> using a
> >>
> >> > >> > > customizable
> >>
> >> > >> > > shuffle won't address the generation of small files. One
> assumption is that
> >>
> >> > >> > > at least the sink generates one file per subtask, which can
> already be too
> >>
> >> > >> > > many.
> >>
> >> > >> > > Another problem is that with low checkpointing intervals, the
> files do not
> >>
> >> > >> > > meet
> >>
> >> > >> > > the required size. The latter point is probably addressable by
> changing the
> >>
> >> > >> > > checkpoint interval, which might be inconvenient for some
> users.
> >>
> >> > >> > >
> >>
> >> > >> > > > The sink coordinator checkpoint problem (mentioned in option
> 1) would be
> >>
> >> > >> > > > great if Flink can address it. In the spirit of source
> >>
> >> > >> > > (enumerator-reader)
> >>
> >> > >> > > > and sink (writer-coordinator) duality, sink coordinator
> checkpoint should
> >>
> >> > >> > > > happen after the writer operator. This would be a natural
> fit to support
> >>
> >> > >> > > > global committer in FLIP-143. It is probably an orthogonal
> matter to this
> >>
> >> > >> > > > proposal.
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > To me the question here is what are the benefits of having a
> coordinator in
> >>
> >> > >> > > comparison to a global committer/aggregator operator.
> >>
> >> > >> > >
> >>
> >> > >> > > > Personally, I am usually in favor of keeping streaming
> ingestion (to data
> >>
> >> > >> > > > lake) relatively simple and stable. Also sometimes
> compaction and sorting
> >>
> >> > >> > > > are performed together in data rewrite maintenance jobs to
> improve read
> >>
> >> > >> > > > performance. In that case, the value of compacting (in Flink
> streaming
> >>
> >> > >> > > > ingestion) diminishes.
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > I agree it is always possible to have scheduled maintenance
> jobs taking
> >>
> >> > >> > > care of
> >>
> >> > >> > > your data, e.g., doing compaction. Unfortunately, the downside
> is that you
> >>
> >> > >> > > have to rewrite your data after it is already available for other
> downstream
> >>
> >> > >> > > consumers.
> >>
> >> > >> > > I guess this can lead to all kinds of visibility problems. I
> am also
> >>
> >> > >> > > surprised that
> >>
> >> > >> > > you personally are a fan of this approach and, on the other
> hand, are
> >>
> >> > >> > > developing
> >>
> >> > >> > > the Iceberg sink, which goes somewhat against your mentioned
> principle of
> >>
> >> > >> > > keeping
> >>
> >> > >> > > the sink simple.
> >>
> >> > >> > >
> >>
> >> > >> > > > Currently, it is unclear from the doc and this thread where
> the
> >>
> >> > >> > > compaction
> >>
> >> > >> > > > is actually happening. Jingsong's reply described one model
> >>
> >> > >> > > > writer (parallel) -> aggregator (single-parallelism
> compaction planner)
> >>
> >> > >> > > ->
> >>
> >> > >> > > > compactor (parallel) -> global committer (single-parallelism)
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > My idea of the topology is very similar to the one outlined by
> Jingsong. The
> >>
> >> > >> > > compaction will happen in the committer operator.
> >>
> >> > >> > >
> >>
> >> > >> > > >
> >>
> >> > >> > > > In the Iceberg community, the following model has been
> discussed. It is
> >>
> >> > >> > > > better for Iceberg because it won't delay the data
> availability.
> >>
> >> > >> > > > writer (parallel) -> global committer for append (single
> parallelism) ->
> >>
> >> > >> > > > compactor (parallel) -> global committer for rewrite commit
> (single
> >>
> >> > >> > > > parallelism)
> >>
> >> > >> > >
> >>
> >> > >> > >
> >>
> >> > >> > > From a quick glimpse, it seems that the exact same topology is
> possible to
> >>
> >> > >> > > express with the committable aggregator, but this definitely
> depends on
> >>
> >> > >> > > the exact
> >>
> >> > >> > > setup.
> >>
> >> > >> > >
> >>
> >> > >> > > Best,
> >>
> >> > >> > > Fabian
> >>
> >> > >>
>
