I see your point. Yes, I think adding a Partitioner to addSink(...) is a good
approach, but the Partitioner interface in the producer is a bit overkill:

"partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
valueBytes, Cluster cluster)"

whereas we only want to partition on (K key, V value).

Perhaps we should add a new Partitioner interface in Kafka Streams?

Guozhang

On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:

> This overrides the partitioning logic for all topics, right? That means I
> have to explicitly call the default partitioning logic for all topics
> except those that my Producer forwards. I’m guessing the best way to do
> that is by extending org.apache.kafka.clients.producer.DefaultProducer.
> Of course, with multiple sinks in my topology, I have to put all of the
> partitioning logic inside a single class.
>
> What would you think about adding an overloaded TopologyBuilder.addSink(…)
> method that takes a Partitioner (or better yet a smaller functional
> interface)? The resulting SinkProcessor could use that Partitioner instance
> to set the partition number. That’d be super convenient for users, would
> keep the logic where it belongs (where the topology defines the sinks), and
> best of all the implementations won't have to worry about any other topics,
> such as those used by stores, metrics, or other sinks.
>
> Best regards,
>
> Randall
>
>
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Hi Randall,
>
> You can try to set the partitioner class as
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; its
> interface can be found in
>
> org.apache.kafka.clients.producer.Partitioner
>
> Let me know if it works for you.
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:
>
> > The new streams API added with KIP-28 is great. I’ve been using it on a
> > prototype for a few weeks, and I’m looking forward to it being included
> > in 0.9.0. However, at the moment, a Processor implementation is not able
> > to specify the partition number when it outputs messages.
> >
> > I’d be happy to log a JIRA and create a PR to add it to the API, but
> > without knowing all of the history I’m wondering if leaving it out of
> > the API was intentional.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall Hauch
>
> --
> -- Guozhang

--
-- Guozhang
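
P.S. To make the proposal at the top of this message a bit more concrete, here
is a rough sketch of what a smaller, Streams-side partitioner could look like.
Everything in it is hypothetical: the names SinkPartitioner and
choosePartition are made up for illustration and are not part of any Kafka
API, and the extra numPartitions argument is my own assumption (the thread
only asks for (K key, V value)). It has no Kafka dependency, so it only shows
the shape of the interface, not how a real SinkProcessor would be wired up.

```java
// Hypothetical sketch only: none of these names exist in Kafka.
class SinkPartitionerSketch {

    /** A small functional interface that sees only the key and value. */
    @FunctionalInterface
    interface SinkPartitioner<K, V> {
        // numPartitions is an assumed extra argument so that an
        // implementation can return a value in [0, numPartitions).
        int partition(K key, V value, int numPartitions);
    }

    /**
     * What a sink might do before sending a record: ask the partitioner
     * for a partition and reject values outside the valid range.
     */
    public static <K, V> int choosePartition(
            SinkPartitioner<K, V> partitioner, K key, V value, int numPartitions) {
        int partition = partitioner.partition(key, value, numPartitions);
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException(
                    "partition " + partition + " out of range [0, " + numPartitions + ")");
        }
        return partition;
    }

    public static void main(String[] args) {
        // A lambda is enough because the interface has a single method.
        // floorMod keeps the result non-negative for negative hash codes.
        SinkPartitioner<String, Long> byKeyHash =
                (key, value, n) -> Math.floorMod(key.hashCode(), n);

        int partition = choosePartition(byKeyHash, "customer-42", 100L, 8);
        System.out.println("partition = " + partition);
    }
}
```

With something along these lines, each sink could carry its own partitioner,
so the logic for one topic would not need to know about topics used by
stores, metrics, or other sinks.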