Hi Greg,

Thanks for the detailed review!

> What is the expected state/behavior for SinkRecords
> which do not have original T/P/O information after the
> upgrade? Just browsing, it appears that tests make
> extensive use of the existing public SinkRecord
> constructors for both Transformations and Connectors.

I'm not sure I follow - are you asking about how the tests will be updated
after this change, or about what upgrades will look like for clusters in
production? For the latter, we won't have to worry about sink records
without original T/P/O information at all once a cluster is fully rolled,
and we will make it (hopefully) abundantly clear that connectors need to
account for missing original T/P/O getter methods if they expect to be
deployed on older Connect runtimes.
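
To make the "account for missing getter methods" part concrete, here is a
rough sketch of the kind of guard I have in mind. The helper class and method
names are hypothetical (they are not part of the KIP or of the Connect API),
and the sketch assumes that calling a getter which doesn't exist on an older
runtime surfaces as a NoSuchMethodError:

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Connect: tries a getter that only
// exists on newer runtimes and falls back to a legacy getter otherwise.
final class OriginalCoordinatesFallback {
    private OriginalCoordinatesFallback() {}

    static <T> T newGetterOrElse(Supplier<T> newGetter, Supplier<T> legacyGetter) {
        try {
            // On a new enough runtime this simply returns the original value.
            return newGetter.get();
        } catch (NoSuchMethodError e) {
            // Assumption: an older runtime's record class lacks the method,
            // so the call fails at link time and we use the old coordinates.
            return legacyGetter.get();
        }
    }
}
```

A task could then do something like
`newGetterOrElse(() -> record.originalKafkaOffset(), () -> record.kafkaOffset())`.
I'd pass lambdas rather than method references here so that the lookup of the
missing method happens inside the try block, when the supplier is invoked.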

> What is the expected behavior for Transformation
> implementations which do not use the newRecord
> methods and instead use public SinkRecord constructors?
> The KIP mentions this as a justification for the
> originalKafkaOffset method, but if existing implementations
> are using the existing constructors, those constructors won't
> forward the original T/P/O information to later transforms or
> the task.

There shouldn't be any difference in behavior here - the framework will add
the original T/P/O metadata to the record after the entire transformation
chain has been applied and just before sending the record to the task for
processing. The KIP doesn't propose that transformations themselves should
also be able to retrieve original T/P/O information for a sink record.
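
As a simplified illustration of why the constructors used inside a
transformation don't matter, here is a sketch with stand-in types (Rec and
Delivered are hypothetical and only model coordinates; this is not the actual
runtime code). The runtime keeps a reference to the consumed record, runs the
whole chain, and only then pairs the result with the original coordinates:

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Stand-in model of the runtime's behavior; all names here are hypothetical.
final class TransformChainSketch {
    /** Simplified record: only the topic/partition/offset coordinates. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** What the task would see: transformed record plus original T/P/O. */
    static final class Delivered {
        final Rec transformed;
        final Rec original;

        Delivered(Rec transformed, Rec original) {
            this.transformed = transformed;
            this.original = original;
        }
    }

    static Delivered applyChain(Rec consumed, List<UnaryOperator<Rec>> chain) {
        Rec current = consumed;
        for (UnaryOperator<Rec> transform : chain) {
            current = transform.apply(current);
            if (current == null) {
                return null; // record was dropped by a transform
            }
        }
        // The original coordinates come from the consumed record, so it does
        // not matter how intermediate transforms constructed their output.
        return new Delivered(current, consumed);
    }
}
```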

> This reasoning and the KIP design seems to imply that the
> connector is better equipped to solve this problem than the
> framework, but the stated reasons are not convincing for me.

This was added to the KIP by the original author, but I don't think the
intention was to imply that the connector is better equipped to solve this
problem than the framework. The intention is to provide complete
information to the connector ("physical" and "virtual" coordinates, as
you've termed them, instead of the currently incomplete "virtual"
coordinates alone) so that connectors can use the virtual coordinates for
writing data to the sink system and the physical coordinates for offset
reporting back to the framework. The rejected alternative basically says
that we can't do a deterministic mapping from virtual coordinates to
physical coordinates without doing a lot of book-keeping.
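
Here is a small sketch of that split, again with hypothetical names and plain
strings instead of the real Connect types. The virtual topic only decides
where the data is written, while the offsets reported back are keyed by the
physical topic partition, so two source topics routed to the same static
topic "a" don't collide. It assumes the usual Kafka convention that the
committed offset is the last processed offset plus one:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping a sink task might keep; not the Connect API.
final class OffsetTrackingSketch {
    // Keyed by "<original topic>-<original partition>" for simplicity.
    private final Map<String, Long> committable = new HashMap<>();

    void recordWritten(String virtualTopic,
                       String originalTopic,
                       int originalPartition,
                       long originalOffset) {
        // virtualTopic would select the destination in the sink system
        // (the actual write is omitted in this sketch).
        String physicalKey = originalTopic + "-" + originalPartition;
        // Track the next offset to consume for the physical partition.
        committable.merge(physicalKey, originalOffset + 1, Long::max);
    }

    /** Offsets to report back to the framework, in physical coordinates. */
    Map<String, Long> offsetsToCommit() {
        return new HashMap<>(committable);
    }
}
```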

I agree with the rest of your analysis on the tradeoffs between the
proposed approach and the seemingly more attractive approach of handling
everything purely in the framework and only exposing "virtual coordinates"
to the connectors. I think the biggest thorn here is maintaining backward
compatibility with the considerable ecosystem of existing connectors, which
is something Connect has always been burdened by.

Thanks,
Yash

On Wed, Mar 8, 2023 at 6:54 AM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hi Yash,
>
> I always use this issue as an example of a bug being caused by design
> rather than by implementation error, and once it's fixed I'll need to find
> something else to talk about :)
> So glad to see this get fixed!
>
> I'll chime in to support some of the earlier discussions that seem to have
> been resolved:
>
> 1. With respect to SinkRecord methods vs an overloaded put(): I agree with
> the current design but I justify it a little bit differently than has
> already been discussed.
> If we were designing this interface on day 1 without backwards
> compatibility in mind, which design would make more sense? Or for a
> different framing: In the future when old runtimes and connectors are
> retired and the old interfaces are removed, which design is going to look
> more strange and unmotivated?
> Applied to this design decision, I would say that the original T/P/O are
> properties of a single SinkRecord and make sense as getters, and it would
> be strange to store them in an auxiliary map.
>
> 2. Following up this change with a compatibility library to make the
> interface easier to use is the right choice to make here. This change
> should be focused on correctness in allowing developers to fix the
> incompatibility and we can be concerned with coming up with a more
> ergonomic solution in the compatibility library.
> The API should be focused on generality, correctness, and performance
> because those cannot be worked-around after the fact. Connector
> implementations and/or libraries can be concerned with trading off some
> generality and/or performance for ease-of-use.
>
> 3. I think that the difference in behavior of the new open/close methods as
> compared to the old methods is significant, and requires good documentation
> to help connector developers avoid lazy and incorrect migrations. I am
> happy to have that addressed in code review after the KIP is approved.
>
> I had some questions:
>
> 4. What is the expected state/behavior for SinkRecords which do not have
> original T/P/O information after the upgrade? Just browsing, it appears
> that tests make extensive use of the existing public SinkRecord
> constructors for both Transformations and Connectors.
>
> 5. What is the expected behavior for Transformation implementations which
> do not use the newRecord methods and instead use public SinkRecord
> constructors? The KIP mentions this as a justification for the
> originalKafkaOffset method, but if existing implementations are using the
> existing constructors, those constructors won't forward the original T/P/O
> information to later transforms or the task.
>
> For the last few points, I want to discuss this rejected alternative:
>
> > Address the offsets problem entirely within the framework, doing some
> > kind of mapping from the transformed topic back to the original topic.
> > * This would only work in the cases where there’s no overlap between the
> > transformed topic names, but would break for the rest of the
> > transformations (e.g. static transformation, topic = “a”).
> > * Even if we wanted to limit the support to those cases, it would require
> > considerable bookkeeping to add a validation to verify that the
> > transformation chain adheres to that expectation (and fail fast if it
> > doesn’t).
>
> 6. This reasoning and the KIP design seems to imply that the connector is
> better equipped to solve this problem than the framework, but the stated
> reasons are not convincing for me.
> * A static transformation still causes an offset collision in the connector
> * The connector is not permitted to see the transformation chain to do any
> fail-fast assertions
>
> Suppose we were to think of the records at the end of the transformation
> chain as being in "virtual partitions" with "virtual offsets".
> For example, with identity-routing SMTs, the virtual coordinates are
> exactly the same as the underlying physical coordinates. For 1-1 renames,
> each virtual topic would be the renamed topic corresponding to the
> underlying topic. For fan-out from one topic to multiple virtual topics,
> virtual offsets would use the underlying kafka offsets with gaps for
> records going to other virtual partitions. Virtual topics with dropped
> records have similar gaps in the offsets.
> Currently, these virtual coordinates are passed into the connector via
> SinkTask::put, but SinkTask::open/close/preCommit and
> SinkTaskContext::assignment/offsets/pause/resume all use physical
> coordinates.
> This proposal patches put, open, and close to have both physical and virtual
> coordinates, but leaves the other methods with physical coordinates. After
> this proposal, connectors would be intentionally made aware of the
> distinction between physical and virtual coordinates, and manage their own
> bookkeeping for the two systems.
>
> To avoid that connector logic, we could use virtual coordinates in all
> connector calls, never revealing that they are different from the physical
> coordinates. There's a whole design shopping list that we'd need:
> * Renumbering mechanism for disambiguating and making virtual offsets
> monotonic in the case of topic/partition collisions
> * Data structure and strategy for translating virtual offsets back to
> physical offsets
> * New limits on SinkTaskContext::offsets() calls to prevent rewinding
> before the latest commit
> * Backwards compatibility and upgrade design
>
> 7. This alternative was very appealing to me, because the strength of a
> plugin framework is the composability of different components. Among a
> collection of N connectors and M transforms, it should ideally only take
> N + M work to understand how the components combine to build the whole.
> However, once you start adding special cases to some plugins to support
> interactions with others, the whole system can take N * M work to
> understand. From a complexity standpoint, it would be very good for the
> framework to solve this in a way which was connector-agnostic.
> The current design compromises the logical isolation of the plugins
> slightly, but they can collapse offsets very memory-efficiently, and re-use
> the existing raw coordinate functions and keep everything else backwards
> compatible. After deriving all of the above, I think that's a reasonable
> tradeoff to make.
>
> Thanks,
> Greg
>
> On Tue, Feb 21, 2023 at 10:17 AM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > We'll probably want to make a few tweaks to the Javadocs for the new
> > methods (I'm imagining that notes on compatibility with older versions
> > will be required), but I believe what's proposed in the KIP is good
> > enough to approve with the understanding that it may not exactly match
> > what gets implemented/merged.
> >
> > LGTM, thanks again for the KIP!
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Feb 21, 2023 at 12:18 PM Yash Mayya <yash.ma...@gmail.com>
> > wrote:
> >
> > > Hi Chris,
> > >
> > > > we might try to introduce a framework-level configuration
> > > > property to dictate which of the pre-transform and post-transform
> > > > topic partitions are used for the fallback call to the single-arg
> > > > variant if a task class has not overridden the multi-arg variant
> > >
> > > Thanks for the explanation and I agree that this will be a tad bit too
> > > convoluted. :)
> > >
> > > Please do let me know if you'd like any further amendments to the KIP!
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Feb 21, 2023 at 8:42 PM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hi Yash,
> > > >
> > > > I think the use case for pre-transform TPO coordinates (and topic
> > > > partition writers created/destroyed in close/open) tends to boil down
> > > > to exactly-once semantics, where it's desirable to preserve the
> > > > guarantees that Kafka provides (every record has a unique TPO trio, and
> > > > records are ordered by offset within a topic partition).
> > > >
> > > > It's my understanding that this approach is utilized in several
> > > > connectors out there today, and it might break these connectors to
> > > > start using the post-transform topic partitions automatically in their
> > > > open/close methods.
> > > >
> > > > If we want to get really fancy with this and try to obviate or at least
> > > > reduce the need for per-connector code changes, we might try to
> > > > introduce a framework-level configuration property to dictate which of
> > > > the pre-transform and post-transform topic partitions are used for the
> > > > fallback call to the single-arg variant if a task class has not
> > > > overridden the multi-arg variant. But I think this is going a bit too
> > > > far and would prefer to keep things simple(r) for now.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > >
> > > > On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > > I was actually envisioning something like `void
> > > > > > open(Collection<TopicPartition> originalPartitions,
> > > > > > Collection<TopicPartition> transformedPartitions)`
> > > > >
> > > > > > Ah okay, this does make a lot more sense. Sorry, I think I
> > > > > > misunderstood you earlier. I do agree with you that this seems
> > > > > > better than splitting it off into two new sets of open / close
> > > > > > methods from a complexity standpoint.
> > > > >
> > > > > > Plus, if a connector is intentionally designed to use
> > > > > > pre-transformation topic partitions in its open/close
> > > > > > methods, wouldn't we just be trading one form of the
> > > > > > > problem for another by making this switch?
> > > > >
> > > > > > On thinking about this a bit more, I'm not so convinced that we need
> > > > > > to expose the pre-transform / original topic partitions in the new
> > > > > > open / close methods. The purpose of the open / close methods is to
> > > > > > allow sink tasks to allocate and deallocate resources for each topic
> > > > > > partition assigned to the task and the purpose of topic-mutating
> > > > > > SMTs is to essentially modify the source topic name from the point
> > > > > > of view of the sink connector. Why would a sink connector ever need
> > > > > > to or want to allocate resources for pre-transform topic partitions?
> > > > > > Is the argument here that since we'll be exposing both the
> > > > > > pre-transform and post-transform topic partitions per record, we
> > > > > > should also expose the same info via open / close and allow sink
> > > > > > connector implementations to disregard topic-mutating SMTs
> > > > > > completely if they wanted to?
> > > > >
> > > > > > Either way, I've gone ahead and updated the KIP to reflect all of
> > > > > > our previous discussion here since it had become quite outdated.
> > > > > > I've also updated the KIP title from "Sink Connectors: Support
> > > > > > topic-mutating SMTs for async connectors (preCommit users)" to
> > > > > > "Allow sink connectors to be used with topic-mutating SMTs" since
> > > > > > the improvements to the open / close mechanism don't pertain only
> > > > > > to asynchronous sink connectors. The new KIP URL is:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton
> > <chr...@aiven.io.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Yash,
> > > > > >
> > > > > > I was actually envisioning something like `void
> > > > > > open(Collection<TopicPartition>
> > > > > > originalPartitions, Collection<TopicPartition>
> > > transformedPartitions)`,
> > > > > > since we already convert and transform each batch of records that
> > we
> > > > poll
> > > > > > from the sink task's consumer en masse, meaning we could discover
> > > > several
> > > > > > new transformed partitions in between consecutive calls to
> > > > SinkTask::put.
> > > > > >
> > > > > > It's also worth noting that we'll probably want to deprecate the
> > > > existing
> > > > > > open/close methods, at which point keeping one non-deprecated
> > variant
> > > > of
> > > > > > each seems more appealing and less complex than keeping two.
> > > > > >
> > > > > > Honestly though, I think we're both on the same page enough that
> I
> > > > > wouldn't
> > > > > > object to either approach. We've probably reached the saturation
> > > point
> > > > > for
> > > > > > ROI here and as long as we provide developers a way to get the
> > > > > information
> > > > > > they need from the runtime and take care to add Javadocs and
> update
> > > our
> > > > > > docs page (possibly including the connector development
> > quickstart),
> > > it
> > > > > > should be fine.
> > > > > >
> > > > > > At this point, it might be worth updating the KIP based on recent
> > > > > > discussion so that others can see the latest proposal, and we can
> > > both
> > > > > take
> > > > > > a look and make sure everything looks good enough before opening
> a
> > > vote
> > > > > > thread.
> > > > > >
> > > > > > Finally, I think you make a convincing case for a time-based
> > eviction
> > > > > > policy. I wasn't thinking about the fairly common SMT pattern of
> > > > > deriving a
> > > > > > topic name from, e.g., a record field or header.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <
> yash.ma...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > > Plus, if a connector is intentionally designed to
> > > > > > > > use pre-transformation topic partitions in its
> > > > > > > > open/close methods, wouldn't we just be trading
> > > > > > > > one form of the problem for another by making this
> > > > > > > > switch?
> > > > > > >
> > > > > > > > Thanks, this makes sense, and given that the KIP already
> > > > > > > > proposes a way for sink connector implementations to distinguish
> > > > > > > > between pre-transform and post-transform topics per record, I
> > > > > > > > think I'm convinced that going with new `open()` / `close()`
> > > > > > > > methods is the right approach. However, I still feel like having
> > > > > > > > overloaded methods will make it a lot less intuitive given that
> > > > > > > > the two sets of methods would be different in terms of when
> > > > > > > > they're called and what arguments they are passed (also I'm
> > > > > > > > presuming that the overloaded methods you're prescribing will
> > > > > > > > only have a single `TopicPartition` rather than a
> > > > > > > > `Collection<TopicPartition>` as their parameters). I guess my
> > > > > > > > concern is largely around the fact that it won't be possible to
> > > > > > > > distinguish between the overloaded methods' use cases just from
> > > > > > > > the method signatures. I agree that naming is going to be
> > > > > > > > difficult here, but I think that having two sets of
> > > > > > > > `SinkTask::openXyz` / `SinkTask::closeXyz` methods will be less
> > > > > > > > complicated to understand from a connector developer perspective
> > > > > > > > (as compared to overloaded methods with only differing
> > > > > > > > documentation). Of your suggested options, I think
> > > > > > > > `openPreTransform` / `openPostTransform` are the most
> > > > > > > > comprehensible ones.
> > > > > > >
> > > > > > > > BTW, I wouldn't say that we can't make assumptions
> > > > > > > > about the relationships between pre- and post-transformation
> > > > > > > > > topic partitions.
> > > > > > >
> > > > > > > I meant that the framework wouldn't be able to
> deterministically
> > > know
> > > > > > when
> > > > > > > to close a post-transform topic partition given that SMTs could
> > use
> > > > > > > per-record data / metadata to manipulate the topic names as and
> > how
> > > > > > > required (which supports the suggestion to use an eviction
> policy
> > > > based
> > > > > > > mechanism to call SinkTask::close for post-transform topic
> > > > partitions).
> > > > > > >
> > > > > > > > We might utilize a policy that assumes a deterministic
> > > > > > > > mapping from the former to the latter, for example.
> > > > > > >
> > > > > > > Wouldn't this be making the assumption that SMTs only use the
> > topic
> > > > > name
> > > > > > > itself and no other data / metadata while computing the new
> topic
> > > > name?
> > > > > > Are
> > > > > > > you suggesting that since this assumption could work for a
> > majority
> > > > of
> > > > > > > SMTs, it might be more efficient overall in terms of reducing
> the
> > > > > number
> > > > > > of
> > > > > > > "false-positive" calls to `SinkTask::closePostTransform` (and
> > we'll
> > > > > also
> > > > > > be
> > > > > > > able to call `SinkTask::closePostTransform` immediately after
> > topic
> > > > > > > partitions are revoked from the consumer)? I was thinking
> > something
> > > > > more
> > > > > > > generic along the lines of a simple time based eviction policy
> > that
> > > > > > > wouldn't be making any assumptions regarding the SMT
> > > implementations.
> > > > > > > Either way, I do like your earlier suggestion of keeping this
> > logic
> > > > > > > internal and not painting ourselves into a corner by promising
> > any
> > > > > > > particular behavior in the KIP.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yash
> > > > > > >
> > > > > > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton
> > > > <chr...@aiven.io.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > I think the key difference between adding methods/overloads
> > > related
> > > > > to
> > > > > > > > SinkTask::open/SinkTask::close and SinkTask::put is that this
> > > isn't
> > > > > > > > auxiliary information that may or may not be useful to
> > connector
> > > > > > > > developers. It's actually critical for them to understand the
> > > > > > difference
> > > > > > > > between the two concepts here, even if they look very
> similar.
> > > And
> > > > > > yes, I
> > > > > > > > do believe that switching from pre-transform to
> post-transform
> > > > topic
> > > > > > > > partitions is too big a change in behavior here. Plus, if a
> > > > connector
> > > > > > is
> > > > > > > > intentionally designed to use pre-transformation topic
> > partitions
> > > > in
> > > > > > its
> > > > > > > > open/close methods, wouldn't we just be trading one form of
> the
> > > > > problem
> > > > > > > for
> > > > > > > > another by making this switch?
> > > > > > > >
> > > > > > > > One possible alternative to overloading the existing methods
> is
> > > to
> > > > > > split
> > > > > > > > SinkTask::open into openOriginal (or possibly openPhysical or
> > > > > > > > openPreTransform) and openTransformed (or openLogical or
> > > > > > > > openPostTransform), with a similar change for
> SinkTask::close.
> > > The
> > > > > > > default
> > > > > > > > implementation for SinkTask::openOriginal can be to call
> > > > > > SinkTask::open,
> > > > > > > > and the same can go for SinkTask::close. However, I prefer
> > > > > overloading
> > > > > > > the
> > > > > > > > existing methods since this alternative increases complexity
> > and
> > > > none
> > > > > > of
> > > > > > > > the names are very informative.
> > > > > > > >
> > > > > > > > BTW, I wouldn't say that we can't make assumptions about the
> > > > > > > relationships
> > > > > > > > between pre- and post-transformation topic partitions. We
> might
> > > > > > utilize a
> > > > > > > > policy that assumes a deterministic mapping from the former
> to
> > > the
> > > > > > > latter,
> > > > > > > > for example. The distinction I'd draw is that the assumptions
> > we
> > > > make
> > > > > > can
> > > > > > > > and probably should favor some cases in terms of performance
> > > (i.e.,
> > > > > > > > reducing the number of unnecessary calls to close/open over a
> > > given
> > > > > > sink
> > > > > > > > task's lifetime), but should not lead to guaranteed resource
> > > leaks
> > > > or
> > > > > > > > failure to obey API contract in any cases.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <
> > > yash.ma...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > > especially if connectors are intentionally designed
> around
> > > > > > > > > > original topic partitions instead of transformed ones.
> > > > > > > > >
> > > > > > > > > Ha, that's a good point and reminds me of Hyrum's Law [1]
> :)
> > > > > > > > >
> > > > > > > > > > > I think we have to provide connector developers with some
> > > > > > > > > > > way to differentiate between the two, but maybe there's a
> > > > > > > > > > > way to do this that I haven't thought of yet
> > > > > > > > >
> > > > > > > > > I can't think of a better way to do this either; would
> > invoking
> > > > the
> > > > > > > > > existing `SinkTask::open` and `SinkTask::close` methods
> with
> > > > > > > > post-transform
> > > > > > > > > topic partitions instead of pre-transform topic partitions
> > not
> > > be
> > > > > > > > > acceptable even in a minor / major AK release? I feel like
> > the
> > > > > > proposed
> > > > > > > > > approach of adding overloaded `SinkTask::open` /
> > > > `SinkTask::close`
> > > > > > > > methods
> > > > > > > > > to differentiate between pre-transform and post-transform
> > topic
> > > > > > > > partitions
> > > > > > > > > has similar pitfalls to the idea of the overloaded
> > > > `SinkTask::put`
> > > > > > > method
> > > > > > > > > we discarded earlier.
> > > > > > > > >
> > > > > > > > > > > Either way, I'm glad that the general idea of a cache and
> > > > > > > > > > > eviction policy for SinkTask::close seems reasonable; if
> > > > > > > > > > > we decide to go this route, it might make sense for the
> > > > > > > > > > > KIP to include an outline of one or more high-level
> > > > > > > > > > > strategies we might take, but without promising any
> > > > > > > > > > > particular behavior beyond occasionally calling
> > > > > > > > > > > SinkTask::close for post-transform topic partitions. I'm
> > > > > > > > > > > hoping that this logic can stay internal, and by not
> > > > > > > > > > > painting ourselves into a corner with the KIP, we give
> > > > > > > > > > > ourselves leeway to tweak it in the future if necessary
> > > > > > > > > > > without filing another KIP or introducing a pluggable
> > > > > > > > > > > interface.
> > > > > > > > >
> > > > > > > > > Thanks, that's a good idea. Given the flexibility of SMTs,
> > the
> > > > > > > framework
> > > > > > > > > can't really make any assumptions around topic partitions
> > post
> > > > > > > > > transformation nor does it have any way to definitively get
> > any
> > > > > such
> > > > > > > > > information from transformations which is why the idea of a
> > > cache
> > > > > > with
> > > > > > > an
> > > > > > > > > eviction policy makes perfect sense!
> > > > > > > > >
> > > > > > > > > [1] - https://www.hyrumslaw.com/
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yash
> > > > > > > > >
> > > > > > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton
> > > > > <chr...@aiven.io.invalid
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Yash,
> > > > > > > > > >
> > > > > > > > > > > So it looks like with the current state of affairs,
> sink
> > > > tasks
> > > > > > that
> > > > > > > > > only
> > > > > > > > > > instantiate writers in the SinkTask::open method (and
> don't
> > > do
> > > > > the
> > > > > > > lazy
> > > > > > > > > > instantiation in SinkTask::put that you mentioned) might
> > fail
> > > > > when
> > > > > > > used
> > > > > > > > > > with topic/partition mutating SMTs even if they don't do
> > any
> > > > > > > > asynchronous
> > > > > > > > > > processing?
> > > > > > > > > >
> > > > > > > > > > Yep, exactly 👍
> > > > > > > > > >
> > > > > > > > > > > What do you think about retaining just the existing
> > methods
> > > > > > > > > > but changing when they're called in the Connect runtime?
> > For
> > > > > > > instance,
> > > > > > > > > > instead of calling SinkTask::open after partition
> > assignment
> > > > > post a
> > > > > > > > > > consumer group rebalance, we could cache the currently
> > "seen"
> > > > > topic
> > > > > > > > > > partitions (post transformation) and before each call to
> > > > > > > SinkTask::put
> > > > > > > > > > check whether there's any new "unseen" topic partitions,
> > and
> > > if
> > > > > so
> > > > > > > call
> > > > > > > > > > SinkTask::open (and also update the cache of course).
> > > > > > > > > >
> > > > > > > > > > IMO the issue here is that it's a drastic change in
> > behavior
> > > to
> > > > > > start
> > > > > > > > > > invoking SinkTask::open and SinkTask::close with
> > > post-transform
> > > > > > topic
> > > > > > > > > > partitions instead of pre-transform, especially if
> > connectors
> > > > are
> > > > > > > > > > intentionally designed around original topic partitions
> > > instead
> > > > > of
> > > > > > > > > > transformed ones. I think we have to provide connector
> > > > developers
> > > > > > > with
> > > > > > > > > some
> > > > > > > > > > way to differentiate between the two, but maybe there's a
> > way
> > > > to
> > > > > do
> > > > > > > > this
> > > > > > > > > > that I haven't thought of yet. Interested to hear your
> > > > thoughts.
> > > > > > > > > >
> > > > > > > > > > Either way, I'm glad that the general idea of a cache and
> > > > > eviction
> > > > > > > > policy
> > > > > > > > > > > for SinkTask::close seems reasonable; if we decide to go
> > this
> > > > > route,
> > > > > > > it
> > > > > > > > > > might make sense for the KIP to include an outline of one
> > or
> > > > more
> > > > > > > > > > high-level strategies we might take, but without
> promising
> > > any
> > > > > > > > particular
> > > > > > > > > > behavior beyond occasionally calling SinkTask::close for
> > > > > > > post-transform
> > > > > > > > > > topic partitions. I'm hoping that this logic can stay
> > > internal,
> > > > > and
> > > > > > > by
> > > > > > > > > not
> > > > > > > > > > painting ourselves into a corner with the KIP, we give
> > > > ourselves
> > > > > > > leeway
> > > > > > > > > to
> > > > > > > > > > tweak it in the future if necessary without filing
> another
> > > KIP
> > > > or
> > > > > > > > > > introducing a pluggable interface.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <
> > > > yash.ma...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chris,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the feedback.
> > > > > > > > > > >
> > > > > > > > > > > 1) That's a fair point; while I did scan everything
> > > publicly
> > > > > > > > available
> > > > > > > > > on
> > > > > > > > > > > GitHub, you're right in that it won't cover all
> possible
> > > SMTs
> > > > > > that
> > > > > > > > are
> > > > > > > > > > out
> > > > > > > > > > > there. Thanks for the example use-case as well, I've
> > > updated
> > > > > the
> > > > > > > KIP
> > > > > > > > to
> > > > > > > > > > add
> > > > > > > > > > > the two new proposed methods.
> > > > > > > > > > >
> > > > > > > > > > > > 2) So it looks like with the current state of affairs, sink
> > > > > > > > > > > > tasks that only instantiate writers in the SinkTask::open
> > > > > > > > > > > > method (and don't do the lazy instantiation in SinkTask::put
> > > > > > > > > > > > that you mentioned) might fail when used with topic/partition
> > > > > > > > > > > > mutating SMTs even if they don't do any asynchronous
> > > > > > > > > > > > processing? Since they could encounter records in
> > > > > > > > > > > > SinkTask::put with topics/partitions that they might not have
> > > > > > > > > > > > created writers for. Thanks for pointing this out, it's
> > > > > > > > > > > > definitely another incompatibility that needs to be called
> > > > > > > > > > > > out and fixed. The overloaded method approach is interesting,
> > > > > > > > > > > > but comes with the caveat of yet more new methods that will
> > > > > > > > > > > > need to be implemented by existing connectors if they want to
> > > > > > > > > > > > make use of this new functionality. What do you think about
> > > > > > > > > > > > retaining just the existing methods but changing when they're
> > > > > > > > > > > > called in the Connect runtime? For instance, instead of
> > > > > > > > > > > > calling SinkTask::open after partition assignment following a
> > > > > > > > > > > > consumer group rebalance, we could cache the currently "seen"
> > > > > > > > > > > > topic partitions (post transformation) and before each call
> > > > > > > > > > > > to SinkTask::put check whether there are any new "unseen"
> > > > > > > > > > > > topic partitions, and if so call SinkTask::open (and also
> > > > > > > > > > > > update the cache of course). I don't think this would break
> > > > > > > > > > > > the existing contract with sink tasks where SinkTask::open is
> > > > > > > > > > > > expected to be called for a topic partition before any
> > > > > > > > > > > > records from the topic partition are sent via SinkTask::put?
> > > > > > > > > > > > The SinkTask::close case is a lot trickier however, and would
> > > > > > > > > > > > require some sort of cache eviction policy that would be
> > > > > > > > > > > > deemed appropriate as you pointed out too.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
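
A minimal sketch of the "seen" topic partition cache described above, to make the call ordering concrete. This is not Connect runtime code: `TopicPart`, `TaskCallbacks` and `OpenBeforePut` are hypothetical stand-ins, and close/eviction is deliberately left out since no policy has been settled on.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a (topic, partition) pair; not Kafka's TopicPartition.
final class TopicPart {
    final String topic;
    final int partition;

    TopicPart(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicPart
                && ((TopicPart) o).topic.equals(topic)
                && ((TopicPart) o).partition == partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical minimal view of a sink task: only the two callbacks under discussion.
interface TaskCallbacks {
    void open(Collection<TopicPart> partitions);

    void put(Collection<TopicPart> recordPartitions);
}

// Assumed runtime-side helper: opens unseen (post-transformation) partitions before put.
final class OpenBeforePut {
    private final Set<TopicPart> seen = new LinkedHashSet<>();
    private final TaskCallbacks task;

    OpenBeforePut(TaskCallbacks task) {
        this.task = task;
    }

    // recordPartitions holds the post-transformation partition of each record in the batch.
    void deliver(List<TopicPart> recordPartitions) {
        Set<TopicPart> unseen = new LinkedHashSet<>(recordPartitions);
        unseen.removeAll(seen);
        if (!unseen.isEmpty()) {
            // Notify the task about new partitions before it receives their records.
            task.open(new ArrayList<>(unseen));
            seen.addAll(unseen);
        }
        task.put(recordPartitions);
    }
}
```

With this ordering, a task that only creates writers in open would still have a writer for every partition it sees in put. The cache only grows here, which is exactly the close problem mentioned above.
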
> > > > > > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton
> > > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > >
> > > > > > > > > > > > I've had some time to think on this KIP and I think
> I'm
> > > in
> > > > > > > > agreement
> > > > > > > > > > > about
> > > > > > > > > > > > not blocking it on an official compatibility library
> or
> > > > > adding
> > > > > > > the
> > > > > > > > > > "ack"
> > > > > > > > > > > > API for sink records.
> > > > > > > > > > > >
> > > > > > > > > > > > I only have two more thoughts:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Because it is possible to manipulate sink record
> > > > > partitions
> > > > > > > and
> > > > > > > > > > > offsets
> > > > > > > > > > > > with the current API we provide for transformations,
> I
> > > > still
> > > > > > > > believe
> > > > > > > > > > > > methods should be added to the SinkRecord class to
> > expose
> > > > the
> > > > > > > > > original
> > > > > > > > > > > > partition and offset, not just the original topic.
> The
> > > > > > additional
> > > > > > > > > > > cognitive
> > > > > > > > > > > > burden from these two methods is going to be minimal
> > > > anyways;
> > > > > > > once
> > > > > > > > > > users
> > > > > > > > > > > > understand the difference between the transformed
> topic
> > > > name
> > > > > > and
> > > > > > > > the
> > > > > > > > > > > > original one, it's going to be trivial for them to
> > > > understand
> > > > > > how
> > > > > > > > > that
> > > > > > > > > > > same
> > > > > > > > > > > > difference applies for partitions and offsets. It's
> not
> > > > > enough
> > > > > > to
> > > > > > > > > scan
> > > > > > > > > > > the
> > > > > > > > > > > > set of SMTs provided out of the box with Connect,
> ones
> > > > > > developed
> > > > > > > by
> > > > > > > > > > > > Confluent, or even everything available on GitHub,
> > since
> > > > > there
> > > > > > > may
> > > > > > > > be
> > > > > > > > > > > > closed-source projects out there that rely on this
> > > ability.
> > > > > One
> > > > > > > > > > potential
> > > > > > > > > > > > use case could be re-routing partitions between Kafka
> > and
> > > > > some
> > > > > > > > other
> > > > > > > > > > > > sharded system.
> > > > > > > > > > > >
> > > > > > > > > > > > 2. We still have to address the SinkTask::open [1]
> and
> > > > > > > > > SinkTask::close
> > > > > > > > > > > [2]
> > > > > > > > > > > > methods. If a connector writes to the external system
> > > using
> > > > > the
> > > > > > > > > > > transformed
> > > > > > > > > > > > topic partitions it reads from Kafka, then it's
> > possible
> > > > for
> > > > > > the
> > > > > > > > > > > connector
> > > > > > > > > > > > to lazily instantiate writers for topic partitions as
> > it
> > > > > > > encounters
> > > > > > > > > > them
> > > > > > > > > > > > from records provided in SinkTask::put. However,
> > > connectors
> > > > > > also
> > > > > > > > > need a
> > > > > > > > > > > way
> > > > > > > > > > > > to de-allocate those writers (and the resources used
> by
> > > > them)
> > > > > > > over
> > > > > > > > > > time,
> > > > > > > > > > > > which they can't do as easily. One possible approach
> > here
> > > > is
> > > > > to
> > > > > > > > > > overload
> > > > > > > > > > > > SinkTask::open and SinkTask::close with variants that
> > > > > > distinguish
> > > > > > > > > > between
> > > > > > > > > > > > transformed and original topic partitions, and
> default
> > to
> > > > > > > invoking
> > > > > > > > > the
> > > > > > > > > > > > existing methods with just the original topic
> > partitions.
> > > > We
> > > > > > > would
> > > > > > > > > then
> > > > > > > > > > > > have several options for how the Connect runtime can
> > > invoke
> > > > > > these
> > > > > > > > > > > methods,
> > > > > > > > > > > > but in general, an approach that guarantees that
> tasks
> > > are
> > > > > > > notified
> > > > > > > > > of
> > > > > > > > > > > > transformed topic partitions in SinkTask::open before
> > any
> > > > > > records
> > > > > > > > for
> > > > > > > > > > > that
> > > > > > > > > > > > partition are given to it in SinkTask::put, and
> makes a
> > > > > > > best-effort
> > > > > > > > > > > attempt
> > > > > > > > > > > > to close transformed topic partitions that appear to
> no
> > > > > longer
> > > > > > be
> > > > > > > > in
> > > > > > > > > > use
> > > > > > > > > > > > based on some eviction policy, would probably be
> > > > sufficient.
> > > > > > > > > > > >
> > > > > > > > > > > > > [1] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > > > > > > > > > > > > [2] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <
> > > > > > yash.ma...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks a lot for your inputs!
> > > > > > > > > > > > >
> > > > > > > > > > > > > > would provide a simple, clean interface for
> > > developers
> > > > to
> > > > > > > > > determine
> > > > > > > > > > > > > > which features are supported by the version of
> the
> > > > > Connect
> > > > > > > > > runtime
> > > > > > > > > > > > > > that their plugin has been deployed onto
> > > > > > > > > > > > >
> > > > > > > > > > > > > I do like the idea of having such a public
> > > compatibility
> > > > > > > library
> > > > > > > > -
> > > > > > > > > I
> > > > > > > > > > > > think
> > > > > > > > > > > > > it would remove a lot of restrictions from
> framework
> > > > > > > development
> > > > > > > > if
> > > > > > > > > > it
> > > > > > > > > > > > were
> > > > > > > > > > > > > to be widely adopted.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > we might consider adding an API to "ack" sink
> > records
> > > > > > > > > > > > >
> > > > > > > > > > > > > I agree that this does seem like a more intuitive
> and
> > > > clean
> > > > > > > API,
> > > > > > > > > but
> > > > > > > > > > > I'm
> > > > > > > > > > > > > concerned about the backward compatibility headache
> > > we'd
> > > > be
> > > > > > > > > imposing
> > > > > > > > > > on
> > > > > > > > > > > > all
> > > > > > > > > > > > > existing sink connectors. Connector developers will
> > > have
> > > > to
> > > > > > > > > maintain
> > > > > > > > > > > two
> > > > > > > > > > > > > separate ways of doing offset management if they
> want
> > > to
> > > > > use
> > > > > > > the
> > > > > > > > > new
> > > > > > > > > > > API
> > > > > > > > > > > > > but continue supporting older versions of Kafka
> > > Connect.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For now, I've reverted the KIP to the previous
> > > iteration
> > > > > > which
> > > > > > > > > > proposed
> > > > > > > > > > > > the
> > > > > > > > > > > > > addition of a new `SinkRecord` method to obtain the
> > > > > original
> > > > > > > > Kafka
> > > > > > > > > > > topic
> > > > > > > > > > > > > pre-transformation. One thing to note is that I've
> > > > removed
> > > > > > the
> > > > > > > > > method
> > > > > > > > > > > for
> > > > > > > > > > > > > obtaining the original Kafka partition after a
> > cursory
> > > > > search
> > > > > > > > > showed
> > > > > > > > > > > that
> > > > > > > > > > > > > use cases for partition modifying SMTs are
> primarily
> > on
> > > > the
> > > > > > > > source
> > > > > > > > > > > > > connector side.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yash
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton
> > > > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have more comments I'd like to make on this KIP
> > > when
> > > > I
> > > > > > have
> > > > > > > > > time
> > > > > > > > > > > > (sorry
> > > > > > > > > > > > > > for the delay, Yash, and thanks for your
> > patience!),
> > > > but
> > > > > I
> > > > > > > did
> > > > > > > > > want
> > > > > > > > > > > to
> > > > > > > > > > > > > > chime in and say that I'm also not sure about
> > > > overloading
> > > > > > > > > > > > SinkTask::put.
> > > > > > > > > > > > > I
> > > > > > > > > > > > > > share the concerns about creating an intuitive,
> > > simple
> > > > > API
> > > > > > > that
> > > > > > > > > > Yash
> > > > > > > > > > > > has
> > > > > > > > > > > > > > raised. In addition, this approach doesn't seem
> > very
> > > > > > > > > > > sustainable--what
> > > > > > > > > > > > do
> > > > > > > > > > > > > > we do if we encounter another case in the future
> > that
> > > > > would
> > > > > > > > > > warrant a
> > > > > > > > > > > > > > similar solution? We probably don't want to
> create
> > > > three,
> > > > > > > four,
> > > > > > > > > > etc.
> > > > > > > > > > > > > > overloaded variants of the method, each of which
> > > would
> > > > > have
> > > > > > > to
> > > > > > > > be
> > > > > > > > > > > > > > implemented by connector developers who want to
> > both
> > > > > > leverage
> > > > > > > > the
> > > > > > > > > > > > latest
> > > > > > > > > > > > > > and greatest connector APIs and maintain
> > > compatibility
> > > > > with
> > > > > > > > > > > > > > > Connect clusters running older versions.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I haven't been able to flesh this out into a
> design
> > > > worth
> > > > > > > > > > publishing
> > > > > > > > > > > in
> > > > > > > > > > > > > its
> > > > > > > > > > > > > > own KIP yet, but one alternative I've pitched to
> a
> > > few
> > > > > > people
> > > > > > > > > with
> > > > > > > > > > > > > > generally positive interest has been to develop
> an
> > > > > official
> > > > > > > > > > > > compatibility
> > > > > > > > > > > > > > library for Connect developers. This library
> would
> > be
> > > > > > > released
> > > > > > > > as
> > > > > > > > > > its
> > > > > > > > > > > > own
> > > > > > > > > > > > > > Maven artifact (separate from connect-api,
> > > > > connect-runtime,
> > > > > > > > etc.)
> > > > > > > > > > and
> > > > > > > > > > > > > would
> > > > > > > > > > > > > > provide a simple, clean interface for developers
> to
> > > > > > determine
> > > > > > > > > which
> > > > > > > > > > > > > > features are supported by the version of the
> > Connect
> > > > > > runtime
> > > > > > > > that
> > > > > > > > > > > their
> > > > > > > > > > > > > > plugin has been deployed onto. Under the hood,
> this
> > > > > library
> > > > > > > > might
> > > > > > > > > > use
> > > > > > > > > > > > > > reflection to determine whether classes, methods,
> > > etc.
> > > > > are
> > > > > > > > > > available,
> > > > > > > > > > > > but
> > > > > > > > > > > > > > the developer wouldn't have to do anything more
> > than
> > > > > check
> > > > > > > (for
> > > > > > > > > > > > example)
> > > > > > > > > > > > > >
> > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()`
> > > > to
> > > > > > know
> > > > > > > > at
> > > > > > > > > > any
> > > > > > > > > > > > > point
> > > > > > > > > > > > > > in the lifetime of their connector/task whether
> > that
> > > > > > feature
> > > > > > > is
> > > > > > > > > > > > provided
> > > > > > > > > > > > > by
> > > > > > > > > > > > > > the runtime.
> > > > > > > > > > > > > >
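
A rough sketch of what such a feature check could look like under the hood. No such library exists yet, so the `Features` enum and its lookup strategy are assumptions for illustration; only `SinkTaskContext::errantRecordReporter` is an existing Connect method. The two `DEMO_*` entries point at JDK classes so the sketch can be run without Connect on the classpath.

```java
import java.util.Arrays;

// Hypothetical compatibility helper; the enum name mirrors the example above.
enum Features {
    // Present only if the runtime's SinkTaskContext declares errantRecordReporter().
    SINK_TASK_ERRANT_RECORD_REPORTER(
            "org.apache.kafka.connect.sink.SinkTaskContext", "errantRecordReporter"),
    // JDK-only entries used purely to exercise the lookup in this sketch.
    DEMO_PRESENT("java.lang.String", "length"),
    DEMO_ABSENT("java.lang.String", "noSuchMethodForDemo");

    private final String className;
    private final String methodName;
    private Boolean cached; // the result cannot change during the JVM's lifetime

    Features(String className, String methodName) {
        this.className = className;
        this.methodName = methodName;
    }

    // True if the class is loadable and exposes a public method with the given name.
    synchronized boolean enabled() {
        if (cached == null) {
            cached = lookup();
        }
        return cached;
    }

    private boolean lookup() {
        try {
            Class<?> type = Class.forName(className);
            return Arrays.stream(type.getMethods())
                    .anyMatch(m -> m.getName().equals(methodName));
        } catch (ClassNotFoundException | LinkageError e) {
            // Class missing or not loadable on this runtime: treat the feature as absent.
            return false;
        }
    }
}
```

A task would then branch on `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` instead of wrapping the call in its own try-catch. A real library would also need to consider which class loader performs the lookup, which this sketch ignores.
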
> > > > > > > > > > > > > > One other high-level comment: this doesn't
> address
> > > > every
> > > > > > > case,
> > > > > > > > > but
> > > > > > > > > > we
> > > > > > > > > > > > > might
> > > > > > > > > > > > > > consider adding an API to "ack" sink records.
> This
> > > > could
> > > > > > use
> > > > > > > > the
> > > > > > > > > > > > > > SubmittedRecords class [1] (with some slight
> > tweaks)
> > > > > under
> > > > > > > the
> > > > > > > > > hood
> > > > > > > > > > > to
> > > > > > > > > > > > > > track the latest-acked offset for each topic
> > > partition.
> > > > > > This
> > > > > > > > way,
> > > > > > > > > > > > > connector
> > > > > > > > > > > > > > developers won't be responsible for tracking
> > offsets
> > > at
> > > > > all
> > > > > > > in
> > > > > > > > > > their
> > > > > > > > > > > > sink
> > > > > > > > > > > > > > tasks (eliminating issues with the accuracy of
> > > > > > > > > post-transformation
> > > > > > > > > > > > T/P/O
> > > > > > > > > > > > > > sink record information), and they'll only have
> to
> > > > notify
> > > > > > the
> > > > > > > > > > Connect
> > > > > > > > > > > > > > framework when a record has been successfully
> > > > dispatched
> > > > > to
> > > > > > > the
> > > > > > > > > > > > external
> > > > > > > > > > > > > > system. This provides a cleaner, friendlier API,
> > and
> > > > also
> > > > > > > > enables
> > > > > > > > > > > more
> > > > > > > > > > > > > > fine-grained metrics like the ones proposed in
> > > KIP-767
> > > > > [2].
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1] - https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > > > > > > > > > > > > [2] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <
> > > > > > > > yash.ma...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It's been a while for this one but the more I
> > think
> > > > > about
> > > > > > > it,
> > > > > > > > > the
> > > > > > > > > > > > more
> > > > > > > > > > > > > I
> > > > > > > > > > > > > > > feel like the current approach with a new
> > > overloaded
> > > > > > > > > > > `SinkTask::put`
> > > > > > > > > > > > > > method
> > > > > > > > > > > > > > > might not be optimal. We're trying to fix a
> > pretty
> > > > > corner
> > > > > > > > case
> > > > > > > > > > bug
> > > > > > > > > > > > here
> > > > > > > > > > > > > > > (usage of topic mutating SMTs with sink
> > connectors
> > > > that
> > > > > > do
> > > > > > > > > their
> > > > > > > > > > > own
> > > > > > > > > > > > > > offset
> > > > > > > > > > > > > > > tracking) and I'm not sure that warrants a
> change
> > > to
> > > > > > such a
> > > > > > > > > > central
> > > > > > > > > > > > > > > interface method. The new `SinkTask::put`
> method
> > > just
> > > > > > seems
> > > > > > > > > > > somewhat
> > > > > > > > > > > > > odd
> > > > > > > > > > > > > > > and it may not be very understandable for a new
> > > > reader
> > > > > -
> > > > > > I
> > > > > > > > > don't
> > > > > > > > > > > > think
> > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > should be the case for a public interface
> method.
> > > > > > > > Furthermore,
> > > > > > > > > > even
> > > > > > > > > > > > > with
> > > > > > > > > > > > > > > elaborate documentation in place, I'm not sure
> if
> > > > it'll
> > > > > > be
> > > > > > > > very
> > > > > > > > > > > > obvious
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > most people what the purpose of having these
> two
> > > > `put`
> > > > > > > > methods
> > > > > > > > > is
> > > > > > > > > > > and
> > > > > > > > > > > > > how
> > > > > > > > > > > > > > > they should be used by sink task
> implementations.
> > > > What
> > > > > do
> > > > > > > you
> > > > > > > > > > > think?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <
> > > > > > > > > yash.ma...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks a lot for your valuable feedback so
> far!
> > > > I've
> > > > > > > > updated
> > > > > > > > > > the
> > > > > > > > > > > > KIP
> > > > > > > > > > > > > > > based
> > > > > > > > > > > > > > > > on our discussion above. Could you please
> take
> > > > > another
> > > > > > > > look?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall
> Hauch <
> > > > > > > > > > rha...@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <
> > > > > > > > > > > yash.ma...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> > Hi Randall,
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > Thanks for elaborating. I think these are
> > all
> > > > very
> > > > > > > good
> > > > > > > > > > points
> > > > > > > > > > > > > and I
> > > > > > > > > > > > > > > see
> > > > > > > > > > > > > > > >> > why the overloaded `SinkTask::put` method
> > is a
> > > > > > cleaner
> > > > > > > > > > > solution
> > > > > > > > > > > > > > > overall.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > public void put(Collection<SinkRecord>
> > > > records,
> > > > > > > > > > > > Map<SinkRecord,
> > > > > > > > > > > > > > > >> > TopicPartition> updatedTopicPartitions)
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > I think this should be
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > `public void put(Collection<SinkRecord>
> > > records,
> > > > > > > > > > > Map<SinkRecord,
> > > > > > > > > > > > > > > >> > TopicPartition> originalTopicPartitions)`
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > instead because the sink records
> themselves
> > > have
> > > > > the
> > > > > > > > > updated
> > > > > > > > > > > > topic
> > > > > > > > > > > > > > > >> > partitions (i.e. after all transformations
> > > have
> > > > > been
> > > > > > > > > > applied)
> > > > > > > > > > > > and
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> KIP
> > > > > > > > > > > > > > > >> > is proposing a way for the tasks to be
> able
> > to
> > > > > > access
> > > > > > > > the
> > > > > > > > > > > > original
> > > > > > > > > > > > > > > topic
> > > > > > > > > > > > > > > >> > partition (i.e. before transformations
> have
> > > been
> > > > > > > > applied).
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Sounds good.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > Of course, if the developer does not
> need
> > > > > separate
> > > > > > > > > > methods,
> > > > > > > > > > > > they
> > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > >> > easily have the older `put` method simply
> > > > delegate
> > > > > > to
> > > > > > > > the
> > > > > > > > > > > newer
> > > > > > > > > > > > > > > method.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > If the developer does not need separate
> > > methods
> > > > > > (i.e.
> > > > > > > > they
> > > > > > > > > > > don't
> > > > > > > > > > > > > > need
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > >> > use this new addition), they can simply
> > > continue
> > > > > > > > > > implementing
> > > > > > > > > > > > just
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> > older `put` method right?
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> Correct. We should update the JavaDoc of both methods to make
> > > > > > > > > > > > > > > > >> this clear, and in general how the two methods are used and
> > > > > > > > > > > > > > > > >> should be implemented. That can be part of the PR, and the KIP
> > > > > > > > > > > > > > > > >> doesn't need this wording.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > Finally, this gives us a roadmap for
> > > > > *eventually*
> > > > > > > > > > > deprecating
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> older
> > > > > > > > > > > > > > > >> > method, once the Connect runtime versions
> > > > without
> > > > > > this
> > > > > > > > > > change
> > > > > > > > > > > > are
> > > > > > > > > > > > > > old
> > > > > > > > > > > > > > > >> > enough.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > I'm not sure we'd ever want to deprecate the older method.
> > > > > > > > > > > > > > > > >> > Most common sink connector implementations do not do their
> > > > > > > > > > > > > > > > >> > own offset tracking with asynchronous processing and will
> > > > > > > > > > > > > > > > >> > probably never have a need for the additional parameter
> > > > > > > > > > > > > > > > >> > `Map<SinkRecord, TopicPartition> originalTopicPartitions` in
> > > > > > > > > > > > > > > > >> > the proposed new `put` method. These connectors can continue
> > > > > > > > > > > > > > > > >> > implementing only the existing `SinkTask::put` method which
> > > > > > > > > > > > > > > > >> > will be called by the default implementation of the newer
> > > > > > > > > > > > > > > > >> > overloaded `put` method.
> > > > > > > > > > > > > > > >> >
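
To illustrate the delegation being discussed, here is a small sketch of an overloaded `put` whose default implementation falls back to the existing method. `SketchSinkTask`, `SketchRecord` and `SketchTopicPartition` are hypothetical stand-ins, not the Connect classes, and the signature simply follows the one quoted in this thread.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical stand-in for SinkRecord (carries post-transformation coordinates).
final class SketchRecord {
    final String topic;
    final int partition;
    final long offset;

    SketchRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for TopicPartition.
final class SketchTopicPartition {
    final String topic;
    final int partition;

    SketchTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

abstract class SketchSinkTask {
    // Existing method: the only one an older runtime would ever call.
    abstract void put(Collection<SketchRecord> records);

    // Proposed overload: a newer runtime calls this one. The default ignores the
    // extra map, so tasks that implement only the old method keep working.
    void put(Collection<SketchRecord> records,
             Map<SketchRecord, SketchTopicPartition> originalTopicPartitions) {
        put(records);
    }
}
```

A task that tracks its own offsets would override the two-argument variant and read the pre-transformation partition from the map; every other task leaves it alone.
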
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> +1
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > the pre-commit methods use the same
> > > > > > > > `Map<TopicPartition,
> > > > > > > > > > > > > > > >> > OffsetAndMetadata> currentOffsets` data
> > > > structure
> > > > > > I'm
> > > > > > > > > > > suggesting
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > > >> used.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > The data structure you're suggesting be
> used
> > > is
> > > > a
> > > > > > > > > > > > `Map<SinkRecord,
> > > > > > > > > > > > > > > >> > TopicPartition>` which will map
> `SinkRecord`
> > > > > objects
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > > > original
> > > > > > > > > > > > > > > >> topic
> > > > > > > > > > > > > > > >> > partition of the corresponding
> > > `ConsumerRecord`
> > > > > > right?
> > > > > > > > To
> > > > > > > > > > > > clarify,
> > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > >> is
> > > > > > > > > > > > > > > >> > a new data structure that will need to be
> > > > managed
> > > > > in
> > > > > > > the
> > > > > > > > > > > > > > > >> `WorkerSinkTask`.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Ah, you're right. Thanks for the correction.
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Best regards,
> > > > > > > > > > > > > > > >> Randall
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > > > >> > Yash
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall
> > Hauch <
> > > > > > > > > > > rha...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > Hi, Yash.
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > I'm not sure I quite understand why it
> > would
> > > > be
> > > > > > > > "easier"
> > > > > > > > > > for
> > > > > > > > > > > > > > > connector
> > > > > > > > > > > > > > > >> > > > developers to account for implementing
> > two
> > > > > > > different
> > > > > > > > > > > > > overloaded
> > > > > > > > > > > > > > > >> `put`
> > > > > > > > > > > > > > > >> > > > methods (assuming that they want to
> use
> > > this
> > > > > new
> > > > > > > > > > feature)
> > > > > > > > > > > > > versus
> > > > > > > > > > > > > > > >> using
> > > > > > > > > > > > > > > >> > a
> > > > > > > > > > > > > > > >> > > > try-catch block around `SinkRecord`
> > access
> > > > > > > methods?
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > Using a try-catch around an API method that *might* be there
> > > > > > > > > > > > > > > > >> > > is a very unusual thing for most developers. Unfortunately,
> > > > > > > > > > > > > > > > >> > > we've had to resort to this atypical approach with Connect
> > > > > > > > > > > > > > > > >> > > in places when there was no good alternative. We seem to be
> > > > > > > > > > > > > > > > >> > > relying upon this pattern because it's easier for us, not
> > > > > > > > > > > > > > > > >> > > because it offers a better experience for Connector
> > > > > > > > > > > > > > > > >> > > developers. IMO, if there's a practical alternative that
> > > > > > > > > > > > > > > > >> > > uses normal development practices and techniques, then we
> > > > > > > > > > > > > > > > >> > > should use that alternative. IIUC, there is at least one
> > > > > > > > > > > > > > > > >> > > practical alternative for this KIP that would not require
> > > > > > > > > > > > > > > > >> > > developers to use the unusual try-catch to handle the case
> > > > > > > > > > > > > > > > >> > > where methods are not found.
> > > > > > > > > > > > > > > >> > >
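
For reference, the try-catch pattern being debated looks roughly like the sketch below. `OptionalApi` and `callOrFallback` are hypothetical names, and the getter in the usage comment is only a placeholder for whatever method the KIP ends up adding.

```java
import java.util.function.Supplier;

// Hypothetical helper wrapping the "call it and catch the linkage error" pattern.
final class OptionalApi {
    private OptionalApi() {
    }

    // Invokes a getter that may be missing on older runtimes. A missing method
    // surfaces as NoSuchMethodError when the call is first executed, so the
    // fallback value is returned in that case.
    static <T> T callOrFallback(Supplier<T> newerGetter, T fallback) {
        try {
            return newerGetter.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return fallback;
        }
    }
}

// Usage inside a task (placeholder getter name, shown as a comment only):
//   String topic = OptionalApi.callOrFallback(() -> record.originalTopic(), record.topic());
```

The lambda form matters in this sketch: the possibly missing call has to execute inside the try block for the error to be caught there.
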
> > > > > > > > > > > > > > > >> > > I also think having two `put` methods is
> > > > easier
> > > > > > when
> > > > > > > > the
> > > > > > > > > > > > > Connector
> > > > > > > > > > > > > > > >> has to
> > > > > > > > > > > > > > > >> > > do different things for different
> Connect
> > > > > > runtimes,
> > > > > > > > too.
> > > > > > > > > > One
> > > > > > > > > > > > of
> > > > > > > > > > > > > > > those
> > > > > > > > > > > > > > > >> > > methods is called by newer Connect
> > runtimes
> > > > with
> > > > > > the
> > > > > > > > new
> > > > > > > > > > > > > behavior,
> > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > >> > the
> > > > > > > > > > > > > > > >> > > other method is called by an older
> Connect
> > > > > > runtime.
> > > > > > > Of
> > > > > > > > > > > course,
> > > > > > > > > > > > > if
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> > > developer does not need separate
> methods,
> > > they
> > > > > can
> > > > > > > > > easily
> > > > > > > > > > > have
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> older
> > > > > > > > > > > > > > > >> > > `put` method simply delegate to the
> newer
> > > > > method.
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > Finally, this gives us a roadmap for
> > > > > *eventually*
> > > > > > > > > > > deprecating
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > >> older
> > > > > > > > > > > > > > > >> > > method, once the Connect runtime
> versions
> > > > > without
> > > > > > > this
> > > > > > > > > > > change
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > > old
> > > > > > > > > > > > > > > >> > > enough.
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > > I think the advantage of going with the proposed approach in the KIP is
> > > > > > > > > > > > > > > > >> > > > that it wouldn't require extra book-keeping (the Map<SinkRecord,
> > > > > > > > > > > > > > > > >> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach)
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > The connector does have to do some of this bookkeeping in how they track
> > > > > > > > > > > > > > > > >> > > the topic partition offsets used in the `preCommit`, and the pre-commit
> > > > > > > > > > > > > > > > >> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
> > > > > > > > > > > > > > > > >> > > currentOffsets` data structure I'm suggesting be used.
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > I hope that helps.
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > Best regards,
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > Randall
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > > Hi Randall,
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > Thanks for reviewing the KIP!
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > > That latter logic can get quite ugly.
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > > > > > > > > > > > > > >> > > > developers to account for implementing two different overloaded `put`
> > > > > > > > > > > > > > > > >> > > > methods (assuming that they want to use this new feature) versus using a
> > > > > > > > > > > > > > > > >> > > > try-catch block around `SinkRecord` access methods? In both cases, a
> > > > > > > > > > > > > > > > >> > > > connector developer would need to write additional code in order to
> > > > > > > > > > > > > > > > >> > > > ensure that their connector continues working with older Connect
> > > > > > > > > > > > > > > > >> > > > runtimes. Furthermore, we would probably need to carefully document how
> > > > > > > > > > > > > > > > >> > > > the implementation for the older `put` method should look for connectors
> > > > > > > > > > > > > > > > >> > > > that want to use this new feature. I think the advantage of going with
> > > > > > > > > > > > > > > > >> > > > the proposed approach in the KIP is that it wouldn't require extra
> > > > > > > > > > > > > > > > >> > > > book-keeping (the Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in
> > > > > > > > > > > > > > > > >> > > > your proposed approach) and also the fact that the try-catch based logic
> > > > > > > > > > > > > > > > >> > > > is an already established pattern through
> > > > > > > > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > > > > > > > > > > > > > >> > > > and other KIPs which added methods to source/sink connector/task
> > > > > > > > > > > > > > > > >> > > > contexts.
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Let me know if you still feel that having a new overloaded put method is
> > > > > > > > > > > > > > > > >> > > > a cleaner solution and I'd be happy to reconsider!
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > > > > >> > > > Yash
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > The KIP includes this rejected alternative:
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > > 4. Update SinkTask.put in any way to pass the new information outside
> > > > > > > > > > > > > > > > >> > > > > > SinkRecord (e.g. a Map or a derived class)
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > >    - Much more disruptive change without considerable pros
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > One advantage about doing this is that sink connector implementations can
> > > > > > > > > > > > > > > > >> > > > > more easily implement two different "put(...)" methods to handle running
> > > > > > > > > > > > > > > > >> > > > > in a variety of runtimes, without having to use try-catch logic around
> > > > > > > > > > > > > > > > >> > > > > the newer SinkRecord access methods. That latter logic can get quite
> > > > > > > > > > > > > > > > >> > > > > ugly.
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > For example, the existing `put` method has this signature:
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > public abstract void put(Collection<SinkRecord> records);
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > If we added an overloaded method that passed in a map of the old
> > > > > > > > > > > > > > > > >> > > > > topic+partition for each record (and defined the absence of an entry as
> > > > > > > > > > > > > > > > >> > > > > having an unchanged topic and partition):
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > > > > > >> > > > >     put(records);
> > > > > > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > then a `SinkTask` implementation that wants to use this new feature could
> > > > > > > > > > > > > > > > >> > > > > simply implement both methods:
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records) {
> > > > > > > > > > > > > > > > >> > > > >     // Running in an older runtime, so no tracking of SMT-modified topic names or partitions
> > > > > > > > > > > > > > > > >> > > > >     put(records, Map.of());
> > > > > > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > > > > > >> > > > >     // real logic here
> > > > > > > > > > > > > > > > >> > > > > }
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > This seems a lot easier than having to use try-catch logic, yet still
> > > > > > > > > > > > > > > > >> > > > > allows sink connectors to utilize the new functionality and still work
> > > > > > > > > > > > > > > > >> > > > > with older Connect runtimes.
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > WDYT?
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > Randall
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > > Hi all,
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > I would like to (re)start a new discussion thread on KIP-793 (Kafka
> > > > > > > > > > > > > > > > >> > > > > > Connect) which proposes some additions to the public SinkRecord
> > > > > > > > > > > > > > > > >> > > > > > interface in order to support topic mutating SMTs for sink connectors
> > > > > > > > > > > > > > > > >> > > > > > that do their own offset tracking.
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > Links:
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > KIP:
> > > > > > > > > > > > > > > > >> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Older discussion thread:
> > > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > > > >> > > > > > Yash
> > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
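
For readers comparing the two options debated in the quoted thread, here is a minimal sketch of what the try-catch approach might look like. It is illustrative only: `RecordLike`, `originalTopic()` on it, and the two factory helpers are hypothetical stand-ins written for this example, not the real Kafka Connect `SinkRecord` API, and the "older runtime" is simulated by throwing `NoSuchMethodError` by hand (on a real JVM that error is raised at link time when a called method is absent from the class on the classpath).

```java
public class OriginalTopicFallbackSketch {

    /** Hypothetical stand-in for a sink record; NOT the real Kafka Connect SinkRecord. */
    interface RecordLike {
        String topic();          // topic as seen by the task, after any SMTs have run
        String originalTopic();  // assumed newer accessor, missing on older runtimes
    }

    /** Simulates a record delivered by a newer runtime that knows the original topic. */
    static RecordLike newRuntimeRecord(String topic, String originalTopic) {
        return new RecordLike() {
            @Override public String topic() { return topic; }
            @Override public String originalTopic() { return originalTopic; }
        };
    }

    /** Simulates an older runtime, where calling the missing accessor fails with NoSuchMethodError. */
    static RecordLike oldRuntimeRecord(String topic) {
        return new RecordLike() {
            @Override public String topic() { return topic; }
            @Override public String originalTopic() { throw new NoSuchMethodError("originalTopic"); }
        };
    }

    /** Prefer the pre-transformation topic; fall back to the current topic on older runtimes. */
    static String topicForOffsetTracking(RecordLike rec) {
        try {
            return rec.originalTopic();
        } catch (NoSuchMethodError e) {
            // Older runtime: no original topic available, so use what the task received.
            return rec.topic();
        }
    }

    public static void main(String[] args) {
        System.out.println(topicForOffsetTracking(newRuntimeRecord("routed-orders", "orders")));
        System.out.println(topicForOffsetTracking(oldRuntimeRecord("routed-orders")));
    }
}
```

The fallback lives in one helper, so the rest of the task does not need to know which runtime it is on; whether that is cleaner than a second `put` overload is exactly the judgment call discussed above.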
