[ https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13912363#comment-13912363 ]

Jun Rao commented on KAFKA-1227:
--------------------------------

Accumulated some more review comments.

20. Regarding the VIP. We need to think through another corner case. When the 
brokers for all replicas of an existing topic die, the admin can start new 
brokers with the existing broker ids to restart the topic from scratch. Those 
new brokers can be added to the VIP. However, if the producer only resolves 
the VIP once during startup, it has no way to discover the new brokers unless 
it is restarted.
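To make the corner case concrete, here is a hypothetical sketch (class and method names are illustrative, not from the patch): instead of resolving the VIP only once at startup, the producer could fall back to the bootstrap resolver whenever every known broker is unreachable, so replacement brokers behind the VIP become visible without a restart.

```java
import java.util.List;
import java.util.function.Supplier;

public class BootstrapRefresh {
    // Stands in for a DNS/VIP lookup of the bootstrap address.
    private final Supplier<List<String>> resolveVip;
    private List<String> brokers;

    public BootstrapRefresh(Supplier<List<String>> resolveVip) {
        this.resolveVip = resolveVip;
        this.brokers = resolveVip.get(); // initial resolution at startup
    }

    public List<String> brokers() {
        return brokers;
    }

    // Invoked when no broker in the current list responds: re-resolve the
    // VIP so freshly started brokers behind it can be discovered.
    public List<String> refreshFromVip() {
        this.brokers = resolveVip.get();
        return this.brokers;
    }
}
```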

21. AbstractConfig:
21.1 To be consistent, we should probably change the return value of the 
following API from Long to long:
   public Long getLong(String key)
21.2 Could we also add getDouble(String key)?

22. Metadata.fetch(): I am a bit puzzled by the following code. I'm not sure 
what the intent of the InterruptedException handling is. This code will be 
called from the producer client thread, which will probably be interrupted 
during shutdown. When that happens, we should probably just throw the 
InterruptedException back to the caller. Otherwise, the producer may have to 
wait up to metadataFetchTimeoutMs before it can shut down. Also, it seems that 
we need to reduce maxWaitMs on each loop iteration. Otherwise, we will wait 
longer than we should.
                try {
                    wait(maxWaitMs);
                } catch (InterruptedException e) {
                    /* this is fine, just try again */
                }
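One possible shape for the suggested fix (a sketch with made-up names, not the patch's code): let InterruptedException propagate to the caller, and recompute the remaining wait on each pass so the total block never exceeds maxWaitMs even when wait() wakes early.

```java
public class MetadataWaiter {
    private long version = 0;

    // Called by the thread that refreshes metadata.
    public synchronized void update() {
        version++;
        notifyAll();
    }

    // Block until metadata is newer than lastVersion, but never longer than
    // maxWaitMs in total, and let InterruptedException propagate so a
    // shutdown-time interrupt is not swallowed.
    public synchronized long awaitUpdate(long lastVersion, long maxWaitMs)
            throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remaining = maxWaitMs;
        while (version <= lastVersion && remaining > 0) {
            wait(remaining); // may wake early or spuriously
            remaining = maxWaitMs - (System.currentTimeMillis() - begin);
        }
        return version;
    }
}
```

Note that wait() is only ever called with a positive timeout; wait(0) would block indefinitely.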

23. Sender: We learned in KAFKA-1228 that it's not enough to just handle 
IOException in the following code. UnresolvedAddressException is an unchecked 
exception (it extends IllegalArgumentException), not an IOException.
    private void initiateConnect(Node node, long now) {
        try {
            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer, this.socketReceiveBuffer);
            this.nodeStates.connecting(node.id(), now);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            nodeStates.disconnected(node.id());
            /* maybe the problem is our metadata, update it */
            metadata.forceUpdate();
        }
    }
 
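A minimal, self-contained illustration of the point (hypothetical names, not the patch's code): because UnresolvedAddressException is unchecked, a catch clause for IOException alone lets it escape, so it needs its own catch.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class ConnectExample {
    public static String tryConnect(String host, int port) {
        try (SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            // An unresolvable host throws UnresolvedAddressException here,
            // which the IOException clause below would NOT catch.
            ch.connect(new InetSocketAddress(host, port));
            return "connecting";
        } catch (UnresolvedAddressException e) {
            return "unresolved"; // must be caught separately
        } catch (IOException e) {
            return "io-error";
        }
    }
}
```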
24. BufferPool.allocate(): Could we add some comments on when this call is 
blocked?

25. MemoryRecords: The following constructor doesn't seem to be used.
    public MemoryRecords(int size)

26. Spellings:
26.1 ConfigDef: seperated list, programmatically
26.2 Metadata: ellapsed


> Code dump of new producer
> -------------------------
>
>                 Key: KAFKA-1227
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1227
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-1227.patch, KAFKA-1227.patch, KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
