Thanks Matthias for the KIP.

A quick question regarding the sentence `It would be invalid to create a new client instance`: what would happen if the implemented class creates a new instance, or, in other words, how do we prevent it, considering that `Config` is going to be passed in as well (the 2nd point raised by Sophie)?

Also, the new Consumer object (`mockConsumer`) you created in your example here <https://lists.apache.org/thread/l6dhq1rfl3xkq8g9wfqsvw89yjrgzbn8> confused me, since it contradicts the above sentence.

Thanks,
Alieh

On Fri, Sep 6, 2024 at 5:08 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:

> 1. Fair enough. In the end it doesn't matter that much to us, so I just
> figured from basic principles if this is a config then it should go in the
> StreamsConfig. Also happy to hear what others think
>
> 2. We need the client-specific configs (for example to extract the client
> ids for monitoring, resource management, etc). However, I checked our
> implementation and realized it's not enough to just pass them in -- we
> actually want to modify some configs on the fly, before they are used to
> construct the client. If absolutely necessary I'm sure we could find a
> workaround for this by overriding the configs in the original StreamsConfig
> via the client prefixes, but that would be kind of a hassle so I'd be happy
> if we found a way to maintain the read-write mode for client configs as
> well
>
> 3. Perhaps we do need a PR -- not for the feature implementation itself,
> but an example of how this would be used. As I understand it, the whole
> point is to be able to cast the client to KafkaConsumer (or some consumer
> class?) to access its internal methods. What is the specific plan for how
> this will work with the interceptors proposed in this KIP?
>
> I was imagining the idea was that Streams would retain a reference to the
> underlying KafkaConsumer (ie what's passed in to the interceptor callback)
> and it would be invoking those internal methods on that KafkaConsumer,
> while all the public methods are to be invoked on the consumer returned by
> the interceptor. But as long as some internal methods are being invoked on
> a consumer then there's a part of Streams that is inherently dependent on
> having a valid KafkaConsumer implementation, which is dependent on having a
> valid cluster it can connect to...no?
>
> I really think it would help to at least outline what these internal
> methods are.
> Right now I'm assuming they are going to result in the
> consumer sending an RPC or taking some other action that would be
> problematic if there was no valid cluster, but maybe that's not the case?
> Don't want to waste your time because my assumptions are wrong, so some
> specifics would help. :)
>
> On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for the input Sophie.
> >
> > About (1) -- I am personally happy either way. Let's wait what others
> > prefer (constructor only, config only, both), and I can update the KIP
> > later. Does not seem to be a core issue for the KIP.
> >
> > For (2), I guess we can do this, and I am open to add it. Wondering if
> > you would need to get access to the client specific config KS computes,
> > or if we should let `KafkaClientInterceptor implement Configurable` and
> > thus add a single `configure(Map)` call passing in the user's provided
> > config only once?
> >
> > About (3), I don't follow your concern. As pointed out in the KIP
> > itself, we cannot enforce that no MockClient is returned, and for a mock
> > case, one would not have a broker cluster and not have a fully fledged
> > KS runtime, thus, just switching out the client should still work as
> > before (to be fair: I don't have full PR for this yet -- if we think it
> > would be helpful, I can work more on it and push to GitHub so we get a
> > clearer picture):
> >
> > class MyInterceptor implements KafkaClientInterceptor {
> >     MockConsumer mockConsumer = new MockConsumer();
> >
> >     public Consumer wrapConsumer(KafkaConsumer consumer) {
> >         // this would never make any call into `consumer` at all
> >         return mockConsumer;
> >     }
> > }
> >
> > Note that the return type is `Consumer`, ie, the top level interface,
> > but not `KafkaConsumer` and thus you can return anything you want.
> >
> > Similarly, you can intercept any call transparently:
> >
> > class MyInterceptor implements KafkaClientInterceptor {
> >
> >     public Consumer wrapConsumer(KafkaConsumer consumer) {
> >         return new Consumer() {
> >             @Override
> >             public ConsumerRecords<K, V> poll(final long timeoutMs) {
> >                 // add 500ms to the timeout, and forward the call
> >                 ConsumerRecords r1 = consumer.poll(timeoutMs + 500);
> >
> >                 // apply some filtering to modify the result
> >                 Map<TopicPartition, List<ConsumerRecord<K, V>>> r2 = new ...;
> >                 for (ConsumerRecord r : r1) {
> >                     if (...) {
> >                         r2.put(r1.partition, r1....);
> >                     }
> >                 }
> >                 return new ConsumerRecords(r2);
> >             }
> >
> >             @Override
> >             public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
> >                 // don't forward call at all
> >                 Map offsets = new HashMap<>();
> >                 offsets.put(...); // add anything you want
> >                 return offsets;
> >             }
> >         };
> >     }
> > }
> >
> > Of course, for a real deployment, one must be careful what to intercept,
> > forward, or not forward into the actual client, but you folks know what
> > you are doing so I am not worried about it. In general, yes, if calls
> > are intercepted incorrectly, one could break Kafka Streams, but this is
> > true right now, too, so I don't think anything really changes.
> >
> > I guess, in the end, the new interface allows you to do everything you
> > did before, but we still change the API contract a little bit, as Kafka
> > Streams provides a client instance now.
> >
> > Does this help?
> >
> > -Matthias
> >
> > On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote:
> > > I have one more thing to add/emphasize around point 3) -- I should
> > > clarify that the need to modify return values and skip delegated calls
> > > are essential to our own client wrappers. In other words, this isn't
> > > something specific to mock/test clients.
> > > Just wanted to point that out so I didn't
> > > accidentally cause you to waste time looking for a workaround for
> > > testing specifically, I was just using mock/test clients as an example
> > > case.
> > >
> > > For a specific example, we do some things around intercepting seek
> > > calls in order to set offsets correctly for our remote stores, such as
> > > overriding the #seekToBeginning so it instead seeks to specific
> > > offsets. This isn't the only thing but I think it showcases clearly how
> > > performing the call being intercepted (in this case a seekToBeginning)
> > > would end up completely undoing the interceptor's actions (the seek to
> > > a specific offset).
> > >
> > > Hope this makes sense! Thanks
> > >
> > > On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman
> > > <sop...@responsive.dev> wrote:
> > >
> > >> Thanks Matthias!
> > >>
> > >> 1. Imo it makes more sense for the new client interceptor to be
> > >> configurable via config and not by KafkaStreams constructor. Let's
> > >> take the opportunity to reduce the API surface area of the already
> > >> massively overloaded KafkaStreams constructor and fix the inclusion
> > >> of the KafkaClientSupplier/Interceptor in the primary KafkaStreams
> > >> interface. We already started moving in this direction with KIP-884
> > >> which added the *default.client.supplier* config. Think it was just
> > >> an oversight that we didn't deprecate the constructors in that same
> > >> KIP (didn't see this mentioned in the rejected alternatives section).
> > >>
> > >> 2. We need to continue passing in the config map to the
> > >> interceptors/wrappers. Happy to elaborate if need be but this is
> > >> absolutely essential to us :)
> > >>
> > >> 3. I'm a bit confused about how injecting mock/test clients would work
> > >> with these interceptors.
> > >> If I understand the proposed API correctly, these
> > >> are simply transparent wrappers that don't allow one to skip the
> > >> delegated call or mock the returned value. There are many users who
> > >> plug in mock clients to unit test their code without spinning up a
> > >> broker (especially since the EmbeddedKafkaCluster isn't even a public
> > >> API) and verify things outside the bounds of the TTD. There are many
> > >> examples of this in Kafka Streams itself -- perhaps you could show an
> > >> example?
> > >>
> > >> 4. FYI I am reaching out about whether there are any true custom
> > >> client alternatives out there or in the planning. So far I'm not
> > >> aware of any and we can/should proceed assuming there are none, but
> > >> I'll update the thread if I learn about something new here.
> > >>
> > >> On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I would like to start a discussion on KIP-1088:
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor
> > >>>
> > >>> We consider this KIP a requirement/blocker for KIP-1071.
> > >>>
> > >>> For the beginning of the discussion, I would like to stay focused on
> > >>> the _how_ and not necessarily talk about names... I am happy to
> > >>> change any class/methods names, but it might be distracting in the
> > >>> beginning of the discussion.
> > >>>
> > >>> Looking forward to your feedback.
> > >>>
> > >>> -Matthias
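
For readers following the thread, the seek interception described in Sophie's 1:58 PM message can be sketched in plain Java. This is only an illustration under stated assumptions: `SimpleConsumer`, `RecordingConsumer`, `wrap`, and the partition name `topic-0` are hypothetical stand-ins invented here so the example runs without the Kafka client library. They are not the KIP-1088 API and not Kafka's real `Consumer` interface. The sketch shows a wrapper that does not forward `seekToBeginning` but instead seeks the delegate to stored offsets, which is why forwarding the original call as well would undo the wrapper's work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SeekInterceptSketch {

    /** Hypothetical, stripped-down stand-in for a consumer interface. */
    interface SimpleConsumer {
        void seek(String partition, long offset);
        void seekToBeginning(List<String> partitions);
        long position(String partition);
    }

    /** Stand-in for the client instance the framework would create and pass in. */
    static class RecordingConsumer implements SimpleConsumer {
        final Map<String, Long> positions = new HashMap<>();
        final List<String> calls = new ArrayList<>();

        @Override
        public void seek(String partition, long offset) {
            calls.add("seek");
            positions.put(partition, offset);
        }

        @Override
        public void seekToBeginning(List<String> partitions) {
            calls.add("seekToBeginning");
            for (String p : partitions) {
                positions.put(p, 0L);
            }
        }

        @Override
        public long position(String partition) {
            return positions.getOrDefault(partition, 0L);
        }
    }

    /** Wraps the delegate; seekToBeginning is replaced, everything else is forwarded. */
    static SimpleConsumer wrap(SimpleConsumer delegate, Map<String, Long> storedOffsets) {
        return new SimpleConsumer() {
            @Override
            public void seek(String partition, long offset) {
                delegate.seek(partition, offset);
            }

            @Override
            public void seekToBeginning(List<String> partitions) {
                // Deliberately NOT forwarded: forwarding would reset the
                // position to 0 and undo the seeks performed below.
                for (String p : partitions) {
                    delegate.seek(p, storedOffsets.getOrDefault(p, 0L));
                }
            }

            @Override
            public long position(String partition) {
                return delegate.position(partition);
            }
        };
    }

    public static void main(String[] args) {
        RecordingConsumer real = new RecordingConsumer();
        SimpleConsumer wrapped = wrap(real, Map.of("topic-0", 42L));

        wrapped.seekToBeginning(List.of("topic-0"));

        System.out.println(real.position("topic-0")); // prints 42
        System.out.println(real.calls);               // prints [seek]
    }
}
```

The same shape covers the mock case from Matthias's reply: a wrapper that never touches the delegate at all is just the extreme version of skipping a forwarded call.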