Hi Guozhang,

Possibly, but the use case I'm working with is having a collector object,
for aggregate statistics for example, that would output results
intermittently (via punctuate).
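
To make that concrete, here's a rough sketch of the collector shape I have
in mind (Stats is a hypothetical aggregate-statistics holder, I'm assuming
the context().schedule(..)/punctuate(..) pair for the periodic output, and
the 60s interval is arbitrary):

    private static class StatsCollector extends AbstractProcessor<String, String> {

        private Stats stats = new Stats(); // hypothetical stats holder

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            context.schedule(60000L); // request punctuate roughly every 60 seconds
        }

        @Override
        public void process(String key, String value) {
            stats.update(value); // accumulate only; nothing forwarded per message
        }

        @Override
        public void punctuate(long timestamp) {
            context().forward(null, stats); // emit the aggregate intermittently
            stats = new Stats();            // start a fresh collection
        }
    }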

The issue for me is that 'transform(..)' returns a key-value pair for each
message, possibly of a different type.

I've achieved something similar in the KStream API using the form
map(...).aggregateByKey(...).to(...), but with that approach I need to map
each message to an intermediate form and do the periodic aggregations of
"stats" objects.

What I'd really like is a way to attach a sink to a processor.

With that in mind, instead of introducing a "processTo" method, another
option could be to change the return type of "process" from void to
KStream<K,V>.

Then the use case becomes 'process(..).to(...)', similar to
'transform(..).to(...)'.
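
Concretely, the signature change is just this (a sketch mirroring the
current declaration in KStream.java):

    // return the stream instead of void so a sink can be attached
    KStream<K, V> process(ProcessorSupplier<K, V> processorSupplier,
        String... stateStoreNames);

    // which then allows, e.g.:
    transactionKStream.process(StatsCollector::new).to("output");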

I've made those changes locally; everything compiles fine, and running my
simple driver program achieves the desired results.

I know I could be splitting hairs here, but in my opinion it would be nice
to have.

Thanks for your time!

Bill


On Sun, Mar 13, 2016 at 4:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Bill,
>
> We added transform() together with process() to support any user-customized
> stateful processor that can still concatenate to another KStream.
>
> So for your case, would `transform(...).to(topic)` provide the same
> functionality as "processTo(topic, ...)"?
>
> Guozhang
>
>
> On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi All,
> >
> > While working with KStream/KStreamImpl I discovered that there does not
> > seem to be any way to connect the results of the KStream.process method
> > with a sink node.
> >
> > I'd like to propose an addition to the API: a "processTo" method.
> >
> > I've looked at and used the "transform", "reduceByKey" and
> > "aggregateByKey" methods, but "processTo" would work like a more
> > general-purpose collector, terminating the KStream and allowing results
> > to be written out to an arbitrary topic (regardless of key type).
> >
> >
> > I've done a quick prototype and some initial testing locally on my
> > fork. If you think this could be useful I can add unit tests and create
> > a PR. I've included the proposed code changes and the test driver code
> > below.
> >
> >
> > KStream.java additions
> >
> > void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> >     String... stateStoreNames);
> >
> > void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> >     Serializer<K> keySerializer, Serializer<V> valSerializer,
> >     String... stateStoreNames);
> >
> >
> > KStreamImpl.java additions
> >
> > @Override
> > public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> >                       String... stateStoreNames) {
> >     processTo(topic, processorSupplier, null, null, stateStoreNames);
> > }
> >
> > @SuppressWarnings("unchecked")
> > @Override
> > public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> >                       Serializer<K> keySerializer, Serializer<V> valSerializer,
> >                       String... stateStoreNames) {
> >     String processorName = topology.newName(PROCESSOR_NAME);
> >     String sinkName = topology.newName(SINK_NAME);
> >     StreamPartitioner<K, V> streamPartitioner = null;
> >
> >     // a windowed key needs the matching partitioner so records land on
> >     // the same partitions as the original windowed stream
> >     if (keySerializer instanceof WindowedSerializer) {
> >         WindowedSerializer<Object> windowedSerializer =
> >             (WindowedSerializer<Object>) keySerializer;
> >         streamPartitioner = (StreamPartitioner<K, V>)
> >             new WindowedStreamPartitioner<Object, V>(windowedSerializer);
> >     }
> >
> >     // wire processor -> sink, then attach any requested state stores
> >     topology.addProcessor(processorName, processorSupplier, this.name);
> >     topology.addSink(sinkName, topic, keySerializer, valSerializer,
> >         streamPartitioner, processorName);
> >     topology.connectProcessorAndStateStores(processorName, stateStoreNames);
> > }
> >
> >
> > Test Driver
> >
> > public class TestDriver {
> >
> >     public static void main(String[] args) {
> >         StreamsConfig config = new StreamsConfig(getProperties());
> >         KStreamBuilder kStreamBuilder = new KStreamBuilder();
> >
> >         KStream<String, String> transactionKStream =
> >             kStreamBuilder.stream("input");
> >
> >         transactionKStream.processTo("output", UpperCaseProcessor::new);
> >
> >         System.out.println("Starting process-to Example");
> >         KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
> >         kafkaStreams.start();
> >         System.out.println("Now started process-to Example");
> >     }
> >
> >     // upper-cases each value and forwards it to the attached sink
> >     private static class UpperCaseProcessor
> >             extends AbstractProcessor<String, String> {
> >         @Override
> >         public void process(String key, String value) {
> >             context().forward(key, value.toUpperCase());
> >             context().commit();
> >         }
> >     }
> >
> >     private static Properties getProperties() {
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
> >         props.put("group.id", "test-streams");
> >         props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> >         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
> >         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> >             StringSerializer.class);
> >         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> >             StringSerializer.class);
> >         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >             StringDeserializer.class);
> >         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >             StringDeserializer.class);
> >         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >             WallclockTimestampExtractor.class);
> >         return props;
> >     }
> >
> > }
> >
>
>
>
> --
> -- Guozhang
>
