1. Fair enough. In the end it doesn't matter that much to us; I just
figured, on basic principles, that if this is a config then it should go
in the StreamsConfig. Also happy to hear what others think.

2. We need the client-specific configs (for example to extract the client
ids for monitoring, resource management, etc). However, I checked our
implementation and realized it's not enough to just pass them in -- we
actually want to modify some configs on the fly, before they are used to
construct the client. If absolutely necessary I'm sure we could find a
workaround for this by overriding the configs in the original StreamsConfig
via the client prefixes, but that would be kind of a hassle, so I'd be happy
if we found a way to maintain the read-write mode for client configs as well.
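
For reference, the prefix workaround I mean would look roughly like the
sketch below. It uses plain maps instead of the real StreamsConfig so it runs
without Kafka on the classpath. The "consumer." prefix is the documented
Streams one, but ClientConfigOverrides and its resolution rule are
hypothetical simplifications, not what StreamsConfig actually does internally.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: plain maps stand in for StreamsConfig, no Kafka dependency.
final class ClientConfigOverrides {
    // Same string as the "consumer." prefix that Kafka Streams documents.
    static final String CONSUMER_PREFIX = "consumer.";

    // Returns a copy of the app config with one consumer-scoped override added.
    static Map<String, Object> withConsumerOverride(
            Map<String, Object> appConfig, String key, Object value) {
        Map<String, Object> copy = new HashMap<>(appConfig);
        copy.put(CONSUMER_PREFIX + key, value);
        return copy;
    }

    // Assumed, simplified resolution rule: unprefixed entries first, then
    // "consumer."-prefixed entries (prefix stripped) win on conflict.
    static Map<String, Object> consumerView(Map<String, Object> appConfig) {
        Map<String, Object> view = new HashMap<>();
        for (Map.Entry<String, Object> e : appConfig.entrySet()) {
            if (!e.getKey().startsWith(CONSUMER_PREFIX)) {
                view.put(e.getKey(), e.getValue());
            }
        }
        for (Map.Entry<String, Object> e : appConfig.entrySet()) {
            if (e.getKey().startsWith(CONSUMER_PREFIX)) {
                view.put(e.getKey().substring(CONSUMER_PREFIX.length()), e.getValue());
            }
        }
        return view;
    }
}
```

The catch is that every override has to be decided before the StreamsConfig
is built, whereas today we can adjust the map right before the client is
constructed.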

3. Perhaps we do need a PR -- not for the feature implementation itself,
but an example of how this would be used. As I understand it, the whole
point is to be able to cast the client to KafkaConsumer (or some consumer
class?) to access its internal methods. What is the specific plan for how
this will work with the interceptors proposed in this KIP?

I was imagining the idea was that Streams would retain a reference to the
underlying KafkaConsumer (ie what's passed in to the interceptor callback)
and it would be invoking those internal methods on that KafkaConsumer,
while all the public methods are to be invoked on the consumer returned by
the interceptor. But as long as some internal methods are being invoked on
a consumer then there's a part of Streams that is inherently dependent on
having a valid KafkaConsumer implementation, which is dependent on having a
valid cluster it can connect to...no?
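
To make that picture concrete, here is a rough sketch of the split I am
imagining. Everything in it is a made-up stand-in (MiniConsumer,
MiniKafkaConsumer, RemoteOffsetConsumer, trackedPartitions); it is not the
KIP's API and not the real Consumer interface, just the shape of the problem.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily reduced stand-in for the public Consumer interface.
interface MiniConsumer {
    void seek(String partition, long offset);
    void seekToBeginning(Collection<String> partitions);
    long position(String partition);
}

// Hypothetical stand-in for the concrete client that Streams would create.
final class MiniKafkaConsumer implements MiniConsumer {
    private final Map<String, Long> positions = new HashMap<>();
    int seekToBeginningCalls = 0; // counts calls that reached the real client

    @Override public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
    @Override public void seekToBeginning(Collection<String> partitions) {
        seekToBeginningCalls++;
        for (String p : partitions) positions.put(p, 0L);
    }
    @Override public long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }
    // Stand-in for an "internal" method that only the concrete class exposes.
    int trackedPartitions() { return positions.size(); }
}

// Wrapper of the kind an interceptor might return: it replaces
// seekToBeginning with seeks to externally tracked offsets.
final class RemoteOffsetConsumer implements MiniConsumer {
    private final MiniConsumer delegate;
    private final Map<String, Long> remoteOffsets;

    RemoteOffsetConsumer(MiniConsumer delegate, Map<String, Long> remoteOffsets) {
        this.delegate = delegate;
        this.remoteOffsets = remoteOffsets;
    }
    @Override public void seek(String partition, long offset) {
        delegate.seek(partition, offset);
    }
    @Override public void seekToBeginning(Collection<String> partitions) {
        // Deliberately NOT forwarded: forwarding would undo the seeks below.
        for (String p : partitions) {
            delegate.seek(p, remoteOffsets.getOrDefault(p, 0L));
        }
    }
    @Override public long position(String partition) {
        return delegate.position(partition);
    }
}
```

In this sketch Streams would call seekToBeginning on the RemoteOffsetConsumer
but call trackedPartitions() directly on the MiniKafkaConsumer it created, so
the second path never goes through the wrapper at all.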

I really think it would help to at least outline what these internal
methods are. Right now I'm assuming they are going to result in the
consumer sending an RPC or taking some other action that would be
problematic if there was no valid cluster, but maybe that's not the case?
Don't want to waste your time because my assumptions are wrong, so some
specifics would help.  :)

On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the input Sophie.
>
> About (1) -- I am personally happy either way. Let's wait to see what
> others prefer (constructor only, config only, both), and I can update
> the KIP later. It does not seem to be a core issue for the KIP.
>
>
> For (2), I guess we can do this, and I am open to add it. Wondering if
> you would need to get access to the client specific config KS computes,
> or if we should let `KafkaClientInterceptor implement Configurable` and
> thus add a single `configure(Map)` call passing in the user's provided
> config only once?
>
>
> About (3), I don't follow your concern. As pointed out in the KIP
> itself, we cannot enforce that no MockClient is returned, and for a mock
> case, one would not have a broker cluster and not have a fully fledged
> KS runtime, thus, just switching out the client should still work as
> before (to be fair: I don't have full PR for this yet -- if we think it
> would be helpful, I can work more on it and push to GitHub so we get a
> clearer picture):
>
> class MyInterceptor implements KafkaClientInterceptor {
>    MockConsumer mockConsumer = new MockConsumer();
>
>    public Consumer wrapConsumer(KafkaConsumer consumer) {
>      // this would never make any call into `consumer` at all
>      return mockConsumer;
>    }
> }
>
> Note that the return type is `Consumer`, ie, the top level interface,
> but not `KafkaConsumer` and thus you can return anything you want.
>
> Similarly, you can intercept any call transparently:
>
> class MyInterceptor implements KafkaClientInterceptor {
>
>    public Consumer wrapConsumer(KafkaConsumer consumer) {
>      return new Consumer() {
>        @Override
>        public ConsumerRecords<K, V> poll(final long timeoutMs) {
>          // add 500ms to the timeout, and forward the call
>          ConsumerRecords r1 = consumer.poll(timeoutMs + 500);
>
>          // apply some filtering to modify the result
>          Map<TopicPartition, List<ConsumerRecord<K, V>>> r2 = new ...;
>          for (ConsumerRecord r : r1) {
>            if (...) {
>              r2.put(r1.partition, r1....)
>            }
>          }
>          return new ConsumerRecords(r2);
>        }
>
>        @Override
>        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
>          // don't forward call at all
>
>          Map offsets = new HashMap<>();
>          offsets.put(...); // add anything you want
>
>          return offsets;
>        }
>      };
>    }
> }
>
> Of course, for a real deployment, one must be careful what to intercept,
> forward, or not forward into the actual client, but you folks know what
> you are doing so I am not worried about it. In general, yes, if calls
> are intercepted incorrectly, one could break Kafka Streams, but this is
> true right now, too, so I don't think anything really changes.
>
> I guess, in the end, the new interface allows you to do everything you
> did before, but we still change the API contract a little bit, as Kafka
> Streams provides a client instance now.
>
>
> Does this help?
>
>
> -Matthias
>
>
>
>
> On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote:
> > I have one more thing to add/emphasize around point 3) -- I should
> > clarify that the need to modify return values and skip delegated calls
> > is essential to our own client wrappers. In other words, this isn't
> > something specific to mock/test clients. Just wanted to point that out
> > so I didn't accidentally cause you to waste time looking for a
> > workaround for testing specifically; I was just using mock/test clients
> > as an example case.
> >
> > For a specific example, we do some things around intercepting seek calls
> > in order to set offsets correctly for our remote stores, such as
> > overriding the #seekToBeginning so it instead seeks to specific offsets.
> > This isn't the only thing but I think it showcases clearly how
> > performing the call being intercepted (in this case a seekToBeginning)
> > would end up completely undoing the interceptor's actions (the seek to a
> > specific offset).
> >
> > Hope this makes sense! Thanks
> >
> > On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman <sop...@responsive.dev>
> > wrote:
> >
> >> Thanks Matthias!
> >>
> >> 1. Imo it makes more sense for the new client interceptor to be
> >> configurable via config and not by KafkaStreams constructor. Let's take
> >> the opportunity to reduce the API surface area of the already massively
> >> overloaded KafkaStreams constructor and fix the inclusion of the
> >> KafkaClientSupplier/Interceptor in the primary KafkaStreams interface.
> >> We already started moving in this direction with KIP-884 which added
> >> the *default.client.supplier* config. Think it was just an oversight
> >> that we didn't deprecate the constructors in that same KIP (didn't see
> >> this mentioned in the rejected alternatives section).
> >>
> >> 2. We need to continue passing in the config map to the
> >> interceptors/wrappers. Happy to elaborate if need be but this is
> >> absolutely essential to us :)
> >>
> >> 3. I'm a bit confused about how injecting mock/test clients would work
> >> with these interceptors. If I understand the proposed API correctly,
> >> these are simply transparent wrappers that don't allow one to skip the
> >> delegated call or mock the returned value. There are many users who plug
> >> in mock clients to unit test their code without spinning up a broker
> >> (especially since the EmbeddedKafkaCluster isn't even a public API) and
> >> verify things outside the bounds of the TTD. There are many examples of
> >> this in Kafka Streams itself -- perhaps you could show an example?
> >>
> >> 4. FYI I am reaching out about whether there are any true custom client
> >> alternatives out there or in the planning. So far I'm not aware of any
> >> and we can/should proceed assuming there are none, but I'll update the
> >> thread if I learn about something new here.
> >>
> >> On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >>> Hi,
> >>>
> >>> I would like to start a discussion on KIP-1088:
> >>>
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor
> >>>
> >>> We consider this KIP a requirement/blocker for KIP-1071.
> >>>
> >>> For the beginning of the discussion, I would like to stay focused on
> >>> the _how_ and not necessarily talk about names... I am happy to change
> >>> any class/methods names, but it might be distracting in the beginning
> >>> of the discussion.
> >>>
> >>> Looking forward to your feedback.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>
> >
>
