Hi,

Thanks for the KIP!


BC1.
I echo point S3 from Sophie. I find it quite confusing that those methods return an interceptor that is then used as a client in the Streams code. Renaming might already do the trick: something like Intercepted(Admin|StreamsConsumer|Consumer|Producer) or Wrapped(Admin|StreamsConsumer|Consumer|Producer) comes to mind.

Related to this: Why is the interface with all the wrap* methods also called Interceptor? The functionality is quite different from the other interceptors.


BC2.
Why do you not use interfaces instead of the classes for the clients in the wrap* signatures? We just want to intercept the calls to the interface, right?

Related to this, why do you also pass in the configs of the clients if you only want to wrap the calls to them?


BC3.
Why do we need KafkaStreamsConsumer in the public API? Isn't the StreamsConsumer interface enough?


BC4.
I am wondering, whether a kind of observer pattern would be better. That means, in Streams we do not call the interceptor but we call the client and for each call we also call the corresponding method on the passed in object -- basically the user implements callbacks.

That would make it even clearer that you should not inject your own client implementations, since we remove the user's object from the direct call to the client. The injected object cannot participate in the RPCs at all. The trade-off is that the user only gets a call before the actual call to the client or afterwards. To alleviate that, we could provide an interface for the passed-in object that has a pre-call and post-call for each client call. Internally, we would have a wrapper around the clients that calls the corresponding method of the user-provided object. Something like:

public ConsumerRecords<K, V> poll(Duration timeout) {
    ConsumerRecords<K, V> records = actualConsumer.poll(timeout);
    return userConsumerObserver.poll(timeout, records);
}

or with the pre and post calls

public ConsumerRecords<K, V> poll(Duration timeout) {
    userConsumerObserver.prePoll(timeout);
    ConsumerRecords<K, V> records = actualConsumer.poll(timeout);
    return userConsumerObserver.postPoll(timeout, records);
}
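
For illustration only, the pre/post variant could be sketched end to end as below. `SimpleConsumer`, `ConsumerObserver`, and `ObservedConsumer` are hypothetical stand-ins (they are not part of the KIP or of Kafka), and a plain `List` replaces `ConsumerRecords` so that the sketch compiles without the Kafka client on the classpath.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical stand-in for the small part of Consumer used in this sketch.
interface SimpleConsumer<R> {
    List<R> poll(Duration timeout);
}

// Hypothetical observer: every callback defaults to a no-op / pass-through.
interface ConsumerObserver<R> {
    default void prePoll(Duration timeout) { }

    default List<R> postPoll(Duration timeout, List<R> records) {
        return records;
    }
}

// Wrapper owned by the runtime. The user's observer never sees the real client.
final class ObservedConsumer<R> implements SimpleConsumer<R> {
    private final SimpleConsumer<R> actualConsumer;
    private final ConsumerObserver<R> userConsumerObserver;

    ObservedConsumer(SimpleConsumer<R> actualConsumer, ConsumerObserver<R> userConsumerObserver) {
        this.actualConsumer = actualConsumer;
        this.userConsumerObserver = userConsumerObserver;
    }

    @Override
    public List<R> poll(Duration timeout) {
        userConsumerObserver.prePoll(timeout);
        List<R> records = actualConsumer.poll(timeout);
        return userConsumerObserver.postPoll(timeout, records);
    }
}
```

With default callbacks, a user would only override the hooks they care about, and the real client is always called exactly once per wrapped call.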


Best,
Bruno


On 05.10.24 02:31, Sophie Blee-Goldman wrote:
Thanks for the update Matthias! I'm totally in agreement with the new
proposal and have mainly only cosmetic points and minor nits remaining.
Once we come to agreement on these I would be happy to move this to a vote
(unless others chime in with new concerns of course)

S1. One thing that jumped out at me is that only the main consumer
implements the new StreamsConsumer interface, whereas the restore consumer
and global consumers both remain on the original plain 'Consumer' type.
This decision in itself makes sense to me, but it makes me question the
naming and whether it might confuse users to have only 1 of 3 consumer
types in Kafka Streams use the "StreamsConsumer". Have you considered any
other names for the interface yet? I suppose "StreamsMainConsumer" is the
most literal option, but I would prefer to hint at the underlying behavior
difference in some way and call it something like "StreamsGroupConsumer"
since that is fundamentally what it is.

S2. Building off of S1, personally I always thought something like
"groupConsumer" or "streamsGroupConsumer" would be a better, more
descriptive name than "mainConsumer" for the consumer whose job is to join
the application's consumer group (or "streams group" after KIP-1071). So
perhaps we can also take the opportunity to do this renaming and change the
interceptor's method name from "#wrapMainConsumer" to
"#wrapStreamsGroupConsumer" or something like that? (Would also be nice to
rename the variable names in the code to reflect this but that's an
implementation detail, and can be done in a standalone PR after the KIP is
completed. I'll do it myself if I have to :P )

S3. I'm a little confused about the need to introduce the intermediary
"<ClientName>Interceptor" classes, like AdminInterceptor and
ConsumerInterceptor and so on. Can't these methods just return the client
type directly? Doesn't seem like we're adding anything to these interfaces
and just extending the client class. Are they supposed to be marker
interfaces or something like that? It feels like we're adding unnecessary
noise to the API so I'd just like to understand the motivation behind this
choice

S4. Can you fill in the javadocs for the Interceptor class? Just want to
make sure we include some specific things. Mainly, if we're going to
include a StreamsConfig method of injecting the interceptor, we should make
sure users know to have their interceptor implementation extend
Configurable in order to obtain the application configs for any setup they
need to do when instantiated via the default constructor. Alternatively we
can just have the KafkaClientInterceptor extend Configurable itself as we
do in some similar cases.

S5. On the subject of the new config:
5a. Since it requires a class or class name, let's add a "class" suffix, eg
"default.client.interceptor.class"
5b. Can you include the doc string in the KIP? The main thing I want to
make sure gets included is the prioritization, that is: what happens if
someone defines a KafkaClientInterceptor  class via StreamsConfig but also
passes in an instance to the KafkaStreams constructor? Personally I'd say
the KafkaStreams constructor instance should get preference and override
the configured class, but the important thing is to document the behavior,
whatever it may be
5c. I forget the KIP number but recently we discussed removing the
"default" prefix from some config names. I'm wondering whether it makes
sense to include it here or if we should use this opportunity to strip the
"default" from this config. On the one hand, since you can override the
configured value by passing an instance in to the KafkaStreams constructor,
maybe this should be considered a default indeed. On the other hand you're
still only able to specify one interceptor per app so I'm personally
leaning more towards just "client.interceptor.class" without the "default"
prefix. Don't feel too strongly about this either way though so I just
wanted to raise the question, and am happy with whatever you prefer
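
To make the prioritization question in 5b concrete, here is a minimal sketch of one possible rule, in which an instance passed to the constructor wins over the configured class. Everything in it is an assumption for illustration: `ClientInterceptor`, `NoOpInterceptor`, and `InterceptorResolver` are hypothetical names, the config key is the one floated in 5c, and the actual behavior is whatever the KIP ends up documenting.

```java
import java.util.Map;

// Hypothetical stand-in for the interceptor interface proposed in the KIP.
interface ClientInterceptor { }

// Hypothetical implementation with a default constructor, as needed for config-based setup.
final class NoOpInterceptor implements ClientInterceptor { }

final class InterceptorResolver {
    // Assumed key name (suggestion from 5c); the final name is still under discussion.
    static final String CONFIG = "client.interceptor.class";

    // Returns the constructor-provided instance if present, else instantiates the configured class.
    static ClientInterceptor resolve(ClientInterceptor fromConstructor, Map<String, ?> configs) {
        if (fromConstructor != null) {
            return fromConstructor;
        }
        Object value = configs.get(CONFIG);
        if (value == null) {
            return null; // nothing configured
        }
        try {
            // The config may hold either a Class or a class name.
            Class<?> clazz = value instanceof Class ? (Class<?>) value : Class.forName(value.toString());
            return clazz.asSubclass(ClientInterceptor.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + value, e);
        }
    }
}
```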

S6. Final nit: it bothers me slightly that the class name is
"KafkaClientInterceptor" but all the methods are "wrapXXX". Is it a wrapper
or an interceptor? Am I just being pedantic or should we change the name to
"KafkaClientWrapper"? (I don't feel all that strongly about this either,
just wondering if the inconsistency bugs anyone else)

Looking forward to getting this KIP done at last!

On Thu, Oct 3, 2024 at 5:21 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for your feedback Alieh and Sophie.

For everybody: we did sync about open questions in person, and I hope to
reply w/o forgetting to add enough context about what we did discuss in
person. If anything is missing/unclear, please let us know.


Based on the in-person discussion, we agree to add a new interface

      interface StreamsConsumer extends Consumer { }

and a new class

      public class KafkaStreamsConsumer
          extends KafkaConsumer implements StreamsConsumer { }

And let the new `KafkaClientInterceptor` use these new interface/class
instead of `Consumer/KafkaConsumer` for the main consumer.

Note that we don't add any new methods to this interface at this point.
The reason is that the design for KIP-1071 is still too early to know
exactly what we will need, and we just want to set up the code early such
that we can add new methods easily in the future.

The alternative to adding `StreamsConsumer` would be to add this new
interface later if we need it, however, we opted to introduce it right
away to avoid future "deprecation noise".

We also decided to add a config parameter into the callbacks, allowing
interceptor code to inspect the used configuration for a client. This
will be read-only as we did already create the client before calling the
interceptor for wrapping. If there is demand in the future, we can
always extend the interceptor interface to also allow changing configs.
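
As a rough sketch of what "read-only" could look like in practice (an assumption, not the KIP's implementation): the runtime hands the callback an unmodifiable copy of the client config, so any attempt to change it fails fast and the runtime's own map is never touched. `WrapCallback` and `ReadOnlyConfigs` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback shape: the client already exists, the config is informational only.
interface WrapCallback<C> {
    C wrap(C client, Map<String, Object> config);
}

final class ReadOnlyConfigs {
    // Invokes the callback with an unmodifiable copy of the client config.
    static <C> C invoke(WrapCallback<C> callback, C client, Map<String, Object> clientConfig) {
        // Copy first, then wrap: later runtime changes do not leak into the view,
        // and put/remove on the view throw UnsupportedOperationException.
        Map<String, Object> view = Collections.unmodifiableMap(new HashMap<>(clientConfig));
        return callback.wrap(client, view);
    }
}
```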


I also have a rough POC PR for reference. It does not yet reflect the
latest points mentioned in this email, but I hope it will help to
clarify a few things: https://github.com/apache/kafka/pull/17333


In addition, I updated the KIP with a proposal to add predefined
"interceptor classes"
([Admin|StreamsConsumer|Consumer|Producer]Interceptor) to improve
backward compatibility. Details are on the KIP.


I also added a new config for the interceptor class, similar to the
existing config for the current client-supplier one.




Below more detailed replies for Alieh/Sophie.


What would happen if the implemented class creates a
new instance

If KIP-1071 is not used, nothing. It would just work. However, if
KIP-1071 is used, it would most likely break KS. -- However, given that
we now introduce the new `XxxInterceptor` classes, it would be a little
bit harder to return new instances (even if still not impossible...)


in other words, how do we prevent it?

As I tried to point out on the KIP, I believe we cannot completely
prevent it; we can only make it an API contract and if users violate the
API contract, it's their own fault if KS breaks. That is why we need to
move off `KafkaClientSupplier` so we can change the API contract w/o
breaking anybody. Using the `XxxInterceptor` classes also helps


About the MockConsumer: As said, we cannot prevent it, and for our own
tests we might still need to create a new instance (we would need to let
`MockConsumer extend ConsumerInterceptor` but that's easy). For most
(all?) tests in which we mock a client, we don't really connect to a
broker cluster; they are unit tests. For unit tests, nothing should break.
And for actual integration tests, I don't think that we would mock
clients...



Whatever we end up doing, if there's a way for someone to implement it
incorrectly then it would be good to figure out how we can detect this and
warn the user. I'm not quite sure how much of a safety net we can
construct here but we could try to cover some of the obvious missteps, for
example by asserting that the returned Consumer is not an instance of
KafkaConsumer unless it's the same exact KafkaConsumer object passed into
the #getMainConsumer call. Not sure if there are other checks we could do?

I don't think that is possible by definition. We do pass in
`KafkaConsumer` but do expect a `ConsumerInterceptor` object back. The
fundamental problem is that we cannot know if the object we get is an
actual wrapper/interceptor for the passed-in client, or if the object
is not a wrapper at all, but a `new MyKafkaConsumer extends
ConsumerInterceptor {...}` object. -- But I think it's ok if we cannot
guard against it fully -- we can also not guard against users messing
with KS using reflection...
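
A small sketch of the suggested instance check, and of why it cannot be a full guard. The types are hypothetical stand-ins: `Client` plays the role of `Consumer` and `RealClient` the role of `KafkaConsumer`. The check rejects a different `RealClient`, but an unrelated implementation that never wraps the passed-in client still passes.

```java
// Hypothetical stand-ins: Client ~ Consumer, RealClient ~ KafkaConsumer.
interface Client { }

class RealClient implements Client { }

final class WrapCheck {
    // Accepts the returned object unless it is a RealClient other than the one passed in.
    // It cannot tell whether a non-RealClient object actually delegates to passedIn.
    static boolean looksValid(Client passedIn, Client returned) {
        return !(returned instanceof RealClient) || returned == passedIn;
    }
}
```

So such a check would only catch the most obvious misstep; everything else stays an API contract.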



The real issue with constructing and returning a different KafkaConsumer
instead of wrapping the one passed in, is that Streams will be invoking
some methods on the user's Consumer while invoking other methods on the
original KafkaConsumer.

No. All methods of the user's Consumer would be invoked. KS only
_constructs_ the clients (to hook into some internals, and "connect" KS
runtime code to consumer internals for KIP-1071), but otherwise KS would
only use the interceptor for all calls on the client. And with the new
`StreamsConsumer` interface, we can add new methods in the future.



If every consumer method call is being made directly on the Consumer
returned from #getMainConsumer, it doesn't really matter if they
constructed a different one.

It would matter, as KS code would now be "decoupled" from this consumer
instance. The KS runtime can only tap into the client it creates itself,
by using a non-public constructor to pass in non-public "callbacks" which
we need to integrate with KIP-1071. (At least for now...)

As discussed in person: we might want to make part of these things
public, but we don't know yet, which (if any) and how the concrete API
should be designed. Thus, we want to keep everything non-public for now,
and when KIP-1071 evolves, we will know better what we need.



If this KIP is going to immediately deprecate the old KafkaClientSupplier,
then we need to support implementing an interceptor while configured to
use the current rebalancing protocol.

I don't think we need to do this, but we could make it mutually
exclusive. However, I don't anticipate any issue with actually supporting
the interceptor interface w/ the old protocol. -- And the KIP explicitly
disallows using both supplier and interceptor, and also disallows using
KIP-1071 plus supplier.



That is, can we assume that the KafkaConsumer passed into #getMainConsumer
will only be constructed via the internal constructor/put into "Streams
group mode" if the app is configured to use the new protocol, and
otherwise it will just pass in a regular KafkaConsumer?

Yes. That's why I don't see any issue as stated above.




-Matthias




On 9/9/24 10:26 PM, Sophie Blee-Goldman wrote:
I agree with Alieh, there is definitely some inconsistency in what we do
or don't want to support/allow. However imo we should lean into the more
flexible approach of allowing an alternative consumer to be constructed
and returned, rather than attempting to enforce the interceptor to
strictly wrap the provided consumer. Mainly because the ability to bypass
the "real" KafkaConsumer and mock/stub methods is pretty essential to the
test/mock consumer use cases. Personally I feel this is a very advanced
feature and we should let people who know what they're doing implement any
kind of consumer for this, and if they misuse the API and break things
then it's on them. I feel we can document things clearly enough so that it
will be difficult to shoot yourself in the foot by accident (plus if they
do end up breaking things eg by constructing a new Consumer and just
returning that instead, it will at least break quickly and obviously).
Figuring out that the problem was in your interceptor might be challenging
though. Whatever we end up doing, if there's a way for someone to
implement it incorrectly then it would be good to figure out how we can
detect this and warn the user. I'm not quite sure how much of a safety net
we can construct here but we could try to cover some of the obvious
missteps, for example by asserting that the returned Consumer is not an
instance of KafkaConsumer unless it's the same exact KafkaConsumer object
passed into the #getMainConsumer call. Not sure if there are other checks
we could do?

I do suspect this problem just goes away if we pivot to the
StreamsConsumer approach we've discussed a bit before. The real issue with
constructing and returning a different KafkaConsumer instead of wrapping
the one passed in, is that Streams will be invoking some methods on the
user's Consumer while invoking other methods on the original
KafkaConsumer. Another reason that splitting up the consumer into a public
interface and a hidden internal one seems to complicate things for no
clear reason. If every consumer method call is being made directly on the
Consumer returned from #getMainConsumer, it doesn't really matter if they
constructed a different one. In fact we could probably just skip having
Streams construct its own consumer and instead let the user construct it
or provide an alternative implementation instead. As I understand it, the
only reason we want to create the underlying consumer within Streams is to
access an internal constructor that puts it into "Streams" mode. But that
could be solved in a different way, for example if/when Streams invokes a
Streams-specific method like #setProcessId this automatically puts the
consumer into Streams mode. Not sure if this works since I don't have the
full picture with details on this internal constructor, just feels like
that doesn't have to be the only possible way to do things (not
necessarily better ways, just different)

On a somewhat related note, I had a question about the deprecation plan.
As I understand it, the stated goal of KIP-1071 is to eventually deprecate
and then remove support for the current rebalancing protocol, but the
timeline is undefined and nothing is being deprecated right now. If this
KIP is going to immediately deprecate the old KafkaClientSupplier, then we
need to support implementing an interceptor while configured to use the
current rebalancing protocol. Is this reading of the KIP correct? That is,
can we assume that the KafkaConsumer passed into #getMainConsumer will
only be constructed via the internal constructor/put into "Streams group
mode" if the app is configured to use the new protocol, and otherwise it
will just pass in a regular KafkaConsumer? Just want to make sure my
understanding is correct here

On Fri, Sep 6, 2024 at 8:29 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

Sorry for the typo:
contracts = contradicts*

On Fri, Sep 6, 2024 at 5:20 PM Alieh Saeedi <asae...@confluent.io>
wrote:

Thanks Matthias for the KIP.

A quick question regarding the sentence `It would be invalid to create a
new client instance`: What would happen if the implemented class creates
a new instance, or, in other words, how do we prevent it? Considering that
`Config` is going to be passed in as well (the 2nd point raised by
Sophie), Also, the new Consumer object (`mockConsumer`) you created in
your example here
<https://lists.apache.org/thread/l6dhq1rfl3xkq8g9wfqsvw89yjrgzbn8>
confused me since it contracts with the above sentence.

Thanks,
Alieh

On Fri, Sep 6, 2024 at 5:08 AM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

1. Fair enough. In the end it doesn't matter that much to us, so I just
figured from basic principles if this is a config then it should go in the
StreamsConfig. Also happy to hear what others think

2. We need the client-specific configs (for example to extract the client
ids for monitoring, resource management, etc). However, I checked our
implementation and realized it's not enough to just pass them in -- we
actually want to modify some configs on the fly, before they are used to
construct the client. If absolutely necessary I'm sure we could find a
workaround for this by overriding the configs in the original
StreamsConfig via the client prefixes, but that would be kind of a hassle
so I'd be happy if we found a way to maintain the read-write mode for
client configs as well

3. Perhaps we do need a PR -- not for the feature implementation itself,
but an example of how this would be used. As I understand it, the whole
point is to be able to cast the client to KafkaConsumer (or some consumer
class?) to access its internal methods. What is the specific plan for how
this will work with the interceptors proposed in this KIP?

I was imagining the idea was that Streams would retain a reference to the
underlying KafkaConsumer (ie what's passed in to the interceptor callback)
and it would be invoking those internal methods on that KafkaConsumer,
while all the public methods are to be invoked on the consumer returned by
the interceptor. But as long as some internal methods are being invoked on
a consumer then there's a part of Streams that is inherently dependent on
having a valid KafkaConsumer implementation, which is dependent on having
a valid cluster it can connect to...no?

I really think it would help to at least outline what these internal
methods are. Right now I'm assuming they are going to result in the
consumer sending an RPC or taking some other action that would be
problematic if there was no valid cluster, but maybe that's not the case?
Don't want to waste your time because my assumptions are wrong, so some
specifics would help. :)

On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org>
wrote:

Thanks for the input Sophie.

About (1) -- I am personally happy either way. Let's wait to see what
others prefer (constructor only, config only, both), and I can update the
KIP later. It does not seem to be a core issue for the KIP.


For (2), I guess we can do this, and I am open to adding it. Wondering if
you would need to get access to the client-specific config KS computes,
or if we should let `KafkaClientInterceptor implement Configurable` and
thus add a single `configure(Map)` call passing in the user's provided
config only once?


About (3), I don't follow your concern. As pointed out in the KIP
itself, we cannot enforce that no MockClient is returned, and for a mock
case, one would not have a broker cluster and not have a fully fledged
KS runtime, thus, just switching out the client should still work as
before (to be fair: I don't have a full PR for this yet -- if we think it
would be helpful, I can work more on it and push to GitHub so we get a
clearer picture):

class MyInterceptor implements KafkaClientInterceptor {
     MockConsumer mockConsumer = new MockConsumer();

     @Override
     public Consumer wrapConsumer(KafkaConsumer consumer) {
       // this would never make any call into `consumer` at all
       return mockConsumer;
     }
}

Note that the return type is `Consumer`, ie, the top level interface,
but not `KafkaConsumer` and thus you can return anything you want.

Similarly, you can intercept any call transparently:

class MyInterceptor implements KafkaClientInterceptor {

     @Override
     public Consumer wrapConsumer(KafkaConsumer consumer) {
       return new Consumer() {
         @Override
         public ConsumerRecords<K, V> poll(final long timeoutMs) {
           // add 500ms to the timeout, and forward the call
           ConsumerRecords r1 = consumer.poll(timeoutMs + 500);

           // apply some filtering to modify the result
           Map<TopicPartition, List<ConsumerRecord<K, V>>> r2 = new ...;
           for (ConsumerRecord r : r1) {
             if (...) {
               r2.put(r1.partition, r1....)
             }
           }
           return new ConsumerRecords(r2);
         }

         @Override
         public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {

            // don't forward call at all

            Map offsets = new HashMap<>();
            offsets.put(...); // add anything you want

            return offsets;
          }
       };
     }
}

Of course, for a real deployment, one must be careful what to intercept,
forward, or not forward into the actual client, but you folks know what
you are doing so I am not worried about it. In general, yes, if calls
are intercepted incorrectly, one could break Kafka Streams, but this is
true right now, too, so I don't think anything really changes.

I guess, in the end, the new interface allows you to do everything you
did before, but we still change the API contract a little bit, as Kafka
Streams provides a client instance now.


Does this help?


-Matthias




On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote:
I have one more thing to add/emphasize around point 3) -- I should clarify
that the need to modify return values and skip delegated calls are
essential to our own client wrappers. In other words, this isn't something
specific to mock/test clients. Just wanted to point that out so I didn't
accidentally cause you to waste time looking for a workaround for testing
specifically, I was just using mock/test clients as an example case.

For a specific example, we do some things around intercepting seek calls
in order to set offsets correctly for our remote stores, such as
overriding the #seekToBeginning so it instead seeks to specific offsets.
This isn't the only thing but I think it showcases clearly how performing
the call being intercepted (in this case a seekToBeginning) would end up
completely undoing the interceptor's actions (the seek to a specific
offset).
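
The seek case can be sketched roughly as follows. This is an illustration under assumptions, not the wrapper described above: `Seekable` and `RemoteStoreSeeker` are hypothetical names, partitions are plain strings instead of `TopicPartition`, and the offsets map stands in for whatever the remote store reports.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical stand-in for the seek-related part of Consumer.
interface Seekable {
    void seek(String partition, long offset);
    void seekToBeginning(Collection<String> partitions);
}

// Wrapper that replaces seekToBeginning with seeks to offsets known to a remote store.
final class RemoteStoreSeeker implements Seekable {
    private final Seekable delegate;
    private final Map<String, Long> storeOffsets;

    RemoteStoreSeeker(Seekable delegate, Map<String, Long> storeOffsets) {
        this.delegate = delegate;
        this.storeOffsets = storeOffsets;
    }

    @Override
    public void seek(String partition, long offset) {
        delegate.seek(partition, offset); // plain pass-through
    }

    @Override
    public void seekToBeginning(Collection<String> partitions) {
        // Deliberately NOT forwarded: calling delegate.seekToBeginning here
        // would undo the targeted seeks below.
        for (String partition : partitions) {
            delegate.seek(partition, storeOffsets.getOrDefault(partition, 0L));
        }
    }
}
```

A pure pre/post callback around the real call could not express this, because the original seekToBeginning would still run.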

Hope this makes sense! Thanks

On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

Thanks Matthias!

1. Imo it makes more sense for the new client interceptor to be
configurable via config and not by KafkaStreams constructor. Let's take
the opportunity to reduce the API surface area of the already massively
overloaded KafkaStreams constructor and fix the inclusion of the
KafkaClientSupplier/Interceptor in the primary KafkaStreams interface. We
already started moving in this direction with KIP-884 which added the
*default.client.supplier* config. Think it was just an oversight that we
didn't deprecate the constructors in that same KIP (didn't see this
mentioned in the rejected alternatives section).

2. We need to continue passing in the config map to the
interceptors/wrappers. Happy to elaborate if need be but this is
absolutely essential to us :)

3. I'm a bit confused about how injecting mock/test clients would work
with these interceptors. If I understand the proposed API correctly, these
are simply transparent wrappers that don't allow one to skip the delegated
call or mock the returned value. There are many users who plug in mock
clients to unit test their code without spinning up a broker (especially
since the EmbeddedKafkaCluster isn't even a public API) and verify things
outside the bounds of the TTD. There are many examples of this in Kafka
Streams itself -- perhaps you could show an example?

4. FYI I am reaching out about whether there are any true custom client
alternatives out there or in the planning. So far I'm not aware of any and
we can/should proceed assuming there are none, but I'll update the thread
if I learn about something new here.

On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org>
wrote:

Hi,

I would like to start a discussion on KIP-1088:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor

We consider this KIP a requirement/blocker for KIP-1071.

For the beginning of the discussion, I would like to stay focused on the
_how_ and not necessarily talk about names... I am happy to change any
class/methods names, but it might be distracting in the beginning of the
discussion.

Looking forward to your feedback.


-Matthias










