I see your point. Yeah, I think adding a Partitioner to addSink(...) is a
good approach, but the producer's Partitioner interface is a bit overkill:

"partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
valueBytes, Cluster cluster)"

whereas we only want to partition on (K key, V value).

Perhaps we should add a new Partitioner interface in Kafka Streams?
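For illustration, a Streams-specific interface along these lines could be as small as the sketch below. The names `KeyValuePartitioner` and `KeyHashPartitioner` are hypothetical, not an existing Kafka API. The `numPartitions` argument is an assumption that stands in for the topic metadata the producer's `Cluster` parameter would otherwise supply.

```java
import java.util.Objects;

/**
 * Hypothetical Streams-side partitioner: it sees only the typed key/value and
 * the partition count of the target topic, unlike the producer's Partitioner,
 * which also receives the topic, the serialized bytes and the Cluster.
 */
@FunctionalInterface
interface KeyValuePartitioner<K, V> {
    /** Returns a partition number in [0, numPartitions). */
    int partition(K key, V value, int numPartitions);
}

/** Example implementation: partition on the key's hashCode (null keys map to partition 0). */
class KeyHashPartitioner<K, V> implements KeyValuePartitioner<K, V> {
    @Override
    public int partition(K key, V value, int numPartitions) {
        // Objects.hashCode(null) is 0; Math.floorMod keeps the result non-negative
        // even when hashCode() is negative.
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }
}
```

Because the interface has a single method, a value-based rule can also be written inline, e.g. `(k, v, n) -> Math.floorMod(v, n)` for integer values.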

Guozhang

On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:

> This overrides the partitioning logic for all topics, right? That means I
> have to explicitly call the default partitioning logic for all topics
> except those that my Processor forwards to. I’m guessing the best way to
> do that is by extending
> org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of course,
> with multiple sinks in my topology, I have to put all of the partitioning
> logic inside a single class.
>
> What would you think about adding an overloaded TopologyBuilder.addSink(…)
> method that takes a Partitioner (or, better yet, a smaller functional
> interface)? The resulting SinkProcessor could use that Partitioner
> instance to set the partition number. That’d be super convenient for
> users, would keep the logic where it belongs (where the topology defines
> the sinks), and, best of all, the implementations won't have to worry
> about any other topics, such as those used by stores, metrics, or other
> sinks.
>
> Best regards,
>
> Randall
>
>
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) wrote:
>
> Hi Randall,
>
> You can try setting the partitioner class via
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; its
> interface can be found in
>
> org.apache.kafka.clients.producer.Partitioner
>
> Let me know if it works for you.
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:
>
> > The new streams API added with KIP-28 is great. I’ve been using it on a
> > prototype for a few weeks, and I’m looking forward to it being included
> > in 0.9.0. However, at the moment, a Processor implementation is not able
> > to specify the partition number when it outputs messages.
> >
> > I’d be happy to log a JIRA and create a PR to add it to the API, but
> > without knowing all of the history I’m wondering whether leaving it out
> > of the API was intentional.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall Hauch
> >
>
>
>
> --
> -- Guozhang
>
>
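The single-class workaround discussed in the quoted messages (one producer-wide partitioner set through ProducerConfig.PARTITIONER_CLASS_CONFIG, i.e. the `partitioner.class` property) boils down to dispatching on the topic name, with a fallback for every other topic. The sketch below models that with plain Java types so it runs without the Kafka client jar. `RecordPartitioner`, `TopicRoutingPartitioner` and the hashCode-based fallback are hypothetical stand-ins: a real implementation would implement `org.apache.kafka.clients.producer.Partitioner` and delegate to the client's default partitioner, which hashes the serialized key bytes rather than calling `hashCode()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified stand-in for the producer Partitioner signature (no byte[] arguments, no Cluster). */
interface RecordPartitioner {
    int partition(String topic, Object key, Object value, int numPartitions);
}

/**
 * One producer-wide partitioner: custom rules for selected sink topics and a
 * fallback for everything else (store changelogs, metrics, other sinks, ...).
 */
class TopicRoutingPartitioner implements RecordPartitioner {
    private final Map<String, RecordPartitioner> perTopic = new HashMap<>();

    // Hypothetical fallback; NOT identical to Kafka's real default partitioner.
    private final RecordPartitioner fallback =
            (topic, key, value, n) -> Math.floorMod(Objects.hashCode(key), n);

    /** Registers a custom rule for one topic; returns this for chaining. */
    TopicRoutingPartitioner register(String topic, RecordPartitioner rule) {
        perTopic.put(topic, rule);
        return this;
    }

    @Override
    public int partition(String topic, Object key, Object value, int numPartitions) {
        // Use the topic-specific rule if one was registered, otherwise the fallback.
        return perTopic.getOrDefault(topic, fallback)
                .partition(topic, key, value, numPartitions);
    }
}
```

Every sink's rule has to be registered in this one class, which is exactly the coupling Randall objects to.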

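Randall's proposed addSink(...) overload would instead attach a partitioner to each sink, so it only ever sees records bound for that sink's topic. A minimal model of the idea, assuming a hypothetical `SinkNode` class and a functional `SinkPartitioner` interface (neither is the actual TopologyBuilder or SinkProcessor API); records are collected in a list instead of being handed to a producer, and a `null` partition means "let the producer's partitioner decide".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical functional interface a per-sink addSink(...) overload could accept. */
@FunctionalInterface
interface SinkPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

/** What the sink would hand to the producer: topic, partition (null = producer decides), key, value. */
final class OutgoingRecord<K, V> {
    final String topic;
    final Integer partition;
    final K key;
    final V value;

    OutgoingRecord(String topic, Integer partition, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

/** Simplified sink node that applies its own partitioner only to its own topic. */
class SinkNode<K, V> {
    private final String topic;
    private final int numPartitions;
    private final SinkPartitioner<K, V> partitioner; // null: fall back to the producer's partitioner
    final List<OutgoingRecord<K, V>> sent = new ArrayList<>();

    SinkNode(String topic, int numPartitions, SinkPartitioner<K, V> partitioner) {
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.partitioner = partitioner;
    }

    void process(K key, V value) {
        // Compute a partition only when this sink was given a partitioner.
        Integer partition = (partitioner == null)
                ? null
                : partitioner.partition(key, value, numPartitions);
        sent.add(new OutgoingRecord<>(topic, partition, key, value));
    }
}
```

The design keeps each rule next to the sink it belongs to; other topics (stores, metrics, other sinks) are never routed through it.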

-- 
-- Guozhang
