Ignite Dev Community,  

I’m working with the Ignite 2.4+ Spark SQL DataFrame functionality and have run 
into what I believe to be a bug where spark partition information is incorrect 
for non-trivial sizes of Ignite clusters.  

The partition array returned to Spark via 
org.apache.ignite.spark.impl.calcPartitions() needs to be in the order of the 
spark partition numbers, but the function doesn’t make that guarantee and 
consistently fails for anything but very small Ignite clusters. Without the 
correct partition sequencing, Spark will throw errors such as:

java.lang.IllegalArgumentException: requirement failed: partitions(0).partition 
== 3, but it should equal 0
at scala.Predef$.require(Predef.scala:224)
at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2$$anonfun$apply$3.apply(RDD.scala:255)
at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2$$anonfun$apply$3.apply(RDD.scala:254)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at 
org.apache.ignite.spark.IgniteSQLDataFrameSpec$$anonfun$1$$anonfun$apply$mcV$sp$11.apply$mcV$sp(IgniteSQLDataFrameSpec.scala:145)

I’ve forked and committed a change which demonstrates this by increasing the 
number of servers in the spark tests from 3 to 4 which causes the 
IgniteSQLDataFrameSpec test to start failing per above. This commit also 
demonstrates the fix which is to just sequence the ignite node map before 
zipping:

https://github.com/stuartmacd/ignite/commit/c9e7294c71de9e7b2bddfae671605a71260b80b3

Can anyone help confirm this behaviour? Happy to create a jira and pull request 
for the proposed change.

I believe this might also be related to another earlier report: 
http://apache-ignite-users.70518.x6.nabble.com/Getting-an-exception-when-listing-partitions-of-IgniteDataFrame-td22434.html

Thanks,
Stuart.

Reply via email to