Hi,

I've just briefly skimmed over the proposed interfaces. I would suggest one
addition to the Writer interface (as I understand this is the runtime
interface in this proposal?): add some availability method, to avoid, if
possible, blocking calls on the sink. We already have similar
availability methods in the new sources [1] and in various places in the
network stack [2].

I'm aware that many implementations wouldn't be able to implement it, but
some may. For example `FlinkKafkaProducer` could
use `FlinkKafkaProducer#pendingRecords` to control `Writer`'s availability.
Also any sink that would be implementing records handover to some writer
thread could easily provide availability as well.

Non-blocking calls are important for many things; for example, they are
crucial for unaligned checkpoints to complete quickly.
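
To make the suggestion concrete, here is a minimal sketch of what such an
availability method could look like for a sink that hands records over to a
writer thread. The names `AvailableWriter`, `HandoverWriter` and `poll` are
hypothetical and not part of the FLIP; the only thing borrowed from existing
code is the idea of returning a `CompletableFuture<Void>`, as
`SourceReader#isAvailable` does. The writer thread itself is not shown.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical extension of the proposed Writer; not part of the FLIP. */
interface AvailableWriter<IN> {
    /** A completed future means write() can be called without blocking. */
    CompletableFuture<Void> isAvailable();

    void write(IN element);
}

/** Assumed example: records are handed over to a writer thread via a bounded buffer. */
final class HandoverWriter<IN> implements AvailableWriter<IN> {
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Queue<IN> buffer = new ArrayDeque<>();
    private final int capacity;
    private CompletableFuture<Void> availability = AVAILABLE;

    HandoverWriter(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public synchronized CompletableFuture<Void> isAvailable() {
        return availability;
    }

    @Override
    public synchronized void write(IN element) {
        if (buffer.size() >= capacity) {
            throw new IllegalStateException("write() called while unavailable");
        }
        buffer.add(element);
        if (buffer.size() >= capacity) {
            // Buffer is full: hand out a fresh, not yet completed future.
            availability = new CompletableFuture<>();
        }
    }

    /** Called by the writer thread to take the next record, or null if empty. */
    synchronized IN poll() {
        IN element = buffer.poll();
        if (element != null && !availability.isDone()) {
            // Space was freed: signal the task that it may write again.
            availability.complete(null);
        }
        return element;
    }
}
```

The task would check `isAvailable()` before each `write()` and, if the future
is not done, go do something else (for example process a checkpoint barrier)
until it completes.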

Piotrek

[1] org.apache.flink.api.connector.source.SourceReader#isAvailable
[2] org.apache.flink.runtime.io.AvailabilityProvider

On Mon, 14 Sep 2020 at 01:23, Steven Wu <stevenz...@gmail.com> wrote:

> Aljoscha, thanks a lot for the detailed response. Now I have a better
> understanding of the initial scope.
>
> To me, there are possibly three different committer behaviors. For lack
> of better names, let's call them
> * No/NoopCommitter
> * Committer / LocalCommitter (file sink?)
> * GlobalCommitter (Iceberg)
>
> ## Writer interface
>
> For the Writer interface, should we add "*prepareSnapshot*", called before
> the checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
> would the framework call "*flush*" before the barrier is emitted downstream?
> That guarantee would achieve the same goal.
> -----------------
> // before barrier emitted to downstream
> void prepareSnapshot(long checkpointId) throws Exception;
>
> // or will flush be called automatically before the barrier is emitted
> // downstream?
> // if yes, we need the checkpointId arg for the reason listed in [1]
> void flush(WriterOutput<CommT> output) throws IOException;
>
> In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> tuple to the committer. The committer needs checkpointId to separate out
> data files for different checkpoints if concurrent checkpoints are enabled.
> For that reason, writers need to know the checkpointId where the restore
> happened. Can we add a RestoreContext interface to the restoreWriter
> method?
> ---------------
> Writer<IN, CommT, WriterS, SharedS> restoreWriter(InitContext context,
> RestoreContext restoreContext, List<WriterS> state, List<SharedS> share);
>
> interface RestoreContext {
>   long getCheckpointId();
> }
>
>
> ## Committer interface
>
> For the Committer interface, I am wondering if we should split the single
> commit method into separate "*collect"* and "*commit"* methods? This way,
> it can handle both single and multiple CommT objects.
> ------------------
> void commit(CommT committable) throws Exception;
>       ==>
> void collect(CommT committable) throws Exception;
> void commit() throws Exception;
>
> As discussed in [1] and mentioned above, the Iceberg committer needs to
> know which checkpointId the commit is for. So can we add a checkpointId arg
> to the commit API? However, I don't know how this would affect batch
> execution, where checkpoints are disabled.
> ------------------
> void commit(long checkpointId) throws Exception;
>
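
To illustrate the collect/commit split proposed above, here is a minimal
sketch under stated assumptions. `CollectingCommitter` and
`RecordingCommitter` are hypothetical names, the committable is just a
`String` standing in for a data file, and the "commit" only appends to an
in-memory log so the per-checkpoint grouping is visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shape of the split committer discussed above; not part of the FLIP. */
interface CollectingCommitter<CommT> {
    void collect(CommT committable) throws Exception;

    void commit(long checkpointId) throws Exception;
}

/** Toy implementation that records what would be committed for each checkpoint. */
final class RecordingCommitter implements CollectingCommitter<String> {
    private final List<String> collected = new ArrayList<>();
    final List<String> commitLog = new ArrayList<>();

    @Override
    public void collect(String committable) {
        collected.add(committable);
    }

    @Override
    public void commit(long checkpointId) {
        // One commit covers everything collected since the previous commit.
        commitLog.add("checkpoint " + checkpointId + ": " + collected);
        collected.clear();
    }
}
```

With this shape a single commit call can cover one committable or thousands,
and the checkpointId tells the committer which cycle the batch belongs to.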
> For Iceberg, writers don't need any state. But the GlobalCommitter needs to
> checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> committer can collect thousands (or more) data files in one checkpoint
> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> thousands of collected data files) as StateT. This allows us to absorb
> extended commit outages without losing written/uploaded data files, as
> operator state size is as small as one manifest file per checkpoint cycle
> [2].
> ------------------
> StateT snapshotState(SnapshotContext context) throws Exception;
>
> That means we also need the restoreCommitter API in the Sink interface
> ---------------
> Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> state);
>
>
> Thanks,
> Steven
>
> [1] https://github.com/apache/iceberg/pull/1185#discussion_r479589663
> [2] https://github.com/apache/iceberg/pull/1185#discussion_r479457104
>
>
>
> On Fri, Sep 11, 2020 at 3:27 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Regarding the FLIP itself, I like the motivation section and the
> > What/How/When/Where section a lot!
> >
> > I don't understand why we need the "Drain and Snapshot" section. It
> > seems to be some details about stop-with-savepoint and drain, and the
> > relation to BATCH execution but I don't know if it is needed to
> > understand the rest of the document. I'm happy to be wrong here, though,
> > if there are good reasons for the section.
> >
> > On the question of Alternative 1 and 2, I have a strong preference for
> > Alternative 1 because we could avoid strong coupling to other modules.
> > With Alternative 2 we would depend on `flink-streaming-java` and even
> > `flink-runtime`. For the new source API (FLIP-27) we managed to keep the
> > dependencies slim and the code is in flink-core. I'd be very happy if we
> > can manage the same for the new sink API.
> >
> > Best,
> > Aljoscha
> >
> > On 11.09.20 12:02, Aljoscha Krettek wrote:
> > > Hi Everyone,
> > >
> > > thanks to Guowei for publishing the FLIP, and thanks Steven for the
> > > very thoughtful email!
> > >
> > > We thought a lot internally about some of the questions you posted but
> > > left a lot (almost all) of the implementation details out of the FLIP
> > > for now because we wanted to focus on semantics and API. I'll try and
> > > address the points below.
> > >
> > > ## Initial Scope of the new Sink API
> > >
> > > We need to accept some initial scope that we want to achieve for Flink
> > > 1.12. I don't think we can try and find the solution that will work for
> > > all current and future external systems. For me, the initial goal would
> > > be to produce a Sink API and implementations for systems where you can
> > > prepare "committables" in one process and commit those from another
> > > process. Those are systems that support "real" transactions as you need
> > > them in a two-phase commit protocol. This includes:
> > >
> > >   - File Sink, including HDFS, S3 via special part-file uploads
> > >   - Iceberg
> > >   - HDFS
> > >
> > > The work should include runtime support for both BATCH and STREAMING
> > > execution as outlined in https://s.apache.org/FLIP-134.
> > >
> > > Supporting Kafka already becomes difficult but I'll go into that below.
> > >
> > > ## Where to run the Committer
> > >
> > > Guowei hinted at this in the FLIP: the idea is that the Framework
> > > decides where to run the committer based on requirements and based on
> > > the execution mode (STREAMING or BATCH).
> > >
> > > Something that is not in the FLIP but which we thought about is that we
> > > need to allow different types of committers. I'm currently thinking we
> > > need at least a normal "Committer" and a "GlobalCommitter" (name TBD).
> > >
> > > The Committer is as described in the FLIP, it's basically a function
> > > "void commit(Committable)". The GlobalCommitter would be a function
> > > "void commit(List<Committable>)". The former would be used by an S3 sink
> > > where we can individually commit files to S3: a committable would be the
> > > list of part uploads that will form the final file, and the commit
> > > operation creates the metadata in S3. The latter would be used by
> > > something like Iceberg where the Committer needs a global view of all the
> > > commits to be efficient and not overwhelm the system.
> > >
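
The two committer shapes described above can be sketched as two small
interfaces. This is only an illustration: `CommitDriver` and its methods are
hypothetical helper names, and the real interfaces may well look different.

```java
import java.util.ArrayList;
import java.util.List;

/** Commits one committable at a time, e.g. one file on S3. */
interface Committer<CommT> {
    void commit(CommT committable);
}

/** Commits all committables in one call, e.g. one Iceberg table commit. */
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables);
}

/** Hypothetical helper showing how a runtime could drive either kind. */
final class CommitDriver {
    private CommitDriver() {}

    static <CommT> void runCommitter(Committer<CommT> committer, List<CommT> committables) {
        // Individual commits are independent, so this loop could be distributed.
        for (CommT committable : committables) {
            committer.commit(committable);
        }
    }

    static <CommT> void runGlobalCommitter(
            GlobalCommitter<CommT> committer, List<CommT> committables) {
        // The global committer needs the full view, so there is exactly one call.
        committer.commit(new ArrayList<>(committables));
    }
}
```

For three committables the first driver makes three calls and the second
makes one call with a list of three.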
> > > I don't know yet if sinks would only implement one type of commit
> > > function or potentially both at the same time, and maybe Commit can
> > > return some CommitResult that gets shipped to the GlobalCommit
> > > function.
> > >
> > > An interesting read on this topic is the discussion on
> > > https://issues.apache.org/jira/browse/MAPREDUCE-4815, about the Hadoop
> > > FileOutputCommitter and the two different available algorithms for
> > > committing final job/task results.
> > >
> > > These interfaces untie the sink implementation from the Runtime and we
> > > could, for example, have a runtime like this:
> > >
> > > ### BATCH
> > >
> > >   - Collect all committables and store them in a fault tolerant way
> > > until the job finishes
> > >   - For a normal Commit function, call it on the individual commits. We
> > > can potentially distribute this if it becomes a bottleneck
> > >   - For GlobalCommit function, call it with all the commits. This
> > > cannot be distributed
> > >
> > > We can collect the committables in an OperatorCoordinator or
> > > potentially somehow in a task. Though I prefer an OperatorCoordinator
> > > right now. The operator coordinator needs to keep the commits in a
> > > fault-tolerant way.
> > >
> > > ### STREAMING
> > >
> > >   - For normal Commit, keep the committables in state on the individual
> > > tasks, commit them when a checkpoint completes
> > >   - For global CommitFunction we have options: collect them in a DOP-1
> > > operator in the topology or send them to an OperatorCoordinator to do
> > > the commit there. This is where the source/sink duality that Steven
> > > mentions becomes visible.
> > >
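
The STREAMING case for a normal Commit function (keep committables in state,
commit when a checkpoint completes) can be sketched as below. This is a
minimal sketch, not Flink code: `PendingCommits` is a hypothetical class, the
method names only echo the usual snapshot/notify callbacks, and the returned
map stands in for real operator state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical per-task bookkeeping of committables between checkpoints. */
final class PendingCommits<CommT> {
    private final TreeMap<Long, List<CommT>> pendingByCheckpoint = new TreeMap<>();
    private final Consumer<CommT> commitFunction;
    private List<CommT> current = new ArrayList<>();

    PendingCommits(Consumer<CommT> commitFunction) {
        this.commitFunction = commitFunction;
    }

    /** Called for every committable produced by the writer. */
    void add(CommT committable) {
        current.add(committable);
    }

    /** Seals the current batch under the checkpoint id and returns the state to persist. */
    Map<Long, List<CommT>> snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, current);
        current = new ArrayList<>();
        return new TreeMap<>(pendingByCheckpoint);
    }

    /** Commits every batch that belongs to this or an earlier checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<CommT>> ready =
                pendingByCheckpoint.headMap(checkpointId, true);
        for (List<CommT> batch : ready.values()) {
            batch.forEach(commitFunction);
        }
        // headMap is a view, so clearing it removes the committed batches.
        ready.clear();
    }
}
```

Because a failure can happen after the commit but before the state is
cleaned up, the commit function in such a scheme would need to be idempotent.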
> > > ## Kafka
> > >
> > > Kafka is a problematic case because it doesn't really support
> > > transactions as outlined above. Our current Sink implementation works
> > > around that with hacks but that only gets us so far.
> > >
> > > The problem with Kafka is that we need to aggressively clean up pending
> > > transactions in case a failure happens. Otherwise stale transactions
> > > would block downstream consumers. See here for details:
> > > http://kafka.apache.org/documentation/#isolation.level.
> > >
> > > The way we solve this in the current Kafka sink is by using a fixed
> > > pool of transactional IDs and then cancelling all outstanding
> > > transactions for the IDs when we restore from a savepoint. In order for
> > > this to work we need to recycle the IDs, so there needs to be a
> > > back-channel from the Committer to the Writer, or they need to share
> > > internal state.
> > >
> > > I don't yet see a satisfying solution for this so I think we should
> > > exclude this from the initial version.
> > >
> > > ## On Write-Ahead-Log Sinks
> > >
> > > Some sinks, like ES or Cassandra would require that we keep a WAL in
> > > Flink and then ship the contents to the external system on checkpoint.
> > > The reason is that these systems don't support real transactions where
> > > you can prepare them in one process and commit them from another
> > > process.
> > >
> > > Best,
> > > Aljoscha
> > >
> > >
> > > On 11.09.20 02:23, Steven Wu wrote:
> > >> Guowei,
> > >>
> > >> Thanks a lot for the proposal and starting the discussion thread. Very
> > >> excited.
> > >>
> > >> For the big question of "Is the sink an operator or a topology?", I
> > >> have a few related sub questions.
> > >> * Where should we run the committers?
> > >> * Is the committer parallel or single parallelism?
> > >> * Can a single choice satisfy all sinks?
> > >>
> > >> Trying to envision how some sinks can be implemented with this new
> > >> unified sink interface.
> > >>
> > >> 1. Kafka sink
> > >>
> > >> Kafka supports non-transactional and transactional writes
> > >> * Non-transactional writes don't need a commit action. We can have
> > >> *parallel writers and no/no-op committers*. This is probably true for
> > >> other non-transactional message queues.
> > >> * Transactional writes can be implemented as *parallel writers and
> > >> parallel committers*. In this case, I don't know if it makes sense to
> > >> separate writers and committers into two separate operators, because
> > >> they probably need to share the same KafkaProducer object.
> > >>
> > >> Either way, both writers and committers probably should *run inside
> > >> task managers*.
> > >>
> > >> 2. ES sink
> > >>
> > >> ES sink typically buffers the data up to a certain size or time
> > >> threshold and then uploads/commits a batch to ES. Writers buffer data
> > >> and flush when needed, and the committer does the HTTP bulk upload to
> > >> commit. To avoid serialization/deserialization cost, we should run
> > >> *parallel writers and parallel committers* and they *should be chained
> > >> or bundled together* while *running inside task managers*.
> > >>
> > >> It can also be implemented as *parallel writers and no/no-op
> > >> committers*, where all logic (batching and upload) is put inside the
> > >> writers.
> > >>
> > >> 3. Iceberg [1] sink
> > >>
> > >> It is currently implemented as two-stage operators with *parallel
> > >> writers and single-parallelism committers*.
> > >> * *parallel writers* that write records into data files. Upon
> > >> checkpoint, writers flush and upload the files, and send the
> > >> metadata/location of the data files to the downstream committer.
> > >> Writers need to do the flush inside the "prepareSnapshotPreBarrier"
> > >> method (NOT the "snapshotState" method) before forwarding the
> > >> checkpoint barrier to the committer
> > >> * single-parallelism committer operator. It collects data files from
> > >> upstream writers. During "snapshotState", it saves collected data
> > >> files (or an uber metadata file) into state. When the checkpoint is
> > >> completed, inside "notifyCheckpointComplete" it commits those data
> > >> files to Iceberg tables. *The committer has to be single parallelism*,
> > >> because we don't want hundreds or thousands of parallel committers to
> > >> compete for commit operations with optimistic concurrency control.
> > >> It will be very inefficient and probably infeasible if the parallelism
> > >> is high. Too many tiny commits/transactions can also slow down both the
> > >> table write and read paths due to too many manifest files.
> > >>
> > >> Right now, both Iceberg writer and committer operators run inside task
> > >> managers. It has one major drawback. With Iceberg sink, embarrassingly
> > >> parallel jobs won't be embarrassingly parallel anymore. That breaks
> > >> the benefit of region recovery for embarrassingly parallel DAGs.
> > >> Conceptually, the Writer-Committer sink pattern mirrors the FLIP-27
> > >> Enumerator-Reader source pattern. It will be better *if the committer
> > >> can run inside the job manager* like the SplitEnumerator for the
> > >> FLIP-27 source.
> > >>
> > >> -----------------------
> > >> Additional questions regarding the doc/API
> > >> * Any example for the writer shared state
> > >> (Writer#snapshotSharedState)?
> > >> * We allow the case where the writer has no state, right? Meaning
> > >> WriterS can be Void.
> > >>
> > >> [1] https://iceberg.apache.org/
> > >>
> > >> Thanks,
> > >> Steven
> > >>
> > >> On Thu, Sep 10, 2020 at 6:44 AM Guowei Ma <guowei....@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi, devs & users
> > >>>
> > >>> As discussed in FLIP-131[1], Flink will deprecate the DataSet API in
> > >>> favor of DataStream API and Table API. Users should be able to use
> > >>> DataStream API to write jobs that support both bounded and unbounded
> > >>> execution modes. However, Flink does not provide a sink API to
> > >>> guarantee the Exactly-once semantics in both bounded and unbounded
> > >>> scenarios, which blocks the unification.
> > >>>
> > >>> So we want to introduce a new unified sink API which could let the
> > >>> user develop the sink once and run it everywhere. You could find more
> > >>> details in FLIP-143[2].
> > >>>
> > >>> The FLIP contains some open questions on which I'd really appreciate
> > >>> input from the community. Some of the open questions include:
> > >>>
> > >>>     1. We provide two alternative Sink APIs in the FLIP. The only
> > >>>     difference between the two versions is how to expose the state to
> > >>>     the user. We want to know which one is your preference?
> > >>>     2. How does the sink API support writing to Hive?
> > >>>     3. Is the sink an operator or a topology?
> > >>>
> > >>> [1]
> > >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> > >>>
> > >>> [2]
> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> > >>>
> > >>>
> > >>> Best,
> > >>> Guowei
> > >>>
> > >>
> > >
> >
> >
>
