Hello all,

I'd actually like to retract the earlier additions to the KIP and point out
that since I've started the voting process and gotten some responses, I
will not be making any major changes to the KIP as it would require a
re-voting process.

Thanks,
Aakash

On Tue, May 19, 2020 at 2:31 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Chris and others,
>
> Yes, you are correct; I looked through KIP-298 to understand it better. I
> agree with your idea to handle "errors.tolerance=none."
>
> I see; you are essentially in favor of standardizing what the reporter
> should do when it is not configured. I am on board with this proposal,
> especially since, as you mentioned, it is in line with previous behavior.
>
> I will add both of these suggestions to the KIP.
>
> Lastly, unless anyone has any issues with Chris's suggestions, I believe
> the last part we have to come to a consensus on is using a Future as the
> return type. I am for giving extra guarantees to the user if they wish;
> however, I am not very familiar with the potential consumer heartbeat
> issues that Arjun pointed out. Does anyone have any thoughts on this?
>
> Thanks,
> Aakash
>
> On Tue, May 19, 2020 at 2:10 PM Chris Egerton <chr...@confluent.io> wrote:
>
>> Hi Aakash,
>>
>> > If "errors.tolerance=none", should it not be the case that the error
>> reporter does not even report any error; rather, the task just fails after
>> throwing the error? I do understand the point you are saying about
>> duplicates, though.
>>
>> I believe the "errors.tolerance" property dictates whether a task should
>> fail after a record that causes problems during conversion or
>> transformation is encountered and reported (for example, by writing to a
>> DLQ). If it is set to "none", then the task will fail immediately; if it
>> is set to "all", then the task will continue running normally. So if we
>> want to preserve that behavior, we might want to immediately throw an
>> exception when an errant record is reported by a "SinkTask" instance and
>> the user has configured "errors.tolerance = none", which unless caught
>> will cause the task to cease writing records to the sink. In addition to
>> throwing that exception, we should also still fail the task; the
>> exception is just a way to (hopefully) interrupt the task's processing
>> of records in order to prevent duplicates if/when the task is restarted
>> later on.
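>>
>> To make this concrete, here is a rough framework-side sketch (it uses
>> the Future-returning shape discussed in this thread; "toleranceType"
>> and "writeToDlqAndLog" are hypothetical names, not from the KIP):
>>
>>     public Future<Void> report(SinkRecord record, Throwable error) {
>>         if (toleranceType == ToleranceType.NONE) {
>>             // Fail fast: throwing interrupts the task's processing so
>>             // fewer duplicates are produced if/when it is restarted
>>             throw new ConnectException("Error tolerance exceeded", error);
>>         }
>>         // Otherwise report the record (e.g. produce it to the DLQ)
>>         // asynchronously and let the task keep running
>>         return writeToDlqAndLog(record, error);
>>     }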
>>
>> > Lastly, why do you say we should always provide an errant record
>> > reporter? Doesn't that change the contract of what functionality it is
>> > providing?
>>
>> I'm just thinking that instead of returning "null" when no errant record
>> reporter is configured, we could return one that always fails the task
>> and throws an exception. This seems in line with the default behavior of
>> the framework when no error handling configuration properties are
>> specified and a record causes problems during conversion or
>> transformation. We could leave the choice in the hands of developers,
>> but this might make things confusing for users who get different
>> behavior from different connectors under the same circumstances.
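>>
>> As a sketch, assuming the reporter interface has the single report()
>> method from the KIP, the framework could hand back a fail-fast instance
>> instead of null:
>>
>>     // Default reporter when no error handling properties are set;
>>     // mirrors the framework's default fail-fast behavior
>>     ErrantRecordReporter failFast = (record, error) -> {
>>         throw new ConnectException(
>>                 "No errant record reporter configured; failing task", error);
>>     };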
>>
>> Hope this helps!
>>
>> Cheers,
>>
>> Chris
>>
>> On Tue, May 19, 2020 at 1:50 PM Aakash Shah <as...@confluent.io> wrote:
>>
>> > Hi Arjun,
>> >
>> > I am not very familiar with how the potential heartbeat failure would
>> > cause more failures when consuming subsequent records. Can you
>> > elaborate on this?
>> >
>> > Thanks,
>> > Aakash
>> >
>> > On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
>> > wrote:
>> >
>> > > One more concern with the connector blocking on the Future's get() is
>> > > that it may cause the task's consumer to fail to heartbeat (since
>> > > there is no independent thread to do this). That would then cause
>> > > failures when we eventually try to consume more records after
>> > > returning from put(). The developer would need to be cognizant of
>> > > these bits before waiting on the future, which adds a reasonable
>> > > amount of complexity.
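>> > >
>> > > For illustration, a connector that blocks would be doing something
>> > > like this (a sketch; the 30-second timeout is arbitrary):
>> > >
>> > >     Future<Void> future = reporter.report(record, e);
>> > >     try {
>> > >         // While we block here, put() does not return, so the task's
>> > >         // consumer cannot poll for more records in the meantime
>> > >         future.get(30, TimeUnit.SECONDS);
>> > >     } catch (InterruptedException | ExecutionException | TimeoutException ex) {
>> > >         throw new ConnectException("Failed to report errant record", ex);
>> > >     }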
>> > >
>> > > Even with preCommit() returning incomplete offsets, I suppose the
>> > > concern would be that the put() method keeps giving the task more
>> > > records, and to truly pause the "firehose", the task needs to pause
>> > > all partitions?
>> > >
>> > >
>> > > On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
>> > > wrote:
>> > >
>> > > > Can we get a couple of examples that show the utility of waiting on
>> > > > the Future<>? Also, in preCommit() we would report back on the
>> > > > incomplete offsets, so that feedback mechanism will already exist
>> > > > for developers who want to manually manage this.
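>> > > >
>> > > > For example (a sketch, assuming the task tracks which offsets are
>> > > > actually safe to commit; completedOffsets() is hypothetical):
>> > > >
>> > > >     @Override
>> > > >     public Map<TopicPartition, OffsetAndMetadata> preCommit(
>> > > >             Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
>> > > >         // Hand back only the offsets whose records (including any
>> > > >         // reported errors) have fully completed; anything held
>> > > >         // back is redelivered rather than lost
>> > > >         return completedOffsets(currentOffsets);
>> > > >     }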
>> > > >
>> > > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com>
>> > > > wrote:
>> > > >
>> > > >> Thanks, Aakash, for updating the KIP.
>> > > >>
>> > > >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <arjun.sat...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >> > Hi Randall,
>> > > >> >
>> > > >> > Thanks for the explanation! Excellent point about guaranteeing
>> > > >> > offsets in the async case.
>> > > >> >
>> > > >> > If we can guarantee that the offsets will be advanced only after
>> > > >> > the bad records are reported, then is there any value in the
>> > > >> > Future<> return type? I feel we can declare the function with a
>> > > >> > void return type:
>> > > >> >
>> > > >> > void report(SinkRecord failedRecord, Throwable error)
>> > > >> >
>> > > >> > that works asynchronously and advances offsets only after the
>> > > >> > DLQ producer (and other reporters) complete successfully (as you
>> > > >> > explained).
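>> > > >> >
>> > > >> > With that signature, put() could literally be (a sketch;
>> > > >> > process() stands in for the connector's own write logic):
>> > > >> >
>> > > >> >     @Override
>> > > >> >     public void put(Collection<SinkRecord> records) {
>> > > >> >         for (SinkRecord record : records) {
>> > > >> >             try {
>> > > >> >                 process(record);
>> > > >> >             } catch (Exception e) {
>> > > >> >                 // Fire and forget; the framework holds back the
>> > > >> >                 // offsets until the report completes
>> > > >> >                 reporter.report(record, e);
>> > > >> >             }
>> > > >> >         }
>> > > >> >     }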
>> > > >> >
>> > > >> > This actually alleviates my concern about what this Future<>
>> > > >> > really means. Since a failure to report should kill the task,
>> > > >> > there is no reason for the connector to ever wait on the get().
>> > > >>
>> > > >>
>> > > >> We should not say "there is no reason", because we don't know all
>> > > >> of the requirements that might exist for sending records to
>> > > >> external systems. The additional guarantee regarding error records
>> > > >> being fully recorded before `preCommit(...)` is called is a minimal
>> > > >> guarantee that Connect provides the sink task, and returning a
>> > > >> Future allows a sink task to have *stronger* guarantees than what
>> > > >> Connect provides by default.
>> > > >>
>> > > >> Once again:
>> > > >> 1. we need an async API to allow the sink task to report problem
>> > > >> records and then immediately continue doing more work.
>> > > >> 2. Connect should guarantee to the sink task that all reported
>> > > >> records will actually be recorded before `preCommit(...)` is
>> > > >> called.
>> > > >> 3. a sink task *might* need stronger guarantees, and may need to
>> > > >> block on the reported records some time before `preCommit(...)`;
>> > > >> we should allow it to do this (see the sketch after this list).
>> > > >> 4. Future and callbacks are common techniques, but there are
>> > > >> significant runtime risks in using callbacks, whereas Future is a
>> > > >> common/standard pattern that is straightforward to use.
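>> > > >>
>> > > >> To illustrate point 3, a task that wants the stronger guarantee
>> > > >> could collect the futures and block before its offsets are
>> > > >> committed (a sketch; write() stands in for the connector's own
>> > > >> sink logic):
>> > > >>
>> > > >>     private final List<Future<Void>> pending = new ArrayList<>();
>> > > >>
>> > > >>     @Override
>> > > >>     public void put(Collection<SinkRecord> records) {
>> > > >>         for (SinkRecord record : records) {
>> > > >>             try {
>> > > >>                 write(record);
>> > > >>             } catch (Exception e) {
>> > > >>                 pending.add(reporter.report(record, e));
>> > > >>             }
>> > > >>         }
>> > > >>     }
>> > > >>
>> > > >>     @Override
>> > > >>     public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
>> > > >>         // Stronger than the default guarantee: ensure every report
>> > > >>         // has completed before these offsets are committed
>> > > >>         for (Future<Void> f : pending) {
>> > > >>             try {
>> > > >>                 f.get();
>> > > >>             } catch (InterruptedException | ExecutionException e) {
>> > > >>                 throw new ConnectException("Errant record report failed", e);
>> > > >>             }
>> > > >>         }
>> > > >>         pending.clear();
>> > > >>     }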
>> > > >>
>> > > >> This *exactly* matches the current KIP, which is why I plan to
>> > > >> vote for this valuable and well-thought-out KIP.
>> > > >>
>> > > >>
>> > > >> > And if we are guaranteeing that the offsets are only advanced
>> > > >> > when the errors are reported, then this becomes a double win:
>> > > >> >
>> > > >> > 1. connector developers can literally fire and forget failed
>> > > >> > records.
>> > > >> > 2. offsets are correctly advanced only once errors are reported.
>> > > >> > Failure to report an error will kill the task, and the last
>> > > >> > committed offset will be the correct one.
>> > > >>
>> > > >>
>> > > >> > The main contract would simply be to call report() before
>> > > >> > preCommit() or before put() returns in the task, so the framework
>> > > >> > knows that there are reported error records that need to finish
>> > > >> > before the offsets can be advanced.
>> > > >> >
>> > > >> > I think I'd be pretty excited about this API, and if we all
>> > > >> > agree, then let's go ahead with this?
>> > > >>
>> > > >>
>> > > >> > Best,
>> > > >> >
>> > > >> >
>> > > >> >
>> > > >>
>> > > >
>> > >
>> >
>>
>
