Exactly. There is not config to set a global one, hence, you need to set
if for each operator that writes into a topic (and for which you want a
special partitioning). Not sure from the top of my head if there are
more methods than `to()` and `through()` that accept a custom
`StreamPartitioner`.


-Matthias

On 11/17/19 8:23 PM, Mikkel Gadegaard wrote:
> Actually not to sure what you meant by: "pass it into the corresponding
> methods.
> For example, `to()` or `through()`" ?
> 
> Could you elaborate on that?
> 
> Thanks
> Mikkel
> 
> --
> 
> 
> On Sat, Nov 16, 2019 at 12:43 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Well. You could always run it in an IDE and set a breakpoint when the
>> partition is computed to get insight.
>>
>>>  I guess another approach could be to generate a random uuid and use
>> that for the message record key instead?
>>
>> That is certainly possible. Why don't you try to write a custom
>> `StreamPartitioner` though what would be the straight forward solution.
>>
>>
>> -Matthias
>>
>> On 11/15/19 2:12 AM, Mikkel Gadegaard wrote:
>>> Definitely not null keys. They are time based UUIDs. Basically the test
>> set I’m running is a collection of articles stored in Cassandra and their
>> key is the uuid generated when inserted there.
>>>
>>> Get the articles from bing api and its the same set that bing returns in
>> both cases (same number (67) of articles and same articles). So my theory
>> were that the time based UUIDs where so similar that the hash and modulo
>> ended up being the same. But after reading your responses I’m back at just
>> being puzzled. I guess another approach could be to generate a random uuid
>> and use that for the message record key instead?
>>>
>>> Mikkel Gadegaard
>>>
>>>> On Nov 15, 2019, at 01:39, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>>>
>>>> That is puzzling to me, too. Could it be that you have `null` keys for
>>>> the "new topic" you mentioned in your original email? For `null` keys,
>>>> the fallback would be round-robin.
>>>>
>>>> Or you just got lucky and the keys you write get distributed evenly "by
>>>> chance" -- in general, if the data is not skewed, hash partitioning
>>>> should result in a fairly even distribution, too.
>>>>
>>>> -Matthias
>>>>
>>>>> On 11/15/19 1:21 AM, Mikkel Gadegaard wrote:
>>>>> Well it definitely gives me something to move ahead with.
>>>>>
>>>>> I am however puzzled how I could observe a really even distribution
>> over
>>>>> the partitions when specifying `PARTITIONER_CLASS_CONFIG`, whereas
>> when I
>>>>> remove it the same set of test messages are written to only one
>> partition.
>>>>>
>>>>> Thanks
>>>>> Mikkel
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>> On Fri, Nov 15, 2019 at 12:22 AM Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> In Kafka Streams the producer config `PARTITIONER_CLASS_CONFIG` does
>> not
>>>>>> take effect, because Kafka Streams computes and set partition numbers
>>>>>> explicitly and thus the producer does never use the partitioner to
>>>>>> compute a partition, but accepts whatever Kafka Streams specifies on
>>>>>> each `ProducerRecord`.
>>>>>>
>>>>>> If you want to change the partitioning strategy, you need to
>> implement a
>>>>>> custom `StreamPartitioner` and pass it into the corresponding methods.
>>>>>> For example, `to()` or `through()`.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/14/19 9:51 AM, Mikkel Gadegaard wrote:
>>>>>>> I've set up a POC using KafkaStreams with microservices consuming and
>>>>>>> producing from/to topics. In the beginning I hadn't thought about
>>>>>>> partition strategy, and so I was using the DefaultPartitioner for
>>>>>> producer
>>>>>>> partition assignments. My messages have keys (I use these for
>>>>>>> forking/joining), and the keys are time based UUIDs, this causes some
>>>>>>> rather uneven distribution on my topics. I looked around google and
>>>>>>> stumbled on KIP-369 (Alternative Partitioner to Support "Always
>>>>>>> Round-Robin" Selection) and figured that would be what I needed, so
>> since
>>>>>>> 2.4 isn't out yet I borrowed the class from the PR on github, added
>> it to
>>>>>>> my project and added the property to my config, like so:
>>>>>>>
>>>>>>> streamProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>>>> RoundRobinPartitioner.class.getCanonicalName());
>>>>>>>
>>>>>>>
>>>>>>> And the round robin strategy works on a newly added topic, spreading
>>>>>>> messages evenly over 4 partitions. But, and I'm finally getting to my
>>>>>>> question, it doesn't seem to have any effect on existing topics, in
>> other
>>>>>>> words, it seems to be continuing to use the DefaultPartitioner for
>> topics
>>>>>>> created before I added the RoundRobinPartioner class to my
>>>>>>> project/properties.
>>>>>>>
>>>>>>> Is it me that just hasn't understood that it is impossible to change
>>>>>>> strategy for an existing partition or do I have to do something
>> specific
>>>>>>> apart from re-deploying the Microservice containing the producer?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Mikkel
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to