Hello Gary,

1. The InternalTopicManager is only used by the Streams-instantiated
PartitionAssignor to create internal topics for auto-repartitioning and
changelogs.

2. About "RecordCollector.Supplier": you are right, and as I wrote in the
email above, you have to force-cast the context to RecordCollector.Supplier.
Theoretically this is not safe, but the internal Impl is always used.

If you know beforehand all the possible topics that you would want to send
to based on the key-value pair, you can then use KStream.branch() to branch
the source stream into multiple ones based on the content, with each
branched stream written to a different topic.


Guozhang


On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gog...@gmail.com> wrote:

> Guozhang. I was just looking at the source for this, and it looks like the
> RecordCollector.Supplier is part of the internal ProcessorContextImpl
> class. I don't think that's exposed to me, is it?
>
> If I create a processor class that extends AbstractProcessor, it only has
> access to the ProcessorContext interface, which doesn't expose the
> Supplier.
>
> On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
>
> > What if we were to use kafka connect instead of streams? Does it have the
> > ability to specify partitions, rf, segment size etc?
> >
> > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> >
> >> Thanks Guozhang.
> >>
> >> So there's no way we could also use InternalTopicManager to specify the
> >> number of partitions and RF?
> >>
> >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> >>
> >> On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Hello Gary,
> >>>
> >>> This is also doable in the Processor API, you can use the record
> >>> collector from ProcessorContext to send data to arbitrary topics, i.e.:
> >>>
> >>> RecordCollector collector = ((RecordCollector.Supplier)
> >>> context).recordCollector();
> >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer,
> >>> valSerializer, partitioner);
> >>>
> >>>
> >>> But note that if the new topic, e.g. "123456_lv2", is not created, then
> >>> the send call will throw an exception unless the broker-side config
> >>> "auto.create.topics.enable" is set to true; and even in this case, the
> >>> topic will be auto-created with the pre-defined number of partitions,
> >>> i.e. you cannot control which configs the topics are created with, such
> >>> as compaction policy, num.partitions, segment sizes, etc.
> >>> If that works for you then I think it should be fine.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote:
> >>>
> >>> > Is it possible, in a kafka streaming job, to write to another topic
> >>> > based on the key in the messages?
> >>> >
> >>> > For example, say the message is:
> >>> >
> >>> > 123456#{"id":56789,"type":1}
> >>> >
> >>> > where the key is 123456, # is the delimiter, and the {} is the json
> >>> > data.
> >>> >
> >>> > And I want to push the json data to another topic that will have the
> >>> > name 123456_lv2.
> >>> >
> >>> > Is this possible with kafka streaming?
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >
>


--
-- Guozhang
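
P.S. For reference, the routing rule from the original question (key before
the first '#', JSON payload after it, destination topic named <key>_lv2)
could be sketched in plain Java roughly as below. This is only an
illustration under those assumptions: TopicRouter, Routed and route() are
hypothetical names, not part of Kafka, and the code deliberately uses no
Kafka classes. The resulting topic and payload would be what you pass into
the ProducerRecord given to collector.send(...).

```java
import java.util.Optional;

// Hypothetical helper (not a Kafka class): derives the destination topic
// and payload from a message of the form <key>#<json>.
final class TopicRouter {

    // Destination topic plus the payload that should be sent to it.
    static final class Routed {
        final String topic;
        final String payload;

        Routed(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    private static final char DELIMITER = '#';       // from the example message
    private static final String TOPIC_SUFFIX = "_lv2"; // from the example topic name

    // Splits on the FIRST delimiter only, so a '#' inside the JSON is kept.
    // Returns empty when there is no delimiter or the key is empty.
    static Optional<Routed> route(String message) {
        int idx = message.indexOf(DELIMITER);
        if (idx <= 0) {
            return Optional.empty();
        }
        String key = message.substring(0, idx);
        String json = message.substring(idx + 1);
        return Optional.of(new Routed(key + TOPIC_SUFFIX, json));
    }
}
```

Calling TopicRouter.route("123456#{\"id\":56789,\"type\":1}") yields topic
"123456_lv2" with the JSON as payload. Messages without a usable key come
back empty, so the caller can decide whether to drop or log them before
sending.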