Hi Chris,

Thanks for the suggestions.

If "errors.tolerance=none", shouldn't the error reporter skip reporting
altogether, so that the task simply fails after throwing the error? I do
understand the point you are making about duplicates, though.
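
To make the question concrete, here is a minimal sketch of the behavior I have in mind. Everything in it is hypothetical: the class name, the `Tolerance` enum, and the use of a plain `String` for the record are stand-ins for illustration, not the KIP-610 API, and `IllegalStateException` stands in for whatever exception Connect would actually throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for the reporter discussed in this thread; not the KIP-610 API.
class FailFastReporterSketch {
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    // Stands in for the DLQ/log reporters; only written to when errors are tolerated.
    final List<String> reported = new ArrayList<>();

    FailFastReporterSketch(Tolerance tolerance) {
        this.tolerance = tolerance;
    }

    Future<Void> report(String record, Throwable error) {
        if (tolerance == Tolerance.NONE) {
            // Fail on the caller's thread, before anything is reported, so the
            // task cannot keep sending records to the sink.
            throw new IllegalStateException("Tolerance exceeded for " + record, error);
        }
        // Real reporting would be asynchronous; it is completed inline here
        // only to keep the sketch small.
        reported.add(record);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) throws Exception {
        FailFastReporterSketch tolerant = new FailFastReporterSketch(Tolerance.ALL);
        tolerant.report("record-2", new RuntimeException("bad payload")).get();
        System.out.println("reported: " + tolerant.reported);

        FailFastReporterSketch strict = new FailFastReporterSketch(Tolerance.NONE);
        try {
            strict.report("record-7", new RuntimeException("bad payload"));
        } catch (IllegalStateException e) {
            System.out.println("task would fail: " + e.getMessage());
        }
    }
}
```

With the strict configuration nothing is ever handed to the reporters; the exception surfaces directly from the call rather than from "Future::get".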

You raise a good point about "offset.flush.interval.ms" and I think we
should respect that. I will add this constraint to the KIP. Please let me
know if this extra constraint introduces any issues I am not aware of.
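
As a sanity check on how I read the constraint, here is a rough sketch of the offset capping, using the numbers from your example (records 0-10 delivered, the report for offset 2 finished, the report for offset 7 still pending). The helper name and signature are my own assumptions and not part of Connect. It also follows the numbering in your example; Kafka itself commits the offset of the next record to consume, so the value actually committed may need to be one higher.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper; the name and signature are illustrative, not part of Connect.
class CommittableOffsetSketch {
    /**
     * Highest record offset that is safe to pass to preCommit: the last
     * delivered offset, capped just below the earliest errant record whose
     * report has not completed yet.
     */
    static long lastSafeOffset(long lastDelivered, SortedSet<Long> pendingErrant) {
        if (pendingErrant.isEmpty()) {
            return lastDelivered;
        }
        return Math.min(lastDelivered, pendingErrant.first() - 1);
    }

    public static void main(String[] args) {
        SortedSet<Long> pending = new TreeSet<>();
        pending.add(7L); // offset 2 has been fully reported; offset 7 has not
        System.out.println(lastSafeOffset(10L, pending)); // prints 6

        pending.clear(); // once offset 7 is handled, nothing holds the commit back
        System.out.println(lastSafeOffset(10L, pending)); // prints 10
    }
}
```

The call would still happen on the normal "offset.flush.interval.ms" schedule; only the offsets passed in would change.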

Lastly, why do you say we should always provide an errant record reporter?
Doesn't that change the contract of what functionality it is providing?

Thanks,
Aakash


On Tue, May 19, 2020 at 11:15 AM Chris Egerton <chr...@confluent.io> wrote:

> Hi Randall,
>
> First off, thank you for the incredibly detailed example. I don't mind
> walls of text. I found it very helpful. I especially liked the idea about
> modifying how the framework invokes "SinkTask::preCommit" to take most of
> the work out of developers' hands in the common case of a "fire-and-forget"
> but still provide flexibility to accommodate connectors with, for example,
> exactly-once delivery guarantees that involve committing offsets to the
> sink atomically with the actual records that they've received from Kafka.
>
> I have one point I'd like to raise about the stated advantage of an
> asynchronous API: that tasks can continue processing records and sending
> them to the sink destination without having to block on the completion of
> the error report.
>
> Wouldn't this actually be a disadvantage in the case that the user has
> configured the connector with "errors.tolerance = none"? In that case, the
> expectation is that the task should fail as soon as it hits a bad record;
> allowing it to possibly continue to produce records in that case (which
> would likely end up as duplicates in the sink if/when the task is
> restarted) doesn't seem optimal.
>
> I don't think that this makes an asynchronous API completely unusable; I
> just think that we'd want to synchronously throw some kind of exception
> when the error reporter is invoked and the connector is configured with
> "errors.tolerance = none", instead of causing one to be thrown wrapped in
> an ExecutionException if/when "Future::get" is called on the returned
> future.
>
> I'd also like to suggest a slight change to the logic for invoking
> "SinkTask::preCommit". The interval at which offsets are committed for sink
> tasks is configurable via the worker-level "offset.flush.interval.ms"
> property; I think it'd be nice to respect that property if we could. What
> would you think about calling "SinkTask::preCommit" at the normally
> scheduled times, but altering the offsets that are passed in to that call
> to not go beyond any offsets for errant records that have been reported but
> not fully processed yet?
>
> For example, imagine a task has been given records with offsets 0-10 on a
> single topic partition and reports records with offsets 2 and 7 to the
> framework. Then, the framework is able to process the record with offset 2
> but not the record with offset 7. When it comes time for an offset commit,
> the framework will call "SinkTask::preCommit" with an offset of 6 for that
> topic partition, since the record for offset 7 has not been completely
> taken care of yet.
>
> One more small suggestion: we may want to always provide an errant record
> reporter to connectors, even if one has not been configured. This reporter
> would simply fail the task and throw an exception as soon as it's invoked.
> This would provide a more uniform experience for users across different
> connectors and would establish expectations that, if a connector uses the
> features added by KIP-610 at all, it will fail by default on any invalid
> records (instead of doing something implementation-dependent).
>
> Cheers,
>
> Chris
>
> On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> > One more concern with the connector blocking on the Future's get() is
> > that it may cause the task's consumer to fail to heartbeat (since there
> > is no independent thread to do this). That would then cause failures
> > when we eventually try to consume more records after returning from
> > put(). The developer would need to be cognizant of these bits before
> > waiting on the future, which adds a reasonable amount of complexity.
> >
> > Even with preCommit() returning incomplete offsets, I suppose the concern
> > would be that the put() method keeps giving the task more records, and to
> > truly pause the "firehose", the task needs to pause all partitions?
> >
> >
> > On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
> > wrote:
> >
> > > Can we get a couple of examples that show the utility of waiting on
> > > the Future<>? Also, in preCommit() we would report back on the
> > > incomplete offsets. So that feedback mechanism will already exist for
> > > developers who want to manually manage this.
> > >
> > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com>
> > > wrote:
> > >
> > >> Thanks, Aakash, for updating the KIP.
> > >>
> > >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <arjun.sat...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Randall,
> > >> >
> > >> > Thanks for the explanation! Excellent point about guaranteeing
> > >> > offsets in the async case.
> > >> >
> > >> > If we can guarantee that the offsets will be advanced only after
> > >> > the bad records are reported, then is there any value in the
> > >> > Future<> return type? I feel we can declare the function with a
> > >> > void return type:
> > >> >
> > >> > void report(SinkRecord failedRecord, Throwable error)
> > >> >
> > >> > that works asynchronously, and advances offsets only after the DLQ
> > >> > producer (and other reporters) complete successfully (as you
> > >> > explained).
> > >> >
> > >> > This actually alleviates my concern about what this Future<>
> > >> > actually means. Since a failure to report should kill the task,
> > >> > there is no reason for the connector to ever wait on the get().
> > >>
> > >>
> > >> We should not say "there is no reason", because we don't know all of
> > >> the requirements that might exist for sending records to external
> > >> systems. The additional guarantee regarding error records being fully
> > >> recorded before `preCommit(...)` is called is a minimal guarantee that
> > >> Connect provides the sink task, and returning a Future allows a sink
> > >> task to have *stronger* guarantees than what Connect provides by
> > >> default.
> > >>
> > >> Once again:
> > >> 1. we need an async API to allow the sink task to report problem
> > >> records and then immediately continue doing more work.
> > >> 2. Connect should guarantee to the sink task that all reported records
> > >> will actually be recorded before `preCommit(...)` is called
> > >> 3. a sink task *might* need stronger guarantees, and may need to block
> > >> on the reported records some time before `preCommit(...)`, and we
> > >> should allow them to do this.
> > >> 4. Future and callbacks are common techniques, but there are
> > >> significant runtime risks of using callbacks, whereas Future is a
> > >> common/standard pattern that is straightforward to use.
> > >>
> > >> This *exactly* matches the current KIP, which is why I plan to vote
> > >> for this valuable and well-thought-out KIP.
> > >>
> > >>
> > >> > And if we are guaranteeing that the
> > >> > offsets are only advanced when the errors are reported, then this
> > >> > becomes a double win:
> > >> >
> > >> > 1. connector developers can literally fire and forget failed
> > >> > records.
> > >> > 2. offsets are correctly advanced on errors being reported. Failure
> > >> > to report error will kill the task, and the last committed offset
> > >> > will be the correct one.
> > >>
> > >>
> > >> > The main contract would simply be to call report() before
> > >> > preCommit() or before put() returns in the task, so the framework
> > >> > knows that there are error records reported, and those need to
> > >> > finish before the offsets can be advanced.
> > >> >
> > >> > I think I'd be pretty excited about this API, and if we all agree,
> > >> > then let's go ahead with this?
> > >>
> > >>
> > >> > Best,
> > >> >
> > >> >
> > >> >
> > >>
> > >
> >
>
