[ https://issues.apache.org/jira/browse/KAFKA-2944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076312#comment-15076312 ]
jin xing edited comment on KAFKA-2944 at 1/1/16 3:18 PM:
---------------------------------------------------------

Cannot reproduce this; I believe it is a transient failure. The relevant KafkaConfigStorage code is below:

    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            ...
            else if (record.key().startsWith(TASK_PREFIX)) {
                Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
                if (deferred == null) {
                    deferred = new HashMap<>();
                    deferredTaskUpdates.put(taskId.connector(), deferred);
                }
                deferred.put(taskId, (Map<String, String>) newTaskConfig);
            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
                Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
                int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
                Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
                Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
                if (!completeTaskIdSet(taskIdSet, newTaskCount)) { // NullPointerException comes out from here
                    ....
                }
            }

Since the method "halt()" in DistributedHerder has not executed yet, I believe this is not a shutdown issue.
In KafkaConfigStorage::putTaskConfigs, if sending the messages with TASK_PREFIX fails but sending the message with COMMIT_TASKS_PREFIX succeeds, deferredTaskUpdates will not have a corresponding key for the connector.
So it makes sense to call a 'flush' after sending each connector or task configuration message to KafkaBasedLog.

> NullPointerException in KafkaConfigStorage when config storage starts right before shutdown request
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2944
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2944
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>
> Relevant log where you can see a config update starting, then the request to shutdown happens and we end up with a NullPointerException:
> {quote}
> [2015-12-03 09:12:55,712] DEBUG Change in connector task count from 2 to 3, writing updated task configurations
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:56,224] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> [2015-12-03 09:12:56,224] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,227] INFO Stopped ServerConnector@10cb550e{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector)
> [2015-12-03 09:12:56,234] INFO Stopped o.e.j.s.ServletContextHandler@3f8a24d5{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
> [2015-12-03 09:12:56,235] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,235] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:58,209] ERROR Unexpected exception in KafkaBasedLog's work thread (org.apache.kafka.connect.util.KafkaBasedLog)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.storage.KafkaConfigStorage.completeTaskIdSet(KafkaConfigStorage.java:558)
>     at org.apache.kafka.connect.storage.KafkaConfigStorage.access$1200(KafkaConfigStorage.java:143)
>     at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:476)
>     at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:372)
>     at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:235)
>     at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:275)
>     at org.apache.kafka.connect.util.KafkaBasedLog.access$300(KafkaBasedLog.java:70)
>     at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:307)
> [2015-12-03 09:13:26,704] ERROR Failed to write root configuration to Kafka: (org.apache.kafka.connect.storage.KafkaConfigStorage)
> java.util.concurrent.TimeoutException: Timed out waiting for future
>     at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
>     at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>     at java.lang.Thread.run(Thread.java:745)
> [2015-12-03 09:13:26,704] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.errors.ConnectException: Error writing root configuration to Kafka
>     at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:355)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Timed out waiting for future
>     at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
>     at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
>     ... 8 more
> {quote}
> I'm not certain that the issue is specifically due to shutting down (the KafkaConfigStorage.stop() hasn't been invoked yet when this occurs, so the underlying KafkaBasedLog is still running, although shutdown of the entire process has started), but this has only shown up during shutdown so far.
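
The scenario described in the comment above, where a COMMIT_TASKS record is consumed although no TASK records for that connector were consumed first, can be sketched in isolation. This is a stand-alone illustration, not the KafkaConfigStorage code: the class DeferredTaskUpdatesSketch, its simplified map types, and the bodies of onTaskRecord, onCommitRecord and completeTaskIdSet are hypothetical, and the null guard is only one possible defensive fix assumed here, not necessarily the change made for KAFKA-2944.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the deferred task update bookkeeping.
final class DeferredTaskUpdatesSketch {
    // connector name -> (task index -> task config); plays the role of deferredTaskUpdates
    private final Map<String, Map<Integer, Map<String, String>>> deferredTaskUpdates = new HashMap<>();

    // Simulates consuming a TASK record: the config is buffered until a commit arrives.
    public void onTaskRecord(String connector, int taskIndex, Map<String, String> config) {
        deferredTaskUpdates.computeIfAbsent(connector, k -> new HashMap<>()).put(taskIndex, config);
    }

    // Simulates consuming a COMMIT_TASKS record.
    // Returns true if the buffered configs were complete and have been applied.
    public boolean onCommitRecord(String connector, int newTaskCount) {
        // This lookup returns null when no TASK record was consumed for the connector.
        Map<Integer, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
        Set<Integer> taskIdSet = (deferred == null) ? null : new HashSet<>(deferred.keySet());
        if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
            return false; // incomplete update: skip it instead of applying it
        }
        deferredTaskUpdates.remove(connector);
        return true;
    }

    // Assumed guard: without the null check, idSet.size() throws NullPointerException.
    public static boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
        return idSet != null && idSet.size() >= expectedSize;
    }

    public static void main(String[] args) {
        DeferredTaskUpdatesSketch storage = new DeferredTaskUpdatesSketch();
        // Commit arrives without any task records: treated as incomplete, no exception.
        System.out.println(storage.onCommitRecord("my-connector", 3));
        for (int i = 0; i < 3; i++) {
            storage.onTaskRecord("my-connector", i, new HashMap<>());
        }
        // All three task records are buffered, so the commit is applied.
        System.out.println(storage.onCommitRecord("my-connector", 3));
    }
}
```

With such a guard, a commit record that arrives without its task records is treated as an incomplete update rather than killing the work thread. Whether the producer side should also flush after each send, as suggested in the comment, is a separate question that this sketch does not address.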