On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <mage...@confluent.io> wrote:
> Randall,
>
> Thanks a lot for your thoughts. I was wondering: if we would ever have to
> make the API asynchronous, we could expose it as a new method, right? If
> that's a possibility, would it be better if the API explicitly has the
> semantics of a synchronous API, if the implementation is indeed going to
> be synchronous?

Thanks, Magesh.

I think it's likely that the implementation may need to be synchronous to
some degree. For example, just to keep the implementation simple, we might
block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns until
the reporter has received all acks, especially if that simplifies the
offset management and commit logic. Even if that's the case, having each
`report(...)` call be asynchronous means that the sink task doesn't *have*
to wait until each failed record has been recorded before continuing to
send valid records to the external system.

Consider an example with 1000 records in a batch, where only the first
record has an error. If `report(...)` were synchronous, the `put(...)`
method would block while reporting the first record and would only send
the remaining 999 records after that has happened. With an asynchronous
`report(...)` method, the `put(...)` method could report the first record,
send the 999 records, and then wait for the futures returned by the report
method.

> On Sun, May 17, 2020, 9:27 AM Randall Hauch <rha...@gmail.com> wrote:
>
> > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > mage...@confluent.io> wrote:
> >
> > > Thanks, Randall. The suggestion I made also has a problem when the
> > > reporter isn't enabled, where it could potentially write records that
> > > come after error records to the sink before failing.
> > >
> > > The other concern I had was with the reporter being asynchronous. If
> > > for some reason the reporter is taking longer, because of, say, a
> > > specific broker issue, the connector might still move forward and
> > > commit if it's not waiting for the reporter.
> > > If the worker crashes during this window, we will now lose the bad
> > > record. I don't think this is desirable behavior. I think the
> > > synchronous reporter provides better guarantees for all connectors.
> >
> > Thanks, Magesh.
> >
> > That's a valid concern, and maybe that will affect how the feature is
> > actually implemented. I expect it to be a bit tricky to ensure that
> > errant records are fully written to Kafka before the offsets are
> > committed, so it might be simplest to start out with a synchronous
> > implementation. But the API can still be an asynchronous design whether
> > or not the implementation is synchronous. That gives us the ability to
> > change the implementation in the future if we determine a way to handle
> > all the concerns. For example, the WorkerSinkTask may need to back off
> > when waiting to commit due to too many incomplete/unacknowledged
> > reporter requests. OTOH, if we make the `report` method(s) synchronous
> > from the beginning, it will be very challenging to change them in the
> > future to be asynchronous.
> >
> > I guess it boils down to this question: do we know today that we will
> > *never* want the reporter to write asynchronously?
> >
> > Best regards,
> >
> > Randall
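[Editor's note] The asynchronous pattern Randall describes (report the errant record without blocking, keep sending the valid records, then wait on the futures before the batch completes and offsets can be committed) can be sketched roughly as follows. This is a hypothetical illustration, not the actual Connect API: the `Reporter` interface, the use of `String` as a stand-in for `SinkRecord`, and the "bad" prefix check are all invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncReportSketch {

    // Hypothetical stand-in for an asynchronous errant-record reporter:
    // report(...) returns immediately; the future completes once the
    // errant record has been durably written.
    interface Reporter {
        CompletableFuture<Void> report(String record, Throwable error);
    }

    // Sketch of a sink task's put(): report bad records without blocking,
    // keep sending valid records, then wait for all outstanding reports
    // at the end of the batch, before offsets may be committed.
    static List<String> put(List<String> batch, Reporter reporter) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        List<String> sent = new ArrayList<>();
        for (String record : batch) {
            if (record.startsWith("bad")) {
                // Errant record: hand it to the reporter and move on.
                pending.add(reporter.report(
                        record, new IllegalArgumentException("parse error")));
            } else {
                sent.add(record); // "send" to the external system
            }
        }
        // Block only here, once per batch, until every report is acked.
        CompletableFuture
                .allOf(pending.toArray(new CompletableFuture[0]))
                .join();
        return sent;
    }

    public static void main(String[] args) {
        // Toy reporter that records what it was given, asynchronously.
        List<String> reported = new ArrayList<>();
        Reporter reporter = (rec, err) -> CompletableFuture.runAsync(() -> {
            synchronized (reported) {
                reported.add(rec);
            }
        });
        List<String> sent = put(List.of("bad-0", "good-1", "good-2"), reporter);
        System.out.println("sent=" + sent + " reported=" + reported);
    }
}
```

Note that an asynchronous signature does not preclude the synchronous guarantees Magesh wants: a task (or the framework) can simply call `join()`/`get()` on each future immediately after `report(...)`, whereas a synchronous signature cannot later be made non-blocking without changing the API.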