On Fri, May 15, 2020 at 1:45 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Thanks for the quick response Aakash.
>
> To your last point, modern APIs like this tend to be asynchronous (see
> admin, producer in Kafka) and such definition results in more expressive
> and well defined APIs.
>

+1


> What you describe is easily an opt-in feature for the connector developer.
> At the same time, the latest description above, gives us better chances for
> this API to remain like this for longer, because it covers both the sync
> and async per `put` use cases.


+1


> Given how simple the sync implementation
> is, just by complying with the return type of the method, I still think the
> BiFunction definition that returns a Future makes sense.
>
> Konstantine
>
>
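
To make the "sync by complying with the return type" point concrete, here is a minimal sketch, not the KIP's implementation. `SinkRecord` below is a stand-in type, and `blockingWrite` and `WRITTEN` are hypothetical placeholders for whatever blocking write the runtime performs. The reporter does its work before returning and hands back an already-completed future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

final class SyncReporterSketch {
    // Stand-in for Connect's SinkRecord so the sketch compiles on its own.
    static final class SinkRecord {
        final String value;
        SinkRecord(String value) { this.value = value; }
    }

    // Hypothetical in-memory destination; a real runtime would write elsewhere.
    static final List<String> WRITTEN = new ArrayList<>();

    // Hypothetical blocking write: returns only once the failed record is stored.
    static void blockingWrite(SinkRecord record, Throwable error) {
        WRITTEN.add(record.value + ": " + error.getMessage());
    }

    // A synchronous reporter that still has the Future-returning shape.
    static final BiFunction<SinkRecord, Throwable, Future<Void>> SYNC_REPORTER =
        (record, error) -> {
            blockingWrite(record, error);
            return CompletableFuture.completedFuture(null);
        };

    public static void main(String[] args) throws Exception {
        Future<Void> result =
            SYNC_REPORTER.apply(new SinkRecord("r1"), new IllegalStateException("bad record"));
        result.get(); // does not wait: the write finished before apply(...) returned
        System.out.println(WRITTEN);
    }
}
```

A task that wants synchronous behaviour from an asynchronous reporter of the same shape would call `get()` on the returned future; a task that does not care can ignore it.
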
>
> On Fri, May 15, 2020 at 11:27 AM Aakash Shah <as...@confluent.io> wrote:
>
> > Thanks for the additional feedback.
> >
> > I see the benefits of adding an overloaded put(...) over alternatives and I
> > am on board going forward with this approach. It will definitely set forth
> > a contract of where the reporter will be used with better aesthetics.
> >
> > The original idea of going with a synchronous approach for the error
> > reporter was to ease the connector developer's job interacting with and
> > handling the error reporter. The tradeoff for having a synchronous-only
> > reporter would be lower throughput on the reporter; this was thought to be
> > fine since arguably most circumstances would not include consistently large
> > amounts of records being sent to the error reporter. Even if this was the
> > case, an argument can be made that the lower throughput would be of
> > assistance in this case, as it would allow more time for the user to
> > realize the connector is having records sent to the error reporter before
> > many are sent. However, if we are strongly in favor of having the option of
> > asynchronous functionality available for the developer, then I am fine with
> > that as well.
> >
> > Lastly, I am on board with changing the name to failedRecordReporter.
> >
> > Please let me know your thoughts.
> >
> > Thanks,
> > Aakash
> >
> > On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > Konstantine said:
> > >
> > > > I notice Randall also used BiFunction in his example, I wonder if it's for
> > > > similar reasons.
> > > >
> > >
> > > Nope. Just a typo on my part.
> > >
> > > There appear to be three outstanding questions.
> > >
> > > First, Konstantine suggested calling this "failedRecordReporter". I think
> > > this is minor, but using this new name may be a bit more precise and I'd be
> > > fine with this.
> > >
> > > Second, should the reporter method be synchronous? I think the two options
> > > are:
> > >
> > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing and blocks
> > > (at this time).
> > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that returns a
> > > future that the user can optionally use to be synchronous.
> > >
> > > I do agree with Konstantine that option 2b gives us more room for future
> > > semantic changes, and since the producer write is already asynchronous this
> > > should be straightforward to implement. I think the concern here is that if
> > > the sink task does not *use* the future to make this synchronous, it is
> > > very possible that the error records could be written out of order (due to
> > > retries). But this won't be an issue if the implementation uses
> > > `max.in.flight.requests.per.connection=1` for writing the error records.
> > > It's a little less clear, but honestly IMO passing the reporter in the
> > > `put(...)` method helps make this lambda easier to understand, for some
> > > strange reason. So unless there are good reasons to avoid this, I'd be in
> > > favor of 2b and returning a Future.
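
On the ordering concern above, a small sketch of what the producer settings for writing error records might look like. Only `max.in.flight.requests.per.connection` comes from this thread; the helper name `errorProducerProps` and the bootstrap address are illustrative assumptions, and the other settings the runtime would use are not discussed here.

```java
import java.util.Properties;

final class ErrorProducerConfigSketch {
    // Builds properties for a hypothetical producer that writes error records.
    static Properties errorProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // At most one unacknowledged request per connection, so a retried
        // batch cannot be overtaken by a batch that was sent after it.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(errorProducerProps("localhost:9092"));
    }
}
```

The cost of this setting is lower throughput on that producer, which matches the tradeoff already accepted elsewhere in this thread for the error path.
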
> > >
> > > Third, how do we pass the reporter lambda / method reference to the task?
> > > My proposal to pass the reporter via an overloaded `put(...)` still is the
> > > most attractive to me, for several reasons:
> > >
> > > 3a. There's no need to pass the reporter separately *and* to describe the
> > > changes in method call ordering.
> > > 3b. As mentioned above, for some reason passing it via `put(...)` makes the
> > > intent more clear that it be used when processing the SinkRecord, and that
> > > it shouldn't be used in `start(...)`, `preCommit(...)`,
> > > `onPartitionsAssigned(...)`, or any of the other task methods. As Andrew
> > > pointed out earlier, *describing* this in the KIP and in JavaDoc will be
> > > tough to be exact yet succinct.
> > > 3c. There is already precedent for evolving
> > > `SourceTask.commitRecord(...)`, and the pattern is identical.
> > > 3d. Backward compatibility is easy to understand, and at the same time it's
> > > pretty easy to describe what implementations that want to take advantage of
> > > this feature should do.
> > > 3e. Minimal changes to the interface: we're just *adding* one default
> > > method that calls the existing method and deprecating the existing
> > > `put(...)`.
> > > 3f. Deprecating the existing `put(...)` makes it more clear in a
> > > programmatic sense that new sink implementations should use the reporter,
> > > and that we recommend old sinks evolve to use it.
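
Points 3e and 3f can be sketched as follows. This is an illustration under assumptions, not the actual `SinkTask` source: `SinkTaskSketch`, `LegacySink`, and the nested `SinkRecord` are stand-ins, and the reporter type assumes option 2b.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

final class OverloadedPutSketch {
    // Stand-in for Connect's SinkRecord so the sketch compiles on its own.
    static final class SinkRecord {
        final String value;
        SinkRecord(String value) { this.value = value; }
    }

    // Simplified stand-in for SinkTask; the real class has more methods.
    abstract static class SinkTaskSketch {
        /** @deprecated new implementations should override the two-argument put. */
        @Deprecated
        public abstract void put(Collection<SinkRecord> records);

        // The single added method: by default it ignores the reporter and delegates.
        public void put(Collection<SinkRecord> records,
                        BiFunction<SinkRecord, Throwable, Future<Void>> reporter) {
            put(records);
        }
    }

    // An existing sink that only knows the one-argument put(...).
    static final class LegacySink extends SinkTaskSketch {
        final List<String> seen = new ArrayList<>();

        @Override
        @SuppressWarnings("deprecation")
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                seen.add(record.value);
            }
        }
    }

    public static void main(String[] args) {
        LegacySink sink = new LegacySink();
        // The runtime would call the new overload; the legacy logic still runs.
        sink.put(Collections.singletonList(new SinkRecord("a")), null);
        System.out.println(sink.seen);
    }
}
```

Because the new overload has a default body, sinks compiled against the old interface keep working unchanged, while the deprecation marker nudges new code toward the two-argument form.
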
> > >
> > > Some of these benefits apply to some of the other suggestions, but I think
> > > none of the other suggestions have all of these benefits. For example,
> > > overloading `initialize(...)` is more difficult since most sink connectors
> > > don't override it and therefore would be less subject to deprecation
> > > warnings. Overloading `start(...)` is less attractive. Adding a method IMO
> > > shares the fewest of these benefits.
> > >
> > > The one disadvantage of this approach is that sink task implementations
> > > can't rely upon the reporter upon startup. IMO that's an acceptable
> > > tradeoff to get the cleaner and more explicit API, especially if the API
> > > contract is that Connect will pass the same reporter instance to each call
> > > to `put(...)` on a single task instance.
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <
> > > andrew_schofi...@live.com>
> > > wrote:
> > >
> > > > Hi,
> > > > Randall's suggestion is really good. I think it gives the flexibility
> > > > required and also keeps the interface the right way round.
> > > >
> > > > Thanks,
> > > > Andrew Schofield
> > > >
> > > > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
> > > >
> > > > > Hi Randall,
> > > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > > 1. This is a great suggestion, but I find that adding an overloaded
> > > > > > put(...), which essentially deprecates the old put(...) so that it is only
> > > > > > used when a connector is deployed on older versions of Connect, adds enough
> > > > > > of a complication that it could cause connectors to break if the old
> > > > > > put(...) doesn't correctly invoke the overloaded put(...); either that, or
> > > > > > it will add duplication of functionality across the two put(...) methods. I
> > > > > > think the older method simplifies things with the idea that a DLQ/error
> > > > > > reporter will or will not be passed into the method depending on the
> > > > > > version of AK. However, I also understand the aesthetic advantage of this
> > > > > > method vs the setter method, so I am okay with going in this direction if
> > > > > > others agree with adding the overloaded put(...).
> > > > > >
> > > > > > 2. Yes, your assumption is correct. Yes, we can remove the "Order of
> > > > > > Operations" section if we go with the overloaded put(...) direction.
> > > > > >
> > > > > > 3. Great point, I will remove them from the KIP.
> > > > > >
> > > > > > 4. Yeah, accept(...) will be synchronous. I will change it to be clearer,
> > > > > > thanks.
> > > > > >
> > > > > > 5. This KIP will use existing metrics as well as introduce new metrics. I
> > > > > > will update this section to fully specify the metrics.
> > > > >
> > > > > Please let me know what you think.
> > > > >
> > > > > Thanks,
> > > > > Aakash
> > > > >
> > > > > On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi, Aakash.
> > > > > >
> > > > > > > Thanks for the KIP. Connect does need an improved ability for sink
> > > > > > > connectors to report individual records as being problematic, and this
> > > > > > > integrates nicely with the existing DLQ feature.
> > > > > > >
> > > > > > > I also appreciate the desire to maintain compatibility so that connectors
> > > > > > > can take advantage of this feature when deployed in a runtime that supports
> > > > > > > this feature, but can safely and easily do without the feature when
> > > > > > > deployed to an older runtime. But I do understand Andrew's concern about
> > > > > > > the aesthetics. Have you considered overloading the `put(...)` method and
> > > > > > > adding the `reporter` as a second parameter? Essentially it would add the
> > > > > > > one method (with proper JavaDoc) to `SinkTask` only:
> > > > > >
> > > > > > ```
> > > > > > >     public void put(Collection<SinkRecord> records,
> > > > > > >             BiFunction<SinkRecord, Throwable> reporter) {
> > > > > >         put(records);
> > > > > >     }
> > > > > > ```
> > > > > > and the WorkerSinkTask would be changed to call `put(Collection,
> > > > > > BiFunction)` instead.
> > > > > >
> > > > > > > Sink connector implementations that don't do anything different can still
> > > > > > > override `put(Collection)`, and it still works as before. Developers that
> > > > > > > want to change their sink connector implementations to support this new
> > > > > > > feature would do the following, which would work in older and newer Connect
> > > > > > > runtimes:
> > > > > > ```
> > > > > >     public void put(Collection<SinkRecord> records) {
> > > > > >         put(records, null);
> > > > > >     }
> > > > > > >     public void put(Collection<SinkRecord> records,
> > > > > > >             BiFunction<SinkRecord, Throwable> reporter) {
> > > > > > >         // the normal `put(Collection)` logic goes here, but can optionally
> > > > > > >         // use `reporter` if non-null
> > > > > >     }
> > > > > > ```
> > > > > >
> > > > > > > I think this has all the same benefits of the current KIP, but
> > > > > > > it's noticeably simpler and hopefully more aesthetically pleasing.
> > > > > > >
> > > > > > > As for Andrew's second concern about "the task can send errant records to
> > > > > > > it within put(...)" being too restrictive, my guess is that this was more
> > > > > > > an attempt at describing the basic behavior, and less about requiring the
> > > > > > > reporter only being called within the `put(...)` method and not by methods
> > > > > > > to which `put(...)` synchronously or asynchronously delegates. Can you
> > > > > > > confirm whether my assumption is correct? If so, then perhaps my suggestion
> > > > > > > helps work around this issue as well, since there would be no restriction
> > > > > > > on when the reporter is called, and the whole "Order of Operations" section
> > > > > > > could potentially be removed.
> > > > > >
> > > > > > > Third, it's not clear to me why the "Error Reporter Object" subsection in
> > > > > > > the "Proposal" section lists the worker configuration properties that were
> > > > > > > previously introduced with
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > > > > > > .
> > > > > > > Maybe it's worth mentioning that the error reporter functionality will
> > > > > > > reuse or build upon KIP-298, including reusing the configuration properties
> > > > > > > defined in KIP-298. But IIUC, the KIP does not propose changing any
> > > > > > > technical or semantic aspect of these configuration properties, and
> > > > > > > therefore the KIP would be more clear and succinct without them. *That* the
> > > > > > > error reporter will use these properties is part of the UX and therefore
> > > > > > > necessary to mention, but *how* it uses those properties is really up to
> > > > > > > the implementation.
> > > > > >
> > > > > > > Fourth, the "Synchrony" section has a sentence that is confusing, or not as
> > > > > > > clear as it could be.
> > > > > > >
> > > > > > >     "If a record is sent to the error reporter, processing of the next
> > > > > > >     errant record in accept(...) will not begin until the producer
> > > > > > >     successfully sends the errant record to Kafka."
> > > > > > >
> > > > > > > This sentence is a bit difficult to understand, but IIUC this really just
> > > > > > > means that "accept(...)" will be synchronous and will block until the
> > > > > > > errant record has been successfully written to Kafka. If so, let's say
> > > > > > > that. The rest of the paragraph is fine.
> > > > > >
> > > > > > > Finally, is this KIP proposing new metrics, or that existing metrics would
> > > > > > > be used to track the error reporter usage? If the former, then please
> > > > > > > fully-specify what these metrics will be, similarly to how metrics are
> > > > > > > specified in
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> > > > > > > .
> > > > > >
> > > > > > Thoughts?
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Randall
> > > > > >
> > > > > > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <
> > > > > > andrew_schofi...@live.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Aakash,
> > > > > > > Thanks for sorting out the replies to the mailing list.
> > > > > > >
> > > > > > > > First, I do like the idea of improving error reporting in sink connectors.
> > > > > > > > I'd like a simple way to put bad records onto the DLQ.
> > > > > > > >
> > > > > > > > I think this KIP is considerably more complicated than it seems. The
> > > > > > > > guidance on the SinkTask.put() method is that it should send the records
> > > > > > > > asynchronously and immediately return, so the task is likely to want to
> > > > > > > > report errors asynchronously too. Currently the KIP states that "the task
> > > > > > > > can send errant records to it within put(...)" and that's too restrictive.
> > > > > > > > The task ought to be able to report any unflushed records, but the
> > > > > > > > synchronisation of this is going to be tricky. I suppose the connector
> > > > > > > > author needs to make sure that all errant records have been reported
> > > > > > > > before returning control from SinkTask.flush(...) or perhaps
> > > > > > > > SinkTask.preCommit(...).
> > > > > > > >
> > > > > > > > I think the interface is a little strange too. I can see that this was
> > > > > > > > done so it's possible to deliver a connector that supports error reporting
> > > > > > > > but it can also work in earlier versions of the KC runtime. But, the
> > > > > > > > pattern so far is that the task uses the methods of SinkTaskContext to
> > > > > > > > access utilities in the Kafka Connect runtime, and I suggest that
> > > > > > > > reporting a bad record is such a utility. SinkTaskContext has changed
> > > > > > > > before when the configs() method was added, so I think there is precedent
> > > > > > > > for adding a method. The way the KIP adds a method to SinkTask that the KC
> > > > > > > > runtime calls to provide the error reporting utility seems not to match
> > > > > > > > what has gone before.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Andrew
> > > > > > >
> > > > > > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io>
> wrote:
> > > > > > >
> > > > > > > >     I wasn't previously added to the dev mailing list, so I'd like to post my
> > > > > > > >     discussion with Andrew Schofield below for visibility and further
> > > > > > > >     discussion:
> > > > > > > >
> > > > > > > >     Hi Andrew,
> > > > > > > >
> > > > > > > >     Thanks for the reply. The main concern with this approach would be its
> > > > > > > >     backward compatibility. I’ve highlighted the thoughts around the backwards
> > > > > > > >     compatibility of the initial approach, please let me know what you think.
> > > > > > >
> > > > > > >     Thanks,
> > > > > > >     Aakash
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> ____________________________________________________________________________________________________________________________
> > > > > > >
> > > > > > >     Hi,
> > > > > > > >     By adding a new method to the SinkTaskContext interface in, say, Kafka
> > > > > > > >     2.6, a connector that calls it would require a Kafka 2.6 Connect runtime.
> > > > > > > >     I don't quite see how that's a backward compatibility problem. It's just
> > > > > > > >     that new connectors need the latest interface. I might not quite be
> > > > > > > >     understanding, but I think it would be fine.
> > > > > > >
> > > > > > >     Thanks,
> > > > > > >     Andrew
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> ____________________________________________________________________________________________________________________________
> > > > > > >
> > > > > > >     Hi Andrew,
> > > > > > >
> > > > > > > >     I apologize for the way the reply was sent. I just subscribed to the dev
> > > > > > > >     mailing list so it should be resolved now.
> > > > > > > >
> > > > > > > >     You are correct, new connectors would simply require the latest interface.
> > > > > > > >     However, we want to remove that requirement - in other words, we want to
> > > > > > > >     allow the possibility that someone wants the latest connector/to upgrade to
> > > > > > > >     the latest version, but deploys it on an older version of AK. Basically, we
> > > > > > > >     don't want to enforce the necessity of upgrading AK to get the latest
> > > > > > > >     interface. In the current approach, there would be no issue of deploying a
> > > > > > > >     new connector on an older version of AK, as the Connect framework would
> > > > > > > >     simply not invoke the new method.
> > > > > > >
> > > > > > >     Please let me know what you think and if I need to clarify
> > > > anything.
> > > > > > >
> > > > > > >     Thanks,
> > > > > > >     Aakash
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > >
> >
>
