> images don't make it through to the mailing lists. You would need to host the file somewhere and send a link.
Sorry about that. Here is the sample DAG in Google Drawings:
https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing

On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Dawid
>
>> I still find the merging case the most confusing. I don't necessarily
>> understand why you need the "SingleFileCommit" step in this scenario. The
>> way I understand the "commit" operation is that it makes some
>> data/artifacts visible to the external system, thus it should be
>> immutable from the point of view of a single process. Having an
>> additional step in the same process that works on committed data
>> contradicts those assumptions. I might be missing something though. Could
>> you elaborate why it can't be something like FileWriter ->
>> FileMergeWriter -> Committer (either global or non-global)? Again it
>> might be just me not getting the example.
>
> I think you are right. The topology "FileWriter -> FileMergeWriter ->
> Committer" could meet the merge requirement. The topology "FileWriter ->
> SingleFileCommitter -> FileMergeWriter -> GlobalCommitter" reuses some
> code of the StreamingFileSink (for example the rolling policy), so it has
> the "SingleFileCommitter" in the topology. In general I want to use this
> case to show that there are different topologies according to the
> requirements.
>
> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
> actual topology of the HiveSink that supports merging is more complicated
> than that.
>
>> I've just briefly skimmed over the proposed interfaces. I would suggest
>> one addition to the Writer interface (as I understand this is the runtime
>> interface in this proposal?): add some availability method, to avoid, if
>> possible, blocking calls on the sink. We already have similar
>> availability methods in the new sources [1] and in various places in the
>> network stack [2].
>> BTW let's not forget about Piotr's comment. I think we could add the
>> isAvailable or similar method to the Writer interface in the FLIP.
>
> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder. There
> are too many issues at the same time.
>
> In addition to what Aljoscha said (there is very little system support for
> it), another thing I worry about is: does the sink's snapshot return
> immediately when the sink's status is unavailable? Maybe we could handle
> it by deduplicating some elements in the state, but I think it might be
> too complicated. What I want to know is which specific sinks would benefit
> from this feature. @piotr <pi...@ververica.com> Please correct me if I
> misunderstand you. Thanks.
>
> Best,
> Guowei
>
>
> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > What I understand is that HiveSink's implementation might need the local
> > committer (FileCommitter) because the file rename is needed.
> > But Iceberg only needs to write the manifest file. Would you like to
> > enlighten me why Iceberg needs the local committer?
> > Thanks
> >
> > Sorry if I caused confusion here. I am not saying the Iceberg sink needs
> > a local committer. What I had in mind is that prior to the Iceberg
> > example I did not see a need for a "GlobalCommitter" in the streaming
> > case. I thought it was always enough to have the "normal" committer in
> > that case. Now I understand that this differentiation is not really
> > about logical separation. It is not really about the granularity with
> > which we commit, i.e. answering the "WHAT" question. It is really about
> > the performance and that in the end we will have a single "transaction",
> > so it is about answering the question "HOW".
> >
> > - Commit a directory with merged files (some users want to merge the
> >   files in a directory before committing the directory to the Hive meta
> >   store)
> >
> > 1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
> >
> > I still find the merging case the most confusing. I don't necessarily
> > understand why you need the "SingleFileCommit" step in this scenario.
> > The way I understand the "commit" operation is that it makes some
> > data/artifacts visible to the external system, thus it should be
> > immutable from the point of view of a single process. Having an
> > additional step in the same process that works on committed data
> > contradicts those assumptions. I might be missing something though.
> > Could you elaborate why it can't be something like FileWriter ->
> > FileMergeWriter -> Committer (either global or non-global)? Again it
> > might be just me not getting the example.
> >
> > I've just briefly skimmed over the proposed interfaces. I would suggest
> > one addition to the Writer interface (as I understand this is the
> > runtime interface in this proposal?): add some availability method, to
> > avoid, if possible, blocking calls on the sink. We already have similar
> > availability methods in the new sources [1] and in various places in the
> > network stack [2].
> >
> > BTW let's not forget about Piotr's comment. I think we could add the
> > isAvailable or similar method to the Writer interface in the FLIP.
> >
> > Best,
> >
> > Dawid
> >
> > On 15/09/2020 08:06, Guowei Ma wrote:
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think
> > flush() should be renamed to something like "prepareCommit()".
> >
> > Generally speaking it is a good point that emitting the committables
> > should happen before emitting the checkpoint barrier downstream.
> > However, if I remember offline discussions well, the idea behind
> > Writer#flush and Writer#snapshotState was to differentiate commit on
> > checkpoint vs final checkpoint at the end of the job. Both of these
> > methods could emit committables, but flush should not leave any
> > in-progress state (e.g. in case of the file sink in STREAM mode,
> > snapshotState could leave some open files that would be committed in a
> > subsequent cycle, however flush should close all files). The
> > snapshotState as it is now cannot be called in prepareSnapshotPreBarrier
> > as it can store some state, which should happen in
> > Operator#snapshotState as otherwise it would always be synchronous.
> > Therefore I think we would need something like:
> >
> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >
> > ver 1:
> >
> > List<StateT> snapshotState();
> >
> > ver 2:
> >
> > void snapshotState(); // not sure if we need that method at all in option 2
> >
> > I second Dawid's proposal. This is a valid scenario. And version 2 does
> > not need the snapshotState() any more.
> >
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GlobalCommitter would be a function
> > "void commit(List<Committable>)". The former would be used by an S3 sink
> > where we can individually commit files to S3; a committable would be the
> > list of part uploads that will form the final file, and the commit
> > operation creates the metadata in S3. The latter would be used by
> > something like Iceberg where the Committer needs a global view of all
> > the commits to be efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement one type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> > I must admit I did not get the need for Local/Normal + Global committer
> > at first.
> > The Iceberg example helped a lot. I think it makes a lot of sense.
> >
> > @Dawid
> > What I understand is that HiveSink's implementation might need the local
> > committer (FileCommitter) because the file rename is needed.
> > But Iceberg only needs to write the manifest file. Would you like to
> > enlighten me why Iceberg needs the local committer?
> > Thanks
> >
> > Best,
> > Guowei
> >
> >
> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz
> > <dwysakow...@apache.org> wrote:
> >
> > Hi all,
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think
> > flush() should be renamed to something like "prepareCommit()".
> >
> > Generally speaking it is a good point that emitting the committables
> > should happen before emitting the checkpoint barrier downstream.
> > However, if I remember offline discussions well, the idea behind
> > Writer#flush and Writer#snapshotState was to differentiate commit on
> > checkpoint vs final checkpoint at the end of the job. Both of these
> > methods could emit committables, but flush should not leave any
> > in-progress state (e.g. in case of the file sink in STREAM mode,
> > snapshotState could leave some open files that would be committed in a
> > subsequent cycle, however flush should close all files). The
> > snapshotState as it is now cannot be called in prepareSnapshotPreBarrier
> > as it can store some state, which should happen in
> > Operator#snapshotState as otherwise it would always be synchronous.
> > Therefore I think we would need something like:
> >
> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >
> > ver 1:
> >
> > List<StateT> snapshotState();
> >
> > ver 2:
> >
> > void snapshotState(); // not sure if we need that method at all in option 2
> >
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GlobalCommitter would be a function
> > "void commit(List<Committable>)". The former would be used by an S3 sink
> > where we can individually commit files to S3; a committable would be the
> > list of part uploads that will form the final file, and the commit
> > operation creates the metadata in S3. The latter would be used by
> > something like Iceberg where the Committer needs a global view of all
> > the commits to be efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement one type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> >
> > I must admit I did not get the need for Local/Normal + Global committer
> > at first. The Iceberg example helped a lot. I think it makes a lot of
> > sense.
> >
> > For Iceberg, writers don't need any state. But the GlobalCommitter needs
> > to checkpoint StateT. For the committer, CommT is "DataFile". Since a
> > single committer can collect thousands (or more) data files in one
> > checkpoint cycle, as an optimization we checkpoint a single
> > "ManifestFile" (for the collected thousands of data files) as StateT.
> > This allows us to absorb extended commit outages without losing
> > written/uploaded data files, as operator state size is as small as one
> > manifest file per checkpoint cycle [2].
> > ------------------
> > StateT snapshotState(SnapshotContext context) throws Exception;
> >
> > That means we also need the restoreCommitter API in the Sink interface
> > ---------------
> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> > state);
> >
> > I think this might be a valid case. Not sure though if I would go with a
> > "state" there. Having a state in a committer would imply we need a
> > collect method as well. So far we needed a single method commit(...) and
> > the bookkeeping of the committables could be handled by the framework. I
> > think something like an optional combiner in the GlobalCommitter would
> > be enough. What do you think?
> >
> > GlobalCommitter<CommT, GlobalCommT> {
> >
> >     void commit(GlobalCommT globalCommittables);
> >
> >     GlobalCommT combine(List<CommT> committables);
> >
> > }
> >
> > A different problem that I see here is how we handle commit failures.
> > Should the committables (both normal and global) be included in the next
> > cycle? Shall we retry? I think it would be worth laying it out in the
> > FLIP.
> >
> > @Aljoscha I think you can find the code Steven was referring to here:
> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
> >
> > Best,
> >
> > Dawid
> >
> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
> >
> > On 14.09.20 01:23, Steven Wu wrote:
> >
> > ## Writer interface
> >
> > For the Writer interface, should we add "prepareSnapshot" before the
> > checkpoint barrier is emitted downstream? IcebergWriter would need it.
> > Or would the framework call "flush" before the barrier is emitted
> > downstream? That guarantee would achieve the same goal.
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier".
> > Now that I'm thinking about it more I think flush() should be renamed
> > to something like "prepareCommit()".
> >
> > @Guowei, what do you think about this?
> >
> > In [1], we discussed the reason for the Writer to emit a (checkpointId,
> > CommT) tuple to the committer. The committer needs the checkpointId to
> > separate out data files for different checkpoints if concurrent
> > checkpoints are enabled.
> >
> > When can this happen? Even with concurrent checkpoints the snapshot
> > barriers would still cleanly segregate the input stream of an operator
> > into tranches that should manifest in only one checkpoint. With
> > concurrent checkpoints, all that can happen is that we start a
> > checkpoint before the last one is confirmed completed.
> >
> > Unless there is some weirdness in the sources and some sources start
> > chk1 first and some other ones start chk2 first?
> >
> > @Piotrek, do you think this is a problem?
> >
> > For the Committer interface, I am wondering if we should split the
> > single commit method into separate "collect" and "commit" methods? This
> > way, it can handle both single and multiple CommT objects.
> >
> > I think we can't do this. If the sink only needs a regular Committer,
> > we can perform the commits in parallel, possibly on different machines.
> > Only when the sink needs a GlobalCommitter would we need to ship all
> > commits to a single process and perform the commit there. If both
> > methods were unified in one interface we couldn't make the decision of
> > where to commit in the framework code.
> >
> > For Iceberg, writers don't need any state. But the GlobalCommitter needs
> > to checkpoint StateT. For the committer, CommT is "DataFile". Since a
> > single committer can collect thousands (or more) data files in one
> > checkpoint cycle, as an optimization we checkpoint a single
> > "ManifestFile" (for the collected thousands of data files) as StateT.
> > This allows us to absorb extended commit outages without losing
> > written/uploaded data files, as operator state size is as small as one
> > manifest file per checkpoint cycle.
> >
> > You could have a point here. Is the code for this available in
> > open source? I was checking out
> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> > and didn't find the ManifestFile optimization there.
> >
> > Best,
> > Aljoscha
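[Editor's note] For readers following the thread, the Committer vs. GlobalCommitter split, together with Dawid's proposed combine() method, can be sketched roughly as below. This is a hypothetical illustration of the shapes discussed in the emails above, not the final FLIP-143 API; the CollectingGlobalCommitter class and the use of String file names as committables are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the per-committable committer discussed above: commits one
// committable at a time, e.g. finalizing a single S3 multipart upload.
interface Committer<CommT> {
    void commit(CommT committable) throws Exception;
}

// Sketch of the global committer with Dawid's proposed combiner: combine
// one checkpoint's committables into a single global committable (e.g.
// thousands of Iceberg DataFiles -> one ManifestFile), then commit the
// combined result as a single "transaction".
interface GlobalCommitter<CommT, GlobalCommT> {
    GlobalCommT combine(List<CommT> committables);

    void commit(GlobalCommT globalCommittable) throws Exception;
}

// Toy in-memory implementation to show the flow: here the "global
// committable" is simply the list of file names from one checkpoint cycle.
class CollectingGlobalCommitter implements GlobalCommitter<String, List<String>> {
    final List<String> committed = new ArrayList<>();

    @Override
    public List<String> combine(List<String> committables) {
        return new ArrayList<>(committables);
    }

    @Override
    public void commit(List<String> globalCommittable) {
        committed.addAll(globalCommittable);
    }
}
```

With this shape, the framework could call combine() once per checkpoint and checkpoint only the small combined result, which matches the ManifestFile optimization Steven describes: operator state stays at roughly one manifest per checkpoint cycle regardless of how many data files were written.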