[ https://issues.apache.org/jira/browse/KUDU-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264326#comment-17264326 ]

ASF subversion and git services commented on KUDU-3205:
-------------------------------------------------------

Commit 929cb67da52f9bd30827527bb440b83d2260f608 in kudu's branch refs/heads/master from Grant Henke
[ https://gitbox.apache.org/repos/asf?p=kudu.git;h=929cb67 ]

KUDU-3205: Fix building scan tokens when tablet not found errors occur

When a tablet-not-found error occurs, AsyncKuduClient.invalidateTabletCache
is called, which in turn calls RemoteTablet.removeTabletClient to remove a
server (by UUID) from the RemoteTablet's tabletServers list. As a result,
RemoteTablet.getReplicas can return a replica whose server is no longer
returned by RemoteTablet.getTabletServersCopy. This causes an NPE when
building a scan token, because the serverIndexMap has no matching server
for that replica.
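
A rough, self-contained illustration of the mismatch (hypothetical host
names, and a plain string-keyed map standing in for the client's internal
HostAndPort-keyed serverIndexMap; this is not the actual Kudu client code):

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StaleReplicaLookup {
  public static void main(String[] args) {
    // Index map built from the servers still present in the RemoteTablet's
    // tabletServers list (what getTabletServersCopy would return).
    Map<String, Integer> serverIndexMap = new HashMap<>();
    serverIndexMap.put("ts-a:7050", 0);
    serverIndexMap.put("ts-b:7050", 1);
    // "ts-c:7050" was removed by removeTabletClient after a
    // "tablet not found" error, so it has no index.

    // getReplicas, however, still reports all three replicas.
    List<String> replicaAddresses =
        Arrays.asList("ts-a:7050", "ts-b:7050", "ts-c:7050");

    for (String address : replicaAddresses) {
      Integer serverIndex = serverIndexMap.get(address);
      // Prints "ts-c:7050 -> null": that null is what later blows up when
      // it is unboxed while building the scan token.
      System.out.println(address + " -> " + serverIndex);
    }
  }
}
{code}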

To fix this, I simply check whether the serverIndex found via the
serverIndexMap is null and ignore the replica if it is. A better long-term
fix would likely be to build the server info from the replicas themselves,
to change the RemoteTablet behavior to blacklist servers instead of
removing them from the actual tabletServers list, or to also remove the
replica itself when RemoteTablet.removeTabletClient is called. I didn't
pursue any of those options now because they are all larger changes with a
wider potential impact.

Change-Id: I68679dee1ad7ebca405dd6e086770f3e034e310c
Reviewed-on: http://gerrit.cloudera.org:8080/16941
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aser...@cloudera.com>
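
For reference, a minimal sketch of the null check described in the commit
message above, written against the loop quoted from KuduScanToken.java in
the issue description below; the actual committed change may differ in
detail:

{code:java}
// Build the list of replica metadata.
List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
  Integer serverIndex = serverIndexMap.get(
      new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
  // The replica's server may have been removed from the tabletServers list
  // by RemoteTablet.removeTabletClient; skip it rather than unboxing null below.
  if (serverIndex == null) {
    continue;
  }
  Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
      Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
          .setRole(replica.getRoleAsEnum())
          .setTsIdx(serverIndex);
  if (replica.getDimensionLabel() != null) {
    tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
  }
  replicas.add(tabletMetadataBuilder.build());
}
{code}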


> NPE in KuduScanTokenBuilder#build after a tablet server goes down
> -----------------------------------------------------------------
>
>                 Key: KUDU-3205
>                 URL: https://issues.apache.org/jira/browse/KUDU-3205
>             Project: Kudu
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 1.13.0
>            Reporter: Junegunn Choi
>            Assignee: Grant Henke
>            Priority: Major
>
> When a tablet server goes down while a query is running on Spark, the connection
> becomes unusable because its cached tablet locations have gone stale.
> h2. Steps to reproduce
> h3. Start spark-shell with kudu-spark2 1.13.0
> The problem is not reproducible with kudu-spark2 1.12.0 or below, because it
> was introduced in
> [KUDU-1802|https://github.com/apache/kudu/commit/d23ee5d38ddc4317f431dd65df0c825c00cc968a].
> h3. Run a scan query
> {code:scala}
> import org.apache.kudu.spark.kudu._
> val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
> dummy.createOrReplaceTempView("dummy")
> spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
> {code}
> h3. Kill a tablet server
> Kill one of the tablet servers that are serving data for the query. The query 
> should fail immediately.
> {noformat}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in 
> stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 
> (TID 2, localhost, executor driver): java.lang.RuntimeException: 
> org.apache.kudu.client.NonRecoverableException: Scanner *** not found (it may 
> have expired)
> {noformat}
> h3. Re-run the query
> {code:scala}
> spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
> {code}
> Doesn't work; it fails with an NPE:
> {noformat}
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>   at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:697)
>   at org.apache.kudu.spark.kudu.KuduRDD.getPartitions(KuduRDD.scala:95)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
>   at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   ... 86 more
> Caused by: java.lang.NullPointerException
>   at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:674)
>   ... 117 more
> {noformat}
> Re-creating the DataFrame doesn't help:
> {code:scala}
> val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
> dummy.createOrReplaceTempView("dummy")
> // Still fails with an NPE
> spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
> {code}
> h2. Cause
> {code:java|title=KuduScanToken.java:666}
> // Build the list of replica metadata.
> List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
> for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
>   Integer serverIndex = serverIndexMap.get(
>       new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
>   Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
>       Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
>           .setRole(replica.getRoleAsEnum())
>           .setTsIdx(serverIndex);
>   if (replica.getDimensionLabel() != null) {
>     tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
>   }
>   replicas.add(tabletMetadataBuilder.build());
> }
> {code}
> {{serverIndex}} can be null here because we're using cached tablet locations
> ({{TableLocationsCache.Entry}}) that have since gone stale; auto-unboxing that
> null {{Integer}} at the {{.setTsIdx(serverIndex)}} call is what throws the NPE
> at {{KuduScanToken.java:674}}.
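> A standalone demonstration of why the stale lookup surfaces as a plain
> {{NullPointerException}} rather than a lookup error (hypothetical stand-in code,
> not the Kudu client; it assumes, as is typical for protobuf-generated builders,
> that {{setTsIdx}} takes a primitive {{int}}):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> public class UnboxingNpeDemo {
>   // Stand-in for a builder setter that takes a primitive int,
>   // like ReplicaMetadataPB.Builder.setTsIdx(int).
>   static void setTsIdx(int tsIdx) {
>     System.out.println("tsIdx = " + tsIdx);
>   }
>
>   public static void main(String[] args) {
>     Map<String, Integer> serverIndexMap = new HashMap<>();
>     Integer serverIndex = serverIndexMap.get("removed-server");  // returns null
>     setTsIdx(serverIndex);  // auto-unboxing the null Integer throws the NPE here
>   }
> }
> {code}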
> h2. Workarounds
>  - Restart the Spark shell
>  - Wait until the connection becomes idle and is cleaned up:
> {noformat}
> DEBUG Connection: [peer master-***] handling channelInactive
> DEBUG Connection: [peer master-***] cleaning up while in state READY due to: 
> connection closed
> {noformat}
>  - Use kudu-spark2 1.12.0 or below


