[ https://issues.apache.org/jira/browse/KUDU-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17224808#comment-17224808 ]
Grant Henke commented on KUDU-3205:
-----------------------------------

I looked into this a little. It looks like the issue is that unresolved addresses get filtered out of the tablet servers list in `RemoteTablet.java`, but those servers still remain in the list of replicas. This happens in `AsyncKuduClient.discoverTablets(...)`:

https://github.com/apache/kudu/blob/17cede9da2c7213e05dcca921a0e767dda54f131/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java#L2328-L2337

In that method, things are allowed to proceed as long as at least one replica could be looked up:

https://github.com/apache/kudu/blob/17cede9da2c7213e05dcca921a0e767dda54f131/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java#L2370-L2374

We could adjust the scan token logic to ignore the missing servers and exclude the related replicas (a rough sketch of this is at the end of this message). That would be easy and would behave similarly to the single-client behavior in this scenario. However, a better approach might be to defer resolving the address until it is absolutely necessary (when `ServerInfo.getResolvedAddress` is called), so that all of the servers can be included in the token regardless of address resolution. That way, if the client generating the tokens can't resolve the address but the client using the tokens can, that server/replica will still be included and used.

> NPE in KuduScanTokenBuilder#build after a tablet server goes down
> -----------------------------------------------------------------
>
>                  Key: KUDU-3205
>                  URL: https://issues.apache.org/jira/browse/KUDU-3205
>              Project: Kudu
>           Issue Type: Bug
>           Components: spark
>     Affects Versions: 1.13.0
>             Reporter: Junegunn Choi
>             Assignee: Grant Henke
>             Priority: Major
>
> When a tablet server goes down while a query is running on Spark, the connection
> becomes unusable because the cached tablet locations have become stale.
>
> h2. Steps to reproduce
>
> h3. Start spark-shell with kudu-spark2 1.13.0
>
> The problem is not reproducible with kudu-spark2 1.12.0 or below, because it was
> introduced in [KUDU-1802|https://github.com/apache/kudu/commit/d23ee5d38ddc4317f431dd65df0c825c00cc968a].
>
> h3. Run a scan query
>
> {code:scala}
> import org.apache.kudu.spark.kudu._
> val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
> dummy.createOrReplaceTempView("dummy")
> spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
> {code}
>
> h3. Kill a tablet server
>
> Kill one of the tablet servers that are serving data for the query. The query
> should fail immediately.
> {noformat}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
> stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0
> (TID 2, localhost, executor driver): java.lang.RuntimeException:
> org.apache.kudu.client.NonRecoverableException: Scanner *** not found (it may
> have expired)
> {noformat}
>
> h3. Re-run the query
>
> {code:scala}
> spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
> {code}
> It doesn't work; it fails with an NPE.
> {noformat}
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>   at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:697)
>   at org.apache.kudu.spark.kudu.KuduRDD.getPartitions(KuduRDD.scala:95)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   ... 86 more
> Caused by: java.lang.NullPointerException
>   at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:674)
>   ... 117 more
> {noformat}
>
> Re-creating the DataFrame doesn't help:
> {code:scala}
> val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
> dummy.createOrReplaceTempView("dummy")
> // Still fails with an NPE
> spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
> {code}
>
> h2. Cause
>
> {code:java|title=KuduScanToken.java:666}
> // Build the list of replica metadata.
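> // NOTE (annotation, not part of the original file): serverIndexMap is keyed
> // by the host/port of the tablet servers that could be resolved when the
> // token was built. A replica whose server was filtered out of that list has
> // no entry, so the lookup below returns null, and auto-unboxing that null in
> // setTsIdx(serverIndex) is what throws the NullPointerException.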
> List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
> for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
>   Integer serverIndex = serverIndexMap.get(
>       new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
>   Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
>       Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
>           .setRole(replica.getRoleAsEnum())
>           .setTsIdx(serverIndex);
>   if (replica.getDimensionLabel() != null) {
>     tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
>   }
>   replicas.add(tabletMetadataBuilder.build());
> }
> {code}
>
> {{serverIndex}} can be null here, because we're using the cached tablet
> locations ({{TableLocationsCache.Entry}}) that are now stale.
>
> h2. Workarounds
>
> - Restart the Spark shell
> - Wait until the connection becomes idle and is cleaned up:
> {noformat}
> DEBUG Connection: [peer master-***] handling channelInactive
> DEBUG Connection: [peer master-***] cleaning up while in state READY due to: connection closed
> {noformat}
> - Use kudu-spark2 1.12.0 or below
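Rough sketch of the first option mentioned in the comment above (skipping replicas whose tablet server could not be resolved when the scan token is built). This is only an untested illustration against the loop shown in the "Cause" section, reusing the names from that snippet; it is not an actual patch:

{code:java}
// Build the list of replica metadata, skipping replicas whose tablet server
// was dropped from the servers list because its address did not resolve.
List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
  Integer serverIndex = serverIndexMap.get(
      new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
  if (serverIndex == null) {
    // No entry means the server's address could not be resolved during tablet
    // discovery. Skip the replica instead of letting setTsIdx(serverIndex)
    // auto-unbox null and throw a NullPointerException.
    continue;
  }
  Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
      Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
          .setRole(replica.getRoleAsEnum())
          .setTsIdx(serverIndex);
  if (replica.getDimensionLabel() != null) {
    tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
  }
  replicas.add(tabletMetadataBuilder.build());
}
{code}

The trade-off is the one noted above: a replica skipped this way is lost to every client that uses the token, even if the client consuming the token could have resolved the server's address, which is why deferring resolution to `ServerInfo.getResolvedAddress` looks like the better long-term fix.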