Hi All,

While working with KStream/KStreamImpl I discovered that there does not seem to be any way to connect the results of the KStream.process method to a sink node.
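To make the gap concrete, here is a minimal sketch (the processor class and topic names are mine, just for illustration):

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("input");

// process() returns void, so the stream terminates here...
stream.process(MyProcessor::new);
// ...and there is no handle left on which to call to("output").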
I'd like to propose an addition to the API: a "processTo" method. I've looked at and used the "transform", "reduceByKey" and "aggregateByKey" methods, but "processTo" would act as a more general-purpose collector, terminating the KStream and allowing results to be written out to an arbitrary topic (regardless of key type). I've done a quick prototype and some initial testing locally on my fork. If you think this could be useful, I can add unit tests and create a PR. The proposed code changes and the test driver code are below.

KStream.java additions:

void processTo(String topic, ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);

void processTo(String topic, ProcessorSupplier<K, V> processorSupplier, Serializer<K> keySerializer, Serializer<V> valSerializer, String... stateStoreNames);

KStreamImpl.java additions:

@Override
public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
    processTo(topic, processorSupplier, null, null, stateStoreNames);
}

@SuppressWarnings("unchecked")
@Override
public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier, Serializer<K> keySerializer, Serializer<V> valSerializer, String... stateStoreNames) {
    String processorName = topology.newName(PROCESSOR_NAME);
    String sinkName = topology.newName(SINK_NAME);

    // Windowed keys need a partitioner that hashes on the underlying key.
    StreamPartitioner<K, V> streamPartitioner = null;
    if (keySerializer instanceof WindowedSerializer) {
        WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
        streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
    }

    // Attach the user's processor to this stream, then wire it to a sink
    // writing to the given topic, and connect any requested state stores.
    topology.addProcessor(processorName, processorSupplier, this.name);
    topology.addSink(sinkName, topic, keySerializer, valSerializer, streamPartitioner, processorName);
    topology.connectProcessorAndStateStores(processorName, stateStoreNames);
}

Test driver:

public class TestDriver {

    public static void main(String[] args) {
        StreamsConfig config = new StreamsConfig(getProperties());
        KStreamBuilder kStreamBuilder = new KStreamBuilder();

        KStream<String, String> transactionKStream = kStreamBuilder.stream("input");
        transactionKStream.processTo("output", UpperCaseProcessor::new);

        System.out.println("Starting process-to Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started process-to Example");
    }

    // Upper-cases the value and forwards it downstream, i.e. to the sink
    // node that processTo attaches.
    private static class UpperCaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
            context().commit();
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
        props.put("group.id", "test-streams");
        props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return props;
    }
}
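For comparison, the closest I could get with the existing API is transform() followed by to(). A rough, untested sketch of the equivalent of the test driver above (the class name is mine, and I'm assuming the current Transformer contract of init/transform/punctuate/close); the KeyValue and punctuate plumbing here is exactly the boilerplate that processTo would avoid:

// Workaround with the existing API: transform() keeps the stream alive
// so that to() can attach the sink, at the cost of Transformer boilerplate.
private static class UpperCaseTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    @Override
    public void init(ProcessorContext context) { }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null; // nothing to emit on punctuate
    }

    @Override
    public void close() { }
}

// Usage:
// kStreamBuilder.stream("input").transform(UpperCaseTransformer::new).to("output");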