Guozhang. I was just looking at the source for this, and it looks like the RecordCollector.Supplier is part of the internal ProcessorContextImpl class. I don't think that's exposed to me, is it?
If I create a processor class that extends AbstractProcessor, it only has access to the ProcessorContext interface, which doesn't expose the Supplier.

On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:

> What if we were to use Kafka Connect instead of Streams? Does it have the
> ability to specify partitions, RF, segment size, etc.?
>
> On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
>
>> Thanks Guozhang.
>>
>> So there's no way we could also use InternalTopicManager to specify the
>> number of partitions and RF?
>>
>> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
>>
>> On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hello Gary,
>>>
>>> This is also doable in the Processor API. You can use the record
>>> collector from ProcessorContext to send data to arbitrary topics, i.e.:
>>>
>>> RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
>>> collector.send(new ProducerRecord<>(topic, *...*), keySerializer, valSerializer, partitioner);
>>>
>>> But note that if the new topic, e.g. "123456_lv2", has not been created,
>>> the send call will throw an exception unless the broker-side config
>>> "auto.create.topics.enable" is set to true. Even in that case, the
>>> topic will be auto-created with the pre-defined number of partitions,
>>> i.e. you cannot control the configs the topic is created with, such as
>>> compaction policy, num.partitions, segment sizes, etc.
>>> If that works for you then I think it should be fine.
>>>
>>> Guozhang
>>>
>>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote:
>>>
>>> > Is it possible, in a Kafka streaming job, to write to another topic
>>> > based on the key in the messages?
>>> >
>>> > For example, say the message is:
>>> >
>>> > 123456#{"id":56789,"type":1}
>>> >
>>> > where the key is 123456, # is the delimiter, and the {} is the JSON
>>> > data.
>>> >
>>> > And I want to push the JSON data to another topic that will have the
>>> > name 123456_lv2.
>>> >
>>> > Is this possible with Kafka streaming?
>>> >
>>>
>>> --
>>> -- Guozhang
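
For reference, the key-to-topic mapping from the original question (key 123456, '#' as the delimiter, JSON payload, target topic 123456_lv2) could be sketched in plain Java roughly as below. This is only an illustration under stated assumptions: KeyRouting, split and topicFor are hypothetical names that are not part of Kafka, and the sketch assumes the key itself never contains '#'. It does not create the topic or send anything.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper (not a Kafka class): derives the target topic name
// and the JSON payload from a message of the form <key>#<json>.
final class KeyRouting {
    private static final char DELIMITER = '#';        // delimiter from the example message
    private static final String TOPIC_SUFFIX = "_lv2"; // suffix from the example topic name

    private KeyRouting() {
    }

    // Splits "123456#{...}" into (key, payload) at the first delimiter.
    // Assumes the key never contains the delimiter character.
    static Map.Entry<String, String> split(String message) {
        int idx = message.indexOf(DELIMITER);
        if (idx <= 0) {
            throw new IllegalArgumentException("Expected <key>#<json>, got: " + message);
        }
        String key = message.substring(0, idx);
        String payload = message.substring(idx + 1);
        return new SimpleImmutableEntry<>(key, payload);
    }

    // Builds the per-key topic name, e.g. "123456" becomes "123456_lv2".
    static String topicFor(String key) {
        return key + TOPIC_SUFFIX;
    }
}
```

The name returned by topicFor would be what gets passed as the topic of the ProducerRecord in the snippet quoted above, subject to the same topic-creation caveats Guozhang describes.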