[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880752#comment-13880752
 ] 

Jay Kreps commented on KAFKA-1227:
----------------------------------

Hey Guozhang, thanks for the detailed stylistic questions. I think these are 
important to discuss. Quick responses inline:

_1. How do we decide where to put Exception definitions? Currently we have an 
errors folder in kafka.comm and some folders also have their own exceptions._

That package was meant to hold exactly the API errors, i.e. those errors which 
are defined in ErrorKeys.java with a registered error code and have a 
bidirectional mapping to that code. These represent communication between the 
client and server, so I wanted to put them in a special place. In general 
exceptions should be kept in the package they most naturally fit with 
(ConfigException goes with config, etc.).
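
To make this concrete, here is a rough sketch of that kind of bidirectional 
mapping (the class names and code values are illustrative, not the actual 
contents of ErrorKeys.java):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: each API exception gets a registered error code, with
    // lookup in both directions so the server can turn an exception into a code
    // on the wire and the client can turn the code back into an exception.
    public class ApiErrorCodes {
        // Hypothetical placeholder exception types, just for the sketch.
        public static class OffsetOutOfRangeException extends RuntimeException {}
        public static class UnknownTopicOrPartitionException extends RuntimeException {}

        private static final Map<Short, Class<? extends RuntimeException>> CODE_TO_EXCEPTION =
            new HashMap<Short, Class<? extends RuntimeException>>();
        private static final Map<Class<? extends RuntimeException>, Short> EXCEPTION_TO_CODE =
            new HashMap<Class<? extends RuntimeException>, Short>();

        private static void register(short code, Class<? extends RuntimeException> exception) {
            CODE_TO_EXCEPTION.put(code, exception);
            EXCEPTION_TO_CODE.put(exception, code);
        }

        static {
            register((short) 1, OffsetOutOfRangeException.class);
            register((short) 3, UnknownTopicOrPartitionException.class);
        }

        public static short codeFor(Class<? extends RuntimeException> exception) {
            return EXCEPTION_TO_CODE.get(exception);
        }

        public static Class<? extends RuntimeException> exceptionFor(short code) {
            return CODE_TO_EXCEPTION.get(code);
        }
    }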

The most important code organization principle I had in mind was that each 
package should be either public or private. Only public packages will be 
javadoc'd. All classes in a public package are exposed to the user and are part 
of the public interface. The idea is that we would be very careful with these 
public packages. I considered differentiating these with something like 
kafka.clients.producer.pub, but I thought that was a bit ugly--maybe there is 
another way to differentiate or annotate classes so that we are very explicit 
about what is public and what is private (a rough sketch of one option is 
below). Essentially any change to interfaces in these packages breaks all our 
users, so we have to think very carefully about API design and change. The rest 
of the classes (most of them, actually) are really just an implementation 
detail and we can change them at will.
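
Purely as a sketch of the annotation idea (nothing like this is in the patch), 
a marker annotation could look like:

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Hypothetical marker: types carrying this annotation are part of the public
    // API and subject to compatibility guarantees; everything else is internal
    // and can change at will.
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PublicApi {
    }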

Currently the public packages are just:
  kafka.clients.producer
  kafka.common
  kafka.common.errors

One item I wanted to document and discuss as a separate thread was code 
organization as I think these kinds of conventions only work if they are 
documented and broadly understood.

2. Shall we merge the kafka.comm.protocol and kafka.comm.request folders, since 
the request definitions are highly dependent on the protocol class?

I would rather not. The kafka.common.network package defines a low-level 
network framing based on size delimited messages. This is fully generic--the 
unit test tests an "echo server". It is not tied to any details of our protocol 
and it is really important that people not leak details of our protocol into 
it!!!! :-)

The protocol is just a bunch of message definitions and isn't tied to the 
network transport or framing at all. It is just a way of laying out bytes.

The request package combines the protocol definition and network framing.

I am hoping to keep these things orthogonal.
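
To make the layering concrete: the framing is just a 4-byte size prefix 
followed by opaque bytes. A blocking sketch of the idea (the real code is 
non-blocking, and this is not the actual NetworkReceive):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    // Sketch of size-delimited framing: read a 4-byte length, then that many
    // bytes. The payload is opaque here -- nothing about the Kafka protocol
    // leaks into this layer.
    public class SizeDelimitedReader {
        public static ByteBuffer readFrame(ReadableByteChannel channel) throws IOException {
            ByteBuffer size = ByteBuffer.allocate(4);
            while (size.hasRemaining())
                if (channel.read(size) < 0)
                    throw new IOException("Connection closed mid-frame");
            size.flip();
            ByteBuffer payload = ByteBuffer.allocate(size.getInt());
            while (payload.hasRemaining())
                if (channel.read(payload) < 0)
                    throw new IOException("Connection closed mid-frame");
            payload.flip();
            return payload;
        }
    }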

Once we get our build fixed (ahem), I'd really like us to get checkstyle 
integrated so we can enforce these kinds of package dependencies and keep some 
kind of logical coherence. It has been a struggle otherwise.

3. Shall we put Node, Cluster, Partition, TopicPartition in kafka.comm into one 
sub-folder, for example, called kafka.comm.metadata?

I'm open to that. I liked having the current flat package for simplicity for 
the user (fewer things to import). Basically I am trying to make the javadoc 
for the producer as simple and flat as possible.

4. Shall we put the Serializer classes into the protocol folder?

The Serializer is the PUBLIC interface for users to serialize their messages. 
It isn't actually related to the definition of our protocol at all.
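
Roughly, it is a user-facing interface along these lines (the exact signature 
in the patch may differ):

    // Approximate shape of the pluggable serializer: it turns the user's object
    // into bytes and knows nothing about the wire protocol or framing.
    public interface Serializer<T> {
        byte[] serialize(String topic, T value);
    }

Implementations like StringSerializer are just convenient defaults for this 
user-facing hook, not part of the protocol definition.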

5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?

Yeah, I think that is actually how it is. I previously had it separate, and the 
rationale was that many of the classes--e.g. the Selector--were really written 
with the clients in mind. Theoretically the same Selector class could be the 
basis for the socket server, but I didn't really think those use cases through.

1. Since nextNode uses a global round robin, we need to make sure no more than 
one object accesses a single Cluster’s nextNode.

That may just be a bad name. The goal of that method was load balancing not 
iterating over the nodes. So actually the intention was to give a different 
node to each thread in the multithreaded case. 
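
I.e. the intent is something like the following (a sketch only, with String 
standing in for Node; not the actual Cluster code):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch: hand out nodes round-robin for load balancing. Different callers
    // (or threads) simply get different nodes; it is not an iteration contract.
    public class NodeChooser {
        private final List<String> nodes;
        private final AtomicInteger counter = new AtomicInteger(0);

        public NodeChooser(List<String> nodes) {
            this.nodes = nodes;
        }

        public String nextNode() {
            int index = Math.abs(counter.getAndIncrement() % nodes.size());
            return nodes.get(index);
        }
    }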

1. Shall we put config names such as ENCODING_CONFIG all in a single file?

I planned to do a discussion on config. The way it works is that configs are 
defined by the ConfigDef. However we allow plug-in interfaces (Serializer, 
Partitioner, etc.). These may need configs too, but they are (generally 
speaking) user classes, so we allow including user-defined configs. 
StringSerializer is essentially a user plug-in that seemed useful enough to 
include in the main code. I think it makes more sense to document its configs 
with the class rather than elsewhere.
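
As a sketch of what I mean (the key name and the configure() shape here are 
illustrative, not the exact patch code):

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    // Sketch: a user-pluggable serializer that documents and consumes its own
    // config key rather than registering it in the producer's central ConfigDef.
    public class ConfigurableStringSerializer {
        // Hypothetical key; the point is that it lives with the class that uses it.
        public static final String ENCODING_CONFIG = "serializer.encoding";

        private String encoding = "UTF8";

        public void configure(Map<String, ?> configs) {
            Object value = configs.get(ENCODING_CONFIG);
            if (value != null)
                this.encoding = value.toString();
        }

        public byte[] serialize(String value) {
            try {
                return value.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalArgumentException("Unknown encoding: " + encoding, e);
            }
        }
    }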

— kafka.common.AbstractIterator
1. makeNext is not supposed to leave the iterator in any state other than DONE 
and READY?

Yeah this is basically a transliteration of the same class in the main code 
base which is a transliteration of the iterator in Google Collections.
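
For reference, a condensed sketch of the pattern (not the actual class): 
subclasses implement makeNext() and either return the next element or call 
allDone(), and hasNext()/next() drive the state machine.

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Condensed AbstractIterator pattern: after makeNext() runs, the iterator is
    // either READY (an element was produced) or DONE (allDone() was called).
    public abstract class SimpleAbstractIterator<T> implements Iterator<T> {
        private enum State { READY, NOT_READY, DONE }

        private State state = State.NOT_READY;
        private T next;

        protected abstract T makeNext();

        protected T allDone() {
            state = State.DONE;
            return null;
        }

        @Override
        public boolean hasNext() {
            if (state == State.DONE)
                return false;
            if (state == State.READY)
                return true;
            next = makeNext();
            if (state == State.DONE)
                return false;
            state = State.READY;
            return true;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            state = State.NOT_READY;
            return next;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }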

1. kafka.common.protocol.Schema: Will a difference in field order make for 
different schemas?

Yes our protocol is based on position not name.
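
Concretely, a struct is written as its field values in schema order with no 
names on the wire, so swapping two fields changes the byte layout; a toy 
illustration (field names and values made up):

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    // With a positional protocol the bytes are just the field values in schema
    // order, so (id: int32, port: int32) and (port: int32, id: int32) lay out
    // the same logical record differently.
    public class PositionalLayoutExample {
        public static void main(String[] args) {
            int id = 5, port = 9092;
            ByteBuffer schemaA = ByteBuffer.allocate(8);
            schemaA.putInt(id).putInt(port);    // [id][port]
            ByteBuffer schemaB = ByteBuffer.allocate(8);
            schemaB.putInt(port).putInt(id);    // [port][id]
            System.out.println(Arrays.equals(schemaA.array(), schemaB.array())); // false
        }
    }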

1. kafka.common.protocol.ProtoUtil: parseMetadataResponse: after reading the 
function I feel that using TopicInfo/PartitionInfo objects for parsing might be 
preferable. We could put these objects in the Protocol.java file so any 
protocol change would only require editing one file.

I'd like to have a discussion about this. I tried this approach. The problem is 
that the mixing of protocol definition with business logic leads to leaking of 
logic into the protocol and makes the protocol hard to read. There are several 
other options. It would be good to discuss.

1. kafka.common.record.LogEntry: Maybe we can rename it to OffsetRecord?

Hmm, I'm open to changing it but I'm not sure that's better. I try to avoid 
names like TopicPartition which are just the concatenation of all the fields as 
I feel the purpose of a name is to capture the concept the fields describe. 
I.e. if we add a new field we shouldn't need to lengthen the name!

1. kafka.common.record.Record: Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD 
to be different in the future? Currently their values are the same and the way 
they are computed is also identical.

Good point, I'll look into this; these are just copied from the Scala.

1. kafka.common.request.RequestHeader: Is it better to define "client_id" 
strings as static fields in Protocol.java?

Unlikely. Currently you can access a field by the string name or the field 
instance. The field instance is an array access; the name is a hash table 
lookup to get the field followed by an array access. So the two reasonable 
options are to have static variables for the Fields or to access with Strings. 
Let's put that off to the discussion of handling request definitions.
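
To illustrate the two access paths (Struct and Field here are simplified 
stand-ins, not the real protocol classes):

    import java.util.HashMap;
    import java.util.Map;

    // Access by Field instance is a plain array index; access by name is a hash
    // lookup to find the Field followed by the same array index.
    public class StructSketch {
        public static class Field {
            final int index;
            final String name;
            Field(int index, String name) { this.index = index; this.name = name; }
        }

        private final Map<String, Field> fieldsByName = new HashMap<String, Field>();
        private final Object[] values;

        public StructSketch(Field[] fields) {
            this.values = new Object[fields.length];
            for (Field f : fields)
                fieldsByName.put(f.name, f);
        }

        public void set(Field field, Object value) {
            values[field.index] = value;
        }

        public Object get(Field field) {          // array access
            return values[field.index];
        }

        public Object get(String name) {          // hash lookup + array access
            return values[fieldsByName.get(name).index];
        }
    }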

2. kafka.client.common.NetworkReceive: Does REQUEST/RESPONSE_HEADER also need 
to be versioned?

Only if we want to change it. We didn't have a version number for these in the 
protocol, so we can't add one now. Any header change today is non-backwards 
compatible.

1. In the first constructor, why not also initialize the size buffer to 
ByteBuffer.allocate(4)?

The point of the size buffer is to read the size to allocate and read the 
message buffer, but that constructor takes an already allocated/read message 
buffer. I was using that constructor for unit testing to fake responses that 
weren't really being read.

2. Why does NetworkReceive not extend ByteBufferReceive?

Yes, this distressed me as well. Here is the issue: I want 
ByteBufferSend/NetworkSend to work on an array of ByteBuffers to handle the 
case where you already have serialized chunks of data (i.e. message sets). But 
in the case of a receive we don't currently have a good way to concatenate 
buffers, so reading into multiple buffers isn't useful. This is basically a 
limitation of the ByteBuffer API.
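
Concretely, the send side can hand the channel a size buffer plus the 
already-serialized payload buffers in one gathering write; a sketch (not the 
actual ByteBufferSend):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.GatheringByteChannel;

    // Sketch: write a size prefix plus pre-serialized chunks (e.g. a message set)
    // without copying them into one buffer, using a gathering write. Receives
    // instead land in a single buffer, since there is no cheap way to treat
    // several ByteBuffers as one contiguous buffer when parsing.
    public class MultiBufferSend {
        private final ByteBuffer[] buffers;

        public MultiBufferSend(ByteBuffer payload) {
            ByteBuffer size = ByteBuffer.allocate(4);
            size.putInt(payload.remaining());
            size.flip();
            this.buffers = new ByteBuffer[] { size, payload };
        }

        public long writeTo(GatheringByteChannel channel) throws IOException {
            return channel.write(buffers);
        }
    }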

1. kafka.client.common.Selector: “transmissions.send.remaining() <= 0”, under 
what condition can remaining() be < 0?

None, I think.

2. “if (trans != null) this.disconnected.add(trans.id); “, should it be trans 
== null?

Well that would give a null pointer, no? What this is saying is "if we have an 
id for this connection, record it as disconnected".

1. kafka.clients.producer.internals.BufferPool: In the freeUp() function, 
should we use this.free.pollLast().capacity() instead of limit()?

Yeah that would be better, technically anything on the free list must have 
capacity==limit but I think it is bad to depend on that.

2. What is the rationale for having just one poolable size?

Basically just to avoid implementing malloc. I guess the choice is either to 
implement a generic BufferPool or one specific to the needs of the producer. 
Since most of the time the producer will be doing allocations of that poolable 
size it makes sense to keep those around. If you try to keep arbitrary sizes I 
think things quickly get complex but since we will almost always be allocating 
the same size it seemed simpler to just handle that case. I'm open to 
generalizing it if it isn't too complex.
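
The gist, heavily simplified (the real BufferPool also has to handle blocking 
when memory is exhausted, which is omitted here):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Only buffers of exactly poolableSize are kept on the free list and reused;
    // any other size is allocated fresh and never pooled.
    public class SimpleBufferPool {
        private final int poolableSize;
        private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>();

        public SimpleBufferPool(int poolableSize) {
            this.poolableSize = poolableSize;
        }

        public synchronized ByteBuffer allocate(int size) {
            if (size == poolableSize && !free.isEmpty())
                return free.pollFirst();
            return ByteBuffer.allocate(size);
        }

        public synchronized void deallocate(ByteBuffer buffer) {
            if (buffer.capacity() == poolableSize) {
                buffer.clear();
                free.addLast(buffer);
            }
            // other sizes are simply dropped and reclaimed by the garbage collector
        }
    }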

— kafka.clients.producer.internals.Metadata
1. After configs are added, we need to remove the hard-coded default values. So 
for all of these places we could leave a TODO mark for now.

Yup.

— kafka.clients.producer.internals.ProduceRequestResult
1. Its member fields are dependent on Protocol.java, so once we change the 
protocol we would probably also need to change this file.

I don't believe this is dependent on the protocol, maybe you can elaborate?
 
— kafka.clients.producer.internals.RecordAccumulator
1. Typo: “Get a list of topic-partitions which are ready to be send.”

Ack.

— kafka.clients.producer.internals.Sender
1. One corner case we may need to consider is the following: if a partition 
becomes unavailable and the producer keeps sending data to it, that partition 
could eventually exhaust the memory, leaving other partitions unable to take 
more messages and blocked waiting.

If I understand the case you are describing you are saying that the producer 
could use up the full buffer on a partition which is not available and the 
producer will then block. This is correct and that is the intention. This 
shouldn't block the sender, though, it will keep trying to send until the 
partition becomes available again. I think this is what you want: you can 
buffer for a while but eventually must either block or drop data if memory is 
bounded.

2. In handling disconnection, the ProduceRequestResult will set the exception, 
and if await() is called this exception will be thrown and the callback will 
not be executed. Since this exception is already stored in the RecordSend, I 
think a better way is to not throw the exception on await() but let the 
callback function handle it. That would make the application code cleaner, 
since otherwise the application needs to try-catch the await() call.

I think the callback is always executed...if there is a case this doesn't 
happen it is a bug.

I agree that 
  if(result.hasError())
   // do something
is easier to read. The problem is that it is incumbent on you to check, and if 
you don't it silently fails. This is the complaint people have about mongodb. 
The principle I am going on is that
   producer.send(message).await()
should be exactly interchangeable with a blocking call. Anyhow I am sympathetic 
to your point, let's move it into the public api discussion.

3. In closing the producer, there is another corner case: the io thread can 
keep trying to send the rest of the data and failing. Probably we could add 
another option to drop whatever is in the buffer and let the callback functions 
of the application handle it.

I think what you are saying is that close() blocks until all data is sent. That 
is the intention. Since send is async I think it is counterintuitive to 
fail/drop in-progress calls as the user may not know their calls aren't 
completed.

> Code dump of new producer
> -------------------------
>
>                 Key: KAFKA-1227
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1227
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
