Hey Upesh, are you trying to plug in the custom partitioner via the `partitioner.class` ProducerConfig? That won't work in Streams for the exact reason you highlighted: Streams serializes the key and value before handing the record to the producer. This is why Streams has its own version of the interface, called StreamPartitioner -- this is what you need to implement instead.
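For illustration, here is a minimal sketch of partitioning by a member field of the deserialized value. So that it runs without the Kafka Streams jar, it declares a local stand-in interface with the same shape as StreamPartitioner's `partition(topic, key, value, numPartitions)` method. `StreamPartitionerLike`, `Order`, `region`, `RegionPartitioner` and the topic name are all hypothetical names for the example, not anything from your code or from Kafka.

```java
// Local stand-in mirroring the shape of
// org.apache.kafka.streams.processor.StreamPartitioner<K, V>.
// Declared here only so the sketch compiles without the Kafka Streams dependency.
interface StreamPartitionerLike<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

// Hypothetical value type; "region" stands in for the member field to partition by.
final class Order {
    final String id;
    final String region;

    Order(String id, String region) {
        this.id = id;
        this.region = region;
    }
}

// Receives the typed (not yet serialized) value, so no extra deserialization is needed.
final class RegionPartitioner implements StreamPartitionerLike<String, Order> {
    @Override
    public Integer partition(String topic, String key, Order value, int numPartitions) {
        if (value == null || value.region == null) {
            // In the real interface, returning null asks Streams to use its default partitioning.
            return null;
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(value.region.hashCode(), numPartitions);
    }
}

class PartitionerDemo {
    public static void main(String[] args) {
        RegionPartitioner partitioner = new RegionPartitioner();
        Integer a = partitioner.partition("output-topic", "k1", new Order("1", "emea"), 6);
        Integer b = partitioner.partition("output-topic", "k2", new Order("2", "emea"), 6);
        // Records with the same region land on the same partition.
        System.out.println(a.equals(b)); // prints "true"
    }
}
```

In a real topology the class would implement the actual StreamPartitioner interface instead of the stand-in, and you would hand an instance to the sink, along the lines of `stream.to("output-topic", Produced.with(keySerde, valueSerde, new RegionPartitioner()))`, where `keySerde` and `valueSerde` are placeholders for your own serdes.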
Unfortunately there is currently no Streams config that applies a partitioner across the whole application, so you will have to plug in the custom partitioner by passing it directly to the operators. If you look at the various APIs of the DSL, you'll notice many have an overload that takes this parameter (e.g. see "Produced").

As it turns out, however, I am currently working on a KIP for a default.stream.partitioner config that you will be able to set once rather than carefully passing it in across the topology. I'll take this as good evidence of the usefulness of this feature -- unfortunately you'll have to wait a bit, as it will most likely not be available until version 3.5.

Anyways, trying to use the Producer config is an honest mistake, and we don't seem to include it in the documented list of client configs that can't be set in Streams. I've filed tickets to fix up the docs and also to explicitly log a warning if any of these are set, instead of silently ignoring them or flat out breaking as in this case:

https://issues.apache.org/jira/browse/KAFKA-14404
https://issues.apache.org/jira/browse/KAFKA-14405

On Fri, Nov 18, 2022 at 4:07 PM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hello all,
>
> We have been working on implementing a custom partitioner for our producer
> within a simple stream application, that will partition the records by a
> member field when sending them to the output topic. By looking at the
> contract of the partition() method in the Partitioner interface, it would
> seem that the value Object would be in its deserialized form when this
> method is called:
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes The serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes The serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     int partition(String topic, Object key, byte[] keyBytes,
>                   Object value, byte[] valueBytes, Cluster cluster);
>
> For a regular producer that's instantiated, this seems to work correctly.
> However, within the RecordCollectorImpl class, we found that in a streams
> app, the record key and value are serialized prior to being sent, as seen
> below:
>
>     final ProducerRecord<byte[], byte[]> serializedRecord =
>         new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
>
>     streamsProducer.send(serializedRecord, (metadata, exception) -> {
>
> We didn't want to have to deserialize the value object again within the
> custom partitioner, so is there another way around this? Or is this a bug
> within the streams producer code?
>
> Thanks in advance!
>
> Upesh Desai
> Senior Software Developer