Hi Aakash,

Thanks for the KIP! Given the myriad of different error-handling mechanisms
available in sink connectors today, I think it'd be a great improvement to
add this kind of support to the framework and not only take some of the
development burden off of connector writers but also some of the
configuration burden off of users.

I've got some thoughts but no major objections:

1. It's stated in the "Motivation" section that "this proposal aims to
extend KIP-298 and add error reporting functionality within the context of
put(...) without adding redundancy in configuration." It sounds like this
is true insofar as we're trying to fill a gap explicitly called out by
KIP-298 with regards to records that are given to a task via SinkTask::put,
but reading further, it seems like there's nothing in place that would
prevent a task from sending a record to the error reporter outside of its
"put" method. Would it be fair to say that the proposal "aims to extend
KIP-298 and add error reporting functionality even after records are sent
to connector tasks without adding redundancy in configuration" instead?

2. I'm assuming that the javadoc comment above the "errantRecordReporter"
method in the sample Java code in the "Method" section is the proposed
javadoc description for the method we'll add to the SinkTask class. We
should be pretty careful about that description since it's the very first
thing some users will read when learning about this new error reporting
mechanism; I've got some suggestions about how we might improve the
language there:

a - In the first paragraph, "Set the error reporter for this task.", I
think we should call out that this is a method that will be invoked by the
Connect framework. Maybe something like "Invoked by the framework to supply
the task with an error reporter, if the user has configured one for this
connector"?

b - In the next paragraph, "The general usage pattern for this error
reporter is to use this method to set it, and invoke its {@link
accept(SinkRecord record, Throwable e)} method when an exception is thrown
while processing a record in {@link put(Collection<SinkRecord> records)} to
send this errant record to the error reporter.", I think we can be a little
more concise. What do you think about just "Tasks can send problematic
records back to the framework by invoking {@link accept(SinkRecord record,
Throwable e)}"?
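For what it's worth, putting (a) and (b) together, the doc comment might read
something like the following (just a sketch of the wording, not a proposal for
the final signature or tags):

```java
/**
 * Invoked by the framework to supply the task with an error reporter, if
 * the user has configured one for this connector.
 * <p>
 * Tasks can send problematic records back to the framework by invoking
 * {@link accept(SinkRecord record, Throwable e)} on the supplied reporter.
 */
```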

3. Just below that section, in the sample code, it looks like the task
isn't performing a null check on the "errorReporter" field. According to
the "Backwards Compatibility" section, it's possible for that field to be
null; do you think we might want to update the sample code to perform that
null check and add a brief comment on why?
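To illustrate, here's a rough sketch of what that null check might look like
inside a task's put(...). The class and method bodies are purely illustrative
(SinkRecord is stubbed out so the snippet stands alone), not the KIP's actual
sample code:

```java
import java.util.Collection;
import java.util.function.BiConsumer;

// Stand-in for org.apache.kafka.connect.sink.SinkRecord, stubbed for this sketch.
class SinkRecord {
    final boolean poison; // marks a record our hypothetical sink will reject
    SinkRecord(boolean poison) { this.poison = poison; }
}

class ExampleSinkTask {
    // May be null: per the "Backwards Compatibility" section, older workers
    // (or configurations without error reporting) won't supply a reporter.
    private BiConsumer<SinkRecord, Throwable> errorReporter;

    public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
        this.errorReporter = reporter;
    }

    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                deliver(record);
            } catch (Exception e) {
                if (errorReporter != null) {
                    // Hand the errant record back to the framework.
                    errorReporter.accept(record, e);
                } else {
                    // No reporter available; fall back to failing the task.
                    throw new RuntimeException("Failed to process record", e);
                }
            }
        }
    }

    // Hypothetical delivery logic that rejects "poison" records.
    private void deliver(SinkRecord record) {
        if (record.poison) {
            throw new IllegalStateException("sink rejected record");
        }
    }
}
```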

4. It looks like the config properties are only related to setting up a DLQ
for a connector. Although properties like "errors.retry.timeout" and
"errors.retry.delay.max.ms" are probably out of scope for this KIP, why not
also include the "errors.tolerance", "errors.log.enable", and
"errors.log.include.messages" properties? Seems like these would also be
relevant for the kinds of records that a task might send to its error
reporter. If we decide to include these properties, we should also update
the "Synchrony" section to be agnostic about what the error reporter is
doing under the hood since there won't necessarily be a Kafka producer
involved in handling records given to the error reporter.
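For reference, the combined set of properties under discussion would look
something like this in a connector config (the values here are just examples):

```properties
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=my-connector-dlq
```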

5. In the "Order of Operations" section, the second step is "The task calls
start(...)". Isn't SinkTask::start invoked by the framework, and not
normally by tasks themselves?

6. The "Metrics" section is a little sparse; do you think you could add
some detail on what format the metrics will take (presumably JMX but other
ideas are always welcome), and what their names and types will be? Might
help to check out some other KIPs in the past that have added metrics such
as KIP-475 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-475%3A+New+Metrics+to+Measure+Number+of+Tasks+on+a+Connector),
KIP-507 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints#KIP-507:SecuringInternalConnectRESTEndpoints-NewJMXworkermetric),
and last but not least, KIP-196 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework#KIP-196:AddmetricstoKafkaConnectframework-PublicInterfaces
).

7. In the "Rejected Alternatives" section, I think there's a rationale
missing for the item "Labeling as a dead letter queue" :)

Thanks again for the KIP! Looking forward to seeing this in the framework,
I think it'd be great.

Cheers,

Chris

On Thu, May 7, 2020 at 12:02 PM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi,
> Thanks for the KIP.
>
> I wonder whether this idea would be better implemented using a new method
> on the
> SinkTaskContext.
>
>   public void putFailed(Collection<SinkRecord> records)
>
> Then the rest of KIP-298 could apply. Failed records could be put to the
> DLQ or logged, as
> appropriate. I think there's real value in keeping the rest of the error
> handling intact and using
> the same mechanism for failed puts as for failures in the other stages of
> processing. If you want
> to retry a batch, throw a RetriableException. If the record cannot be
> processed perhaps due to
> a size limit in the target system, flag it as failed and the framework can
> invoke its error handling
> code on the offending record or records. The only stipulation is that the
> error action needs to be
> completed before the offset for a failed record is committed or it might
> be lost.
>
> Thanks,
> Andrew
>
> On 07/05/2020, 19:09, "Aakash Shah" <as...@confluent.io> wrote:
>
>     Hello all,
>
>     I've created a KIP to handle error reporting for records in sink
>     connectors, specifically within the context of put(...):
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>
>     I would appreciate any kind of feedback.
>
>     Thanks,
>
>     Aakash
>
>