ok folks, this is my POC PR: https://github.com/wicknicks/kafka/tree/kip-610-2.4.1. connectors built from this were copied into a fresh installation of Kafka Connect (v2.5, the latest version), and ran. Without proper try-catch, the tasks would fail. But when the appropriate exceptions were handled, the task proceeded without any issues. I've added some comments here https://github.com/apache/kafka/commit/295e6a46925993060a60b79e646e06432480ed0d that document these errors. Tested this with both java 8 and 11.
To check if it is safe to add methods and new classes, I dug around the JVM spec <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html>. The following points are being made here: 1. During the loading phase, a number of structural details of the class are added to the run-time constant pool (class descriptor, fields, methods). what these methods themselves do are not considered. 2. During the linking phase, resolution of symbolic references is optional. But the important bit is this <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4>: Whichever strategy (lazy or eager) is followed, any error detected during resolution must be thrown at a point in the program that (directly or indirectly) uses a symbolic reference to the class or interface. So this means that any new classes that were originally relied upon the connector that are missing now, will be in some internal "not found" say but no errors will be thrown till resolution stage. 3. Section 5.4.3 <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3> says execution of any of the (anewarray, checkcast etc) instructions requires resolution of its symbolic reference, which is good since the instructions are executed in order, and the connector developer has time to execute some code before program has to look for a unknown object. We are also safe in that the new interface will be used only during or after start() is called, so there should not be any fields that do something like: private final ErrantRecordReporter reporter = sinkTaskContext.failedRecordReporter(); The initialization will only be valid in start(), where we can safely wrap around try catch issues. The errors being thrown are very precise. NoSuchMethodError <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3.3> for method not found, and NoClassDefFoundError for any classes loaded during resolution <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.3>. BTW, one more example that comes to mind is the JDBC spec (the java.sql package has been changing with every major version of Java). Newer classes are added (look at java.sql.DriverAction (in Java 8) and java.sql.ShardingKey (in java 9)). New methods to existing classes are being added as well (for example, java.sql.DriverManager#registerDriver(java.sql.Driver, java.sql.DriverAction) <https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#registerDriver-java.sql.Driver-java.sql.DriverAction->). and the expectation is that the driver should handle any runtime inconsistencies. Overall, I think it should be safe to have new interfaces, and have them loaded safely, with correct checks. BTW, one more opportunity we have is that the connector can check if these new methods are present or not. For example: Class.forName("org.apache.kafka.connect.sink.SinkTaskContext").getMethod("failedRecordReporter"); And based on this, chose to return old a different task in SinkConnector#taskClass() if they want to hyper-safe. Apologies for throwing in such a wrench at the last minute! But IMHO it'd be good to take this opportunity if we can. Best, On Sat, May 16, 2020 at 6:05 PM Randall Hauch <rha...@gmail.com> wrote: > Thanks for updating the KIP, Aakash. A few comments on the updated content > there: > > In order to avoid error records being written out of order (for example, > > due to retries), the developer can use > > `max.in.flight.requests.per.connection=1` in their implementation for > > writing error records. > > > > IMO, the DLQ should always use this setting to prevent retries, and the > developer can always set this property for the DLQ to something larger than > 1 if order is not important and they need the extreme performance. > > Along with the original errant sink record, the exception thrown will be > > sent to the error reporter to provide additional context. > > > > This is also not really clear. What thrown exception is this referring to? > Isn't the exception *passed* to the reporter method? > > I do think we need to better explain from the sink task's perspective what > behavior it should expect? Is there any case when the reporter will be null > or be a no-op, and if so what should the sink task do? Should it simply > wrap and throw a ConnectException? And if there is a reporter, won't > Connect treat this sink record as "processed" with respect to the existing > behavior of passing "processed" offsets back to the sink task's > `preCommit(...)` method. > > Thanks, > > Randall > > On Sat, May 16, 2020 at 6:38 PM Arjun Satish <arjun.sat...@gmail.com> > wrote: > > > Yeah I had tried this locally on java 8 and 11, and it had seemed to > work. > > Let me clean up and publish my code in a branch somewhere so we can take > a > > look at it. > > > > Thanks, > > > > On Sat, May 16, 2020 at 3:39 PM Randall Hauch <rha...@gmail.com> wrote: > > > > > Have you tried this? IIUC the problem is with the new type, and any > class > > > that uses ‘ErrantRecordReporter’ with an import would fail to be loaded > > by > > > the classloader if the type does not exist (I.e., pre-2.9 Connect > > > runtimes). Catching that ClassNotFoundException and dynamically > importing > > > the type is actually much harder. > > > > > > On Sat, May 16, 2020 at 5:07 PM Aakash Shah <as...@confluent.io> > wrote: > > > > > > > Hi Arjun, > > > > > > > > Thanks for this suggestion. I actually like this a lot because a > > defined > > > > interface looks more appealing and is clearer in its intention. Since > > we > > > > are still using NoSuchMethodException to account for backwards > > > > compatibility, this works for me. I can't see any drawbacks besides > > > having > > > > to call the getter method for the processing of every errant record. > > > > > > > > I would like to hear others' thoughts on if this drawback outweighs > the > > > > added benefit of clarity. > > > > > > > > Thanks, > > > > Aakash > > > > > > > > On Sat, May 16, 2020 at 2:54 PM Arjun Satish <arjun.sat...@gmail.com > > > > > > wrote: > > > > > > > > > Thanks Konstantine, happy to write something up in a KIP. But I > think > > > it > > > > > would be redundant if we add this kip. What do you think? > > > > > > > > > > Also, Randall, yes that API would work. But, if we expect the > > > developers > > > > to > > > > > catch NoSuchMethodErrors, then should we also go ahead and make a > > class > > > > > that would have a report method(similar to ErrorReporter > > > > > < > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java > > > > > > > > > > > but maybe with different arguments on the report method)? they > would > > > also > > > > > have to catch NoClassDefFoundError. > > > > > > > > > > That would modify your change in SinkTaskContext to: > > > > > > > > > > public interface SinkTaskContext { > > > > > /** > > > > > * Get the reporter to which the sink task can report > problematic > > > or > > > > > * failed {@link SinkRecord} passed to the {@link > > > > > SinkTask#put(Collection)} method. > > > > > * > > > > > * @return a errant record reporter > > > > > */ > > > > > ErrantRecordReporter failedRecordReporter(); > > > > > } > > > > > > > > > > where ErrantRecordReporter is: > > > > > > > > > > public interface ErrantRecordReporter { > > > > > > > > > > /** > > > > > * Serialize and produce records to the error topic > > > > > * > > > > > * @param record the errant record > > > > > */ > > > > > void report(SinkRecord record, Throwable error); > > > > > > > > > > } > > > > > > > > > > Usage in put would be the same though if the class is not > explicitly > > > > named: > > > > > > > > > > try { > > > > > context.failedRecordReporter().report(record, error); > > > > > } catch (NoSuchMethodError e) { > > > > > log.info("Boooooooooo!"); > > > > > } > > > > > > > > > > Thoughts? > > > > > > > > > > On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis < > > > > > konstant...@confluent.io> wrote: > > > > > > > > > > > Thanks for following up Randall. > > > > > > > > > > > > I agree with your latest suggestion. It was good that we explored > > > > several > > > > > > options but accessing the context to obtain the reporter in Kafka > > > > Connect > > > > > > versions that support this feature makes the most sense. The > burden > > > for > > > > > > connector developers that want to use this reporter _and_ make > > > > connectors > > > > > > compatible with old and new workers is minimal. > > > > > > > > > > > > We'll have to leave with additions like this and the appearance > in > > > both > > > > > > KIP-131 and here in KIP-610 indeed creates a reasonable > precedent. > > > > > > > > > > > > Konstantine > > > > > > > > > > > > > > > > > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch <rha...@gmail.com > > > > > > wrote: > > > > > > > > > > > > > Thanks again for the active discussion! > > > > > > > > > > > > > > Regarding the future-vs-callback discussion: I did like where > > Chris > > > > was > > > > > > > going with the Callback, but he raises good point that it's > > unclear > > > > > what > > > > > > to > > > > > > > use for the reporter type, since we'd need three parameters. > > > > > Introducing > > > > > > a > > > > > > > new interface makes it much harder for a sink task to be > backward > > > > > > > compatible, so sticking with BiFunction is a good compromise. > > Plus, > > > > > > another > > > > > > > significant disadvantage of a callback approach is that a sink > > > task's > > > > > > > callback is called from the producer thread, and this risks a > > > > > > > poorly written sink task callback killing the reporter's > producer > > > > > without > > > > > > > necessarily failing the task. Using a future avoids this risk > > > > > altogether, > > > > > > > still provides the sink task with the ability to do synchronous > > > > > reporting > > > > > > > using Future, which is a standard and conventional design > > pattern. > > > So > > > > > we > > > > > > do > > > > > > > seem to have converged on using `BiFunction<SinkRecord, > > Throwable, > > > > > > > Future<Void>>` for the reporter type. > > > > > > > > > > > > > > Now, we still seem to not have converted upon how to pass the > > > > reporter > > > > > to > > > > > > > the sink task. I agree with Konstantine that the deprecation > > > affects > > > > > only > > > > > > > newer versions of Connect, and that a sink task should deal > with > > > both > > > > > put > > > > > > > methods only when it wants to support older runtimes. I also > > think > > > > that > > > > > > > this is a viable approach, but I do concede that this evolution > > of > > > > the > > > > > > sink > > > > > > > task API is more complicated than it should be. > > > > > > > > > > > > > > In the interest of quickly coming to consensus on how we pass > the > > > > > > reporter > > > > > > > to the sink task, I'd like to go back to Andrew's original > > > > suggestion, > > > > > > > which I think we disregarded too quickly: add a getter on the > > > > > > > SinkTaskContext interface. We already have precedent for adding > > > > methods > > > > > > to > > > > > > > one of the context classes with the newly-adopted KIP-131, > which > > > > adds a > > > > > > > getter for the OffsetStorageReader on the (new) > > > > SourceConnectorContext. > > > > > > > That KIP accepts the fact that a source connector wanting to > use > > > this > > > > > > > feature while also keeping the ability to be installed into > older > > > > > Connect > > > > > > > runtimes must guard its use of the context's getter method. > > > > > > > > > > > > > > I think we can use the same pattern for this KIP, and add a > > getter > > > to > > > > > the > > > > > > > existing SinkTaskContext that is defined something like: > > > > > > > > > > > > > > public interface SinkTaskContext { > > > > > > > ... > > > > > > > /** > > > > > > > * Get the reporter to which the sink task can report > > > problematic > > > > > or > > > > > > > failed {@link SinkRecord} > > > > > > > * passed to the {@link SinkTask#put(Collection)} method. > > When > > > > > > > reporting a failed record, > > > > > > > * the sink task will receive a {@link Future} that the > task > > > can > > > > > > > optionally use to wait until > > > > > > > * the failed record and exception have been written to > Kafka > > > via > > > > > > > Connect's DLQ. Note that > > > > > > > * the result of this method may be null if this connector > > has > > > > not > > > > > > been > > > > > > > configured with a DLQ. > > > > > > > * > > > > > > > * <p>This method was added in Apache Kafka 2.9. Sink tasks > > > that > > > > > use > > > > > > > this method but want to > > > > > > > * maintain backward compatibility so they can also be > > deployed > > > > to > > > > > > > older Connect runtimes > > > > > > > * should guard the call to this method with a try-catch > > block, > > > > > since > > > > > > > calling this method will result in a > > > > > > > * {@link NoSuchMethodException} when the sink connector is > > > > > deployed > > > > > > to > > > > > > > Connect runtimes > > > > > > > * older than Kafka 2.9. For example: > > > > > > > * <pre> > > > > > > > * BiFunction<SinkTask, Throwable, > > > Future<Void>> > > > > > > > reporter; > > > > > > > * try { > > > > > > > * reporter = context.failedRecordReporter(); > > > > > > > * } catch (NoSuchMethodException e) { > > > > > > > * reporter = null; > > > > > > > * } > > > > > > > * </pre> > > > > > > > * > > > > > > > * @return the reporter function; null if no error reporter > > has > > > > > been > > > > > > > configured for the connector > > > > > > > * @since 2.9 > > > > > > > */ > > > > > > > BiFunction<SinkTask, Throwable, Future<Void>> > > > > > failedRecordReporter(); > > > > > > > } > > > > > > > > > > > > > > The main advantage is that the KIP no longer has to make *any > > > other* > > > > > > > changes to the Sink Connector or Task API. The above is really > > the > > > > only > > > > > > > change, and it's merely an addition to the API. No deprecation > > and > > > no > > > > > > > overloading methods. The KIP does need to explain how the > > reporter > > > is > > > > > > > configured and used (which it already does), but IMO the KIP > > > doesn't > > > > > need > > > > > > > to describe when this reporter can/should be used. After all, > > this > > > > > > method > > > > > > > is on the existing SinkTaskContext, so this method is really no > > > > > different > > > > > > > than any other existing method. I think my JavaDoc (which is > > just a > > > > > > > suggestion for a starting point that Aakash can improve as > > needed) > > > > > > > describes how easy it is for a sink to maintain backward > > > > compatibility. > > > > > > > (The use of `BiFunction` helps tremendously.) Another not > > > > insignificant > > > > > > > advantage is that a sink task can use this reporter reference > > > > > throughout > > > > > > > the task's lifetime (after it's started and before it is > > stopped), > > > > > making > > > > > > > it less invasive for existing sink task implementations that > want > > > to > > > > > use > > > > > > > it. > > > > > > > > > > > > > > I hope we can all get being this compromise, which IMO is > > actually > > > > > super > > > > > > > clean and makes a lot of sense. Thanks, Andrew, for originally > > > > > suggesting > > > > > > > it. I know we're all trying to improve the Connect API in a way > > > that > > > > > > makes > > > > > > > sense, and deliberate and constructive discussion is a healthy > > > thing. > > > > > > > Thanks again to everyone for participating! > > > > > > > > > > > > > > BTW, we've agreed upon a number of other changes, but I don't > see > > > any > > > > > of > > > > > > > those changes on the KIP. Aakash, can you please update the KIP > > > > quickly > > > > > > so > > > > > > > we can make sure the other parts are the KIP are acceptable? > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > Randall > > > > > > > > > > > > > > On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis < > > > > > > > konstant...@confluent.io> wrote: > > > > > > > > > > > > > > > Thanks for the quick response Aakash. > > > > > > > > > > > > > > > > With respect to deprecation, this refers to deprecating this > > > method > > > > > in > > > > > > > > newer versions of Kafka Connect (and eventually removing it). > > > > > > > > > > > > > > > > As a connector developer, if you want your connector to run > > > across > > > > a > > > > > > wide > > > > > > > > spectrum of Connect versions, you'll have to take this into > > > > > > consideration > > > > > > > > and retain both methods in a functional state. The good news > is > > > > that > > > > > > both > > > > > > > > methods can share a lot of code, so in reality both the old > and > > > the > > > > > new > > > > > > > put > > > > > > > > will be thin shims over a `putRecord` method (or `process` > > method > > > > as > > > > > > you > > > > > > > > call it in the KIP). > > > > > > > > > > > > > > > > Given the above, there's no requirement to conditionally call > > one > > > > > > method > > > > > > > or > > > > > > > > the other in the framework based on configuration. Once you > > > > implement > > > > > > the > > > > > > > > new `put` with something other than its default > implementation, > > > as > > > > a > > > > > > > > connector developer, you'll know to adapt to the above. > > > > > > > > > > > > > > > > I definitely suggest extending our docs in a meaningful way > in > > > > order > > > > > to > > > > > > > > make the upgrade path easy to follow. Maybe you'd like to > add a > > > > note > > > > > to > > > > > > > > your compatibility section in this KIP as well. > > > > > > > > > > > > > > > > Regards, > > > > > > > > Konstantine > > > > > > > > > > > > > > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah < > > as...@confluent.io > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > +1 > > > > > > > > > > > > > > > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis < > > > > > > > > > konstant...@confluent.io> wrote: > > > > > > > > > > > > > > > > > > > Hi Arjun, > > > > > > > > > > > > > > > > > > > > I think I agree with you that subject is interesting. > Yet, > > I > > > > feel > > > > > > it > > > > > > > > > > belongs to a separate future KIP. Reading the proposal in > > the > > > > KIP > > > > > > > > format > > > > > > > > > > will help, at least myself, to understand it better. > > > > > > > > > > > > > > > > > > > > Having said that, for the purpose of simplifying error > > > handling > > > > > for > > > > > > > > sink > > > > > > > > > > tasks, the discussion on KIP-610 has made some good > > progress > > > on > > > > > the > > > > > > > > > mailing > > > > > > > > > > list. If the few open items are reflected on the > proposal, > > > > maybe > > > > > > it'd > > > > > > > > be > > > > > > > > > > even worthwhile to consider it for inclusion in the > > upcoming > > > > > > release > > > > > > > > with > > > > > > > > > > its current scope. > > > > > > > > > > > > > > > > > > > > Konstantine > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish < > > > > > > arjun.sat...@gmail.com > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > I'm kinda hoping that we get to an approach on how to > > > extend > > > > > the > > > > > > > > > Connect > > > > > > > > > > > framework. Adding parameters in the put method is nice, > > and > > > > > maybe > > > > > > > > works > > > > > > > > > > for > > > > > > > > > > > now, but I'm not sure how scalable it is. It'd great to > > be > > > > able > > > > > > to > > > > > > > > add > > > > > > > > > > more > > > > > > > > > > > functionality in the future. Couple of examples: > > > > > > > > > > > > > > > > > > > > > > - make the metrics registry available to a task, so > they > > > can > > > > > > report > > > > > > > > > task > > > > > > > > > > > level metrics or > > > > > > > > > > > - be able to pass in a RestExtension handle to the > task, > > so > > > > the > > > > > > > task > > > > > > > > > can > > > > > > > > > > > provide a rest endpoint which users can hit to get some > > > task > > > > > > level > > > > > > > > > > > information (about its status, health, for example) > > > > > > > > > > > > > > > > > > > > > > In such scenarios, maybe adding new parameters to > > existing > > > > > > methods > > > > > > > > may > > > > > > > > > > not > > > > > > > > > > > be immediately acceptable. > > > > > > > > > > > > > > > > > > > > > > Since we are very close to a deadline, I wanted to > check > > if > > > > the > > > > > > one > > > > > > > > > more > > > > > > > > > > > possibility is acceptable :-) > > > > > > > > > > > > > > > > > > > > > > What if we could create a library that could be used by > > > > > connector > > > > > > > to > > > > > > > > > > > independently integrated by connector developers in > their > > > > > > > connectors. > > > > > > > > > The > > > > > > > > > > > library would be packaged and shipped with their > > connector > > > > like > > > > > > any > > > > > > > > > other > > > > > > > > > > > library on maven (and other similar repositories). The > > new > > > > > module > > > > > > > > would > > > > > > > > > > be > > > > > > > > > > > in the AK project, but its jars will *not* be added to > > > > > classpath > > > > > > > for > > > > > > > > > > > Connect worker. > > > > > > > > > > > > > > > > > > > > > > The library would provide a public interface for an > error > > > > > > reporter, > > > > > > > > > which > > > > > > > > > > > provides both synchronous and asynchronous > > functionalities > > > > (as > > > > > > was > > > > > > > > > > brought > > > > > > > > > > > up above). > > > > > > > > > > > > > > > > > > > > > > This would be an independent library, they can be > easily > > > > > bundled > > > > > > > and > > > > > > > > > > loaded > > > > > > > > > > > with the other connectors. The connect framework will > be > > > > > > decoupled > > > > > > > > from > > > > > > > > > > > this utility. > > > > > > > > > > > > > > > > > > > > > > I understand that a similar option is in the rejected > > > > > > alternatives, > > > > > > > > > > mostly > > > > > > > > > > > because of configuration overhead, but the > configuration > > > > > required > > > > > > > > here > > > > > > > > > > can > > > > > > > > > > > come directly from the worker properties (and just be > > copy > > > > > pasted > > > > > > > > from > > > > > > > > > > > there, maybe with a prefix). and I wonder (if maybe > part > > > as a > > > > > > > future > > > > > > > > > > KIP), > > > > > > > > > > > we can evaluate a strategy where certain worker configs > > can > > > > be > > > > > > > passed > > > > > > > > > to > > > > > > > > > > a > > > > > > > > > > > connector (for example, the producer/consume/admin > ones), > > > so > > > > > end > > > > > > > > users > > > > > > > > > do > > > > > > > > > > > not have to. > > > > > > > > > > > > > > > > > > > > > > Overall, we would get clean APIs, contracts and > > developers > > > > get > > > > > > > > freedom > > > > > > > > > to > > > > > > > > > > > use these libraries and functionalities however they > > want. > > > > The > > > > > > only > > > > > > > > > > > drawback is how this is configured (end-users will have > > to > > > > add > > > > > > more > > > > > > > > > lines > > > > > > > > > > > in the json/properties files). But all configs can > simply > > > > come > > > > > > from > > > > > > > > > > worker, > > > > > > > > > > > I believe this is relatively minor issue. We should be > > able > > > > to > > > > > > work > > > > > > > > out > > > > > > > > > > > compatibility issues in the implementations, so that > the > > > > > library > > > > > > > can > > > > > > > > > > safely > > > > > > > > > > > run (and degrade functionality if needed) with old > > workers. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:04 PM Aakash Shah < > > > > > as...@confluent.io> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Just wanted to clarify that I am on board with adding > > the > > > > > > > > overloaded > > > > > > > > > > > > put(...) method. > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Aakash > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah < > > > > > > as...@confluent.io> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Randall and Konstantine, > > > > > > > > > > > > > > > > > > > > > > > > > > As Chris and Arjun mentioned, I think the main > > concern > > > is > > > > > the > > > > > > > > > > potential > > > > > > > > > > > > > gap in which developers don't implement the > > deprecated > > > > > method > > > > > > > due > > > > > > > > > to > > > > > > > > > > a > > > > > > > > > > > > > misunderstanding of use cases. Using the setter > > method > > > > > > approach > > > > > > > > > > ensures > > > > > > > > > > > > > that the developer won't break backwards > > compatibility > > > > when > > > > > > > using > > > > > > > > > the > > > > > > > > > > > new > > > > > > > > > > > > > method due to a mistake. That being said, I think > the > > > > value > > > > > > > added > > > > > > > > > in > > > > > > > > > > > > > clarity of contract of when the error reporter will > > be > > > > > > invoked > > > > > > > > and > > > > > > > > > > > > overall > > > > > > > > > > > > > aesthetic while maintaining backwards compatibility > > > > > outweighs > > > > > > > the > > > > > > > > > > > > potential > > > > > > > > > > > > > mistake of a developer in not implementing the > > original > > > > > > > put(...) > > > > > > > > > > > method. > > > > > > > > > > > > > > > > > > > > > > > > > > With respect to synchrony, I agree with > Konstantine's > > > > > point, > > > > > > > that > > > > > > > > > we > > > > > > > > > > > > > should make it an opt-in feature of making the > > reporter > > > > > only > > > > > > > > > > > synchronous. > > > > > > > > > > > > > At the same time, I believe it is important to > > relieve > > > as > > > > > > much > > > > > > > of > > > > > > > > > the > > > > > > > > > > > > > burden of implementation as possible from the > > developer > > > > in > > > > > > this > > > > > > > > > case, > > > > > > > > > > > and > > > > > > > > > > > > > thus I think using a Callback rather than a Future > > > would > > > > be > > > > > > > > easier > > > > > > > > > on > > > > > > > > > > > the > > > > > > > > > > > > > developer, while adding asynchronous functionality > > with > > > > the > > > > > > > > ability > > > > > > > > > > to > > > > > > > > > > > > > opt-in synchronous functionality. I also believe > > making > > > > it > > > > > > > opt-in > > > > > > > > > > > > > synchronous vs. the other way simplifies > > implementation > > > > for > > > > > > the > > > > > > > > > > > developer > > > > > > > > > > > > > (blocking vs creating a new thread). > > > > > > > > > > > > > > > > > > > > > > > > > > Please let me know your thoughts. I would like to > > come > > > > to a > > > > > > > > > consensus > > > > > > > > > > > > soon > > > > > > > > > > > > > due to the AK 2.6 deadlines; I will then shortly > > update > > > > the > > > > > > KIP > > > > > > > > and > > > > > > > > > > > > start a > > > > > > > > > > > > > vote. > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Aakash > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 2:24 PM Randall Hauch < > > > > > > > rha...@gmail.com> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > >> On Fri, May 15, 2020 at 3:13 PM Arjun Satish < > > > > > > > > > > arjun.sat...@gmail.com> > > > > > > > > > > > > >> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > >> > Couple of thoughts: > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > 1. If we add new parameters to put(..), and new > > > > > connectors > > > > > > > > > > implement > > > > > > > > > > > > >> only > > > > > > > > > > > > >> > this method, it makes them backward incompatible > > > with > > > > > > older > > > > > > > > > > > workers. I > > > > > > > > > > > > >> > think newer connectors may only choose to only > > > > implement > > > > > > the > > > > > > > > > > latest > > > > > > > > > > > > >> method, > > > > > > > > > > > > >> > and we are passing the compatibility problems > back > > > to > > > > > the > > > > > > > > > > connector > > > > > > > > > > > > >> > developers. > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> New connectors would have to implement both if > they > > > want > > > > > to > > > > > > > run > > > > > > > > in > > > > > > > > > > > older > > > > > > > > > > > > >> runtimes. > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > 2. if we deprecate the older put() method and > > > > eventually > > > > > > > > remove > > > > > > > > > > it, > > > > > > > > > > > > then > > > > > > > > > > > > >> > old connectors are forward incompatible. If we > are > > > not > > > > > > going > > > > > > > > to > > > > > > > > > > > remove > > > > > > > > > > > > >> it, > > > > > > > > > > > > >> > then maybe we should not deprecate it? > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> I don't think we'll ever remove deprecated methods > > -- > > > > > > there's > > > > > > > no > > > > > > > > > > > reason > > > > > > > > > > > > to > > > > > > > > > > > > >> cut off older connectors. > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > 3. if a record is realized to be erroneous > outside > > > > put() > > > > > > > (say, > > > > > > > > > in > > > > > > > > > > > > flush > > > > > > > > > > > > >> or > > > > > > > > > > > > >> > preCommit), how will it be reported? > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> This is a concern no matter how the reporter is > > passed > > > > to > > > > > > the > > > > > > > > > task. > > > > > > > > > > > > >> Actually, I think it's more clear that the > reporter > > > > passed > > > > > > > > through > > > > > > > > > > > > >> `put(...)` should be used to record errors on the > > > > > > SinkRecords > > > > > > > > > passed > > > > > > > > > > > in > > > > > > > > > > > > >> the > > > > > > > > > > > > >> same method call. > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > I do think the concern over aesthetics is an > > > important > > > > > > one, > > > > > > > > but > > > > > > > > > > the > > > > > > > > > > > > >> > trade-off here is to exclude many connectors > that > > > are > > > > > out > > > > > > > > there > > > > > > > > > > from > > > > > > > > > > > > >> > running on worker versions. there may be > > production > > > > > > > > deployments > > > > > > > > > > that > > > > > > > > > > > > >> need > > > > > > > > > > > > >> > one old and one new connector that now cannot > work > > > on > > > > > any > > > > > > > > > version > > > > > > > > > > > of a > > > > > > > > > > > > >> > single worker. Building connectors is complex, > and > > > > it's > > > > > > > kinda > > > > > > > > > > unfair > > > > > > > > > > > > to > > > > > > > > > > > > >> > expect folks to make changes over aesthetic > > reasons > > > > > alone. > > > > > > > > This > > > > > > > > > is > > > > > > > > > > > > >> probably > > > > > > > > > > > > >> > the reason why popular framework APIs very > rarely > > > (and > > > > > > > > probably > > > > > > > > > > > never) > > > > > > > > > > > > >> > change. > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> I don't see how passing the reporter through an > > > > overloaded > > > > > > > > > > `put(...)` > > > > > > > > > > > is > > > > > > > > > > > > >> less backward compatible. Because the runtime > > provides > > > > the > > > > > > > > > SinkTask > > > > > > > > > > > base > > > > > > > > > > > > >> class, the runtime has control over what the > methods > > > do > > > > by > > > > > > > > > default. > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > Overall, yes, the "public void > > > > > > > > > > > > >> errantRecordReporter(BiConsumer<SinkRecord, > > > > > > > > > > > > >> > Throwable> reporter) {}" proposal in the > original > > > KIP > > > > is > > > > > > > > > somewhat > > > > > > > > > > > of a > > > > > > > > > > > > >> > mouthful, but are there are any simpler > > alternatives > > > > > that > > > > > > do > > > > > > > > not > > > > > > > > > > > > exclude > > > > > > > > > > > > >> > existing connectors, adding operational burdens > > and > > > > yet > > > > > > > > provide > > > > > > > > > a > > > > > > > > > > > > clean > > > > > > > > > > > > >> > contract? > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> IMO, overloading `put(...)` is cleaner and easier > to > > > > > > > understand > > > > > > > > -- > > > > > > > > > > > plus > > > > > > > > > > > > >> the > > > > > > > > > > > > >> other benefits in my earlier email. > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > Best, > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > PS: Apologies if the language is incorrect or > some > > > > > points > > > > > > > are > > > > > > > > > > > unclear. > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > On Fri, May 15, 2020 at 12:02 PM Randall Hauch < > > > > > > > > > rha...@gmail.com> > > > > > > > > > > > > >> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > On Fri, May 15, 2020 at 1:45 PM Konstantine > > > > > Karantasis < > > > > > > > > > > > > >> > > konstant...@confluent.io> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > Thanks for the quick response Aakash. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > To your last point, modern APIs like this > tend > > > to > > > > be > > > > > > > > > > > asynchronous > > > > > > > > > > > > >> (see > > > > > > > > > > > > >> > > > admin, producer in Kafka) and such > definition > > > > > results > > > > > > in > > > > > > > > > more > > > > > > > > > > > > >> > expressive > > > > > > > > > > > > >> > > > and well defined APIs. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > +1 > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > What you describe is easily an opt-in > feature > > > for > > > > > the > > > > > > > > > > connector > > > > > > > > > > > > >> > > developer. > > > > > > > > > > > > >> > > > At the same time, the latest description > > above, > > > > > gives > > > > > > us > > > > > > > > > > better > > > > > > > > > > > > >> chances > > > > > > > > > > > > >> > > for > > > > > > > > > > > > >> > > > this API to remain like this for longer, > > because > > > > it > > > > > > > covers > > > > > > > > > > both > > > > > > > > > > > > the > > > > > > > > > > > > >> > sync > > > > > > > > > > > > >> > > > and async per `put` user cases. > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > +1 > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > Given how simple the sync implementation > > > > > > > > > > > > >> > > > is, just by complying with the return type > of > > > the > > > > > > > method, > > > > > > > > I > > > > > > > > > > > still > > > > > > > > > > > > >> think > > > > > > > > > > > > >> > > the > > > > > > > > > > > > >> > > > BiFunction definition that returns a Future > > > makes > > > > > > sense. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > Konstantine > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > On Fri, May 15, 2020 at 11:27 AM Aakash > Shah < > > > > > > > > > > > as...@confluent.io> > > > > > > > > > > > > >> > wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > Thanks for the additional feedback. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > I see the benefits of adding an overloaded > > > > > put(...) > > > > > > > over > > > > > > > > > > > > >> alternatives > > > > > > > > > > > > >> > > > and I > > > > > > > > > > > > >> > > > > am on board going forward with this > > approach. > > > It > > > > > > will > > > > > > > > > > > definitely > > > > > > > > > > > > >> set > > > > > > > > > > > > >> > > > forth > > > > > > > > > > > > >> > > > > a contract of where the reporter will be > > used > > > > with > > > > > > > > better > > > > > > > > > > > > >> aesthetics. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > The original idea of going with a > > synchronous > > > > > > approach > > > > > > > > for > > > > > > > > > > the > > > > > > > > > > > > >> error > > > > > > > > > > > > >> > > > > reporter was to ease the connector > > developer's > > > > job > > > > > > > > > > interacting > > > > > > > > > > > > >> with > > > > > > > > > > > > >> > and > > > > > > > > > > > > >> > > > > handling the error reporter. The tradeoff > > for > > > > > > having a > > > > > > > > > > > > >> > synchronous-only > > > > > > > > > > > > >> > > > > reporter would be lower throughput on the > > > > > reporter; > > > > > > > this > > > > > > > > > was > > > > > > > > > > > > >> thought > > > > > > > > > > > > >> > to > > > > > > > > > > > > >> > > > be > > > > > > > > > > > > >> > > > > fine since arguably most circumstances > would > > > not > > > > > > > include > > > > > > > > > > > > >> consistently > > > > > > > > > > > > >> > > > large > > > > > > > > > > > > >> > > > > amounts of records being sent to the error > > > > > reporter. > > > > > > > > Even > > > > > > > > > if > > > > > > > > > > > > this > > > > > > > > > > > > >> was > > > > > > > > > > > > >> > > the > > > > > > > > > > > > >> > > > > case, an argument can be made that the > lower > > > > > > > throughput > > > > > > > > > > would > > > > > > > > > > > be > > > > > > > > > > > > >> of > > > > > > > > > > > > >> > > > > assistance in this case, as it would allow > > > more > > > > > time > > > > > > > for > > > > > > > > > the > > > > > > > > > > > > user > > > > > > > > > > > > >> to > > > > > > > > > > > > >> > > > > realize the connector is having records > sent > > > to > > > > > the > > > > > > > > error > > > > > > > > > > > > reporter > > > > > > > > > > > > >> > > before > > > > > > > > > > > > >> > > > > many are sent. However, if we are strongly > > in > > > > > favor > > > > > > of > > > > > > > > > > having > > > > > > > > > > > > the > > > > > > > > > > > > >> > > option > > > > > > > > > > > > >> > > > of > > > > > > > > > > > > >> > > > > asynchronous functionality available for > the > > > > > > > developer, > > > > > > > > > > then I > > > > > > > > > > > > am > > > > > > > > > > > > >> > fine > > > > > > > > > > > > >> > > > with > > > > > > > > > > > > >> > > > > that as well. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > Lastly, I am on board with changing the > name > > > to > > > > > > > > > > > > >> failedRecordReporter, > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > Please let me know your thoughts. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > Thanks, > > > > > > > > > > > > >> > > > > Aakash > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > On Fri, May 15, 2020 at 9:10 AM Randall > > Hauch > > > < > > > > > > > > > > > rha...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > >> > > wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > Konstantine said: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > I notice Randall also used BiFunction > in > > > his > > > > > > > > example, > > > > > > > > > I > > > > > > > > > > > > >> wonder if > > > > > > > > > > > > >> > > > it's > > > > > > > > > > > > >> > > > > > for > > > > > > > > > > > > >> > > > > > > similar reasons. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > Nope. Just a typo on my part. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > There appear to be three outstanding > > > > questions. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > First, Konstantine suggested calling > this > > > > > > > > > > > > >> "failedRecordReporter". I > > > > > > > > > > > > >> > > > think > > > > > > > > > > > > >> > > > > > this is minor, but using this new name > may > > > be > > > > a > > > > > > bit > > > > > > > > more > > > > > > > > > > > > precise > > > > > > > > > > > > >> > and > > > > > > > > > > > > >> > > > I'd > > > > > > > > > > > > >> > > > > be > > > > > > > > > > > > >> > > > > > fine with this. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > Second, should the reporter method be > > > > > > synchronous? I > > > > > > > > > think > > > > > > > > > > > the > > > > > > > > > > > > >> two > > > > > > > > > > > > >> > > > > options > > > > > > > > > > > > >> > > > > > are: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > 2a. Use `BiConsumer<SinkRecord, > > Throwable>` > > > > that > > > > > > > > returns > > > > > > > > > > > > nothing > > > > > > > > > > > > >> > and > > > > > > > > > > > > >> > > > > blocks > > > > > > > > > > > > >> > > > > > (at this time). > > > > > > > > > > > > >> > > > > > 2b. Use `BiFunction<SinkRecord, > Throwable, > > > > > > > > > Future<Void>>` > > > > > > > > > > > that > > > > > > > > > > > > >> > > returns > > > > > > > > > > > > >> > > > a > > > > > > > > > > > > >> > > > > > future that the user can optionally use > to > > > be > > > > > > > > > synchronous. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > I do agree with Konstantine that option > 2b > > > > gives > > > > > > us > > > > > > > > more > > > > > > > > > > > room > > > > > > > > > > > > >> for > > > > > > > > > > > > >> > > > future > > > > > > > > > > > > >> > > > > > semantic changes, and since the producer > > > write > > > > > is > > > > > > > > > already > > > > > > > > > > > > >> > > asynchronous > > > > > > > > > > > > >> > > > > this > > > > > > > > > > > > >> > > > > > should be straightforward to implement. > I > > > > think > > > > > > the > > > > > > > > > > concern > > > > > > > > > > > > >> here is > > > > > > > > > > > > >> > > > that > > > > > > > > > > > > >> > > > > if > > > > > > > > > > > > >> > > > > > the sink task does not *use* the future > to > > > > make > > > > > > this > > > > > > > > > > > > >> synchronous, > > > > > > > > > > > > >> > it > > > > > > > > > > > > >> > > is > > > > > > > > > > > > >> > > > > > very possible that the error records > could > > > be > > > > > > > written > > > > > > > > > out > > > > > > > > > > of > > > > > > > > > > > > >> order > > > > > > > > > > > > >> > > (due > > > > > > > > > > > > >> > > > > to > > > > > > > > > > > > >> > > > > > retries). But this won't be an issue if > > the > > > > > > > > > implementation > > > > > > > > > > > > uses > > > > > > > > > > > > >> > > > > > > `max.in.flight.requests.per.connection=1` > > > for > > > > > > > writing > > > > > > > > > the > > > > > > > > > > > > error > > > > > > > > > > > > >> > > > records. > > > > > > > > > > > > >> > > > > > It's a little less clear, but honestly > IMO > > > > > passing > > > > > > > the > > > > > > > > > > > > reporter > > > > > > > > > > > > >> in > > > > > > > > > > > > >> > > the > > > > > > > > > > > > >> > > > > > `put(...)` method helps make this lambda > > > > easier > > > > > to > > > > > > > > > > > understand, > > > > > > > > > > > > >> for > > > > > > > > > > > > >> > > some > > > > > > > > > > > > >> > > > > > strange reason. So unless there are good > > > > reasons > > > > > > to > > > > > > > > > avoid > > > > > > > > > > > > this, > > > > > > > > > > > > >> I'd > > > > > > > > > > > > >> > > be > > > > > > > > > > > > >> > > > in > > > > > > > > > > > > >> > > > > > favor of 2b and returning a Future. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > Third, how do we pass the reporter > lambda > > / > > > > > method > > > > > > > > > > reference > > > > > > > > > > > > to > > > > > > > > > > > > >> the > > > > > > > > > > > > >> > > > task? > > > > > > > > > > > > >> > > > > > My proposal to pass the reporter via an > > > > overload > > > > > > > > > > `put(...)` > > > > > > > > > > > > >> still > > > > > > > > > > > > >> > is > > > > > > > > > > > > >> > > > the > > > > > > > > > > > > >> > > > > > most attractive to me, for several > > reasons: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > 3a. There's no need to pass the reporter > > > > > > separately > > > > > > > > > *and* > > > > > > > > > > to > > > > > > > > > > > > >> > describe > > > > > > > > > > > > >> > > > the > > > > > > > > > > > > >> > > > > > changes in method call ordering. > > > > > > > > > > > > >> > > > > > 3b. As mentioned above, for some reason > > > > passing > > > > > it > > > > > > > via > > > > > > > > > > > > >> `put(...)` > > > > > > > > > > > > >> > > makes > > > > > > > > > > > > >> > > > > the > > > > > > > > > > > > >> > > > > > intent more clear that it be used when > > > > > processing > > > > > > > the > > > > > > > > > > > > >> SinkRecord, > > > > > > > > > > > > >> > and > > > > > > > > > > > > >> > > > > that > > > > > > > > > > > > >> > > > > > it shouldn't be used in `start(...)`, > > > > > > > > `preCommit(...)`, > > > > > > > > > > > > >> > > > > > `onPartitionsAssigned(...)`, or any of > the > > > > other > > > > > > > task > > > > > > > > > > > methods. > > > > > > > > > > > > >> As > > > > > > > > > > > > >> > > > Andrew > > > > > > > > > > > > >> > > > > > pointed out earlier, *describing* this > in > > > the > > > > > KIP > > > > > > > and > > > > > > > > in > > > > > > > > > > > > JavaDoc > > > > > > > > > > > > >> > will > > > > > > > > > > > > >> > > > be > > > > > > > > > > > > >> > > > > > tough to be exact yet succinct. > > > > > > > > > > > > >> > > > > > 3c. There is already precedence for > > evolving > > > > > > > > > > > > >> > > > > > `SourceTask.commitRecord(...)`, and the > > > > pattern > > > > > is > > > > > > > > > > > identical. > > > > > > > > > > > > >> > > > > > 3d. Backward compatibility is easy to > > > > > understand, > > > > > > > and > > > > > > > > at > > > > > > > > > > the > > > > > > > > > > > > >> same > > > > > > > > > > > > >> > > time > > > > > > > > > > > > >> > > > > it's > > > > > > > > > > > > >> > > > > > pretty easy to describe what > > implementations > > > > > that > > > > > > > want > > > > > > > > > to > > > > > > > > > > > take > > > > > > > > > > > > >> > > > advantage > > > > > > > > > > > > >> > > > > of > > > > > > > > > > > > >> > > > > > this feature should do. > > > > > > > > > > > > >> > > > > > 3e. Minimal changes to the interface: > > we're > > > > just > > > > > > > > > *adding* > > > > > > > > > > > one > > > > > > > > > > > > >> > default > > > > > > > > > > > > >> > > > > > method that calls the existing method > and > > > > > > > deprecating > > > > > > > > > the > > > > > > > > > > > > >> existing > > > > > > > > > > > > >> > > > > > `put(...)`. > > > > > > > > > > > > >> > > > > > 3f. Deprecating the existing `put(...)` > > > makes > > > > it > > > > > > > more > > > > > > > > > > clear > > > > > > > > > > > > in a > > > > > > > > > > > > >> > > > > > programmatic sense that new sink > > > > implementations > > > > > > > > should > > > > > > > > > > use > > > > > > > > > > > > the > > > > > > > > > > > > >> > > > reporter, > > > > > > > > > > > > >> > > > > > and that we recommend old sinks evolve > to > > > use > > > > > it. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > Some of these benefits apply to some of > > the > > > > > other > > > > > > > > > > > suggestions, > > > > > > > > > > > > >> but > > > > > > > > > > > > >> > I > > > > > > > > > > > > >> > > > > think > > > > > > > > > > > > >> > > > > > none of the other suggestions have all > of > > > > these > > > > > > > > > benefits. > > > > > > > > > > > For > > > > > > > > > > > > >> > > example, > > > > > > > > > > > > >> > > > > > overloading `initialize(...)` is more > > > > difficult > > > > > > > since > > > > > > > > > most > > > > > > > > > > > > sink > > > > > > > > > > > > >> > > > > connectors > > > > > > > > > > > > >> > > > > > don't override it and therefore would be > > > less > > > > > > > subject > > > > > > > > to > > > > > > > > > > > > >> > deprecations > > > > > > > > > > > > >> > > > > > warnings. Overloading `start(...)` is > less > > > > > > > attractive. > > > > > > > > > > > Adding > > > > > > > > > > > > a > > > > > > > > > > > > >> > > method > > > > > > > > > > > > >> > > > > IMO > > > > > > > > > > > > >> > > > > > shares the fewest of these benefits. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > The one disadvantage of this approach is > > > that > > > > > sink > > > > > > > > task > > > > > > > > > > > > >> > > implementations > > > > > > > > > > > > >> > > > > > can't rely upon the reporter upon > startup. > > > IMO > > > > > > > that's > > > > > > > > an > > > > > > > > > > > > >> acceptable > > > > > > > > > > > > >> > > > > > tradeoff to get the cleaner and more > > > explicit > > > > > API, > > > > > > > > > > > especially > > > > > > > > > > > > if > > > > > > > > > > > > >> > the > > > > > > > > > > > > >> > > > API > > > > > > > > > > > > >> > > > > > contract is that Connect will pass the > > same > > > > > > reporter > > > > > > > > > > > instance > > > > > > > > > > > > to > > > > > > > > > > > > >> > each > > > > > > > > > > > > >> > > > > call > > > > > > > > > > > > >> > > > > > to `put(...)` on a single task instance. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > Best regards, > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > Randall > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > On Fri, May 15, 2020 at 6:59 AM Andrew > > > > > Schofield < > > > > > > > > > > > > >> > > > > > andrew_schofi...@live.com> > > > > > > > > > > > > >> > > > > > wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > Hi, > > > > > > > > > > > > >> > > > > > > Randall's suggestion is really good. I > > > think > > > > > it > > > > > > > > gives > > > > > > > > > > the > > > > > > > > > > > > >> > > flexibility > > > > > > > > > > > > >> > > > > > > required and also > > > > > > > > > > > > >> > > > > > > keeps the interface the right way > round. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > Thanks, > > > > > > > > > > > > >> > > > > > > Andrew Schofield > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > On 15/05/2020, 02:07, "Aakash Shah" < > > > > > > > > > > as...@confluent.io> > > > > > > > > > > > > >> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > Hi Randall, > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the feedback. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > 1. This is a great suggestion, but I > > > find > > > > > that > > > > > > > > > adding > > > > > > > > > > an > > > > > > > > > > > > >> > > overloaded > > > > > > > > > > > > >> > > > > > > > put(...) which essentially > deprecates > > > the > > > > > old > > > > > > > > > put(...) > > > > > > > > > > > to > > > > > > > > > > > > >> only > > > > > > > > > > > > >> > be > > > > > > > > > > > > >> > > > > used > > > > > > > > > > > > >> > > > > > > when > > > > > > > > > > > > >> > > > > > > > a connector is deployed on older > > > versions > > > > of > > > > > > > > Connect > > > > > > > > > > > adds > > > > > > > > > > > > >> > enough > > > > > > > > > > > > >> > > > of a > > > > > > > > > > > > >> > > > > > > > complication that could cause > > connectors > > > > to > > > > > > > break > > > > > > > > if > > > > > > > > > > the > > > > > > > > > > > > old > > > > > > > > > > > > >> > > > put(...) > > > > > > > > > > > > >> > > > > > > > doesn't correctly invoke the > > overloaded > > > > > > > put(...); > > > > > > > > > > either > > > > > > > > > > > > >> that, > > > > > > > > > > > > >> > or > > > > > > > > > > > > >> > > > it > > > > > > > > > > > > >> > > > > > will > > > > > > > > > > > > >> > > > > > > > add duplication of functionality > > across > > > > the > > > > > > two > > > > > > > > > > put(...) > > > > > > > > > > > > >> > > methods. I > > > > > > > > > > > > >> > > > > > think > > > > > > > > > > > > >> > > > > > > > the older method simplifies things > > with > > > > the > > > > > > idea > > > > > > > > > that > > > > > > > > > > a > > > > > > > > > > > > >> > DLQ/error > > > > > > > > > > > > >> > > > > > > reporter > > > > > > > > > > > > >> > > > > > > > will or will not be passed into the > > > method > > > > > > > > depending > > > > > > > > > > on > > > > > > > > > > > > the > > > > > > > > > > > > >> > > version > > > > > > > > > > > > >> > > > > of > > > > > > > > > > > > >> > > > > > > AK. > > > > > > > > > > > > >> > > > > > > > However, I also understand the > > aesthetic > > > > > > > advantage > > > > > > > > > of > > > > > > > > > > > this > > > > > > > > > > > > >> > method > > > > > > > > > > > > >> > > > vs > > > > > > > > > > > > >> > > > > > the > > > > > > > > > > > > >> > > > > > > > setter method, so I am okay with > going > > > in > > > > > this > > > > > > > > > > direction > > > > > > > > > > > > if > > > > > > > > > > > > >> > > others > > > > > > > > > > > > >> > > > > > agree > > > > > > > > > > > > >> > > > > > > > with adding the overloaded put(...). > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > 2. Yes, your assumption is correct. > > Yes, > > > > we > > > > > > can > > > > > > > > > remove > > > > > > > > > > > the > > > > > > > > > > > > >> > "Order > > > > > > > > > > > > >> > > > of > > > > > > > > > > > > >> > > > > > > > Operations" if we go with the > > overloaded > > > > > > > put(...) > > > > > > > > > > > > direction. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > 3. Great point, I will remove them > > from > > > > the > > > > > > KIP. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > 4. Yeah, accept(...) will be > > > synchronous. > > > > I > > > > > > will > > > > > > > > > > change > > > > > > > > > > > it > > > > > > > > > > > > >> to > > > > > > > > > > > > >> > be > > > > > > > > > > > > >> > > > > > clearer, > > > > > > > > > > > > >> > > > > > > > thanks. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > 5. This KIP will use existing > metrics > > as > > > > > well > > > > > > > > > > introduce > > > > > > > > > > > > new > > > > > > > > > > > > >> > > > metrics. > > > > > > > > > > > > >> > > > > I > > > > > > > > > > > > >> > > > > > > will > > > > > > > > > > > > >> > > > > > > > update this section to fully specify > > the > > > > > > > metrics. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > Please let me know what you think. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > Thanks, > > > > > > > > > > > > >> > > > > > > > Aakash > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > On Thu, May 14, 2020 at 3:52 PM > > Randall > > > > > Hauch > > > > > > < > > > > > > > > > > > > >> > rha...@gmail.com> > > > > > > > > > > > > >> > > > > > wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Hi, Aakash. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Thanks for the KIP. Connect does > > need > > > an > > > > > > > > improved > > > > > > > > > > > > ability > > > > > > > > > > > > >> for > > > > > > > > > > > > >> > > > sink > > > > > > > > > > > > >> > > > > > > > > connectors to report individual > > > records > > > > as > > > > > > > being > > > > > > > > > > > > >> problematic, > > > > > > > > > > > > >> > > and > > > > > > > > > > > > >> > > > > > this > > > > > > > > > > > > >> > > > > > > > > integrates nicely with the > existing > > > DLQ > > > > > > > feature. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > I also appreciate the desire to > > > maintain > > > > > > > > > > compatibility > > > > > > > > > > > > so > > > > > > > > > > > > >> > that > > > > > > > > > > > > >> > > > > > > connectors > > > > > > > > > > > > >> > > > > > > > > can take advantage of this feature > > > when > > > > > > > deployed > > > > > > > > > in > > > > > > > > > > a > > > > > > > > > > > > >> runtime > > > > > > > > > > > > >> > > > that > > > > > > > > > > > > >> > > > > > > supports > > > > > > > > > > > > >> > > > > > > > > this feature, but can safely and > > > easily > > > > do > > > > > > > > without > > > > > > > > > > the > > > > > > > > > > > > >> > feature > > > > > > > > > > > > >> > > > when > > > > > > > > > > > > >> > > > > > > > > deployed to an older runtime. But > I > > do > > > > > > > > understand > > > > > > > > > > > > Andrew's > > > > > > > > > > > > >> > > > concern > > > > > > > > > > > > >> > > > > > > about > > > > > > > > > > > > >> > > > > > > > > the aesthetics. Have you > considered > > > > > > > overloading > > > > > > > > > the > > > > > > > > > > > > >> > `put(...)` > > > > > > > > > > > > >> > > > > method > > > > > > > > > > > > >> > > > > > > and > > > > > > > > > > > > >> > > > > > > > > adding the `reporter` as a second > > > > > parameter? > > > > > > > > > > > Essentially > > > > > > > > > > > > >> it > > > > > > > > > > > > >> > > would > > > > > > > > > > > > >> > > > > add > > > > > > > > > > > > >> > > > > > > the > > > > > > > > > > > > >> > > > > > > > > one method (with proper JavaDoc) > to > > > > > > `SinkTask` > > > > > > > > > only: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > ``` > > > > > > > > > > > > >> > > > > > > > > public void > > > > put(Collection<SinkRecord> > > > > > > > > > records, > > > > > > > > > > > > >> > > > > > > BiFunction<SinkRecord, > > > > > > > > > > > > >> > > > > > > > > Throwable> reporter) { > > > > > > > > > > > > >> > > > > > > > > put(records); > > > > > > > > > > > > >> > > > > > > > > } > > > > > > > > > > > > >> > > > > > > > > ``` > > > > > > > > > > > > >> > > > > > > > > and the WorkerSinkTask would be > > > changed > > > > to > > > > > > > call > > > > > > > > > > > > >> > > `put(Collection, > > > > > > > > > > > > >> > > > > > > > > BiFunction)` instead. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Sink connector implementations > that > > > > don't > > > > > do > > > > > > > > > > anything > > > > > > > > > > > > >> > different > > > > > > > > > > > > >> > > > can > > > > > > > > > > > > >> > > > > > > still > > > > > > > > > > > > >> > > > > > > > > override `put(Collection)`, and it > > > still > > > > > > works > > > > > > > > as > > > > > > > > > > > > before. > > > > > > > > > > > > >> > > > > Developers > > > > > > > > > > > > >> > > > > > > that > > > > > > > > > > > > >> > > > > > > > > want to change their sink > connector > > > > > > > > > implementations > > > > > > > > > > to > > > > > > > > > > > > >> > support > > > > > > > > > > > > >> > > > this > > > > > > > > > > > > >> > > > > > new > > > > > > > > > > > > >> > > > > > > > > feature would do the following, > > which > > > > > would > > > > > > > work > > > > > > > > > in > > > > > > > > > > > > older > > > > > > > > > > > > >> and > > > > > > > > > > > > >> > > > newer > > > > > > > > > > > > >> > > > > > > Connect > > > > > > > > > > > > >> > > > > > > > > runtimes: > > > > > > > > > > > > >> > > > > > > > > ``` > > > > > > > > > > > > >> > > > > > > > > public void > > > > put(Collection<SinkRecord> > > > > > > > > > records) > > > > > > > > > > { > > > > > > > > > > > > >> > > > > > > > > put(records, null); > > > > > > > > > > > > >> > > > > > > > > } > > > > > > > > > > > > >> > > > > > > > > public void > > > > put(Collection<SinkRecord> > > > > > > > > > records, > > > > > > > > > > > > >> > > > > > > BiFunction<SinkRecord, > > > > > > > > > > > > >> > > > > > > > > Throwable> reporter) { > > > > > > > > > > > > >> > > > > > > > > // the normal > > > `put(Collection)` > > > > > > logic > > > > > > > > goes > > > > > > > > > > > here, > > > > > > > > > > > > >> but > > > > > > > > > > > > >> > > can > > > > > > > > > > > > >> > > > > > > optionally > > > > > > > > > > > > >> > > > > > > > > use `reporter` if non-null > > > > > > > > > > > > >> > > > > > > > > } > > > > > > > > > > > > >> > > > > > > > > ``` > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > I think this has all the same > > benefits > > > > of > > > > > > the > > > > > > > > > > current > > > > > > > > > > > > KIP, > > > > > > > > > > > > >> > but > > > > > > > > > > > > >> > > > > > > > > it's noticeably simpler and > > hopefully > > > > more > > > > > > > > > > > aesthetically > > > > > > > > > > > > >> > > > pleasing. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > As for Andrew's second concern > about > > > > "the > > > > > > task > > > > > > > > can > > > > > > > > > > > send > > > > > > > > > > > > >> > errant > > > > > > > > > > > > >> > > > > > records > > > > > > > > > > > > >> > > > > > > to > > > > > > > > > > > > >> > > > > > > > > it within put(...)" being too > > > > restrictive. > > > > > > My > > > > > > > > > guess > > > > > > > > > > is > > > > > > > > > > > > >> that > > > > > > > > > > > > >> > > this > > > > > > > > > > > > >> > > > > was > > > > > > > > > > > > >> > > > > > > more > > > > > > > > > > > > >> > > > > > > > > an attempt at describing the basic > > > > > behavior, > > > > > > > and > > > > > > > > > > less > > > > > > > > > > > > >> about > > > > > > > > > > > > >> > > > > requiring > > > > > > > > > > > > >> > > > > > > the > > > > > > > > > > > > >> > > > > > > > > reporter only being called within > > the > > > > > > > `put(...)` > > > > > > > > > > > method > > > > > > > > > > > > >> and > > > > > > > > > > > > >> > not > > > > > > > > > > > > >> > > > by > > > > > > > > > > > > >> > > > > > > methods > > > > > > > > > > > > >> > > > > > > > > to which `put(...)` synchronously > or > > > > > > > > > asynchronously > > > > > > > > > > > > >> > delegates. > > > > > > > > > > > > >> > > > Can > > > > > > > > > > > > >> > > > > > you > > > > > > > > > > > > >> > > > > > > > > confirm whether my assumption is > > > > correct? > > > > > If > > > > > > > so, > > > > > > > > > > then > > > > > > > > > > > > >> perhaps > > > > > > > > > > > > >> > > my > > > > > > > > > > > > >> > > > > > > suggestion > > > > > > > > > > > > >> > > > > > > > > helps work around this issue as > > well, > > > > > since > > > > > > > > there > > > > > > > > > > > would > > > > > > > > > > > > >> be no > > > > > > > > > > > > >> > > > > > > restriction > > > > > > > > > > > > >> > > > > > > > > on when the reporter is called, > and > > > the > > > > > > whole > > > > > > > > > "Order > > > > > > > > > > > of > > > > > > > > > > > > >> > > > Operations" > > > > > > > > > > > > >> > > > > > > section > > > > > > > > > > > > >> > > > > > > > > could potentially be removed. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Third, it's not clear to me why > the > > > > "Error > > > > > > > > > Reporter > > > > > > > > > > > > >> Object" > > > > > > > > > > > > >> > > > > > subsection > > > > > > > > > > > > >> > > > > > > in > > > > > > > > > > > > >> > > > > > > > > the "Proposal" section lists the > > > worker > > > > > > > > > > configuration > > > > > > > > > > > > >> > > properties > > > > > > > > > > > > >> > > > > that > > > > > > > > > > > > >> > > > > > > were > > > > > > > > > > > > >> > > > > > > > > previously introduced with > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect > > > > > > > > > > > > >> > > > > > > > > . > > > > > > > > > > > > >> > > > > > > > > Maybe it's worth mentioning that > the > > > > error > > > > > > > > > reporter > > > > > > > > > > > > >> > > functionality > > > > > > > > > > > > >> > > > > > will > > > > > > > > > > > > >> > > > > > > > > reuse or build upon KIP-298, > > including > > > > > > reusing > > > > > > > > the > > > > > > > > > > > > >> > > configuration > > > > > > > > > > > > >> > > > > > > properties > > > > > > > > > > > > >> > > > > > > > > defined in KIP-298. But IIUC, the > > KIP > > > > does > > > > > > not > > > > > > > > > > propose > > > > > > > > > > > > >> > changing > > > > > > > > > > > > >> > > > any > > > > > > > > > > > > >> > > > > > > > > technical or semantic aspect of > > these > > > > > > > > > configuration > > > > > > > > > > > > >> > properties, > > > > > > > > > > > > >> > > > and > > > > > > > > > > > > >> > > > > > > > > therefore the KIP would be more > > clear > > > > and > > > > > > > > succinct > > > > > > > > > > > > without > > > > > > > > > > > > >> > > them. > > > > > > > > > > > > >> > > > > > > *That* the > > > > > > > > > > > > >> > > > > > > > > error reporter will use these > > > properties > > > > > is > > > > > > > part > > > > > > > > > of > > > > > > > > > > > the > > > > > > > > > > > > UX > > > > > > > > > > > > >> > and > > > > > > > > > > > > >> > > > > > > therefore > > > > > > > > > > > > >> > > > > > > > > necessary to mention, but *how* it > > > uses > > > > > > those > > > > > > > > > > > properties > > > > > > > > > > > > >> is > > > > > > > > > > > > >> > > > really > > > > > > > > > > > > >> > > > > up > > > > > > > > > > > > >> > > > > > > to > > > > > > > > > > > > >> > > > > > > > > the implementation. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Fourth, the "Synchrony" section > has > > a > > > > > > sentence > > > > > > > > > that > > > > > > > > > > is > > > > > > > > > > > > >> > > confusing, > > > > > > > > > > > > >> > > > > or > > > > > > > > > > > > >> > > > > > > not as > > > > > > > > > > > > >> > > > > > > > > clear as it could be. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > "If a record is sent to the > > error > > > > > > > reporter, > > > > > > > > > > > > >> processing of > > > > > > > > > > > > >> > > the > > > > > > > > > > > > >> > > > > > next > > > > > > > > > > > > >> > > > > > > > > errant record in accept(...) will > > not > > > > > begin > > > > > > > > until > > > > > > > > > > the > > > > > > > > > > > > >> > producer > > > > > > > > > > > > >> > > > > > > successfully > > > > > > > > > > > > >> > > > > > > > > sends the errant record to Kafka." > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > This sentence is a bit difficult > to > > > > > > > understand, > > > > > > > > > but > > > > > > > > > > > IIUC > > > > > > > > > > > > >> this > > > > > > > > > > > > >> > > > > really > > > > > > > > > > > > >> > > > > > > just > > > > > > > > > > > > >> > > > > > > > > means that "accept(...)" will be > > > > > synchronous > > > > > > > and > > > > > > > > > > will > > > > > > > > > > > > >> block > > > > > > > > > > > > >> > > until > > > > > > > > > > > > >> > > > > the > > > > > > > > > > > > >> > > > > > > > > errant record has been > successfully > > > > > written > > > > > > to > > > > > > > > > > Kafka. > > > > > > > > > > > If > > > > > > > > > > > > >> so, > > > > > > > > > > > > >> > > > let's > > > > > > > > > > > > >> > > > > > say > > > > > > > > > > > > >> > > > > > > > > that. The rest of the paragraph is > > > fine. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Finally, is this KIP proposing new > > > > > metrics, > > > > > > or > > > > > > > > > that > > > > > > > > > > > > >> existing > > > > > > > > > > > > >> > > > > metrics > > > > > > > > > > > > >> > > > > > > would > > > > > > > > > > > > >> > > > > > > > > be used to track the error > reporter > > > > usage? > > > > > > If > > > > > > > > the > > > > > > > > > > > > former, > > > > > > > > > > > > >> > then > > > > > > > > > > > > >> > > > > please > > > > > > > > > > > > >> > > > > > > > > fully-specify what these metrics > > will > > > > be, > > > > > > > > > similarly > > > > > > > > > > to > > > > > > > > > > > > how > > > > > > > > > > > > >> > > > metrics > > > > > > > > > > > > >> > > > > > are > > > > > > > > > > > > >> > > > > > > > > specified in > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework > > > > > > > > > > > > >> > > > > > > > > . > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Thoughts? > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Best regards, > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > Randall > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > On Mon, May 11, 2020 at 4:49 PM > > Andrew > > > > > > > > Schofield < > > > > > > > > > > > > >> > > > > > > > > andrew_schofi...@live.com> > > > > > > > > > > > > >> > > > > > > > > wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > Hi Aakash, > > > > > > > > > > > > >> > > > > > > > > > Thanks for sorting out the > replies > > > to > > > > > the > > > > > > > > > mailing > > > > > > > > > > > > list. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > First, I do like the idea of > > > improving > > > > > > error > > > > > > > > > > > reporting > > > > > > > > > > > > >> in > > > > > > > > > > > > >> > > sink > > > > > > > > > > > > >> > > > > > > > > connectors. > > > > > > > > > > > > >> > > > > > > > > > I'd like a simple > > > > > > > > > > > > >> > > > > > > > > > way to put bad records onto the > > DLQ. > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > I think this KIP is considerably > > > more > > > > > > > > > complicated > > > > > > > > > > > than > > > > > > > > > > > > >> it > > > > > > > > > > > > > > >