Hello Gary, This is also doable in the Processor API, you can use the record collector from ProcessorContext to send data to arbitrary topics, i.e.:
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); collector.send(new ProducerRecord<>(topic, *...*), keySerializer, valSerializer, partitioner); But note that if the new topic, e.g. "123456_lv2" is not created, then the send call will thrown an exception unless the borker-side config "auto.topic.create.enabled" is set to true; and even in this case, the topic will be auto-created with the pre-defined number of partitions, i.e. you cannot control how the topics can be created with what configs such as compaction policy, num.partitions, segment sizes, etc. If that works for you then I think it should be fine. Guozhang On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote: > Is it possible, in a kafka streaming job, to write to another topic based > on the key in the messages? > > For example, say the message is: > > 123456#{"id":56789,"type":1} > > where the key is 123456, # is the delimeter, and the {} is the json data. > > And I want to push the json data to another topic that will have the name > 123456_lv2. > > Is this possible with kafka streaming? > -- -- Guozhang