Hi,

I made some progress by binding the expressions to a LogicalPlan and then analyzing the plan. The problem now is the unique IDs that are assigned to every expression.

def apply(dataFrame: DataFrame, selectExpressions: java.util.List[String]): RDD[InternalRow] = {

  val schema = dataFrame.schema

  val exprArray = selectExpressions.map(s =>
    Column(SqlParser.parseExpression(s)).named
  )

  val projectLogicalPlan = Project(exprArray, dataFrame.logicalPlan)

  val analyzedLogicalPlan =
    ContextWrapper.getSqlContext.analyzer.execute(projectLogicalPlan)

  val transformedRDD = dataFrame.mapPartitions(
    iter => {
      val project = UnsafeProjection.create(
        analyzedLogicalPlan.expressions,
        schema.toAttributes,
        subexpressionEliminationEnabled = false)
      iter.map { row =>
        val irow = InternalRow.fromSeq(row.toSeq)
        project(irow)
      }
    })

  transformedRDD
}

The error occurs when creating the UnsafeProjection. It seems that the unique ID assigned to each column differs between analyzedLogicalPlan.expressions and schema.toAttributes, so binding the columns fails:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: EMP_NUM#3
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:86)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:85)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$$anonfun$4.apply(Projection.scala:146)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$$anonfun$4.apply(Projection.scala:146)
    at scala.collection.immutable.Stream.map(Stream.scala:376)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:146)
    at org.apache.spark.sql.ScalaTransform$$anonfun$2.apply(ScalaTransform.scala:49)
    at org.apache.spark.sql.ScalaTransform$$anonfun$2.apply(ScalaTransform.scala:48)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks and Regards,
Aviral Agarwal

On Tue, Mar 28, 2017 at 2:13 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

> I am not sure why you want to transform rows in the dataframe using
> mapPartitions like that.
>
> If you want to project the rows with some expressions, you can use an API
> like selectExpr and let Spark SQL resolve the expressions. To resolve
> expressions manually, you need to (at least) deal with a resolver, and
> transform the expressions recursively with the LogicalPlan.resolve API.
>
> Aviral Agarwal wrote
> > Hi,
> > Can you please point me to how to resolve the expression?
> > I was looking into LogicalPlan.resolveExpressions(), which takes a
> > PartialFunction, but I am not sure how to use that.
> >
> > Thanks,
> > Aviral Agarwal
> >
> > On Mar 24, 2017 09:20, "Liang-Chi Hsieh" <viirya@> wrote:
> >
> > Hi,
> >
> > You need to resolve the expressions before passing them in to create the
> > UnsafeProjection.
> >
> > Aviral Agarwal wrote
> >> Hi guys,
> >>
> >> I want to transform Row using NamedExpression.
> >>
> >> Below is the code snippet that I am using:
> >>
> >> def apply(dataFrame: DataFrame, selectExpressions: java.util.List[String]): RDD[UnsafeRow] = {
> >>
> >>   val exprArray = selectExpressions.map(s =>
> >>     Column(SqlParser.parseExpression(s)).named
> >>   )
> >>
> >>   val inputSchema = dataFrame.logicalPlan.output
> >>
> >>   val transformedRDD = dataFrame.mapPartitions(
> >>     iter => {
> >>       val project = UnsafeProjection.create(exprArray, inputSchema)
> >>       iter.map {
> >>         row =>
> >>           project(InternalRow.fromSeq(row.toSeq))
> >>       }
> >>     })
> >>
> >>   transformedRDD
> >> }
> >>
> >> The problem is that the expression becomes unevaluable:
> >>
> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: 'a
> >>     at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233)
> >>     at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.genCode(unresolved.scala:53)
> >>     at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
> >>     at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
> >>     at scala.Option.getOrElse(Option.scala:120)
> >>     at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext.generateExpressions(CodeGenerator.scala:464)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:281)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:324)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:317)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
> >>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635)
> >>     at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:125)
> >>     at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:135)
> >>     at org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(ScalaTransform.scala:31)
> >>     at org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(ScalaTransform.scala:30)
> >>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> >>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> >>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> >>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> >>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
> >>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>     at java.lang.Thread.run(Thread.java:745)
> >>
> >> This might be because the Expression is unresolved.
> >>
> >> Any help would be appreciated.
> >>
> >> Thanks and Regards,
> >> Aviral Agarwal
> >
> > -----
> > Liang-Chi Hsieh | @viirya
> > Spark Technology Center
> > http://www.spark.tc/
> > --
> > View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-SparkSQL-Project-using-NamedExpression-tp21224p21230.html
> > Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: dev-unsubscribe@.apache
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-SparkSQL-Project-using-NamedExpression-tp21224p21248.html
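
A note on the unique ID mismatch reported at the top of this message. The sketch below is a toy model in plain Scala, not Spark code: Attr, freshAttrs and bind are made-up names, and EMP_NAME is a hypothetical second column. It only illustrates the mechanism that appears to be at work, assuming binding matches attributes by their unique ID rather than by name, and that attributes rebuilt from a bare schema receive new IDs.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy stand-ins for Catalyst concepts; none of these are Spark classes.
object ExprIdBindingSketch {
  private val nextId = new AtomicLong(0L)

  // An attribute is a column name plus a globally unique id.
  final case class Attr(name: String, exprId: Long)

  // Every call mints new ids, even when the names are the same.
  def freshAttrs(names: Seq[String]): Seq[Attr] =
    names.map(n => Attr(n, nextId.getAndIncrement()))

  // Binding looks a reference up by id only; the name is never compared.
  def bind(ref: Attr, input: Seq[Attr]): Either[String, Int] = {
    val ordinal = input.indexWhere(_.exprId == ref.exprId)
    if (ordinal == -1) Left(s"Binding attribute, tree: ${ref.name}#${ref.exprId}")
    else Right(ordinal)
  }

  def main(args: Array[String]): Unit = {
    // The attributes the plan exposes; a resolved expression points at these ids.
    val planOutput = freshAttrs(Seq("EMP_NUM", "EMP_NAME"))
    val resolvedRef = planOutput.head

    // Same names rebuilt from scratch: the ids differ, so binding fails.
    val rebuilt = freshAttrs(Seq("EMP_NUM", "EMP_NAME"))
    println(bind(resolvedRef, rebuilt))

    // Binding against the original attributes finds ordinal 0.
    println(bind(resolvedRef, planOutput))
  }
}
```

If this is indeed what happens in the code above, passing the attributes the analyzer resolved against (dataFrame.logicalPlan.output, as in the first version of the code) instead of schema.toAttributes may keep the IDs consistent. This is untested against the Spark version used in this thread.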