Hi Chris,

Thanks for the feedback.

1) That's a fair point; while I did scan everything publicly available on
GitHub, you're right in that it won't cover all possible SMTs that are out
there. Thanks for the example use-case as well, I've updated the KIP to add
the two new proposed methods.

2) So it looks like with the current state of affairs, sink tasks that only
instantiate writers in the SinkTask::open method (and don't do the lazy
instantiation in SinkTask::put that you mentioned) might fail when used
with topic/partition mutating SMTs even if they don't do any asynchronous
processing? Since they could encounter records in SinkTask::put with
topics/partitions that they might not have created writers for. Thanks for
pointing this out, it's definitely another incompatibility that needs to be
called out and fixed. The overloaded method approach is interesting, but
comes with the caveat of yet more new methods that will need to be
implemented by existing connectors if they want to make use of this new
functionality. What do you think about retaining just the existing methods
but changing when they're called in the Connect runtime? For instance,
instead of calling SinkTask::open after partition assignment post a
consumer group rebalance, we could cache the currently "seen" topic
partitions (post transformation) and before each call to SinkTask::put
check whether there's any new "unseen" topic partitions, and if so call
SinkTask::open (and also update the cache of course). I don't think this
would break the existing contract with sink tasks where SinkTask::open is
expected to be called for a topic partition before any records from the
topic partition are sent via SinkTask::put? The SinkTask::close case is a
lot trickier however, and would require some sort of cache eviction policy
that would be deemed appropriate as you pointed out too.

Thanks,
Yash

