Thanks Guozhang. I've gotten an example to work using your tips.

So, is there no other way in Streams to create a topic if
"auto.create.topics.enable"
is set to false? Maybe by creating a record in ZooKeeper for that topic?



On 5 October 2016 at 17:20, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Gary,
>
>
> 1. The InternalTopicManager is only used by the Streams-instantiated
> PartitionAssignor to create internal topics for auto-repartitioning and
> changelog.
>
> 2. About "RecordCollector.Supplier": you are right, and as I wrote in the
> earlier email, you have to cast the context to RecordCollector.Supplier.
> In theory this is not safe, but in practice the internal
> ProcessorContextImpl is always used.
>
>
> If you know beforehand all the possible topics that you would want to
> send to based on the key-value pair, you can then use KStream.branch() to
> branch the source stream into multiple ones based on the content, and send
> each branched stream to a different topic.
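
To make the first-match routing idea behind branch() concrete, here is a minimal
plain-Java sketch. It does not use the Kafka Streams API at all: BranchSketch,
route(), and the topic names passed to it are hypothetical names chosen for
illustration. It assumes each record goes to the first predicate that matches
and is dropped when none match.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper, not part of Kafka: simulates first-match branching.
final class BranchSketch {

    /**
     * Routes each key to the first topic whose predicate matches.
     * Predicates are tried in the map's iteration order, so pass an
     * insertion-ordered map. Keys that match no predicate are dropped.
     */
    static Map<String, List<String>> route(
            List<String> keys, Map<String, Predicate<String>> topicPredicates) {
        Map<String, List<String>> routed = new LinkedHashMap<>();
        for (String topic : topicPredicates.keySet()) {
            routed.put(topic, new ArrayList<>());
        }
        for (String key : keys) {
            for (Map.Entry<String, Predicate<String>> e : topicPredicates.entrySet()) {
                if (e.getValue().test(key)) {
                    routed.get(e.getKey()).add(key);
                    break; // first match wins; later predicates are not tried
                }
            }
        }
        return routed;
    }
}
```

In a real topology each branch would then be written to its own, already
existing topic; this sketch only shows why the set of topics has to be known
up front.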
>
>
> Guozhang
>
>
> On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gog...@gmail.com> wrote:
>
> > Guozhang. I was just looking at the source for this, and it looks like the
> > RecordCollector.Supplier is part of the internal ProcessorContextImpl
> > class.  I don't think that's exposed to me, is it?
> >
> > If I create a processor class that extends AbstractProcessor, it only has
> > access to the ProcessorContext interface, which doesn't expose the
> > Supplier.
> >
> > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> >
> > > What if we were to use Kafka Connect instead of Streams? Does it have the
> > > ability to specify partitions, replication factor, segment size, etc.?
> > >
> > > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> > >
> > >> Thanks Guozhang.
> > >>
> > >> So there's no way we could also use InternalTopicManager to specify the
> > >> number of partitions and RF?
> > >>
> > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > >>
> > >> On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >>> Hello Gary,
> > >>>
> > >>> This is also doable in the Processor API: you can use the record
> > >>> collector from ProcessorContext to send data to arbitrary topics, i.e.:
> > >>>
> > >>> RecordCollector collector =
> > >>>     ((RecordCollector.Supplier) context).recordCollector();
> > >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer,
> > >>>     valSerializer, partitioner);
> > >>>
> > >>>
> > >>> But note that if the new topic, e.g. "123456_lv2", has not been created,
> > >>> then the send call will throw an exception unless the broker-side config
> > >>> "auto.create.topics.enable" is set to true; and even in this case, the
> > >>> topic will be auto-created with the pre-defined number of partitions,
> > >>> i.e. you cannot control the configs the topic is created with, such as
> > >>> compaction policy, num.partitions, segment sizes, etc.
> > >>> If that works for you then I think it should be fine.
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>>
> > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote:
> > >>>
> > >>> > Is it possible, in a kafka streaming job, to write to another topic
> > >>> > based on the key in the messages?
> > >>> >
> > >>> > For example, say the message is:
> > >>> >
> > >>> > 123456#{"id":56789,"type":1}
> > >>> >
> > >>> > where the key is 123456, # is the delimiter, and the {} is the JSON
> > >>> > data.
> > >>> >
> > >>> > And I want to push the JSON data to another topic that will have the
> > >>> > name 123456_lv2.
> > >>> >
> > >>> > Is this possible with kafka streaming?
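
The key-to-topic mapping described in the question can be sketched in plain
Java, independent of any Kafka API. TopicRouting, split(), and targetTopic()
are hypothetical names used only for illustration, and the sketch assumes the
key never contains the '#' delimiter, so the message is split at the first '#'.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper: derives the target topic name from a "key#json" message.
final class TopicRouting {
    private static final char DELIMITER = '#';
    private static final String TOPIC_SUFFIX = "_lv2";

    /** Splits "key#json" at the first delimiter and returns (key, json). */
    static Map.Entry<String, String> split(String message) {
        int idx = message.indexOf(DELIMITER);
        if (idx <= 0) {
            throw new IllegalArgumentException("no key before delimiter: " + message);
        }
        return new SimpleImmutableEntry<>(
                message.substring(0, idx), message.substring(idx + 1));
    }

    /** Builds the destination topic name for a key, e.g. 123456 -> 123456_lv2. */
    static String targetTopic(String key) {
        return key + TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        Map.Entry<String, String> parts = split("123456#{\"id\":56789,\"type\":1}");
        System.out.println(targetTopic(parts.getKey()) + " <- " + parts.getValue());
    }
}
```

Splitting at the first delimiter keeps any '#' characters inside the JSON
payload intact.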
> > >>> >
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
> > >>
> > >
> >
>
>
>
> --
> -- Guozhang
>
