Re: kafka stream to new topic based on message key

2016-10-07 Thread Michael Noll
Great, happy to hear that, Gary!

Re: kafka stream to new topic based on message key

2016-10-07 Thread Gary Ogden
Thanks for all the help gents. I really appreciate it. It's exactly what I needed.

Re: kafka stream to new topic based on message key

2016-10-07 Thread Michael Noll
Gary, adding to what Guozhang said: Yes, you can programmatically create a new Kafka topic from within your application. But how you'd do that varies a bit between current Kafka versions and the upcoming 0.10.1 release. As of today (Kafka versions before the upcoming 0.10.1 release), you would

Re: kafka stream to new topic based on message key

2016-10-06 Thread Guozhang Wang
If you can create a ZK client inside your processor implementation, then you can definitely create any topics by talking to ZK directly; it's just that the Kafka Streams public interface does not expose any efficient ways beyond that for now. Note that in KIP-4 we are trying to introduce the admin c

Re: kafka stream to new topic based on message key

2016-10-06 Thread Gary Ogden
Thanks Guozhang. I've gotten an example to work using your tips. So, is there no other way in Streams to create a topic if "auto.create.topics.enable" is set to false? Maybe by creating a record in ZooKeeper for that topic?

Re: kafka stream to new topic based on message key

2016-10-05 Thread Guozhang Wang
Hello Gary,
1. The InternalTopicManager is only used by the Streams-instantiated PartitionAssignor to create internal topics for auto-repartitioning and changelog.
2. About "RecordCollector.Supplier": you are right, and as I wrote in the above email you have to force casting it to RecordCollecto

Re: kafka stream to new topic based on message key

2016-10-05 Thread Gary Ogden
Guozhang, I was just looking at the source for this, and it looks like the RecordCollector.Supplier is part of the internal ProcessorContextImpl class. I don't think that's exposed to me, is it? If I create a processor class that extends AbstractProcessor, it only has access to the ProcessorContext

Re: kafka stream to new topic based on message key

2016-10-05 Thread Gary Ogden
What if we were to use Kafka Connect instead of Streams? Does it have the ability to specify partitions, replication factor (RF), segment size, etc.?

Re: kafka stream to new topic based on message key

2016-10-05 Thread Gary Ogden
Thanks Guozhang. So there's no way we could also use InternalTopicManager to specify the number of partitions and RF? https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

Re: kafka stream to new topic based on message key

2016-10-04 Thread Guozhang Wang
Hello Gary,
This is also doable in the Processor API, you can use the record collector from ProcessorContext to send data to arbitrary topics, i.e.:
    RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
    collector.send(new ProducerRecord<>(topic, *...*), keySerializer

kafka stream to new topic based on message key

2016-10-04 Thread Gary Ogden
Is it possible, in a Kafka Streams job, to write to another topic based on the key in the messages? For example, say the message is: 123456#{"id":56789,"type":1} where the key is 123456, # is the delimiter, and the {} is the JSON data. And I want to push the JSON data to another topic that wi
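For readers following the thread, the message layout described in the question can be sketched in plain Java, independent of Kafka itself. This is only an illustration: the class name KeyedMessage, the helper names, and in particular the "events-" topic naming scheme are assumptions made up for this sketch, since the preview does not show how the target topic is meant to be named.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper, not part of Kafka: splits "key#json" messages
// and derives a target topic name from the key.
public class KeyedMessage {

    // Splits the raw message at the FIRST '#', so a '#' inside the
    // JSON payload does not break the split.
    static Map.Entry<String, String> split(String raw) {
        int idx = raw.indexOf('#');
        if (idx < 0) {
            throw new IllegalArgumentException("no '#' delimiter in: " + raw);
        }
        return new SimpleImmutableEntry<>(raw.substring(0, idx), raw.substring(idx + 1));
    }

    // Assumed naming scheme for illustration only: one topic per key,
    // with an invented "events-" prefix.
    static String topicFor(String key) {
        return "events-" + key;
    }

    public static void main(String[] args) {
        Map.Entry<String, String> parts = split("123456#{\"id\":56789,\"type\":1}");
        System.out.println("topic   = " + topicFor(parts.getKey()));
        System.out.println("payload = " + parts.getValue());
    }
}
```

The resulting topic name and payload would then be handed to whatever send mechanism is used (for example, the record collector approach mentioned in the replies above); that part is left out here because it depends on the Kafka version.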