It's a bug in the implementation. It's already fixed for upcoming 2.4.1 and 2.5.0 releases
Cf. https://issues.apache.org/jira/browse/KAFKA-9533 -Matthias On 2/22/20 1:46 AM, Sachin Mittal wrote: > Hi, > The javadoc of this method states: > If the return value of ValueTransformer#transform() > <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/ValueTransformer.html#transform-V-> > is null, no records are emitted. > > However when I check the underlying processor for the same: > KStreamTransformValuesProcessor#process > It seems to simply forward to downstream with value: > public void process(final K key, final V value) { > context.forward(key, valueTransformer.transform(key, value)); > } > > On the other hand if I look at the processor for > > KStream#transform > KStreamFlatTransformProcessor#process > > It has proper code to check for null. > public void process(final KIn key, final VIn value) { > final Iterable<KeyValue<KOut, VOut>> pairs = > transformer.transform(key, value); > if (pairs != null) { > for (final KeyValue<KOut, VOut> pair : pairs) { > context().forward(pair.key, pair.value); > } > } > } > > So I was just wondering if say we want to have say dedup operation, do > we call stream#transform or stream#transformValue would also work if > we return null from the transformer. > > Thanks > Sachin >
signature.asc
Description: OpenPGP digital signature