This is pretty hard to do with the architecture we've gone with as the
stored events are not objects, but tightly packed serialized bytes. This
approach is much better from a performance and memory management point of
view, though, so I'd be very hesitant to change it. So it is pretty hard to
provi
How about the following use case:
Just before the producer actually sends the payload to Kafka, could an
event be exposed that would allow one to loop through the messages and
potentially delete some of them?
Example:
Say you have 100 messages, but before you send these messages to Kafka, you
ca
It might. I considered this but ended up going this way. Now that we have
changed partitionKey=>partition it almost works. The difference is the
consumer gets an offset too which the producer doesn't have.
One thing I think this points to is the value of getting the consumer Java
API worked out ev
Currently, the user will send ProducerRecords using the new producer. The
expectation will be that you get the same thing as output from the
consumer. Since ProducerRecord is a holder for topic, partition, key and
value, does it make sense to rename it to just Record? So, the send/receive
APIs would
I think the most common motivation for having a customized partitioner is to
make sure some messages always go to the same partition, but people may
seldom want to know exactly which partition they go to. If that is
true, why not just assign the same byte array as partition key with the
default
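For illustration, a minimal sketch of that idea, assuming a hash-based
default partitioner over the serialized key bytes (the class and method
names here are made up, not part of the proposed API):

    import java.util.Arrays;

    // Illustrative sketch: with a hash-based default partitioner, records that
    // carry identical key bytes always land on the same partition, so the
    // caller never needs to know which partition that actually is.
    public class SameKeySamePartitionSketch {
        // Assumed default behavior: hash the serialized key, mod partition count.
        static int partitionFor(byte[] keyBytes, int numPartitions) {
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }

        public static void main(String[] args) {
            byte[] key = "user-42".getBytes();
            System.out.println(partitionFor(key, 8) == partitionFor(key, 8)); // always true
        }
    }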
Hey Tom,
Agreed, there is definitely nothing that prevents our including partitioner
implementations, but it does get a little less seamless.
-Jay
On Fri, Jan 31, 2014 at 2:35 PM, Tom Brown wrote:
> Regarding partitioning APIs, I don't think there is a common subset of
> information that
Regarding partitioning APIs, I don't think there is a common subset of
information that is required for all strategies. Instead of modifying the
core API to easily support all of the various partitioning strategies,
offer the most common ones as libraries they can build into their own data
pipe
> The trouble with callbacks, IMHO, is determining the thread in which they
> will be executed. Since the IO thread is usually the thread that knows when
> the operation is complete, it's easiest to execute that callback within the
> IO thread. This can lead the IO thread to spend all its time on c
Oliver,
Yeah that was my original plan--allow the registration of multiple
callbacks on the future. But there is some additional implementation
complexity because then you need more synchronization variables to ensure
the callback gets executed even if the request has completed at the time
the cal
Hmmm.. I should read the docs more carefully before I open my big mouth: I
just noticed the KafkaProducer#send overload that takes a callback. That
definitely helps address my concern, though I think the API would be
cleaner if there were only one variant that returned a future and you could
register
The trouble with callbacks, IMHO, is determining the thread in which they
will be executed. Since the IO thread is usually the thread that knows when
the operation is complete, it's easiest to execute that callback within the
IO thread. This can lead the IO thread to spend all its time on callbacks
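One common way to address that concern, sketched here rather than taken from
the proposed API (Callback and RecordSend are stand-ins for the interfaces
discussed in this thread; the thread-pool wiring is an assumption):

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    // Sketch: the IO thread only enqueues the completion, and a separate pool
    // runs the (possibly slow) user callback.
    public class CallbackOffloadSketch {
        interface RecordSend { boolean hasError(); }
        interface Callback { void onCompletion(RecordSend send); }

        private static final Executor CALLBACK_POOL = Executors.newFixedThreadPool(2);

        static Callback offload(final Callback userCallback) {
            return new Callback() {
                public void onCompletion(final RecordSend send) {
                    // The IO thread returns immediately; user code runs elsewhere.
                    CALLBACK_POOL.execute(new Runnable() {
                        public void run() {
                            userCallback.onCompletion(send);
                        }
                    });
                }
            };
        }
    }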
I wanted to suggest an alternative approach to the serialization issue. As I
understand it, the concern is that if the user is responsible for
serialization it becomes difficult for them to compute the partition as
the plugin that computes the partition would be called with byte[], forcing
the user to de-serialize
Hey all,
I'm excited about having a new Producer API, and I really like the idea of
removing the distinction between a synchronous and asynchronous producer.
The one comment I have about the current API is that it's hard to write
truly asynchronous code with the type of future returned by the send
For RangePartitioner, it seems that we will need the key object.
Range-partitioning on the serialized key bytes is probably confusing.
Thanks,
Jun
On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps wrote:
> One downside to the 1A proposal is that without a Partitioner interface we
> can't really pack
On 1/24/14 7:41 PM, Jay Kreps wrote:
Yeah I'll fix that name.
Hmm, yeah, I agree that often you want to be able to delay network
connectivity until you have started everything up. But at the same time I
kind of loathe special init() methods because you always forget to call them
and get one round o
Joel--
Yeah we could theoretically retain a neutered Partitioner interface that
only had access to the byte[] key, not the original object (which we no
longer have). Ideologically, most partitioning should really happen based on
the byte[], not the original object, to retain multi-language compatibility
I thought a bit about it and I think the getCluster() thing was overly
simplistic because we try to only maintain metadata about the current set
of topics the producer cares about so the cluster might not have the
partitions for the topic the user cares about. I think actually what we
need is a new
+ dev
(this thread has become a bit unwieldy)
On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy wrote:
> Does it preclude those various implementations? i.e., it could become
> a producer config:
> default.partitioner.strategy="minimize-connections"/"roundrobin" - and
> so on; and implement those par
Does it preclude those various implementations? i.e., it could become
a producer config:
default.partitioner.strategy="minimize-connections"/"roundrobin" - and
so on; and implement those partitioners internally in the producer.
Not as clear as a .class config, but it accomplishes the same effect
no
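For illustration, here is how that might look to a user, with the key name
taken straight from the message above (hypothetical, not a real config name):

    import java.util.Properties;

    // Sketch of the idea: pick a built-in partitioning strategy by name via a
    // plain string config instead of a pluggable .class config.
    public class PartitionerStrategyConfigSketch {
        static Properties producerProps() {
            Properties props = new Properties();
            props.put("default.partitioner.strategy", "roundrobin"); // or "minimize-connections"
            return props;
        }
    }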
With option 1A, if we increase # partitions on a topic, how will the
producer find out newly created partitions? Do we expect the producer to
periodically call getCluster()?
As for the ZK dependency, one of the goals of the client rewrite is to reduce
dependencies so that one can implement the client in l
One downside to the 1A proposal is that without a Partitioner interface we
can't really package up and provide common partitioner implementations.
Example of these would be
1. HashPartitioner - The default hash partitioning
2. RoundRobinPartitioner - Just round-robins over partitions
3. ConnectionM
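As an illustration of item 2 above, a minimal round-robin implementation under
an assumed Partitioner interface (the interface shape is a guess based on this
thread, not a committed API):

    import java.util.concurrent.atomic.AtomicInteger;

    // Assumed shape of the pluggable interface being discussed; illustrative only.
    interface Partitioner {
        int partition(byte[] keyBytes, int numPartitions);
    }

    // Round-robins over partitions, ignoring the key entirely.
    class RoundRobinPartitioner implements Partitioner {
        private final AtomicInteger counter = new AtomicInteger(0);

        public int partition(byte[] keyBytes, int numPartitions) {
            // Mask the sign bit so the index stays non-negative after overflow.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
    }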
Hey Guys,
My 2c.
1. RecordSend is a confusing name to me. Shouldn't it be
RecordSendResponse?
2. Random nit: it's annoying to have the Javadoc info for the constants on
http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html,
but the string constant values on
http://empa
>> The challenge of directly exposing ProduceRequestResult is that the
offset
provided is just the base offset and there is no way to know for a
particular message where it was in relation to that base offset because the
batching is transparent and non-deterministic.
That's a good point. I need to
I strongly support the use of Future. In fact, the cancel method may not
be useless. Since the producer is meant to be used by N threads, it could
easily get overloaded such that a produce request could not be sent
immediately and had to be queued. In that case, cancelling should cause it
to not a
Yes, we will absolutely retain protocol compatibility with 0.8 though the
java api will change. The prototype code I posted works with 0.8.
-Jay
On Wed, Jan 29, 2014 at 10:19 AM, Steve Morin wrote:
> Is the new producer API going to maintain protocol compatibility with the old
> version of the API
Jay,
I think you're conflating my use of "basic client" with "connection".
There is one basic client for a cluster. An IO thread manages the TCP
connections for any number of brokers. The basic client has a queue of
requests for each broker. When a TCP connection (associated with broker X) is
rea
Is the new producer API going to maintain protocol compatibility with the old
version of the API under the hood?
> On Jan 29, 2014, at 10:15, Jay Kreps wrote:
>
> The challenge of directly exposing ProduceRequestResult is that the offset
> provided is just the base offset and there is no way to kno
The challenge of directly exposing ProduceRequestResult is that the offset
provided is just the base offset and there is no way to know for a
particular message where it was in relation to that base offset because the
batching is transparent and non-deterministic. So I think we do need some
kind of
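To make the offset arithmetic concrete, a small sketch under the assumption
that the broker assigns consecutive offsets to the records of a produced batch
(names are illustrative):

    // Sketch: one base offset comes back per produced batch; each record's
    // absolute offset is the base plus its position within that batch.
    public class BatchOffsetSketch {
        static long offsetOf(long baseOffset, int indexInBatch) {
            return baseOffset + indexInBatch;
        }
    }
    // The difficulty stated above: batching is transparent, so the caller cannot
    // know indexInBatch for a given message unless the per-message result tracks it.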
Hey Neha,
Error handling in RecordSend works as in Future you will get the exception
if there is one from any of the accessor methods or await().
The purpose of hasError was that you can write things slightly more simply
(which some people expressed preference for):
if(send.hasError())
// deal with the error
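For comparison, a self-contained sketch of the two styles being weighed here;
RecordSendSketch is a stand-in with the methods mentioned in this thread, and
the exception type is deliberately left generic:

    // Stand-in for the discussed result object; not the proposed API.
    interface RecordSendSketch {
        void await() throws Exception; // whether this throws is the open question
        boolean hasError();
    }

    class ErrorStyles {
        // Style described above: explicit check after awaiting.
        static void checkStyle(RecordSendSketch send) throws Exception {
            send.await();
            if (send.hasError()) {
                // deal with the failure
            }
        }

        // Future-like style: rely on await() throwing when the send failed.
        static void exceptionStyle(RecordSendSketch send) {
            try {
                send.await();
            } catch (Exception e) {
                // deal with the failure
            }
        }
    }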
Regarding the use of Futures -
Agree that there are some downsides to using Futures but both approaches
have some tradeoffs.
- Standardization and usability
Future is a widely used and understood Java API and given that the
functionality that RecordSend hopes to provide is essentially that of
Fut
Hey Tom,
So is there one connection and I/O thread per broker and a low-level client
for each of those, and then you hash into that to partition? Is it possible
to batch across partitions or only within a partition?
-Jay
On Wed, Jan 29, 2014 at 8:41 AM, Tom Brown wrote:
> Jay,
>
> There is bo
Jay,
There are both a basic client object and a number of IO processing threads.
The client object manages connections, creating new ones when new machines
are connected, or when existing connections die. It also manages a queue of
requests for each server. The IO processing thread has a selector,
Hey Neha,
Can you elaborate on why you prefer using Java's Future? The downside in my
mind is the use of the checked InterruptedException and ExecutionException.
ExecutionException is arguable, but forcing you to catch
InterruptedException, often in code that can't be interrupted, seems
perverse.
Here are more thoughts on the public APIs -
- I suggest we use java's Future instead of custom Future especially since
it is part of the public API
- Serialization: I like the simplicity of the producer APIs with the
absence of serialization where we just deal with byte arrays for keys and
values
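A sketch of the API shape that suggests, purely for illustration (the method
name and the Long result, imagined as an offset, are assumptions, not the
proposed API):

    import java.util.concurrent.Future;

    // Plain java.util.concurrent.Future plus raw byte[] keys and values.
    interface ByteArrayProducerSketch {
        Future<Long> send(String topic, byte[] key, byte[] value);
    }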
Hey Tom,
That sounds cool. How did you end up handling parallel I/O if you wrap the
individual connections? Don't you need some selector that selects over all
the connections?
-Jay
On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown wrote:
> I implemented a 0.7 client in pure java, and its API very cl
That makes sense. Thanks, Jay.
On Tue, Jan 28, 2014 at 4:38 PM, Jay Kreps wrote:
> Hey Roger,
>
> We really can't use ListenableFuture directly though I agree it is nice. We
> have had some previous experience with embedding google collection classes
> in public apis, and it was quite the disa
Hey Roger,
We really can't use ListenableFuture directly though I agree it is nice. We
have had some previous experience with embedding google collection classes
in public apis, and it was quite the disaster. The problem has been that
the google guys regularly go on a refactoring binge for no appa
I implemented a 0.7 client in pure java, and its API very closely resembled
this. (When multiple people independently engineer the same solution, it's
probably good... right?). However, there were a few architectural
differences with my client:
1. The basic client itself was just an asynchronous l
Hmmm, I would really strongly urge us to not introduce a zk dependency just
for discovery. People who want to implement this can certainly do so by
simply looking up urls and setting them in the consumer config, but our
experience with doing this at large scale was pretty bad. Hardcoding the
discov
+1 ListenableFuture: If this works similar to Deferreds in Twisted Python
or Promised IO in Javascript, I think this is a great pattern for
decoupling your callback logic from the place where the Future is
generated. You can register as many callbacks as you like, each in the
appropriate layer of
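For readers unfamiliar with the pattern, a small sketch using Guava's
ListenableFuture (the Long result, imagined as an offset, is an assumption):

    import java.util.concurrent.Executor;

    import com.google.common.util.concurrent.ListenableFuture;

    // Each layer can attach its own completion logic, on the executor it chooses.
    public class ListenableFutureSketch {
        static void attach(ListenableFuture<Long> sendResult, Executor executor) {
            sendResult.addListener(new Runnable() {
                public void run() {
                    // Runs once the send completes; sendResult.get() would return
                    // the value or throw if the send failed.
                }
            }, executor);
        }
    }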
Hey Ross,
- ListenableFuture: Interesting. That would be an alternative to the direct
callback support we provide. There could be pros to this, let me think
about it.
- We could provide layering, but I feel that the serialization is such a
small thing we should just make a decision and chose one,
>> The producer since 0.8 is actually zookeeper free, so this is not new to
this client it is true for the current client as well. Our experience was
that direct zookeeper connections from zillions of producers wasn't a good
idea for a number of reasons.
The problem with several thousand connectio
+1 to zk bootstrap + close as an option at least
On Tue, Jan 28, 2014 at 10:09 AM, Neha Narkhede wrote:
> >> The producer since 0.8 is actually zookeeper free, so this is not new to
> this client it is true for the current client as well. Our experience was
> that direct zookeeper connections f
Sorry to tune in a bit late, but here goes.
> 1. The producer since 0.8 is actually zookeeper free, so this is not new to
> this client it is true for the current client as well. Our experience was
> that direct zookeeper connections from zillions of producers wasn't a good
> idea for a number of
Hi Jay,
- Just to add some more info/confusion about possibly using Future ...
If Kafka uses a JDK future, it plays nicely with other frameworks as well.
Google Guava has a ListenableFuture that allows callback handling to be
added via the returned future, and allows the callbacks to be passed
AutoCloseable would be nice for us as most of our code is using Java 7 at
this point.
I like Dropwizard's configuration mapping to POJOs via Jackson, but if you
wanted to stick with property maps I don't care enough to object.
If the producer only dealt with bytes, is there a way we could still d
Clark,
Yeah good point. Okay I'm sold on Closeable. AutoCloseable would be much
better, but for now we are retaining 1.6 compatibility and I suspect the
use case of temporarily creating a producer would actually be a more rare
case.
-Jay
On Mon, Jan 27, 2014 at 9:29 AM, Clark Breyman wrote:
> r
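A quick sketch of why Closeable is workable on 1.6: explicit try/finally in
place of Java 7's try-with-resources (the producer here is only assumed to
implement Closeable, as discussed above):

    import java.io.Closeable;
    import java.io.IOException;

    public class CloseSketch {
        static void useAndClose(Closeable producer) throws IOException {
            try {
                // ... send records ...
            } finally {
                producer.close(); // guaranteed cleanup without AutoCloseable
            }
        }
    }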
re: "Using package to avoid ambiguity" - Unlike Scala, this is really
cumbersome in Java as it doesn't support package imports or import aliases,
so the only way to distinguish is to use the fully qualified path.
re: Closeable - it can throw IOException but is not required to. Same with
AutoCloseable
Jay -
Config - your explanation makes sense. I'm just so accustomed to having
Jackson automatically map my configuration objects to POJOs that I've
stopped using property files. They are lingua franca. The only thought
might be to separate the config interface from the implementation to allow
for
Just to keep all the points in a single thread, here are a few other points
brought up by others:
1. Sriram/Guozhang: Should RecordSend.await() throw an exception if one
occurred during the call. The argument for this is that it is similar to
Future and an exception is the least surprising way to s
Clark,
With respect to maven it would be great to know if you see any issues with
the gradle stuff.
For serialization I would love to hear if any of the options I outlined
seemed good to you or if you have another idea.
For futures, it would be awesome to see how it would help. I agree that
ju
Thanks for the detailed thoughts. Let me elaborate on the config thing.
I agree that at first glance key-value strings don't seem like a very good
configuration api for a client. Surely a well-typed config class would be
better! I actually disagree; let me see if I can convince you.
My reasoni
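A sketch of what that key-value style looks like in practice; the key names
below are illustrative guesses, not final config names:

    import java.util.Properties;

    // Plain string properties can come from a file, the environment, or code,
    // and be passed through frameworks untyped.
    public class ProducerConfigSketch {
        static Properties exampleProps() {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // hypothetical key
            props.put("request.timeout.ms", "30000");                       // hypothetical key
            return props;
        }
    }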
Thanks Jay. I'll see if I can put together a more complete response,
perhaps as separate threads so that topics don't get entangled. In the mean
time, here's a couple responses:
Serialization: you've broken out a sub-thread so I'll reply there. My bias
is that I like generics (except for type-eras
Clark and all,
I thought a little bit about the serialization question. Here are the
options I see and the pros and cons I can think of. I'd love to hear
people's preferences if you have a strong one.
One important consideration is that however the producer works will also
need to be how the new
Hey Joe,
Metadata: Yes, this is how it works. You give a URL or a few URLs to
bootstrap from. From then on any metadata change will percolate up to all
producers so you should be able to dynamically change the cluster in any
way without needing to restart or reconfigure the producers. So I think y
My 2 cents:
Getting the broker metadata via active brokers is the way to go. It allows one
to dynamically rebalance or introduce a whole new set of servers into a cluster
just by adding them to the cluster and migrating partitions. We use this to
periodically introduce newer Kafka cluster cloud
Yeah I'll fix that name.
Hmm, yeah, I agree that often you want to be able to delay network
connectivity until you have started everything up. But at the same time I
kind of loathe special init() methods because you always forget to call them
and get one round of errors every time. I wonder if in those
Hey Clark,
- Serialization: Yes I agree with these though I don't consider the loss of
generics a big issue. I'll try to summarize what I would consider the best
alternative api with raw byte[].
- Maven: We had this debate a few months back and the consensus was gradle.
Is there a specific issue
Jay,
Thanks for the explanation. I didn't realize that the broker list was for
bootstrapping and was not required to be a complete list of all brokers
(although I see now that it's clearly stated in the text description of the
parameter). Nonetheless, does it still make sense to make the config
Roger,
These are good questions.
1. The producer since 0.8 is actually ZooKeeper-free, so this is not new to
this client; it is true for the current client as well. Our experience was
that direct ZooKeeper connections from zillions of producers weren't a good
idea for a number of reasons. Our inten
Andrey,
I think this should perform okay. We already create a number of objects per
message sent; one more shouldn't have too much performance impact if it is
just thousands per second.
-Jay
On Fri, Jan 24, 2014 at 2:28 PM, Andrey Yegorov wrote:
> So for each message that I need to send asynch
+1 all of Clark's points above.
On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman wrote:
> Jay - Thanks for the call for comments. Here's some initial input:
>
> - Make message serialization a client responsibility (making all messages
> byte[]). Reflection-based loading makes it harder to use gen
Jay - Thanks for the call for comments. Here's some initial input:
- Make message serialization a client responsibility (making all messages
byte[]). Reflection-based loading makes it harder to use generic codecs
(e.g. Envelope) or build up codec programmatically.
Non-default partitioning should
A couple comments:
1) Why does the config use a broker list instead of discovering the brokers
in ZooKeeper? It doesn't match the HighLevelConsumer API.
2) It looks like broker connections are created on demand. I'm wondering
if sometimes you might want to flush out config or network connectivi
So for each message that I need to send asynchronously, I have to create a
new callback instance and hold on to the message?
This looks nice in theory, but with a few thousand requests/sec this could
use up too much extra memory and put too much pressure on the garbage
collector, especially in case con
If I understand your use case I think usage would be something like
producer.send(message, new Callback() {
    public void onCompletion(RecordSend send) {
        if (send.hasError())
            log.write(message);
    }
});
Reasonable?
In other words you can include references to any variable
I love the callback in send() but I do not see how it helps in case of an
error.
Imagine the use case: I want to write messages to a log so I can replay
them to Kafka later in case the async send failed.
From a brief look at the API I see that I'll get back a RecordSend object
(which is not true al