[ https://issues.apache.org/jira/browse/KAFKA-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447393#comment-17447393 ]
Anders Kirkeby edited comment on KAFKA-10603 at 11/22/21, 1:26 PM:
-------------------------------------------------------------------

With the deprecation of _org.apache.kafka.streams.processor.Processor_, we see issues when combining the Streams DSL with the Topology#addSink methods. I've added two snippets showing how we used this in Kafka 2.8.1 and where it fails with the new APIs introduced in Kafka 3.0.0. The latter example fails because the Void, Void (KOut, VOut) requirement means we can only forward Void, Void records via context.forward. Simply removing the Void, Void requirement on KStream#process should fix this.

h3. KafkaStreams 2.x

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Duration;

class CleanupProcessor implements Processor<String, Integer> {
    @Override
    public void init(ProcessorContext context) {
        KeyValueStore<String, Integer> store = context.getStateStore("store");
        Punctuator punctuator = timestamp -> store.all().forEachRemaining((record) -> {
            if (record.value > 20) {
                context.forward(record.key, null, To.child("tombstone-sink"));
            } else {
                store.put(record.key, record.value + 1);
            }
        });
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, punctuator);
    }

    @Override
    public void process(String key, Integer value) {} // do nothing

    @Override
    public void close() {}
}

class Scratch {
    public static void main(String[] args) {
        StreamsBuilder sb = new StreamsBuilder();
        // create a materialized store
        KTable<String, Integer> table = sb.table("input-topic", Materialized.as("store"));
        // attach the store to the cleanup processor
        table.toStream().process(CleanupProcessor::new, Named.as("tombstone-processor"), "store");
        // build the topology
        Topology topology = sb.build();
        // add a sink to the topology with a reference to the DSL processor
        topology.addSink("tombstone-sink", "output-topic", "tombstone-processor");
        System.out.println(topology.describe());
    }
}
{code}
h3. KafkaStreams 3.x

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Duration;

class CleanupProcessor implements Processor<String, Integer, Void, Void> {
    @Override
    public void init(ProcessorContext<Void, Void> context) {
        KeyValueStore<String, Integer> store = context.getStateStore("store");
        Punctuator punctuator = timestamp -> store.all().forEachRemaining((record) -> {
            if (record.value > 20) {
                // this fails to compile: we can only forward Void, Void records
                context.forward(new Record<>(record.key, null, timestamp), "tombstone-sink");
            } else {
                store.put(record.key, record.value + 1);
            }
        });
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, punctuator);
    }

    @Override
    public void process(Record<String, Integer> record) {} // do nothing

    @Override
    public void close() {}
}

class Scratch {
    public static void main(String[] args) {
        StreamsBuilder sb = new StreamsBuilder();
        // create a materialized store
        KTable<String, Integer> table = sb.table("input-topic", Materialized.as("store"));
        // attach the store to the cleanup processor
        table.toStream().process(CleanupProcessor::new, Named.as("tombstone-processor"), "store");
        // build the topology
        Topology topology = sb.build();
        // add a sink to the topology with a reference to the DSL processor
        topology.addSink("tombstone-sink", "output-topic", "tombstone-processor");
        System.out.println(topology.describe());
    }
}
{code}
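For reference (not part of the original report): until that constraint is lifted, the same cleanup logic can be wired up entirely through the Processor API, since Topology#addProcessor does not force Void output types. A minimal sketch, reusing the store and topic names from above and assuming default key/value serdes are configured:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;

// typed <String, Integer, String, Integer>, so the punctuator may forward
// real tombstone records to the named sink child
class PapiCleanupProcessor implements Processor<String, Integer, String, Integer> {
    private KeyValueStore<String, Integer> store;

    @Override
    public void init(ProcessorContext<String, Integer> context) {
        store = context.getStateStore("store");
        Punctuator punctuator = timestamp -> store.all().forEachRemaining((entry) -> {
            if (entry.value > 20) {
                // compiles now: KOut/VOut are no longer Void/Void
                context.forward(new Record<>(entry.key, null, timestamp), "tombstone-sink");
            } else {
                store.put(entry.key, entry.value + 1);
            }
        });
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, punctuator);
    }

    @Override
    public void process(Record<String, Integer> record) {
        // no KTable materialization here, so the processor maintains the store itself
        store.put(record.key(), record.value());
    }
}

class PapiScratch {
    public static void main(String[] args) {
        Topology topology = new Topology();
        // source relies on the default serdes from the streams config
        topology.addSource("input-source", "input-topic");
        topology.addProcessor("tombstone-processor", PapiCleanupProcessor::new, "input-source");
        topology.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("store"), Serdes.String(), Serdes.Integer()),
                "tombstone-processor");
        topology.addSink("tombstone-sink", "output-topic", "tombstone-processor");
        System.out.println(topology.describe());
    }
}
{code}

This keeps the punctuator-driven tombstoning working, at the cost of giving up the DSL's table materialization.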
> Re-design KStream.process() and K*.transform*() operations
> ----------------------------------------------------------
>
>                 Key: KAFKA-10603
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10603
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: John Roesler
>            Priority: Major
>              Labels: needs-kip
>
> After the implementation of KIP-478, we have the ability to reconsider all
> these APIs, and maybe just replace them with
> {code:java}
> // KStream
> KStream<KOut, VOut> process(ProcessorSupplier<KIn, VIn, KOut, VOut>)
>
> // KTable
> KTable<KOut, VOut> process(ProcessorSupplier<KIn, VIn, KOut, VOut>)
> {code}
> but it needs more thought and a KIP for sure.
>
> This ticket probably supersedes
> https://issues.apache.org/jira/browse/KAFKA-8396
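For illustration, under the proposed signature the scenario reported above would collapse to something like the following. This is a hypothetical sketch: no such typed overload exists as of 3.0, it assumes CleanupProcessor is retyped as Processor<String, Integer, String, Integer>, and the processor would call context.forward(record) with no child name since the DSL sink replaces Topology#addSink:

{code:java}
// hypothetical: continues the 3.x main method, assuming the proposed
// typed KStream#process
KStream<String, Integer> tombstones = table.toStream()
        .process(CleanupProcessor::new, Named.as("tombstone-processor"), "store");
tombstones.to("output-topic");
{code}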