Hi,
Randall's suggestion is really good. I think it gives the flexibility required
and also keeps the interface the right way round.

Thanks,
Andrew Schofield

On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:

> Hi Randall,
> 
> Thanks for the feedback.
> 
> 1. This is a great suggestion, but I find that adding an overloaded
> put(...), which essentially deprecates the old put(...) so that it is only
> used when a connector is deployed on older versions of Connect, adds enough
> complication that connectors could break if the old put(...) doesn't
> correctly invoke the overloaded put(...); either that, or it will add
> duplication of functionality across the two put(...) methods. I think
> the older method simplifies things with the idea that a DLQ/error reporter
> will or will not be passed into the method depending on the version of AK.
> However, I also understand the aesthetic advantage of this method vs the
> setter method, so I am okay with going in this direction if others agree
> with adding the overloaded put(...).
> 
> 2. Yes, your assumption is correct. Yes, we can remove the "Order of
> Operations" if we go with the overloaded put(...) direction.
> 
> 3. Great point, I will remove them from the KIP.
> 
> 4. Yeah, accept(...) will be synchronous. I will change it to be clearer,
> thanks.
> 
> 5. This KIP will use existing metrics as well as introduce new metrics. I will
> update this section to fully specify the metrics.
> 
> Please let me know what you think.
> 
> Thanks,
> Aakash
> 
> On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com> wrote:
> 
> > Hi, Aakash.
> >
> > Thanks for the KIP. Connect does need an improved ability for sink
> > connectors to report individual records as being problematic, and this
> > integrates nicely with the existing DLQ feature.
> >
> > I also appreciate the desire to maintain compatibility so that connectors
> > can take advantage of this feature when deployed in a runtime that supports
> > this feature, but can safely and easily do without the feature when
> > deployed to an older runtime. But I do understand Andrew's concern about
> > the aesthetics. Have you considered overloading the `put(...)` method and
> > adding the `reporter` as a second parameter? Essentially it would add the
> > one method (with proper JavaDoc) to `SinkTask` only:
> >
> > ```
> >     public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> >         put(records);
> >     }
> > ```
> > and the WorkerSinkTask would be changed to call `put(Collection,
> > BiFunction)` instead.
> >
> > Sink connector implementations that don't do anything different can still
> > override `put(Collection)`, and it still works as before. Developers that
> > want to change their sink connector implementations to support this new
> > feature would do the following, which would work in older and newer Connect
> > runtimes:
> > ```
> >     public void put(Collection<SinkRecord> records) {
> >         put(records, null);
> >     }
> >     public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> >         // the normal `put(Collection)` logic goes here, but can optionally
> >         // use `reporter` if non-null
> >     }
> > ```
> >
> > I think this has all the same benefits of the current KIP, but
> > it's noticeably simpler and hopefully more aesthetically pleasing.
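
To make the overload pattern above concrete, here is a minimal, self-contained sketch. It is not the Connect API: `SketchSinkTask` and `ReportingTask` are hypothetical stand-ins, records are plain `String`s rather than `SinkRecord`s, and the reporter is typed as `BiConsumer<String, Throwable>`, which is an assumption because the snippets above do not give a full type for `reporter`. Throwing when no reporter is available is also just one possible fallback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

public class OverloadedPutSketch {

    // Hypothetical stand-in for SinkTask; a String stands in for SinkRecord.
    static abstract class SketchSinkTask {
        public abstract void put(Collection<String> records);

        // Assumed overload: by default it ignores the reporter and delegates,
        // so tasks that only override put(Collection) keep working.
        public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
            put(records);
        }
    }

    // A task written against the overload that also works when a runtime
    // only ever calls put(Collection).
    static class ReportingTask extends SketchSinkTask {
        final List<String> written = new ArrayList<>();

        @Override
        public void put(Collection<String> records) {
            put(records, null); // older runtime: no reporter is supplied
        }

        @Override
        public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
            for (String record : records) {
                if (!record.isEmpty()) {
                    written.add(record);
                } else if (reporter != null) {
                    reporter.accept(record, new IllegalArgumentException("empty record"));
                } else {
                    // Assumed fallback when no reporter exists: fail as before.
                    throw new IllegalArgumentException("empty record");
                }
            }
        }
    }

    public static void main(String[] args) {
        ReportingTask task = new ReportingTask();
        List<String> reported = new ArrayList<>();
        task.put(Arrays.asList("a", "", "b"), (record, error) -> reported.add(error.getMessage()));
        System.out.println(task.written + " " + reported); // prints "[a, b] [empty record]"
    }
}
```

Calling `task.put(records)` directly exercises the older-runtime path, where the bad record causes an exception instead of being reported.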
> >
> > As for Andrew's second concern, that "the task can send errant records to
> > it within put(...)" is too restrictive: my guess is that this was more
> > an attempt at describing the basic behavior, and less about requiring the
> > reporter only being called within the `put(...)` method and not by methods
> > to which `put(...)` synchronously or asynchronously delegates. Can you
> > confirm whether my assumption is correct? If so, then perhaps my suggestion
> > helps work around this issue as well, since there would be no restriction
> > on when the reporter is called, and the whole "Order of Operations" section
> > could potentially be removed.
> >
> > Third, it's not clear to me why the "Error Reporter Object" subsection in
> > the "Proposal" section lists the worker configuration properties that were
> > previously introduced with
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > .
> > Maybe it's worth mentioning that the error reporter functionality will
> > reuse or build upon KIP-298, including reusing the configuration properties
> > defined in KIP-298. But IIUC, the KIP does not propose changing any
> > technical or semantic aspect of these configuration properties, and
> > therefore the KIP would be more clear and succinct without them. *That* the
> > error reporter will use these properties is part of the UX and therefore
> > necessary to mention, but *how* it uses those properties is really up to
> > the implementation.
> >
> > Fourth, the "Synchrony" section has a sentence that is confusing, or not as
> > clear as it could be.
> >
> >     "If a record is sent to the error reporter, processing of the next
> > errant record in accept(...) will not begin until the producer successfully
> > sends the errant record to Kafka."
> >
> > This sentence is a bit difficult to understand, but IIUC this really just
> > means that "accept(...)" will be synchronous and will block until the
> > errant record has been successfully written to Kafka. If so, let's say
> > that. The rest of the paragraph is fine.
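
As an illustration of what "synchronous" would mean here, the sketch below blocks inside `accept(...)` until an asynchronous write completes. Everything in it is hypothetical: `BlockingReporter` is not a Connect class, the `send` function merely stands in for a producer send that returns a `Future`, and wrapping failures in `IllegalStateException` is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class SynchronousReporterSketch {

    // Hypothetical reporter; `send` stands in for an asynchronous producer send.
    static class BlockingReporter implements BiConsumer<String, Throwable> {
        private final Function<String, Future<?>> send;
        int completed = 0;

        BlockingReporter(Function<String, Future<?>> send) {
            this.send = send;
        }

        @Override
        public void accept(String record, Throwable error) {
            try {
                send.apply(record).get(); // block until the write has completed
                completed++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while reporting " + record, e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("failed to report " + record, e.getCause());
            }
        }
    }

    public static void main(String[] args) {
        BlockingReporter reporter = new BlockingReporter(
            record -> CompletableFuture.runAsync(() -> { /* pretend to write to the DLQ topic */ }));
        reporter.accept("bad-record", new RuntimeException("cannot parse"));
        System.out.println(reporter.completed); // prints 1: accept returned only after the send finished
    }
}
```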
> >
> > Finally, is this KIP proposing new metrics, or that existing metrics would
> > be used to track the error reporter usage? If the former, then please
> > fully-specify what these metrics will be, similarly to how metrics are
> > specified in
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> > .
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall
> >
> > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >
> > > Hi Aakash,
> > > Thanks for sorting out the replies to the mailing list.
> > >
> > > First, I do like the idea of improving error reporting in sink connectors.
> > > I'd like a simple way to put bad records onto the DLQ.
> > >
> > > I think this KIP is considerably more complicated than it seems. The
> > > guidance on the SinkTask.put() method is that it should send the records
> > > asynchronously and immediately return, so the task is likely to want to
> > > report errors asynchronously too. Currently the KIP states that "the task
> > > can send errant records to it within put(...)" and that's too restrictive.
> > > The task ought to be able to report any unflushed records, but the
> > > synchronisation of this is going to be tricky. I suppose the connector
> > > author needs to make sure that all errant records have been reported
> > > before returning control from SinkTask.flush(...) or perhaps
> > > SinkTask.preCommit(...).
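
One way a connector author might handle the synchronisation described above is to track outstanding reports and wait for them when flushing. The sketch below assumes that approach. `AsyncReportingTask`, `reportAsync` and the no-argument `flush()` are hypothetical stand-ins, not the real `SinkTask` signatures, and the asynchronous body is a placeholder for handing the record to whatever reporter the runtime provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FlushBarrierSketch {

    static class AsyncReportingTask {
        private final List<CompletableFuture<Void>> pendingReports = new ArrayList<>();

        // May be called after put(...) has returned, e.g. from a write callback.
        synchronized void reportAsync(String record, Throwable error) {
            pendingReports.add(CompletableFuture.runAsync(() -> {
                // Hypothetical: pass (record, error) to the runtime's error reporter here.
            }));
        }

        // Stand-in for SinkTask.flush(...): returns only once every pending
        // report has completed, and says how many reports it waited for.
        synchronized int flush() {
            int awaited = pendingReports.size();
            CompletableFuture.allOf(pendingReports.toArray(new CompletableFuture[0])).join();
            pendingReports.clear();
            return awaited;
        }
    }

    public static void main(String[] args) {
        AsyncReportingTask task = new AsyncReportingTask();
        task.reportAsync("bad-1", new RuntimeException("cannot parse"));
        task.reportAsync("bad-2", new RuntimeException("schema mismatch"));
        System.out.println(task.flush()); // prints 2
        System.out.println(task.flush()); // prints 0: nothing left to wait for
    }
}
```

If any report fails, `join()` throws a `CompletionException`, so in this sketch a failed report would also fail the flush.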
> > >
> > > I think the interface is a little strange too. I can see that this was
> > > done so it's possible to deliver a connector that supports error
> > > reporting but can also work in earlier versions of the KC runtime. But
> > > the pattern so far is that the task uses the methods of SinkTaskContext
> > > to access utilities in the Kafka Connect runtime, and I suggest that
> > > reporting a bad record is such a utility. SinkTaskContext has changed
> > > before, when the configs() method was added, so I think there is
> > > precedent for adding a method. The way the KIP adds a method to SinkTask
> > > that the KC runtime calls to provide the error reporting utility seems
> > > not to match what has gone before.
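
For comparison, the context-based shape suggested above might look roughly like the following. This is only a sketch: `SketchContext` and its `reportErrantRecord` method are invented for illustration and are not part of `SinkTaskContext`, and the numeric-payload check is a placeholder for whatever makes a record unwritable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ContextReporterSketch {

    // Stand-in for SinkTaskContext with one hypothetical extra method.
    interface SketchContext {
        void reportErrantRecord(String record, Throwable error);
    }

    // Stand-in for a SinkTask that is handed its context once, at initialization.
    static class ContextUsingTask {
        private SketchContext context;

        void initialize(SketchContext context) {
            this.context = context;
        }

        void put(Collection<String> records) {
            for (String record : records) {
                try {
                    Integer.parseInt(record); // pretend the sink needs a numeric payload
                } catch (NumberFormatException e) {
                    context.reportErrantRecord(record, e); // the utility lives on the context
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        ContextUsingTask task = new ContextUsingTask();
        task.initialize((record, error) -> reported.add(record));
        task.put(Arrays.asList("1", "x", "2"));
        System.out.println(reported); // prints "[x]"
    }
}
```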
> > >
> > > Thanks,
> > > Andrew
> > >
> > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
> > >
> > >     I wasn't previously added to the dev mailing list, so I'd like to post my
> > >     discussion with Andrew Schofield below for visibility and further
> > >     discussion:
> > >
> > >     Hi Andrew,
> > >
> > >     Thanks for the reply. The main concern with this approach would be its
> > >     backward compatibility. I’ve highlighted the thoughts around the backwards
> > >     compatibility of the initial approach; please let me know what you think.
> > >
> > >     Thanks,
> > >     Aakash
> > >
> > >
> > >
> > ____________________________________________________________________________________________________________________________
> > >
> > >     Hi,
> > >     By adding a new method to the SinkTaskContext interface in, say, Kafka 2.6,
> > >     a connector that calls it would require a Kafka 2.6 Connect runtime. I don't
> > >     quite see how that's a backward compatibility problem. It's just that new
> > >     connectors need the latest interface. I might not quite be understanding,
> > >     but I think it would be fine.
> > >
> > >     Thanks,
> > >     Andrew
> > >
> > >
> > >
> > ____________________________________________________________________________________________________________________________
> > >
> > >     Hi Andrew,
> > >
> > >     I apologize for the way the reply was sent. I just subscribed to the dev
> > >     mailing list so it should be resolved now.
> > >
> > >     You are correct, new connectors would simply require the latest interface.
> > >     However, we want to remove that requirement. In other words, we want to
> > >     allow the possibility that someone wants the latest connector, or wants to
> > >     upgrade to the latest version, but deploys it on an older version of AK.
> > >     Basically, we don't want to enforce the necessity of upgrading AK to get the
> > >     latest interface. In the current approach, there would be no issue of
> > >     deploying a new connector on an older version of AK, as the Connect framework
> > >     would simply not invoke the new method.
> > >
> > >     Please let me know what you think and if I need to clarify anything.
> > >
> > >     Thanks,
> > >     Aakash
> > >
> > >
> >
>
