Hi, Aakash.

Thanks for the KIP. Connect does need an improved ability for sink
connectors to report individual records as being problematic, and this
integrates nicely with the existing DLQ feature.

I also appreciate the desire to maintain compatibility so that connectors
can take advantage of this feature when deployed in a runtime that supports
this feature, but can safely and easily do without the feature when
deployed to an older runtime. But I do understand Andrew's concern about
the aesthetics. Have you considered overloading the `put(...)` method and
adding the `reporter` as a second parameter? Essentially it would add the
one method (with proper JavaDoc) to `SinkTask` only:

```
    public void put(Collection<SinkRecord> records, BiFunction<SinkRecord,
Throwable> reporter) {
        put(records);
    }
```
and the WorkerSinkTask would be changed to call `put(Collection,
BiFunction)` instead.

Sink connector implementations that don't do anything different can still
override `put(Collection)`, and it still works as before. Developers that
want to change their sink connector implementations to support this new
feature would do the following, which would work in older and newer Connect
runtimes:
```
    public void put(Collection<SinkRecord> records) {
        put(records, null);
    }
    public void put(Collection<SinkRecord> records, BiFunction<SinkRecord,
Throwable> reporter) {
        // the normal `put(Collection)` logic goes here, but can optionally
use `reporter` if non-null
    }
```

I think this has all the same benefits of the current KIP, but
it's noticeably simpler and hopefully more aesthetically pleasing.

As for Andrew's second concern about "the task can send errant records to
it within put(...)" being too restrictive. My guess is that this was more
an attempt at describing the basic behavior, and less about requiring the
reporter only being called within the `put(...)` method and not by methods
to which `put(...)` synchronously or asynchronously delegates. Can you
confirm whether my assumption is correct? If so, then perhaps my suggestion
helps work around this issue as well, since there would be no restriction
on when the reporter is called, and the whole "Order of Operations" section
could potentially be removed.

Third, it's not clear to me why the "Error Reporter Object" subsection in
the "Proposal" section lists the worker configuration properties that were
previously introduced with
https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect.
Maybe it's worth mentioning that the error reporter functionality will
reuse or build upon KIP-298, including reusing the configuration properties
defined in KIP-298. But IIUC, the KIP does not propose changing any
technical or semantic aspect of these configuration properties, and
therefore the KIP would be more clear and succinct without them. *That* the
error reporter will use these properties is part of the UX and therefore
necessary to mention, but *how* it uses those properties is really up to
the implementation.

Fourth, the "Synchrony" section has a sentence that is confusing, or not as
clear as it could be.

    "If a record is sent to the error reporter, processing of the next
errant record in accept(...) will not begin until the producer successfully
sends the errant record to Kafka."

This sentence is a bit difficult to understand, but IIUC this really just
means that "accept(...)" will be synchronous and will block until the
errant record has been successfully written to Kafka. If so, let's say
that. The rest of the paragraph is fine.

Finally, is this KIP proposing new metrics, or that existing metrics would
be used to track the error reporter usage? If the former, then please
fully-specify what these metrics will be, similarly to how metrics are
specified in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
.

Thoughts?

Best regards,

Randall

On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Aakash,
> Thanks for sorting out the replies to the mailing list.
>
> First, I do like the idea of improving error reporting in sink connectors.
> I'd like a simple
> way to put bad records onto the DLQ.
>
> I think this KIP is considerably more complicated than it seems. The
> guidance on the
> SinkTask.put() method is that it should send the records asynchronously
> and immediately
> return, so the task is likely to want to report errors asynchronously
> too.  Currently the KIP
> states that "the task can send errant records to it within put(...)" and
> that's too restrictive.
> The task ought to be able to report any unflushed records, but the
> synchronisation of this is going
> to be tricky. I suppose the connector author needs to make sure that all
> errant records have
> been reported before returning control from SinkTask.flush(...) or perhaps
> SinkTask.preCommit(...).
>
> I think the interface is a little strange too. I can see that this was
> done so it's possible to deliver a connector
> that supports error reporting but it can also work in earlier versions of
> the KC runtime. But, the
> pattern so far is that the task uses the methods of SinkTaskContext to
> access utilities in the Kafka
> Connect runtime, and I suggest that reporting a bad record is such a
> utility. SinkTaskContext has
> changed before when the configs() methods was added, so I think there is
> precedent for adding a method.
> The way the KIP adds a method to SinkTask that the KC runtime calls to
> provide the error reporting utility
> seems not to match what has gone before.
>
> Thanks,
> Andrew
>
> On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
>
>     I wasn't previously added to the dev mailing list, so I'd like to post
> my
>     discussion with Andrew Schofield below for visibility and further
>     discussion:
>
>     Hi Andrew,
>
>     Thanks for the reply. The main concern with this approach would be its
>     backward compatibility. I’ve highlighted the thoughts around the
> backwards
>     compatibility of the initial approach, please let me know what you
> think.
>
>     Thanks,
>     Aakash
>
>
> ____________________________________________________________________________________________________________________________
>
>     Hi,
>     By adding a new method to the SinkContext interface in say Kafka 2.6, a
>     connector that calls it would require a Kafka 2.6 connect runtime. I
> don't
>     quite see how that's a backward compatibility problem. It's just that
> new
>     connectors need the latest interface. I might not quite be
> understanding,
>     but I think it would be fine.
>
>     Thanks,
>     Andrew
>
>
> ____________________________________________________________________________________________________________________________
>
>     Hi Andrew,
>
>     I apologize for the way the reply was sent. I just subscribed to the
> dev
>     mailing list so it should be resolved now.
>
>     You are correct, new connectors would simply require the latest
> interface.
>     However, we want to remove that requirement - in other words, we want
> to
>     allow the possibility that someone wants the latest connector/to
> upgrade to
>     the latest version, but deploys it on an older version of AK.
> Basically, we
>     don't want to enforce the necessity of upgrading AK to get the latest
>     interface. In the current approach, there would be no issue of
> deploying a
>     new connector on an older version of AK, as the Connect framework would
>     simply not invoke the new method.
>
>     Please let me know what you think and if I need to clarify anything.
>
>     Thanks,
>     Aakash
>
>

Reply via email to