Hi Sophie,

Thanks for the details!

Yeah, I think replacing the contract in the javadocs with a warning is better.


On 20.11.24 10:05, Sophie Blee-Goldman wrote:

Have you considered to add an interface that instead of wrapping a
processor adds methods for pre-processing and post-processing to the

The problem is that we actually want to be able to access all parts of the
ProcessorSupplier. This includes things like inspecting the StoreBuilders
returned by #stores (or even replacing them, eg for testing/debugging), as
well as being able to intercept the processor's #init call and get a handle
on the processor context.
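
To make the "access all parts of the supplier" point concrete, here is a rough sketch of what a wrapping supplier could look like. Note that `Context`, `Processor`, `ProcessorSupplier` and `LoggingSupplier` below are simplified stand-ins defined just for this example; they are not the real Kafka Streams interfaces (for instance, the real #stores returns StoreBuilders rather than names), so treat it as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified stand-ins for illustration; NOT the real Kafka Streams types.
interface Context {
    String taskId();
}

interface Processor<K, V> {
    default void init(Context context) {}
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
    // Assumption: store names stand in for the StoreBuilders of the real API.
    default Set<String> stores() { return Set.of(); }
}

// Hypothetical wrapper: delegates everything, but observes stores() and init().
final class LoggingSupplier<K, V> implements ProcessorSupplier<K, V> {
    private final ProcessorSupplier<K, V> inner;
    final List<String> log = new ArrayList<>();

    LoggingSupplier(ProcessorSupplier<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public Set<String> stores() {
        Set<String> stores = inner.stores();
        log.add("stores=" + stores);   // inspect (or swap out) the stores here
        return stores;
    }

    @Override
    public Processor<K, V> get() {
        Processor<K, V> delegate = inner.get();
        return new Processor<K, V>() {
            @Override
            public void init(Context context) {
                // Intercept init to get a handle on the processor context.
                log.add("init task=" + context.taskId());
                delegate.init(context);
            }

            @Override
            public void process(K key, V value) {
                delegate.process(key, value);   // pure pass-through
            }
        };
    }
}

final class WrapperSketch {
    public static void main(String[] args) {
        ProcessorSupplier<String, Integer> original =
            () -> (key, value) -> System.out.println(key + "=" + value);
        LoggingSupplier<String, Integer> wrapped = new LoggingSupplier<>(original);
        Processor<String, Integer> processor = wrapped.get();
        processor.init(() -> "0_0");
        processor.process("a", 1);
        System.out.println(wrapped.log);
    }
}
```

Nothing in this shape stops an implementation from returning a different processor from get() instead of delegating, which is why a pre/post-processing hook alone would not cover these cases.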

I'm open to suggestions on how to enforce a true "wrapping" of the provided
processor, although as you noted, it could actually be useful to replace it
altogether rather than wrapping it, for testing/debugging scenarios. So I'd
prefer to leave it flexible and not enforce this -- I can update the
javadocs to remove the contract and instead just try to emphasize that
modifying the processor instead of simply wrapping it can lead to
unexpected results and should be done with care.

This is reminiscent of the discussion we had recently around KIP-1088. I
think the general idea holds here as well -- we should not try to enforce a
pure wrapper (either in code or in docs/contract) and just try to
communicate the potential dangers of not doing so.

On Tue, Nov 19, 2024 at 10:58 PM Bruno Cadonna <cado...@apache.org> wrote:

Hi Sophie,

Thanks for the KIP!

I am always a bit skeptical of contracts that are defined in the Java
docs, but not enforced programmatically. I am talking about the contract
that requires the wrapper to return the same processor supplier instance.

Have you considered to add an interface that instead of wrapping a
processor adds methods for pre-processing and post-processing to the

Or do you want to reconsider the contract?

Lucas and I discussed it yesterday and Lucas noted that returning a no-op
processor instead of the original processor might be useful for debugging.


On 19.11.24 13:03, Lucas Brutschy wrote:

I'm a bit confused since the motivation promises a cleanup of
TopologyConfig/StreamsConfig, but this doesn't seem to be part of this
KIP. But on the whole, this change looks fine to me.

Thanks for the KIP!


Sophie Blee-Goldman <sop...@responsive.dev> wrote on Tue, Nov 19

One more small change -- originally I proposed to only add this config to
the TopologyConfig, and not to the StreamsConfig. However, while
implementing a POC I noticed that TopologyConfig does not have a
constructor that accepts a plain properties or config map, and only one
that takes in a StreamsConfig. Rather than adding a new constructor for
TopologyConfig I think it makes sense to just add this new config to
StreamsConfig and TopologyConfig, as most people will generally want to
create their TopologyConfig from a shared global StreamsConfig, rather than
instantiating separate sets of configs.

I have made this change in the KIP so please take a look and let me know if
you have any concerns. Happy to discuss alternatives.

On Mon, Nov 18, 2024 at 3:27 PM Sophie Blee-Goldman <


Thanks Almog! That makes sense to me, I've updated the KIP so that the
ProcessorWrapper will extend Configurable but gave it a default no-op
implementation so that it's optional.
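
For anyone following along, the "extend Configurable with a default no-op" idea can be sketched roughly as below. This is only an illustration under stated assumptions: `Configurable` here is a local stand-in with the same method shape as Kafka's Configurable, the `wrap` method and its use of `Runnable` as a stand-in processor are hypothetical, and `PassThroughWrapper` and `TaggingWrapper` are made-up example classes, not anything from the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-in with the same method shape as Kafka's Configurable.
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Hypothetical shape of the wrapper; Runnable stands in for a processor.
interface ProcessorWrapper extends Configurable {
    @Override
    default void configure(Map<String, ?> configs) {
        // Default no-op, so implementing configure() is optional.
    }

    Runnable wrap(String processorName, Runnable processor);
}

// Does not care about configs, so it only implements wrap().
final class PassThroughWrapper implements ProcessorWrapper {
    @Override
    public Runnable wrap(String processorName, Runnable processor) {
        return processor;
    }
}

// Overrides configure() to pick up application.id, e.g. to tag custom metrics.
final class TaggingWrapper implements ProcessorWrapper {
    private String appId = "unknown";
    final List<String> events = new ArrayList<>();

    @Override
    public void configure(Map<String, ?> configs) {
        Object id = configs.get("application.id");
        if (id != null) {
            appId = id.toString();
        }
    }

    @Override
    public Runnable wrap(String processorName, Runnable processor) {
        return () -> {
            events.add(appId + "/" + processorName);   // record, then delegate
            processor.run();
        };
    }
}
```

With a default method, existing wrappers that ignore configs keep compiling unchanged, while wrappers that need something like application.id can opt in by overriding configure().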

On Mon, Nov 18, 2024 at 1:57 PM Almog Gavra <almog.ga...@gmail.com>

Thanks Sophie! This KIP will certainly make it easier to implement any kind
of custom functionality across all processors in the DSL, and I can see
quite a few use cases for this.

One suggestion, we should consider including a configure() method that
takes in Map<String, ?> configs, so that it can be configured based on
things like application.id (e.g. for emitting custom metrics per

- Almog

On Fri, Nov 15, 2024 at 10:16 PM Sophie Blee-Goldman <

Hey all,

We have a short KIP we'd like to propose which will allow injecting
code modules around the processors of Kafka Streams applications,
DSL-built topologies.

Please let us know if you have any thoughts or concerns.



