Hi,
Randall's suggestion is really good. I think it gives the flexibility required and also keeps the interface the right way round.

Thanks,
Andrew Schofield

On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:

> Hi Randall,
>
> Thanks for the feedback.
>
> 1. This is a great suggestion, but I find that adding an overloaded
> put(...) which essentially deprecates the old put(...) to only be used when
> a connector is deployed on older versions of Connect adds enough of a
> complication that could cause connectors to break if the old put(...)
> doesn't correctly invoke the overloaded put(...); either that, or it will
> add duplication of functionality across the two put(...) methods. I think
> the older method simplifies things with the idea that a DLQ/error reporter
> will or will not be passed into the method depending on the version of AK.
> However, I also understand the aesthetic advantage of this method vs the
> setter method, so I am okay with going in this direction if others agree
> with adding the overloaded put(...).
>
> 2. Yes, your assumption is correct. Yes, we can remove the "Order of
> Operations" if we go with the overloaded put(...) direction.
>
> 3. Great point, I will remove them from the KIP.
>
> 4. Yeah, accept(...) will be synchronous. I will change it to be clearer,
> thanks.
>
> 5. This KIP will use existing metrics as well as introduce new metrics. I
> will update this section to fully specify the metrics.
>
> Please let me know what you think.
>
> Thanks,
> Aakash
>
> On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com> wrote:
>
> > Hi, Aakash.
> >
> > Thanks for the KIP. Connect does need an improved ability for sink
> > connectors to report individual records as being problematic, and this
> > integrates nicely with the existing DLQ feature.
> >
> > I also appreciate the desire to maintain compatibility so that connectors
> > can take advantage of this feature when deployed in a runtime that supports
> > this feature, but can safely and easily do without the feature when
> > deployed to an older runtime.
> > But I do understand Andrew's concern about
> > the aesthetics. Have you considered overloading the `put(...)` method and
> > adding the `reporter` as a second parameter? Essentially it would add the
> > one method (with proper JavaDoc) to `SinkTask` only:
> >
> > ```
> > public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> >     put(records);
> > }
> > ```
> > and the WorkerSinkTask would be changed to call `put(Collection,
> > BiFunction)` instead.
> >
> > Sink connector implementations that don't do anything different can still
> > override `put(Collection)`, and it still works as before. Developers that
> > want to change their sink connector implementations to support this new
> > feature would do the following, which would work in older and newer Connect
> > runtimes:
> > ```
> > public void put(Collection<SinkRecord> records) {
> >     put(records, null);
> > }
> > public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> >     // the normal `put(Collection)` logic goes here, but can optionally use `reporter` if non-null
> > }
> > ```
> >
> > I think this has all the same benefits of the current KIP, but
> > it's noticeably simpler and hopefully more aesthetically pleasing.
> >
> > As for Andrew's second concern about "the task can send errant records to
> > it within put(...)" being too restrictive. My guess is that this was more
> > an attempt at describing the basic behavior, and less about requiring the
> > reporter only being called within the `put(...)` method and not by methods
> > to which `put(...)` synchronously or asynchronously delegates. Can you
> > confirm whether my assumption is correct? If so, then perhaps my suggestion
> > helps work around this issue as well, since there would be no restriction
> > on when the reporter is called, and the whole "Order of Operations" section
> > could potentially be removed.
> >
> > Third, it's not clear to me why the "Error Reporter Object" subsection in
> > the "Proposal" section lists the worker configuration properties that were
> > previously introduced with
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect>.
> > Maybe it's worth mentioning that the error reporter functionality will
> > reuse or build upon KIP-298, including reusing the configuration properties
> > defined in KIP-298. But IIUC, the KIP does not propose changing any
> > technical or semantic aspect of these configuration properties, and
> > therefore the KIP would be more clear and succinct without them. *That* the
> > error reporter will use these properties is part of the UX and therefore
> > necessary to mention, but *how* it uses those properties is really up to
> > the implementation.
> >
> > Fourth, the "Synchrony" section has a sentence that is confusing, or not as
> > clear as it could be.
> >
> >     "If a record is sent to the error reporter, processing of the next
> >     errant record in accept(...) will not begin until the producer
> >     successfully sends the errant record to Kafka."
> >
> > This sentence is a bit difficult to understand, but IIUC this really just
> > means that "accept(...)" will be synchronous and will block until the
> > errant record has been successfully written to Kafka. If so, let's say
> > that. The rest of the paragraph is fine.
> >
> > Finally, is this KIP proposing new metrics, or that existing metrics would
> > be used to track the error reporter usage? If the former, then please
> > fully-specify what these metrics will be, similarly to how metrics are
> > specified in
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework>.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall
> >
> > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >
> > > Hi Aakash,
> > > Thanks for sorting out the replies to the mailing list.
> > >
> > > First, I do like the idea of improving error reporting in sink connectors.
> > > I'd like a simple way to put bad records onto the DLQ.
> > >
> > > I think this KIP is considerably more complicated than it seems. The
> > > guidance on the SinkTask.put() method is that it should send the records
> > > asynchronously and immediately return, so the task is likely to want to
> > > report errors asynchronously too. Currently the KIP states that "the task
> > > can send errant records to it within put(...)" and that's too restrictive.
> > > The task ought to be able to report any unflushed records, but the
> > > synchronisation of this is going to be tricky. I suppose the connector
> > > author needs to make sure that all errant records have been reported
> > > before returning control from SinkTask.flush(...) or perhaps
> > > SinkTask.preCommit(...).
> > >
> > > I think the interface is a little strange too. I can see that this was
> > > done so it's possible to deliver a connector that supports error
> > > reporting but it can also work in earlier versions of the KC runtime.
> > > But, the pattern so far is that the task uses the methods of
> > > SinkTaskContext to access utilities in the Kafka Connect runtime, and I
> > > suggest that reporting a bad record is such a utility. SinkTaskContext
> > > has changed before when the configs() method was added, so I think there
> > > is precedent for adding a method. The way the KIP adds a method to
> > > SinkTask that the KC runtime calls to provide the error reporting
> > > utility seems not to match what has gone before.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
> > >
> > >     I wasn't previously added to the dev mailing list, so I'd like to
> > >     post my discussion with Andrew Schofield below for visibility and
> > >     further discussion:
> > >
> > >     Hi Andrew,
> > >
> > >     Thanks for the reply. The main concern with this approach would be
> > >     its backward compatibility. I’ve highlighted the thoughts around the
> > >     backwards compatibility of the initial approach, please let me know
> > >     what you think.
> > >
> > >     Thanks,
> > >     Aakash
> > >
> > >     ____________________________________________________________________
> > >
> > >     Hi,
> > >     By adding a new method to the SinkTaskContext interface in say Kafka
> > >     2.6, a connector that calls it would require a Kafka 2.6 connect
> > >     runtime. I don't quite see how that's a backward compatibility
> > >     problem. It's just that new connectors need the latest interface. I
> > >     might not quite be understanding, but I think it would be fine.
> > >
> > >     Thanks,
> > >     Andrew
> > >
> > >     ____________________________________________________________________
> > >
> > >     Hi Andrew,
> > >
> > >     I apologize for the way the reply was sent. I just subscribed to the
> > >     dev mailing list so it should be resolved now.
> > >
> > >     You are correct, new connectors would simply require the latest
> > >     interface. However, we want to remove that requirement - in other
> > >     words, we want to allow the possibility that someone wants the latest
> > >     connector/to upgrade to the latest version, but deploys it on an
> > >     older version of AK. Basically, we don't want to enforce the
> > >     necessity of upgrading AK to get the latest interface.
> > >     In the current approach, there would be no issue of deploying a
> > >     new connector on an older version of AK, as the Connect framework
> > >     would simply not invoke the new method.
> > >
> > >     Please let me know what you think and if I need to clarify anything.
> > >
> > >     Thanks,
> > >     Aakash
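
For reference, the overloaded `put(...)` pattern discussed in this thread can be sketched roughly as below. This is only an illustration under stated assumptions, not the KIP's final API: `SinkRecord` and `SinkTask` here are minimal hypothetical stand-ins rather than the real Connect classes; the reporter is typed as `BiConsumer<SinkRecord, Throwable>` (an assumption, chosen because `java.util.function.BiFunction` takes three type parameters and the thread refers to `accept(...)`); and rethrowing when no reporter is supplied is just one possible fallback, which the thread does not prescribe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

public class OverloadedPutSketch {

    /** Hypothetical stand-in for Connect's SinkRecord; it only carries a value. */
    static final class SinkRecord {
        final String value;
        SinkRecord(String value) { this.value = value; }
    }

    /** Hypothetical stand-in for SinkTask with the proposed overload added. */
    static abstract class SinkTask {
        public abstract void put(Collection<SinkRecord> records);

        // Proposed overload: the default ignores the reporter, so connectors
        // that only override put(Collection) keep working unchanged.
        public void put(Collection<SinkRecord> records,
                        BiConsumer<SinkRecord, Throwable> reporter) {
            put(records);
        }
    }

    /** A task updated to use the reporter when the runtime supplies one. */
    static final class ReportingSinkTask extends SinkTask {
        final List<String> written = new ArrayList<>();

        @Override
        public void put(Collection<SinkRecord> records) {
            // An older runtime only calls this method, so there is no reporter.
            put(records, null);
        }

        @Override
        public void put(Collection<SinkRecord> records,
                        BiConsumer<SinkRecord, Throwable> reporter) {
            for (SinkRecord record : records) {
                try {
                    // "Writing" here just means parsing the value as an integer.
                    written.add(Integer.toString(Integer.parseInt(record.value)));
                } catch (NumberFormatException e) {
                    if (reporter == null) {
                        throw e; // assumed fallback: fail as the task did before
                    }
                    reporter.accept(record, e);
                }
            }
        }
    }

    /** Simulates a newer runtime: passes a reporter that collects errant values. */
    static List<String> runOnNewerRuntime(SinkTask task, List<SinkRecord> records) {
        List<String> reported = new ArrayList<>();
        task.put(records, (record, error) -> reported.add(record.value));
        return reported;
    }

    /** Simulates an older runtime, which only knows put(Collection). */
    static boolean runOnOlderRuntime(SinkTask task, List<SinkRecord> records) {
        try {
            task.put(records);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<SinkRecord> records = Arrays.asList(
                new SinkRecord("1"), new SinkRecord("oops"), new SinkRecord("3"));

        ReportingSinkTask newer = new ReportingSinkTask();
        System.out.println("reported on newer runtime: " + runOnNewerRuntime(newer, records));
        System.out.println("written on newer runtime: " + newer.written);

        ReportingSinkTask older = new ReportingSinkTask();
        System.out.println("older runtime succeeded: " + runOnOlderRuntime(older, records));
    }
}
```

The same task class runs against both simulated runtimes, which is the compatibility property the thread is after; the cost, as noted above, is that the connector author must keep the two `put(...)` methods consistent.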