Hi, Steven
Thank you for your thoughtful ideas and concerns.

>> I still like the concept of grouping data files per checkpoint for
>> streaming mode. It is cleaner and probably easier to manage and deal
>> with commit failures. Plus, it can reduce dupes for the at least once
>> mode. I understand checkpoint is not an option for batch execution. We
>> don't have to expose the checkpointId in API, as long as the internal
>> bookkeeping groups data files by checkpoints for streaming mode.
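
The internal bookkeeping you describe could be as simple as keeping the
committables grouped by the checkpoint that produced them. A minimal
sketch, with hypothetical names (not part of the proposed API):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class CheckpointBookkeeping<CommT> {
    // data files / committables grouped by the checkpoint that produced them
    private final NavigableMap<Long, List<CommT>> perCheckpoint = new TreeMap<>();

    void add(long checkpointId, CommT committable) {
        perCheckpoint
            .computeIfAbsent(checkpointId, k -> new ArrayList<>())
            .add(committable);
    }

    // On notifyCheckpointComplete(checkpointId): commit and drop everything
    // up to and including that checkpoint.
    List<CommT> pollUpTo(long checkpointId) {
        List<CommT> ready = new ArrayList<>();
        perCheckpoint.headMap(checkpointId, true).values().forEach(ready::addAll);
        perCheckpoint.headMap(checkpointId, true).clear();
        return ready;
    }
}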

I think this problem (how to dedupe the combined committed data) also
depends on where the agg/combine logic is placed.

1. If the agg/combine takes place in the "commit", maybe we need to figure
out how to give the aggregated committable a unique, auto-increment id in
the committer.
2. If the agg/combine takes place in a separate operator, maybe the sink
developer could maintain the id themselves by using state (see the sketch
below).

I think this problem also depends on what topology patterns the sink API
should support. Actually, there are already many other topology
requirements. :)
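
For option 2, a minimal sketch of how a sink developer might keep such an
id in operator state (all names here are hypothetical, not part of the
proposed API):

import java.util.ArrayList;
import java.util.List;

// Aggregated committable carrying a unique, monotonically increasing id,
// so the committer can dedupe replays after a failover.
class AggregatedCommittable<CommT> {
    final long id;
    final List<CommT> committables;

    AggregatedCommittable(long id, List<CommT> committables) {
        this.id = id;
        this.committables = committables;
    }
}

// Combine step inside the separate operator. In a real operator, nextId
// would live in checkpointed operator state (e.g. ListState<Long>) and be
// restored on recovery, so replayed aggregates get the same ids again.
class Combiner<CommT> {
    private long nextId;

    AggregatedCommittable<CommT> combine(List<CommT> collected) {
        return new AggregatedCommittable<>(nextId++, new ArrayList<>(collected));
    }
}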

Best,
Guowei


On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com> wrote:

> > AFAIK the committer would not see the file-1-2 when ck1 happens in the
> ExactlyOnce mode.
>
> @Guowei Ma <guowei....@gmail.com> I think you are right for exactly once
> checkpoint semantics. What about "at least once"? I guess we can argue
> that it is fine to commit file-1-2 for at least once mode.
>
> I still like the concept of grouping data files per checkpoint for
> streaming mode. It is cleaner and probably easier to manage and deal with
> commit failures. Plus, it can reduce dupes for the at least once mode. I
> understand checkpoint is not an option for batch execution. We don't have
> to expose the checkpointId in API, as long as the internal bookkeeping
> groups data files by checkpoints for streaming mode.
>
>
> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> > images don't make it through to the mailing lists. You would need to
>> host the file somewhere and send a link.
>>
>> Sorry about that. Here is the sample DAG in google drawings.
>>
>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>>
>>
>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Dawid
>>>
>>> >> I still find the merging case the most confusing. I don't necessarily
>>> >> understand why you need the "SingleFileCommit" step in this scenario.
>>> >> The way I understand the "commit" operation is that it makes some
>>> >> data/artifacts visible to the external system, thus it should be
>>> >> immutable from the point of view of a single process. Having an
>>> >> additional step in the same process that works on committed data
>>> >> contradicts those assumptions. I might be missing something though.
>>> >> Could you elaborate why it can't be something like FileWriter ->
>>> >> FileMergeWriter -> Committer (either global or non-global)? Again it
>>> >> might be just me not getting the example.
>>>
>>> I think you are right: the topology
>>> "FileWriter -> FileMergeWriter -> Committer" could meet the merge
>>> requirement. The topology "FileWriter -> SingleFileCommitter ->
>>> FileMergeWriter -> GlobalCommitter" reuses some code of the
>>> StreamingFileSink (for example the rolling policy), so it has the
>>> "SingleFileCommitter" in the topology. In general I wanted to use the
>>> case to show that there are different topologies depending on the
>>> requirements.
>>>
>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
>>> actual topology of the merge-supporting HiveSink is more complicated
>>> than that.
>>>
>>>
>>> >> I've just briefly skimmed over the proposed interfaces. I would
>>> >> suggest one addition to the Writer interface (as I understand this is
>>> >> the runtime interface in this proposal?): add some availability
>>> >> method, to avoid, if possible, blocking calls on the sink. We already
>>> >> have similar availability methods in the new sources [1] and in
>>> >> various places in the network stack [2].
>>> >> BTW Let's not forget about Piotr's comment. I think we could add the
>>> >> isAvailable or similar method to the Writer interface in the FLIP.
>>>
>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for the reminder.
>>> There are too many issues going on at the same time.
>>>
>>> In addition to what Aljoscha said, there is very little system support
>>> for it. Another thing I worry about is: does the sink's snapshot return
>>> immediately when the sink's status is unavailable? Maybe we could do it
>>> by deduping some elements in the state, but I think that might be too
>>> complicated. What I want to know is which specific sinks would benefit
>>> from this feature. @piotr <pi...@ververica.com> Please correct me if I
>>> misunderstand you. Thanks.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>> > What I understand is that HiveSink's implementation might need the
>>> > local committer (FileCommitter) because the file rename is needed.
>>> > But Iceberg only needs to write the manifest file. Would you like to
>>> > enlighten me why Iceberg needs the local committer?
>>> > Thanks
>>> >
>>> > Sorry if I caused confusion here. I am not saying the Iceberg sink
>>> > needs a local committer. What I had in mind is that prior to the
>>> > Iceberg example I did not see a need for a "GlobalCommitter" in the
>>> > streaming case. I thought it was always enough to have the "normal"
>>> > committer in that case. Now I understand that this differentiation is
>>> > not really about logical separation. It is not really about the
>>> > granularity with which we commit, i.e. answering the "WHAT" question.
>>> > It is really about performance and the fact that in the end we will
>>> > have a single "transaction", so it is about answering the question
>>> > "HOW".
>>> >
>>> >
>>> >    - Commit a directory with merged files (some users want to merge
>>> >      the files in a directory before committing the directory to the
>>> >      Hive metastore):
>>> >
>>> >      FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>>> >
>>> > I still find the merging case the most confusing. I don't necessarily
>>> > understand why you need the "SingleFileCommit" step in this scenario.
>>> > The way I understand the "commit" operation is that it makes some
>>> > data/artifacts visible to the external system, thus it should be
>>> > immutable from the point of view of a single process. Having an
>>> > additional step in the same process that works on committed data
>>> > contradicts those assumptions. I might be missing something though.
>>> > Could you elaborate why it can't be something like FileWriter ->
>>> > FileMergeWriter -> Committer (either global or non-global)? Again it
>>> > might be just me not getting the example.
>>> >
>>> > I've just briefly skimmed over the proposed interfaces. I would
>>> > suggest one addition to the Writer interface (as I understand this is
>>> > the runtime interface in this proposal?): add some availability
>>> > method, to avoid, if possible, blocking calls on the sink. We already
>>> > have similar availability methods in the new sources [1] and in
>>> > various places in the network stack [2].
>>> >
>>> > BTW Let's not forget about Piotr's comment. I think we could add the
>>> > isAvailable or similar method to the Writer interface in the FLIP.
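>>> >
>>> > A minimal sketch of the addition (the exact name is open; this just
>>> > mirrors the availability pattern from [1] and [2], with the Writer's
>>> > type parameters elided):
>>> >
>>> > // A completed future means the writer can accept more records
>>> > // without blocking; the runtime would check this before pushing input.
>>> > CompletableFuture<?> isAvailable();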
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> > On 15/09/2020 08:06, Guowei Ma wrote:
>>> >
>>> > I would think that we only need flush() and the semantics are that it
>>> > prepares for a commit, so on a physical level it would be called from
>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>>> > think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > Generally speaking it is a good point that emitting the committables
>>> > should happen before emitting the checkpoint barrier downstream.
>>> > However, if I remember offline discussions well, the idea behind
>>> > Writer#flush and Writer#snapshotState was to differentiate commit on
>>> > checkpoint vs final checkpoint at the end of the job. Both of these
>>> > methods could emit committables, but the flush should not leave any
>>> > in-progress state (e.g. in the case of the file sink in STREAM mode,
>>> > snapshotState could leave some open files that would be committed in
>>> > a subsequent cycle, whereas flush should close all files). The
>>> > snapshotState as it is now cannot be called in
>>> > prepareSnapshotPreBarrier as it can store some state, which should
>>> > happen in Operator#snapshotState, as otherwise it would always be
>>> > synchronous. Therefore I think we would need sth like:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>>> >
>>> > ver 1:
>>> >
>>> > List<StateT> snapshotState();
>>> >
>>> > ver 2:
>>> >
>>> > void snapshotState(); // not sure if we need that method at all in option 2
>>> >
>>> > I second Dawid's proposal. This is a valid scenario. And version 2
>>> > does not need the snapshotState() anymore.
>>> >
>>> >
>>> > The Committer is as described in the FLIP, it's basically a function
>>> > "void commit(Committable)". The GlobalCommitter would be a function
>>> > "void commit(List<Committable>)". The former would be used by an S3
>>> > sink where we can individually commit files to S3, a committable would
>>> > be the list of part uploads that will form the final file and the
>>> > commit operation creates the metadata in S3. The latter would be used
>>> > by something like Iceberg where the Committer needs a global view of
>>> > all the commits to be efficient and not overwhelm the system.
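>>> >
>>> > In interface form, a sketch of the two shapes described above (not the
>>> > FLIP's final signatures):
>>> >
>>> > interface Committer<CommT> {
>>> >     // May run in parallel, possibly on different machines.
>>> >     void commit(CommT committable) throws Exception;
>>> > }
>>> >
>>> > interface GlobalCommitter<CommT> {
>>> >     // Runs in a single process with a global view of all commits,
>>> >     // e.g. a single Iceberg table commit per checkpoint.
>>> >     void commit(List<CommT> committables) throws Exception;
>>> > }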
>>> >
>>> > I don't know yet if sinks would only implement one type of commit
>>> > function or potentially both at the same time, and maybe Commit can
>>> > return some CommitResult that gets shipped to the GlobalCommit
>>> > function.
>>> > I must admit I did not get the need for Local/Normal + Global
>>> > committer at first. The Iceberg example helped a lot. I think it
>>> > makes a lot of sense.
>>> >
>>> > @Dawid
>>> > What I understand is that HiveSink's implementation might need the
>>> > local committer (FileCommitter) because the file rename is needed.
>>> > But Iceberg only needs to write the manifest file. Would you like to
>>> > enlighten me why Iceberg needs the local committer?
>>> > Thanks
>>> >
>>> > Best,
>>> > Guowei
>>> >
>>> >
>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>> > wrote:
>>> >
>>> >
>>> > Hi all,
>>> >
>>> >
>>> > I would think that we only need flush() and the semantics are that it
>>> > prepares for a commit, so on a physical level it would be called from
>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>>> > think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > Generally speaking it is a good point that emitting the committables
>>> > should happen before emitting the checkpoint barrier downstream.
>>> > However, if I remember offline discussions well, the idea behind
>>> > Writer#flush and Writer#snapshotState was to differentiate commit on
>>> > checkpoint vs final checkpoint at the end of the job. Both of these
>>> > methods could emit committables, but the flush should not leave any
>>> > in-progress state (e.g. in the case of the file sink in STREAM mode,
>>> > snapshotState could leave some open files that would be committed in
>>> > a subsequent cycle, whereas flush should close all files). The
>>> > snapshotState as it is now cannot be called in
>>> > prepareSnapshotPreBarrier as it can store some state, which should
>>> > happen in Operator#snapshotState, as otherwise it would always be
>>> > synchronous. Therefore I think we would need sth like:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>>> >
>>> > ver 1:
>>> >
>>> > List<StateT> snapshotState();
>>> >
>>> > ver 2:
>>> >
>>> > void snapshotState(); // not sure if we need that method at all in option 2
>>> >
>>> >
>>> > The Committer is as described in the FLIP, it's basically a function
>>> > "void commit(Committable)". The GlobalCommitter would be a function
>>> > "void commit(List<Committable>)". The former would be used by an S3
>>> > sink where we can individually commit files to S3, a committable would
>>> > be the list of part uploads that will form the final file and the
>>> > commit operation creates the metadata in S3. The latter would be used
>>> > by something like Iceberg where the Committer needs a global view of
>>> > all the commits to be efficient and not overwhelm the system.
>>> >
>>> > I don't know yet if sinks would only implement one type of commit
>>> > function or potentially both at the same time, and maybe Commit can
>>> > return some CommitResult that gets shipped to the GlobalCommit
>>> > function.
>>> >
>>> > I must admit I did not get the need for Local/Normal + Global
>>> > committer at first. The Iceberg example helped a lot. I think it
>>> > makes a lot of sense.
>>> >
>>> >
>>> > For Iceberg, writers don't need any state. But the GlobalCommitter
>>> > needs to checkpoint StateT. For the committer, CommT is "DataFile".
>>> > Since a single committer can collect thousands (or more) data files in
>>> > one checkpoint cycle, as an optimization we checkpoint a single
>>> > "ManifestFile" (for the collected thousands of data files) as StateT.
>>> > This allows us to absorb extended commit outages without losing
>>> > written/uploaded data files, as operator state size is as small as one
>>> > manifest file per checkpoint cycle [2].
>>> > ------------------
>>> > StateT snapshotState(SnapshotContext context) throws Exception;
>>> >
>>> > That means we also need the restoreCommitter API in the Sink interface
>>> > ---------------
>>> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
>>> > state);
>>> >
>>> > I think this might be a valid case. Not sure though if I would go
>>> > with a "state" there. Having a state in a committer would imply we
>>> > need a collect method as well. So far we needed a single method
>>> > commit(...) and the bookkeeping of the committables could be handled
>>> > by the framework. I think something like an optional combiner in the
>>> > GlobalCommitter would be enough. What do you think?
>>> >
>>> > GlobalCommitter<CommT, GlobalCommT> {
>>> >
>>> >     void commit(GlobalCommT globalCommittables);
>>> >
>>> >     GlobalCommT combine(List<CommT> committables);
>>> >
>>> > }
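>>> >
>>> > (For the Iceberg case above, combine(...) could then write the
>>> > thousands of collected DataFiles into a single ManifestFile and return
>>> > it as GlobalCommT, keeping the checkpointed state small.)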
>>> >
>>> > A different problem that I see here is how we handle commit failures.
>>> > Should the committables (both normal and global) be included in the
>>> > next cycle, shall we retry it, ...? I think it would be worth laying
>>> > it out in the FLIP.
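>>> >
>>> > One hypothetical shape, just to make the retry question concrete (not
>>> > a proposal): let commit return the committables that failed so the
>>> > framework can re-attempt them in a later cycle.
>>> >
>>> > // Empty list means full success; returned committables are retried.
>>> > List<CommT> commit(List<CommT> committables) throws Exception;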
>>> >
>>> > @Aljoscha I think you can find the code Steven was referring to here:
>>> >
>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>>> >
>>> > On 14.09.20 01:23, Steven Wu wrote:
>>> >
>>> > ## Writer interface
>>> >
>>> > For the Writer interface, should we add "*prepareSnapshot*" before the
>>> > checkpoint barrier is emitted downstream? IcebergWriter would need it.
>>> > Or would the framework call "*flush*" before the barrier is emitted
>>> > downstream? That guarantee would achieve the same goal.
>>> >
>>> > I would think that we only need flush() and the semantics are that it
>>> > prepares for a commit, so on a physical level it would be called from
>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>>> > think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > @Guowei, what do you think about this?
>>> >
>>> >
>>> > In [1], we discussed the reason for the Writer to emit (checkpointId,
>>> > CommT) tuples to the committer. The committer needs the checkpointId
>>> > to separate out data files for different checkpoints if concurrent
>>> > checkpoints are enabled.
>>> >
>>> > When can this happen? Even with concurrent checkpoints the snapshot
>>> > barriers would still cleanly segregate the input stream of an operator
>>> > into tranches that should manifest in only one checkpoint. With
>>> > concurrent checkpoints, all that can happen is that we start a
>>> > checkpoint before a last one is confirmed completed.
>>> >
>>> > Unless there is some weirdness in the sources and some sources start
>>> > chk1 first and some other ones start chk2 first?
>>> >
>>> > @Piotrek, do you think this is a problem?
>>> >
>>> >
>>> > For the Committer interface, I am wondering if we should split the
>>> > single commit method into separate "*collect*" and "*commit*" methods?
>>> > This way, it can handle both single and multiple CommT objects.
>>> >
>>> > I think we can't do this. If the sink only needs a regular Committer,
>>> > we can perform the commits in parallel, possibly on different
>>> > machines. Only when the sink needs a GlobalCommitter would we need to
>>> > ship all commits to a single process and perform the commit there. If
>>> > both methods were unified in one interface we couldn't make the
>>> > decision of where to commit in the framework code.
>>> >
>>> >
>>> > For Iceberg, writers don't need any state. But the GlobalCommitter
>>> > needs to checkpoint StateT. For the committer, CommT is "DataFile".
>>> > Since a single committer can collect thousands (or more) data files in
>>> > one checkpoint cycle, as an optimization we checkpoint a single
>>> > "ManifestFile" (for the collected thousands of data files) as StateT.
>>> > This allows us to absorb extended commit outages without losing
>>> > written/uploaded data files, as operator state size is as small as one
>>> > manifest file per checkpoint cycle.
>>> >
>>> > You could have a point here. Is the code for this available in
>>> > open source? I was checking out
>>> >
>>> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>>> >
>>> > and didn't find the ManifestFile optimization there.
>>> >
>>> > Best,
>>> > Aljoscha
>>> >
>>> >
>>> >
>>>
>>
