> images don't make it through to the mailing lists. You would need to host
the file somewhere and send a link.

Sorry about that. Here is the sample DAG in Google Drawings.
https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing


On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Dawid
>
> >> I still find the merging case the most confusing. I don't necessarily
> >> understand why do you need the "SingleFileCommit" step in this scenario.
> >> The way I understand "commit" operation is that it makes some
> >> data/artifacts visible to the external system, thus it should be
> >> immutable from a point of view of a single process. Having an additional
> >> step in the same process that works on committed data contradicts with
> >> those assumptions. I might be missing something though. Could you
> >> elaborate why can't it be something like FileWriter -> FileMergeWriter
> >> -> Committer (either global or non-global)? Again it might be just me
> >> not getting the example.
>
> I think you are right. The topology
> "FileWriter -> FileMergeWriter -> Committer" could meet the merge
> requirement. The topology "FileWriter -> SingleFileCommitter ->
> FileMergeWriter -> GlobalCommitter" reuses some code of the
> StreamingFileSink (for example, the rolling policy), which is why it has
> the "SingleFileCommitter" in the topology. In general, I want to use this
> case to show that there are different topologies depending on the
> requirements.
>
> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
> actual topology of the HiveSink with merge support is more complicated
> than that.
>
>
> >> I've just briefly skimmed over the proposed interfaces. I would suggest
> >> one addition to the Writer interface (as I understand this is the runtime
> >> interface in this proposal?): add some availability method, to avoid, if
> >> possible, blocking calls on the sink. We already have similar
> >> availability methods in the new sources [1] and in various places in the
> >> network stack [2].
> >> BTW Let's not forget about Piotr's comment. I think we could add the
> >> isAvailable or similar method to the Writer interface in the FLIP.
>
> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder. There
> are too many issues at the same time.
>
> In addition to what Aljoscha said (there is very little system support for
> it), another thing I worry about is this: does the sink's snapshot return
> immediately when the sink is unavailable? Maybe we could handle it by
> deduplicating some elements in the state, but I think that might be too
> complicated. What I would like to know is which specific sinks would
> benefit from this feature. @piotr <pi...@ververica.com> Please correct me
> if I misunderstand you. Thanks.
>
> Best,
> Guowei
>
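To make the availability idea above concrete, here is a minimal sketch of what an isAvailable-style method on a writer could look like. It assumes a future-based signal loosely modelled on the availability methods mentioned for the new sources; the names AvailableWriter, BoundedWriter and drain() are hypothetical and are not part of the FLIP or of Flink. The toy is single-threaded and only illustrates the contract.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; none of these types are Flink APIs.
final class AvailabilitySketch {

    /** Hypothetical writer that exposes a non-blocking availability signal. */
    interface AvailableWriter<T> {
        /** A completed future means write() can be called without blocking. */
        CompletableFuture<Void> isAvailable();

        void write(T element);
    }

    /** Toy writer that accepts up to 'capacity' elements until it is drained. */
    static final class BoundedWriter<T> implements AvailableWriter<T> {
        private final int capacity;
        private final Queue<T> buffer = new ArrayDeque<>();
        private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

        BoundedWriter(int capacity) {
            if (capacity < 1) {
                throw new IllegalArgumentException("capacity must be >= 1");
            }
            this.capacity = capacity;
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            return available;
        }

        @Override
        public void write(T element) {
            if (!available.isDone()) {
                throw new IllegalStateException("writer is not available");
            }
            buffer.add(element);
            if (buffer.size() >= capacity) {
                // Buffer is full: hand out a pending future until drain() runs.
                available = new CompletableFuture<>();
            }
        }

        /** Simulates the external system accepting all buffered elements. */
        int drain() {
            int drained = buffer.size();
            buffer.clear();
            available.complete(null); // no-op if the future was already complete
            return drained;
        }
    }
}
```

A caller would check isAvailable().isDone() before write(), or chain work onto the returned future, instead of blocking inside the sink.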
>
> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > What I understand is that HiveSink's implementation might need the local
> > committer(FileCommitter) because the file rename is needed.
> > But the iceberg only needs to write the manifest file.  Would you like to
> > enlighten me why the Iceberg needs the local committer?
> > Thanks
> >
> > Sorry if I caused confusion here. I am not saying the Iceberg sink needs
> > a local committer. What I had in mind is that prior to the Iceberg
> > example I did not see a need for a "GlobalCommitter" in the streaming
> > case. I thought it is always enough to have the "normal" committer in
> > that case. Now I understand that this differentiation is not really
> > about logical separation. It is not really about the granularity with
> > which we commit, i.e. answering the "WHAT" question. It is really about
> > performance and the fact that in the end we will have a single
> > "transaction", so it is about answering the question "HOW".
> >
> >
> >    - Commit a directory with merged files (some users want to merge the
> >      files in a directory before committing the directory to the Hive
> >      metastore)
> >
> >    1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
> >
> > I still find the merging case the most confusing. I don't necessarily
> > understand why do you need the "SingleFileCommit" step in this scenario.
> > The way I understand "commit" operation is that it makes some
> > data/artifacts visible to the external system, thus it should be
> > immutable from a point of view of a single process. Having an additional
> > step in the same process that works on committed data contradicts with
> > those assumptions. I might be missing something though. Could you
> > elaborate why can't it be something like FileWriter -> FileMergeWriter
> > -> Committer (either global or non-global)? Again it might be just me
> > not getting the example.
> >
> > I've just briefly skimmed over the proposed interfaces. I would suggest
> > one addition to the Writer interface (as I understand this is the runtime
> > interface in this proposal?): add some availability method, to avoid, if
> > possible, blocking calls on the sink. We already have similar
> > availability methods in the new sources [1] and in various places in the
> > network stack [2].
> >
> > BTW Let's not forget about Piotr's comment. I think we could add the
> > isAvailable or similar method to the Writer interface in the FLIP.
> >
> > Best,
> >
> > Dawid
> > On 15/09/2020 08:06, Guowei Ma wrote:
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
> >
> > Generally speaking it is a good point that emitting the committables
> > should happen before emitting the checkpoint barrier downstream.
> > However, if I remember offline discussions well, the idea behind
> > Writer#flush and Writer#snapshotState was to differentiate commit on
> > checkpoint vs final checkpoint at the end of the job. Both of these
> > methods could emit committables, but the flush should not leave any in
> > progress state (e.g. in case of file sink in STREAM mode, in
> > snapshotState it could leave some open files that would be committed in
> > a subsequent cycle, however flush should close all files). The
> > snapshotState as it is now can not be called in
> > prepareSnapshotPreBarrier as it can store some state, which should
> > happen in Operator#snapshotState as otherwise it would always be
> > synchronous. Therefore I think we would need sth like:
> >
> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >
> > ver 1:
> >
> > List<StateT> snapshotState();
> >
> > ver 2:
> >
> > void snapshotState(); // not sure if we need that method at all in option 2
> >
> > I second Dawid's proposal. This is a valid scenario. And version 2 does
> > not need the snapshotState() any more.
> >
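The difference between prepareCommit(false, ...) and prepareCommit(true, ...) described above can be sketched with a toy file writer. This is only an illustration under stated assumptions: the Writer and WriterOutput shapes follow the signatures quoted in the thread ("ver 1"), but the method name sendToCommit, the class ToyFileWriter and the use of a list of records as a "file" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the "ver 1" shape; not the FLIP's final interfaces.
final class PrepareCommitSketch {

    /** Stand-in for WriterOutput<CommT>; the method name is an assumption. */
    interface WriterOutput<CommT> {
        void sendToCommit(CommT committable);
    }

    interface Writer<T, CommT, StateT> {
        void write(T element);

        /** Emits committables; with flush=true no in-progress data may remain. */
        void prepareCommit(boolean flush, WriterOutput<CommT> output);

        /** "ver 1": returns whatever is still in progress as state. */
        List<StateT> snapshotState();
    }

    /** Toy writer: a "file" is a list of records that rolls after maxRecords. */
    static final class ToyFileWriter implements Writer<String, List<String>, List<String>> {
        private final int maxRecords;
        private final List<List<String>> finished = new ArrayList<>();
        private List<String> inProgress = new ArrayList<>();

        ToyFileWriter(int maxRecords) {
            this.maxRecords = maxRecords;
        }

        @Override
        public void write(String element) {
            inProgress.add(element);
            if (inProgress.size() >= maxRecords) {
                roll();
            }
        }

        @Override
        public void prepareCommit(boolean flush, WriterOutput<List<String>> output) {
            if (flush && !inProgress.isEmpty()) {
                roll(); // final commit: close the open file as well
            }
            for (List<String> file : finished) {
                output.sendToCommit(file);
            }
            finished.clear();
        }

        @Override
        public List<List<String>> snapshotState() {
            if (inProgress.isEmpty()) {
                return Collections.emptyList();
            }
            // Copy so later writes do not mutate the snapshot.
            return Collections.singletonList(new ArrayList<>(inProgress));
        }

        private void roll() {
            finished.add(inProgress);
            inProgress = new ArrayList<>();
        }
    }
}
```

On a regular checkpoint the open file stays in the snapshot state and is committed in a later cycle; with flush set, nothing is left in progress, which matches the "flush should close all files" remark.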
> >
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GlobalCommitter would be a function "void
> > commit(List<Committable>)". The former would be used by an S3 sink where
> > we can individually commit files to S3, a committable would be the list
> > of part uploads that will form the final file and the commit operation
> > creates the metadata in S3. The latter would be used by something like
> > Iceberg where the Committer needs a global view of all the commits to be
> > efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement one type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> > I must admit I did not get the need for Local/Normal + Global
> > committer at first. The Iceberg example helped a lot. I think it makes a
> > lot of sense.
> >
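The two commit shapes described above, "void commit(Committable)" and "void commit(List<Committable>)", can be put side by side in a small sketch. The interfaces mirror the signatures quoted in the thread; CountingSystem and the two factory methods are hypothetical helpers that only count how many calls reach a simulated external system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch contrasting the two commit shapes from the thread.
final class CommitterShapesSketch {

    /** Commits one committable at a time; instances could run in parallel. */
    interface Committer<CommT> {
        void commit(CommT committable);
    }

    /** Commits everything in one call; needs a global view in one place. */
    interface GlobalCommitter<CommT> {
        void commit(List<CommT> committables);
    }

    /** Simulated external system that counts incoming commit calls. */
    static final class CountingSystem {
        int calls;
        final List<String> visible = new ArrayList<>();
    }

    /** S3-like case: every file is made visible by its own commit call. */
    static Committer<String> perFileCommitter(CountingSystem system) {
        return file -> {
            system.calls++;
            system.visible.add(file);
        };
    }

    /** Iceberg-like case: all files become visible in one transaction. */
    static GlobalCommitter<String> tableCommitter(CountingSystem system) {
        return files -> {
            system.calls++;
            system.visible.addAll(files);
        };
    }
}
```

With three files, the per-file committer produces three calls and the global one produces a single call, which is the "not overwhelm the system" argument in miniature.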
> > @Dawid
> > What I understand is that HiveSink's implementation might need the local
> > committer(FileCommitter) because the file rename is needed.
> > But the iceberg only needs to write the manifest file.  Would you like to
> > enlighten me why the Iceberg needs the local committer?
> > Thanks
> >
> > Best,
> > Guowei
> >
> >
> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > wrote:
> >
> >
> > Hi all,
> >
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
> >
> > Generally speaking it is a good point that emitting the committables
> > should happen before emitting the checkpoint barrier downstream.
> > However, if I remember offline discussions well, the idea behind
> > Writer#flush and Writer#snapshotState was to differentiate commit on
> > checkpoint vs final checkpoint at the end of the job. Both of these
> > methods could emit committables, but the flush should not leave any in
> > progress state (e.g. in case of file sink in STREAM mode, in
> > snapshotState it could leave some open files that would be committed in
> > a subsequent cycle, however flush should close all files). The
> > snapshotState as it is now can not be called in
> > prepareSnapshotPreBarrier as it can store some state, which should
> > happen in Operator#snapshotState as otherwise it would always be
> > synchronous. Therefore I think we would need sth like:
> >
> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >
> > ver 1:
> >
> > List<StateT> snapshotState();
> >
> > ver 2:
> >
> > void snapshotState(); // not sure if we need that method at all in option 2
> >
> >
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GlobalCommitter would be a function "void
> > commit(List<Committable>)". The former would be used by an S3 sink where
> > we can individually commit files to S3, a committable would be the list
> > of part uploads that will form the final file and the commit operation
> > creates the metadata in S3. The latter would be used by something like
> > Iceberg where the Committer needs a global view of all the commits to be
> > efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement one type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> >
> > I must admit I did not get the need for Local/Normal + Global
> > committer at first. The Iceberg example helped a lot. I think it makes a
> > lot of sense.
> >
> >
> > For Iceberg, writers don't need any state. But the GlobalCommitter needs
> > to checkpoint StateT. For the committer, CommT is "DataFile". Since a
> > single committer can collect thousands (or more) data files in one
> > checkpoint cycle, as an optimization we checkpoint a single
> > "ManifestFile" (for the thousands of collected data files) as StateT.
> > This allows us to absorb extended commit outages without losing
> > written/uploaded data files, as operator state size is as small as one
> > manifest file per checkpoint cycle [2].
> > ------------------
> > StateT snapshotState(SnapshotContext context) throws Exception;
> >
> > That means we also need the restoreCommitter API in the Sink interface
> > ---------------
> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> > state);
> >
> > I think this might be a valid case. Not sure though if I would go with a
> > "state" there. Having a state in a committer would imply we need a
> > collect method as well. So far we needed a single method commit(...) and
> > the bookkeeping of the committables could be handled by the framework. I
> > think something like an optional combiner in the GlobalCommitter would
> > be enough. What do you think?
> >
> > interface GlobalCommitter<CommT, GlobalCommT> {
> >
> >     void commit(GlobalCommT globalCommittables);
> >
> >     GlobalCommT combine(List<CommT> committables);
> >
> > }
> >
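As an illustration of the optional combiner idea above, the following sketch folds many per-file committables into one global committable before committing. The GlobalCommitter shape follows the two methods quoted in the thread; DataFile and Manifest are toy stand-ins (not the Iceberg classes), and ManifestCommitter is a hypothetical implementation that just records what it was asked to commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a GlobalCommitter with a combiner step.
final class CombineSketch {

    interface GlobalCommitter<CommT, GlobalCommT> {
        GlobalCommT combine(List<CommT> committables);

        void commit(GlobalCommT globalCommittable);
    }

    /** Toy stand-in for a written data file. */
    static final class DataFile {
        final String path;
        final long recordCount;

        DataFile(String path, long recordCount) {
            this.path = path;
            this.recordCount = recordCount;
        }
    }

    /** Toy stand-in for a manifest summarising many data files. */
    static final class Manifest {
        final List<String> paths;
        final long totalRecords;

        Manifest(List<String> paths, long totalRecords) {
            this.paths = paths;
            this.totalRecords = totalRecords;
        }
    }

    static final class ManifestCommitter implements GlobalCommitter<DataFile, Manifest> {
        final List<Manifest> committed = new ArrayList<>();

        @Override
        public Manifest combine(List<DataFile> committables) {
            List<String> paths = new ArrayList<>();
            long total = 0;
            for (DataFile file : committables) {
                paths.add(file.path);
                total += file.recordCount;
            }
            return new Manifest(paths, total);
        }

        @Override
        public void commit(Manifest globalCommittable) {
            // A real sink would run one transaction against the table here.
            committed.add(globalCommittable);
        }
    }
}
```

In this shape the framework could keep only the combined value per checkpoint cycle rather than every individual committable, which is the state-size concern raised for Iceberg.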
> > A different problem that I see here is how we handle commit failures.
> > Should the committables (both normal and global) be included in the next
> > cycle, shall we retry them, ...? I think it would be worth laying it out
> > in the FLIP.
> >
> > @Aljoscha I think you can find the code Steven was referring to here:
> >
> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
> >
> > Best,
> >
> > Dawid
> >
> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
> >
> > On 14.09.20 01:23, Steven Wu wrote:
> >
> > ## Writer interface
> >
> > For the Writer interface, should we add "*prepareSnapshot*" before the
> > checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
> > would the framework call "*flush*" before the barrier is emitted
> > downstream? That guarantee would achieve the same goal.
> >
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
> >
> > @Guowei, what do you think about this?
> >
> >
> > In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> > tuple to the committer. The committer needs checkpointId to separate out
> > data files for different checkpoints if concurrent checkpoints are
> > enabled.
> >
> > When can this happen? Even with concurrent checkpoints the snapshot
> > barriers would still cleanly segregate the input stream of an operator
> > into tranches that should manifest in only one checkpoint. With
> > concurrent checkpoints, all that can happen is that we start a
> > checkpoint before the previous one is confirmed completed.
> >
> > Unless there is some weirdness in the sources and some sources start
> > chk1 first and some other ones start chk2 first?
> >
> > @Piotrek, do you think this is a problem?
> >
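For reference, the (checkpointId, CommT) idea being questioned above amounts to bookkeeping like the following. This is a hypothetical sketch, not code from the FLIP or from any Iceberg sink: PendingCommits and its methods are made-up names, and whether such grouping is needed at all is exactly the open question in this exchange.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: group committables by the checkpoint they belong to.
final class CheckpointGroupingSketch {

    static final class PendingCommits<CommT> {
        private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();

        /** Records a committable under the checkpoint that produced it. */
        void add(long checkpointId, CommT committable) {
            pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
        }

        /**
         * Removes and returns everything belonging to checkpoints with an id
         * less than or equal to completedId, in ascending checkpoint order.
         */
        List<CommT> drainUpTo(long completedId) {
            NavigableMap<Long, List<CommT>> ready = pending.headMap(completedId, true);
            List<CommT> result = new ArrayList<>();
            for (List<CommT> batch : ready.values()) {
                result.addAll(batch);
            }
            ready.clear(); // headMap is a view, so this removes from 'pending'
            return result;
        }

        int pendingCheckpoints() {
            return pending.size();
        }
    }
}
```

When a checkpoint-complete notification for id N arrives, drainUpTo(N) yields only the committables from checkpoints up to N and leaves those from later, still-running checkpoints untouched.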
> >
> > For the Committer interface, I am wondering if we should split the single
> > commit method into separate "*collect*" and "*commit*" methods? This way,
> > it can handle both single and multiple CommT objects.
> >
> > I think we can't do this. If the sink only needs a regular Committer,
> > we can perform the commits in parallel, possibly on different
> > machines. Only when the sink needs a GlobalCommitter would we need to
> > ship all commits to a single process and perform the commit there. If
> > both methods were unified in one interface we couldn't make the
> > decision of where to commit in the framework code.
> >
> >
> > For Iceberg, writers don't need any state. But the GlobalCommitter needs
> > to checkpoint StateT. For the committer, CommT is "DataFile". Since a
> > single committer can collect thousands (or more) data files in one
> > checkpoint cycle, as an optimization we checkpoint a single
> > "ManifestFile" (for the thousands of collected data files) as StateT.
> > This allows us to absorb extended commit outages without losing
> > written/uploaded data files, as operator state size is as small as one
> > manifest file per checkpoint cycle
> >
> > You could have a point here. Is the code for this available in
> > open-source? I was checking out
> >
> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> >
> > and didn't find the ManifestFile optimization there.
> >
> > Best,
> > Aljoscha
> >
> >
> >
>
