[ https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880519#comment-13880519 ]
Guozhang Wang commented on KAFKA-1227:
--------------------------------------

Some more comments:

--- General
1. How do we decide where to put exception definitions? Currently we have an errors folder in kafka.common, and some folders also have their own exceptions.
2. Shall we merge the kafka.common.protocol and kafka.common.request folders, since the request definitions are highly dependent on the protocol class?
3. Shall we put Node, Cluster, Partition and TopicPartition in kafka.common into one sub-folder, for example kafka.common.metadata?
4. Shall we put the Serializer classes into the protocol folder?
5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?

--- kafka.common.Cluster
1. Since nextNode uses a global round robin, we need to make sure no more than one object accesses a single Cluster's nextNode.

--- kafka.common.StringSerialization
1. Shall we put config names such as ENCODING_CONFIG all in a single file?

--- kafka.common.AbstractIterator
1. makeNext is not supposed to leave the iterator in any state other than DONE and READY?

--- kafka.common.protocol.Schema
1. Will a difference in field order result in different schemas?

--- kafka.common.protocol.ProtoUtil
1. parseMetadataResponse: after reading the function, I feel that using TopicInfo/PartitionInfo objects for parsing might be preferable. We can put these objects in the Protocol.java file so that any protocol change would only require editing one file.

--- kafka.common.record.LogEntry
1. Maybe we can rename it to OffsetRecord?

--- kafka.common.record.Record
1. Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD to be different in the future? Currently their values are the same and the way they are computed is also identical.

--- kafka.common.request.RequestHeader
1. Is it better to define the "client_id" string as a static field in Protocol.java?
2. Do REQUEST/RESPONSE_HEADER also need to be versioned?

--- kafka.client.common.NetworkReceive
1. In the first constructor, why not also initialize the size buffer to ByteBuffer.allocate(4)?
2. Why does NetworkReceive not extend ByteBufferReceive?

--- kafka.client.common.Selector
1. "transmissions.send.remaining() <= 0": under what condition can remaining() be < 0?
2. "if (trans != null) this.disconnected.add(trans.id);": should it be trans == null?

--- kafka.clients.producer.internals.BufferPool
1. In the freeUp() function, should we use this.free.pollLast().capacity() instead of limit()?
2. What is the rationale for having just one poolable size?

--- kafka.clients.producer.internals.Metadata
1. After configs are added, we need to remove the hard-coded default values, so for all of these places we could leave a TODO mark for now.

--- kafka.clients.producer.internals.ProduceRequestResult
1. Its member fields are dependent on Protocol.java, so once we change the protocol we would probably also need to change this file.

--- kafka.clients.producer.internals.RecordAccumulator
1. Typo: "Get a list of topic-partitions which are ready to be send."

--- kafka.clients.producer.internals.Sender
1. One corner case we may need to consider: if a partition becomes unavailable and the producer keeps sending data to it, that partition could later exhaust the memory, leaving the other partitions unable to take more messages and blocked waiting.
2. In handling disconnection, the ProduceRequestResult will set the exception, and if await() is called this exception will be thrown and the callback will not be executed. Since the exception is already stored in the RecordSend, I think a better way is to not throw the exception on await() but to let the callback function handle it. That would make the application code cleaner, since otherwise the application needs to try-catch the await() call (a rough sketch of the difference follows these comments).
3. In closing the producer, there is another corner case where the io thread can keep trying to send the rest of the data and failing. Probably we could add another option to drop whatever is in the buffer and let the callback functions of the application handle it.
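To make Sender comment 2 concrete, here is a rough, self-contained sketch of the difference between throwing from await() and leaving the error for the callback. All names below (RecordSend, complete, Callback, onCompletion) are stand-ins for illustration only and may not match the actual classes or signatures in the attached patch.

{code:java}
import java.util.concurrent.CountDownLatch;

// Stand-in for the per-record result object returned by send().
class RecordSend {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile long offset = -1L;
    private volatile Exception error;

    // Called by the io thread when the request finishes or the connection drops.
    void complete(long offset, Exception error) {
        this.offset = offset;
        this.error = error;
        latch.countDown();
    }

    // Current behaviour as described: rethrow the stored error,
    // forcing every caller to wrap await() in try/catch.
    long awaitOrThrow() throws Exception {
        latch.await();
        if (error != null)
            throw error;
        return offset;
    }

    // Suggested behaviour: await() only blocks; the error stays stored here
    // and is handed to the callback instead of being thrown.
    void await() throws InterruptedException {
        latch.await();
    }

    Exception error() { return error; }
    long offset() { return offset; }
}

// Stand-in callback interface, invoked by the io thread on both success and failure.
interface Callback {
    void onCompletion(RecordSend send);
}

class ExampleCallback implements Callback {
    @Override
    public void onCompletion(RecordSend send) {
        if (send.error() != null)
            System.err.println("send failed: " + send.error());   // error handled in one place
        else
            System.out.println("acked at offset " + send.offset());
    }
}
{code}

With the second option the application just passes a Callback to send() and never needs to try-catch the await() call; the same applies to whatever is left in the buffer when the producer is closed (comment 3).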
> Code dump of new producer
> -------------------------
>
>                 Key: KAFKA-1227
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1227
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series of post-commit reviews to get it into shape. This bug tracks just the code dump.

--
This message was sent by Atlassian JIRA
(v6.1.5#6160)