Thanks Matthias for the KIP.

A quick question regarding the sentence `It would be invalid to create a
new client instance`: What would happen if the implementing class creates a
new instance anyway, or, in other words, how do we prevent it, considering
that `Config` is going to be passed in as well (the 2nd point raised by
Sophie)? Also, the new Consumer object (`mockConsumer`) you created in your
example here <https://lists.apache.org/thread/l6dhq1rfl3xkq8g9wfqsvw89yjrgzbn8>
confused me, since it contradicts the above sentence.
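
To make the question concrete, here is a minimal sketch of the two cases. It
deliberately does not use the real Kafka classes: `SimpleConsumer`,
`SimpleClientInterceptor`, and the other names below are hypothetical,
simplified stand-ins, and the actual interface is whatever the KIP ends up
defining. The only point is that a return type of the top-level interface
cannot, by itself, tell a delegating wrapper apart from a brand-new instance.

```java
import java.util.ArrayList;
import java.util.List;

final class InterceptorContractSketch {

    // Hypothetical, simplified stand-in for Kafka's `Consumer` interface.
    interface SimpleConsumer {
        List<String> poll();
    }

    // Stand-in for the client instance that Kafka Streams would create itself.
    static final class StreamsCreatedConsumer implements SimpleConsumer {
        int pollCalls = 0;

        @Override
        public List<String> poll() {
            pollCalls++;
            return List.of("record-from-cluster");
        }
    }

    // Hypothetical stand-in for the proposed interceptor interface.
    interface SimpleClientInterceptor {
        SimpleConsumer wrapConsumer(SimpleConsumer consumer);
    }

    // Case 1: wraps the passed-in client and forwards the call to it.
    static final class DelegatingInterceptor implements SimpleClientInterceptor {
        @Override
        public SimpleConsumer wrapConsumer(SimpleConsumer consumer) {
            return () -> {
                List<String> out = new ArrayList<>(consumer.poll());
                out.add("added-by-wrapper");
                return out;
            };
        }
    }

    // Case 2: ignores the passed-in client and returns a new instance.
    // Nothing in the method signature rules this out.
    static final class ReplacingInterceptor implements SimpleClientInterceptor {
        @Override
        public SimpleConsumer wrapConsumer(SimpleConsumer consumer) {
            return () -> List.of("mock-record");
        }
    }

    public static void main(String[] args) {
        StreamsCreatedConsumer original = new StreamsCreatedConsumer();
        SimpleConsumer wrapped = new DelegatingInterceptor().wrapConsumer(original);
        System.out.println(wrapped.poll() + " / delegate calls: " + original.pollCalls);

        StreamsCreatedConsumer untouched = new StreamsCreatedConsumer();
        SimpleConsumer replaced = new ReplacingInterceptor().wrapConsumer(untouched);
        System.out.println(replaced.poll() + " / delegate calls: " + untouched.pollCalls);
    }
}
```

In this sketch both interceptors compile against the same interface, so if
creating a new instance is meant to be invalid, that would presumably have to
be a documented contract or a runtime check rather than something the types
enforce.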

Thanks,
Alieh

On Fri, Sep 6, 2024 at 5:08 AM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> 1. Fair enough. In the end it doesn't matter that much to us, so I just
> figured from basic principles if this is a config then it should go in the
> StreamsConfig. Also happy to hear what others think
>
> 2. We need the client-specific configs (for example to extract the client
> ids for monitoring, resource management, etc). However, I checked our
> implementation and realized it's not enough to just pass them in -- we
> actually want to modify some configs on the fly, before they are used to
> construct the client. If absolutely necessary I'm sure we could find a
> workaround for this by overriding the configs in the original StreamsConfig
> via the client prefixes, but that would be kind of a hassle so I'd be happy
> if we found a way to maintain the read-write mode for client configs as
> well
>
> 3. Perhaps we do need a PR -- not for the feature implementation itself,
> but an example of how this would be used. As I understand it, the whole
> point is to be able to cast the client to KafkaConsumer (or some consumer
> class?) to access its internal methods. What is the specific plan for how
> this will work with the interceptors proposed in this KIP?
>
> I was imagining the idea was that Streams would retain a reference to the
> underlying KafkaConsumer (ie what's passed in to the interceptor callback)
> and it would be invoking those internal methods on that KafkaConsumer,
> while all the public methods are to be invoked on the consumer returned by
> the interceptor. But as long as some internal methods are being invoked on
> a consumer then there's a part of Streams that is inherently dependent on
> having a valid KafkaConsumer implementation, which is dependent on having a
> valid cluster it can connect to...no?
>
> I really think it would help to at least outline what these internal
> methods are. Right now I'm assuming they are going to result in the
> consumer sending an RPC or taking some other action that would be
> problematic if there was no valid cluster, but maybe that's not the case?
> Don't want to waste your time because my assumptions are wrong, so some
> specifics would help.  :)
>
> On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for the input Sophie.
> >
> > About (1) -- I am personally happy either way. Let's wait to see what
> > others prefer (constructor only, config only, both), and I can update the
> > KIP later. Does not seem to be a core issue for the KIP.
> >
> >
> > For (2), I guess we can do this, and I am open to adding it. Wondering if
> > you would need to get access to the client specific config KS computes,
> > or if we should let `KafkaClientInterceptor implement Configurable` and
> > thus add a single `configure(Map)` call passing in the user's provided
> > config only once?
> >
> >
> > About (3), I don't follow your concern. As pointed out in the KIP
> > itself, we cannot enforce that no MockClient is returned, and for a mock
> > case, one would not have a broker cluster and not have a fully fledged
> > KS runtime, thus, just switching out the client should still work as
> > before (to be fair: I don't have a full PR for this yet -- if we think it
> > would be helpful, I can work more on it and push to GitHub so we get a
> > clearer picture):
> >
> > class MyInterceptor implements KafkaClientInterceptor {
> >    // MockConsumer's constructor takes an offset reset strategy
> >    MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
> >
> >    public Consumer wrapConsumer(KafkaConsumer consumer) {
> >      // this would never make any call into `consumer` at all
> >      return mockConsumer;
> >    }
> > }
> >
> > Note that the return type is `Consumer`, ie, the top level interface,
> > but not `KafkaConsumer` and thus you can return anything you want.
> >
> > Similarly, you can intercept any call transparently:
> >
> > class MyInterceptor implements KafkaClientInterceptor {
> >
> >    public Consumer wrapConsumer(KafkaConsumer consumer) {
> >      // sketch only: the remaining `Consumer` methods are omitted
> >      return new Consumer() {
> >        @Override
> >        public ConsumerRecords<K, V> poll(final long timeoutMs) {
> >          // add 500ms to the timeout, and forward the call
> >          ConsumerRecords r1 = consumer.poll(timeoutMs + 500);
> >
> >          // apply some filtering to modify the result
> >          Map<TopicPartition, List<ConsumerRecord<K, V>>> r2 = new ...;
> >          for (ConsumerRecord r : r1) {
> >            if (...) {
> >              r2.put(r1.partition, r1....)
> >            }
> >          }
> >          return new ConsumerRecords(r2);
> >        }
> >
> >        @Override
> >        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
> >
> >           // don't forward call at all
> >
> >           Map offsets = new HashMap<>();
> >           offsets.put(...); // add anything you want
> >
> >           return offsets;
> >         }
> >      };
> >    }
> > }
> >
> > Of course, for a real deployment, one must be careful what to intercept,
> > forward, or not forward into the actual client, but you folks know what
> > you are doing so I am not worried about it. In general, yes, if calls
> > are intercepted incorrectly, one could break Kafka Streams, but this is
> > true right now, too, so I don't think anything really changes.
> >
> > I guess, in the end, the new interface allows you to do everything you
> > did before, but we still change the API contract a little bit, as Kafka
> > Streams provides a client instance now.
> >
> >
> > Does this help?
> >
> >
> > -Matthias
> >
> >
> >
> >
> > On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote:
> > > I have one more thing to add/emphasize around point 3)  -- I should
> > > clarify that the need to modify return values and skip delegated calls
> > > is essential to our own client wrappers. In other words, this isn't
> > > something specific to mock/test clients. Just wanted to point that out
> > > so I didn't accidentally cause you to waste time looking for a
> > > workaround for testing specifically, I was just using mock/test clients
> > > as an example case.
> > >
> > > For a specific example, we do some things around intercepting seek
> > > calls in order to set offsets correctly for our remote stores, such as
> > > overriding the #seekToBeginning so it instead seeks to specific offsets.
> > > This isn't the only thing but I think it showcases clearly how
> > > performing the call being intercepted (in this case a seekToBeginning)
> > > would end up completely undoing the interceptor's actions (the seek to
> > > a specific offset).
> > >
> > > Hope this makes sense! Thanks
> > >
> > > On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman
> > > <sop...@responsive.dev> wrote:
> > >
> > >> Thanks Matthias!
> > >>
> > >> 1. Imo it makes more sense for the new client interceptor to be
> > >> configurable via config and not by KafkaStreams constructor. Let's
> > >> take the opportunity to reduce the API surface area of the already
> > >> massively overloaded KafkaStreams constructor and fix the inclusion of
> > >> the KafkaClientSupplier/Interceptor in the primary KafkaStreams
> > >> interface. We already started moving in this direction with KIP-884
> > >> which added the *default.client.supplier* config. Think it was just an
> > >> oversight that we didn't deprecate the constructors in that same KIP
> > >> (didn't see this mentioned in the rejected alternatives section).
> > >>
> > >> 2. We need to continue passing in the config map to the
> > >> interceptors/wrappers. Happy to elaborate if need be but this is
> > >> absolutely essential to us :)
> > >>
> > >> 3. I'm a bit confused about how injecting mock/test clients would work
> > >> with these interceptors. If I understand the proposed API correctly,
> > >> these are simply transparent wrappers that don't allow one to skip the
> > >> delegated call or mock the returned value. There are many users who
> > >> plug in mock clients to unit test their code without spinning up a
> > >> broker (especially since the EmbeddedKafkaCluster isn't even a public
> > >> API) and verify things outside the bounds of the TTD. There are many
> > >> examples of this in Kafka Streams itself -- perhaps you could show an
> > >> example?
> > >>
> > >> 4. FYI I am reaching out about whether there are any true custom
> > >> client alternatives out there or in the planning. So far I'm not aware
> > >> of any and we can/should proceed assuming there are none, but I'll
> > >> update the thread if I learn about something new here.
> > >>
> > >> On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I would like to start a discussion on KIP-1088:
> > >>>
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor
> > >>>
> > >>> We consider this KIP a requirement/blocker for KIP-1071.
> > >>>
> > >>> For the beginning of the discussion, I would like to stay focused on
> > >>> the _how_ and not necessarily talk about names... I am happy to change
> > >>> any class/methods names, but it might be distracting in the beginning
> > >>> of the discussion.
> > >>>
> > >>> Looking forward to your feedback.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>
> > >
> >
>
