Hi Steven,

Thank you for your thoughtful ideas and concerns.

>> I still like the concept of grouping data files per checkpoint for streaming mode. It is cleaner and probably easier to manage and deal with commit failures. Plus, it can reduce dupes for the at-least-once mode. I understand checkpoint is not an option for batch execution. We don't have to expose the checkpointId in the API, as long as the internal bookkeeping groups data files by checkpoints for streaming mode.

I think this problem (how to dedupe the combined committed data) also depends on where to place the agg/combine logic:

1. If the agg/combine takes place in the "commit", maybe we need to figure out how to give the aggregated committable a unique and auto-incrementing id in the committer.
2. If the agg/combine takes place in a separate operator, maybe the sink developer could maintain the id itself by using state, as in the sketch below.
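To make option 2 concrete, a minimal sketch of such an operator-side aggregator. All class and method names here are hypothetical, not part of any proposed API; the id counter would be snapshotted into operator state so a replay after failure can be deduped downstream:

------------------
import java.util.ArrayList;
import java.util.List;

// Hypothetical combine stage for option 2: the sink developer tags each
// aggregated committable with a monotonically increasing id and keeps the
// counter in checkpointed operator state.
class CombiningStage<CommT> {

    static class AggregatedCommittable<C> {
        final long id;                     // unique, auto-incrementing
        final List<C> committables;
        AggregatedCommittable(long id, List<C> committables) {
            this.id = id;
            this.committables = committables;
        }
    }

    private long nextId;                   // restored from checkpointed state
    private final List<CommT> buffer = new ArrayList<>();

    void add(CommT committable) {
        buffer.add(committable);
    }

    /** Called on checkpoint: emit one combined committable tagged with a fresh id. */
    AggregatedCommittable<CommT> combine() {
        AggregatedCommittable<CommT> out =
                new AggregatedCommittable<>(nextId++, new ArrayList<>(buffer));
        buffer.clear();
        return out;                        // nextId is snapshotted alongside
    }
}
------------------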
I think this problem is also decided by what topology patterns the sink API should support. Actually there are already many other topology requirements. :)

Best,
Guowei

On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com> wrote:

> > AFAIK the committer would not see the file-1-2 when ck1 happens in the ExactlyOnce mode.
>
> @Guowei Ma <guowei....@gmail.com> I think you are right for exactly-once checkpoint semantics. What about "at least once"? I guess we can argue that it is fine to commit file-1-2 for at-least-once mode.
>
> I still like the concept of grouping data files per checkpoint for streaming mode. It is cleaner and probably easier to manage and deal with commit failures. Plus, it can reduce dupes for the at-least-once mode. I understand checkpoint is not an option for batch execution. We don't have to expose the checkpointId in the API, as long as the internal bookkeeping groups data files by checkpoints for streaming mode.
>
> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> > images don't make it through to the mailing lists. You would need to host the file somewhere and send a link.
>>
>> Sorry about that. Here is the sample DAG in Google Drawings.
>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>>
>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi Dawid,
>>>
>>> >> I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.
>>>
>>> I think you are right. The topology "FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement. The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter -> GlobalCommitter" reuses some code of the StreamingFileSink (for example the rolling policy), so it has the "SingleFileCommitter" in the topology. In general I want to use this case to show that there are different topologies according to the requirements; a schematic wiring of the two variants follows below.
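Purely for illustration, the two topologies could be wired roughly like this. The *Operator classes and the WrittenFile/MergedFile types are made-up placeholders; only the DataStream plumbing is real, and none of this is the FLIP API:

------------------
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;

// Variant 1: merge first, then a single commit step (global or non-global).
DataStream<WrittenFile> files =
        input.transform("FileWriter",
                TypeInformation.of(WrittenFile.class), new FileWriterOperator());
files.transform("FileMergeWriter",
                TypeInformation.of(MergedFile.class), new FileMergeWriterOperator())
     .transform("Committer",
                TypeInformation.of(Void.class), new CommitterOperator());

// Variant 2: commit single files first (reusing StreamingFileSink-style rolling),
// then merge, then commit the merged result once, globally.
files.transform("SingleFileCommitter",
                TypeInformation.of(WrittenFile.class), new SingleFileCommitterOperator())
     .transform("FileMergeWriter",
                TypeInformation.of(MergedFile.class), new FileMergeWriterOperator())
     .global() // route every merged file to the single committer task
     .transform("GlobalCommitter",
                TypeInformation.of(Void.class), new GlobalCommitterOperator())
     .setParallelism(1);
------------------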
>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the actual topology of the merge-supporting HiveSink is more complicated than that.
>>>
>>> >> I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2].
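For illustration, such a method could follow the same CompletableFuture-based pattern as the new sources. This is only a sketch of the suggestion, not the proposed interface:

------------------
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Sketch of a non-blocking Writer, mirroring the availability pattern of the
// new source API. The interface shape and names are assumptions, not FLIP text.
interface AvailabilityWriter<InputT> {

    /** Completes once the writer can accept another element without blocking. */
    CompletableFuture<Void> isAvailable();

    /** Should only be called after the future above has completed. */
    void write(InputT element) throws IOException;
}
------------------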
>>> >> BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.
>>>
>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder. There are too many issues at the same time.
>>>
>>> In addition to what Aljoscha said, there is very little system support for it. Another thing I worry about is: does the sink's snapshot return immediately when the sink's status is unavailable? Maybe we could do it by deduping some elements in the state, but I think it might be too complicated. What I want to know is which specific sinks would benefit from this feature. @piotr <pi...@ververica.com> Please correct me if I misunderstand you. Thanks.
>>>
>>> Best,
>>> Guowei
>>>
>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>
>>> > What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed. But Iceberg only needs to write the manifest file. Would you like to enlighten me why Iceberg needs the local committer? Thanks
>>> >
>>> > Sorry if I caused confusion here. I am not saying the Iceberg sink needs a local committer. What I had in mind is that prior to the Iceberg example I did not see a need for a "GlobalCommitter" in the streaming case. I thought it is always enough to have the "normal" committer in that case. Now I understand that this differentiation is not really about logical separation. It is not really about the granularity with which we commit, i.e. answering the "WHAT" question. It is really about the performance, and that in the end we will have a single "transaction", so it is about answering the question "HOW".
>>> >
>>> > - Commit a directory with merged files (some users want to merge the files in a directory before committing the directory to the Hive metastore)
>>> >   1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>>> >
>>> > I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.
>>> >
>>> > I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2].
>>> >
>>> > BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> > On 15/09/2020 08:06, Guowei Ma wrote:
>>> >
>>> > I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more, I think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember the offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs. the final checkpoint at the end of the job. Both of these methods could emit committables, but flush should not leave any in-progress state (e.g. in case of the file sink in STREAM mode, snapshotState could leave some open files that would be committed in a subsequent cycle, whereas flush should close all files). The snapshotState as it is now cannot be called in prepareSnapshotPreBarrier, as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>>> >
>>> > ver 1:
>>> > List<StateT> snapshotState();
>>> >
>>> > ver 2:
>>> > void snapshotState(); // not sure if we need that method at all in option 2
>>> >
>>> > I second Dawid's proposal. This is a valid scenario. And version 2 does not need the snapshotState() any more.
>>> >
>>> > The Committer is as described in the FLIP; it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can individually commit files to S3: a committable would be the list of part uploads that will form the final file, and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg, where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.
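Spelled out as Java interfaces, the two flavors described above might look roughly like this (a sketch of that description, not the FLIP's actual definitions):

------------------
import java.io.IOException;
import java.util.List;

// Per-committable commits, e.g. S3: finish one multipart upload per call.
// Instances can run in parallel across subtasks.
interface Committer<CommT> {
    void commit(CommT committable) throws IOException;
}

// One commit over all committables of a checkpoint, e.g. a single Iceberg
// table commit. Needs a global view, so it runs as a single instance.
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables) throws IOException;
}
------------------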
>>> > I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function. I must admit I did not get the need for the Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.
>>> >
>>> > @Dawid
>>> > What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed. But Iceberg only needs to write the manifest file. Would you like to enlighten me why Iceberg needs the local committer? Thanks
>>> >
>>> > Best,
>>> > Guowei
>>> >
>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more, I think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember the offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs. the final checkpoint at the end of the job. Both of these methods could emit committables, but flush should not leave any in-progress state (e.g. in case of the file sink in STREAM mode, snapshotState could leave some open files that would be committed in a subsequent cycle, whereas flush should close all files). The snapshotState as it is now cannot be called in prepareSnapshotPreBarrier, as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>>> >
>>> > ver 1:
>>> > List<StateT> snapshotState();
>>> >
>>> > ver 2:
>>> > void snapshotState(); // not sure if we need that method at all in option 2
>>> >
>>> > The Committer is as described in the FLIP; it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can individually commit files to S3: a committable would be the list of part uploads that will form the final file, and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg, where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.
>>> >
>>> > I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function.
>>> >
>>> > I must admit I did not get the need for the Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.
>>> >
>>> > For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile". Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the thousands of collected data files) as StateT. This allows us to absorb extended commit outages without losing written/uploaded data files, as the operator state size is as small as one manifest file per checkpoint cycle [2].
>>> > ------------------
>>> > StateT snapshotState(SnapshotContext context) throws Exception;
>>> >
>>> > That means we also need the restoreCommitter API in the Sink interface:
>>> > ---------------
>>> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);
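To illustrate the quoted optimization: a minimal sketch of a committer that spills its pending data files into one manifest at checkpoint time and keeps only that handle as StateT. DataFile, ManifestFile, and ManifestWriter are stand-ins for the Iceberg types, and the whole shape is an assumption, not the actual connector code:

------------------
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class DataFile {}       // stand-in for Iceberg's DataFile
class ManifestFile {}   // stand-in for the small manifest handle kept as StateT

interface ManifestWriter {
    ManifestFile write(List<DataFile> files) throws IOException; // spill to storage
}

class ManifestCheckpointingCommitter {
    private final List<DataFile> pending = new ArrayList<>();
    private final ManifestWriter manifestWriter;

    ManifestCheckpointingCommitter(ManifestWriter manifestWriter) {
        this.manifestWriter = manifestWriter;
    }

    void collect(DataFile file) {
        pending.add(file);
    }

    /** State is one manifest handle per checkpoint, not thousands of DataFile entries. */
    ManifestFile snapshotState() throws IOException {
        ManifestFile manifest = manifestWriter.write(new ArrayList<>(pending));
        pending.clear();
        return manifest;
    }
}
------------------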
>>> > I think this might be a valid case. Not sure though if I would go with a "state" there. Having a state in a committer would imply we need a collect method as well. So far we needed a single method commit(...), and the bookkeeping of the committables could be handled by the framework. I think something like an optional combiner in the GlobalCommitter would be enough. What do you think?
>>> >
>>> > GlobalCommitter<CommT, GlobalCommT> {
>>> >
>>> >     void commit(GlobalCommT globalCommittables);
>>> >
>>> >     GlobalCommT combine(List<CommT> committables);
>>> >
>>> > }
>>> >
>>> > A different problem that I see here is how we handle commit failures. Should the committables (both normal and global) be included in the next cycle? Shall we retry? I think it would be worth laying this out in the FLIP.
>>> >
>>> > @Aljoscha I think you can find the code Steven was referring to here:
>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>>> >
>>> > On 14.09.20 01:23, Steven Wu wrote:
>>> >
>>> > ## Writer interface
>>> >
>>> > For the Writer interface, should we add "prepareSnapshot" before the checkpoint barrier is emitted downstream? IcebergWriter would need it. Or would the framework call "flush" before the barrier is emitted downstream? That guarantee would achieve the same goal.
>>> >
>>> > I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more, I think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > @Guowei, what do you think about this?
>>> >
>>> > In [1], we discussed the reason for the Writer to emit a (checkpointId, CommT) tuple to the committer. The committer needs the checkpointId to separate out data files for different checkpoints if concurrent checkpoints are enabled.
>>> >
>>> > When can this happen? Even with concurrent checkpoints, the snapshot barriers would still cleanly segregate the input stream of an operator into tranches that should manifest in only one checkpoint. With concurrent checkpoints, all that can happen is that we start a checkpoint before the last one is confirmed completed.
>>> >
>>> > Unless there is some weirdness in the sources and some sources start chk1 first and some other ones start chk2 first?
>>> >
>>> > @Piotrek, do you think this is a problem?
>>> >
>>> > For the Committer interface, I am wondering if we should split the single commit method into separate "collect" and "commit" methods? This way, it can handle both single and multiple CommT objects.
>>> >
>>> > I think we can't do this. If the sink only needs a regular Committer, we can perform the commits in parallel, possibly on different machines. Only when the sink needs a GlobalCommitter would we need to ship all commits to a single process and perform the commit there. If both methods were unified in one interface, we couldn't make the decision of where to commit in the framework code.
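One way to picture this argument: the framework can only choose between parallel and single-process committing if the two flavors stay distinct in the Sink's surface. A sketch reusing the Committer/GlobalCommitter shapes from above; these factory methods are hypothetical, not the FLIP's exact Sink API:

------------------
import java.util.Optional;

// If a sink declares which flavor it implements, the framework can decide the
// physical placement: Committer instances run at the writer's parallelism,
// while a GlobalCommitter is deployed as one instance that sees all commits.
interface Sink<InputT, CommT> {
    Optional<Committer<CommT>> createCommitter();             // parallel commits
    Optional<GlobalCommitter<CommT>> createGlobalCommitter(); // single global commit
}
------------------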
>>> > For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile". Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the thousands of collected data files) as StateT. This allows us to absorb extended commit outages without losing written/uploaded data files, as the operator state size is as small as one manifest file per checkpoint cycle.
>>> >
>>> > You could have a point here. Is the code for this available in open source? I was checking out
>>> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>>> > and didn't find the ManifestFile optimization there.
>>> >
>>> > Best,
>>> > Aljoscha