Thanks Guozhang. I've gotten an example to work using your tips. So is there no other way in Streams to create a topic if "auto.create.topics.enable" is set to false? Maybe by creating a record in ZooKeeper for that topic?
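For what it's worth, in the 0.10.x line the usual way to "create a record in ZooKeeper for that topic" is to go through kafka.admin.AdminUtils from the core kafka artifact, which also lets you pick the partition count, replication factor, and per-topic configs. A sketch only — it assumes a reachable ZooKeeper at localhost:2181, and the topic name, counts, and configs below are made-up examples:

```java
// Sketch: requires the kafka core artifact (e.g. kafka_2.11-0.10.1.0) on the
// classpath and a running ZooKeeper ensemble. Values below are examples only.
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class TopicCreator {
    public static void main(String[] args) {
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181",
                30000 /* session timeout ms */,
                30000 /* connection timeout ms */,
                false /* isZkSecurityEnabled */);
        try {
            // Per-topic overrides, e.g. compaction and segment size.
            Properties topicConfig = new Properties();
            topicConfig.put("cleanup.policy", "compact");
            topicConfig.put("segment.bytes", "104857600"); // 100 MB segments

            if (!AdminUtils.topicExists(zkUtils, "123456_lv2")) {
                AdminUtils.createTopic(zkUtils, "123456_lv2",
                        4 /* partitions */,
                        2 /* replication factor */,
                        topicConfig,
                        RackAwareMode.Enforced$.MODULE$);
            }
        } finally {
            zkUtils.close();
        }
    }
}
```

This writes the topic metadata into ZooKeeper the same way the broker-side tools do, so the brokers pick it up without `auto.create.topics.enable`. Note this is the old Scala admin API, not a supported public Java API, so treat it as a stopgap.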
On 5 October 2016 at 17:20, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Gary,
>
> 1. The InternalTopicManager is only used by the Streams-instantiated
> PartitionAssignor to create internal topics for auto-repartitioning and
> changelogs.
>
> 2. About "RecordCollector.Supplier": you are right, and as I wrote in the
> email above, you have to force a cast to RecordCollector.Supplier.
> Theoretically this is not safe, but the internal Impl is always the one
> used in practice.
>
> If you know beforehand all the possible topics you might want to send to
> based on the key-value pair, you can use KStream.branch() to split the
> source stream into multiple streams based on the content, and then write
> each branched stream to a different topic.
>
> Guozhang
>
> On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gog...@gmail.com> wrote:
>
> > Guozhang, I was just looking at the source for this, and it looks like
> > the RecordCollector.Supplier is part of the internal
> > ProcessorContextImpl class. I don't think that's exposed to me, is it?
> >
> > If I create a processor class that extends AbstractProcessor, it only
> > has access to the ProcessorContext interface, which doesn't expose the
> > Supplier.
> >
> > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> >
> > > What if we were to use Kafka Connect instead of Streams? Does it have
> > > the ability to specify partitions, RF, segment size, etc.?
> > >
> > > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> > >
> > >> Thanks Guozhang.
> > >>
> > >> So there's no way we could also use InternalTopicManager to specify
> > >> the number of partitions and RF?
> > >>
> > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > >>
> > >> On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >>> Hello Gary,
> > >>>
> > >>> This is also doable in the Processor API: you can use the record
> > >>> collector from the ProcessorContext to send data to arbitrary
> > >>> topics, i.e.:
> > >>>
> > >>> RecordCollector collector = ((RecordCollector.Supplier)
> > >>>     context).recordCollector();
> > >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer,
> > >>>     valSerializer, partitioner);
> > >>>
> > >>> But note that if the new topic, e.g. "123456_lv2", has not been
> > >>> created, then the send call will throw an exception unless the
> > >>> broker-side config "auto.create.topics.enable" is set to true; and
> > >>> even in that case the topic will be auto-created with the
> > >>> pre-defined number of partitions, i.e. you cannot control which
> > >>> configs the topic is created with, such as compaction policy,
> > >>> num.partitions, segment sizes, etc. If that works for you then I
> > >>> think it should be fine.
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Is it possible, in a Kafka Streams job, to write to another topic
> > >>> > based on the key in the messages?
> > >>> >
> > >>> > For example, say the message is:
> > >>> >
> > >>> > 123456#{"id":56789,"type":1}
> > >>> >
> > >>> > where the key is 123456, # is the delimiter, and the {} is the
> > >>> > JSON data.
> > >>> >
> > >>> > And I want to push the JSON data to another topic that will be
> > >>> > named 123456_lv2.
> > >>> >
> > >>> > Is this possible with Kafka Streams?
> > >>>
> > >>> --
> > >>> -- Guozhang
>
> --
> -- Guozhang
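For anyone finding this thread later: Guozhang's RecordCollector suggestion, put into a complete Processor, might look roughly like the sketch below. Assumptions: Kafka Streams 0.10.1, string keys and values, and the key + "_lv2" topic-naming convention from the original question. The cast to RecordCollector.Supplier relies on the internal ProcessorContextImpl, as discussed in the thread, so it is unsupported and may break in other releases.

```java
// Sketch against Kafka Streams 0.10.1 internals; the cast to
// RecordCollector.Supplier is not a public API and may break elsewhere.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;

public class KeyRoutingProcessor extends AbstractProcessor<String, String> {

    private RecordCollector collector;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        // Unsafe in principle, but at runtime Streams always passes the
        // internal ProcessorContextImpl, which implements the Supplier.
        collector = ((RecordCollector.Supplier) context).recordCollector();
    }

    @Override
    public void process(String key, String value) {
        // Route the JSON payload to a topic derived from the key,
        // e.g. key "123456" -> topic "123456_lv2". The target topic must
        // already exist unless the broker auto-creates topics.
        String targetTopic = key + "_lv2";
        collector.send(new ProducerRecord<>(targetTopic, key, value),
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                null /* null partitioner: fall back to default partitioning */);
    }
}
```

If the set of target topics is known up front, KStream.branch() with one predicate per topic (as Guozhang suggested) avoids the internal cast entirely and stays on the public API.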