Re: New Producer Public API

2014-02-06 Thread Jay Kreps
This is pretty hard to do with the architecture we've gone with as the stored events are not objects, but tightly packed serialized bytes. This approach is much better from a performance and memory management point of view, though, so I'd be very hesitant to change it. So it is pretty hard to provi

Re: New Producer Public API

2014-02-06 Thread S Ahmed
How about the following use case: Just before the producer actually sends the payload to Kafka, could an event be exposed that would allow one to loop through the messages and potentially delete some of them? Example: Say you have 100 messages, but before you send these messages to Kafka, you ca

Re: New Producer Public API

2014-02-05 Thread Jay Kreps
It might. I considered this but ended up going this way. Now that we have changed partitionKey=>partition it almost works. The difference is the consumer gets an offset too which the producer doesn't have. One thing I think this points to is the value of getting the consumer java api worked out ev

Re: New Producer Public API

2014-02-05 Thread Neha Narkhede
Currently, the user will send ProducerRecords using the new producer. The expectation will be that you get the same thing as output from the consumer. Since ProducerRecord is a holder for topic, partition, key and value, does it make sense to rename it to just Record? So, the send/receive APIs would

Re: New Producer Public API

2014-02-02 Thread Guozhang Wang
I think the most common motivation for having a customized partitioner is to make sure some messages always go to the same partition, but people may seldom want to know about which partition exactly they go to. If that is true, why not just assign the same byte array as partition key with the default

Re: New Producer Public API

2014-01-31 Thread Jay Kreps
Hey Tom, Agreed, there is definitely nothing that prevents our including partitioner implementations, but it does get a little less seamless. -Jay On Fri, Jan 31, 2014 at 2:35 PM, Tom Brown wrote: > Regarding partitioning APIs, I don't think there is a common subset of > information that

Re: New Producer Public API

2014-01-31 Thread Tom Brown
Regarding partitioning APIs, I don't think there is a common subset of information that is required for all strategies. Instead of modifying the core API to easily support all of the various partitioning strategies, offer the most common ones as libraries they can build into their own data pipe

Re: New Producer Public API

2014-01-31 Thread Joel Koshy
> The trouble with callbacks, IMHO, is determining the thread in which they > will be executed. Since the IO thread is usually the thread that knows when > the operation is complete, it's easiest to execute that callback within the > IO thread. This can lead the IO thread to spend all its time on c

Re: New Producer Public API

2014-01-31 Thread Jay Kreps
Oliver, Yeah that was my original plan--allow the registration of multiple callbacks on the future. But there is some additional implementation complexity because then you need more synchronization variables to ensure the callback gets executed even if the request has completed at the time the cal
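
The synchronization concern above can be sketched roughly as follows. This is only an illustration, not the producer's actual future type: `ListenableResult`, `Listener`, `addListener` and `complete` are hypothetical names invented here. The point it shows is the race being described: a listener registered after the request has already completed must still run, so registration and completion have to agree on a single "done" flag under a lock.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the producer's actual future type. */
class ListenableResult<T> {
    /** Hypothetical callback interface for this sketch. */
    interface Listener<T> {
        void onCompletion(T value);
    }

    private final Object lock = new Object();
    private final List<Listener<T>> listeners = new ArrayList<Listener<T>>();
    private boolean done = false;
    private T value;

    /** Registers a listener; runs it right away if the result is already in. */
    void addListener(Listener<T> listener) {
        synchronized (lock) {
            if (!done) {
                listeners.add(listener);
                return;
            }
        }
        // Already complete: invoke outside the lock, on the caller's thread.
        listener.onCompletion(value);
    }

    /** Called once (e.g. by the I/O thread) when the request finishes. */
    void complete(T result) {
        List<Listener<T>> toRun;
        synchronized (lock) {
            if (done) {
                return; // ignore duplicate completions
            }
            value = result;
            done = true;
            toRun = new ArrayList<Listener<T>>(listeners);
            listeners.clear();
        }
        // Listeners registered before completion run on the completing thread.
        for (Listener<T> l : toRun) {
            l.onCompletion(result);
        }
    }
}
```

Note that in this sketch a late listener runs on the registering thread while early ones run on the completing thread, which is exactly the "which thread runs the callback" question raised elsewhere in the thread.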

Re: New Producer Public API

2014-01-31 Thread Oliver Dain
Hmmm.. I should read the docs more carefully before I open my big mouth: I just noticed the KafkaProducer#send overload that takes a callback. That definitely helps address my concern though I think the API would be cleaner if there was only one variant that returned a future and you could register

Re: New Producer Public API

2014-01-31 Thread Tom Brown
The trouble with callbacks, IMHO, is determining the thread in which they will be executed. Since the IO thread is usually the thread that knows when the operation is complete, it's easiest to execute that callback within the IO thread. This can lead the IO thread to spend all its time on callbacks

Re: New Producer Public API

2014-01-31 Thread Oliver Dain
I wanted to suggest an alternative to the serialization issue. As I understand it, the concern is that if the user is responsible for serialization it becomes difficult for them to compute the partition as the plugin that computes the partition would be called with byte[] forcing the user to de-ser

Re: New Producer Public API

2014-01-31 Thread Oliver Dain
Hey all, I'm excited about having a new Producer API, and I really like the idea of removing the distinction between a synchronous and asynchronous producer. The one comment I have about the current API is that it's hard to write truly asynchronous code with the type of future returned by the send

Re: New Producer Public API

2014-01-31 Thread Jun Rao
For RangePartitioner, it seems that we will need the key object. Range-partitioning on the serialized key bytes is probably confusing. Thanks, Jun On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps wrote: > One downside to the 1A proposal is that without a Partitioner interface we > can't really pack

Re: New Producer Public API

2014-01-31 Thread David Arthur
On 1/24/14 7:41 PM, Jay Kreps wrote: Yeah I'll fix that name. Hmm, yeah, I agree that often you want to be able to delay network connectivity until you have started everything up. But at the same time I kind of loathe special init() methods because you always forget to call them and get one round o

Re: New Producer Public API

2014-01-30 Thread Jay Kreps
Joel-- Yeah we could theoretically retain a neutered Partitioner interface that only had access to the byte[] key not the original object (which we no longer have). Ideologically most partitioning should really happen based on the byte[] not the original object to retain multi-language compatibili

Re: New Producer Public API

2014-01-30 Thread Jay Kreps
I thought a bit about it and I think the getCluster() thing was overly simplistic because we try to only maintain metadata about the current set of topics the producer cares about so the cluster might not have the partitions for the topic the user cares about. I think actually what we need is a new

Re: New Producer Public API

2014-01-30 Thread Joel Koshy
+ dev (this thread has become a bit unwieldy) On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy wrote: > Does it preclude those various implementations? i.e., it could become > a producer config: > default.partitioner.strategy="minimize-connections"/"roundrobin" - and > so on; and implement those par

Re: New Producer Public API

2014-01-30 Thread Joel Koshy
Does it preclude those various implementations? i.e., it could become a producer config: default.partitioner.strategy="minimize-connections"/"roundrobin" - and so on; and implement those partitioners internally in the producer. Not as clear as a .class config, but it accomplishes the same effect no

Re: New Producer Public API

2014-01-30 Thread Jun Rao
With option 1A, if we increase # partitions on a topic, how will the producer find out newly created partitions? Do we expect the producer to periodically call getCluster()? As for ZK dependency, one of the goals of client rewrite is to reduce dependencies so that one can implement the client in l

Re: New Producer Public API

2014-01-30 Thread Jay Kreps
One downside to the 1A proposal is that without a Partitioner interface we can't really package up and provide common partitioner implementations. Example of these would be 1. HashPartitioner - The default hash partitioning 2. RoundRobinPartitioner - Just round-robins over partitions 3. ConnectionM
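
For readers skimming the thread, the first two partitioners named above might look something like the sketch below if a Partitioner interface operating on the serialized byte[] key were retained. Everything here is an assumption for illustration: the `Partitioner` interface and its signature are hypothetical, and `Arrays.hashCode` is just a stand-in hash, not necessarily what the default would use. The third item in the list is not sketched because the snippet is cut off.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical interface for this sketch; the thread had not settled on one. */
interface Partitioner {
    int partition(byte[] key, int numPartitions);
}

/** Hashes the serialized key bytes so equal keys map to the same partition. */
class HashPartitioner implements Partitioner {
    public int partition(byte[] key, int numPartitions) {
        // Mask the sign bit instead of using Math.abs, which stays
        // negative for Integer.MIN_VALUE.
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }
}

/** Ignores the key and cycles through the partitions in order. */
class RoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(byte[] key, int numPartitions) {
        // The mask keeps the index non-negative after the counter overflows.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Because both implementations see only bytes, they would behave the same regardless of which language or serializer produced the key, which is the multi-language argument made elsewhere in the thread.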

Re: New Producer Public API

2014-01-29 Thread Chris Riccomini
Hey Guys, My 2c. 1. RecordSend is a confusing name to me. Shouldn't it be RecordSendResponse? 2. Random nit: it's annoying to have the Javadoc info for the constants on http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html, but the string constant values on http://empa

Re: New Producer Public API

2014-01-29 Thread Neha Narkhede
>> The challenge of directly exposing ProduceRequestResult is that the offset provided is just the base offset and there is no way to know for a particular message where it was in relation to that base offset because the batching is transparent and non-deterministic. That's a good point. I need to

Re: New Producer Public API

2014-01-29 Thread Tom Brown
I strongly support the use of Future. In fact, the cancel method may not be useless. Since the producer is meant to be used by N threads, it could easily get overloaded such that a produce request could not be sent immediately and had to be queued. In that case, cancelling should cause it to not a

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
Yes, we will absolutely retain protocol compatibility with 0.8 though the java api will change. The prototype code I posted works with 0.8. -Jay On Wed, Jan 29, 2014 at 10:19 AM, Steve Morin wrote: > Is the new producer API going to maintain protocol compatibility with old > version of the API

Re: New Producer Public API

2014-01-29 Thread Tom Brown
Jay, I think you're confused between my use of "basic client" and "connection". There is one basic client for a cluster. An IO thread manages the tcp connections for any number of brokers. The basic client has a queue of requests for each broker. When a tcp connection (associated with broker X) is rea

Re: New Producer Public API

2014-01-29 Thread Steve Morin
Is the new producer API going to maintain protocol compatibility with old version of the API under the hood? > On Jan 29, 2014, at 10:15, Jay Kreps wrote: > > The challenge of directly exposing ProduceRequestResult is that the offset > provided is just the base offset and there is no way to kno

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
The challenge of directly exposing ProduceRequestResult is that the offset provided is just the base offset and there is no way to know for a particular message where it was in relation to that base offset because the batching is transparent and non-deterministic. So I think we do need some kind of
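
To make the base-offset problem concrete, here is a rough sketch of one way a per-record handle could recover its own offset: remember the record's position inside the batch and add it to the base offset once the batch result arrives. The names `BatchResult` and `RecordHandle` are hypothetical and invented for this illustration; the snippet above is cut off before it says what the actual proposal is, so this should not be read as the design that was chosen.

```java
/** Hypothetical stand-in for the result shared by every record in one batch. */
class BatchResult {
    private long baseOffset = -1L;
    private boolean done = false;

    /** Called once when the broker acknowledges the batch. */
    synchronized void complete(long baseOffset) {
        this.baseOffset = baseOffset;
        this.done = true;
    }

    synchronized long baseOffset() {
        if (!done) {
            throw new IllegalStateException("batch not yet acknowledged");
        }
        return baseOffset;
    }
}

/** Hypothetical per-record handle: the shared batch result plus this record's position in it. */
class RecordHandle {
    private final BatchResult batch;
    private final long relativeOffset;

    RecordHandle(BatchResult batch, long relativeOffset) {
        this.batch = batch;
        this.relativeOffset = relativeOffset;
    }

    /** Offset of this particular record = base offset of the batch + position within it. */
    long offset() {
        return batch.baseOffset() + relativeOffset;
    }
}
```

The caller never needs to know how records were grouped into batches; only the producer, which did the batching, records the relative position.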

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
Hey Neha, Error handling in RecordSend works as in Future you will get the exception if there is one from any of the accessor methods or await(). The purpose of hasError was that you can write things slightly more simply (which some people expressed preference for): if(send.hasError()) // d
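
The two error-handling styles being compared can be sketched side by side. `SketchRecordSend` below is a hypothetical stand-in written for this illustration, not the prototype's RecordSend class; it assumes only what the message says, namely that accessors throw the stored exception and that hasError() lets you check first.

```java
/** Hypothetical stand-in for the result of a send; not the prototype's class. */
class SketchRecordSend {
    private final long offset;
    private final RuntimeException error; // null when the send succeeded

    SketchRecordSend(long offset, RuntimeException error) {
        this.offset = offset;
        this.error = error;
    }

    boolean hasError() {
        return error != null;
    }

    /** Accessor that rethrows the stored failure, in the manner of Future.get(). */
    long offset() {
        if (error != null) {
            throw error;
        }
        return offset;
    }
}

class ErrorHandlingStyles {
    /** Style 1: check the flag first, no try/catch needed. */
    static String checkFirst(SketchRecordSend send) {
        if (send.hasError()) {
            return "failed";
        }
        return "offset " + send.offset();
    }

    /** Style 2: call the accessor and handle the exception. */
    static String tryCatch(SketchRecordSend send) {
        try {
            return "offset " + send.offset();
        } catch (RuntimeException e) {
            return "failed";
        }
    }
}
```

Both styles reach the same outcome; the difference is purely in how much ceremony the calling code carries.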

Re: New Producer Public API

2014-01-29 Thread Neha Narkhede
Regarding the use of Futures - Agree that there are some downsides to using Futures but both approaches have some tradeoffs. - Standardization and usability Future is a widely used and understood Java API and given that the functionality that RecordSend hopes to provide is essentially that of Fut

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
Hey Tom, So is there one connection and I/O thread per broker and a low-level client for each of those, and then you hash into that to partition? Is it possible to batch across partitions or only within a partition? -Jay On Wed, Jan 29, 2014 at 8:41 AM, Tom Brown wrote: > Jay, > > There is bo

Re: New Producer Public API

2014-01-29 Thread Tom Brown
Jay, There is both a basic client object, and a number of IO processing threads. The client object manages connections, creating new ones when new machines are connected, or when existing connections die. It also manages a queue of requests for each server. The IO processing thread has a selector,

Re: New Producer Public API

2014-01-28 Thread Jay Kreps
Hey Neha, Can you elaborate on why you prefer using Java's Future? The downside in my mind is the use of the checked InterruptedException and ExecutionException. ExecutionException is arguable, but forcing you to catch InterruptedException, often in code that can't be interrupted, seems perverse.

Re: New Producer Public API

2014-01-28 Thread Neha Narkhede
Here are more thoughts on the public APIs - - I suggest we use java's Future instead of custom Future especially since it is part of the public API - Serialization: I like the simplicity of the producer APIs with the absence of serialization where we just deal with byte arrays for keys and values

Re: New Producer Public API

2014-01-28 Thread Jay Kreps
Hey Tom, That sounds cool. How did you end up handling parallel I/O if you wrap the individual connections? Don't you need some selector that selects over all the connections? -Jay On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown wrote: > I implemented a 0.7 client in pure java, and its API very cl

Re: New Producer Public API

2014-01-28 Thread Roger Hoover
That makes sense. Thanks, Jay. On Tue, Jan 28, 2014 at 4:38 PM, Jay Kreps wrote: > Hey Roger, > > We really can't use ListenableFuture directly though I agree it is nice. We > have had some previous experience with embedding google collection classes > in public apis, and it was quite the disa

Re: New Producer Public API

2014-01-28 Thread Jay Kreps
Hey Roger, We really can't use ListenableFuture directly though I agree it is nice. We have had some previous experience with embedding google collection classes in public apis, and it was quite the disaster. The problem has been that the google guys regularly go on a refactoring binge for no appa

Re: New Producer Public API

2014-01-28 Thread Tom Brown
I implemented a 0.7 client in pure java, and its API very closely resembled this. (When multiple people independently engineer the same solution, it's probably good... right?). However, there were a few architectural differences with my client: 1. The basic client itself was just an asynchronous l

Re: New Producer Public API

2014-01-28 Thread Jay Kreps
Hmmm, I would really strongly urge us to not introduce a zk dependency just for discovery. People who want to implement this can certainly do so by simply looking up urls and setting them in the consumer config, but our experience with doing this at large scale was pretty bad. Hardcoding the discov

Re: New Producer Public API

2014-01-28 Thread Roger Hoover
+1 ListenableFuture: If this works similarly to Deferreds in Twisted Python or Promised IO in JavaScript, I think this is a great pattern for decoupling your callback logic from the place where the Future is generated. You can register as many callbacks as you like, each in the appropriate layer of

Re: New Producer Public API

2014-01-28 Thread Jay Kreps
Hey Ross, - ListenableFuture: Interesting. That would be an alternative to the direct callback support we provide. There could be pros to this, let me think about it. - We could provide layering, but I feel that the serialization is such a small thing we should just make a decision and choose one,

Re: New Producer Public API

2014-01-28 Thread Neha Narkhede
>> The producer since 0.8 is actually zookeeper free, so this is not new to this client it is true for the current client as well. Our experience was that direct zookeeper connections from zillions of producers wasn't a good idea for a number of reasons. The problem with several thousand connectio

Re: New Producer Public API

2014-01-28 Thread Scott Clasen
+1 to zk bootstrap + close as an option at least On Tue, Jan 28, 2014 at 10:09 AM, Neha Narkhede wrote: > >> The producer since 0.8 is actually zookeeper free, so this is not new to > this client it is true for the current client as well. Our experience was > that direct zookeeper connections f

Re: New Producer Public API

2014-01-28 Thread Mattijs Ugen (DT)
Sorry to tune in a bit late, but here goes. > 1. The producer since 0.8 is actually zookeeper free, so this is not new to > this client it is true for the current client as well. Our experience was > that direct zookeeper connections from zillions of producers wasn't a good > idea for a number of

Re: New Producer Public API

2014-01-28 Thread Ross Black
Hi Jay, - Just to add some more info/confusion about possibly using Future ... If Kafka uses a JDK future, it plays nicely with other frameworks as well. Google Guava has a ListenableFuture that allows callback handling to be added via the returned future, and allows the callbacks to be passed

Re: New Producer Public API

2014-01-27 Thread Xavier Stevens
AutoCloseable would be nice for us as most of our code is using Java 7 at this point. I like Dropwizard's configuration mapping to POJOs via Jackson, but if you wanted to stick with property maps I don't care enough to object. If the producer only dealt with bytes, is there a way we could still d

Re: New Producer Public API

2014-01-27 Thread Jay Kreps
Clark, Yeah good point. Okay I'm sold on Closeable. AutoCloseable would be much better, but for now we are retaining 1.6 compatibility and I suspect the use case of temporarily creating a producer would actually be a more rare case. -Jay On Mon, Jan 27, 2014 at 9:29 AM, Clark Breyman wrote: > r

Re: New Producer Public API

2014-01-27 Thread Clark Breyman
re: "Using package to avoid ambiguity" - Unlike Scala, this is really cumbersome in Java as it doesn't support package imports or import aliases, so the only way to distinguish is to use the fully qualified path. re: Closeable - it can throw IOException but is not required to. Same with AutoClosea

Re: New Producer Public API

2014-01-27 Thread Clark Breyman
Jay - Config - your explanation makes sense. I'm just so accustomed to having Jackson automatically map my configuration objects to POJOs that I've stopped using property files. They are lingua franca. The only thought might be to separate the config interface from the implementation to allow for

Re: New Producer Public API

2014-01-26 Thread Jay Kreps
Just to keep all the points in a single thread, here are a few other points brought up by others: 1. Sriram/Guozhang: Should RecordSend.await() throw an exception if one occurred during the call. The argument for this is that it is similar to Future and an exception is the least surprising way to s

Re: New Producer Public API

2014-01-26 Thread Jay Kreps
Clark, With respect to maven it would be great to know if you see any issues with the gradle stuff. For serialization I would love to hear if any of the options I outlined seemed good to you or if you have another idea. For futures, that would be awesome to see how it would help. I agree that ju

Re: New Producer Public API

2014-01-26 Thread Jay Kreps
Thanks for the detailed thoughts. Let me elaborate on the config thing. I agree that at first glance key-value strings don't seem like a very good configuration api for a client. Surely a well-typed config class would be better! I actually disagree and let me see if I can convince you. My reasoni

Re: New Producer Public API

2014-01-26 Thread Clark Breyman
Thanks Jay. I'll see if I can put together a more complete response, perhaps as separate threads so that topics don't get entangled. In the meantime, here are a couple of responses: Serialization: you've broken out a sub-thread so I'll reply there. My bias is that I like generics (except for type-eras

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
Clark and all, I thought a little bit about the serialization question. Here are the options I see and the pros and cons I can think of. I'd love to hear people's preferences if you have a strong one. One important consideration is that however the producer works will also need to be how the new

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
Hey Joe, Metadata: Yes, this is how it works. You give a URL or a few URLs to bootstrap from. From then on any metadata change will percolate up to all producers so you should be able to dynamically change the cluster in any way without needing to restart or reconfigure the producers. So I think y

Re: New Producer Public API

2014-01-24 Thread Joseph Lawson
My 2 cents: Getting the broker metadata via active brokers is the way to go. It allows one to dynamically rebalance or introduce a whole new set of servers into a cluster just by adding them to the cluster and migrating partitions. We use this to periodically introduce newer Kafka cluster cloud

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
Yeah I'll fix that name. Hmm, yeah, I agree that often you want to be able to delay network connectivity until you have started everything up. But at the same time I kind of loathe special init() methods because you always forget to call them and get one round of error every time. I wonder if in those

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
Hey Clark, - Serialization: Yes I agree with these though I don't consider the loss of generics a big issue. I'll try to summarize what I would consider the best alternative api with raw byte[]. - Maven: We had this debate a few months back and the consensus was gradle. Is there a specific issue

Re: New Producer Public API

2014-01-24 Thread Roger Hoover
Jay, Thanks for the explanation. I didn't realize that the broker list was for bootstrapping and was not required to be a complete list of all brokers (although I see now that it's clearly stated in the text description of the parameter). Nonetheless, does it still make sense to make the config

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
Roger, These are good questions. 1. The producer since 0.8 is actually zookeeper free, so this is not new to this client it is true for the current client as well. Our experience was that direct zookeeper connections from zillions of producers wasn't a good idea for a number of reasons. Our inten

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
Andrey, I think this should perform okay. We already create a number of objects per message sent, one more shouldn't have too much performance impact if it is just thousands per second. -Jay On Fri, Jan 24, 2014 at 2:28 PM, Andrey Yegorov wrote: > So for each message that I need to send asynch

Re: New Producer Public API

2014-01-24 Thread Xavier Stevens
+1 all of Clark's points above. On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman wrote: > Jay - Thanks for the call for comments. Here's some initial input: > > - Make message serialization a client responsibility (making all messages > byte[]). Reflection-based loading makes it harder to use gen

Re: New Producer Public API

2014-01-24 Thread Clark Breyman
Jay - Thanks for the call for comments. Here's some initial input: - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope) or build up codec programmatically. Non-default partitioning should

Re: New Producer Public API

2014-01-24 Thread Roger Hoover
A couple comments: 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API. 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivi

Re: New Producer Public API

2014-01-24 Thread Andrey Yegorov
So for each message that I need to send asynchronously I have to create a new instance of callback and hold on to the message? This looks nice in theory but in the case of a few thousand requests/sec this could use up too much extra memory and push too much to the garbage collector, especially in case con

Re: New Producer Public API

2014-01-24 Thread Jay Kreps
If I understand your use case I think usage would be something like

    producer.send(message, new Callback() {
        public void onCompletion(RecordSend send) {
            if(send.hasError())
                log.write(message);
        }
    });

Reasonable? In other words you can include references to any variable

Re: New Producer Public API

2014-01-24 Thread Andrey Yegorov
I love the callback in send() but I do not see how it helps in case of an error. Imagine the use case: I want to write messages to the log so I can replay them to Kafka later in case the async send failed. From a brief look at the API I see that I'll get back a RecordSend object (which is not true al