On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
mage...@confluent.io> wrote:

> Randall
>
> Thanks a lot for your thoughts. I was wondering if we would ever have to
> make the API asynchronous, we could expose it as a new method right? If
> that's a possibility would it be better if the API explicitly has semantics
> of a synchronous API if the implementation is indeed going to be
> synchronous.
>

Thanks, Magesh.

I think it's likely that the implementation may need to be synchronous to
some degree. For example, just to keep the implementation simple we might
block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns we
might latch until the reporter has received all acks, especially if it
simplifies the offset management and commit logic.

Even if that's the case, having each `report(...)` call be asynchronous
means that the sink task doesn't *have* to wait until each failed record
has been recorded to continue sending valid records to the external system.
Consider an example with 1000 records in a batch, where only the first
record has an error. If `record(...)` were synchronous, the `put(...)`
method would block reporting the first record and would then only send the
999 after that's happened. With an asynchronous `record(...)` method, the
`put(...)` method could report the first record, send the 999 records, and
then wait for the futures returned by the report method.


>
> On Sun, May 17, 2020, 9:27 AM Randall Hauch <rha...@gmail.com> wrote:
>
> > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > mage...@confluent.io> wrote:
> >
> > > Thanks Randall. The suggestion i made also has a problem when reporter
> > > isn't enabled where it could potentially write records after error
> > records
> > > to sink before failing.
> > >
> > > The other concern i had with reporter being asynchronous. For some
> reason
> > > if the reporter is taking longer because of say a specific broker
> issue,
> > > the connector might still move forward and commit if it's not waiting
> for
> > > the reporter.  During  this if the worker crashes we will now lose the
> > bad
> > > record
> > >  I don't think this is desirable behavior. I think the synchronous
> > reporter
> > > provides better guarantees for all connectors.
> > >
> > >
> > Thanks, Magesh.
> >
> > That's a valid concern, and maybe that will affect how the feature is
> > actually implemented. I expect it to be a bit tricky to ensure that
> errant
> > records are fully written to Kafka before the offsets are committed, so
> it
> > might be simplest to start out with a synchronous implementation. But the
> > API can still be an asynchronous design whether or not the implementation
> > is synchronous. That gives us the ability in the future to change the
> > implementation if we determine a way to handle all concerns. For example,
> > the WorkerSinkTask may need to backoff if waiting to commit due to too
> many
> > incomplete/unacknowledged reporter requests. OTOH, if we make the
> `report`
> > method(s) synchronous from the beginning, it will be very challenging to
> > change them in the future to be asynchronous.
> >
> > I guess it boils down to this question: do we know today that we will
> > *never* want the reporter to write asynchronously?
> >
> > Best regards,
> >
> > Randall
> >
>

Reply via email to