Thanks for KIP Aakash.
This proposal will address a significant gap in error handling for sink
connectors, so I'd also like to see it implemented.

Interesting discussion so far and I agree with a lot of what's been said.
First of all I agree with Andrew. When I first read the KIP I also felt
this method belongs to the SinkTaskContext. Aesthetics are important on
core APIs and having yet another method to perform some sort of
initialization does not seem very appealing. It will be the third for this
class. At the same time, compatibility is also important and we'd like to
see Connect developers adopting this functionality as soon as possible with
the least amount of effort. So, I'm on board with that too.

Then I liked Randall's suggestion on method overloading and I also agree
with Chris that it'd be good to avoid overloading a method that is invoked
periodically for the lifetime of a task.

So in an effort to combine all of the above, I see that we have remaining a
couple of options.
Basically we have the methods:

public void initialize(SinkTaskContext context)
and
public abstract void start(Map<String, String> props)

to overload.

Overloading `initialize` with `public void initialize(SinkTaskContext
context, BiFunction<...> failedRecordReporter)`
makes sense because it keeps together the context and the reporter in a
loose association. There's the risk of shadowing `context` which is a
protected member field of the abstract class SinkTask, but this should be
easy to catch during testing and, maybe, it makes sense to assume that if
you care about error handling of records you know what you are doing. This
method is there to just set the context, so setting the reporter as well
keeps it simple and aligns with what it currently does.

Then, there's the option to overload `start` as:
`public void start(Map<String, String> props, BiFunction<...>
failedRecordReporter)`. The resemblance of the arguments is not as high as
with `initialize` but could be ok. I'm more concerned about the deprecation
path here, because this method can't be abstract, so eventually deprecating
`SinkTask#start(props)` would be tricky. `initialize` won't have this issue
and eventually we can deprecate `SinkTask#initialize(context)`.

With all that in mind, I'd like us to consider adding:
`public void initialize(SinkTaskContext context, BiFunction<...>
failedRecordReporter)`

Now, regarding asynchrony what would you think about returning a Future
from the reporter's method. I have it as BiFunction above, because I'm
thinking that defining from the beginning the reporter to be of type:
BiFunction<SinkRecord, Throwable, Future<Void>> would give us better
prospects for this API. I notice Randall also used BiFunction in his
example, I wonder if it's for similar reasons. The producer is already
asynchronous under the covers and I haven't thought about all the details
but its callback could possibly update the future that we return to the
user. If users don't want to bother with asynchronous execution, they can
always call `get()` on the Future immediately. But having this API could
allow some to also do more sophisticated processing per batch and overlap
communication if their use case allows it.

Finally, you might have noticed a naming suggestion above as well. I find
failedRecordReporter a bit more clear.

These were some quick thoughts inspired by everyone's comments so far. Let
me know what you think.

Konstantine

