[ https://issues.apache.org/jira/browse/KUDU-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grant Henke reassigned KUDU-3205: --------------------------------- Assignee: Grant Henke > NPE in KuduScanTokenBuilder#build after a tablet server goes down > ----------------------------------------------------------------- > > Key: KUDU-3205 > URL: https://issues.apache.org/jira/browse/KUDU-3205 > Project: Kudu > Issue Type: Bug > Components: spark > Affects Versions: 1.13.0 > Reporter: Junegunn Choi > Assignee: Grant Henke > Priority: Major > > When a tablet server goes down while running a query on Spark, the connection > becomes unusable due to the cached tablet locations that have become stale. > h2. Steps to reproduce > h3. Start spark-shell with kudu-spark2 1.13.0 > The problem is not reproducible with kudu-spark2 1.12.0 or below, because it > was introduced in [KUDU-1802 > |https://github.com/apache/kudu/commit/d23ee5d38ddc4317f431dd65df0c825c00cc968a]. > h3. Run a scan query > {code:scala} > import org.apache.kudu.spark.kudu._ > val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" > -> "dummy")).kudu > dummy.createOrReplaceTempView("dummy") > spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show > {code} > h3. Kill a tablet server > Kill one of the tablet servers that are serving data for the query. The query > should fail immediately. > {noformat} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in > stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 > (TID 2, localhost, executor driver): java.lang.RuntimeException: > org.apache.kudu.client.NonRecoverableException: Scanner *** not found (it may > have expired) > {noformat} > h3. Re-run the query > {code:scala} > spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show > {code} > Doesn't work, fails with an NPE. > {noformat} > Caused by: java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:697) > at org.apache.kudu.spark.kudu.KuduRDD.getPartitions(KuduRDD.scala:95) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:269) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:269) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:269) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:269) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269) > at scala.Option.getOrElse(Option.scala:121) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:269) > at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) > at > org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) > ... 86 more > Caused by: java.lang.NullPointerException > at > org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:674) > ... 117 more > {noformat} > Re-creating the DataFrame doesn't help: > {code:scala} > val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" > -> "dummy")).kudu > dummy.createOrReplaceTempView("dummy") > // Still fails with an NPE > spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show > {code} > h2. Cause > {code:java|title=KuduScanToken.java:666} > // Build the list of replica metadata. > List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>(); > for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) { > Integer serverIndex = serverIndexMap.get( > new HostAndPort(replica.getRpcHost(), replica.getRpcPort())); > Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder = > Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder() > .setRole(replica.getRoleAsEnum()) > .setTsIdx(serverIndex); > if (replica.getDimensionLabel() != null) { > tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel()); > } > replicas.add(tabletMetadataBuilder.build()); > } > {code} > {{serverIndex}} can be null here, because we're using the cached tablet > locations that are stale now ({{TableLocationsCache.Entry}}). > h2. Workarounds > - Restart Spark shell > - Wait until the connection becomes idle and cleaned up > {noformat} > DEBUG Connection: [peer master-***] handling channelInactive > DEBUG Connection: [peer master-***] cleaning up while in state READY due to: > connection closed > {noformat} > - Use kudu-spark2 1.12.0 or below -- This message was sent by Atlassian Jira (v8.3.4#803005)