> AFAIK the committer would not see the file-1-2 when ck1 happens in the ExactlyOnce mode.
@Guowei Ma <guowei....@gmail.com> I think you are right for exactly-once checkpoint semantics. What about "at least once"? I guess we can argue that it is fine to commit file-1-2 in at-least-once mode.

I still like the concept of grouping data files per checkpoint for streaming mode. It is cleaner and probably easier to manage and deal with commit failures. Plus, it can reduce duplicates in the at-least-once mode. I understand checkpointing is not an option for batch execution. We don't have to expose the checkpointId in the API, as long as the internal bookkeeping groups data files by checkpoint for streaming mode.

On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com> wrote:

> > images don't make it through to the mailing lists. You would need to
> > host the file somewhere and send a link.
>
> Sorry about that. Here is the sample DAG in Google Drawings:
>
> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>
> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Dawid
>>
>> I still find the merging case the most confusing. I don't necessarily
>> understand why you need the "SingleFileCommit" step in this scenario.
>> The way I understand the "commit" operation is that it makes some
>> data/artifacts visible to the external system, thus it should be
>> immutable from a point of view of a single process. Having an
>> additional step in the same process that works on committed data
>> contradicts those assumptions. I might be missing something though.
>> Could you elaborate why can't it be something like FileWriter ->
>> FileMergeWriter -> Committer (either global or non-global)? Again it
>> might be just me not getting the example.
>>
>> I think you are right. The topology
>> "FileWriter -> FileMergeWriter -> Committer" could meet the merge
>> requirement.
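The per-checkpoint bookkeeping Steven describes above could be sketched as follows. This is a toy illustration, not the FLIP API; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy sketch of internal bookkeeping that groups committables (e.g. data
 * files) by checkpoint id. On checkpoint completion, every group up to and
 * including that checkpoint is drained for committing. All names are
 * hypothetical, not part of the proposed FLIP interfaces.
 */
class CheckpointBookkeeping<CommT> {
    private final TreeMap<Long, List<CommT>> pending = new TreeMap<>();

    /** Called when the writer emits a committable for the given checkpoint. */
    void add(long checkpointId, CommT committable) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }

    /** Called on checkpoint completion: drain all groups up to checkpointId. */
    List<CommT> pollUpTo(long checkpointId) {
        List<CommT> toCommit = new ArrayList<>();
        Map<Long, List<CommT>> head = pending.headMap(checkpointId, true);
        head.values().forEach(toCommit::addAll);
        head.clear(); // the headMap view is backed by `pending`, so this drains it
        return toCommit;
    }

    int pendingGroups() {
        return pending.size();
    }
}
```

With this shape, a checkpoint that completes late simply commits several groups at once, and at-least-once re-commits stay confined to whole checkpoint groups.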
>> The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter ->
>> GlobalCommitter" reuses some code of the StreamingFileSink (for
>> example, the rolling policy), so it has the "SingleFileCommitter" in
>> the topology. In general I want to use the case to show that there are
>> different topologies according to the requirements.
>>
>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
>> actual topology of the merge-supporting HiveSink is more complicated
>> than that.
>>
>> I've just briefly skimmed over the proposed interfaces. I would
>> suggest one addition to the Writer interface (as I understand this is
>> the runtime interface in this proposal?): add some availability
>> method, to avoid, if possible, blocking calls on the sink. We already
>> have similar availability methods in the new sources [1] and in
>> various places in the network stack [2].
>>
>> BTW, let's not forget about Piotr's comment. I think we could add the
>> isAvailable or similar method to the Writer interface in the FLIP.
>>
>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder.
>> There are too many issues at the same time.
>>
>> In addition to what Aljoscha said, that there is very little system
>> support for it, another thing I worry about is: does the sink's
>> snapshot return immediately when the sink's status is unavailable?
>> Maybe we could do it by deduplicating some elements in the state, but
>> I think it might be too complicated. What I want to know is which
>> specific sinks would benefit from this feature. @piotr
>> <pi...@ververica.com> Please correct me if I misunderstand you.
>> Thanks.
>>
>> Best,
>> Guowei
>>
>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz
>> <dwysakow...@apache.org> wrote:
>>
>> > What I understand is that HiveSink's implementation might need the
>> > local committer (FileCommitter) because the file rename is needed.
>> > But Iceberg only needs to write the manifest file.
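The availability method Piotr suggests above could take roughly the following shape, modeled loosely on the availability pattern in the new sources and the network stack (a completed future means the next write will not block). This is a sketch with illustrative names, not the FLIP's final API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical availability-aware writer: the runtime checks isAvailable()
 * and, if unavailable, waits on the future instead of blocking inside the
 * sink. Names are illustrative only.
 */
interface AvailableWriter<InputT> {
    /** Non-blocking write; only called when the writer is available. */
    void write(InputT element);

    /** A completed future signals that write() can be called without blocking. */
    CompletableFuture<?> getAvailableFuture();

    default boolean isAvailable() {
        return getAvailableFuture().isDone();
    }
}

/** A trivial always-available writer, just to make the contract concrete. */
class BufferingWriter implements AvailableWriter<String> {
    final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        // A real sink would return an incomplete future while back-pressured.
        return CompletableFuture.completedFuture(null);
    }
}
```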
>> > Would you like to enlighten me why the Iceberg needs the local
>> > committer?
>> > Thanks
>> >
>> > Sorry if I caused a confusion here. I am not saying the Iceberg sink
>> > needs a local committer. What I had in mind is that prior to the
>> > Iceberg example I did not see a need for a "GlobalCommitter" in the
>> > streaming case. I thought it is always enough to have the "normal"
>> > committer in that case. Now I understand that this differentiation
>> > is not really about logical separation. It is not really about the
>> > granularity with which we commit, i.e. answering the "WHAT" question.
>> > It is really about the performance and that in the end we will have
>> > a single "transaction", so it is about answering the question "HOW".
>> >
>> > - Commit a directory with merged files (some users want to merge the
>> >   files in a directory before committing the directory to the Hive
>> >   metastore):
>> >
>> >   1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>> >
>> > I still find the merging case the most confusing. I don't
>> > necessarily understand why you need the "SingleFileCommit" step in
>> > this scenario. The way I understand the "commit" operation is that it
>> > makes some data/artifacts visible to the external system, thus it
>> > should be immutable from a point of view of a single process. Having
>> > an additional step in the same process that works on committed data
>> > contradicts those assumptions. I might be missing something though.
>> > Could you elaborate why can't it be something like FileWriter ->
>> > FileMergeWriter -> Committer (either global or non-global)? Again it
>> > might be just me not getting the example.
>> >
>> > I've just briefly skimmed over the proposed interfaces.
>> > I would suggest one addition to the Writer interface (as I
>> > understand this is the runtime interface in this proposal?): add
>> > some availability method, to avoid, if possible, blocking calls on
>> > the sink. We already have similar availability methods in the new
>> > sources [1] and in various places in the network stack [2].
>> >
>> > BTW, let's not forget about Piotr's comment. I think we could add
>> > the isAvailable or similar method to the Writer interface in the
>> > FLIP.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 15/09/2020 08:06, Guowei Ma wrote:
>> >
>> > I would think that we only need flush() and the semantics are that
>> > it prepares for a commit, so on a physical level it would be called
>> > from "prepareSnapshotPreBarrier". Now that I'm thinking about it
>> > more I think flush() should be renamed to something like
>> > "prepareCommit()".
>> >
>> > Generally speaking it is a good point that emitting the committables
>> > should happen before emitting the checkpoint barrier downstream.
>> > However, if I remember offline discussions well, the idea behind
>> > Writer#flush and Writer#snapshotState was to differentiate commit on
>> > checkpoint vs final checkpoint at the end of the job. Both of these
>> > methods could emit committables, but the flush should not leave any
>> > in-progress state (e.g. in case of the file sink in STREAM mode,
>> > snapshotState could leave some open files that would be committed in
>> > a subsequent cycle, whereas flush should close all files). The
>> > snapshotState as it is now cannot be called in
>> > prepareSnapshotPreBarrier as it can store some state, which should
>> > happen in Operator#snapshotState as otherwise it would always be
>> > synchronous.
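As an aside, the flush vs. snapshotState distinction described above (a regular checkpoint may leave files in progress, while a final flush must close everything) could be sketched roughly as follows. FileWriterSketch and all of its member names are made up for illustration; this is not the proposed interface:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of a file writer: on a regular checkpoint (flush == false) it
 * emits committables only for files it has already rolled and may keep the
 * current file open; at the end of input (flush == true) it must close and
 * emit everything. All names are hypothetical.
 */
class FileWriterSketch {
    private final List<String> rolledFiles = new ArrayList<>();
    private String inProgressFile;

    void write(String record) {
        if (inProgressFile == null) {
            inProgressFile = "part-" + rolledFiles.size();
        }
        // ... append record to inProgressFile ...
    }

    /** Close the current file, e.g. when a rolling policy fires. */
    void roll() {
        if (inProgressFile != null) {
            rolledFiles.add(inProgressFile);
            inProgressFile = null;
        }
    }

    /** Emit committables; a final flush must not leave in-progress files. */
    List<String> prepareCommit(boolean flush) {
        if (flush) {
            roll();
        }
        List<String> committables = new ArrayList<>(rolledFiles);
        rolledFiles.clear();
        return committables;
    }
}
```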
>> > Therefore I think we would need something like:
>> >
>> >     void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >
>> > ver 1:
>> >
>> >     List<StateT> snapshotState();
>> >
>> > ver 2:
>> >
>> >     void snapshotState(); // not sure if we need that method at all in option 2
>> >
>> > I second Dawid's proposal. This is a valid scenario. And version 2
>> > does not need the snapshotState() any more.
>> >
>> > The Committer is as described in the FLIP; it's basically a function
>> > "void commit(Committable)". The GlobalCommitter would be a function
>> > "void commit(List<Committable>)". The former would be used by an S3
>> > sink where we can individually commit files to S3; a committable
>> > would be the list of part uploads that will form the final file, and
>> > the commit operation creates the metadata in S3. The latter would be
>> > used by something like Iceberg where the Committer needs a global
>> > view of all the commits to be efficient and not overwhelm the
>> > system.
>> >
>> > I don't know yet if sinks would only implement one type of commit
>> > function or potentially both at the same time, and maybe Commit can
>> > return some CommitResult that gets shipped to the GlobalCommit
>> > function.
>> > I must admit I did not get the need for Local/Normal + Global
>> > committer at first. The Iceberg example helped a lot. I think it
>> > makes a lot of sense.
>> >
>> > @Dawid
>> > What I understand is that HiveSink's implementation might need the
>> > local committer (FileCommitter) because the file rename is needed.
>> > But Iceberg only needs to write the manifest file. Would you like to
>> > enlighten me why the Iceberg needs the local committer?
>> > Thanks
>> >
>> > Best,
>> > Guowei
>> >
>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz
>> > <dwysakow...@apache.org> wrote:
>> >
>> > Hi all,
>> >
>> > I would think that we only need flush() and the semantics are that
>> > it prepares for a commit, so on a physical level it would be called
>> > from "prepareSnapshotPreBarrier". Now that I'm thinking about it
>> > more I think flush() should be renamed to something like
>> > "prepareCommit()".
>> >
>> > Generally speaking it is a good point that emitting the committables
>> > should happen before emitting the checkpoint barrier downstream.
>> > However, if I remember offline discussions well, the idea behind
>> > Writer#flush and Writer#snapshotState was to differentiate commit on
>> > checkpoint vs final checkpoint at the end of the job. Both of these
>> > methods could emit committables, but the flush should not leave any
>> > in-progress state (e.g. in case of the file sink in STREAM mode,
>> > snapshotState could leave some open files that would be committed in
>> > a subsequent cycle, whereas flush should close all files). The
>> > snapshotState as it is now cannot be called in
>> > prepareSnapshotPreBarrier as it can store some state, which should
>> > happen in Operator#snapshotState as otherwise it would always be
>> > synchronous. Therefore I think we would need something like:
>> >
>> >     void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >
>> > ver 1:
>> >
>> >     List<StateT> snapshotState();
>> >
>> > ver 2:
>> >
>> >     void snapshotState(); // not sure if we need that method at all in option 2
>> >
>> > The Committer is as described in the FLIP; it's basically a function
>> > "void commit(Committable)". The GlobalCommitter would be a function
>> > "void commit(List<Committable>)".
>> > The former would be used by an S3 sink where we can individually
>> > commit files to S3; a committable would be the list of part uploads
>> > that will form the final file, and the commit operation creates the
>> > metadata in S3. The latter would be used by something like Iceberg
>> > where the Committer needs a global view of all the commits to be
>> > efficient and not overwhelm the system.
>> >
>> > I don't know yet if sinks would only implement one type of commit
>> > function or potentially both at the same time, and maybe Commit can
>> > return some CommitResult that gets shipped to the GlobalCommit
>> > function.
>> >
>> > I must admit I did not get the need for Local/Normal + Global
>> > committer at first. The Iceberg example helped a lot. I think it
>> > makes a lot of sense.
>> >
>> > For Iceberg, writers don't need any state. But the GlobalCommitter
>> > needs to checkpoint StateT. For the committer, CommT is "DataFile".
>> > Since a single committer can collect thousands (or more) data files
>> > in one checkpoint cycle, as an optimization we checkpoint a single
>> > "ManifestFile" (for the thousands of collected data files) as
>> > StateT. This allows us to absorb extended commit outages without
>> > losing written/uploaded data files, as operator state size is as
>> > small as one manifest file per checkpoint cycle [2].
>> > ------------------
>> >     StateT snapshotState(SnapshotContext context) throws Exception;
>> >
>> > That means we also need the restoreCommitter API in the Sink
>> > interface:
>> > ---------------
>> >     Committer<CommT, StateT> restoreCommitter(InitContext context,
>> >         StateT state);
>> >
>> > I think this might be a valid case. Not sure though if I would go
>> > with a "state" there. Having a state in a committer would imply we
>> > need a collect method as well. So far we needed a single method
>> > commit(...) and the bookkeeping of the committables could be handled
>> > by the framework.
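Steven's ManifestFile optimization, combined with the optional combine() hook discussed in this thread, could be sketched as follows. DataFile and ManifestFile here are toy stand-ins, not Iceberg's real classes, and the method shape is only an illustration of the idea:

```java
import java.util.List;

/**
 * Toy sketch of combining many per-checkpoint DataFile committables into a
 * single ManifestFile, so the operator state checkpoints one small object
 * instead of thousands. The record types are hypothetical stand-ins.
 */
class ManifestSketch {
    record DataFile(String path, long recordCount) {}

    record ManifestFile(String path, int fileCount, long totalRecords) {}

    /** Plays the role of GlobalCommT combine(List<CommT>) from the thread. */
    static ManifestFile combine(long checkpointId, List<DataFile> dataFiles) {
        long total = dataFiles.stream().mapToLong(DataFile::recordCount).sum();
        // A real implementation would write the manifest to storage here.
        return new ManifestFile("manifest-" + checkpointId + ".avro",
                dataFiles.size(), total);
    }
}
```

With this shape, only the small ManifestFile needs to go into StateT, which is what lets the sink absorb extended commit outages cheaply.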
>> > I think something like an optional combiner in the GlobalCommitter
>> > would be enough. What do you think?
>> >
>> >     GlobalCommitter<CommT, GlobalCommT> {
>> >
>> >         void commit(GlobalCommT globalCommittables);
>> >
>> >         GlobalCommT combine(List<CommT> committables);
>> >
>> >     }
>> >
>> > A different problem that I see here is how we handle commit
>> > failures. Should the committables (both normal and global) be
>> > included in the next cycle, should we retry, ...? I think it would
>> > be worth laying it out in the FLIP.
>> >
>> > @Aljoscha I think you can find the code Steven was referring to
>> > here:
>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>> >
>> > On 14.09.20 01:23, Steven Wu wrote:
>> >
>> > ## Writer interface
>> >
>> > For the Writer interface, should we add "prepareSnapshot" before the
>> > checkpoint barrier is emitted downstream? IcebergWriter would need
>> > it. Or would the framework call "flush" before the barrier is
>> > emitted downstream? That guarantee would achieve the same goal.
>> >
>> > I would think that we only need flush() and the semantics are that
>> > it prepares for a commit, so on a physical level it would be called
>> > from "prepareSnapshotPreBarrier". Now that I'm thinking about it
>> > more I think flush() should be renamed to something like
>> > "prepareCommit()".
>> >
>> > @Guowei, what do you think about this?
>> >
>> > In [1], we discussed the reason for the Writer to emit
>> > (checkpointId, CommT) tuples to the committer. The committer needs
>> > the checkpointId to separate out data files for different
>> > checkpoints if concurrent checkpoints are enabled.
>> >
>> > When can this happen?
>> > Even with concurrent checkpoints, the snapshot barriers would still
>> > cleanly segregate the input stream of an operator into tranches that
>> > should manifest in only one checkpoint. With concurrent checkpoints,
>> > all that can happen is that we start a checkpoint before a previous
>> > one is confirmed completed.
>> >
>> > Unless there is some weirdness in the sources and some sources start
>> > chk1 first and some other ones start chk2 first?
>> >
>> > @Piotrek, do you think this is a problem?
>> >
>> > For the Committer interface, I am wondering if we should split the
>> > single commit method into separate "collect" and "commit" methods?
>> > This way, it can handle both single and multiple CommT objects.
>> >
>> > I think we can't do this. If the sink only needs a regular
>> > Committer, we can perform the commits in parallel, possibly on
>> > different machines. Only when the sink needs a GlobalCommitter would
>> > we need to ship all commits to a single process and perform the
>> > commit there. If both methods were unified in one interface we
>> > couldn't make the decision of where to commit in the framework code.
>> >
>> > For Iceberg, writers don't need any state. But the GlobalCommitter
>> > needs to checkpoint StateT. For the committer, CommT is "DataFile".
>> > Since a single committer can collect thousands (or more) data files
>> > in one checkpoint cycle, as an optimization we checkpoint a single
>> > "ManifestFile" (for the thousands of collected data files) as
>> > StateT. This allows us to absorb extended commit outages without
>> > losing written/uploaded data files, as operator state size is as
>> > small as one manifest file per checkpoint cycle.
>> >
>> > You could have a point here. Is the code for this available in
>> > open-source?
>> > I was checking out
>> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>> > and didn't find the ManifestFile optimization there.
>> >
>> > Best,
>> > Aljoscha
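On the commit-failure question Dawid raises earlier in the thread (should committables be included in the next cycle, should we retry, ...), one possible policy is to carry failed committables over into the next commit cycle. A purely illustrative sketch, not the FLIP's decided behavior; all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Toy committer that retries failed committables in the next cycle. The
 * tryCommit predicate stands in for the real commit call and returns true
 * on success. Illustration only.
 */
class RetryingCommitter<CommT> {
    private final List<CommT> carriedOver = new ArrayList<>();
    private final Predicate<CommT> tryCommit;

    RetryingCommitter(Predicate<CommT> tryCommit) {
        this.tryCommit = tryCommit;
    }

    /** Attempts carried-over committables first, then the new ones. */
    List<CommT> commitCycle(List<CommT> fresh) {
        List<CommT> toTry = new ArrayList<>(carriedOver);
        toTry.addAll(fresh);
        carriedOver.clear();
        List<CommT> committed = new ArrayList<>();
        for (CommT c : toTry) {
            if (tryCommit.test(c)) {
                committed.add(c);
            } else {
                carriedOver.add(c); // retry in the next cycle
            }
        }
        return committed;
    }

    List<CommT> pendingRetries() {
        return List.copyOf(carriedOver);
    }
}
```

Whether such retries are idempotent (e.g. re-appending the same manifest) is exactly the kind of contract the thread argues should be spelled out in the FLIP.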