Hi Konstantine,

Given that the reporter interface is intentionally agnostic about how
records are handled and does not necessarily entail writes to a DLQ, I'm
also in favor of not specifying a return type for the reporting mechanism.

I'm still unclear on how futures are going to provide any benefit to
developers, though. Blocking on such a future slightly later in the
process of handling records is still blocking; to be truly asynchronous,
without holding up the processing of non-errant records, the wait would
have to happen on a separate thread. It's technically possible for users
to cache all of these futures and, instead of invoking "get" on them,
simply check whether they're complete via "isDone", but this seems like
an anti-pattern.
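
For concreteness, here's a rough sketch of the two patterns I mean. The
`Reporter` interface, the string records, and the "bad" prefix check are
stand-ins I made up for illustration, not the API under discussion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FuturePatternsSketch {
    // Hypothetical stand-in for the proposed reporter; not the real Connect API.
    interface Reporter {
        Future<Void> report(String record, Throwable error);
    }

    // Pattern 1: report errant records, keep writing good ones, then block on
    // every future at the end of the batch. The blocking is deferred, not avoided.
    static int putThenBlock(Reporter reporter, List<String> batch) {
        List<Future<Void>> pending = new ArrayList<>();
        int written = 0;
        for (String record : batch) {
            if (record.startsWith("bad")) { // made-up error condition
                pending.add(reporter.report(record, new IllegalArgumentException(record)));
            } else {
                written++; // stand-in for a write to the external system
            }
        }
        for (Future<Void> future : pending) {
            try {
                future.get(); // the task thread still waits here
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("report failed", e);
            }
        }
        return written;
    }

    // Pattern 2: cache the futures and poll isDone() instead of calling get().
    static long countIncomplete(List<Future<Void>> cached) {
        return cached.stream().filter(f -> !f.isDone()).count();
    }

    public static void main(String[] args) {
        Reporter immediate = (record, error) -> CompletableFuture.<Void>completedFuture(null);
        System.out.println(putThenBlock(immediate, Arrays.asList("bad-1", "ok-2", "ok-3")));

        List<Future<Void>> cached = new ArrayList<>();
        cached.add(new CompletableFuture<Void>());                  // never completed
        cached.add(CompletableFuture.<Void>completedFuture(null));  // already done
        System.out.println(countIncomplete(cached));
    }
}
```

In the first pattern the task thread still ends up waiting in "get"; in
the second it has to keep coming back to poll.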

What is the benefit of wrapping this in a future?

With regard to Randall's comments about a callback, I'd like to make some
observations:

>  1. The `Callback` interface requires the sink task developer to handle
>   an error being passed to the callback, but in this case it's very
>   unlikely the Connect runtime will ever call the callback with an error
>   and will instead handle it (e.g., retry forever, or fail the task,
>   etc.). IOW, the `Callback` interface is appropriate for `Producer`, but
>   it's far more broad than is needed here.

Since we're already introducing a new interface with the current proposal,
it seems like we might be able to implement our own interface for this
callback as well. One possibility is basically just a Consumer<SinkRecord>
that is given the errant sink record if/when that record has been
successfully reported by the framework. If the plan is still to fail the
task immediately if the framework fails to report an errant record provided
to it by a task, I don't think we need to have a mechanism to report such a
failure to the task by, e.g., throwing an exception, and can simply request
that the task shut down and stop giving it records.
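
A minimal sketch of what I have in mind is below. `CallbackReporter`, the
toy `InMemoryReporter`, the cut-down `SinkRecord` class, and the "orders"
topic are all hypothetical and only there so the example stands on its
own; the real interface could look quite different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CallbackReporterSketch {
    // Minimal stand-in for Connect's SinkRecord, so the sketch compiles on its own.
    static final class SinkRecord {
        final String topic;
        final long offset;

        SinkRecord(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    // Hypothetical reporter shape: no return value, and no error is ever
    // passed back to the task through the callback.
    interface CallbackReporter {
        void report(SinkRecord record, Throwable error, Consumer<SinkRecord> onReported);
    }

    // Toy reporter that "reports" synchronously into a list, then runs the callback.
    static final class InMemoryReporter implements CallbackReporter {
        final List<SinkRecord> reported = new ArrayList<>();

        @Override
        public void report(SinkRecord record, Throwable error, Consumer<SinkRecord> onReported) {
            reported.add(record);      // stand-in for a successful DLQ write
            onReported.accept(record); // invoked only after the report succeeded
        }
    }

    public static void main(String[] args) {
        InMemoryReporter reporter = new InMemoryReporter();
        List<Long> acked = new ArrayList<>();
        reporter.report(new SinkRecord("orders", 42L), new RuntimeException("bad record"),
                r -> acked.add(r.offset));
        System.out.println(acked);
    }
}
```

The callback only ever sees the record that was reported, so the task has
no failure path of its own to implement.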

>   2. Without the callback, the Connect runtime will not run task code
>   within the DLQ producer thread, and this is very safe. But when a
>   callback is provided, that callback *will* be called on the DLQ
>   producer's thread -- and any mistakes in the sink task's callback may
>   block the DLQ. IOW, having a callback is too risky.

With the current DLQ implementation, each task is given its own producer,
which is unlikely to change soon given that per-connector producer
overrides are now possible thanks to KIP-458 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy).
So the fallout of any errors in this callback would be limited solely to
the sink task that provided the callback. There's some precedent for this
with how the framework invokes "SourceTask::commitRecord" directly in the
producer callback for source tasks.

>   3. We actually don't know that a callback is even necessary.

There's a decent use case for a callback here where a task uses it for
tracking offsets to report in "SinkTask::preCommit" without blocking on the
result of an errant record report. This might even be simpler than a
future-based approach, depending on how we anticipate developers would use
that approach.
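
Roughly, and assuming a single partition and a hypothetical
callback-style `Reporter` keyed by offset (neither of which is in the
KIP), the offset tracking could look something like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.LongConsumer;

class OffsetTrackingSketch {
    // Hypothetical callback-style reporter, keyed by offset for brevity.
    interface Reporter {
        void report(long offset, Throwable error, LongConsumer onReported);
    }

    // Offsets handed to the reporter whose reports have not been acknowledged yet.
    // Thread-safe because the callback may run on another thread.
    private final ConcurrentSkipListSet<Long> unacked = new ConcurrentSkipListSet<>();
    // Offset following the last record seen in put(); single partition assumed.
    private volatile long nextOffset = 0L;

    // Called per record from put(); never waits on the reporter.
    void handle(long offset, boolean errant, Reporter reporter) {
        if (errant) {
            unacked.add(offset);
            reporter.report(offset, new RuntimeException("errant record"),
                    acked -> unacked.remove(acked));
        }
        nextOffset = offset + 1;
    }

    // What preCommit would return: everything before the earliest unacked
    // errant record is safe to commit; with none outstanding, commit it all.
    long safeOffsetToCommit() {
        Long earliestUnacked = unacked.ceiling(0L); // null when the set is empty
        return earliestUnacked != null ? earliestUnacked : nextOffset;
    }

    public static void main(String[] args) {
        List<Runnable> acks = new ArrayList<>(); // deferred acknowledgements
        Reporter deferred = (offset, error, onReported) ->
                acks.add(() -> onReported.accept(offset));

        OffsetTrackingSketch task = new OffsetTrackingSketch();
        task.handle(0L, false, deferred);
        task.handle(1L, true, deferred);
        task.handle(2L, false, deferred);
        System.out.println(task.safeOffsetToCommit()); // 1: offset 1 is still unacked

        acks.forEach(Runnable::run);
        System.out.println(task.safeOffsetToCommit()); // 3: everything acknowledged
    }
}
```

The task never blocks on the reporter; it just holds back the committed
offset until the callback fires.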

>   4. Just having the one `report(SinkRecord, Throwable)` is simpler and,
>   given the looming deadline, much closer to what we've already discussed.

Agreed that, no matter what, one asynchronous method is enough. One async
and one sync method might be reasonable if we really can't settle on a good
one-size-fits-all API, but hopefully it won't come to that.

Cheers,

Chris

On Sun, May 17, 2020 at 12:06 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all,
>
> I'm on board with adding an interface in the Connect API as Arjun
> suggested. Slightly higher commitment and maintenance but it also gives us
> an easier path to future extensions in this scope (error handling). The
> usage is equivalent to adding just a new method with known types to
> `SinkTaskContext` (the `NoClassDefFoundError` can be added for completeness
> in the connector code, but in both suggestions this would fail with
> `NoSuchMethodError` on older workers).
>
> With respect to the method signature, I also agree with Randall's latest
> suggestion, of a two argument method such as:
>
> Future<Void> report(SinkRecord, Throwable)
>
> Returning `Future<RecordMetadata>` can also be ok, but since this refers to
> the DLQ I'd slightly prefer to avoid exposing information that might
> confuse the users regarding what topic, partitions and offset this return
> value corresponds to. But both return types should be fine and will give
> plenty of flexibility to connector developers, making the sync use case
> straightforward. In any case, given the interface we can extend this in a
> compatible way in the future if we think we need to.
>
> Minor comments:
> Version will be 2.6 and not 2.9 (the latter was added by accident in a few
> places).
>
> Best,
> Konstantine
>
>
> On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > > If that's the case, I think the framework should not commit if there
> > > are any outstanding records in the reporter. That would prevent the
> > > scenario where we could potentially lose records before they are sent
> > > to either the sink or the reporter. WDYT about the KIP including that
> > > as part of the design?
> >
> > On Sun, May 17, 2020 at 11:13 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> > > mage...@confluent.io> wrote:
> > >
> > > > Randall
> > > >
> > > > > Thanks a lot for your thoughts. I was wondering: if we ever have
> > > > > to make the API asynchronous, we could expose it as a new method,
> > > > > right? If that's a possibility, would it be better for the API to
> > > > > explicitly have the semantics of a synchronous API if the
> > > > > implementation is indeed going to be synchronous?
> > > >
> > >
> > > Thanks, Magesh.
> > >
> > > > I think it's likely that the implementation may need to be
> > > > synchronous to some degree. For example, just to keep the
> > > > implementation simple, we might block the WorkerSinkTask after
> > > > `put(Collection<SinkRecord>)` returns and latch until the reporter
> > > > has received all acks, especially if it simplifies the offset
> > > > management and commit logic.
> > >
> > > > Even if that's the case, having each `report(...)` call be
> > > > asynchronous means that the sink task doesn't *have* to wait until
> > > > each failed record has been recorded to continue sending valid
> > > > records to the external system. Consider an example with 1000
> > > > records in a batch, where only the first record has an error. If
> > > > `report(...)` were synchronous, the `put(...)` method would block
> > > > reporting the first record and would then only send the 999 after
> > > > that's happened. With an asynchronous `report(...)` method, the
> > > > `put(...)` method could report the first record, send the 999
> > > > records, and then wait for the futures returned by the report method.
> > >
> > >
> > > >
> > > > > On Sun, May 17, 2020, 9:27 AM Randall Hauch <rha...@gmail.com> wrote:
> > > >
> > > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > > > mage...@confluent.io> wrote:
> > > > >
> > > > > > > Thanks Randall. The suggestion I made also has a problem when
> > > > > > > the reporter isn't enabled, where it could potentially write
> > > > > > > records after error records to the sink before failing.
> > > > > > >
> > > > > > > The other concern I had is with the reporter being
> > > > > > > asynchronous. If for some reason the reporter is taking longer
> > > > > > > because of, say, a specific broker issue, the connector might
> > > > > > > still move forward and commit if it's not waiting for the
> > > > > > > reporter. If the worker crashes during this, we will lose the
> > > > > > > bad record. I don't think this is desirable behavior. I think
> > > > > > > the synchronous reporter provides better guarantees for all
> > > > > > > connectors.
> > > > > >
> > > > > >
> > > > > Thanks, Magesh.
> > > > >
> > > > > > That's a valid concern, and maybe that will affect how the
> > > > > > feature is actually implemented. I expect it to be a bit tricky
> > > > > > to ensure that errant records are fully written to Kafka before
> > > > > > the offsets are committed, so it might be simplest to start out
> > > > > > with a synchronous implementation. But the API can still be an
> > > > > > asynchronous design whether or not the implementation is
> > > > > > synchronous. That gives us the ability in the future to change
> > > > > > the implementation if we determine a way to handle all concerns.
> > > > > > For example, the WorkerSinkTask may need to backoff if waiting to
> > > > > > commit due to too many incomplete/unacknowledged reporter
> > > > > > requests. OTOH, if we make the `report` method(s) synchronous
> > > > > > from the beginning, it will be very challenging to change them in
> > > > > > the future to be asynchronous.
> > > > >
> > > > > > I guess it boils down to this question: do we know today that we
> > > > > > will *never* want the reporter to write asynchronously?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Randall
> > > > >
> > > >
> > >
> >
> >
> > --
> > Thanks
> > Magesh
> >
> > *Magesh Nandakumar*
> > Software Engineer
> > mage...@confluent.io
> >
>
