Thanks for all the help, gents. I really appreciate it. It's exactly what I needed.
On 7 October 2016 at 06:56, Michael Noll <mich...@confluent.io> wrote:
> Gary,
>
> adding to what Guozhang said: Yes, you can programmatically create a new
> Kafka topic from within your application. But how you'd do that varies a
> bit between current Kafka versions and the upcoming 0.10.1 release.
>
> As of today (Kafka versions before the upcoming 0.10.1 release), you would
> need to create your topic manually. This can be on the CLI (which doesn't
> help you in your scenario) or programmatically. Right now programmatically
> means you must directly talk to ZooKeeper, e.g. via zkclient. If you are
> looking for an example, the code at [1] may be helpful. That code creates
> a topic in ZK by using zkclient and Kafka's `AdminUtils`.
>
> Looking ahead, Kafka's upcoming 0.10.1 release introduces an admin client
> for creating/deleting topics (this new functionality is part of the
> not-yet-fully-completed work on KIP-4). This would give you a new
> programmatic approach to create a topic without having to communicate with
> ZooKeeper directly.
>
> Lastly, keep in mind that it takes a while for a Kafka topic to be
> created, so you may run into race-condition-like situations. You may
> therefore want to double-check that the newly created topic is actually
> ready to use before beginning to write to it or read from it.
>
> Hope this helps!
> Michael
>
> [1] https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160
>
> On Fri, Oct 7, 2016 at 8:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > If you can create a ZK client inside your processor implementation then
> > you can definitely create any topics by talking to ZK directly; it's just
> > that the Kafka Streams public interface does not expose any efficient ways
> > beyond that for now.
> >
> > Note that in KIP-4 we are trying to introduce the admin client for tasks
> > such as creating / deleting topics. The corresponding requests have been
> > added in the upcoming 0.10.1.0 release, but the full implementation is yet
> > to be completed.
> >
> > Guozhang
> >
> > On Thu, Oct 6, 2016 at 12:48 PM, Gary Ogden <gog...@gmail.com> wrote:
> >
> > > Thanks Guozhang. I've gotten an example to work using your tips.
> > >
> > > So, is there no other way in streams to create a topic if
> > > "auto.create.topics.enable" is set to false? Maybe by creating a record
> > > in zookeeper for that topic?
> > >
> > > On 5 October 2016 at 17:20, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Gary,
> > > >
> > > > 1. The InternalTopicManager is only used by the Streams-instantiated
> > > > PartitionAssignor to create internal topics for auto-repartitioning
> > > > and changelog.
> > > >
> > > > 2. About "RecordCollector.Supplier": you are right, and as I wrote in
> > > > the above email you have to force-cast it to RecordCollector.Supplier.
> > > > Theoretically this is not safe, but the internal Impl is always used.
> > > >
> > > > If you know beforehand all the possible topics that you would want to
> > > > send to based on the key-value pair, you can then use KStream.branch()
> > > > to branch the source stream into multiple ones based on the content,
> > > > with each branched stream going to a different topic.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden <gog...@gmail.com> wrote:
> > > >
> > > > > Guozhang. I was just looking at the source for this, and it looks
> > > > > like the RecordCollector.Supplier is part of the internal
> > > > > ProcessorContextImpl class. I don't think that's exposed to me, is it?
> > > > >
> > > > > If I create a processor class that extends AbstractProcessor, it only
> > > > > has access to the ProcessorContext interface, which doesn't expose the
> > > > > Supplier.
> > > > >
> > > > > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> > > > >
> > > > > > What if we were to use kafka connect instead of streams? Does it
> > > > > > have the ability to specify partitions, rf, segment size etc?
> > > > > >
> > > > > > On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
> > > > > >
> > > > > >> Thanks Guozhang.
> > > > > >>
> > > > > >> So there's no way we could also use InternalTopicManager to specify
> > > > > >> the number of partitions and RF?
> > > > > >>
> > > > > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
> > > > > >>
> > > > > >> On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hello Gary,
> > > > > >>>
> > > > > >>> This is also doable in the Processor API: you can use the record
> > > > > >>> collector from ProcessorContext to send data to arbitrary topics, i.e.:
> > > > > >>>
> > > > > >>> RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
> > > > > >>> collector.send(new ProducerRecord<>(topic, ...), keySerializer, valSerializer, partitioner);
> > > > > >>>
> > > > > >>> But note that if the new topic, e.g. "123456_lv2", is not created, then
> > > > > >>> the send call will throw an exception unless the broker-side config
> > > > > >>> "auto.create.topics.enable" is set to true; and even in this case, the
> > > > > >>> topic will be auto-created with the pre-defined number of partitions,
> > > > > >>> i.e. you cannot control how the topics are created or with what
> > > > > >>> configs, such as compaction policy, num.partitions, segment sizes, etc.
> > > > > >>> If that works for you then I think it should be fine.
> > > > > >>>
> > > > > >>> Guozhang
> > > > > >>>
> > > > > >>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote:
> > > > > >>>
> > > > > >>> > Is it possible, in a kafka streaming job, to write to another topic
> > > > > >>> > based on the key in the messages?
> > > > > >>> >
> > > > > >>> > For example, say the message is:
> > > > > >>> >
> > > > > >>> > 123456#{"id":56789,"type":1}
> > > > > >>> >
> > > > > >>> > where the key is 123456, # is the delimiter, and the {} is the json
> > > > > >>> > data.
> > > > > >>> >
> > > > > >>> > And I want to push the json data to another topic that will have the
> > > > > >>> > name 123456_lv2.
> > > > > >>> >
> > > > > >>> > Is this possible with kafka streaming?
> > > > > >>>
> > > > > >>> --
> > > > > >>> -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang
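
For reference, the naming step from the original question at the bottom of the thread (message "123456#{...}" routed to topic "123456_lv2") can be sketched in plain Java. This is only a rough illustration, not code from anyone in the thread: the class name TopicRouter and its methods are hypothetical, it assumes the first '#' always separates the key from the JSON payload, and it does not touch Kafka at all. Actually sending the payload would still go through the RecordCollector cast or a producer as discussed above, and the target topic would still have to exist (or be auto-created) first.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Kafka or Kafka Streams. */
public final class TopicRouter {
    // Delimiter and suffix are taken from the example in the thread.
    private static final char DELIMITER = '#';
    private static final String SUFFIX = "_lv2";

    private TopicRouter() {}

    /** Splits "<key>#<json>" at the first '#' and returns (key, json). */
    public static Map.Entry<String, String> split(String message) {
        int idx = message.indexOf(DELIMITER);
        if (idx <= 0) {
            // No delimiter, or an empty key: we cannot derive a topic name.
            throw new IllegalArgumentException("Expected <key>#<json>, got: " + message);
        }
        return new SimpleImmutableEntry<>(message.substring(0, idx), message.substring(idx + 1));
    }

    /** Derives the target topic name from the message key. */
    public static String targetTopic(String key) {
        return key + SUFFIX;
    }

    public static void main(String[] args) {
        Map.Entry<String, String> parts = split("123456#{\"id\":56789,\"type\":1}");
        // Prints the derived topic name followed by the JSON payload.
        System.out.println(targetTopic(parts.getKey()) + " <- " + parts.getValue());
    }
}
```

Splitting at the first '#' rather than the last keeps the payload intact even if the JSON itself happens to contain a '#' character.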