On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Yash,
>
> I've had some time to think on this KIP and I think I'm in agreement about
> not blocking it on an official compatibility library or adding the "ack"
> API for sink records.
>
> I only have two more thoughts:
>
> 1. Because it is possible to manipulate sink record partitions and offsets
> with the current API we provide for transformations, I still believe
> methods should be added to the SinkRecord class to expose the original
> partition and offset, not just the original topic. The additional cognitive
> burden from these two methods is going to be minimal anyways; once users
> understand the difference between the transformed topic name and the
> original one, it's going to be trivial for them to understand how that same
> difference applies for partitions and offsets. It's not enough to scan the
> set of SMTs provided out of the box with Connect, ones developed by
> Confluent, or even everything available on GitHub, since there may be
> closed-source projects out there that rely on this ability. One potential
> use case could be re-routing partitions between Kafka and some other
> sharded system.
>
> 2. We still have to address the SinkTask::open [1] and SinkTask::close [2]
> methods. If a connector writes to the external system using the transformed
> topic partitions it reads from Kafka, then it's possible for the connector
> to lazily instantiate writers for topic partitions as it encounters them
> from records provided in SinkTask::put. However, connectors also need a way
> to de-allocate those writers (and the resources used by them) over time,
> which they can't do as easily. One possible approach here is to overload
> SinkTask::open and SinkTask::close with variants that distinguish between
> transformed and original topic partitions, and default to invoking the
> existing methods with just the original topic partitions. We would then
> have several options for how the Connect runtime can invoke these methods,
> but in general, an approach that guarantees that tasks are notified of
> transformed topic partitions in SinkTask::open before any records for that
> partition are given to it in SinkTask::put, and makes a best-effort attempt
> to close transformed topic partitions that appear to no longer be in use
> based on some eviction policy, would probably be sufficient.
>
> [1] -
>
> https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> [2] -
>
> https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
>
> Cheers,
>
> Chris
>
> On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Thanks a lot for your inputs!
> >
> > > would provide a simple, clean interface for developers to determine
> > > which features are supported by the version of the Connect runtime
> > > that their plugin has been deployed onto
> >
> > I do like the idea of having such a public compatibility library - I
> think
> > it would remove a lot of restrictions from framework development if it
> were
> > to be widely adopted.
> >
> > > we might consider adding an API to "ack" sink records
> >
> > I agree that this does seem like a more intuitive and clean API, but I'm
> > concerned about the backward compatibility headache we'd be imposing on
> all
> > existing sink connectors. Connector developers will have to maintain two
> > separate ways of doing offset management if they want to use the new API
> > but continue supporting older versions of Kafka Connect.
> >
> > For now, I've reverted the KIP to the previous iteration which proposed
> the
> > addition of a new `SinkRecord` method to obtain the original Kafka topic
> > pre-transformation. One thing to note is that I've removed the method for
> > obtaining the original Kafka partition after a cursory search showed that
> > use cases for partition modifying SMTs are primarily on the source
> > connector side.
> >
> > Thanks,
> > Yash
> >
> > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi all,
> > >
> > > I have more comments I'd like to make on this KIP when I have time
> (sorry
> > > for the delay, Yash, and thanks for your patience!), but I did want to
> > > chime in and say that I'm also not sure about overloading
> SinkTask::put.
> > I
> > > share the concerns about creating an intuitive, simple API that Yash
> has
> > > raised. In addition, this approach doesn't seem very sustainable--what
> do
> > > we do if we encounter another case in the future that would warrant a
> > > similar solution? We probably don't want to create three, four, etc.
> > > overloaded variants of the method, each of which would have to be
> > > implemented by connector developers who want to both leverage the
> latest
> > > and greatest connector APIs and maintain compatibility with connect
> > > Clusters running older versions.
> > >
> > > I haven't been able to flesh this out into a design worth publishing in
> > its
> > > own KIP yet, but one alternative I've pitched to a few people with
> > > generally positive interest has been to develop an official
> compatibility
> > > library for Connect developers. This library would be released as its
> own
> > > Maven artifact (separate from connect-api, connect-runtime, etc.) and
> > would
> > > provide a simple, clean interface for developers to determine which
> > > features are supported by the version of the Connect runtime that their
> > > plugin has been deployed onto. Under the hood, this library might use
> > > reflection to determine whether classes, methods, etc. are available,
> but
> > > the developer wouldn't have to do anything more than check (for
> example)
> > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any
> > point
> > > in the lifetime of their connector/task whether that feature is
> provided
> > by
> > > the runtime.
> > >
> > > One other high-level comment: this doesn't address every case, but we
> > might
> > > consider adding an API to "ack" sink records. This could use the
> > > SubmittedRecords class [1] (with some slight tweaks) under the hood to
> > > track the latest-acked offset for each topic partition. This way,
> > connector
> > > developers won't be responsible for tracking offsets at all in their
> sink
> > > tasks (eliminating issues with the accuracy of post-transformation
> T/P/O
> > > sink record information), and they'll only have to notify the Connect
> > > framework when a record has been successfully dispatched to the
> external
> > > system. This provides a cleaner, friendlier API, and also enables more
> > > fine-grained metrics like the ones proposed in KIP-767 [2].
> > >
> > > [1] -
> > >
> > >
> >
> https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > [2] -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com>
> wrote:
> > >
> > > > Hi Randall,
> > > >
> > > > It's been a while for this one but the more I think about it, the
> more
> > I
> > > > feel like the current approach with a new overloaded `SinkTask::put`
> > > method
> > > > might not be optimal. We're trying to fix a pretty corner case bug
> here
> > > > (usage of topic mutating SMTs with sink connectors that do their own
> > > offset
> > > > tracking) and I'm not sure that warrants a change to such a central
> > > > interface method. The new `SinkTask::put` method just seems somewhat
> > odd
> > > > and it may not be very understandable for a new reader - I don't
> think
> > > this
> > > > should be the case for a public interface method. Furthermore, even
> > with
> > > > elaborate documentation in place, I'm not sure if it'll be very
> obvious
> > > to
> > > > most people what the purpose of having these two `put` methods is and
> > how
> > > > they should be used by sink task implementations. What do you think?
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Randall,
> > > > >
> > > > > Thanks a lot for your valuable feedback so far! I've updated the
> KIP
> > > > based
> > > > > on our discussion above. Could you please take another look?
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com>
> > > wrote:
> > > > >
> > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >> > Hi Randall,
> > > > >> >
> > > > >> > Thanks for elaborating. I think these are all very good points
> > and I
> > > > see
> > > > >> > why the overloaded `SinkTask::put` method is a cleaner solution
> > > > overall.
> > > > >> >
> > > > >> > > public void put(Collection<SinkRecord> records,
> Map<SinkRecord,
> > > > >> > TopicPartition> updatedTopicPartitions)
> > > > >> >
> > > > >> > I think this should be
> > > > >> >
> > > > >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord,
> > > > >> > TopicPartition> originalTopicPartitions)`
> > > > >> >
> > > > >> > instead because the sink records themselves have the updated
> topic
> > > > >> > partitions (i.e. after all transformations have been applied)
> and
> > > the
> > > > >> KIP
> > > > >> > is proposing a way for the tasks to be able to access the
> original
> > > > topic
> > > > >> > partition (i.e. before transformations have been applied).
> > > > >> >
> > > > >>
> > > > >> Sounds good.
> > > > >>
> > > > >>
> > > > >> >
> > > > >> > > Of course, if the developer does not need separate methods,
> they
> > > can
> > > > >> > easily have the older `put` method simply delegate to the newer
> > > > method.
> > > > >> >
> > > > >> > If the developer does not need separate methods (i.e. they don't
> > > need
> > > > to
> > > > >> > use this new addition), they can simply continue implementing
> just
> > > the
> > > > >> > older `put` method right?
> > > > >> >
> > > > >>
> > > > >> Correct. We should update the JavaDoc of both methods to make this
> > > > clear,
> > > > >> and in general how the two methods should are used and should be
> > > > >> implemented. That can be part of the PR, and the KIP doesn't need
> > this
> > > > >> wording.
> > > > >>
> > > > >> >
> > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating
> > the
> > > > >> older
> > > > >> > method, once the Connect runtime versions without this change
> are
> > > old
> > > > >> > enough.
> > > > >> >
> > > > >> > I'm not sure we'd ever want to deprecate the older method. Most
> > > common
> > > > >> sink
> > > > >> > connector implementations do not do their own offset tracking
> with
> > > > >> > asynchronous processing and will probably never have a need for
> > the
> > > > >> > additional parameter `Map<SinkRecord, TopicPartition>
> > > > >> > originalTopicPartitions` in the proposed new `put` method. These
> > > > >> connectors
> > > > >> > can continue implementing only the existing `SinkTask::put`
> method
> > > > which
> > > > >> > will be called by the default implementation of the newer
> > overloaded
> > > > >> `put`
> > > > >> > method.
> > > > >> >
> > > > >>
> > > > >> +1
> > > > >>
> > > > >>
> > > > >> >
> > > > >> > > the pre-commit methods use the same `Map<TopicPartition,
> > > > >> > OffsetAndMetadata> currentOffsets` data structure I'm suggesting
> > be
> > > > >> used.
> > > > >> >
> > > > >> > The data structure you're suggesting be used is a
> `Map<SinkRecord,
> > > > >> > TopicPartition>` which will map `SinkRecord` objects to the
> > original
> > > > >> topic
> > > > >> > partition of the corresponding `ConsumerRecord` right? To
> clarify,
> > > > this
> > > > >> is
> > > > >> > a new data structure that will need to be managed in the
> > > > >> `WorkerSinkTask`.
> > > > >> >
> > > > >>
> > > > >> Ah, you're right. Thanks for the correction.
> > > > >>
> > > > >> Best regards,
> > > > >> Randall
> > > > >>
> > > > >>
> > > > >> > Thanks,
> > > > >> > Yash
> > > > >>
> > > > >>
> > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com>
> > > > wrote:
> > > > >> >
> > > > >> > > Hi, Yash.
> > > > >> > >
> > > > >> > > I'm not sure I quite understand why it would be "easier" for
> > > > connector
> > > > >> > > > developers to account for implementing two different
> > overloaded
> > > > >> `put`
> > > > >> > > > methods (assuming that they want to use this new feature)
> > versus
> > > > >> using
> > > > >> > a
> > > > >> > > > try-catch block around `SinkRecord` access methods?
> > > > >> > >
> > > > >> > >
> > > > >> > > Using a try-catch to try around an API method that *might* be
> > > there
> > > > >> is a
> > > > >> > > very unusual thing for most developers. Unfortunately, we've
> had
> > > to
> > > > >> > resort
> > > > >> > > to this atypical approach with Connect in places when there
> was
> > no
> > > > >> good
> > > > >> > > alternative. We seem to relying upon pattern because it's
> easier
> > > for
> > > > >> us,
> > > > >> > > not because it offers a better experience for Connector
> > > developers.
> > > > >> IMO,
> > > > >> > if
> > > > >> > > there's a practical alternative that uses normal development
> > > > practices
> > > > >> > and
> > > > >> > > techniques, then we should use that alternative. IIUC, there
> is
> > at
> > > > >> least
> > > > >> > > one practical alternative for this KIP that would not require
> > > > >> developers
> > > > >> > to
> > > > >> > > use the unusual try-catch to handle the case where methods are
> > not
> > > > >> found.
> > > > >> > >
> > > > >> > > I also think having two `put` methods is easier when the
> > Connector
> > > > >> has to
> > > > >> > > do different things for different Connect runtimes, too. One
> of
> > > > those
> > > > >> > > methods is called by newer Connect runtimes with the new
> > behavior,
> > > > and
> > > > >> > the
> > > > >> > > other method is called by an older Connect runtime. Of course,
> > if
> > > > the
> > > > >> > > developer does not need separate methods, they can easily have
> > the
> > > > >> older
> > > > >> > > `put` method simply delegate to the newer method.
> > > > >> > >
> > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating
> > the
> > > > >> older
> > > > >> > > method, once the Connect runtime versions without this change
> > are
> > > > old
> > > > >> > > enough.
> > > > >> > >
> > > > >> > > I think the advantage of going with the
> > > > >> > > > proposed approach in the KIP is that it wouldn't require
> extra
> > > > >> > > book-keeping
> > > > >> > > > (the Map<SinkRecord,
> > > > >> > > > TopicPartition> in `WorkerSinkTask` in your proposed
> approach)
> > > > >> > > >
> > > > >> > >
> > > > >> > > The connector does have to do some of this bookkeeping in how
> > they
> > > > >> track
> > > > >> > > the topic partition offsets used in the `preCommit`, and the
> > > > >> pre-commit
> > > > >> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
> > > > >> > > currentOffsets`
> > > > >> > > data structure I'm suggesting be used.
> > > > >> > >
> > > > >> > > I hope that helps.
> > > > >> > >
> > > > >> > > Best regards,
> > > > >> > >
> > > > >> > > Randall
> > > > >> > >
> > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <
> > yash.ma...@gmail.com>
> > > > >> wrote:
> > > > >> > >
> > > > >> > > > Hi Randall,
> > > > >> > > >
> > > > >> > > > Thanks for reviewing the KIP!
> > > > >> > > >
> > > > >> > > > > That latter logic can get quite ugly.
> > > > >> > > >
> > > > >> > > > I'm not sure I quite understand why it would be "easier" for
> > > > >> connector
> > > > >> > > > developers to account for implementing two different
> > overloaded
> > > > >> `put`
> > > > >> > > > methods (assuming that they want to use this new feature)
> > versus
> > > > >> using
> > > > >> > a
> > > > >> > > > try-catch block around `SinkRecord` access methods? In both
> > > > cases, a
> > > > >> > > > connector developer would need to write additional code in
> > order
> > > > to
> > > > >> > > ensure
> > > > >> > > > that their connector continues working with older Connect
> > > > runtimes.
> > > > >> > > > Furthermore, we would probably need to carefully document
> how
> > > the
> > > > >> > > > implementation for the older `put` method should look like
> for
> > > > >> > connectors
> > > > >> > > > that want to use this new feature. I think the advantage of
> > > going
> > > > >> with
> > > > >> > > the
> > > > >> > > > proposed approach in the KIP is that it wouldn't require
> extra
> > > > >> > > book-keeping
> > > > >> > > > (the Map<SinkRecord,
> > > > >> > > > TopicPartition> in `WorkerSinkTask` in your proposed
> approach)
> > > and
> > > > >> also
> > > > >> > > the
> > > > >> > > > fact that the try-catch based logic is an already
> established
> > > > >> pattern
> > > > >> > > > through
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > >> > > > and other KIPs which added methods to source/sink
> > connector/task
> > > > >> > > contexts.
> > > > >> > > >
> > > > >> > > > Let me know if you still feel that having a new overloaded
> put
> > > > >> method
> > > > >> > is
> > > > >> > > a
> > > > >> > > > cleaner solution and I'd be happy to reconsider!
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Yash
> > > > >> > > >
> > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <
> > > rha...@gmail.com>
> > > > >> > wrote:
> > > > >> > > >
> > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > >> > > > >
> > > > >> > > > > The KIP includes this rejected alternative:
> > > > >> > > > >
> > > > >> > > > > > 4. Update SinkTask.put in any way to pass the new
> > > information
> > > > >> > outside
> > > > >> > > > > > SinkRecord (e.g. a Map or a derived class)
> > > > >> > > > > >
> > > > >> > > > > >    -
> > > > >> > > > > >
> > > > >> > > > > >    Much more disruptive change without considerable pros
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > One advantage about doing this is that sink connector
> > > > >> implementations
> > > > >> > > can
> > > > >> > > > > more easily implement two different "put(...)" methods to
> > > handle
> > > > >> > > running
> > > > >> > > > in
> > > > >> > > > > a variety of runtimes, without having to use try-catch
> logic
> > > > >> around
> > > > >> > the
> > > > >> > > > > newer SinkRecord access methods. That latter logic can get
> > > quite
> > > > >> > ugly.
> > > > >> > > > >
> > > > >> > > > > For example, the existing `put` method has this signature:
> > > > >> > > > >
> > > > >> > > > > public abstract void put(Collection<SinkRecord> records);
> > > > >> > > > >
> > > > >> > > > > If we added an overloaded method that passed in a map of
> the
> > > old
> > > > >> > > > > topic+partition for each record (and defined the absence
> of
> > an
> > > > >> entry
> > > > >> > as
> > > > >> > > > > having an unchanged topic and partition):
> > > > >> > > > >
> > > > >> > > > > public void put(Collection<SinkRecord> records,
> > > Map<SinkRecord,
> > > > >> > > > > TopicPartition> updatedTopicPartitions) {
> > > > >> > > > > put(records);
> > > > >> > > > > }
> > > > >> > > > >
> > > > >> > > > > then a `SinkTask` implementation that wants to use this
> new
> > > > >> feature
> > > > >> > > could
> > > > >> > > > > simply implement both methods:
> > > > >> > > > >
> > > > >> > > > > public void put(Collection<SinkRecord> records) {
> > > > >> > > > > // Running in an older runtime, so no tracking of
> > SMT-modified
> > > > >> topic
> > > > >> > > > names
> > > > >> > > > > or partitions
> > > > >> > > > > put(records, Map.of());
> > > > >> > > > > }
> > > > >> > > > >
> > > > >> > > > > public void put(Collection<SinkRecord> records,
> > > Map<SinkRecord,
> > > > >> > > > > TopicPartition> updatedTopicPartitions) {
> > > > >> > > > > // real logic here
> > > > >> > > > > }
> > > > >> > > > >
> > > > >> > > > > This seems a lot easier than having to use try-catch
> logic,
> > > yet
> > > > >> still
> > > > >> > > > > allows sink connectors to utilize the new functionality
> and
> > > > still
> > > > >> > work
> > > > >> > > > with
> > > > >> > > > > older Connect runtimes.
> > > > >> > > > >
> > > > >> > > > > WDYT?
> > > > >> > > > >
> > > > >> > > > > Randall
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <
> > > yash.ma...@gmail.com
> > > > >
> > > > >> > > wrote:
> > > > >> > > > >
> > > > >> > > > > > Hi all,
> > > > >> > > > > >
> > > > >> > > > > > I would like to (re)start a new discussion thread on
> > KIP-793
> > > > >> (Kafka
> > > > >> > > > > > Connect) which proposes some additions to the public
> > > > SinkRecord
> > > > >> > > > interface
> > > > >> > > > > > in order to support topic mutating SMTs for sink
> > connectors
> > > > >> that do
> > > > >> > > > their
> > > > >> > > > > > own offset tracking.
> > > > >> > > > > >
> > > > >> > > > > > Links:
> > > > >> > > > > >
> > > > >> > > > > > KIP:
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > >> > > > > >
> > > > >> > > > > > Older discussion thread:
> > > > >> > > > > >
> > > > >> https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > >> > > > > >
> > > > >> https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > >> > > > > >
> > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Yash
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>

Reply via email to