+ dev (this thread has become a bit unwieldy)
On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Does it preclude those various implementations? i.e., it could become
> a producer config:
> default.partitioner.strategy="minimize-connections"/"roundrobin" - and
> so on; and implement those partitioners internally in the producer.
> Not as clear as a .class config, but it accomplishes the same effect,
> no?
>
> On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> One downside to the 1A proposal is that without a Partitioner interface
>> we can't really package up and provide common partitioner
>> implementations. Examples of these would be:
>> 1. HashPartitioner - The default hash partitioning
>> 2. RoundRobinPartitioner - Just round-robins over partitions
>> 3. ConnectionMinimizingPartitioner - Chooses partitions to minimize the
>> number of nodes you need to maintain TCP connections to
>> 4. RangePartitioner - User provides break points that align partitions
>> to key ranges
>> 5. LocalityPartitioner - Prefers nodes on the same rack. This would be
>> nice for stream-processing use cases that read from one topic and write
>> to another. We would have to include rack information in our metadata.
>>
>> Having this kind of functionality included is actually kind of nice.
>>
>> -Jay
>>
>> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>>> Clark and all,
>>>
>>> I thought a little bit about the serialization question. Here are the
>>> options I see and the pros and cons I can think of. I'd love to hear
>>> people's preferences if you have a strong one.
>>>
>>> One important consideration is that however the producer works will
>>> also need to be how the new consumer works (which we hope to write
>>> next). That is, if you put objects in, you should get objects out. So
>>> we need to think through both sides.
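For readers following along, the kind of plug-in interface and packaged strategies being debated above can be sketched roughly as below. The interface shape and class names are assumptions for illustration only, not the actual API under discussion:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plug-in interface along the lines discussed; a real
// Partitioner would likely also receive cluster metadata.
interface Partitioner {
    int partition(byte[] key, int numPartitions);
}

// Round-robin strategy: ignores the key and cycles through partitions.
class RoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(byte[] key, int numPartitions) {
        // floorMod keeps the result non-negative even if the counter
        // eventually overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}

// Default-style hash strategy: hashes the serialized key bytes, so it
// works even when only byte[] is available.
class HashPartitioner implements Partitioner {
    public int partition(byte[] key, int numPartitions) {
        return Math.floorMod(java.util.Arrays.hashCode(key), numPartitions);
    }
}
```

Under this shape, strategies like the range or locality partitioners would just be further implementations of the same interface, selectable either by class name or by a named-strategy config as Joel suggests.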
>>>
>>> Options:
>>>
>>> Option 0: What is in the existing scala code and the java code I
>>> posted--Serializer and Partitioner plugins provided by the user via
>>> config. Partitioner has a sane default, but Serializer needs to be
>>> specified in config.
>>>
>>> Pros: How it works today in the scala code.
>>> Cons: You have to adapt your serialization library of choice to our
>>> interfaces. The reflective class loading means a typo in the serializer
>>> name gives odd errors. Likewise there is little type safety--the
>>> ProducerRecord takes Object, and any type mismatch between the object
>>> provided and the serializer only surfaces at runtime.
>>>
>>> Option 1: No plugins
>>>
>>> This would mean byte[] key, byte[] value, and partitioning done by the
>>> client by passing in a partition *number* directly.
>>>
>>> The problem with this is that it is tricky to compute the partition
>>> correctly and probably most people won't. We could add a getCluster()
>>> method to return the Cluster instance you should use for partitioning.
>>> But I suspect people would be lazy and not use that and instead
>>> hard-code partitions, which would break if partitions were added or
>>> they hard-coded it wrong. In my experience 3 partitioning strategies
>>> cover like 99% of cases, so not having a default implementation for
>>> this makes the common case harder. Left to their own devices people
>>> will use bad hash functions and get weird results.
>>>
>>> Option 1A: Alternatively we could partition by the key using the
>>> existing default partitioning strategy (which only uses the byte[]
>>> anyway), but instead of having a partitionKey we could have a numerical
>>> partition override and add the getCluster() method to get the cluster
>>> metadata. That would make custom partitioning possible but handle the
>>> common case simply.
>>>
>>> Option 2: Partitioner plugin remains, serializers go.
>>>
>>> The problem here is that the partitioner might lose access to the
>>> deserialized key, which would occasionally be useful for semantic
>>> partitioning schemes. The Partitioner could deserialize the key, but
>>> that would be inefficient and weird.
>>>
>>> This problem could be fixed by having key and value be byte[] but
>>> retaining partitionKey as an Object and passing it to the partitioner
>>> as is. Then if you have a partitioner which requires the deserialized
>>> key you would need to use this partition key. One weird side effect is
>>> that if you want to have a custom partition key BUT want to partition
>>> by the bytes of that key rather than the object value, you must write a
>>> custom partitioner and serialize it yourself.
>>>
>>> Of these I think I prefer 1A but could be convinced of 0 since that is
>>> how it works now.
>>>
>>> Thoughts?
>>>
>>> -Jay
>>>
>>> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>>>
>>>> Jay - Thanks for the call for comments. Here's some initial input:
>>>>
>>>> - Make message serialization a client responsibility (making all
>>>> messages byte[]). Reflection-based loading makes it harder to use
>>>> generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or to build up a
>>>> codec programmatically. Non-default partitioning should require an
>>>> explicit partition key.
>>>>
>>>> - I really like the fact that it will be native Java. Please consider
>>>> using native maven and not sbt, gradle, ivy, etc., as they don't
>>>> reliably play nice in the maven ecosystem. A jar without a well-formed
>>>> pom doesn't feel like a real artifact. The poms generated by sbt et
>>>> al. are not well formed. Using maven will make builds and IDE
>>>> integration much smoother.
>>>>
>>>> - Look at Nick Telford's dropwizard-extras package in which he defines
>>>> some Jackson-compatible POJOs for loading configuration. Seems like
>>>> your client migration is similar.
>>>> The config objects should have constructors or factories that accept
>>>> Map<String, String> and Properties for ease of migration.
>>>>
>>>> - Would you consider using the org.apache.kafka package for the new
>>>> API? (quibble)
>>>>
>>>> - Why create your own futures rather than use
>>>> java.util.concurrent.Future<Long> or similar? Standard futures will
>>>> play nice with other reactive libs and things like Java 8's
>>>> CompletableFuture.
>>>>
>>>> Thanks again,
>>>> C
>>>>
>>>> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com>
>>>> wrote:
>>>>
>>>> > A couple of comments:
>>>> >
>>>> > 1) Why does the config use a broker list instead of discovering the
>>>> > brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
>>>> >
>>>> > 2) It looks like broker connections are created on demand. I'm
>>>> > wondering if sometimes you might want to flush out config or network
>>>> > connectivity issues before pushing the first message through.
>>>> >
>>>> > Should there also be a KafkaProducer.connect() or .open() method or
>>>> > connectAll()? I guess it would try to connect to all brokers in the
>>>> > BROKER_LIST_CONFIG.
>>>> >
>>>> > HTH,
>>>> >
>>>> > Roger
>>>> >
>>>> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > As mentioned in a previous email, we are working on a
>>>> > > re-implementation of the producer. I would like to use this email
>>>> > > thread to discuss the details of the public API and the
>>>> > > configuration. I would love for us to be incredibly picky about
>>>> > > this public api now so it is as good as possible and we don't need
>>>> > > to break it in the future.
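Clark's point about standard futures can be made concrete with a small sketch of what a send() returning a plain java.util.concurrent.Future might look like from the caller's side. SketchProducer, its offset bookkeeping, and the single-threaded executor are invented stand-ins, not the proposed client:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-in producer whose send() hands back the assigned offset as a
// standard Future<Long>, so it composes with any executor-based plumbing.
class SketchProducer {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private long nextOffset = 0;

    synchronized Future<Long> send(byte[] key, byte[] value) {
        final long offset = nextOffset++;
        // A real client would enqueue the record for batched network I/O;
        // here we just complete the future on a background thread.
        return io.submit((Callable<Long>) () -> offset);
    }

    void close() { io.shutdown(); }
}
```

With this shape, a synchronous send is simply `producer.send(key, value).get()`, which is essentially the "send(...).await() simulates a synchronous call" behavior Jay describes later in the thread.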
>>>> > >
>>>> > > The best way to get a feel for the API is actually to take a look
>>>> > > at the javadoc; my hope is to get the api docs good enough so that
>>>> > > it is self-explanatory:
>>>> > >
>>>> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>>>> > >
>>>> > > Please take a look at this API and give me any thoughts you may
>>>> > > have!
>>>> > >
>>>> > > It may also be reasonable to take a look at the configs:
>>>> > >
>>>> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>>>> > >
>>>> > > The actual code is posted here:
>>>> > > https://issues.apache.org/jira/browse/KAFKA-1227
>>>> > >
>>>> > > A few questions or comments to kick things off:
>>>> > > 1. We need to make a decision on whether serialization of the
>>>> > > user's key and value should be done by the user (with our api just
>>>> > > taking byte[]) or if we should take an object and allow the user
>>>> > > to configure a Serializer class which we instantiate via
>>>> > > reflection. We take the latter approach in the current producer,
>>>> > > and I have carried this through to this prototype. The tradeoff I
>>>> > > see is this: taking byte[] is actually simpler; the user can
>>>> > > directly do whatever serialization they like. The complication is
>>>> > > actually partitioning. Currently partitioning is done by a similar
>>>> > > plug-in api (Partitioner) which the user can implement and
>>>> > > configure to override how partitions are assigned. If we take
>>>> > > byte[] as input then we have no access to the original object and
>>>> > > partitioning MUST be done on the byte[]. This is fine for hash
>>>> > > partitioning. However for various types of semantic partitioning
>>>> > > (range partitioning, or whatever) you would want access to the
>>>> > > original object.
>>>> > > In the current approach, a producer who wishes to send byte[]
>>>> > > they have serialized in their own code can configure the
>>>> > > BytesSerialization we supply, which is just a "no op"
>>>> > > serialization.
>>>> > > 2. We should obsess over naming and make sure each of the class
>>>> > > names is good.
>>>> > > 3. Jun has already pointed out that we need to include the topic
>>>> > > and partition in the response, which is absolutely right. I
>>>> > > haven't done that yet but it definitely needs to be there.
>>>> > > 4. Currently RecordSend.await will throw an exception if the
>>>> > > request failed. The intention here is that
>>>> > > producer.send(message).await() exactly simulates a synchronous
>>>> > > call. Guozhang has noted that this is a little annoying since the
>>>> > > user must then catch exceptions. However if we remove this, then
>>>> > > a user who doesn't check for errors won't know one has occurred,
>>>> > > which I predict will be a common mistake.
>>>> > > 5. Perhaps there is more we could do to make the async callbacks
>>>> > > and future we give back intuitive and easy to program against?
>>>> > >
>>>> > > Some background info on implementation:
>>>> > >
>>>> > > At a high level the primary difference in this producer is that
>>>> > > it removes the distinction between the "sync" and "async"
>>>> > > producer. Effectively all requests are sent asynchronously but
>>>> > > always return a future response object that gives the offset as
>>>> > > well as any error that may have occurred when the request is
>>>> > > complete. The batching that today is done only in the async
>>>> > > producer is now done whenever possible. This means that the sync
>>>> > > producer, under load, can get performance as good as the async
>>>> > > producer (preliminary results show the producer getting 1m
>>>> > > messages/sec).
>>>> > > This works similarly to group commit in databases, but with
>>>> > > respect to the actual network transmission--any messages that
>>>> > > arrive while a send is in progress are batched together. It is
>>>> > > also possible to encourage batching even under low load to save
>>>> > > server resources by introducing a delay on the send to allow more
>>>> > > messages to accumulate; this is done using the linger.ms config
>>>> > > (this is similar to Nagle's algorithm in TCP).
>>>> > >
>>>> > > This producer does all network communication asynchronously and
>>>> > > in parallel to all servers, so the performance penalty for
>>>> > > acks=-1 and waiting on replication should be much reduced. I
>>>> > > haven't done much benchmarking on this yet, though.
>>>> > >
>>>> > > The high level design is described a little here, though this is
>>>> > > now a little out of date:
>>>> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>>>> > >
>>>> > > -Jay
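The group-commit-style batching and the linger.ms delay Jay describes can be illustrated with a toy accumulator. The class and its structure are invented for illustration and say nothing about the actual client internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy linger-style batcher: the sender blocks for the first record, then
// waits up to lingerMs for stragglers, then grabs everything queued.
class LingerBatcher {
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long lingerMs;

    LingerBatcher(long lingerMs) { this.lingerMs = lingerMs; }

    // Called by producing threads; records accumulate while a send is
    // in flight, which is the group-commit effect.
    void append(byte[] record) { queue.add(record); }

    // Called by the sender thread to form the next "network transmission".
    List<byte[]> nextBatch() throws InterruptedException {
        List<byte[]> batch = new ArrayList<>();
        batch.add(queue.take());               // block until at least one record
        long deadline = System.currentTimeMillis() + lingerMs;
        long remaining;
        while ((remaining = deadline - System.currentTimeMillis()) > 0) {
            byte[] r = queue.poll(remaining, TimeUnit.MILLISECONDS);
            if (r == null) break;              // linger window expired
            batch.add(r);
        }
        queue.drainTo(batch);                  // sweep anything else queued
        return batch;
    }
}
```

With lingerMs set to 0 this degenerates to "send whatever has accumulated since the last transmission", which matches the default behavior described above; a positive value trades latency for larger batches, as with Nagle's algorithm.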