+ dev

(this thread has become a bit unwieldy)


On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Does it preclude those various implementations? i.e., it could become
> a producer config:
> default.partitioner.strategy="minimize-connections"/"roundrobin" - and
> so on; and implement those partitioners internally in the producer.
> Not as clear as a .class config, but it accomplishes the same effect
> no?
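>
> E.g. (a rough sketch; the config key and strategy names are purely
> illustrative):
>
>     Properties props = new Properties();
>     // hypothetical config -- the partitioners live inside the producer
>     props.put("default.partitioner.strategy", "minimize-connections");
>     KafkaProducer producer = new KafkaProducer(props);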
>
> On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> One downside to the 1A proposal is that without a Partitioner interface we
>> can't really package up and provide common partitioner implementations.
>> Example of these would be
>> 1. HashPartitioner - The default hash partitioning
>> 2. RoundRobinPartitioner - Just round-robins over partitions
>> 3. ConnectionMinimizingPartitioner - Choose partitions to minimize the
>> number of nodes you need to maintain TCP connections to.
>> 4. RangePartitioner - User provides break points that align partitions to
>> key ranges
>> 5. LocalityPartitioner - Prefer nodes on the same rack. This would be nice
>> for stream-processing use cases that read from one topic and write to
>> another. We would have to include rack information in our metadata.
>>
>> Having this kind of functionality included is actually kind of nice.
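>>
>> E.g. a round-robin implementation could be a few lines against a
>> hypothetical Partitioner interface (sketch only, not the actual API):
>>
>>     import java.util.concurrent.atomic.AtomicInteger;
>>
>>     public class RoundRobinPartitioner implements Partitioner {
>>         private final AtomicInteger counter = new AtomicInteger(0);
>>
>>         @Override // ignore the key and just cycle through partitions
>>         public int partition(byte[] key, int numPartitions) {
>>             // mask the sign bit so the result stays non-negative
>>             return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
>>         }
>>     }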
>>
>> -Jay
>>
>>
>> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>>> Clark and all,
>>>
>>> I thought a little bit about the serialization question. Here are the
>>> options I see and the pros and cons I can think of. I'd love to hear
>>> people's preferences if you have a strong one.
>>>
>>> One important consideration is that however the producer works, the new
>>> consumer (which we hope to write next) will also need to work the same
>>> way. That is, if you put objects in, you should get objects out. So we
>>> need to think through both sides.
>>>
>>> Options:
>>>
>>> Option 0: What is in the existing scala code and the java code I
>>> posted--Serializer and Partitioner plugin provided by the user via config.
>>> Partitioner has a sane default, but Serializer needs to be specified in
>>> config.
>>>
>>> Pros: How it works today in the scala code.
>>> Cons: You have to adapt your serialization library of choice to our
>>> interfaces. The reflective class loading means a typo in the serializer
>>> name gives odd errors. Likewise there is little type safety--the
>>> ProducerRecord takes Object, and any type mismatch between the object
>>> provided and the serializer only surfaces at runtime.
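>>>
>>> For reference, the option-0 wiring looks roughly like this (the config
>>> keys are from the existing scala producer; details may differ in the
>>> new client):
>>>
>>>     Properties props = new Properties();
>>>     props.put("serializer.class", "com.example.MyAvroSerializer");
>>>     props.put("partitioner.class", "com.example.MyPartitioner");
>>>     // a typo in either class name fails only at runtime, via reflection
>>>     Producer producer = new Producer(new ProducerConfig(props));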
>>>
>>> Option 1: No plugins
>>>
>>> This would mean byte[] key, byte[] value, and partitioning done by client
>>> by passing in a partition *number* directly.
>>>
>>> The problem with this is that it is tricky to compute the partition
>>> correctly and probably most people won't. We could add a getCluster()
>>> method to return the Cluster instance you should use for partitioning. But
>>> I suspect people would be lazy and not use that and instead hard-code
>>> partitions which would break if partitions were added or they hard coded it
>>> wrong. In my experience 3 partitioning strategies cover like 99% of cases
>>> so not having a default implementation for this makes the common case
>>> harder. Left to their own devices people will use bad hash functions and
>>> get weird results.
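>>>
>>> I.e. with no default, every user would have to write something like
>>> this themselves (sketch; getCluster() and partitionsFor() are
>>> hypothetical):
>>>
>>>     List<PartitionInfo> partitions = producer.getCluster().partitionsFor(topic);
>>>     // a naive hash; left to themselves people may do much worse
>>>     int partition = (Arrays.hashCode(keyBytes) & 0x7fffffff) % partitions.size();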
>>>
>>> Option 1A: Alternatively we could partition by the key using the existing
>>> default partitioning strategy (which only uses the byte[] anyway), but
>>> instead of having a partitionKey we could have a numerical partition
>>> override and add the getCluster() method to get the cluster metadata.
>>> That would make custom partitioning possible but handle the common case
>>> simply.
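>>>
>>> I.e. something like (sketch):
>>>
>>>     // common case: default hash partitioning on the serialized key
>>>     producer.send(new ProducerRecord(topic, keyBytes, valueBytes));
>>>     // custom case: the caller computes and overrides the partition
>>>     producer.send(new ProducerRecord(topic, partition, keyBytes, valueBytes));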
>>>
>>> Option 2: Partitioner plugin remains, serializers go.
>>>
>>> The problem here is that the partitioner might lose access to the
>>> deserialized key which would occasionally be useful for semantic
>>> partitioning schemes. The Partitioner could deserialize the key but that
>>> would be inefficient and weird.
>>>
>>> This problem could be fixed by having key and value be byte[] but
>>> retaining partitionKey as an Object and passing it to the partitioner as
>>> is. Then if you have a partitioner which requires the deserialized key you
>>> would need to use this partition key. One weird side effect is that if you
>>> want to have a custom partition key BUT want to partition by the bytes of
>>> that key rather than the object value, you must write a custom partitioner
>>> and serialize it yourself.
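>>>
>>> Under option 2 the Partitioner would look roughly like this (sketch):
>>>
>>>     public interface Partitioner {
>>>         // partitionKey is the Object passed through as-is, so semantic
>>>         // schemes can see the deserialized key; the key is raw bytes
>>>         int partition(Object partitionKey, byte[] key, int numPartitions);
>>>     }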
>>>
>>> Of these I think I prefer 1A but could be convinced of 0 since that is how
>>> it works now.
>>>
>>> Thoughts?
>>>
>>> -Jay
>>>
>>>
>>> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>>>
>>>> Jay - Thanks for the call for comments. Here's some initial input:
>>>>
>>>> - Make message serialization a client responsibility (making all messages
>>>> byte[]). Reflection-based loading makes it harder to use generic codecs
>>>> (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up codecs programmatically.
>>>> Non-default partitioning should require an explicit partition key.
>>>>
>>>> - I really like the fact that it will be native Java. Please consider
>>>> using native maven and not sbt, gradle, ivy, etc., as they don't
>>>> reliably play nice in the maven ecosystem. A jar without a well-formed
>>>> pom doesn't feel like a real artifact. The poms generated by sbt et al.
>>>> are not well formed. Using maven will make builds and IDE integration
>>>> much smoother.
>>>>
>>>> - Look at Nick Telford's dropwizard-extras package, in which he defines
>>>> some Jackson-compatible POJOs for loading configuration. Seems like your
>>>> client migration is similar. The config objects should have constructors
>>>> or factories that accept Map<String, String> and Properties for ease of
>>>> migration.
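>>>>
>>>> i.e. something along these lines (sketch):
>>>>
>>>>     public class ProducerConfig {
>>>>         public ProducerConfig(Properties props) { /* ... */ }
>>>>         public ProducerConfig(Map<String, String> props) { /* ... */ }
>>>>     }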
>>>>
>>>> - Would you consider using the org.apache.kafka package for the new
>>>> API? (quibble)
>>>>
>>>> - Why create your own futures rather than use
>>>> java.util.concurrent.Future<Long> or similar? Standard futures will play
>>>> nice with other reactive libs and things like J8's CompletableFuture.
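>>>>
>>>> e.g. (sketch):
>>>>
>>>>     // the offset as a standard future rather than a custom type
>>>>     java.util.concurrent.Future<Long> send(ProducerRecord record);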
>>>>
>>>> Thanks again,
>>>> C
>>>>
>>>>
>>>>
>>>> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com>
>>>> wrote:
>>>>
>>>> > A couple comments:
>>>> >
>>>> > 1) Why does the config use a broker list instead of discovering the
>>>> > brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
>>>> >
>>>> > 2) It looks like broker connections are created on demand. I'm
>>>> > wondering if sometimes you might want to flush out config or network
>>>> > connectivity issues before pushing the first message through.
>>>> >
>>>> > Should there also be a KafkaProducer.connect() or .open() method, or
>>>> > connectAll()? I guess it would try to connect to all brokers in the
>>>> > BROKER_LIST_CONFIG.
>>>> >
>>>> > HTH,
>>>> >
>>>> > Roger
>>>> >
>>>> >
>>>> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
>>>> wrote:
>>>> >
>>>> > > As mentioned in a previous email we are working on a
>>>> > > re-implementation of the producer. I would like to use this email
>>>> > > thread to discuss the details of the public API and the
>>>> > > configuration. I would love for us to be incredibly picky about this
>>>> > > public api now so it is as good as possible and we don't need to
>>>> > > break it in the future.
>>>> > >
>>>> > > The best way to get a feel for the API is actually to take a look
>>>> > > at the javadoc; my hope is to get the api docs good enough that they
>>>> > > are self-explanatory:
>>>> > >
>>>> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>>>> > >
>>>> > > Please take a look at this API and give me any thoughts you may have!
>>>> > >
>>>> > > It may also be reasonable to take a look at the configs:
>>>> > >
>>>> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>>>> > >
>>>> > > The actual code is posted here:
>>>> > > https://issues.apache.org/jira/browse/KAFKA-1227
>>>> > >
>>>> > > A few questions or comments to kick things off:
>>>> > > 1. We need to make a decision on whether serialization of the
>>>> > > user's key and value should be done by the user (with our api just
>>>> > > taking byte[]) or if we should take an object and allow the user to
>>>> > > configure a Serializer class which we instantiate via reflection. We
>>>> > > take the latter approach in the current producer, and I have carried
>>>> > > this through to this prototype. The tradeoff I see is this: taking
>>>> > > byte[] is actually simpler, since the user can directly do whatever
>>>> > > serialization they like. The complication is actually partitioning.
>>>> > > Currently partitioning is done by a similar plug-in api (Partitioner)
>>>> > > which the user can implement and configure to override how partitions
>>>> > > are assigned. If we take byte[] as input then we have no access to
>>>> > > the original object and partitioning MUST be done on the byte[]. This
>>>> > > is fine for hash partitioning. However for various types of semantic
>>>> > > partitioning (range partitioning, or whatever) you would want access
>>>> > > to the original object. In the current approach a producer who wishes
>>>> > > to send byte[] they have serialized in their own code can configure
>>>> > > the BytesSerialization we supply, which is just a "no op"
>>>> > > serialization.
>>>> > > 2. We should obsess over naming and make sure each of the class
>>>> > > names are good.
>>>> > > 3. Jun has already pointed out that we need to include the topic and
>>>> > > partition in the response, which is absolutely right. I haven't done
>>>> > > that yet but it definitely needs to be there.
>>>> > > 4. Currently RecordSend.await will throw an exception if the request
>>>> > > failed. The intention here is that producer.send(message).await()
>>>> > > exactly simulates a synchronous call. Guozhang has noted that this is
>>>> > > a little annoying since the user must then catch exceptions. However
>>>> > > if we remove this then a user who doesn't check for errors won't know
>>>> > > one has occurred, which I predict will be a common mistake. (See the
>>>> > > usage sketch below.)
>>>> > > 5. Perhaps there is more we could do to make the async callbacks and
>>>> > > future we give back intuitive and easy to program against?
>>>> > >
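>>>> > > Re point 4, the intended usage is roughly this (sketch):
>>>> > >
>>>> > >     // synchronous: await() blocks and throws if the request failed
>>>> > >     producer.send(new ProducerRecord(topic, key, value)).await();
>>>> > >
>>>> > >     // asynchronous: send now, check for completion/errors later
>>>> > >     RecordSend send = producer.send(new ProducerRecord(topic, key, value));
>>>> > >     // ... do other work ...
>>>> > >     send.await(); // throws if the request failed
>>>> > >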
>>>> > > Some background info on implementation:
>>>> > >
>>>> > > At a high level the primary difference in this producer is that it
>>>> > > removes the distinction between the "sync" and "async" producer.
>>>> > > Effectively all requests are sent asynchronously but always return a
>>>> > > future response object that gives the offset as well as any error
>>>> > > that may have occurred when the request is complete. The batching
>>>> > > that today is done only in the async producer is now done whenever
>>>> > > possible. This means that the sync producer, under load, can get
>>>> > > performance as good as the async producer (preliminary results show
>>>> > > the producer getting 1m messages/sec). This works similar to group
>>>> > > commit in databases but with respect to the actual network
>>>> > > transmission--any messages that arrive while a send is in progress
>>>> > > are batched together. It is also possible to encourage batching even
>>>> > > under low load, to save server resources, by introducing a delay on
>>>> > > the send to allow more messages to accumulate; this is done using the
>>>> > > linger.ms config (this is similar to Nagle's algorithm in TCP).
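>>>> > >
>>>> > > E.g. (sketch; see the ProducerConfig javadoc above for the key):
>>>> > >
>>>> > >     Properties props = new Properties();
>>>> > >     // wait up to 5ms for more records before sending a batch
>>>> > >     props.put("linger.ms", "5");
>>>> > >     KafkaProducer producer = new KafkaProducer(props);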
>>>> > >
>>>> > > This producer does all network communication asynchronously and in
>>>> > > parallel to all servers, so the performance penalty for acks=-1 and
>>>> > > waiting on replication should be much reduced. I haven't done much
>>>> > > benchmarking on this yet, though.
>>>> > >
>>>> > > The high level design is described a little here, though this is now
>>>> > > a little out of date:
>>>> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>>>> > >
>>>> > > -Jay
>>>> > >
>>>> >
>>>>
>>>
>>>
