Hi Aakash,

Thanks for the KIP! Given the myriad of error-handling mechanisms available in sink connectors today, I think it'd be a great improvement to add this kind of support to the framework and take some of the development burden off of connector writers as well as some of the configuration burden off of users.
I've got some thoughts but no major objections:

1. It's stated in the "Motivation" section that "this proposal aims to extend KIP-298 and add error reporting functionality within the context of put(...) without adding redundancy in configuration." It sounds like this is true insofar as we're trying to fill a gap explicitly called out by KIP-298 with regard to records that are given to a task via SinkTask::put, but reading further, it seems like there's nothing in place that would prevent a task from sending a record to the error reporter outside of its "put" method. Would it be fair to say that the proposal "aims to extend KIP-298 and add error reporting functionality even after records are sent to connector tasks without adding redundancy in configuration" instead?

2. I'm assuming that the javadoc comment above the "errantRecordReporter" method in the sample Java code in the "Method" section is the proposed javadoc description for the method we'll add to the SinkTask class. We should be pretty careful about that description since it's the very first thing some users will read when learning about this new error reporting mechanism; I've got some suggestions about how we might improve the language there:

a - In the first paragraph, "Set the error reporter for this task.", I think we should call out that this is a method that will be invoked by the Connect framework. Maybe something like "Invoked by the framework to supply the task with an error reporter, if the user has configured one for this connector"?

b - In the next paragraph, "The general usage pattern for this error reporter is to use this method to set it, and invoke its {@link accept(SinkRecord record, Throwable e)} method when an exception is thrown while processing a record in {@link put(Collection<SinkRecord> records)} to send this errant record to the error reporter.", I think we can be a little more concise.
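For context, the usage pattern described there might look roughly like the following in a task. This is only a sketch: SinkRecord is a minimal stand-in, the reporter is modeled as a plain BiConsumer so the snippet compiles on its own, and the class and method names are taken from the KIP's sample code rather than a finalized API.

```java
import java.util.Collection;
import java.util.function.BiConsumer;

// Minimal stand-in for org.apache.kafka.connect.sink.SinkRecord so the
// sketch is self-contained; not the real class.
class SinkRecord {
    final Object value;
    SinkRecord(Object value) { this.value = value; }
}

class ErrantRecordSinkTask {
    // The KIP's reporter exposes accept(SinkRecord, Throwable);
    // BiConsumer models that shape here.
    private BiConsumer<SinkRecord, Throwable> errorReporter;

    // Per the KIP, invoked by the framework to supply the reporter.
    public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
        this.errorReporter = reporter;
    }

    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Exception e) {
                // Null check: per the "Backwards Compatibility" section,
                // on older workers the framework never supplies a reporter,
                // so this field may still be null.
                if (errorReporter != null) {
                    errorReporter.accept(record, e);
                } else {
                    throw new RuntimeException("Failed to process record", e);
                }
            }
        }
    }

    // Placeholder for connector-specific delivery logic.
    private void process(SinkRecord record) {
        if (record.value == null) {
            throw new IllegalArgumentException("null value");
        }
    }
}
```
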
What do you think about just "Tasks can send problematic records back to the framework by invoking {@link accept(SinkRecord record, Throwable e)}"?

3. Just below that section, in the sample code, it looks like the task isn't performing a null check on the "errorReporter" field. According to the "Backwards Compatibility" section, it's possible that that field might be null; do you think we might want to update the sample code to perform that null check and add a brief comment on why?

4. It looks like the config properties are only related to setting up a DLQ for a connector. Although properties like "errors.retry.timeout" and "errors.retry.delay.max.ms" are probably out of scope for this KIP, why not also include the "errors.tolerance", "errors.log.enable", and "errors.log.include.messages" properties? Seems like these would also be relevant for the kinds of records that a task might send to its error reporter. If we decide to include these properties, we should also update the "Synchrony" section to be agnostic about what the error reporter is doing under the hood, since there won't necessarily be a Kafka producer involved in handling records given to the error reporter.

5. In the "Order of Operations" section, the second step is "The task calls start(...)". Isn't SinkTask::start invoked by the framework, and not normally by tasks themselves?

6. The "Metrics" section is a little sparse; do you think you could add some detail on what format the metrics will take (presumably JMX, but other ideas are always welcome), and what their names and types will be?
Might help to check out some other KIPs in the past that have added metrics, such as KIP-475 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-475%3A+New+Metrics+to+Measure+Number+of+Tasks+on+a+Connector), KIP-507 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints#KIP-507:SecuringInternalConnectRESTEndpoints-NewJMXworkermetric), and last but not least, KIP-196 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework#KIP-196:AddmetricstoKafkaConnectframework-PublicInterfaces).

7. In the "Rejected Alternatives" section, I think there's a rationale missing for the item "Labeling as a dead letter queue" :)

Thanks again for the KIP! Looking forward to seeing this in the framework, I think it'd be great.

Cheers,

Chris

On Thu, May 7, 2020 at 12:02 PM Andrew Schofield <andrew_schofi...@live.com> wrote:

> Hi,
> Thanks for the KIP.
>
> I wonder whether this idea would be better implemented using a new method on the SinkTaskContext.
>
> public void putFailed(Collection<SinkRecord> records)
>
> Then the rest of KIP-298 could apply. Failed records could be put to the DLQ or logged, as appropriate. I think there's real value in keeping the rest of the error handling intact and using the same mechanism for failed puts as for failures in the other stages of processing. If you want to retry a batch, throw a RetriableException. If the record cannot be processed, perhaps due to a size limit in the target system, flag it as failed and the framework can invoke its error handling code on the offending record or records. The only stipulation is that the error action needs to be completed before the offset for a failed record is committed or it might be lost.
>
> Thanks,
> Andrew
>
> On 07/05/2020, 19:09, "Aakash Shah" <as...@confluent.io> wrote:
>
>     Hello all,
>
>     I've created a KIP to handle error reporting for records in sink connectors, specifically within the context of put(...):
>
>     https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>
>     I would appreciate any kind of feedback.
>
>     Thanks,
>
>     Aakash
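For comparison, Andrew's suggested alternative above, a putFailed method on SinkTaskContext, might be used from a task roughly as sketched below. The method name comes from his mail; everything else (the stub SinkRecord and context interface, the task class) is a hypothetical stand-in so the snippet compiles on its own, not an actual Connect API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Minimal stand-in for org.apache.kafka.connect.sink.SinkRecord so the
// sketch is self-contained; not the real class.
class SinkRecord {
    final Object value;
    SinkRecord(Object value) { this.value = value; }
}

// Andrew's suggestion: the context, not the task, carries the reporting
// method, and the existing KIP-298 machinery (DLQ, logging, tolerance)
// handles whatever is passed to it.
interface SinkTaskContext {
    void putFailed(Collection<SinkRecord> records);
}

class AlternativeSinkTask {
    private final SinkTaskContext context;

    AlternativeSinkTask(SinkTaskContext context) { this.context = context; }

    public void put(Collection<SinkRecord> records) {
        List<SinkRecord> failed = new ArrayList<>();
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Exception e) {
                // Collect the offending records instead of failing the task.
                failed.add(record);
            }
        }
        // Hand the failed batch back to the framework before offsets for
        // these records are committed, per Andrew's stipulation.
        if (!failed.isEmpty()) {
            context.putFailed(failed);
        }
    }

    // Placeholder for connector-specific delivery logic.
    private void process(SinkRecord record) {
        if (record.value == null) {
            throw new IllegalArgumentException("null value");
        }
    }
}
```
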