Here is what I ended up doing. Improvements are welcome.
from pyspark.sql import SQLContext, Row
# Parenthesized so the import can span several lines; the original wrapped
# after the trailing comma without a continuation, which is a SyntaxError.
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
)
# NOTE: importing `sum` from pyspark.sql.functions shadows Python's builtin
# `sum` for the rest of this script.
from pyspark.sql.functions import asc, desc, sum, count

# `sc` is not defined in this script -- presumably the SparkContext that the
# pyspark shell provides; confirm when running outside the shell.
sqlContext = SQLContext(sc)
# Schema of the sample error log: three non-nullable columns.
error_schema = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in (
        ('id', IntegerType()),
        ('error_code', IntegerType()),
        ('error_desc', StringType()),
    )
])

# Sample rows, listed as (id, error_code) pairs in their original order.
# Every description in the sample is 'type <error_code> error', so it is
# derived from the code instead of being spelled out on each row.
error_data = sc.parallelize([
    Row(row_id, code, 'type {0} error'.format(code))
    for row_id, code in (
        (1, 1), (1, 2),
        (2, 4), (2, 3), (2, 3), (2, 2), (2, 1),
        (3, 2), (3, 2), (3, 2), (3, 1), (3, 3), (3, 1), (3, 1),
        (3, 4), (3, 5), (3, 1), (3, 1), (3, 2), (3, 4), (3, 1),
    )
])

# Build the DataFrame from the RDD and print it (show() prints at most the
# first 20 rows by default; the sample has 21).
error_df = sqlContext.createDataFrame(error_data, error_schema)
error_df.show()
# Goal: list every distinct (id, error_code, error_desc) combination with its
# number of occurrences, ordered so that the id with the most error rows
# comes first and, within one id, the most frequent error comes first.

# Total number of error rows per id. It is not sorted here: the ordering is
# applied once, on the final result.
id_count_df = error_df.groupBy("id").agg(count("id").alias("id_count"))

# Number of occurrences of each distinct (id, error_code, error_desc).
error_count_df = error_df.groupBy("id", "error_code", "error_desc").agg(
    count("id").alias("error_count")
)

# One row per distinct error, carrying the total of its id. Joining the two
# aggregates directly avoids joining the counts back onto the raw rows and
# then removing the resulting duplicates with distinct().
ranked_df = error_count_df.join(id_count_df, "id")

# asc("id") is a tie-breaker: if two ids have the same id_count, it keeps the
# rows of each id together instead of interleaving them by error_count.
ranked_df.orderBy(
    desc("id_count"), asc("id"), desc("error_count")
).select("id", "error_code", "error_desc", "error_count").show()
+---+----------+------------+
| id|error_code|  error_desc|
+---+----------+------------+
|  1|         1|type 1 error|
|  1|         2|type 2 error|
|  2|         4|type 4 error|
|  2|         3|type 3 error|
|  2|         3|type 3 error|
|  2|         2|type 2 error|
|  2|         1|type 1 error|
|  3|         2|type 2 error|
|  3|         2|type 2 error|
|  3|         2|type 2 error|
|  3|         1|type 1 error|
|  3|         3|type 3 error|
|  3|         1|type 1 error|
|  3|         1|type 1 error|
|  3|         4|type 4 error|
|  3|         5|type 5 error|
|  3|         1|type 1 error|
|  3|         1|type 1 error|
|  3|         2|type 2 error|
|  3|         4|type 4 error|
+---+----------+------------+
only showing top 20 rows
+---+----------+------------+-----------+
| id|error_code|  error_desc|error_count|
+---+----------+------------+-----------+
|  3|         1|type 1 error|          6|
|  3|         2|type 2 error|          4|
|  3|         4|type 4 error|          2|
|  3|         5|type 5 error|          1|
|  3|         3|type 3 error|          1|
|  2|         3|type 3 error|          2|
|  2|         2|type 2 error|          1|
|  2|         4|type 4 error|          1|
|  2|         1|type 1 error|          1|
|  1|         1|type 1 error|          1|
|  1|         2|type 2 error|          1|
+---+----------+------------+-----------+
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-orderBy-previous-groupBy-count-orderBy-in-pyspark-tp26864p26870.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]