Thanks Guozhang. So there's no way we could also use InternalTopicManager to specify the number of partitions and RF?
https://github.com/apache/kafka/blob/0.10.1/streams/src/ main/java/org/apache/kafka/streams/processor/internals/ InternalTopicManager.java On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote: > Hello Gary, > > This is also doable in the Processor API, you can use the record collector > from ProcessorContext to send data to arbitrary topics, i.e.: > > RecordCollector collector = ((RecordCollector.Supplier) > context).recordCollector(); > collector.send(new ProducerRecord<>(topic, *...*), keySerializer, > valSerializer, partitioner); > > > But note that if the new topic, e.g. "123456_lv2" is not created, then > the send call will thrown an exception unless the borker-side config > "auto.topic.create.enabled" is set to true; and even in this case, the > topic will be auto-created with the pre-defined number of partitions, > i.e. you cannot control how the topics can be created with what > configs such as compaction policy, num.partitions, segment sizes, etc. > If that works for you then I think it should be fine. > > > Guozhang > > > > On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote: > > > Is it possible, in a kafka streaming job, to write to another topic based > > on the key in the messages? > > > > For example, say the message is: > > > > 123456#{"id":56789,"type":1} > > > > where the key is 123456, # is the delimeter, and the {} is the json data. > > > > And I want to push the json data to another topic that will have the name > > 123456_lv2. > > > > Is this possible with kafka streaming? > > > > > > -- > -- Guozhang >