On 1/30/14 8:18 PM, Joel Koshy wrote:
That's a good point about 1A - does seem that we would need to have
some kind of TTL for each topic's metadata.

Also, WRT ZK dependency I don't think that decision (for the Java
client) affects other clients, i.e., other client implementations can
use whatever discovery mechanism they choose. That said, I prefer not
having a ZK dependency for the same reasons covered earlier in this
thread.

FWIW, I think including ZK for broker discovery is a nice feature. Users of kafka-python are constantly asking for something like this. If client dependencies are a concern, then we could abstract the bootstrap strategy into a simple pluggable interface so we could publish a separate artifact. I could also imagine some AWS-specific bootstrap strategy (e.g., get hosts from a particular security group, load balancer/auto-scaling group, etc).
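
A minimal sketch of what such a pluggable bootstrap interface could look like. The names `BootstrapStrategy`, `StaticListBootstrap`, and `discoverBrokers` are hypothetical and do not exist in any Kafka client; a ZK-backed or AWS-backed strategy would implement the same interface and ship as a separate artifact.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names exist in the Kafka client.
class BootstrapSketch {

    /** Pluggable source of the initial broker addresses. */
    interface BootstrapStrategy {
        List<InetSocketAddress> discoverBrokers();
    }

    /** Default strategy: parse a static "host1:port1,host2:port2" list. */
    static final class StaticListBootstrap implements BootstrapStrategy {
        private final String brokerList;

        StaticListBootstrap(String brokerList) {
            this.brokerList = brokerList;
        }

        @Override
        public List<InetSocketAddress> discoverBrokers() {
            List<InetSocketAddress> brokers = new ArrayList<>();
            for (String entry : brokerList.split(",")) {
                String[] hostPort = entry.trim().split(":");
                // createUnresolved avoids a DNS lookup while parsing.
                brokers.add(InetSocketAddress.createUnresolved(
                        hostPort[0], Integer.parseInt(hostPort[1])));
            }
            return brokers;
        }
    }

    public static void main(String[] args) {
        BootstrapStrategy strategy = new StaticListBootstrap("broker1:9092, broker2:9092");
        for (InetSocketAddress broker : strategy.discoverBrokers()) {
            System.out.println(broker.getHostString() + ":" + broker.getPort());
        }
    }
}
```

With this shape the core client depends only on the interface, so the ZK dependency stays out of the default artifact.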

Or, we could just include ZK

-David

On Thu, Jan 30, 2014 at 4:34 PM, Jun Rao <jun...@gmail.com> wrote:
With option 1A, if we increase # partitions on a topic, how will the
producer find out newly created partitions? Do we expect the producer to
periodically call getCluster()?

As for ZK dependency, one of the goals of the client rewrite is to reduce
dependencies so that one can implement the client in languages other than
Java. A ZK client is only available in a small number of languages.

Thanks,

Jun


On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Clark and all,

I thought a little bit about the serialization question. Here are the
options I see and the pros and cons I can think of. I'd love to hear
people's preferences if you have a strong one.

One important consideration is that however the producer works will also
need to be how the new consumer works (which we hope to write next). That
is if you put objects in, you should get objects out. So we need to think
through both sides.

Options:

Option 0: What is in the existing scala code and the java code I
posted--Serializer and Partitioner plugin provided by the user via config.
Partitioner has a sane default, but Serializer needs to be specified in
config.

Pros: How it works today in the scala code.
Cons: You have to adapt your serialization library of choice to our
interfaces. The reflective class loading means a typo in the serializer name
gives odd errors. Likewise there is little type safety--the ProducerRecord
takes Object and any type mismatch between the object provided and the
serializer surfaces only at runtime.
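
To illustrate the two cons, here is a small sketch of reflection-based serializer loading. The `Serializer` interface and `load` helper are simplified stand-ins assumed for illustration, not the actual Kafka interfaces.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of Option 0; these are not the real Kafka interfaces.
class ReflectiveSerializerSketch {

    interface Serializer {
        byte[] toBytes(Object value);
    }

    public static final class StringSerializer implements Serializer {
        @Override
        public byte[] toBytes(Object value) {
            // The cast fails at runtime if the caller passes a non-String.
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Instantiate the serializer named in config, as a plugin loader would. */
    static Serializer load(String className) throws ReflectiveOperationException {
        return (Serializer) Class.forName(className)
                .getDeclaredConstructor()
                .newInstance();
    }

    public static void main(String[] args) throws Exception {
        String configured = StringSerializer.class.getName();
        Serializer serializer = load(configured);
        System.out.println(serializer.toBytes("hello").length);

        try {
            load(configured + "x"); // simulates a typo in the config value
        } catch (ClassNotFoundException e) {
            System.out.println("typo is only detected when the class is loaded: " + e);
        }

        try {
            serializer.toBytes(42); // compiles, because the API takes Object
        } catch (ClassCastException e) {
            System.out.println("type mismatch is only detected at runtime: " + e);
        }
    }
}
```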

Option 1: No plugins

This would mean byte[] key, byte[] value, and partitioning done by client
by passing in a partition *number* directly.

The problem with this is that it is tricky to compute the partition
correctly and probably most people won't. We could add a getCluster()
method to return the Cluster instance you should use for partitioning. But
I suspect people would be lazy and not use that and instead hard-code
partitions which would break if partitions were added or they hard coded it
wrong. In my experience 3 partitioning strategies cover like 99% of cases
so not having a default implementation for this makes the common case
harder. Left to their own devices people will use bad hash functions and
get weird results.
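
As one example of how hand-rolled partitioning goes wrong, the sketch below contrasts a common naive formula with a safer one. `Arrays.hashCode` is only a stand-in hash chosen for illustration, and the method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: what a user might write under Option 1.
class ManualPartitioningSketch {

    /** Naive: Math.abs(Integer.MIN_VALUE) is still negative, so the result can be negative. */
    static int naivePartition(int keyHash, int numPartitions) {
        return Math.abs(keyHash) % numPartitions;
    }

    /** Safer: clear the sign bit before taking the remainder. */
    static int safePartition(int keyHash, int numPartitions) {
        return (keyHash & 0x7fffffff) % numPartitions;
    }

    /** Partition for a serialized key, using a stand-in hash over the bytes. */
    static int partitionFor(byte[] key, int numPartitions) {
        return safePartition(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(naivePartition(Integer.MIN_VALUE, 3)); // negative
        System.out.println(safePartition(Integer.MIN_VALUE, 3));  // within [0, 3)
        System.out.println(partitionFor("user-17".getBytes(), 3));
    }
}
```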

Option 1A: Alternatively we could partition by the key using the existing
default partitioning strategy which only uses the byte[] anyway but instead
of having a partitionKey we could have a numerical partition override and
add the getCluster() method to get the cluster metadata. That would make
custom partitioning possible but handle the common case simply.
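
A rough sketch of how Option 1A could look from the caller's side. The `Record` class and `choosePartition` method are hypothetical names, and the hash is a stand-in rather than the existing default strategy.

```java
import java.util.Arrays;

// Hypothetical sketch of Option 1A; names are illustrative, not the real API.
class Option1ASketch {

    static final class Record {
        final byte[] key;
        final byte[] value;
        final Integer partition; // null: use the default key-based strategy

        Record(byte[] key, byte[] value) {
            this(key, value, null);
        }

        Record(byte[] key, byte[] value, Integer partition) {
            this.key = key;
            this.value = value;
            this.partition = partition;
        }
    }

    /** numPartitions would come from the cluster metadata (e.g. via getCluster()). */
    static int choosePartition(Record record, int numPartitions) {
        if (record.partition != null) {
            int p = record.partition;
            if (p < 0 || p >= numPartitions) {
                throw new IllegalArgumentException("no such partition: " + p);
            }
            return p; // explicit numerical override wins
        }
        // Stand-in hash over the key bytes; a null key hashes to 0 here.
        return (Arrays.hashCode(record.key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-17".getBytes();
        byte[] value = "payload".getBytes();
        System.out.println(choosePartition(new Record(key, value), 4));    // common case
        System.out.println(choosePartition(new Record(key, value, 2), 4)); // custom case
    }
}
```

The common case needs no partitioning code at all, while custom schemes compute a number themselves from the cluster metadata.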

Option 2: Partitioner plugin remains, serializers go.

The problem here is that the partitioner might lose access to the
deserialized key which would occasionally be useful for semantic
partitioning schemes. The Partitioner could deserialize the key but that
would be inefficient and weird.

This problem could be fixed by having key and value be byte[] but retaining
partitionKey as an Object and passing it to the partitioner as is. Then if
you have a partitioner which requires the deserialized key you would need
to use this partition key. One weird side effect is that if you want to
have a custom partition key BUT want to partition by the bytes of that key
rather than the object value you must write a custom partitioner and
serialize it yourself.
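
A sketch of the byte[]-plus-partitionKey variant, using range partitioning as the semantic scheme. The `Partitioner` signature and `CustomerRangePartitioner` are assumptions made for illustration and do not match any published interface.

```java
// Hypothetical sketch of Option 2; the interface shape is an assumption.
class Option2Sketch {

    interface Partitioner {
        /** partitionKey is passed through as-is; keyBytes is the serialized key. */
        int partition(Object partitionKey, byte[] keyBytes, int numPartitions);
    }

    /**
     * Range partitioning on a non-negative numeric customer id:
     * ids 0-999 go to partition 0, 1000-1999 to partition 1, and so on,
     * with everything past the last range landing in the final partition.
     */
    static final class CustomerRangePartitioner implements Partitioner {
        @Override
        public int partition(Object partitionKey, byte[] keyBytes, int numPartitions) {
            int customerId = (Integer) partitionKey; // needs the object, not the bytes
            return Math.min(customerId / 1000, numPartitions - 1);
        }
    }

    public static void main(String[] args) {
        Partitioner partitioner = new CustomerRangePartitioner();
        System.out.println(partitioner.partition(1500, null, 4));
    }
}
```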

Of these I think I prefer 1A but could be convinced of 0 since that is how
it works now.

Thoughts?

-Jay


On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:

Jay - Thanks for the call for comments. Here's some initial input:

- Make message serialization a client responsibility (making all messages
byte[]). Reflection-based loading makes it harder to use generic codecs
(e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up codecs programmatically.
Non-default partitioning should require an explicit partition key.

- I really like the fact that it will be native Java. Please consider using
native maven and not sbt, gradle, ivy, etc as they don't reliably play nice
in the maven ecosystem. A jar without a well-formed pom doesn't feel like a
real artifact. The poms generated by sbt et al. are not well formed. Using
maven will make builds and IDE integration much smoother.

- Look at Nick Telford's dropwizard-extras package in which he defines some
Jackson-compatible POJOs for loading configuration. Seems like your client
migration is similar. The config objects should have constructors or
factories that accept Map<String, String> and Properties for ease of
migration.

- Would you consider using the org.apache.kafka package for the new API?
(quibble)

- Why create your own futures rather than use
java.util.concurrent.Future<Long> or similar? Standard futures will play
nice with other reactive libs and things like J8's CompletableFuture.
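
For the last point, a minimal sketch of a send that returns a standard future. The `send` method is a hypothetical stand-in that completes immediately with a made-up offset; a real producer would complete the future from its I/O thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: a send() that hands back a standard Future<Long>.
class StandardFutureSketch {

    /** Stand-in for an asynchronous send that eventually yields the record's offset. */
    static Future<Long> send(byte[] value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        // Completed immediately here; the offset 42 is an arbitrary placeholder.
        result.complete(42L);
        return result;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Blocking on get() gives the synchronous behaviour;
        // a failed send would surface as an ExecutionException.
        long offset = send("payload".getBytes()).get();
        System.out.println("offset = " + offset);
    }
}
```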

Thanks again,
C



On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
A couple comments:

1) Why does the config use a broker list instead of discovering the brokers
in ZooKeeper?  It doesn't match the HighLevelConsumer API.

2) It looks like broker connections are created on demand.  I'm wondering
if sometimes you might want to flush out config or network connectivity
issues before pushing the first message through.

Should there also be a KafkaProducer.connect() or .open() method or
connectAll()?  I guess it would try to connect to all brokers in the
BROKER_LIST_CONFIG.

HTH,

Roger


On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
As mentioned in a previous email we are working on a re-implementation of
the producer. I would like to use this email thread to discuss the details
of the public API and the configuration. I would love for us to be
incredibly picky about this public api now so it is as good as possible and
we don't need to break it in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc; my hope is to get the api docs good enough so that the API is
self-explanatory:

http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html

Please take a look at this API and give me any thoughts you may have!

It may also be reasonable to take a look at the configs:

http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html

The actual code is posted here:
https://issues.apache.org/jira/browse/KAFKA-1227

A few questions or comments to kick things off:
1. We need to make a decision on whether serialization of the user's key
and value should be done by the user (with our api just taking byte[]) or
if we should take an object and allow the user to configure a Serializer
class which we instantiate via reflection. We take the latter approach in
the current producer, and I have carried this through to this prototype.
The tradeoff I see is this: taking byte[] is actually simpler, the user can
directly do whatever serialization they like. The complication is actually
partitioning. Currently partitioning is done by a similar plug-in api
(Partitioner) which the user can implement and configure to override how
partitions are assigned. If we take byte[] as input then we have no access
to the original object and partitioning MUST be done on the byte[]. This is
fine for hash partitioning. However for various types of semantic
partitioning (range partitioning, or whatever) you would want access to the
original object. In the current approach a producer who wishes to send
byte[] they have serialized in their own code can configure the
BytesSerialization we supply which is just a "no op" serialization.
2. We should obsess over naming and make sure each of the class names is
good.
3. Jun has already pointed out that we need to include the topic and
partition in the response, which is absolutely right. I haven't done that
yet but that definitely needs to be there.
4. Currently RecordSend.await will throw an exception if the request
failed. The intention here is that producer.send(message).await() exactly
simulates a synchronous call. Guozhang has noted that this is a little
annoying since the user must then catch exceptions. However if we remove
this then if the user doesn't check for errors they won't know one has
occurred, which I predict will be a common mistake.
5. Perhaps there is more we could do to make the async callbacks and future
we give back intuitive and easy to program against?
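
The tradeoff in item 4 can be sketched as follows. `RecordSendSketch` is a hypothetical, simplified stand-in written for this illustration and is not the class in the posted patch.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the two error-reporting styles discussed in item 4.
class AwaitSketch {

    static final class RecordSendSketch {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile long offset = -1L;
        private volatile RuntimeException error;

        /** Called by the (imaginary) I/O thread when the response arrives. */
        void complete(long offset, RuntimeException error) {
            this.offset = offset;
            this.error = error;
            done.countDown();
        }

        /** Blocks until the request finishes and rethrows any failure, like a synchronous call. */
        RecordSendSketch await() throws InterruptedException {
            done.await();
            if (error != null) {
                throw error;
            }
            return this;
        }

        /** Non-throwing alternative: callers must remember to check it. */
        boolean hasError() throws InterruptedException {
            done.await();
            return error != null;
        }

        long offset() {
            return offset;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RecordSendSketch ok = new RecordSendSketch();
        ok.complete(7L, null);
        System.out.println("offset = " + ok.await().offset());

        RecordSendSketch failed = new RecordSendSketch();
        failed.complete(-1L, new IllegalStateException("broker unavailable"));
        try {
            failed.await();
        } catch (IllegalStateException e) {
            System.out.println("await() surfaced the failure: " + e.getMessage());
        }
    }
}
```

With the throwing `await()`, a caller who ignores errors still gets a visible failure; with only `hasError()`, forgetting the check silently loses it.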

Some background info on implementation:

At a high level the primary difference in this producer is that it removes
the distinction between the "sync" and "async" producer. Effectively all
requests are sent asynchronously but always return a future response object
that gives the offset as well as any error that may have occurred when the
request is complete. The batching that today is done only in the async
producer is now done whenever possible. This means that the sync producer,
under load, can get performance as good as the async producer (preliminary
results show the producer getting 1m messages/sec). This works similarly to
group commit in databases but with respect to the actual network
transmission--any messages that arrive while a send is in progress are
batched together. It is also possible to encourage batching even under low
load to save server resources by introducing a delay on the send to allow
more messages to accumulate; this is done using the linger.ms config (this
is similar to Nagle's algorithm in TCP).
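
The linger behaviour can be sketched with a small single-threaded model. The `Batch` class and its `maxRecords` limit are assumptions for illustration only; the actual producer's accumulator is more involved, and time is passed in explicitly here to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of linger-based batching.
class LingerSketch {

    static final class Batch {
        private final long lingerMs;
        private final int maxRecords;
        private final List<byte[]> records = new ArrayList<>();
        private long firstAppendMs = -1L;

        Batch(long lingerMs, int maxRecords) {
            this.lingerMs = lingerMs;
            this.maxRecords = maxRecords;
        }

        void append(byte[] record, long nowMs) {
            if (records.isEmpty()) {
                firstAppendMs = nowMs; // the linger clock starts with the first record
            }
            records.add(record);
        }

        /** Ready when the batch is full or its oldest record has waited lingerMs. */
        boolean readyToSend(long nowMs) {
            if (records.isEmpty()) {
                return false;
            }
            return records.size() >= maxRecords || nowMs - firstAppendMs >= lingerMs;
        }

        /** Hands the accumulated records to the sender and starts a fresh batch. */
        List<byte[]> drain() {
            List<byte[]> out = new ArrayList<>(records);
            records.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch(5, 100);
        batch.append("a".getBytes(), 100);
        batch.append("b".getBytes(), 102);
        System.out.println(batch.readyToSend(103)); // still lingering
        System.out.println(batch.readyToSend(105)); // linger elapsed
        System.out.println(batch.drain().size());
    }
}
```

With a linger of 0 a non-empty batch is always ready, which corresponds to sending whenever possible.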

This producer does all network communication asynchronously and in parallel
to all servers so the performance penalty for acks=-1 and waiting on
replication should be much reduced. I haven't done much benchmarking on
this yet, though.

The high level design is described a little here, though this is now a
little out of date:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

-Jay

