Hey Jay,

Reading over the wiki (and email thread). Here are some questions/comments:

"Make the producer fully async to allow issuing sends to all brokers
simultaneously and having multiple in-flight requests simultaneously. This
will dramatically reduce the impact of latency on throughput (which is
important with replication)."

Can you say a bit more about this? You're only talking about the async
case, right? If I set a producer to sync, acks=-1, producer.send() will
still block as expected, right?

"Move to server-side offset management will allow us to scale this
facility which is currently a big scalability problem for high-commit rate
consumers due to zk non scalability."

Just confirming that the proposal still allows us to store a K/V map (or
metadata), in addition to just offsets, right? This was in the older
proposal that I saw, but I just wanted to make sure. The consumer API
doesn't seem to reflect this.

"""
SendResponse r = producer.send(new KafkaMessage(topic, key, message));
r.onCompletion(new Runnable() {
  public void run() { System.out.println("All done"); }
});
r.getOffset();
r.getError();
"""

To block (wait on the future to return) in the new API, I have to either
call getOffset or getError? Or is onCompletion blocking?
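
To make the question concrete, here is a rough sketch of the behaviour I would guess at. SendResponseSketch and its methods are my own stand-ins built on java.util.concurrent.CompletableFuture, not the API from the wiki: onCompletion only registers a callback, while getOffset blocks until the ack arrives.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the proposed SendResponse; NOT the wiki's actual class.
final class SendResponseSketch {
    private final CompletableFuture<Long> offset = new CompletableFuture<>();

    // Would be called by the background sender thread once the broker acks.
    void complete(long ackedOffset) {
        offset.complete(ackedOffset);
    }

    // Non-blocking: registers the callback and returns immediately.
    void onCompletion(Runnable callback) {
        offset.thenRun(callback);
    }

    // Blocking: waits for the ack, then returns the offset.
    long getOffset() {
        return offset.join();
    }

    static long demo() {
        SendResponseSketch r = new SendResponseSketch();
        r.onCompletion(() -> System.out.println("All done")); // returns right away
        new Thread(() -> r.complete(42L)).start();            // simulated broker ack
        return r.getOffset();                                 // blocks until the ack
    }
}
```

If that guess is right, a "sync" send is just send(...) followed by getOffset().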

"List<MessageAndOffset> messages = consumer.poll(timeout);"

Does it make sense to allow polling only specific topics? If we move to an
epoll/selector model, would it be OK to just poll for certain topic
handles, and let the other handles sit there with their socket buffers
filling up? The use case that I'm thinking of is where you've registered
two topics, but only want to consume from one for a while. In the API
you've proposed, the consumer keeps feeding messages from the other topic,
which you then have to buffer (and potentially run out of memory). A
work-around is to have one consumer per-topic, in this case, but I'd
rather just let the OS-level socket buffer do the buffering, if that's
possible.
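
Here is roughly what I mean, as a sketch in plain java.nio. Two Pipes stand in for two topic connections (my assumption, purely for illustration; a real client would use sockets). Clearing the interest set on one key means select() ignores it, and its unread bytes stay in the OS-level buffer rather than on our heap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustration only: the "topics" are in-process pipes, not broker connections.
final class SelectivePollSketch {
    static List<String> pollOnlyTopicA() {
        try (Selector selector = Selector.open()) {
            Pipe a = Pipe.open();
            Pipe b = Pipe.open();
            a.source().configureBlocking(false);
            b.source().configureBlocking(false);
            a.source().register(selector, SelectionKey.OP_READ, "topicA");
            SelectionKey keyB = b.source().register(selector, SelectionKey.OP_READ, "topicB");

            // Data arrives on both "topics".
            a.sink().write(ByteBuffer.wrap("a1".getBytes(StandardCharsets.UTF_8)));
            b.sink().write(ByteBuffer.wrap("b1".getBytes(StandardCharsets.UTF_8)));

            // Pause topicB: with an empty interest set, select() skips this channel
            // and its bytes simply sit in the kernel buffer until we re-enable it.
            keyB.interestOps(0);

            List<String> ready = new ArrayList<>();
            selector.select(1000);
            for (SelectionKey key : selector.selectedKeys()) {
                ready.add((String) key.attachment());
            }

            a.sink().close();
            a.source().close();
            b.sink().close();
            b.source().close();
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Resuming the paused topic would just be keyB.interestOps(SelectionKey.OP_READ).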

"The timeout the user specifies will be purely to ensure we have a
mechanism to give control back to the user even when no messages are
delivered. It is up to the user to ensure poll() is called again within
the heartbeat frequency set for the consumer group. Internally the timeout
on our select() may use a shorter timeout to ensure the heartbeat
frequency is met even when no messages are delivered."

I don't think I understand this. What's the heartbeat you're talking about
here? Is this the consumer membership group heartbeat?

"We will introduce a set of RPC apis for managing partition assignment on
the consumer side ... This set of APIs is orthogonal to the APIs for
producing and consuming data, it is responsible for group membership."

I'm a little confused. I think what you're saying is the API for managing
partition assignment is totally generic, but the consumer is going to use
it to manage its partition groups, right? I could use it for other
partition assignment, if I wanted to, though, correct? Is this going to
require instantiating a consumer to use, or will there be some other
partition group API/connection/thingy that I can use?

"Create a new group with the given name and the specified minimum
heartbeat frequency. Return the id/host/port of the server acting as the
controller for that group.
If the ephemeral flag is set the group will disappear when the last client
exits."

What happens if create_group X is called more than once? Can this be used
for leadership election (first call creates the group, and is notified
that it's the leader)? There are a lot of gaps to fill in there, but it
could be a pretty useful feature.

"on receiving acknowledgements from all consumers of the group membership
change the controller sends a group_changed message to all the new
group members"

Suppose that an existing consumer owns partition 7 for a given topic. A
new consumer joins the group, and the partition assignments work out such
that the new consumer should own partition 7. As I understand it, this
means that the old consumer should stop consuming from partition 7 when
begin_group_change is sent, right? This means a) no one consumes partition
7 until consensus is gathered, and b) if any consumer dies before its
ack is sent, you have to wait up to one full heartbeat before partition 7
is consumed again, correct? What kind of heartbeat do we expect to be
"normal"? 10s? 60s?

Also, how are offsets handled during this transition? Should the old
consumer checkpoint its offset for the topic/partition that it's
relinquishing? Are duplicate messages going to be consumed during this
transition (i.e. a message is consumed by both the old and new consumer)?

Also^2, what exactly is specified in the begin_group_change notification?
If you don't handle any partition assignment inside Kafka, don't the
consumers all need to know who is in the group? The wiki lists no fields
for this. It seems like you'd need something like [consumer1, consumer2,
consumer3], or something, to do deterministic ring/hash-based partition
assignment entirely client side. If this is the case, will Kafka assign
consumer group IDs to each registered consumer? This would make it easier
to implement client-side partition assignment.
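
As a sketch of why the member list matters: if every consumer receives the same list, each one can compute its own partitions locally with no further coordination. The class and method names below are hypothetical, and I'm using a simple sorted-list modulo scheme rather than a real ring/hash, just to keep it short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; assumes the notification carries the full member list.
final class ClientSideAssignmentSketch {
    // Every consumer runs this with the same inputs and reaches the same answer.
    static List<Integer> partitionsFor(String me, List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // a shared ordering is what makes this deterministic
        int myIndex = sorted.indexOf(me);
        List<Integer> mine = new ArrayList<>();
        if (myIndex < 0) {
            return mine; // not a member of the group, so nothing to own
        }
        for (int p = 0; p < numPartitions; p++) {
            if (p % sorted.size() == myIndex) {
                mine.add(p);
            }
        }
        return mine;
    }
}
```

With members [consumer3, consumer1, consumer2] and 8 partitions, consumer2 sorts to index 1 and owns partitions 1, 4, and 7.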

"Currently we do a lot of logging and metrics recording in the client. I
think instead of handling reporting of metrics and logging we should
instead incorporate this feedback in the client apis and allow the user to
log or monitor this in the manner they choose. This will avoid dependence
on a particular logging or metrics library and seems like a good policy."

+1

"""
How many jars should we have? I think we could do either

* Option A: kafka-clients.jar and kafka-server.jar or
* Option B: kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, and
kafka-server.jar

I prefer Option A as it is simpler if we add an AdminApi--we won't need to
introduce a whole new jar for it.
"""

An argument for option B is that it would make it easier to consume from
one version of Kafka and produce to another, provided that kafka-common is
API and runtime (protocol) compatible (MirrorMaker). It could just be a
pipe dream, since almost every version bump is likely to introduce some
level of incompatibility in the API. A third (crazy) option is to have
only two jars: kafka-producer, and kafka-consumer, but to have a third
FOLDER called kafka-common in the source tree. Using SBT (heh), prior to
the compile task, copy the kafka-common folder into non-colliding package
spaces in both the kafka-consumer and kafka-producer jars. This would
essentially result in two entirely stand-alone jars, allow for different
consumer/producer versions, and allow for only a single "common" source
tree. I know, it's crazy, but I thought I'd throw it out there anyway.
Also not sure how well this idea would work with IDEs.

"I am thinking of moving both serialization and compression out of the
background send thread."

I take this to mean that the consumer/producer API is just essentially
send(byte[])/poll(byte[]). Making users do (de)serialization in their own
code seems pretty reasonable, but I'm kind of bummed if I'm going to have
to manage my own compression too. It's handy to just flip a switch and get
compression out of the box. An example of this exact API would be LevelDB,
which takes bytes-in/bytes-out, but can handle compression for you, if you
wish (defaults to auto-compressing with Snappy, I think).
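
Something like the following is the shape I have in mind: a bytes-in/bytes-out API with compression as a switch. This is only a sketch. BytesCodecSketch is a made-up name, and it uses GZIP from java.util.zip because that ships with the JDK, not because the client should use GZIP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical codec: callers hand over raw bytes and flip one flag for compression.
final class BytesCodecSketch {
    private final boolean compress;

    BytesCodecSketch(boolean compress) {
        this.compress = compress;
    }

    // Producer side: bytes in, (optionally compressed) bytes out.
    byte[] encode(byte[] payload) {
        if (!compress) {
            return payload;
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(payload);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Consumer side: reverses encode() using the same flag.
    byte[] decode(byte[] wire) {
        if (!compress) {
            return wire;
        }
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(wire))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The user still owns (de)serialization, but never has to touch a compression library directly.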

Cheers,
Chris



On 7/26/13 3:41 PM, "Xavier Stevens" <xav...@gaikai.com> wrote:

>+1 to making the API use bytes and push serialization into the client.
>This
>is effectively what I am doing currently anyway. I implemented a generic
>Encoder<ByteString> which just passes the bytes through.
>
>I also like the idea of the client being written in pure Java. Interacting
>with Scala code from Java isn't nearly as nice as the other way around.
>
>Just my 2 cents.
>
>-Xavier
>
>
>
>On Fri, Jul 26, 2013 at 2:46 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
>> Jay,
>>
>> This seems like a great direction.  Simplifying the consumer client
>>would
>> be a big win, and +1 for more native java client integration.
>>
>> On the last point, regarding memory usage for buffering per partition.
>>I
>> would think it could be possible to devise a dynamic queuing system, to
>> allow higher volume partitions to have larger effective buffers than
>> smaller, low-volume partitions.  Thus, if you reserve a fixed
>> total.buffer.memory, you could allocate units of buffer space which
>>could
>> then be composed to make larger buffers (perhaps not necessarily
>> contiguous).  The long-tail of low-volume partitions could also be
>>moved to
>> some sort of auxiliary, non-collated buffer space, as they are less
>>likely
>> to benefit from contiguous buffering anyway.
>>
>> Fun stuff.
>>
>> Jason
>>
>>
>> On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> > I sent around a wiki a few weeks back proposing a set of client
>> > improvements that essentially amount to a rewrite of the producer and
>> > consumer java clients.
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>> >
>> > The below discussion assumes you have read this wiki.
>> >
>> > I started to do a little prototyping for the producer and wanted to
>>share
>> > some of the ideas that came up to get early feedback.
>> >
>> > First, a few simple but perhaps controversial things to discuss.
>> >
>> > Rollout
>> > Phase 1: We add the new clients. No change on the server. Old clients
>> still
>> > exist. The new clients will be entirely in a new package so there
>>will be
>> > no possibility of name collision.
>> > Phase 2: We swap out all shared code on the server to use the new
>>client
>> > stuff. At this point the old clients still exist but are essentially
>> > deprecated.
>> > Phase 3: We remove the old client code.
>> >
>> > Java
>> > I think we should do the clients in java. Making our users deal with
>> > scala's non-compatibility issues and crazy stack traces causes people a lot
>> > of pain. Furthermore we end up having to wrap everything now to get a
>> > usable java api anyway for non-scala people. This does mean
>>maintaining a
>> > substantial chunk of java code, which is maybe less fun than scala.
>>But
>> > basically i think we should optimize for the end user and produce a
>> > standalone pure-java jar with no dependencies.
>> >
>> > Jars
>> > We definitely want to separate out the client jar. There is also a
>>fair
>> > amount of code shared between both (exceptions, protocol definition,
>> utils,
>> > and the message set implementation). Two approaches.
>> > Two jar approach: split kafka.jar into kafka-clients.jar and
>> > kafka-server.jar with the server depending on the clients. The
>>advantage
>> of
>> > this is that it is simple. The disadvantage is that things like utils and
>> > protocol definition will be in the client jar though technically they belong
>> > equally to the server.
>> > Many jar approach: split kafka.jar into kafka-common.jar,
>> > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
>> > kafka-server.jar. The disadvantage of this is that the user needs two
>> jars
>> > (common + something) which is for sure going to confuse people. I also
>> > think this will tend to spawn more jars over time.
>> >
>> > Background threads
>> > I am thinking of moving both serialization and compression out of the
>> > background send thread. I will explain a little about this idea below.
>> >
>> > Serialization
>> > I am not sure if we should handle serialization in the client at all.
>> > Basically I wonder if our own API wouldn't just be a lot simpler if we
>> took
>> > a byte[] key and byte[] value and let people serialize stuff
>>themselves.
>> > Injecting a class name for us to create the serializer is more
>>roundabout
>> > and has a lot of problems if the serializer itself requires a lot of
>> > configuration or other objects to be instantiated.
>> >
>> > Partitioning
>> > The real question with serialization is whether the partitioning
>>should
>> > happen on the java object or on the byte array key. The argument for
>> doing
>> > it on the java object is that it is easier to do something like a
>>range
>> > partition on the object. The problem with doing it on the object is
>>that
>> > the consumer may not be in java and so may not be able to reproduce
>>the
>> > partitioning. For example we currently use Object.hashCode which is a
>> > little sketchy. We would be better off doing a standardized hash
>>function
>> > on the key bytes. If we want to give the partitioner access to the
>> original
>> > java object then obviously we need to handle serialization behind our
>> api.
>> >
>> > Names
>> > I think good names are important. I would like to rename the following
>> > classes in the new client:
>> >   Message=>Record: Now that the message has both a message and a key
>>it
>> is
>> > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>> >   MessageSet=>Records: This isn't too important but nit pickers
>>complain
>> > that it is not technically a Set but rather a List or Sequence but
>> > MessageList sounds funny to me.
>> >
>> > The actual clients will not interact with these classes. They will
>> interact
>> > with a ProducerRecord and ConsumerRecord. The reason for having
>>different
>> > fields is because the different clients
>> > Proposed producer API:
>> > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>> >
>> > Protocol Definition
>> >
>> > Here is what I am thinking about protocol definition. I see a couple
>>of
>> > problems with what we are doing currently. First the protocol
>>definition
>> is
>> > spread throughout a bunch of custom java objects. The error reporting
>>in
>> > these object is really terrible because they don't record the field
>> names.
>> > Furthermore people keep adding business logic into the protocol
>>objects
>> > which is pretty nasty.
>> >
>> > I would like to move to having a single Protocol.java file that
>>defines
>> the
>> > protocol in a readable DSL. Here is what I am thinking:
>> >
>> >   public static Schema REQUEST_HEADER =
>> >     new Schema(new Field("api_key", INT16, "The id of the request type."),
>> >                new Field("api_version", INT16, "The version of the API."),
>> >                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
>> >                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
>> >
>> > To parse one of these requests you would do
>> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>> >    short apiKey = struct.get("api_key");
>> >
>> > Internally Struct is just an Object[] with one entry per field which
>>is
>> > populated from the schema. The mapping of name to array index is a
>>hash
>> > table lookup. We can optimize access for performance critical areas by
>> > allowing:
>> >    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this once to look up the index of the field
>> >    ...
>> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>> >    short apiKey = struct.get(apiKeyField); // now this is just an array access
>> >
>> > One advantage of this is this level of indirection will make it really
>> easy
>> > for us to handle backwards compatibility in a more principled way. The
>> > protocol file will actually contain ALL versions of the schema and we
>> will
>> > always use the appropriate version to read the request (as specified
>>in
>> the
>> > header).
>> >
>> > NIO layer
>> >
>> > The plan is to add a non-blocking multi-connection abstraction that
>>would
>> > be used by both clients.
>> >
>> > class Selector {
>> >   /* create a new connection and associate it with the given id */
>> >   public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
>> >   /* wakeup this selector if it is currently awaiting data */
>> >   public void wakeup()
>> >   /* user provides sends, receives, and a timeout. this method will
>> >      populate "completed" and "disconnects" lists. Method blocks for up to
>> >      the timeout waiting for data to read. */
>> >   public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
>> > }
>> >
>> > The consumer and producer would then each define their own logic to
>> manage
>> > their set of in-flight requests.
>> >
>> > Producer Implementation
>> >
>> > There are a couple of interesting changes I think we can make to the
>> > producer implementation.
>> >
>> > We retain the single background "sender" thread.
>> >
>> > But we can remove the definition of sync vs async clients. We always
>> return
>> > a "future" response immediately. Both sync and async sends would go
>> through
>> > the buffering that we currently do for the async layer. This would
>>mean
>> > that even in sync mode while the event loop is doing network IO if
>>many
>> > requests accumulate they will be sent in a single batch. This
>>effectively
>> > acts as a kind of "group commit". So instead of having an "async" mode
>> that
>> > acts differently in some way you just have a max.delay time that
>>controls
>> > how long the client will linger waiting for more data to accumulate.
>> > max.delay=0 is equivalent to the current sync producer.
>> >
>> > I would also propose changing our buffering strategy. Currently we
>>queue
>> > unserialized requests in a BlockingQueue. This is not ideal as it is
>>very
>> > difficult to reason about the memory usage of this queue. One 5MB
>>message
>> > may be bigger than 10k small messages. I propose that (1) we change
>>our
>> > queuing strategy to queue per-partition and (2) we directly write the
>> > messages to the ByteBuffer which will eventually be sent and use that
>>as
>> > the "queue". The batch size should likewise be in bytes not in number
>>of
>> > messages.
>> >
>> > If you think about it our current queuing strategy doesn't really make
>> > sense any more now that we always load balance over brokers. You set a
>> > batch size of N and we do a request when we have N messages in queue
>>but
>> > this says nothing about the size of the requests that will be sent.
>>You
>> > might end up sending all N messages to one server or you might end up
>> > sending 1 message to N different servers (totally defeating the
>>purpose
>> of
>> > batching).
>> >
>> > There are two granularities of batching that could make sense: the
>>broker
>> > level or the partition level. We do the send requests at the broker
>>level
>> > but we do the disk IO at the partition level. I propose making the
>>queues
>> > per-partition rather than per broker to avoid having to reshuffle the
>> > contents of queues when leadership changes. This could be debated,
>> though.
>> >
>> > If you actually look at the byte path of the producer this approach
>> allows
>> > cleaning a ton of stuff up. We can do in-place writes to the destination
>> > buffer that we will eventually send. This does mean moving
>>serialization
>> > and compression to the user thread. But I think this is good as these
>>may
>> > be slow but aren't unpredictably slow.
>> >
>> > The per-partition queues are thus implemented with a bunch of
>> pre-allocated
>> > ByteBuffers sized to max.batch.size, when the buffer is full or the
>>delay
>> > time elapses that buffer is sent.
>> >
>> > By doing this we could actually just reuse the buffers when the send
>>is
>> > complete. This would be nice because since the buffers are used for
>> > allocation they will likely fall out of young gen.
>> >
>> > A question to think about is how we want to bound memory usage. I
>>think
>> > what we want is the max.batch.size which controls the size of the
>> > individual buffers and total.buffer.memory which controls the total
>> memory
>> > used by all buffers. One problem with this is that there is the
>> possibility
>> > of some fragmentation. I.e. imagine a situation with 5k partitions being
>> > produced to, each getting a low but steady message rate. Giving each
>>of
>> > these a 1MB buffer would require 5GB of buffer space to have a buffer
>>for
>> > each partition. I'm not sure how bad this is since at least the memory
>> > usage is predictable and the reality is that holding thousands of java
>> > objects has huge overhead compared to contiguous byte arrays.
>> >
>> > -Jay
>> >
>>
