Hi All,

While working with KStream/KStreamImp I discovered that there does not seem
to be any way to connect the results of the KStream.process method with a
sink node.

I'd like to propose an addition to the API a "processTo" method.

I've looked at and used the "transform", "reduceByKey" and "aggregateByKey"
 methods, but "processTo" would work like a more general purpose collector
terminating the KStream and allow for writing out results to an arbitrary
topic (regardless of key type).


 I've done a quick prototype and some  initial testing locally on my fork.
If you think this could be useful I can add unit tests and create a PR.
I've included the proposed code changes and the test driver code below


KStream.java additions

void processTo(String topic,  ProcessorSupplier<K,V> processorSupplier,
String... stateStoreNames);

void processTo(String topic, ProcessorSupplier<K,V> processorSupplier,
 Serializer<K> keySerializer, Serializer<V> valSerializer, String...
stateStoreNames);


KStreamImpl.java additions

 @Override
    public void processTo(String topic, ProcessorSupplier<K, V>
processorSupplier,  String... stateStoreNames) {
        processTo(topic, processorSupplier, null, null, stateStoreNames);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void processTo(String topic,ProcessorSupplier<K, V>
processorSupplier,  Serializer<K> keySerializer, Serializer<V>
valSerializer, String... stateStoreNames) {
        String processorName = topology.newName(PROCESSOR_NAME);
        String sinkName = topology.newName(SINK_NAME);
        StreamPartitioner<K, V> streamPartitioner = null;

        if (keySerializer != null && keySerializer instanceof
WindowedSerializer) {
            WindowedSerializer<Object> windowedSerializer =
(WindowedSerializer<Object>) keySerializer;
            streamPartitioner = (StreamPartitioner<K, V>) new
WindowedStreamPartitioner<Object, V>(windowedSerializer);
        }

        topology.addProcessor(processorName, processorSupplier, this.name);
        topology.addSink(sinkName,topic, keySerializer, valSerializer,
streamPartitioner, processorName);
        topology.connectProcessorAndStateStores(processorName,
stateStoreNames);
    }


Test Driver

public class TestDriver {

    public static void main(String[] args) {
        StreamsConfig config = new StreamsConfig(getProperties());
        KStreamBuilder kStreamBuilder = new KStreamBuilder();

        KStream<String,String> transactionKStream =
 kStreamBuilder.stream("input");

        transactionKStream.processTo("output",UpperCaseProcessor::new);

        System.out.println("Starting process-to Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,config);
        kafkaStreams.start();
        System.out.println("Now started process-to Example");
    }

    private static class UpperCaseProcessor extends
AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
            context().commit();
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
        props.put("group.id", "test-streams");
        props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class);
        return props;
    }

}

Reply via email to