Hi all,

A couple of bits from when work was being done on the new sink: V1 is completely simulated on top of V2 [1]. V2 is strictly more expressive.
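Here is a toy, stdlib-only model of that relationship, for illustration only. `V1GlobalCommitter`, `PostCommitStage`, `GlobalCommitterStage` and `ChainedStage` are hypothetical names, not Flink's API (the real adapter is the one linked in [1]). It assumes a post-commit stage is simply a callback per committed checkpoint. A V1-style global committer then fits into one such stage, and because the stage is open-ended, further steps such as compaction can be chained after it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical V1-style contract (stand-in for Flink's GlobalCommitter):
// combine the committables of all parallel writers, then commit the result.
interface V1GlobalCommitter<CommT, GlobalCommT> {
    GlobalCommT combine(List<CommT> committables);

    void commit(List<GlobalCommT> globalCommittables);
}

// Hypothetical V2-style post-commit stage (stand-in for the subgraph added via
// WithPostCommitTopology): it sees a checkpoint's committables only after the
// commit stage has run, and may do arbitrary work with them.
interface PostCommitStage<CommT> {
    void onCheckpointCommitted(long checkpointId, List<CommT> committables);
}

// Adapter: a V1 global committer expressed as a single V2 post-commit stage.
final class GlobalCommitterStage<CommT, GlobalCommT> implements PostCommitStage<CommT> {
    private final V1GlobalCommitter<CommT, GlobalCommT> committer;

    GlobalCommitterStage(V1GlobalCommitter<CommT, GlobalCommT> committer) {
        this.committer = committer;
    }

    @Override
    public void onCheckpointCommitted(long checkpointId, List<CommT> committables) {
        // Combine everything from this checkpoint and commit it in one place.
        committer.commit(List.of(committer.combine(committables)));
    }
}

// Because a post-commit stage is open-ended, further steps can run after the
// global commit, e.g. a (hypothetical) small-file compaction step.
final class ChainedStage<CommT> implements PostCommitStage<CommT> {
    private final List<PostCommitStage<CommT>> stages;

    @SafeVarargs
    ChainedStage(PostCommitStage<CommT>... stages) {
        this.stages = Arrays.asList(stages);
    }

    @Override
    public void onCheckpointCommitted(long checkpointId, List<CommT> committables) {
        // Run the stages in order: global commit first, then any follow-up work.
        for (PostCommitStage<CommT> stage : stages) {
            stage.onCheckpointCommitted(checkpointId, committables);
        }
    }
}
```

Wrapping a V1 committer in `GlobalCommitterStage` and chaining a compaction stage after it runs the combined commit first and the compaction step second for each checkpoint.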
If there's a desire to stick to the `GlobalCommitter` interface, have a look at the StandardSinkTopologies. Or you can just add your own, more fitting PostCommitTopology. The important part to remember is that this topology lags one checkpoint behind in terms of fault tolerance: it only receives data once the committer has committed on notifyCheckpointComplete. Thus, the global committer needs to be idempotent and able to restore the actual state on recovery. That limitation comes from Flink's checkpointing behaviour and applies to both V1 and V2. GlobalCommitterOperator abstracts these issues away and also handles retries (i.e., commits that happen much later). So it's probably a good idea to start with the standard topology.

Best regards,

Martijn

[1] https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370

On Thu 8 Sep 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> Krzysztof Chmielewski [1] from the Delta-Flink connector open source community
> here [2].
>
> I totally agree with Steven on this. Sink V1's GlobalCommitter is
> exactly what the Flink-Delta sink needs, since it is the place where
> we do the actual commit to the Delta Log, which should be done from one
> place/instance.
>
> Currently I'm evaluating V2 for our connector, and having what Steven
> described as a "more natural, built-in concept/support of GlobalCommitter
> in the sink v2 interface" would be greatly appreciated.
>
> Cheers,
> Krzysztof Chmielewski
>
> [1] https://github.com/kristoffSC
> [2] https://github.com/delta-io/connectors/tree/master/flink
>
> On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz...@gmail.com> wrote:
>
> > Hi Yun,
> >
> > Thanks a lot for the reply!
> >
> > While we can add the global committer in the WithPostCommitTopology, the
> > semantics are weird.
> > The Commit stage actually doesn't commit anything to
> > the Iceberg table; the PostCommit stage is where the Iceberg commit
> > happens.
> >
> > I just took a quick look at the DeltaLake Flink sink. It still uses the V1
> > sink interface [1]. I think it might have the same issue when switching to
> > the V2 sink interface.
> >
> > For data lake storage (like Iceberg or DeltaLake) or any storage with a
> > global transactional commit, it would be more natural to have a built-in
> > concept/support of GlobalCommitter in the sink v2 interface.
> >
> > Thanks,
> > Steven
> >
> > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> >
> > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
> >
> > > Hi Steven, Liwei,
> > > Very sorry for missing this mail and responding so late.
> > > I think the initial thought is indeed to use `WithPostCommitTopology` as
> > > a replacement for the original GlobalCommitter, and currently the adapter
> > > of Sink V1 on top of Sink V2 also maps the GlobalCommitter in the Sink V1
> > > interface onto an implementation of `WithPostCommitTopology` [1].
> > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems to
> > > me it could support both the global committer and small file compaction?
> > > We might have a `WithPostCommitTopology` implementation like
> > >
> > >     DataStream ds = add global committer;
> > >     if (enable file compaction) {
> > >         build the compaction subgraph from ds
> > >     }
> > >
> > > Best,
> > > Yun
> > >
> > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > >
> > > ------------------------------------------------------------------
> > > From: Steven Wu <stevenz...@gmail.com>
> > > Send Time: 2022 Aug. 17 (Wed.) 07:30
> > > To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> > > Subject: Re: Sink V2 interface replacement for GlobalCommitter
> > >
> > > > Plus, it will disable the future capability of small file compaction
> > > > stage post commit.
> > >
> > > I should clarify this comment. If we are using the `WithPostCommitTopology`
> > > for the global committer, we would lose the capability of using the
> > > post-commit stage for small file compaction.
> > >
> > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg. With
> > > > the V2 sink interface, GlobalCommitter has been deprecated in favor of
> > > > WithPostCommitTopology. I thought the post-commit stage is mainly for
> > > > async maintenance (like compaction).
> > > >
> > > > Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
> > > > It seems like a temporary transition plan for bridging v1 sinks to v2
> > > > interfaces.
> > > >
> > > >     private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> > > >             implements WithPostCommitTopology<InputT, CommT> {
> > > >
> > > >         @Override
> > > >         public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
> > > >             StandardSinkTopologies.addGlobalCommitter(
> > > >                     committables,
> > > >                     GlobalCommitterAdapter::new,
> > > >                     () -> sink.getCommittableSerializer().get());
> > > >         }
> > > >     }
> > > >
> > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
> > > > "global" partitioner to force all committables to go to a single committer
> > > > task 0. It effectively forces a global committer disguised in the
> > > > parallel committers. It is a little weird and can also lead to questions
> > > > about why other committer tasks are not getting any messages. Plus, it will
> > > > disable the future capability of a small file compaction stage post-commit.
> > > > Hence, I am asking what the right approach is to achieve global committer
> > > > behavior.
> > > >
> > > > Thanks,
> > > > Steven
> > > >
> > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
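Martijn's point at the top of this thread, that a global committer in the post-commit topology must be idempotent and able to restore the actual state on recovery, can be sketched outside Flink. This is a stdlib-only Java model, not Flink or Iceberg code: `FakeTable` and `IdempotentGlobalCommitter` are hypothetical names, and it assumes the target table can record the last committed checkpoint id atomically together with the data, so the committer can read the real state back from the table after a restart.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical table: data files plus the id of the last checkpoint whose
// files were committed, both updated together in one step (assumed atomic).
final class FakeTable {
    private final List<String> files = new ArrayList<>();
    private long lastCommittedCheckpoint = -1L;

    long lastCommittedCheckpoint() {
        return lastCommittedCheckpoint;
    }

    List<String> files() {
        return Collections.unmodifiableList(files);
    }

    // Appends the files and records the checkpoint id in a single call.
    void commit(long checkpointId, List<String> newFiles) {
        files.addAll(newFiles);
        lastCommittedCheckpoint = checkpointId;
    }
}

// Hypothetical global committer (parallelism 1) in the post-commit stage.
// After a failover it can be handed a checkpoint it already committed, so it
// checks the table's own record instead of trusting operator state.
final class IdempotentGlobalCommitter {
    private final FakeTable table;

    IdempotentGlobalCommitter(FakeTable table) {
        this.table = table;
    }

    // Returns true if the files were committed, false if the call was a replay.
    boolean commit(long checkpointId, List<String> committables) {
        if (checkpointId <= table.lastCommittedCheckpoint()) {
            return false; // already committed before the failure: skip it
        }
        table.commit(checkpointId, committables);
        return true;
    }
}
```

Replaying checkpoint 1 after a restore is then a no-op, while checkpoint 2 still goes through. Retry handling, which GlobalCommitterOperator already provides according to the thread, is deliberately left out.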