[ 
https://issues.apache.org/jira/browse/KAFKA-2944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076312#comment-15076312
 ] 

jin xing edited comment on KAFKA-2944 at 1/1/16 3:18 PM:
---------------------------------------------------------

I cannot reproduce this and believe it is a transient failure.
The relevant KafkaConfigStorage code is below:
private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
    public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
        ...
        else if (record.key().startsWith(TASK_PREFIX)) {
            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
            if (deferred == null) {
                deferred = new HashMap<>();
                deferredTaskUpdates.put(taskId.connector(), deferred);
            }
            deferred.put(taskId, (Map<String, String>) newTaskConfig);
        } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
            int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
            Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
            // The NullPointerException is thrown from this call
            if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
                ...
            }
        }
Since the method halt() in DistributedHerder has not executed yet, I believe
this is not a shutdown issue.
In KafkaConfigStorage::putTaskConfigs, if sending the messages with
TASK_PREFIX fails but sending the message with COMMIT_TASKS_PREFIX succeeds,
deferredTaskUpdates will not have a corresponding key for the connector,
which would explain the null value passed to completeTaskIdSet.
So it makes sense to call 'flush' after sending each connector or task
configuration message to KafkaBasedLog.
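
To make the suspected failure mode concrete, here is a minimal, self-contained
sketch. It is not the KafkaConfigStorage source: the class
DeferredTaskUpdatesSketch, the methods onTaskRecord and onCommitRecord, and the
use of plain integer task indexes instead of ConnectorTaskId are all
assumptions made for illustration. It only shows that a commit record arriving
without any preceding task records yields a null id set, and one possible way
to treat that as an incomplete update instead of dereferencing null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the deferred-update bookkeeping; NOT the Kafka Connect code.
class DeferredTaskUpdatesSketch {
    // connector name -> (task index -> task config); simplified from ConnectorTaskId keys
    private final Map<String, Map<Integer, Map<String, String>>> deferredTaskUpdates = new HashMap<>();

    // Plays the role of the TASK_PREFIX branch: buffer a task config until the commit record arrives.
    void onTaskRecord(String connector, int taskIndex, Map<String, String> config) {
        deferredTaskUpdates.computeIfAbsent(connector, k -> new HashMap<>()).put(taskIndex, config);
    }

    // Plays the role of the COMMIT_TASKS_PREFIX branch. Returns true if the buffered set was applied.
    boolean onCommitRecord(String connector, int newTaskCount) {
        Map<Integer, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
        // If no task record was ever consumed for this connector, there is nothing to look up.
        Set<Integer> taskIdSet = (deferred == null) ? null : deferred.keySet();
        if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
            return false; // incomplete update: keep whatever is buffered and wait for a rewrite
        }
        deferredTaskUpdates.remove(connector);
        return true;
    }

    // Assumed guard: a null set is complete only when zero tasks are expected.
    static boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
        if (idSet == null) {
            return expectedSize == 0;
        }
        for (int i = 0; i < expectedSize; i++) {
            if (!idSet.contains(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        DeferredTaskUpdatesSketch storage = new DeferredTaskUpdatesSketch();
        // Commit record arrives although the task records were never written.
        System.out.println("commit without task records applied: " + storage.onCommitRecord("conn", 3));
        // Normal ordering: all task records first, then the commit record.
        for (int i = 0; i < 3; i++) {
            storage.onTaskRecord("conn", i, new HashMap<String, String>());
        }
        System.out.println("commit after task records applied: " + storage.onCommitRecord("conn", 3));
    }
}
```

A guard like this only protects the reading side. Flushing after each send,
as suggested above, addresses the writing side, and the two are not mutually
exclusive.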


> NullPointerException in KafkaConfigStorage when config storage starts right 
> before shutdown request
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2944
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2944
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>
> Relevant log where you can see a config update starting, then the request to 
> shutdown happens and we end up with a NullPointerException:
> {quote}
> [2015-12-03 09:12:55,712] DEBUG Change in connector task count from 2 to 3, writing updated task configurations (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:56,224] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> [2015-12-03 09:12:56,224] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,227] INFO Stopped ServerConnector@10cb550e{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector)
> [2015-12-03 09:12:56,234] INFO Stopped o.e.j.s.ServletContextHandler@3f8a24d5{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
> [2015-12-03 09:12:56,235] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,235] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:58,209] ERROR Unexpected exception in KafkaBasedLog's work thread (org.apache.kafka.connect.util.KafkaBasedLog)
> java.lang.NullPointerException
>       at org.apache.kafka.connect.storage.KafkaConfigStorage.completeTaskIdSet(KafkaConfigStorage.java:558)
>       at org.apache.kafka.connect.storage.KafkaConfigStorage.access$1200(KafkaConfigStorage.java:143)
>       at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:476)
>       at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:372)
>       at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:235)
>       at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:275)
>       at org.apache.kafka.connect.util.KafkaBasedLog.access$300(KafkaBasedLog.java:70)
>       at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:307)
> [2015-12-03 09:13:26,704] ERROR Failed to write root configuration to Kafka: (org.apache.kafka.connect.storage.KafkaConfigStorage)
> java.util.concurrent.TimeoutException: Timed out waiting for future
>       at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
>       at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>       at java.lang.Thread.run(Thread.java:745)
> [2015-12-03 09:13:26,704] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.errors.ConnectException: Error writing root configuration to Kafka
>       at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:355)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Timed out waiting for future
>       at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
>       at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
>       ... 8 more
> {quote}
> I'm not certain that the issue is specifically due to shutting down (the 
> KafkaConfigStorage.stop() hasn't been invoked yet when this occurs, so the 
> underlying KafkaBasedLog is still running, although shutdown of the entire 
> process has started), but this has only shown up during shutdown so far.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
