Hi Guozhang,

Possibly, but the use case I'm working with is having a collector object, for aggregate statistics for example, that would output results intermittently (via punctuate).
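To illustrate, here's a rough sketch of the kind of collector I have in mind (simplified; the stats fields, types, and punctuate interval are placeholders for illustration, not code from my fork):

private static class StatsCollector extends AbstractProcessor<String, Long> {

    private long count;
    private long sum;

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(60000L); // request periodic punctuate() calls
    }

    @Override
    public void process(String key, Long value) {
        // accumulate only; nothing is forwarded per message
        count++;
        sum += value;
    }

    @Override
    public void punctuate(long timestamp) {
        // emit the aggregate intermittently; note the forwarded types
        // differ from the input types
        context().forward("stats", "count=" + count + " sum=" + sum);
        count = 0;
        sum = 0;
    }
}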
The issue for me is that transform(...) returns a key-value pair for each message, possibly of a different type. I've achieved something similar in the KStream API using the form map(...).aggregateByKey(...).to(...), but with that approach I need to map each message to an intermediate form and do the periodic aggregations of "stats" objects. What I'd really like is a way to attach a sink to a processor.

With that in mind, instead of introducing a "processTo" method, another option could be to change the return type of "process" from void to KStream<K,V>. Then the use case becomes process(...).to(...), similar to transform(...).to(...). I've made those changes locally, everything compiles fine, and running my simple driver program achieves the desired results.
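For example, with the changed return type the driver code could look roughly like this (sketch only, reusing the UpperCaseProcessor from the test driver quoted below):

KStream<String, String> transactionKStream = kStreamBuilder.stream("input");

// process(...) now returns a KStream, so a sink can be attached directly
transactionKStream.process(UpperCaseProcessor::new).to("output");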
I know I could be splitting hairs here, but in my opinion it would be nice to have.

Thanks for your time!

Bill

On Sun, Mar 13, 2016 at 4:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Bill,
>
> We added transform() together with process() to support any
> user-customized stateful processor that can still concatenate to another
> KStream.
>
> So for your case, would `transform(...).to(topic)` provide the same
> functionality as "processTo(topic, ...)"?
>
> Guozhang
>
>
> On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi All,
> >
> > While working with KStream/KStreamImpl I discovered that there does not
> > seem to be any way to connect the results of the KStream.process method
> > with a sink node.
> >
> > I'd like to propose an addition to the API: a "processTo" method.
> >
> > I've looked at and used the "transform", "reduceByKey" and
> > "aggregateByKey" methods, but "processTo" would work like a more
> > general-purpose collector, terminating the KStream and allowing results
> > to be written out to an arbitrary topic (regardless of key type).
> >
> > I've done a quick prototype and some initial testing locally on my
> > fork. If you think this could be useful I can add unit tests and create
> > a PR. I've included the proposed code changes and the test driver code
> > below.
> >
> >
> > KStream.java additions
> >
> > void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> >         String... stateStoreNames);
> >
> > void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> >         Serializer<K> keySerializer, Serializer<V> valSerializer,
> >         String... stateStoreNames);
> >
> >
> > KStreamImpl.java additions
> >
> > @Override
> > public void processTo(String topic, ProcessorSupplier<K, V>
> >         processorSupplier, String... stateStoreNames) {
> >     processTo(topic, processorSupplier, null, null, stateStoreNames);
> > }
> >
> > @SuppressWarnings("unchecked")
> > @Override
> > public void processTo(String topic, ProcessorSupplier<K, V>
> >         processorSupplier, Serializer<K> keySerializer,
> >         Serializer<V> valSerializer, String... stateStoreNames) {
> >     String processorName = topology.newName(PROCESSOR_NAME);
> >     String sinkName = topology.newName(SINK_NAME);
> >     StreamPartitioner<K, V> streamPartitioner = null;
> >
> >     if (keySerializer instanceof WindowedSerializer) {
> >         WindowedSerializer<Object> windowedSerializer =
> >                 (WindowedSerializer<Object>) keySerializer;
> >         streamPartitioner = (StreamPartitioner<K, V>) new
> >                 WindowedStreamPartitioner<Object, V>(windowedSerializer);
> >     }
> >
> >     topology.addProcessor(processorName, processorSupplier, this.name);
> >     topology.addSink(sinkName, topic, keySerializer, valSerializer,
> >             streamPartitioner, processorName);
> >     topology.connectProcessorAndStateStores(processorName,
> >             stateStoreNames);
> > }
> >
> >
> > Test Driver
> >
> > public class TestDriver {
> >
> >     public static void main(String[] args) {
> >         StreamsConfig config = new StreamsConfig(getProperties());
> >         KStreamBuilder kStreamBuilder = new KStreamBuilder();
> >
> >         KStream<String, String> transactionKStream =
> >                 kStreamBuilder.stream("input");
> >
> >         transactionKStream.processTo("output", UpperCaseProcessor::new);
> >
> >         System.out.println("Starting process-to Example");
> >         KafkaStreams kafkaStreams =
> >                 new KafkaStreams(kStreamBuilder, config);
> >         kafkaStreams.start();
> >         System.out.println("Now started process-to Example");
> >     }
> >
> >     private static class UpperCaseProcessor extends
> >             AbstractProcessor<String, String> {
> >         @Override
> >         public void process(String key, String value) {
> >             context().forward(key, value.toUpperCase());
> >             context().commit();
> >         }
> >     }
> >
> >     private static Properties getProperties() {
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
> >         props.put("group.id", "test-streams");
> >         props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >                 "localhost:9092");
> >         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> >                 "localhost:2181");
> >         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
> >         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> >                 StringSerializer.class);
> >         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> >                 StringSerializer.class);
> >         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >                 StringDeserializer.class);
> >         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >                 StringDeserializer.class);
> >         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >                 WallclockTimestampExtractor.class);
> >         return props;
> >     }
> > }
>
> --
> -- Guozhang