Hi, Chris, Aakash, and others:

First of all, apologies for the extremely long email. Secondly, thanks for
the input on this KIP. The timing is unfortunate, but I do believe we're
agreed on most points.

Chris asked earlier:

> I'm still unclear on how futures are going to provide any benefit to
> developers, though.


Let me rephrase this, because I think it's obvious that futures are just a
way for the sink task to know, if needed, *when* the reporter has
successfully recorded the report of a bad record:

What is the benefit of an asynchronous `report(...)`, and is it worth the
additional complexity?


I would agree that this complexity is not worthwhile *if* every sink task
implementation were expected to call `get()` on the future right after
`report(...)` returns. But I hope to show that the asynchronous API as described by
KIP-610 (as of the time I sent this email) is valuable, simple to use,
flexible, and not onerous for many sink task implementations, because many
sink task implementations will not *have* to use the future -- as long as
we add one *additional* guarantee to the current KIP (which I'll define
later on).

To show this, I'd like to walk through an example of what happens when a
sink task runs. I'm going to describe a scenario that involves multiple
`put(...)` calls followed by a single commit of offsets, since this is the
pattern the WorkerSinkTask uses, and because the "commit" part is essential
to the guarantee I believe we need to make. Specifically, consider one task
for a sink connector consuming a single topic partition, and the calls made
by WorkerSinkTask:

1. put(...) with records with offsets 1-1000
2. put(...) with records with offsets 1001-2000
3. put(...) with records with offsets 2001-3000
4. put(...) with records with offsets 3001-4000
5. preCommit(...) with offset 4000

Now, let's say that the sink task calls `report(...)` on six records at
offsets 10, 11, 1010, 1011, 2010, and 2011, and let's look at what happens
when `report(...)` is synchronous and asynchronous. For simplicity, we're
going to assume that records are sent to the external system in batches of
100.

Let's consider the synchronous `report(...)` case first. The task basically
goes through the following sequence:
a) process records 1-9
b) report record 10, blocking until record 10 is written to the DLQ topic
c) report record 11, blocking until record 11 is written to the DLQ topic
d) process records 12-102
e) send batch of 100 records (offsets 1-102, minus 10 and 11) to external
system
f) process records 103-202
g) send batch of 100 records (offsets 103-202) to external system
...
s) send final batch of 94 records (offsets 3907-4000) to external system
t) respond to preCommit(4000) by returning the same offset

Note that steps b, c, and similar steps for the other four problematic
records must wait until the one record is written to the DLQ before
continuing. In this case, this means that the first batch of 100 records is
sent to the external system only after the two records have been
successfully written to the DLQ. The *synchronous* `report(...)` method
results in an increased lag in the records appearing in the external system
relative to when they were written to the topic consumed by the sink task. In
fact, every record passed to `report(...)` will cause some amount of
additional lag in delivering records to the external system. The lag is
even higher as the `report(...)` method takes longer, such as if/when the
DLQ producer is slower (e.g., due to network issues, retries, etc.).
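
To make the batch boundaries in the sequence above concrete, here is a rough
Java sketch of the arithmetic. The class and method names
(`BatchBoundarySketch`, `batches`) are hypothetical and exist only for this
illustration; offsets stand in for records, and reported offsets are simply
skipped so that they never count toward a batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Connect or KIP-610.
class BatchBoundarySketch {
    // Returns one long[]{firstOffset, lastOffset, recordCount} per batch sent to
    // the external system. Reported (bad) offsets never enter a batch.
    static List<long[]> batches(long first, long last, Set<Long> reported, int batchSize) {
        List<long[]> result = new ArrayList<>();
        long batchStart = -1;
        long batchEnd = -1;
        int count = 0;
        for (long offset = first; offset <= last; offset++) {
            if (reported.contains(offset)) {
                continue; // handed to report(...) instead of the external system
            }
            if (count == 0) {
                batchStart = offset;
            }
            batchEnd = offset;
            count++;
            if (count == batchSize) {
                result.add(new long[] {batchStart, batchEnd, count});
                count = 0;
            }
        }
        if (count > 0) {
            // Whatever is left over goes out as a final, smaller batch.
            result.add(new long[] {batchStart, batchEnd, count});
        }
        return result;
    }
}
```

Calling `batches(1, 4000, reported, 100)` with the six reported offsets from
the example gives the list of batches the task would send. In the synchronous
case, each skipped offset is also a point where the task stops and waits.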

Now let's look at the case when `report(...)` is asynchronous, and the task
does not call `get()` on the future. Now, the task basically goes through
the following sequence:
a) process records 1-9
b) report record 10 and do NOT block until record 10 is written to the DLQ
topic
c) report record 11 and do NOT block until record 11 is written to the DLQ
topic
d) process records 12-102
e) send batch of 100 records (offsets 1-102, minus 10 and 11) to external
system
f) process records 103-202
g) send batch of 100 records (offsets 103-202) to external system
...
s) send final batch of 94 records (offsets 3907-4000) to external system
t) respond to preCommit(4000) by returning the same offset

Note that steps b, c, and similar steps for the other four problematic
records do NOT wait until records are written to the DLQ before continuing.
This means that the first batch of 100 records is sent to the external
system whether or not the two records have been successfully written to the
DLQ. The *asynchronous* `report(...)` method allows the sink task to
continue processing subsequent records without delay, and the net lag is
lower than the synchronous case. Plus, minor to moderate delays in
reporting do not necessarily impact the sink task operation.
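
Here is a rough Java sketch of that non-blocking pattern. Everything in it is
an assumption made for illustration: `SketchReporter`, `PendingReporter`, and
`NonBlockingSinkSketch` are hypothetical names rather than the KIP-610
interface, offsets stand in for records, and "sending" a batch just appends it
to a list. The reporter hands back futures that stay incomplete until
`completeAll()` is called, so the sketch can show batches going out while the
DLQ writes are still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for the reporter discussed in this thread.
interface SketchReporter {
    Future<Void> report(long offset);
}

// Keeps every report pending until completeAll() is called, which simulates
// a DLQ producer that has not yet acknowledged the writes.
class PendingReporter implements SketchReporter {
    final List<CompletableFuture<Void>> pending = new ArrayList<>();

    @Override
    public Future<Void> report(long offset) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void completeAll() {
        pending.forEach(future -> future.complete(null));
    }
}

class NonBlockingSinkSketch {
    final List<List<Long>> sentBatches = new ArrayList<>();
    private final List<Long> batch = new ArrayList<>();
    private final SketchReporter reporter;
    private final Set<Long> bad;
    private final int batchSize;

    NonBlockingSinkSketch(SketchReporter reporter, Set<Long> bad, int batchSize) {
        this.reporter = reporter;
        this.bad = bad;
        this.batchSize = batchSize;
    }

    // Mirrors put(...): report bad records without waiting, batch the rest.
    void put(long firstOffset, long lastOffset) {
        for (long offset = firstOffset; offset <= lastOffset; offset++) {
            if (bad.contains(offset)) {
                reporter.report(offset); // returned future deliberately ignored
            } else {
                batch.add(offset);
                if (batch.size() == batchSize) {
                    sentBatches.add(new ArrayList<>(batch)); // "send" to external system
                    batch.clear();
                }
            }
        }
    }
}
```

After `put(1, 1000)` with offsets 10 and 11 marked bad, `sentBatches` already
holds the full batches even though both futures in `PendingReporter.pending`
are still incomplete.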

This is an example of a sink task choosing to not block on the future
returned from `report(...)`. Of course, sink tasks *can* use the future if
they desire -- for example, maybe the sink task does block on the future
before returning from each `put(...)` simply because of the guarantees it
wants to provide. Maybe the sink task is managing offsets, and it wants to
write offsets in the external system only after all error records in some
sequence are fully processed.
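
As a sketch of that stricter variant, a task could collect the futures while
it works through a `put(...)` call and wait on them only once, just before
returning. The names below (`BlockingPutSketch`, the `LongFunction` used as a
reporter) are hypothetical stand-ins and not the KIP-610 API; offsets again
stand in for records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongFunction;

// Hypothetical task that wants all of its error reports confirmed before
// put(...) returns, without blocking on each report individually.
class BlockingPutSketch {
    private final LongFunction<Future<Void>> reporter; // stand-in for report(...)
    private final Set<Long> bad;
    int confirmedReports = 0;

    BlockingPutSketch(LongFunction<Future<Void>> reporter, Set<Long> bad) {
        this.reporter = reporter;
        this.bad = bad;
    }

    void put(long firstOffset, long lastOffset) {
        List<Future<Void>> reports = new ArrayList<>();
        for (long offset = firstOffset; offset <= lastOffset; offset++) {
            if (bad.contains(offset)) {
                reports.add(reporter.apply(offset)); // keep going, do not block here
            }
            // ... good records would be batched and sent to the external system ...
        }
        // Only now wait for the reporter, once per put(...) call.
        for (Future<Void> report : reports) {
            await(report);
            confirmedReports++;
        }
    }

    private static void await(Future<Void> report) {
        try {
            report.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for error report", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("error report failed", e.getCause());
        }
    }
}
```

The same futures could just as well be held across several `put(...)` calls
and awaited later, for example right before the task writes offsets to the
external system.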

The bottom line is that an asynchronous `report(...)` method has
significant advantages, is still easy to use, and when necessary allows
sink task implementations to track when the errors have been "fully
reported", all while not constraining/limiting sink task implementations
that don't need those guarantees.

However, here's the additional guarantee I mentioned at the outset of my
email. Connect should commit offsets for a topic partition only after
the error reporter has "fully processed" all submitted records with that or
earlier offsets. For the sink task developer, this means the framework must
guarantee this happens before `preCommit(...)` is called. I think we fully
describe this guarantee by adding the following to the KIP (perhaps in a
new "Guarantees" section):

"The Connect framework also guarantees that by the time `preCommit(...)` is
called on the task, the error reporter will have successfully and fully
recorded all reported records with offsets at or before those passed to the
preCommit method. Sink task implementations that need more strict
guarantees can use the futures returned by `report(...)` to wait for
confirmation that reported records have been successfully recorded."
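
One way the framework side of this guarantee might look, purely as a sketch:
keep the outstanding futures for a topic partition ordered by offset, and
before calling `preCommit(...)` wait for every future at or before the offset
about to be passed in. `ReportTrackerSketch` and its methods are hypothetical
names, the sketch assumes at most one report per offset, and it is not how
WorkerSinkTask is actually implemented.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical framework-side bookkeeping for a single topic partition.
class ReportTrackerSketch {
    private final NavigableMap<Long, Future<Void>> outstanding = new TreeMap<>();

    // Called whenever the task reports the record at the given offset.
    void track(long offset, Future<Void> report) {
        outstanding.put(offset, report);
    }

    // Called before preCommit(...): blocks until every report at or before
    // commitOffset has been recorded, then forgets those reports.
    void awaitUpTo(long commitOffset) {
        NavigableMap<Long, Future<Void>> due = outstanding.headMap(commitOffset, true);
        for (Future<Void> report : due.values()) {
            try {
                report.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted before commit", e);
            } catch (ExecutionException e) {
                // A failed report means the offsets must not be committed.
                throw new IllegalStateException("error report failed", e.getCause());
            }
        }
        due.clear(); // headMap is a view, so this removes them from 'outstanding'
    }

    int outstandingCount() {
        return outstanding.size();
    }
}
```

Reports for offsets beyond the commit point stay tracked and are left for a
later commit, so the framework waits only for what the commit actually covers.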


IMO this gives sink task developers pretty clear guidance: sink tasks need
only worry about the futures returned from `report(...)` if they require
more strict guarantees that the errors have been fully reported. This still
allows sink tasks to return different offsets from `preCommit(...)`. And
any sink connectors that rely upon the framework committing consumer
offsets based upon when records were fully-processed by the task will
likely not have to use futures at all.

To put in terms I used at the outset of this (way too long) email: the
asynchronous API

   - is valuable because it allows sink tasks to continue doing work
   without having to wait for the reporter; only during the "commit"
   phase do we potentially need to wait for the reporter;
   - is simple because many sink tasks need only call
   `report(...)` and will not even need to worry about the future;
   - is flexible because any task that needs stricter guarantees can use
   the future to block on the reporter, including at some later point in time
   after the `report(...)` method is called; and
   - is not onerous because using the future is a common pattern and simple
   blocking, if needed, is trivial.

It is true that when the last record on the last put before a commit is
reported as an error, the framework may have to wait. But this is no worse
and actually likely better than making the `report(...)` method synchronous.

I hope this helps.

Best regards,

Randall

On Mon, May 18, 2020 at 4:44 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Chris,
>
> I agree with your point.
>
> Randall, Konstantine, do you guys mind weighing in on any benefit of adding
> asynchronous functionality using a Future in the KIP right now? It seems to
> me that it only provides user control on when the thread will be blocked,
> and if we are going to process all the futures at once in a batch at the
> end, why not support batch processing in a future KIP, since it is not too
> difficult now that we are adding an interface. I'm not sure I see any gain
> beyond some user control that could increase throughput - but at the same
> time, as I mentioned before, I don't think throughput is a factor we need
> to consider much with error reporting. We don't really need or necessarily
> want a higher throughput on error reporting, as ideally, there should not
> be a high volume of errant records.
>
> Thanks,
> Aakash
>
> On Mon, May 18, 2020 at 1:22 PM Chris Egerton <chr...@confluent.io> wrote:
>
> > Hi Aakash,
> >
> > Yep, that's pretty much it. I'd also like to emphasize that we should be
> > identifying practical use cases for whatever API we provide. Giving
> > developers a future that can be made synchronous with little effort seems
> > flexible, but if that's all that developers are going to do with it
> anyway,
> > why make it a future at all? We should have some idea of how people would
> > use a future that doesn't just hinge on them blocking on it immediately,
> > and isn't more easily-addressed by a different API (such as one with
> batch
> > reporting).
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, May 18, 2020 at 1:17 PM Aakash Shah <as...@confluent.io> wrote:
> >
> > > Hi all,
> > >
> > > Chris, I see your points about whether Futures provide much benefit at
> > all
> > > as they are not truly fully asynchronous.
> > >
> > > Correct me if I am wrong, but I think what you are trying to point out
> is
> > > that if we have the option to add additional functionality later (in a
> > > simpler way too since we are introducing a new interface), we should
> > > provide functionality that we know will provide value immediately and
> not
> > > cause any developer/user burden.
> > >
> > > In that case, I think the main area we have to come to a consensus on
> is
> > -
> > > how much control do we want to provide to the developer/user in this
> KIP
> > > considering that we can add the functionality relatively easily later?
> > >
> > > Randall, Konstantine, what do you think about adding it later vs now?
> > >
> > > Thanks,
> > > Aakash
> > >
> > > On Mon, May 18, 2020 at 12:45 PM Chris Egerton <chr...@confluent.io>
> > > wrote:
> > >
> > > > Hi Aakash,
> > > >
> > > > I asked this earlier about whether futures were the right way to go,
> if
> > > we
> > > > wanted to enable asynchronous behavior at all:
> > > >
> > > > > I'm still unclear on how futures are going to provide any benefit
> to
> > > > developers, though. Blocking on the return of such a future slightly
> > > later
> > > > on in the process of handling records is still blocking, and to be
> done
> > > > truly asynchronously without blocking processing of non-errant
> records,
> > > > would have to be done on a separate thread. It's technically possible
> > for
> > > > users to cache all of these futures and instead of invoking "get" on
> > > them,
> > > > simply check whether they're complete or not via "isDone", but this
> > seems
> > > > like an anti-pattern.
> > > >
> > > > > What is the benefit of wrapping this in a future?
> > > >
> > > > As far as I can tell, there hasn't been a practical example yet where
> > the
> > > > flexibility provided by a future would actually be beneficial in
> > writing
> > > a
> > > > connector. It'd be great if we could find one. One possible use case
> > > might
> > > > be processing records received in "SinkTask::put" without having to
> > block
> > > > for each errant record report before sending non-errant records to
> the
> > > > sink. However, this could also be addressed by allowing for batch
> > > reporting
> > > > of errant records instead of accepting a single record at a time; the
> > > task
> > > > would track errant records as it processes them in "put" and report
> > them
> > > > all en-masse after all non-errant records have been processed.
> > > >
> > > > With regards to the precedent of using futures for asynchronous
> APIs, I
> > > > think we should make sure that whatever API we decide on is actually
> > > useful
> > > > for the cases it serves. There's plenty of precedent for
> callback-based
> > > > asynchronous APIs in Kafka with both "Producer::send" and
> > > > "Consumer::commitAsync"; the question here shouldn't be about what's
> > done
> > > > in different APIs, but what would work for this one in particular.
> > > >
> > > > Finally, it's also been brought up that if we're going to introduce a
> > new
> > > > error reporter interface, we can always modify that interface later
> on
> > to
> > > > go from asynchronous to synchronous behavior, or vice-versa, or even
> to
> > > add
> > > > a callback- or future-based variant that didn't exist before. We have
> > > > plenty of room to maneuver in the future here, so the pressure to get
> > > > everything right the first time and provide maximum flexibility
> doesn't
> > > > seem as pressing, and the goal of minimizing the kind of API that we
> > have
> > > > to support for future versions without making unnecessary additions
> is
> > > > easier to achieve.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > >
> > > >
> > > > On Mon, May 18, 2020 at 12:20 PM Aakash Shah <as...@confluent.io>
> > wrote:
> > > >
> > > > > Hi Arjun,
> > > > >
> > > > > Thanks for your feedback.
> > > > >
> > > > > I agree with moving to Future<Void>, those are good points.
> > > > >
> > > > > > I believe an earlier point made for asynchronous functionality was
> > > that
> > > > > modern APIs tend to be asynchronous as they result in more
> expressive
> > > and
> > > > > better defined APIs.
> > > > > Additionally, because a lot of Kafka Connect functionality is
> already
> > > > > asynchronous, I am inclined to believe that customers will want an
> > > > > > asynchronous solution for this as well. And if it is relatively simple
> > to
> > > > > block with future.get() to make it synchronous, would you not say
> > that
> > > > > having an opt-in synchronous functionality rather than synchronous
> > only
> > > > > functionality allows for customer control while maintaining that
> not
> > > too
> > > > > much burden of implementation is placed on the customer?
> > > > > WDYT?
> > > > >
> > > > > Thanks,
> > > > > Aakash
> > > > >
> > > > > On Sun, May 17, 2020 at 2:51 PM Arjun Satish <
> arjun.sat...@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Thanks for all the feedback, folks.
> > > > > >
> > > > > > re: having a callback as a parameter, I agree that at this point,
> > it
> > > > > might
> > > > > > not add much value to the proposal.
> > > > > >
> > > > > > re: synchronous vs asynchronous, is the motivation
> > performance/higher
> > > > > > throughput? Taking a step back, calling report(..) in the new
> > > interface
> > > > > > does a couple of things:
> > > > > >
> > > > > > 1. at a fundamental level, it is a signal to the framework that a
> > > > failure
> > > > > > occurred when processing records, specifically due to the given
> > > record.
> > > > > > 2. depending on whether errors.log and errors.deadletterqueue has
> > > been
> > > > > set,
> > > > > > some messages are written to zero or more destinations.
> > > > > > 3. depending on the value of errors.tolerance (none or all), the
> > task
> > > > is
> > > > > > failed after reporters have completed.
> > > > > >
> > > > > > for kip-610, the asynchronous method has the advantage of working
> > > with
> > > > > the
> > > > > > internal dead letter queue (which has been transparent to the
> > > developer
> > > > > so
> > > > > > far). but, how does async method help if the DLQ is not enabled?
> in
> > > > this
> > > > > > case RecordMetadata is not very useful, AFAICT? also, if we add
> > more
> > > > > error
> > > > > > reporters in the future (say, for example, a new reporter in a
> > future
> > > > > that
> > > > > > writes to a RDBMS), would the async version return success on all
> > or
> > > > > > nothing, and what about partial successes?
> > > > > >
> > > > > > overall, if we really need async behavior, I'd prefer to just use
> > > > > > Future<Void>. but if we can keep it simple, then let's go with a
> > > > > > synchronous function with the parameters Randall proposed above
> > (with
> > > > > > return type as void, and if any of the reporters fail, the task
> is
> > > > failed
> > > > > > > if errors.tolerance is none, and kept alive if tolerance is all),
> > and
> > > > > maybe
> > > > > > add asynchronous methods in a future KIP?
> > > > > >
> > > > > > Best,
> > > > > >
> > > > >
> > > >
> > >
> >
>
