Hi,
I need to find the top 10 best-selling samples, so the query looks like:

select s.name, count(s.name) from sample s group by s.name order by count(s.name)

This query fails with the following error:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [COUNT(name#0) ASC], true
 Exchange (RangePartitioning [COUNT(name#0) ASC], 200)
  Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0]
   Exchange (HashPartitioning [name#0], 200)
    Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L]
     PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:206)
    at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114)
    at com.edifecs.platform.df.analytics.spark.domain.dao.OrderByTest.testGetVisitDistributionByPrimaryDx(OrderByTest.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:121)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (RangePartitioning [COUNT(name#0) ASC], 200)
 Aggregate false, [name#0], [name#0 AS name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0]
  Exchange (HashPartitioning [name#0], 200)
   Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L]
    PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at JavaSQLContext.scala:102
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:47)
    at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207)
    at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:207)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 37 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: Count, tree: COUNT(input[2])
    at org.apache.spark.sql.catalyst.expressions.AggregateExpression.eval(aggregates.scala:41)
    at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:250)
    at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:242)
    at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
    at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
    at java.util.TimSort.sort(TimSort.java:216)
    at java.util.Arrays.sort(Arrays.java:1438)
    at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
    at scala.collection.AbstractSeq.sorted(Seq.scala:40)
    at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
    at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
    at org.apache.spark.RangePartitioner$.determineBounds(Partitioner.scala:279)
    at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:152)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:88)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 41 more
Source Code:

JavaSparkContext sc = new JavaSparkContext("local", "SparkSchemaTest");
try {
    List<SampleVo> samples = new ArrayList<>();
    samples.add(new SampleVo("Apple"));
    samples.add(new SampleVo("Apple"));
    samples.add(new SampleVo("Apple"));
    samples.add(new SampleVo("Orange"));
    samples.add(new SampleVo("Orange"));
    JavaRDD<SampleVo> claimRdd = sc.parallelize(samples);
    JavaSQLContext sqlCtx = new JavaHiveContext(sc);
    JavaSchemaRDD schemaRdd = sqlCtx.applySchema(claimRdd, SampleVo.class);
    sqlCtx.registerRDDAsTable(schemaRdd, "sample");
    // String query = "select s.name, count(s.name) from sample s group by s.name"; // worked OK
    String query = "select s.name, count(s.name) from sample s group by s.name order by count(s.name)";
    JavaSchemaRDD result = sqlCtx.sql(query);
    List<org.apache.spark.sql.api.java.Row> rows = result.collect();
    for (org.apache.spark.sql.api.java.Row row : rows) {
        System.out.println(row.getString(0) + "=" + row.getLong(1));
    }
} finally {
    sc.stop();
}
----------------------------
public class SampleVo implements Serializable {
    private String name;

    public SampleVo() {
    }

    public SampleVo(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
----------
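Not an answer to the underlying Spark bug, but since the GROUP BY query without ORDER BY reportedly works, one possible fallback is to let Spark do the grouping and then sort the small aggregated result on the driver. Below is a minimal plain-Java sketch of that "count, sort descending, take top N" step (the class and method names `TopSellers`/`topN` are hypothetical, and no Spark API is involved; in the real code the counts would come from the collected rows):

```java
import java.util.*;
import java.util.stream.*;

public class TopSellers {
    // Count occurrences of each name and return the top n entries,
    // ordered by count descending - the "top 10 best-selling" logic
    // the failing ORDER BY count(s.name) query was meant to express.
    public static List<Map.Entry<String, Long>> topN(List<String> names, int n) {
        Map<String, Long> counts = new HashMap<>();
        for (String name : names) {
            counts.merge(name, 1L, Long::sum); // equivalent of count(s.name) per group
        }
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same sample data as in the test above
        List<String> samples = Arrays.asList("Apple", "Apple", "Apple", "Orange", "Orange");
        for (Map.Entry<String, Long> e : topN(samples, 10)) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

This only makes sense when the grouped result is small enough to collect, as in this test; it is a driver-side workaround, not a fix for the Sort/Exchange failure itself.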
Does this mean Spark SQL does not support ORDER BY over a GROUP BY aggregate?
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/group-by-order-by-fails-tp21815.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.