On 1/24/14 7:41 PM, Jay Kreps wrote:
Yeah I'll fix that name.
Hmm, yeah, I agree that often you want to be able to delay network
connectivity until you have started everything up. But at the same time I
kind of loathe special init() methods because you always forget to call them
and get one round of errors every time.
One pattern I've used in the past is to use lazy initialization but also
provide a method to eagerly do it. E.g., if init() wasn't called, the
first call of send() would call it for you.
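A minimal sketch of that pattern in plain Java (the class and method names here are illustrative, not the actual client's):

```java
// Hypothetical sketch: lazy initialization with an optional eager init().
// None of these names come from the actual Kafka client code.
public class LazyProducer {
    private volatile boolean initialized = false;

    // Eager path: callers who want to surface connection errors up front
    // can invoke this explicitly right after construction.
    public synchronized void init() {
        if (!initialized) {
            // ...open network connections, fetch metadata, etc...
            initialized = true;
        }
    }

    // Lazy path: if init() was never called, the first send() does it.
    public void send(byte[] message) {
        if (!initialized) {
            init();
        }
        // ...hand the message off to the I/O layer...
    }

    public boolean isInitialized() {
        return initialized;
    }
}
```

Unit tests can construct the object freely; only the first send() (or an explicit init()) touches the network.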
I wonder if in those cases one could
just avoid creating the producer instance until you are ready to connect.
Basically if you think of the producer instance as the equivalent of a
socket connection or whatever this kind of makes sense.
-Jay
On Fri, Jan 24, 2014 at 4:34 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
Jay,
Thanks for the explanation. I didn't realize that the broker list was for
bootstrapping and was not required to be a complete list of all brokers
(although I see now that it's clearly stated in the text description of the
parameter). Nonetheless, does it still make sense to make the config
parameter more clear? Instead of BROKER_LIST_CONFIG, it could be something
like BROKER_LIST_INITIAL_CONFIG or BROKER_DISCOVERY_LIST_CONFIG or
BROKER_BOOTSTRAP_LIST_CONFIG?
I like the idea of proactively checking that at least one broker url is
working and failing fast if it is not. My 2 cents is that it should be
triggered by a method call like initialize() rather than doing it in the
constructor. Sometimes for unit tests or other purposes, you want to be
able to create objects without triggering network dependencies.
Cheers,
Roger
On Fri, Jan 24, 2014 at 4:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
Roger,
These are good questions.
1. The producer since 0.8 is actually zookeeper free, so this is not new to
this client; it is true for the current client as well. Our experience was
that direct zookeeper connections from zillions of producers weren't a good
idea for a number of reasons. Our intention is to remove this dependency
from the consumer as well. The configuration in the producer doesn't need
the full set of brokers, though, just one or two machines to bootstrap the
state of the cluster from--in other words it isn't like you need to
reconfigure your clients every time you add some servers. This is exactly
how zookeeper works too--if we used zookeeper you would need to give a list
of zk urls in case a particular zk server was down. Basically either way
you need a few statically configured nodes from which to discover the full
state of the cluster. For people who don't like hard-coding hosts you can
use a VIP or dns or something instead.
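To make the bootstrap idea concrete, a hedged sketch (the config key "bootstrap.brokers" and the parsing helper are assumptions for illustration, not necessarily what the new producer ships with):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative only: the key name and helper are not the client's real API.
// The point is that the list only needs one or two reachable seed hosts
// (or a VIP / DNS name); the client discovers the rest of the cluster
// from whichever host answers.
public class BootstrapConfig {
    public static List<String> parseBrokerList(String value) {
        return Arrays.asList(value.split(","));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.brokers", "broker1:9092,broker2:9092");
        List<String> seeds = parseBrokerList(props.getProperty("bootstrap.brokers"));
        System.out.println(seeds.size()); // prints 2
    }
}
```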
2. Yes this is a good point and was a concern I had too--the current
behavior is that with bad urls the client would start normally and then
hang trying to fetch metadata when the first message is sent and finally
give up and throw an exception. This is not ideal.
The challenge is this: we use the bootstrap urls to fetch metadata for
particular topics, but we don't know which topics until we start getting
messages for them. We have the option of fetching metadata for all topics,
but the problem is that for a cluster hosting tens of thousands of topics
that is actually a ton of data.
An alternative that this just made me think of is that we could
proactively connect to the bootstrap urls sequentially until one succeeds
when the producer is first created, and fail fast if we can't establish a
connection. This would not be wasted work as we could use the connection
for the metadata request when the first message is sent. I like this
solution and will implement it. So thanks for asking!
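That fail-fast probe could look something like the following sketch, using plain java.net rather than Kafka's actual network layer (the class and method names are hypothetical):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

// Sketch of "try each bootstrap url in order, keep the first connection
// that succeeds, fail fast if none do". Not the client's real I/O code.
public class BootstrapConnector {
    public static Socket connectToAny(List<InetSocketAddress> seeds, int timeoutMs)
            throws IOException {
        IOException last = null;
        for (InetSocketAddress seed : seeds) {
            Socket socket = new Socket();
            try {
                socket.connect(seed, timeoutMs);
                // Keep this connection for the first metadata request,
                // so the probe is not wasted work.
                return socket;
            } catch (IOException e) {
                last = e;
                socket.close();
            }
        }
        throw new IOException("No bootstrap broker reachable", last);
    }
}
```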
-Jay
On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
A couple comments:
1) Why does the config use a broker list instead of discovering the
brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
2) It looks like broker connections are created on demand. I'm wondering
if sometimes you might want to flush out config or network connectivity
issues before pushing the first message through.
Should there also be a KafkaProducer.connect() or .open() method, or
connectAll()? I guess it would try to connect to all brokers in the
BROKER_LIST_CONFIG.
HTH,
Roger
On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
As mentioned in a previous email we are working on a re-implementation of
the producer. I would like to use this email thread to discuss the details
of the public API and the configuration. I would love for us to be
incredibly picky about this public api now so it is as good as possible
and we don't need to break it in the future.
The best way to get a feel for the API is actually to take a look at the
javadoc; my hope is to get the api docs good enough so that it is
self-explanatory:
http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
Please take a look at this API and give me any thoughts you may have!
It may also be reasonable to take a look at the configs:
http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
The actual code is posted here:
https://issues.apache.org/jira/browse/KAFKA-1227
A few questions or comments to kick things off:
1. We need to make a decision on whether serialization of the user's key
and value should be done by the user (with our api just taking byte[]) or
if we should take an object and allow the user to configure a Serializer
class which we instantiate via reflection. We take the latter approach in
the current producer, and I have carried this through to this prototype.
The tradeoff I see is this: taking byte[] is actually simpler; the user
can directly do whatever serialization they like. The complication is
actually partitioning. Currently partitioning is done by a similar plug-in
api (Partitioner) which the user can implement and configure to override
how partitions are assigned. If we take byte[] as input then we have no
access to the original object and partitioning MUST be done on the byte[].
This is fine for hash partitioning. However for various types of semantic
partitioning (range partitioning, or whatever) you would want access to
the original object. In the current approach a producer who wishes to send
byte[] they have serialized in their own code can configure the
BytesSerialization we supply, which is just a "no op" serialization.
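The two approaches can be sketched like this (the interface shape and class names here are illustrative stand-ins, not the prototype's actual declarations):

```java
// Sketch of the plug-in serialization approach and the "no op" escape
// hatch for users who hand over pre-serialized byte[].
public class SerializationSketch {

    public interface Serializer<T> {
        byte[] serialize(T value);
    }

    // Pass-through for users who serialize in their own code: the
    // producer still sees the original object for partitioning purposes.
    public static class BytesSerialization implements Serializer<byte[]> {
        public byte[] serialize(byte[] value) {
            return value; // untouched
        }
    }

    // A typical user-supplied serializer for the object-based approach.
    public static class StringSerialization implements Serializer<String> {
        public byte[] serialize(String value) {
            return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    }
}
```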
2. We should obsess over naming and make sure each of the class names is
good.
3. Jun has already pointed out that we need to include the topic and
partition in the response, which is absolutely right. I haven't done that
yet but that definitely needs to be there.
4. Currently RecordSend.await will throw an exception if the request
failed. The intention here is that producer.send(message).await() exactly
simulates a synchronous call. Guozhang has noted that this is a little
annoying since the user must then catch exceptions. However if we remove
this then if the user doesn't check for errors they won't know one has
occurred, which I predict will be a common mistake.
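A stand-in sketch of that await-rethrows behavior, built on java.util.concurrent (the internals here are my assumption, not the prototype's actual RecordSend implementation):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Sketch: send() returns a future-like handle, and await() rethrows any
// failure, so producer.send(msg).await() behaves like a synchronous call
// and errors cannot be silently missed.
public class RecordSend {
    private final CompletableFuture<Long> offset = new CompletableFuture<>();

    public void complete(long value) { offset.complete(value); }
    public void fail(Exception e)    { offset.completeExceptionally(e); }

    // Blocks until the request finishes; throws the original failure.
    public long await() throws Exception {
        try {
            return offset.get();
        } catch (ExecutionException e) {
            throw (Exception) e.getCause();
        }
    }
}
```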
5. Perhaps there is more we could do to make the async callbacks and
future we give back intuitive and easy to program against?
Some background info on implementation:
At a high level the primary difference in this producer is that it removes
the distinction between the "sync" and "async" producer. Effectively all
requests are sent asynchronously but always return a future response
object that gives the offset as well as any error that may have occurred
when the request is complete. The batching that today is done only in the
async producer is now done whenever possible. This means that the sync
producer, under load, can get performance as good as the async producer
(preliminary results show the producer getting 1m messages/sec). This
works similarly to group commit in databases but with respect to the
actual network transmission--any messages that arrive while a send is in
progress are batched together. It is also possible to encourage batching
even under low load to save server resources by introducing a delay on the
send to allow more messages to accumulate; this is done using the
linger.ms config (this is similar to Nagle's algorithm in TCP).
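A toy sketch of the batch-or-linger decision (no real networking; the class and the explicit clock parameter are illustrative, only the linger.ms idea comes from the mail):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of group-commit style batching: records that arrive within
// the linger window are collected and shipped as one batch, flushing
// early if the batch fills up.
public class LingerBatcher {
    private final int maxBatch;
    private final long lingerMs;
    private final List<byte[]> buffer = new ArrayList<>();
    private long firstRecordAt = -1;

    public LingerBatcher(int maxBatch, long lingerMs) {
        this.maxBatch = maxBatch;
        this.lingerMs = lingerMs;
    }

    // Returns the batch to send if it is ready, or null to keep lingering.
    public synchronized List<byte[]> append(byte[] record, long nowMs) {
        if (buffer.isEmpty()) {
            firstRecordAt = nowMs;
        }
        buffer.add(record);
        boolean full = buffer.size() >= maxBatch;
        boolean lingered = nowMs - firstRecordAt >= lingerMs;
        if (full || lingered) {
            List<byte[]> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }
}
```

With lingerMs set to 0 every record flushes immediately; raising it trades a little latency for larger batches and fewer requests to the server.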
This producer does all network communication asynchronously and in
parallel to all servers, so the performance penalty for acks=-1 and
waiting on replication should be much reduced. I haven't done much
benchmarking on this yet, though.
The high level design is described a little here, though this is now a
little out of date:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
-Jay