Hi All, Matthias - thanks for the KIP!
I'm in favor of this proposal, but I'd like to summarize my understanding to ensure I'm getting everything. 1. KafkaStreams will remain in control of creating the client unless the user overrides that behavior (I think this would be a rare case). Although we can't wholly enforce this creation rule, it's evident that violating it breaks the contract, and users will need to understand they are on their own at that point. Of course, providing a MockConsumer instance would be supported or expected for testing. 2. It allows users to override a given method (or methods), in other words, to easily "cherry-pick" the behavior to modify, rather than implementing their own client with overridden methods. (Admittedly, the difference between the two approaches is minor.) 3. While we could only provide a `StreamsConsumer` and add methods as needed, providing an interceptor seems more direct in guiding users on how to modify behavior. I'm good with the KIP as it stands now. I get what Sophie and Bruno are alluding to with the naming, so maybe a name of `WrappedX` would be more appropriate, but I don't have a strong opinion. Thanks, Bill On Mon, Oct 14, 2024 at 4:17 AM Bruno Cadonna <cado...@apache.org> wrote: > Hi, > > Thanks for the KIP! > > > BC1. > I echo point S3 from Sophie. I find it quite confusing having those > methods return an interceptor that is used as a client in the Streams > code. Naming might already do the trick. Something like > Intercepted(Admin|StreamsConsumer|Consumer|Producer) or > Wrapped(Admin|StreamsConsumer|Consumer|Producer) come to mind. > > Related to this: Why is the interface with all the wrap* methods also > called Interceptor? The functionality is quite different from the other > interceptors. > > > BC2. > Why do you not use interfaces instead of the classes for the clients in > the wrap* signatures? We just want to intercept the calls to the > interface, right? > > Related to this, why do you also pass in the configs of the clients, if > you only want to wrap the calls to them? > > > BC3. > Why do we need KafkaStreamsConsumer in the public API? Isn't the > StreamsConsumer interface enough? > > > BC4. > I am wondering whether a kind of observer pattern would be better. That > means, in Streams we do not call the interceptor but we call the client > and for each call we also call the corresponding method on the passed-in > object -- basically the user implements callbacks. > > That would make it even clearer that you should not inject your own > client implementations since we remove the user's object from the direct > call to the client. The injected object cannot participate in the RPCs > at all. The trade-off is that the user only gets a call before the > actual call to the client or afterwards. To alleviate that, we could > provide an interface for the passed-in object that has a pre-call and > post-call for each client call. > Internally, we would have a wrapper around the clients that calls the > corresponding method of the user-provided object. 
Something like: > > public ConsumerRecords<K, V> poll(Duration timeout) { > ConsumerRecords<K, V> records = actualConsumer.poll(timeout); > return userConsumerObserver.poll(timeout, records); > } > > or with the pre and post calls > > public ConsumerRecords<K, V> poll(Duration timeout) { > userConsumerObserver.prePoll(timeout); > ConsumerRecords<K, V> records = actualConsumer.poll(timeout); > return userConsumerObserver.postPoll(timeout, records); > } > > > Best, > Bruno > > > On 05.10.24 02:31, Sophie Blee-Goldman wrote: > > Thanks for the update Matthias! I'm totally in agreement with the new > > proposal and have mainly only cosmetic points and minor nits remaining. > > Once we come to agreement on these I would be happy to move this to a > vote > > (unless others chime in with new concerns of course) > > > > S1. One thing that jumped out at me is that only the main consumer > > implements the new StreamsConsumer interface, whereas the restore > consumer > > and global consumers both remain on the original plain 'Consumer' type. > > This decision in itself makes sense to me, but it makes me question the > > naming and whether it might confuse users to have only 1 of 3 consumer > > types in Kafka Streams use the "StreamsConsumer". Have you considered any > > other names for the interface yet? I suppose "StreamsMainConsumer" is the > > most literal option, but I would prefer to hint at the underlying > behavior > > difference in some way and call it something like "StreamsGroupConsumer" > > since that is fundamentally what it is. > > > > S2. Building off of S1, personally I always thought something like > > "groupConsumer" or "streamsGroupConsumer" would be a better, more > > descriptive name than "mainConsumer" for the consumer whose job is to > join > > the application's consumer group (or "streams group" after KIP-1071). So > > perhaps we can also take the opportunity to do this renaming and change > the > > interceptor's method name from "#wrapMainConsumer" to > > "#wrapStreamsGroupConsumer" or something like that? (Would also be nice > to > > rename the variable names in the code to reflect this but that's an > > implementation detail, and can be done in a standalone PR after the KIP > is > > completed. I'll do it myself if I have to :P ) > > > > S3. I'm a little confused about the need to introduce the intermediary > > "<ClientName>Interceptor" classes, like AdminInterceptor and > > ConsumerInterceptor and so on. Can't these methods just return the client > > type directly? Doesn't seem like we're adding anything to these > interfaces > > and just extending the client class. Are they supposed to be marker > > interfaces or something like that? It feels like we're adding unnecessary > > noise to the API so I'd just like to understand the motivation behind > this > > choice > > > > S4. Can you fill in the javadocs for the Interceptor class? Just want to > > make sure we include some specific things. Mainly, if we're going to > > include a StreamsConfig method of injecting the interceptor, we should > make > > sure users know to have their interceptor implementation extend > > Configurable in order to obtain the application configs for any setup > they > > need to do when instantiated via the default constructor. Alternatively > we > > can just have the KafkaClientInterceptor extend Configurable itself as we > > do in some similar cases. > > > > S5. On the subject of the new config: > > 5a. 
Since it requires a class or class name, let's add a "class" suffix, > eg > > "default.client.interceptor.class" > > 5b. Can you include the doc string in the KIP? The main thing I want to > > make sure gets included is the prioritization, that is: what happens if > > someone defines a KafkaClientInterceptor class via StreamsConfig but > also > > passes in an instance to the KafkaStreams constructor? Personally I'd say > > the KafkaStreams constructor instance should get preference and override > > the configured class, but the important thing is to document the > behavior, > > whatever it may be > > 5c. I forget the KIP number but recently we discussed removing the > > "default" prefix from some config names. I'm wondering whether it makes > > sense to include it here or if we should use this opportunity to strip > the > > "default" from this config. On the one hand, since you can override the > > configured value by passing an instance in to the KafkaStreams > constructor, > > maybe this should be considered a default indeed. On the other hand > you're > > still only able to specify one interceptor per app so I'm personally > > leaning more towards just "client.interceptor.class" without the > "default" > > prefix. Don't feel too strongly about this either way though so I just > > wanted to raise the question, and am happy with whatever you prefer > > > > S6. Final nit: it bothers me slightly that the class name is > > "KafkaClientInterceptor" but all the methods are "wrapXXX". Is it a > wrapper > > or an interceptor? Am I just being pedantic or should we change the name > to > > "KafkaClientWrapper"? (I don't feel all that strongly about this either, > > just wondering if the inconsistency bugs anyone else) > > > > Looking forward to getting this KIP done at last! > > > > On Thu, Oct 3, 2024 at 5:21 PM Matthias J. Sax <mj...@apache.org> wrote: > > > >> Thanks for your feedback Alieh and Sophie. > >> > >> For everybody: we did sync about open questions in person, and I hope to > >> reply w/o forgetting to add enough context about what we did discuss in > >> person. If anything is missing/unclear, please let us know. > >> > >> > >> Based on the in-person discussion, we agreed to add a new interface > >> > >> interface StreamsConsumer extends Consumer { } > >> > >> and a new class > >> > >> public class KafkaStreamsConsumer > >> extends KafkaConsumer implements StreamsConsumer { } > >> > >> And let the new `KafkaClientInterceptor` use these new interface/class > >> instead of `Consumer/KafkaConsumer` for the main consumer. > >> > >> Note that we don't add any new methods to this interface at this point. > >> The reason is that the design for KIP-1071 is still too early to know > >> what exactly we will need, and we just want to set up the code early such > >> that we can add new methods easily in the future. > >> > >> The alternative to adding `StreamsConsumer` would be to add this new > >> interface later if we need it, however, we opted to introduce it right > >> away to avoid future "deprecation noise". > >> > >> We also decided to add a config parameter into the callbacks, allowing > >> interceptor code to inspect the used configuration for a client. This > >> will be read-only, as we have already created the client before calling the > >> interceptor for wrapping. If there is demand in the future, we can > >> always extend the interceptor interface to also allow changing configs. > >> > >> > >> I also have a rough POC PR for reference. 
It does not yet reflect the > >> latest points as mentioned in this email, but I hope it will help to > >> clarify a few things: https://github.com/apache/kafka/pull/17333 > >> > >> > >> In addition, I updated the KIP with a proposal to add predefined > >> "interceptor classes" > >> ([Admin|StreamsConsumer|Consumer|Producer]Interceptor) to improve > >> backward compatibility. Details are on the KIP. > >> > >> > >> I also added a new config for the interceptor class, similar to the > >> existing config for the current client-supplier one. > >> > >> > >> > >> > >> Below more detailed replies for Alieh/Sophie. > >> > >> > >>> What would happen if the implemented class creates a > >>> new instance > >> > >> If KIP-1071 is not used, nothing. It would just work. However, if > >> KIP-1071 is used, it would most likely break KS. -- However, given that > >> we now introduce the new `XxxInterceptor` classes, it would be a little > >> bit harder to return new instances (even if still not impossible...) > >> > >> > >>> in other words, how do we prevent it? > >> > >> As I tried to point out on the KIP, I believe we cannot completely > >> prevent it; we can only make it an API contract and if users violate the > >> API contract, it's their own fault if KS breaks. That is why we need to > >> move off `KafkaClientSupplier` so we can change the API contract w/o > >> breaking anybody. Using the `XxxInterceptor` classes also helps > >> > >> > >> About the MockConsumer: As said, we cannot prevent it and for our own > >> tests we might still need to create a new instance (we would need to let > >> `MockConsumer extend ConsumerInterceptor` but that's easy). For most > >> (all?) tests when we mock a client, we don't really connect to a broker > >> cluster, but it's some unit test. For unit tests, nothing should break. > >> And for actual integration tests, I don't think that we would mock > >> clients... > >> > >> > >> > >>> Whatever > >>> we end up doing, if there's a way for someone to implement it > incorrectly > >>> then it would be good to figure out how we can detect this and warn the > >>> user. I'm not quite sure how much of a safety net we can construct here > >> but > >>> we could try to cover some of the obvious missteps, for example by > >>> asserting that the returned Consumer is not an instance of > KafkaConsumer > >>> unless it's the same exact KafkaConsumer object passed into the > >>> #getMainConsumer call. Not sure if there are other checks we could do? > >> > >> I don't think that is possible by definition. We do pass in > >> `KafkaConsumer` but do expect a `ConsumerInterceptor` object back. The > >> fundamental problem is that we cannot know if the object we get is an > >> actual wrapper/interceptor for the passed-in client, or if the object > >> is not a wrapper at all, but a `new MyKafkaConsumer extends > >> ConsumerInterceptor {...}` object. -- But I think it's ok if we cannot > >> guard against it fully -- we can also not guard against users messing > >> with KS using reflections... > >> > >> > >> > >>> The real issue with constructing and > >>> returning a different KafkaConsumer instead of wrapping the one passed > >> in, > >>> is that Streams will be invoking some methods on the user's Consumer > >> while > >>> invoking other methods on the original KafkaConsumer. > >> > >> No. All methods of the user's Consumer would be invoked. 
KS only > >> _constructs_ the clients (to hook into some internals, and "connect" KS > >> runtime code to consumer internals for KIP-1071), but otherwise KS would > >> only use the interceptor for all calls on the client. And with the new > >> `StreamsConsumer` interface, we can add new methods in the future. > >> > >> > >> > >>> If every consumer method > >>> call is being made directly on the Consumer returned from > >> #getMainConsumer, > >>> it doesn't really matter if they constructed a different one. > >> > >> It would matter, as KS code would now be "decoupled" from this consumer > >> instance. KS runtime can only tap into the client it creates itself, > >> by using a non-public constructor to pass in non-public "callbacks" which > >> we need to integrate with KIP-1071. (At least for now...) > >> > >> As discussed in person: we might want to make part of these things > >> public, but we don't know yet which (if any) and how the concrete API > >> should be designed. Thus, we want to keep everything non-public for now, > >> and when KIP-1071 evolves, we will know better what we need. > >> > >> > >> > >>> If this KIP is > >>> going to immediately deprecate the old KafkaClientSupplier, then we > need > >> to > >>> support implementing an interceptor while configured to use the current > >>> rebalancing protocol. > >> > >> I don't think we need to do this, but we could make it mutually > >> exclusive. However, I don't anticipate any issue actually supporting the > >> interceptor interface w/ the old protocol. -- And the KIP explicitly > >> disallows using both supplier and interceptor, and also disallows using > >> KIP-1071 plus supplier. > >> > >> > >> > >>> That is, can we > >>> assume that the KafkaConsumer passed into #getMainConsumer will only be > >>> constructed via the internal constructor/put into "Streams group mode" > if > >>> the app is configured to use the new protocol, and otherwise it will > just > >>> pass in a regular KafkaConsumer? > >> > >> Yes. That's why I don't see any issue as stated above. > >> > >> > >> > >> > >> -Matthias > >> > >> > >> > >> > >> On 9/9/24 10:26 PM, Sophie Blee-Goldman wrote: > >>> I agree with Alieh, there is definitely some inconsistency in what we > do > >> or > >>> don't want to support/allow. However imo we should lean into the more > >>> flexible approach of allowing an alternative consumer to be constructed > >> and > >>> returned, rather than attempting to enforce the interceptor to strictly > >>> wrap the provided consumer. Mainly because the ability to bypass the > >> "real" > >>> KafkaConsumer and mock/stub methods is pretty essential to the > test/mock > >>> consumer use cases. Personally I feel this is a very advanced feature > and > >>> we should let people who know what they're doing implement any kind of > >>> consumer for this, and if they misuse the API and break things then > it's > >> on > >>> them. I feel we can document things clearly enough so that it will be > >>> difficult to shoot yourself in the foot by accident (plus if they do > end > >> up > >>> breaking things eg by constructing a new Consumer and just returning > that > >>> instead, it will at least break quickly and obviously). Figuring out > that > >>> the problem was in your interceptor might be challenging though. > Whatever > >>> we end up doing, if there's a way for someone to implement it > incorrectly > >>> then it would be good to figure out how we can detect this and warn the > >>> user. 
I'm not quite sure how much of a safety net we can construct here > >> but > >>> we could try to cover some of the obvious missteps, for example by > >>> asserting that the returned Consumer is not an instance of > KafkaConsumer > >>> unless it's the same exact KafkaConsumer object passed into the > >>> #getMainConsumer call. Not sure if there are other checks we could do? > >>> > >>> I do suspect this problem just goes away if we pivot to the > >> StreamsConsumer > >>> approach we've discussed a bit before. The real issue with constructing > >> and > >>> returning a different KafkaConsumer instead of wrapping the one passed > >> in, > >>> is that Streams will be invoking some methods on the user's Consumer > >> while > >>> invoking other methods on the original KafkaConsumer. This is another reason > why > >>> splitting up the consumer into a public interface and a hidden internal > >> one > >>> seems to complicate things for no clear reason. If every consumer > method > >>> call is being made directly on the Consumer returned from > >> #getMainConsumer, > >>> it doesn't really matter if they constructed a different one. In fact > we > >>> could probably just skip having Streams construct its own consumer and > >>> instead let the user construct it or provide an alternative > >> implementation > >>> instead. As I understand it, the only reason we want to create the > >>> underlying consumer within Streams is to access an internal constructor > >>> that puts it into "Streams" mode. But that could be solved in a > different > >>> way, for example if/when Streams invokes a Streams-specific method like > >>> #setProcessId, this automatically puts the consumer into Streams mode. > Not > >>> sure if this works since I don't have the full picture with details on > >> this > >>> internal constructor, just feels like that doesn't have to be the only > >>> possible way to do things (not necessarily better ways, just different) > >>> > >>> On a somewhat related note, I had a question about the deprecation > plan. > >> As > >>> I understand it, the stated goal of KIP-1071 is to eventually deprecate > >> and > >>> then remove support for the current rebalancing protocol, but the > >> timeline > >>> is undefined and nothing is being deprecated right now. If this KIP is > >>> going to immediately deprecate the old KafkaClientSupplier, then we > need > >> to > >>> support implementing an interceptor while configured to use the current > >>> rebalancing protocol. Is this reading of the KIP correct? That is, can > we > >>> assume that the KafkaConsumer passed into #getMainConsumer will only be > >>> constructed via the internal constructor/put into "Streams group mode" > if > >>> the app is configured to use the new protocol, and otherwise it will > just > >>> pass in a regular KafkaConsumer? Just want to make sure my > understanding > >> is > >>> correct here > >>> > >>> On Fri, Sep 6, 2024 at 8:29 AM Alieh Saeedi > <asae...@confluent.io.invalid > >>> > >>> wrote: > >>> > >>>> Sorry for the typo: > >>>> contracts = contradicts* > >>>> > >>>> On Fri, Sep 6, 2024 at 5:20 PM Alieh Saeedi <asae...@confluent.io> > >> wrote: > >>>> > >>>>> Thanks Matthias for the KIP. > >>>>> > >>>>> A quick question regarding the sentence `It would be invalid to > create > >> a > >>>>> new client instance `: What would happen if the implemented class > >> creates > >>>>> a new instance, or, in other words, how do we prevent it? 
Considering > >>>> that > >>>>> `Config` is going to be passed in as well (the 2nd point raised by > >>>> Sophie), > >>>>> Also, the new Consumer object (`mockConsumer`) you created in your > >>>> example > >>>>> here < https://lists.apache.org/thread/l6dhq1rfl3xkq8g9wfqsvw89yjrgzbn8 > >>> > >>>>> confused me since it contracts with the above sentence. > >>>>> > >>>>> Thanks, > >>>>> Alieh > >>>>> > >>>>> On Fri, Sep 6, 2024 at 5:08 AM Sophie Blee-Goldman < > >>>> sop...@responsive.dev> > >>>>> wrote: > >>>>> > >>>>>> 1. Fair enough. In the end it doesn't matter that much to us, so I > >> just > >>>>>> figured from basic principles if this is a config then it should go > in > >>>> the > >>>>>> StreamsConfig. Also happy to hear what others think > >>>>>> > >>>>>> 2. We need the client-specific configs (for example to extract the > >>>> client > >>>>>> ids for monitoring, resource management, etc). However, I checked > our > >>>>>> implementation and realized it's not enough to just pass them in -- > we > >>>>>> actually want to modify some configs on the fly, before they are > used > >> to > >>>>>> construct the client. If absolutely necessary I'm sure we could > find a > >>>>>> workaround for this by overriding the configs in the original > >>>>>> StreamsConfig > >>>>>> via the client prefixes, but that would be kind of a hassle so I'd > be > >>>>>> happy > >>>>>> if we found a way to maintain the read-write mode for client configs > >> as > >>>>>> well > >>>>>> > >>>>>> 3. Perhaps we do need a PR -- not for the feature implementation > >> itself, > >>>>>> but an example of how this would be used. As I understand it, the > >> whole > >>>>>> point is to be able to cast the client to KafkaConsumer (or some > >>>> consumer > >>>>>> class?) to access its internal methods. What is the specific plan > for > >>>> how > >>>>>> this will work with the interceptors proposed in this KIP? > >>>>>> > >>>>>> I was imagining the idea was that Streams would retain a reference > to > >>>> the > >>>>>> underlying KafkaConsumer (ie what's passed in to the interceptor > >>>> callback) > >>>>>> and it would be invoking those internal methods on that > KafkaConsumer, > >>>>>> while all the public methods are to be invoked on the consumer > >> returned > >>>> by > >>>>>> the interceptor. But as long as some internal methods are being > >> invoked > >>>> on > >>>>>> a consumer then there's a part of Streams that is inherently > dependent > >>>> on > >>>>>> having a valid KafkaConsumer implementation, which is dependent on > >>>> having > >>>>>> a > >>>>>> valid cluster it can connect to...no? > >>>>>> > >>>>>> I really think it would help to at least outline what these internal > >>>>>> methods are. Right now I'm assuming they are going to result in the > >>>>>> consumer sending an RPC or taking some other action that would be > >>>>>> problematic if there was no valid cluster, but maybe that's not the > >>>> case? > >>>>>> Don't want to waste your time because my assumptions are wrong, so > >> some > >>>>>> specifics would help. :) > >>>>>> > >>>>>> On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org> > >>>> wrote: > >>>>>> > >>>>>>> Thanks for the input Sophie. > >>>>>>> > >>>>>>> About (1) -- I am personally happy either way. Let's wait to see what > others > >>>>>>> prefer (constructor only, config only, both), and I can update the > >> KIP > >>>>>>> later. This doesn't seem to be a core issue for the KIP. > >>>>>>> > >>>>>>> > >>>>>>> For (2), I guess we can do this, and I am open to adding it. 
Wondering > >> if > >>>>>>> you would need to get access to the client-specific config KS > >>>> computes, > >>>>>>> or if we should let `KafkaClientInterceptor implements Configurable` > >> and > >>>>>>> thus add a single `configure(Map)` call passing in the user's > >> provided > >>>>>>> config only once? > >>>>>>> > >>>>>>> > >>>>>>> About (3), I don't follow your concern. As pointed out in the KIP > >>>>>>> itself, we cannot enforce that no MockClient is returned, and for a > >>>> mock > >>>>>>> case, one would not have a broker cluster and not have a fully > >>>> fledged > >>>>>>> KS runtime, thus, just switching out the client should still work > as > >>>>>>> before (to be fair: I don't have a full PR for this yet -- if we > think > >>>> it > >>>>>>> would be helpful, I can work more on it and push to GitHub so we > get a > >>>>>>> clearer picture): > >>>>>>> > >>>>>>> class MyInterceptor implements KafkaClientInterceptor { > >>>>>>> MockConsumer mockConsumer = new MockConsumer(); > >>>>>>> > >>>>>>> public Consumer wrapConsumer(KafkaConsumer consumer) { > >>>>>>> // this would never make any call into `consumer` at all > >>>>>>> return mockConsumer; > >>>>>>> } > >>>>>>> } > >>>>>>> > >>>>>>> Note that the return type is `Consumer`, ie, the top level > interface, > >>>>>>> but not `KafkaConsumer` and thus you can return anything you want. > >>>>>>> > >>>>>>> Similarly, you can intercept any call transparently: > >>>>>>> > >>>>>>> class MyInterceptor implements KafkaClientInterceptor { > >>>>>>> > >>>>>>> public Consumer wrapConsumer(KafkaConsumer consumer) { > >>>>>>> return new Consumer() { > >>>>>>> @Override > >>>>>>> public ConsumerRecords<K, V> poll(final long timeoutMs) { > >>>>>>> // add 500ms to the timeout, and forward the call > >>>>>>> ConsumerRecords r1 = consumer.poll(timeoutMs + 500); > >>>>>>> > >>>>>>> // apply some filtering to modify the result > >>>>>>> Map<TopicPartition, List<ConsumerRecord<K, V>>> r2 = new > >> ...; > >>>>>>> for(ConsumerRecord r : r1) { > >>>>>>> if (...) { > >>>>>>> r2.put(r1.partition, r1....) > >>>>>>> } > >>>>>>> } > >>>>>>> return new ConsumerRecords(r2); > >>>>>>> } > >>>>>>> > >>>>>>> @Override > >>>>>>> public Map<TopicPartition, Long> > >>>>>>> endOffsets(Collection<TopicPartition> partitions) { > >>>>>>> > >>>>>>> // don't forward call at all > >>>>>>> > >>>>>>> Map offsets = new HashMap<>(); > >>>>>>> offsets.put(...); // add anything you want > >>>>>>> > >>>>>>> return offsets; > >>>>>>> } > >>>>>>> }; > >>>>>>> } > >>>>>>> } > >>>>>>> > >>>>>>> Of course, for a real deployment, one must be careful what to > >>>> intercept, > >>>>>>> forward, or not forward into the actual client, but you folks know > >>>> what > >>>>>>> you are doing so I am not worried about it. In general, yes, if > calls > >>>>>>> are intercepted incorrectly, one could break Kafka Streams, but > this > >>>> is > >>>>>>> true right now, too, so I don't think anything really changes. > >>>>>>> > >>>>>>> I guess, in the end, the new interface allows you to do everything > >> you > >>>>>>> did before, but we still change the API contract a little bit, as > >>>> Kafka > >>>>>>> Streams provides a client instance now. > >>>>>>> > >>>>>>> Does this help? 
> >>>>>>> > >>>>>>> > >>>>>>> -Matthias > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote: > >>>>>>>> I have one more thing to add/emphasize around point 3) -- I > should > >>>>>>> clarify > >>>>>>>> that the need to modify return values and skip delegated calls is > >>>>>>>> essential to our own client wrappers. In other words, this isn't > >>>>>>> something > >>>>>>>> specific to mock/test clients. Just wanted to point that out so I > >>>>>> didn't > >>>>>>>> accidentally cause you to waste time looking for a workaround for > >>>>>> testing > >>>>>>>> specifically, I was just using mock/test clients as an example > case. > >>>>>>>> > >>>>>>>> For a specific example, we do some things around intercepting seek > >>>>>> calls > >>>>>>> in > >>>>>>>> order to set offsets correctly for our remote stores, such as > >>>>>> overriding > >>>>>>>> the #seekToBeginning so it instead seeks to specific offsets. This > >>>>>> isn't > >>>>>>>> the only thing but I think it showcases clearly how performing the > >>>>>> call > >>>>>>>> being intercepted (in this case a seekToBeginning) would end up > >>>>>>> completely > >>>>>>>> undoing the interceptor's actions (the seek to a specific offset). > >>>>>>>> > >>>>>>>> Hope this makes sense! Thanks > >>>>>>>> > >>>>>>>> On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman < > >>>>>>> sop...@responsive.dev> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Thanks Matthias! > >>>>>>>>> > >>>>>>>>> 1. Imo it makes more sense for the new client interceptor to be > >>>>>>>>> configurable via config and not by the KafkaStreams constructor. > Let's > >>>>>> take > >>>>>>> the > >>>>>>>>> opportunity to reduce the API surface area of the already > massively > >>>>>>>>> overloaded KafkaStreams constructor and fix the inclusion of the > >>>>>>>>> KafkaClientSupplier/Interceptor in the primary KafkaStreams > >>>>>> interface. > >>>>>>> We > >>>>>>>>> already started moving in this direction with KIP-884 which added > >>>> the > >>>>>>>>> *default.client.supplier* config. Think it was just an oversight > >>>>>> that we > >>>>>>>>> didn't deprecate the constructors in that same KIP (didn't see > this > >>>>>>>>> mentioned in the rejected alternatives section). > >>>>>>>>> > >>>>>>>>> 2. We need to continue passing in the config map to the > >>>>>>>>> interceptors/wrappers. Happy to elaborate if need be but this is > >>>>>>> absolutely > >>>>>>>>> essential to us :) > >>>>>>>>> > >>>>>>>>> 3. I'm a bit confused about how injecting mock/test clients would > >>>>>> work > >>>>>>>>> with these interceptors. If I understand the proposed API > >>>> correctly, > >>>>>>> these > >>>>>>>>> are simply transparent wrappers that don't allow one to skip the > >>>>>>> delegated > >>>>>>>>> call or mock the returned value. There are many users who plug in > >>>>>> mock > >>>>>>>>> clients to unit test their code without spinning up a broker > >>>>>> (especially > >>>>>>>>> since the EmbeddedKafkaCluster isn't even a public API) and > verify > >>>>>>> things > >>>>>>>>> outside the bounds of the TTD. There are many examples of this in > >>>>>> Kafka > >>>>>>>>> Streams itself -- perhaps you could show an example? > >>>>>>>>> > >>>>>>>>> 4. FYI I am reaching out about whether there are any true custom > >>>>>> client > >>>>>>>>> alternatives out there or in the planning. 
So far I'm not aware > of > >>>>>> any > >>>>>>> and > >>>>>>>>> we can/should proceed assuming there are none, but I'll update > the > >>>>>>> thread > >>>>>>>>> if I learn about something new here. > >>>>>>>>> > >>>>>>>>> On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org > > > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> > >>>>>>>>>> I would like to start a discussion on KIP-1088: > >>>>>>>>>> > >>>>>>>>>> > >>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor > >>>>>>>>>> > >>>>>>>>>> We consider this KIP a requirement/blocker for KIP-1071. > >>>>>>>>>> > >>>>>>>>>> For the beginning of the discussion, I would like to stay > focused > >>>> on > >>>>>>> the > >>>>>>>>>> _how_ and not necessarily talk about names... I am happy to > change > >>>>>> any > >>>>>>>>>> class/method names, but it might be distracting in the > beginning > >>>> of > >>>>>>> the > >>>>>>>>>> discussion. > >>>>>>>>>> > >>>>>>>>>> Looking forward to your feedback. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> -Matthias > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > > >
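A minimal, self-contained sketch of the wrap-and-delegate pattern discussed in this thread (for example Sophie's #seekToBeginning case): the wrapper forwards every call to the client that Kafka Streams created, except the one method it wants to change. Note that this uses a cut-down stand-in interface instead of the real Consumer (which has many more methods), and all type and method names below are placeholders rather than the KIP's API:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Cut-down stand-in for the real Consumer interface (placeholder only).
interface SimpleConsumer {
    void seekToBeginning(Collection<Integer> partitions);
    void seek(int partition, long offset);
    String poll(long timeoutMs);
}

// Wrap-and-delegate: override one method, forward everything else unchanged.
class CheckpointAwareConsumer implements SimpleConsumer {
    private final SimpleConsumer delegate;                // the client Streams created
    private final Map<Integer, Long> checkpointedOffsets; // e.g. offsets recovered from a remote store

    CheckpointAwareConsumer(SimpleConsumer delegate, Map<Integer, Long> checkpointedOffsets) {
        this.delegate = delegate;
        this.checkpointedOffsets = new HashMap<>(checkpointedOffsets);
    }

    @Override
    public void seekToBeginning(Collection<Integer> partitions) {
        // Intentionally not forwarded as-is: seek to checkpointed offsets instead,
        // so forwarding the original call cannot undo the wrapper's work.
        for (int p : partitions) {
            delegate.seek(p, checkpointedOffsets.getOrDefault(p, 0L));
        }
    }

    @Override
    public void seek(int partition, long offset) {
        delegate.seek(partition, offset); // forwarded unchanged
    }

    @Override
    public String poll(long timeoutMs) {
        return delegate.poll(timeoutMs);  // forwarded unchanged
    }
}

Whichever naming and callback shape the KIP ends up with, the key property illustrated here is that Kafka Streams keeps constructing the underlying client and the user code only decorates it.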