On Thu, May 14, 2020 at 6:07 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Randall,
>
> Thanks for the feedback.
>
> 1. This is a great suggestion, but I find that adding an overloaded
> put(...) which essentially deprecates the old put(...) to only be used when
> a connector is deployed on older versions of Connect adds enough of a
> complication that could cause connectors to break if the old put(...)
> doesn't correctly invoke the overloaded put(...); either that, or it will
> add duplication of functionality across the two put(...) methods. I think
> the older method simplifies things with the idea that a DLQ/error reporter
> will or will not be passed into the method depending on the version of AK.
> However, I also understand the aesthetic advantage of this method vs the
> setter method, so I am okay with going in this direction if others agree
> with adding the overloaded put(...).
>
> 2. Yes, your assumption is correct. Yes, we can remove the "Order of
> Operations" if we go with the overloaded put(...) direction.
>
> 3. Great point, I will remove them from the KIP.
>
> 4. Yeah, accept(...) will be synchronous. I will change it to be clearer,
> thanks.
>
> 5. This KIP will use existing metrics as well introduce new metrics. I will
> update this section to fully specify the metrics.
>
> Please let me know what you think.
>
> Thanks,
> Aakash
>
> On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com> wrote:
>
> > Hi, Aakash.
> >
> > Thanks for the KIP. Connect does need an improved ability for sink
> > connectors to report individual records as being problematic, and this
> > integrates nicely with the existing DLQ feature.
> >
> > I also appreciate the desire to maintain compatibility so that connectors
> > can take advantage of this feature when deployed in a runtime that
> supports
> > this feature, but can safely and easily do without the feature when
> > deployed to an older runtime. But I do understand Andrew's concern about
> > the aesthetics. Have you considered overloading the `put(...)` method and
> > adding the `reporter` as a second parameter? Essentially it would add the
> > one method (with proper JavaDoc) to `SinkTask` only:
> >
> > ```
> >     public void put(Collection<SinkRecord> records,
> BiFunction<SinkRecord,
> > Throwable> reporter) {
> >         put(records);
> >     }
> > ```
> > and the WorkerSinkTask would be changed to call `put(Collection,
> > BiFunction)` instead.
> >
> > Sink connector implementations that don't do anything different can still
> > override `put(Collection)`, and it still works as before. Developers that
> > want to change their sink connector implementations to support this new
> > feature would do the following, which would work in older and newer
> Connect
> > runtimes:
> > ```
> >     public void put(Collection<SinkRecord> records) {
> >         put(records, null);
> >     }
> >     public void put(Collection<SinkRecord> records,
> BiFunction<SinkRecord,
> > Throwable> reporter) {
> >         // the normal `put(Collection)` logic goes here, but can
> optionally
> > use `reporter` if non-null
> >     }
> > ```
> >
> > I think this has all the same benefits of the current KIP, but
> > it's noticeably simpler and hopefully more aesthetically pleasing.
> >
> > As for Andrew's second concern about "the task can send errant records to
> > it within put(...)" being too restrictive. My guess is that this was more
> > an attempt at describing the basic behavior, and less about requiring the
> > reporter only being called within the `put(...)` method and not by
> methods
> > to which `put(...)` synchronously or asynchronously delegates. Can you
> > confirm whether my assumption is correct? If so, then perhaps my
> suggestion
> > helps work around this issue as well, since there would be no restriction
> > on when the reporter is called, and the whole "Order of Operations"
> section
> > could potentially be removed.
> >
> > Third, it's not clear to me why the "Error Reporter Object" subsection in
> > the "Proposal" section lists the worker configuration properties that
> were
> > previously introduced with
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > .
> > Maybe it's worth mentioning that the error reporter functionality will
> > reuse or build upon KIP-298, including reusing the configuration
> properties
> > defined in KIP-298. But IIUC, the KIP does not propose changing any
> > technical or semantic aspect of these configuration properties, and
> > therefore the KIP would be more clear and succinct without them. *That*
> the
> > error reporter will use these properties is part of the UX and therefore
> > necessary to mention, but *how* it uses those properties is really up to
> > the implementation.
> >
> > Fourth, the "Synchrony" section has a sentence that is confusing, or not
> as
> > clear as it could be.
> >
> >     "If a record is sent to the error reporter, processing of the next
> > errant record in accept(...) will not begin until the producer
> successfully
> > sends the errant record to Kafka."
> >
> > This sentence is a bit difficult to understand, but IIUC this really just
> > means that "accept(...)" will be synchronous and will block until the
> > errant record has been successfully written to Kafka. If so, let's say
> > that. The rest of the paragraph is fine.
> >
> > Finally, is this KIP proposing new metrics, or that existing metrics
> would
> > be used to track the error reporter usage? If the former, then please
> > fully-specify what these metrics will be, similarly to how metrics are
> > specified in
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> > .
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall
> >
> > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <
> > andrew_schofi...@live.com>
> > wrote:
> >
> > > Hi Aakash,
> > > Thanks for sorting out the replies to the mailing list.
> > >
> > > First, I do like the idea of improving error reporting in sink
> > connectors.
> > > I'd like a simple
> > > way to put bad records onto the DLQ.
> > >
> > > I think this KIP is considerably more complicated than it seems. The
> > > guidance on the
> > > SinkTask.put() method is that it should send the records asynchronously
> > > and immediately
> > > return, so the task is likely to want to report errors asynchronously
> > > too.  Currently the KIP
> > > states that "the task can send errant records to it within put(...)"
> and
> > > that's too restrictive.
> > > The task ought to be able to report any unflushed records, but the
> > > synchronisation of this is going
> > > to be tricky. I suppose the connector author needs to make sure that
> all
> > > errant records have
> > > been reported before returning control from SinkTask.flush(...) or
> > perhaps
> > > SinkTask.preCommit(...).
> > >
> > > I think the interface is a little strange too. I can see that this was
> > > done so it's possible to deliver a connector
> > > that supports error reporting but it can also work in earlier versions
> of
> > > the KC runtime. But, the
> > > pattern so far is that the task uses the methods of SinkTaskContext to
> > > access utilities in the Kafka
> > > Connect runtime, and I suggest that reporting a bad record is such a
> > > utility. SinkTaskContext has
> > > changed before when the configs() methods was added, so I think there
> is
> > > precedent for adding a method.
> > > The way the KIP adds a method to SinkTask that the KC runtime calls to
> > > provide the error reporting utility
> > > seems not to match what has gone before.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
> > >
> > >     I wasn't previously added to the dev mailing list, so I'd like to
> > post
> > > my
> > >     discussion with Andrew Schofield below for visibility and further
> > >     discussion:
> > >
> > >     Hi Andrew,
> > >
> > >     Thanks for the reply. The main concern with this approach would be
> > its
> > >     backward compatibility. I’ve highlighted the thoughts around the
> > > backwards
> > >     compatibility of the initial approach, please let me know what you
> > > think.
> > >
> > >     Thanks,
> > >     Aakash
> > >
> > >
> > >
> >
> ____________________________________________________________________________________________________________________________
> > >
> > >     Hi,
> > >     By adding a new method to the SinkContext interface in say Kafka
> > 2.6, a
> > >     connector that calls it would require a Kafka 2.6 connect runtime.
> I
> > > don't
> > >     quite see how that's a backward compatibility problem. It's just
> that
> > > new
> > >     connectors need the latest interface. I might not quite be
> > > understanding,
> > >     but I think it would be fine.
> > >
> > >     Thanks,
> > >     Andrew
> > >
> > >
> > >
> >
> ____________________________________________________________________________________________________________________________
> > >
> > >     Hi Andrew,
> > >
> > >     I apologize for the way the reply was sent. I just subscribed to
> the
> > > dev
> > >     mailing list so it should be resolved now.
> > >
> > >     You are correct, new connectors would simply require the latest
> > > interface.
> > >     However, we want to remove that requirement - in other words, we
> want
> > > to
> > >     allow the possibility that someone wants the latest connector/to
> > > upgrade to
> > >     the latest version, but deploys it on an older version of AK.
> > > Basically, we
> > >     don't want to enforce the necessity of upgrading AK to get the
> latest
> > >     interface. In the current approach, there would be no issue of
> > > deploying a
> > >     new connector on an older version of AK, as the Connect framework
> > would
> > >     simply not invoke the new method.
> > >
> > >     Please let me know what you think and if I need to clarify
> anything.
> > >
> > >     Thanks,
> > >     Aakash
> > >
> > >
> >
>

Reply via email to