Guowei, just to add to what Aljoscha said regarding the unique id: the Iceberg
sink checkpoints the unique id into state during snapshot. It also inserts the
unique id into the Iceberg snapshot metadata during commit. When a job restores
the state after a failure, it needs to know whether the restored
transactions/commits were successful or not. It basically iterates through the
list of table snapshots from Iceberg and matches the unique ids with what is
stored in the Iceberg snapshot metadata.

Thanks,
Steven
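To make that restore-time dedup check concrete, here is a minimal Java sketch
against the Iceberg table API. The snapshot-summary key "flink.commit-id" and
all class/method names here are made up for illustration; this is a sketch of
the idea, not the actual Flink Iceberg connector code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Illustrative only: shows the commit-tagging and restore-time matching
// described above, not the real connector implementation.
public class IcebergCommitDedup {

  // Hypothetical snapshot-summary property carrying the Flink commit id.
  private static final String FLINK_COMMIT_ID = "flink.commit-id";

  // Commit a batch of files and record the unique id in the snapshot metadata.
  static void commitWithId(Table table, List<DataFile> files, String uniqueId) {
    AppendFiles append = table.newAppend();
    files.forEach(append::appendFile);
    append.set(FLINK_COMMIT_ID, uniqueId); // the id travels with the Iceberg snapshot
    append.commit();
  }

  // On restore: collect the ids that already made it into the table.
  static Set<String> committedIds(Table table) {
    Set<String> ids = new HashSet<>();
    for (Snapshot snapshot : table.snapshots()) {
      String id = snapshot.summary().get(FLINK_COMMIT_ID);
      if (id != null) {
        ids.add(id);
      }
    }
    return ids;
  }

  // A restored transaction is only re-committed if its id is not in the table yet.
  static void recommitIfNeeded(Table table, String restoredId, List<DataFile> restoredFiles) {
    if (!committedIds(table).contains(restoredId)) {
      commitWithId(table, restoredFiles, restoredId);
    }
  }
}
```

The check in committedIds() is what makes re-running the commit after a
failover safe: a commit whose id is already present in a table snapshot is
simply skipped.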
On Thu, Sep 17, 2020 at 7:40 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Thanks for the summary!
>
> On 16.09.20 06:29, Guowei Ma wrote:
> > ## Consensus
> >
> > 1. The motivation of the unified sink API is to decouple the sink
> > implementation from the different runtime execution modes.
> > 2. The initial scope of the unified sink API only covers the file system
> > type, which supports real transactions. The FLIP focuses more on the
> > semantics the new sink API should support.
> > 3. We prefer the first alternative API, which could give the framework a
> > greater opportunity to optimize.
> > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > called from `prepareSnapshotPreBarrier`, and the `Flush` method should
> > be removed.
> > 5. The FLIP could move the `Snapshot & Drain` section in order to be
> > more focused.
>
> Agreed!
>
> > ## Not Consensus
> >
> > 1. What should the “Unified Sink API” support/cover? The API can
> > “unify” (decouple) the commit operation in terms of supporting
> > exactly-once semantics. However, even if we narrow down the initially
> > supported systems to the file system, there would be different topology
> > requirements. These requirements come from performance optimization
> > (IceBergSink/MergeHiveSink) or functionality (i.e. whether a bucket is
> > “finished”). Should the unified sink API support these requirements?
>
> Yes, this is still tricky. What is the current state, would the
> introduction of a "LocalCommit" and a "GlobalCommit" already solve both
> the Iceberg and Hive cases? I believe Hive is the most tricky one here,
> but if we introduce the "combine" method on GlobalCommit, that could
> serve the same purpose as the "aggregation operation" on the individual
> files, and we could even execute that "combine" in a distributed way.
>
> To answer the more general question, I think we will offer a couple of
> different commit strategies and sinks can implement 0 to n of them. What
> is unified about the sink is that the same sink implementation will work
> for both STREAMING and BATCH execution mode.
>
> > 2. The API does not expose the checkpoint-id because the batch execution
> > mode does not have normal checkpoints. But there are still some
> > implementations that depend on this (IceBergSink uses this to do some
> > dedupe). I think how to support this requirement depends on the first
> > open question.
>
> I think this can be solved by introducing a nonce, see the more thorough
> explanation below.
>
> > 3. Whether the `Writer` supports async functionality or not. Currently I
> > do not know which sink could benefit from it. Maybe it is just my own
> > problem.
>
> Here, I don't really know. We can introduce an "isAvailable()" method
> and mostly ignore it for now, and sinks can just always return true. Or,
> as an alternative, we don't add the method now but add it later with a
> default implementation. Either way, we will probably not take advantage
> of "isAvailable()" now because that would require more runtime changes.
>
> On 17.09.20 06:28, Guowei Ma wrote:
> > But my understanding is: if the committer function is idempotent, the
> > framework can guarantee exactly-once semantics in batch/stream execution
> > mode. But I think maybe the idempotence should be guaranteed by the sink
> > developer, not by the basic API.
>
> I believe the problem here is that some sinks (including Iceberg) can
> only be idempotent with a little help from the framework.
>
> The process would be like this:
>
> 1. collect all committables, generate a unique ID (nonce), store the
> committables and the ID in fault-tolerant storage
> 2. call commitGlobal(committables, nonce)
> 3. Iceberg checks if there is already a commit with the given nonce; if
> not, it will append a commit of the committables along with the nonce to
> the log structure/meta store
>
> The problem is that Iceberg cannot decide, without some extra data,
> whether a set of committables has already been committed, because the
> commit basically just appends some information to the end of a log. We
> would just keep appending the same data if we didn't check the nonce.
>
> We would have the same problem if we wanted to implement a
> write-ahead-log Kafka sink where the "commit" just takes some records
> from a file and appends them to Kafka. Without looking at Kafka and
> checking whether you already committed the same records, you don't know
> if you already committed.
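A minimal sketch of the three-step nonce protocol quoted above, under the
assumption that the framework persists the nonce together with the
committables before calling commitGlobal. The names (NonceCommitSketch,
CommitLog) are invented for illustration and are not part of the proposed
FLIP-143 interfaces.

```java
import java.util.List;
import java.util.UUID;

// Invented names, only to illustrate the three steps from the mail above.
public class NonceCommitSketch<CommT> {

  // Abstraction over the sink's log/meta store (e.g. Iceberg's snapshot log).
  interface CommitLog<T> {
    boolean containsNonce(String nonce);              // was this nonce committed already?
    void append(List<T> committables, String nonce);  // atomically append data plus nonce
  }

  private final CommitLog<CommT> log;

  NonceCommitSketch(CommitLog<CommT> log) {
    this.log = log;
  }

  // Step 1: the framework generates the nonce and persists it together with
  // the committables (e.g. in checkpointed, fault-tolerant state).
  public String prepareCommit(List<CommT> committables) {
    String nonce = UUID.randomUUID().toString();
    // persist (committables, nonce) in fault-tolerant storage here
    return nonce;
  }

  // Steps 2 and 3: commitGlobal consults the nonce before appending, so
  // replaying the call after a failover never appends the same data twice.
  public void commitGlobal(List<CommT> committables, String nonce) {
    if (log.containsNonce(nonce)) {
      return; // these committables already made it into the log
    }
    log.append(committables, nonce);
  }
}
```

Because the nonce check happens inside commitGlobal, the framework can
blindly retry the call on recovery; the idempotence comes from the stored
nonce rather than from the sink having to inspect the data itself.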