Thanks Martijn,
I'm actually trying to run our V1 Delta connector on Flink 1.15 using
SinkV1Adapter with GlobalCommitterOperator.

Having said that, I might have found a potential issue with
GlobalCommitterOperator, checkpointing, and failover recovery [1].
For "normal" scenarios, it does look good though.

Regards,
Krzysztof Chmielewski

[1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc

On Fri, 9 Sep 2022 at 20:49, Martijn Visser <martijnvis...@apache.org> wrote:

> Hi all,
>
> A couple of notes from when the new sink was being developed: V1 is
> completely simulated on top of V2 [1]. V2 is strictly more expressive.
>
> If there's a desire to stick with the `GlobalCommitter` interface, have a
> look at StandardSinkTopologies, or just add your own, more fitting
> PostCommitTopology. The important thing to remember is that this topology
> lags one checkpoint behind in terms of fault tolerance: it only receives
> data once the committer has committed in notifyCheckpointComplete. Thus,
> the global committer needs to be idempotent and able to restore the actual
> state on recovery. That limitation comes from Flink's checkpointing
> behaviour and applies to both V1 and V2. GlobalCommitterOperator abstracts
> these issues away and also handles retries (i.e., commits that happen much
> later), so the standard topology is probably a good place to start.
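The requirement that the global committer be idempotent and able to restore the actual state on recovery can be illustrated with a small, framework-free Java sketch. It does not use Flink's `GlobalCommitter` or `Committer` APIs; `InMemoryTable` and `IdempotentGlobalCommitter` are hypothetical names, and the key assumption is that the target table can atomically record the last committed checkpoint ID together with the data files, so a replayed commit is detected by reading the table rather than operator state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a transactional table (e.g. an Iceberg or Delta table)
// whose metadata durably records the last Flink checkpoint ID that was committed.
class InMemoryTable {
    private final List<String> files = new ArrayList<>();
    private long lastCommittedCheckpointId = -1L;

    // Data files and the checkpoint ID become visible together, as one atomic commit.
    synchronized void atomicCommit(long checkpointId, List<String> newFiles) {
        files.addAll(newFiles);
        lastCommittedCheckpointId = checkpointId;
    }

    synchronized long lastCommittedCheckpointId() {
        return lastCommittedCheckpointId;
    }

    synchronized List<String> files() {
        return Collections.unmodifiableList(new ArrayList<>(files));
    }
}

// Sketch of an idempotent global committer. It keeps no state of its own: on every call
// it reads "what is already committed" back from the table, so a fresh instance created
// after a failover restores the actual state from the table itself.
class IdempotentGlobalCommitter {
    private final InMemoryTable table;

    IdempotentGlobalCommitter(InMemoryTable table) {
        this.table = table;
    }

    // Returns true if this call committed, false if the checkpoint was already committed.
    // Assumes a single committer instance (parallelism 1), so the check-then-commit
    // below is not raced by another writer.
    boolean commit(long checkpointId, List<String> files) {
        // After a failover, committables of an already-committed checkpoint can be
        // replayed; comparing against the ID stored in the table turns that into a no-op.
        if (checkpointId <= table.lastCommittedCheckpointId()) {
            return false;
        }
        table.atomicCommit(checkpointId, files);
        return true;
    }
}
```

The design choice is to treat the table, not Flink state, as the source of truth: a `commit(2, ...)` replayed after a restart returns `false` instead of appending the same files twice.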
>
> Best regards,
>
> Martijn
>
> [1] https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>
> On Thu, 8 Sep 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>
> > Hi,
> > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
> > community here [2].
> >
> > I totally agree with Steven on this. Sink V1's GlobalCommitter is
> > exactly what the Flink-Delta Sink needs, since it is the place where
> > we do the actual commit to the Delta Log, which should be done from a
> > single place/instance.
> >
> > I'm currently evaluating V2 for our connector, and having, as Steven
> > described it, a "more natural, built-in concept/support of GlobalCommitter
> > in the sink v2 interface" would be greatly appreciated.
> >
> > Cheers,
> > Krzysztof Chmielewski
> >
> > [1] https://github.com/kristoffSC
> > [2] https://github.com/delta-io/connectors/tree/master/flink
> >
> > On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz...@gmail.com> wrote:
> >
> > > Hi Yun,
> > >
> > > Thanks a lot for the reply!
> > >
> > > While we can add the global committer in the WithPostCommitTopology,
> > > the semantics are weird. The Commit stage doesn't actually commit
> > > anything to the Iceberg table, and the PostCommit stage is where the
> > > Iceberg commit happens.
> > >
> > > I just took a quick look at the DeltaLake Flink sink. It still uses the
> > > V1 sink interface [1]. I think it might have the same issue when
> > > switching to the V2 sink interface.
> > >
> > > For data lake storage (like Iceberg, DeltaLake) or any storage with
> > > global transactional commit, it would be more natural to have a built-in
> > > concept/support of GlobalCommitter in the sink v2 interface.
> > >
> > > Thanks,
> > > Steven
> > >
> > > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> > >
> > >
> > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
> > >
> > > > Hi Steven, Liwei,
> > > > Very sorry for missing this mail and responding so late.
> > > > I think the initial thought was indeed to use `WithPostCommitTopology`
> > > > as a replacement for the original GlobalCommitter, and currently the
> > > > adapter of Sink V1 on top of Sink V2 also maps the GlobalCommitter in
> > > > the Sink V1 interface onto an implementation of
> > > > `WithPostCommitTopology` [1].
> > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems
> > > > to me it could support both the global committer and small file
> > > > compaction. We might have a `WithPostCommitTopology` implementation like:
> > > >
> > > >     DataStream ds = add global committer;
> > > >     if (enable file compaction) {
> > > >         build the compaction subgraph from ds
> > > >     }
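The pseudocode above can be made concrete with a framework-free Java sketch that models the same shape: one global commit over the committables of every subtask, followed by an optional compaction step. It does not use Flink's DataStream API; `PostCommitSketch`, `DataFile`, the greedy size-based grouping, and `targetBytes` are hypothetical, chosen only to show that both responsibilities can live in one post-commit subgraph.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free model of a post-commit "subgraph": committables from all parallel
// committer subtasks are funneled into one global commit, and the committed files can
// optionally feed a compaction-planning step.
class PostCommitSketch {

    // Hypothetical committable: a data file written by one subtask, with its size in bytes.
    static final class DataFile {
        final String path;
        final long sizeBytes;

        DataFile(String path, long sizeBytes) {
            this.path = path;
            this.sizeBytes = sizeBytes;
        }
    }

    // Stage 1: one global commit over the committables of every subtask.
    static List<DataFile> globalCommit(List<List<DataFile>> perSubtask, List<DataFile> table) {
        List<DataFile> committed = new ArrayList<>();
        for (List<DataFile> fromSubtask : perSubtask) {
            committed.addAll(fromSubtask);
        }
        table.addAll(committed);
        return committed;
    }

    // Stage 2 (optional): greedily packs committed files, in order, into groups of at most
    // targetBytes. Each group would later be rewritten as one file; a group holding a
    // single file needs no rewrite.
    static List<List<DataFile>> planCompaction(List<DataFile> committed, long targetBytes) {
        List<List<DataFile>> groups = new ArrayList<>();
        List<DataFile> current = new ArrayList<>();
        long currentBytes = 0;
        for (DataFile file : committed) {
            if (!current.isEmpty() && currentBytes + file.sizeBytes > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(file);
            currentBytes += file.sizeBytes;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    // Same shape as the pseudocode: global commit first, compaction only if enabled.
    // Returns the compaction plan, which is empty when compaction is disabled.
    static List<List<DataFile>> postCommit(
            List<List<DataFile>> perSubtask, List<DataFile> table,
            boolean enableCompaction, long targetBytes) {
        List<DataFile> committed = globalCommit(perSubtask, table);
        if (!enableCompaction) {
            return new ArrayList<>();
        }
        return planCompaction(committed, targetBytes);
    }
}
```

In an actual `WithPostCommitTopology`, each stage would presumably be an operator in the subgraph rather than a method call, with the compaction stage only seeing files the global commit has already made visible.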
> > > > Best,
> > > > Yun
> > > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > > > ------------------------------------------------------------------
> > > > From: Steven Wu <stevenz...@gmail.com>
> > > > Send Time: 2022 Aug. 17 (Wed.) 07:30
> > > > To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> > > > Subject: Re: Sink V2 interface replacement for GlobalCommitter
> > > > > Plus, it will disable the future capability of a small file
> > > > > compaction stage post commit.
> > > > I should clarify this comment. If we use `WithPostCommitTopology`
> > > > for the global committer, we would lose the capability of using the
> > > > post-commit stage for small file compaction.
> > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > >
> > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
> > > > > With the V2 sink interface, GlobalCommitter has been deprecated in
> > > > > favor of WithPostCommitTopology. I thought the post-commit stage was
> > > > > mainly for async maintenance (like compaction).
> > > > >
> > > > > Are we supposed to do something similar to the
> > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
> > > > > plan for bridging V1 sinks to V2 interfaces.
> > > > >
> > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> > > > >         implements WithPostCommitTopology<InputT, CommT> {
> > > > >
> > > > >     @Override
> > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
> > > > >         // Appends a single GlobalCommitterOperator after the parallel committers.
> > > > >         StandardSinkTopologies.addGlobalCommitter(
> > > > >                 committables,
> > > > >                 GlobalCommitterAdapter::new,
> > > > >                 () -> sink.getCommittableSerializer().get());
> > > > >     }
> > > > > }
> > > > >
> > > > >
> > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used
> > > > > the "global" partitioner to force all committables to go to a single
> > > > > committer task (task 0). This effectively forces a global committer
> > > > > disguised as parallel committers. It is a little weird and can also
> > > > > lead to questions about why the other committer tasks are not getting
> > > > > any messages. Plus, it will disable the future capability of a small
> > > > > file compaction stage post commit. Hence, I am asking what the right
> > > > > approach is to achieve global committer behavior.
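The "global partitioner" workaround and the idle committer subtasks it causes can be illustrated with a small simulation. It does not call Flink; `PartitioningSketch` and its `Partitioner` interface are hypothetical, although the `GLOBAL` strategy mirrors the documented behaviour of `DataStream#global()`, which sends every record to the first instance of the downstream operator.

```java
// Framework-free simulation of routing committables to N parallel committer subtasks.
class PartitioningSketch {

    // Chooses the target subtask (channel) for the i-th committable.
    interface Partitioner {
        int selectChannel(int recordIndex, int numChannels);
    }

    // Mimics global partitioning: every committable goes to subtask 0.
    static final Partitioner GLOBAL = (recordIndex, numChannels) -> 0;

    // Plain round-robin, for comparison: committables are spread over all subtasks.
    static final Partitioner ROUND_ROBIN = (recordIndex, numChannels) -> recordIndex % numChannels;

    // Returns how many committables each committer subtask receives.
    static int[] route(int numCommittables, int parallelism, Partitioner partitioner) {
        int[] received = new int[parallelism];
        for (int i = 0; i < numCommittables; i++) {
            received[partitioner.selectChannel(i, parallelism)]++;
        }
        return received;
    }
}
```

With 10 committables and a committer parallelism of 4, `route(10, 4, GLOBAL)` gives `{10, 0, 0, 0}`: three of the four committer subtasks never see a message, which is the situation described above. Round-robin gives `{3, 3, 2, 2}` but then no single task sees every committable, so it cannot perform a global commit.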
> > > > >
> > > > > Thanks,
> > > > > Steven
> > > > >
> > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
> > > > >
> > > >
> > >
> >
>
