Jay Kreps <jay.kreps@...> writes:

> 
> Hey Daniel,
> 
> partitionsFor() will block the very first time it sees a new topic that
> it doesn't have metadata for yet. If you want to ensure you don't block
> even that one time, call it prior to your regular usage so it initializes
> then.
> 
> The rationale for adding a partition in ProducerRecord was that there are
> actually cases where you want to associate a key with the record but not
> use it for partitioning. The rationale for not retaining the pluggable
> partitioner was that you could always just use the partition (many people
> dislike the plugin apis and asked for it). Personally, like you, I
> preferred the plugin apis.
> 
> We aren't cut off from exposing the existing partitioner usage as a
> public api that you can override if there is anyone who wants that. I
> think one nice thing about it would be the ability to ship with an
> alternative partitioning strategy that you could enable purely in config.
> For example the old producer had a partitioning strategy that attempted
> to minimize the number of TCP connections for cases where there was no
> key. 98% of people loathed that, but for 2% of people it was useful and
> this would be a way to still include that behavior for people migrating
> to the new producer.
> 
> -Jay
> 
> On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener <
> daniel.wegener@...> wrote:
> 
> > Gwen Shapira <gshapira <at> ...> writes:
> >
> > >
> > > Hi Daniel,
> > >
> > > I think you can still use the same logic you had in the custom
> > > partitioner in the old producer. You just move it to the client that
> > > creates the records.
> > > The reason you don't cache the result of partitionsFor is that the
> > > producer should handle the caching for you, so it's not necessarily a
> > > long or blocking call.
> > >
> > > I see it as a pretty small change to the API. But I'm not sure what
> > > drove the change either.
> > >
> > > Gwen
> > >
> > > On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener <
> > > Daniel.Wegener <at> ...> wrote:
> > >
> > > > Hello Kafka-users!
> > > >
> > > > I am facing a migration from a (somewhat self-plumbed) Kafka 0.8.1
> > > > producer to the new kafka-clients API. I just recognized that the
> > > > new KafkaProducer initializes its own Partitioner that cannot be
> > > > changed (final field, no ctor-param, no
> > > > Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
> > > > Is this an intentional change?
> > > > If I understand the new API correctly, one should either define a
> > > > key for a message and let the default Partitioner take care that it
> > > > is distributed over all available partitions, or set an explicit
> > > > partition number per message to write to.
> > > >
> > > > The old API allowed creating ProducerRecords with a key and/or a
> > > > key used only for partitioning (but one that is not sent down the
> > > > wire), and then providing a custom Partitioner that could later
> > > > distribute this partitioning key over all available partitions when
> > > > the message is actually sent.
> > > >
> > > > The difference in the new producer API is that we need to know the
> > > > exact number of available partitions before we even create a
> > > > ProducerRecord. If we don't ensure the correct number of partitions
> > > > and try to send a message to a partition that does not exist, the
> > > > whole message will blow up later when the producer tries to send it.
> > > >
> > > > I don't expect the partition count to change that often, but the
> > > > API doc states that a partitionsFor(String topic) result _should
> > > > not_ be cached. But I do not really want to check for changed
> > > > partition counts before every creation of a ProducerRecord. The old
> > > > pluggable partitioner was, for us, especially useful for
> > > > partition-stickiness by business keys (and thus stateful processing
> > > > stages across multiple topics). This ensured that a message that was
> > > > processed on stage1:partition2 will eventually be processed on
> > > > stageN:partition2. Not very clever in terms of scalability per
> > > > stage, but it makes reasoning about the message flow a lot easier.
> > > >
> > > > So for a single ProducerRecord, to my understanding it might be
> > > > nicer to have a nullable partitionFunction:(ProducerRecord,Int)=>Int
> > > > (or a Java 8 lambda equivalent) instead of the nullable partition
> > > > attribute, and to evaluate this function in the producer.
> > > >
> > > > Kind regards
> > > >
> > > >
> > > > ---------------------------------------------------------------------
> > > > Daniel Wegener
> > > > Holisticon AG
> > > >
> > > >
> > >
> >
> > Cheers for your quick reply Gwen!
> > Good to know that partitionsFor is (most of the time) fast and
> > non-blocking. I'm just wondering if this leaves a (maybe rather
> > artificial?) use case uncovered: if you really try to use the Producer
> > in a completely non-blocking fashion (with block.on.buffer.full=false,
> > maybe for a reactive-streams adapter?), you would still have to call
> > partitionsFor, which may occasionally block.
> >
> > Don't get me wrong, I am happy with this solution, but I think the old
> > API was a bit clearer about what you really give to the producer and
> > what it does with it (a partition number that may be discovered as
> > invalid later and throw exceptions, vs. a partitioner that lets you
> > distribute your payloads over the available number of partitions). In
> > the new API it just feels a bit more - well - java'ish :).
> >
> > Kind regards
> > Daniel
> >
> >
> 

Hi Jay.

Thank you for this clarification. I agree it's perfectly fine to enforce
the metadata resolution once when you start a producer.
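
For completeness, the warm-up I have in mind would be roughly this (a
minimal sketch; the topic name and configuration values are placeholders):

```
// Minimal warm-up sketch: the first partitionsFor() call for a topic may
// block while metadata is fetched, so doing it once at startup keeps the
// later send() calls non-blocking. Topic name and config are placeholders.
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class ProducerWarmUp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Blocks (at most) this one time while topic metadata is resolved.
        List<PartitionInfo> partitions = producer.partitionsFor("my-topic");
        System.out.println("Warmed up, " + partitions.size() + " partitions");
    }
}
```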

If there were support for some kind of user-defined partitioner, I'd have
the following thoughts:

- Let the user still be able to optionally choose a partition for a
ProducerRecord.
- Let the user optionally provide a _CustomPartitioner_ as a KafkaProducer
ctor param or via ctor props.
- Keep the current default partitioner's behavior, but prefer the result
of the CustomPartitioner over its default strategies (hashing the encoded
key, falling back to round-robin) - though only AFTER trying
ProducerRecord.partition.

The type signature of a CustomPartitioner could look like this:

```
public interface CustomPartitioner<K, V> extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic   The topic name
     * @param key     The key to partition on (or null if no key)
     * @param value   The value to partition on
     * @param cluster The current cluster metadata
     * @return a partition, or {@code null} if this partitioner cannot make
     *         a useful decision; this will lead to a fallback to the
     *         default partitioning behaviour
     */
    Integer partition(String topic, K key, V value, Cluster cluster);

}
```
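
For illustration, an implementation covering the
partition-stickiness-by-business-key use case from earlier in this thread
could look like the sketch below. BusinessKey, Order and getId() are
made-up domain types, not part of the proposal itself:

```
// Hypothetical implementation of the proposed interface above. It pins
// every record carrying the same business key to the same partition,
// across all topics of the pipeline (stage1, ..., stageN).
import java.util.Map;

import org.apache.kafka.common.Cluster;

public class BusinessKeyPartitioner implements CustomPartitioner<BusinessKey, Order> {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public Integer partition(String topic, BusinessKey key, Order value,
                             Cluster cluster) {
        if (key == null)
            return null; // fall back to the default partitioning behaviour

        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Mask the sign bit so the hash stays non-negative, then map it
        // onto the available partitions. Same key => same partition.
        return (key.getId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```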

This would:
- Not introduce any breaking changes to the API
- Allow users to partition based on their unserialized "business" keys or
values. This is consistent with the type-parameterized, user-provided
serializers.
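
Wiring it up could then mirror how serializers are passed today. The
four-argument constructor below is hypothetical - it does not exist today
and is only what the proposal would add (the serializer classes are made
up as well):

```
// Hypothetical wiring: this KafkaProducer constructor overload does NOT
// exist today; it is only what the proposal above would add.
KafkaProducer<BusinessKey, Order> producer = new KafkaProducer<>(
        props,
        new BusinessKeySerializer(),   // made-up serializer
        new OrderSerializer(),         // made-up serializer
        new BusinessKeyPartitioner());

// A record without an explicit partition would be routed by the
// CustomPartitioner; an explicit ProducerRecord partition would still win.
```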

It might still make the concept of partitioning more complex and thus 
harder to grasp.

An implementation could look like this:
https://github.com/danielwegener/kafka/commit/5cedcb88601d6aff2138ffda2447afe60d389f6c
(just a raw sketch though).

What do you think?

Kind regards
Daniel


