Hi all,

I'll try to summarize my thoughts after Guowei, Yun, Kostas, Dawid, and I had an offline discussion about this.
Also, I would like to give credit to Guowei for initially coming up with 
the idea of a topology sink in the context of this discussion. I think 
it's a good idea and we should pursue it in the future. And yes, Beam 
already does it like this, but I hadn't considered it here when thinking 
about the sink APIs, because having a more limited API gives more 
freedom to the framework.
## Topology Sink vs. Transactional Sink

From the discussion, it seems clear to me that to support all kinds of different use cases we will have to offer some sort of API that allows Sink developers to specify mostly arbitrary operator topologies. I think, however, that we will not manage to finish such a (semi user-facing) API within the 1.12 release cycle with satisfactory results. Therefore, I would say that we need to go with a more straightforward TransactionalSink API (name TBD) that lets sink developers specify basic Writer, Committer, GlobalCommitter components as discussed above.
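
To make this concrete, here is a rough sketch of what such a
TransactionalSink could look like (names are TBD and this is not a final
API; both committer factories would be optional, as discussed further
below):

interface TransactionalSink<InputT, CommT, GlobalCommT> {
  Writer<InputT, CommT> createWriter();
  Optional<Committer<CommT>> createCommitter();
  Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
}
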
This Sink interface would initially be used to implement a FileSink that 
supports HDFS/S3 and behaves like the StreamingFileSink does in STREAM 
execution mode. Additionally, it would seamlessly work in BATCH execution 
mode. With the addition of a properly designed GlobalCommitter this should 
also work for Iceberg.
It seems to me that the Hive use cases are still too fuzzy and not well 
enough defined to allow us to come up with a good solution.
## Committer vs. GlobalCommitter or both

To make it short, we should make both optional but also allow both to be used by the same sink.
The Committer is the interface that should be preferred because it 
allows the framework to distribute the work of committing, i.e. it has 
more potential for being optimised.
Iceberg would use only a GlobalCommitter.

The FileSink would use only the Committer but could optionally use a GlobalCommitter to create a _SUCCESS file in the output directory to emulate Hadoop to some degree. Writing such a _SUCCESS file would only work in BATCH execution mode, and it would write the _SUCCESS file in the top-level output directory (see the sketch after the interfaces below). Writing _SUCCESS files in individual buckets or subdirectories whenever these buckets are considered "done" is a different problem, and one I don't think we can solve well right now.
Initially, I would propose these interfaces that have been floated by 
various people above:
interface Committer<CommittableT> {
  CommitResult commit(CommittableT committable);
}

interface GlobalCommitter<CommittableT, GlobalCommittableT> {
  GlobalCommittableT merge(List<CommittableT> committables);
  CommitResult commit(GlobalCommittableT globalCommittable);
}

enum CommitResult {
  SUCCESS, FAILURE, RETRY
}
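
To illustrate how the FileSink's optional GlobalCommitter for the _SUCCESS
marker could fit these interfaces, here is a sketch (the class, the
FileCommittable type, and the file-system handling are made up for
illustration and are not an actual implementation):

class SuccessFileGlobalCommitter implements GlobalCommitter<FileCommittable, Path> {
  private final FileSystem fileSystem;  // assumed handle to the target file system
  private final Path outputDirectory;   // top-level output directory of the sink

  SuccessFileGlobalCommitter(FileSystem fileSystem, Path outputDirectory) {
    this.fileSystem = fileSystem;
    this.outputDirectory = outputDirectory;
  }

  @Override
  public Path merge(List<FileCommittable> committables) {
    // The individual committables are not needed for the marker file;
    // only the top-level output directory matters.
    return outputDirectory;
  }

  @Override
  public CommitResult commit(Path directory) {
    try {
      // Create an empty _SUCCESS marker once everything has been committed.
      fileSystem.create(new Path(directory, "_SUCCESS"), FileSystem.WriteMode.OVERWRITE).close();
      return CommitResult.SUCCESS;
    } catch (IOException e) {
      return CommitResult.RETRY;
    }
  }
}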

Alternatively, we could consider changing commit() to take a List<> to allow the sink to better check if commits are already in the external system. For example, Iceberg would check for the whole batch whether they are already committed.
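
For illustration, for the GlobalCommitter that Iceberg would use, that
variant would just change the commit signature (sketch only):

interface GlobalCommitter<CommittableT, GlobalCommittableT> {
  GlobalCommittableT merge(List<CommittableT> committables);
  CommitResult commit(List<GlobalCommittableT> globalCommittables);
}
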
Also alternatively, we could change the GlobalCommitter to basically 
return an AggregateFunction instead of the simple merge() function.
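
Concretely, that alternative could look roughly like this (again only a
sketch, reusing Flink's existing AggregateFunction for the merging step):

interface GlobalCommitter<CommittableT, GlobalCommittableT> {
  // The framework could run this aggregation incrementally and, potentially,
  // in a distributed fashion before handing the result to commit().
  AggregateFunction<CommittableT, GlobalCommittableT, GlobalCommittableT> createAggregator();

  CommitResult commit(GlobalCommittableT globalCommittable);
}
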
What do you think?

Best,
Aljoscha

On 21.09.20 10:06, Piotr Nowojski wrote:
Hi Guowei,

I believe that we could support such an async sink writer
very easily in the future. What do you think?
How would you see the expansion in the future? Do you mean just adding an
`isAvailable()` method with a default implementation later on?

Piotrek

pon., 21 wrz 2020 o 02:39 Steven Wu <stevenz...@gmail.com> napisał(a):

I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.


@Guowei Ma <guowei....@gmail.com>  It is undesirable to do the dedup check
in the `commit` call, because it happens for each checkpoint cycle. We only
need to do the de-dup check one time when restoring the GlobalCommT list
from the checkpoint.


Can you clarify the purpose of `recoveredGlobalCommittables`? If it is to
let sink implementations know the recovered GlobalCommT list, it is
probably not a sufficient API. For the Iceberg sink, we can try to
implement the de-dup check inside the `recoveredGlobalCommittables` method
and commit any uncommitted GlobalCommT items. But how do we handle
commit failures?


One alternative is to allow sink implementations to override
"List<GlobalCommT> recoverGlobalCommittables()". Framework handles the
checkpoint/state, and sink implementations can further customize the
restored list with de-dup check and filtering. Recovered uncommitted
GlobalCommT list will be committed in the next cycle. It is the same
rollover strategy for commit failure handling that we have been discussing.


## topologies


Regarding the topology options, if we agree that there is no
one-size-fits-all, we can let sink implementations choose the best
topology. Maybe the framework can provide 2-3 pre-defined topology
implementations to help the sinks.




On Sun, Sep 20, 2020 at 3:27 AM Guowei Ma <guowei....@gmail.com> wrote:

I would like to summarize the file-type sinks in the thread and their
possible topologies. I also try to give the pros and cons of every topology
option. Correct me if I am wrong.

### FileSink

Topology Option: TmpFileWriter + Committer.

### Iceberg Sink

#### Topology Option 1: `DataFileWriter` + `GlobalCommitterV0`.
Pro:
1. The same group has the same id.
Cons:
1. May limit users’ optimization space;
2. The topology does not meet Hive’s requirements.

#### Topology Option 2: `DataFileWriter` + `GlobalCommitterV1`
Pro:
1. The user has the opportunity to optimize the implementation of idempotence.
Cons:
1. Makes the GlobalCommit more complicated.
2. The topology does not meet Hive’s requirements.

#### Topology Option 3: `DataFileWriter` + `AggWriter` + `Committer`

Pros:
1. Uses the two basic `Writer` & `Committer` concepts to meet Iceberg’s
requirements.
2. Opportunity to optimize the implementation of idempotence.
3. The topology meets Hive’s requirements (see the following).
Con:
1. It introduces a relatively complex topology.

### HiveSink

#### Topology Option 1: `TmpFileWriter` + `Committer` +
`GlobalCommitterV2`.
Pro:
1. Could skip the cleanup problem at first.
Con:
1. This style of topology does not meet the CompactHiveSink requirements.

#### Topology Option 2: `TmpFileWriter` + `Committer` + `AggWriter` +
`Committer`
Pros:
1. Could skip the cleanup problem at first.
2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`.
Cons:
1. This style of topology does not meet the CompactHiveSink requirements.
2. There are two general `Committers` in the topology. For Hive’s case
there might be no problem, but there might be a problem in 1.12: for
example, where to execute the sub-topology following the `Committer` in
batch execution mode for the general case. Because the topology is built
from `Writer` and `Committer` we might put the whole sub-topology in the
OperatorCoordinator, but if the topology is too complicated that becomes
unwieldy. See the following.

#### Topology Option 3: `FileWriter` + `AggWriter` + `Committer`
Pro:
1. There is only one general committer.
Cons:
1. It has to consider the cleanup problem. (In theory both Option 1 and
Option 2 need to clean up.)
2. This style of topology does not meet the CompactHiveSink requirements.
3. We have to figure out how to stay compatible with the current version.

### CompactHiveSink/MergeHiveSink

#### Topology Option 1: `TmpFileWriter` + `Committer` + `MergerCoordinator`
+ `MergeWriter` + `GlobalCommitterV2`
Pro:
1. Could skip the cleanup problem at first.
Con:
1. Where to execute the sub-topology following the `Committer`.

#### Topology Option 2: `TmpFileWriter` + `Committer` + `MergerCoordinator`
+ `MergeWriter` + `AggWriter` + `Committer`
Pros:
1. Could skip the cleanup problem at first.
2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`.
Con:
1. Where to execute the sub-topology following the `Committer`.

#### Topology Option 3: `FileWriter` + `MergeCoordinator` + `MergeFileWriter` + `Writer(Agg)`
+ `Committer`
Pro:
1. There is only one committer. It is very easy to support in the batch
execution mode.
Con:
1. It has to consider the cleanup problem. (In theory both Option 1 and
Option 2 need to clean up.)


### Summary

From the above we could divide the sink topology into two parts:
1. A write topology.
2. One committer.

So we could provide a unified sink API that looks like the following:

public interface Sink<CommT> {
         List<Writer<?, ?>> getWriters();
         Committer<CommT> createCommitter();
}

In the long run maybe we could give the user a more powerful ability like
this (currently some transformations still belong to the runtime):

Sink<CommT> {
         Transformation<CommT> createWriteTopology();
         CommitFunction<CommT> createCommitter();
}

Best,
Guowei


On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma <guowei....@gmail.com> wrote:

Hi Steven,
I want to make a clarification first: the following reply only considers
the Iceberg sink, and does not consider other sinks. Before making a
decision we should consider all the sinks. I will try to summarize all the
sink requirements in the next mail.


  run global committer in jobmanager (e.g. like sink coordinator)
I think it could be.


You meant GlobalCommit -> GlobalCommT, right?
Yes. Thanks :)


Is this called when restored from checkpoint/savepoint?
Yes.


Iceberg sink needs to do a dup check here on which GlobalCommT were
committed and which weren't. Should it return the filtered/de-duped list
of GlobalCommT?


I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.


Sink implementation can decide if it wants to commit immediately or
just leave

I think only the framework knows *when* to call the commit function.


should this be "commit(List<GlobalCommT>)"?
It could be. thanks.


Best,
Guowei


On Sun, Sep 20, 2020 at 12:11 AM Steven Wu <stevenz...@gmail.com>
wrote:
I prefer to let the developer produce id to dedupe. I think this gives
the developer more opportunity to optimize.

Thinking about it again, I totally agree with Guowei on this. We don't
really need the framework to generate the unique id for Iceberg sink.
De-dup logic is totally internal to Iceberg sink and should be isolated
inside. My earlier question regarding "commitGlobally(List<GlobalCommT>)
can be concurrent or not" also becomes irrelevant, as long as the
framework handles the GlobalCommT list properly (even with concurrent
calls).

Here are the things where the framework can help:

    1. run global committer in jobmanager (e.g. like sink coordinator)
    2. help with checkpointing, bookkeeping, commit failure handling,
    recovery


@Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter
interface, I have some clarifying questions.

void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)
    1. You meant GlobalCommit -> GlobalCommT, right?
    2. Is this called when restored from checkpoint/savepoint?
    3. Iceberg sink needs to do a dup check here on which GlobalCommT
    were committed and which weren't. Should it return the
    filtered/de-duped list of GlobalCommT?
    4. Sink implementation can decide if it wants to commit immediately
    or just leave

void commit(GlobalCommit globalCommit);
should this be "commit(List<GlobalCommT>)"?

Thanks,
Steven


On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma <guowei....@gmail.com>
wrote:
Hi, all

Just to add to what Aljoscha said regarding the unique id. Iceberg sink
checkpoints the unique id into state during snapshot. It also inserts the
unique id into the Iceberg snapshot metadata during commit. When a job
restores the state after failure, it needs to know if the restored
transactions/commits were successful or not. It basically iterates through
the list of table snapshots from Iceberg and matches the unique ids with
what is stored in Iceberg snapshot metadata.
Thanks Steven for these detailed explanations. They help me understand
Iceberg better. However, I prefer to let the developer produce the id to
dedupe. I think this gives the developer more opportunity to optimize. You
could see the following for more details. Please correct me if I
misunderstand you.

3. Whether the `Writer` supports async functionality or not. Currently I
do not know which sink could benefit from it. Maybe it is just my own
problem.

Here, I don't really know. We can introduce an "isAvailable()" method
and mostly ignore it for now and sinks can just always return true. Or,
as an alternative, we don't add the method now but can add it later with
a default implementation. Either way, we will probably not take
advantage of the "isAvailable()" now because that would require more
runtime changes.
From Piotr's explanation I could see the other benefit that might be
gained in the future, for example decoupling the task number and the
thread number. But I have to admit that introducing `isAvailable` might
introduce some complications in the runtime. You could see my alternative
API option in the following. I believe that we could support such an async
sink writer very easily in the future. What do you think?

Yes, this is still tricky. What is the current state, would the
introduction of a "LocalCommit" and a "GlobalCommit" already solve both
the Iceberg and Hive cases? I believe Hive is the most tricky one here,
but if we introduce the "combine" method on GlobalCommit, that could
serve the same purpose as the "aggregation operation" on the individual
files, and we could even execute that "combine" in a distributed way.
Do we assume that GlobalCommit is an Agg/Combiner?
I would like to share the possible problems that I am seeing currently and
the alternative options.

## Iceberg Sink

### Concern about generating nonce by framework.

If we let the `GlobalCommitter` provide a random nonce for the
`IcebergSink`, I think that it might not be efficient. Because even if
there are only a very small number of committables in the state, you still
need to iterate over all the Iceberg snapshot files to check whether the
committable is already committed. Even if it is efficient for the
IcebergSink, it might not be the case for other sinks.

If the framework generates an auto-increment nonce instead, it might
still not be optimal for users. For example, users might want to use some
business id so that after failover they could query whether the commit was
successful.

I think users could generate a more efficient nonce, such as an
auto-increment one. Therefore, it seems to provide more optimization
chances if we let users generate the nonce.


### Alternative Option

public interface GlobalCommit<CommT, GlobalCommT> {
         // provide some runtime context such as attempt-id, job-id, task-id.
         void open(InitContext context);

         // This GlobalCommit would aggregate the committables to a
         // GlobalCommit before doing the commit operation.
         GlobalCommT combine(List<Committable> committables);

         // This method would be called after committing all the
         // GlobalCommits produced in the previous session.
         void recoveredGlobalCommittables(List<GlobalCommit> globalCommits);

         // developer would guarantee the idempotency by himself
         void commit(GlobalCommit globalCommit);
}

The user could guarantee the idempotency himself in a more efficient or
application-specific way. If the user wants the `GlobalCommit` to be
executed in a distributed way, the user could use the runtime information
to generate the partial order id himself. (We could ignore the cleanup for
now.)

Currently the sink might look like the following:

Sink<IN, LC, LCO, GC> {
         Writer<IN, LC> createWriter();
         Optional<Committer<LC, LCO>> createCommitter();
         Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter();
}

## Hive

The HiveSink needs to compute whether a directory is finished or not. But
the HiveSink cannot use the above `combine` method to decide whether a
directory is finished or not.

For example, we assume that whether the directory is finished or not is
decided by the event time. There might be a topology where the source and
sink are connected with a forward partitioning. The event time might be
different in different instances of the `writer`, so the GlobalCommit’s
combine cannot produce a GlobalCommT when the snapshot happens.

In addition to the above case we should also consider unaligned
checkpoints. Because the watermark does not skip ahead, there might be the
same problem with unaligned checkpoints.

### Option1:

public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> {
         // provide some runtime context such as attempt-id, job-id, task-id,
         // maybe the event time; provide the restored state
         void open(InitContext context, StateT state);

         // This is for the HiveSink. When all the writers say that the
         // bucket is finished it would return a GlobalCommT
         Optional<GlobalCommT> combine(Committable committables);

         // This is for the IcebergSink. Produces a GlobalCommT every
         // checkpoint.
         Optional<GlobalCommT> preCommit();

         // Maybe we need the shared state? After we decide the directory we
         // make more detailed considerations then. The id could be remembered
         // here.
         StateT snapshotState();

         // developer would guarantee the idempotency by himself
         void commit(GlobalCommit globalCommit);
}

### Option2

Actually the `GlobalCommit` in Option 1 mixes the `Writer` and
`Committer` together, so it is intuitive to decouple the two functions. To
support Hive we could provide a sink that looks like the following:

Sink<In, LC, LCO, LCG> {
         Writer<In, LC> createWriter();
         Optional<Committer<LC, LCO>> createCommitter(); // we need to change this name
         Optional<Writer<LCO, LCG>> createGlobalAgg();
         Optional<Committer<LCG, Void>> createGlobalCommitter();
}

The pro of this method is that we use two basic concepts: `Committer` and
`Writer` to build a HiveSink.

### CompactHiveSink / MergeHiveSink

There are still other complicated cases which are not satisfied by the
above options. Users often complain about writing out many small files,
which will affect file reading efficiency and the performance and
stability of the distributed file system. CompactHiveSink/MergeHiveSink
hopes to merge all files generated by this job in a single checkpoint.

The CompactHiveSink/MergeHiveSink topology can be described simply as
follows:

CompactSubTopology -> GlobalAgg -> GlobalCommitter.

The CompactSubTopology would look like the following:

TmpFileWriter -> CompactCoordinator -> CompactorFileWriter

Maybe the topology could be simpler, but please keep in mind I just want
to show that there might be very complicated topology requirements for
users.


A possible alternative option would be to let the user build the topology
himself. But considering we have two execution modes, we could only use
`Writer` and `Committer` to build the sink topology.

### Build Topology Option

Sink<IN, OUT> {
         Sink<In, Out> addWriter(Writer<In, Out> writer); // Maybe a WriterBuilder
         Sink<In, Out> addCommitter(Committer<In, Out> committer); // Maybe
         // we could make this return Void if we do not consider code reuse
         // and introduce the cleaner
}

## Summary
The requirements of the sinks might be different; maybe we could use two
basic bricks (Writer/Committer) to let the user build their own sink
topology.
What do you guys think?

I know the naming stuff might be tricky for now but I want to discuss
these things after we get consensus on the direction first.

Best,
Guowei


On Sat, Sep 19, 2020 at 12:15 AM Steven Wu <stevenz...@gmail.com>
wrote:

Aljoscha,

Instead the sink would have to check for each set of committables
separately if they had already been committed. Do you think this is
feasible?

Yes, that is how it works in our internal implementation [1]. We don't use
the checkpointId. We generate a manifest file (GlobalCommT) to bundle all
the data files that the committer received in one checkpoint cycle. Then we
generate a unique manifest id by hashing the location of the manifest
file. The manifest ids are stored in Iceberg snapshot metadata. Upon
restore, we check each of the restored manifest files against the Iceberg
table snapshot metadata to determine if we should discard or keep the
restored manifest files. If a commit has multiple manifest files (e.g.
accumulated from previous failed commits), we store the comma-separated
manifest ids in Iceberg snapshot metadata.
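
The restore-time de-dup described here boils down to a filter along these
lines (a sketch with made-up names, not the actual connector code):

static List<String> manifestsToRecommit(
    List<String> restoredManifestIds,    // ids of manifests restored from Flink state
    Set<String> idsInSnapshotMetadata) { // ids recorded in Iceberg snapshot metadata at commit time
  List<String> toRecommit = new ArrayList<>();
  for (String id : restoredManifestIds) {
    if (!idsInSnapshotMetadata.contains(id)) {
      toRecommit.add(id); // not found in any Iceberg snapshot -> needs to be committed again
    }
  }
  return toRecommit;
}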

During normal operation this set would be very small, it would usually
only be the committables for the last checkpoint. Only when there is an
outage would multiple sets of committables pile up.

You are absolutely right here. Even if there are multiple sets of
committables, it is usually only the last few or dozen snapshots we need to
check. Even with our current inefficient implementation of traversing all
table snapshots (on the scale of thousands) from oldest to latest, it only
took 60 ms on average and 800 ms at most, so it is really not a concern for
Iceberg.
CommitStatus commitGlobally(List<Committable>, Nonce)
Just to clarify on the terminology here: assuming the Committable here
meant the `GlobalCommT` (like ManifestFile in Iceberg) from the previous
discussions, right? `CommT` means the Iceberg DataFile from the writer to
the committer.

This can work assuming we *don't have concurrent executions of
commitGlobally* even with concurrent checkpoints. Here is the scenario
regarding failure recovery I want to avoid.

Assuming checkpoints 1, 2, 3 all completed. Each checkpoint generates a
manifest file, manifest-1, 2, 3.
timeline -------------------------------------------------------------> now
commitGlobally(manifest-1, nonce-1) started
          commitGlobally(manifest-2, nonce-2) started
                     commitGlobally(manifest-2, nonce-2) failed
                             commitGlobally(manifest-2 and manifest-3, nonce-3) started
                                     commitGlobally(manifest-1, nonce-1) failed
                                             commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded

Now the job failed and was restored from checkpoint 3, which contains
manifest files 1, 2, 3. We found nonce-3 was committed when checking the
Iceberg table snapshot metadata. But in this case we won't be able to
correctly determine which manifest files were committed or not.

If it is possible to have concurrent executions of commitGlobally, the
alternative is to generate the unique id/nonce per GlobalCommT. Then we can
check each individual GlobalCommT (ManifestFile) with Iceberg snapshot
metadata.

Thanks,
Steven

[1]


https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569
On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Steven,

we were also wondering if it is a strict requirement that "later"
updates to Iceberg subsume earlier updates. In the current version, you
only check whether checkpoint X made it to Iceberg and then discard all
committable state from Flink state for checkpoints smaller than X.

If we go with a (somewhat random) nonce, this would not work. Instead
the sink would have to check for each set of committables separately if
they had already been committed. Do you think this is feasible? During
normal operation this set would be very small, it would usually only be
the committables for the last checkpoint. Only when there is an outage
would multiple sets of committables pile up.

We were thinking to extend the GlobalCommitter interface to allow it to
report success or failure and then let the framework retry. I think this
is something that you would need for the Iceberg case. The signature
could be like this:

CommitStatus commitGlobally(List<Committable>, Nonce)

where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE, and
RETRY.

Best,
Aljoscha


