Hello Paul,

Many thanks for your quick answer. This did the trick! Fantastic!
For reference, here is what worked:

***PARAGRAPH INPUT:***

val AggregatedChangepointAnalyzer = new UserDefinedAggregateFunction { … }

***PARAGRAPH OUTPUT:***

AggregatedChangepointAnalyzer: org.apache.spark.sql.expressions.UserDefinedAggregateFunction{def evaluate(buffer: org.apache.spark.sql.Row): String} = $$$$79b2515edf74bd80cfc9d8ac1ba563c6$$$$anon$1@3b65afbc

I was then able to use the UDAF easily:

***PARAGRAPH INPUT:***

val cpt_df = df.groupBy("foo", "bar", "baz", "bok").agg(AggregatedChangepointAnalyzer(col("y")).as("cpt"))
cpt_df.show

***PARAGRAPH OUTPUT:***

cpt_df: org.apache.spark.sql.DataFrame = [foo: string, bar: string ... 3 more fields]

+--------+--------+--------+----------+---+
|foo     |bar     |baz     |bok       |cpt|
+--------+--------+--------+----------+---+
|some    |secret  |thing   |here      | 40|
+--------+--------+--------+----------+---+

Best,
Raphael


From: Paul Brenner <pbren...@placeiq.com>
Date: Tuesday, February 27, 2018 at 3:31 PM
To: Raphael Vannson <raphael.vann...@thinkbiganalytics.com>, "users@zeppelin.apache.org" <users@zeppelin.apache.org>
Subject: Cannot define UDAF in %spark interpreter

Unfortunately, I don't know why code that works for you in spark-shell isn't working in Zeppelin. But if you are looking for a quick fix, perhaps this could help? I've had luck defining my UDAFs in Zeppelin like:

val myUDAF = new UserDefinedAggregateFunction { … }

So, for example, the following code compiles fine for me in Zeppelin:

// Imports needed by the example (implied in the original message):
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

val FractionOfDayCoverage = new UserDefinedAggregateFunction {

  // Input data type schema
  def inputSchema: StructType = StructType(Array(StructField("seconds", LongType)))

  // Intermediate schema
  def bufferSchema = StructType(Array(StructField("times", ArrayType(LongType))))

  // Returned data type
  def dataType = DoubleType

  // Self-explaining
  def deterministic = true

  // This function is called whenever the key changes
  def initialize(buffer: MutableAggregationBuffer) = {
    var timeArray = new ListBuffer[Long]()
    buffer.update(0, timeArray)
  }

  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      var timeArray = new ListBuffer[Long]()
      timeArray ++= buffer.getAs[List[Long]](0)
      timeArray += input.getLong(0)
      buffer.update(0, timeArray)
    }
  }

  // Merge two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    var timeArray = new ListBuffer[Long]()
    timeArray ++= buffer1.getAs[List[Long]](0)
    timeArray ++= buffer2.getAs[List[Long]](0)
    buffer1.update(0, timeArray)
  }

  // Called after all the entries are exhausted:
  // merge the timestamps into covered intervals of +/- 30 minutes,
  // then return the covered fraction of a 24-hour day, capped at 1.
  def evaluate(buffer: Row) = {
    var timeArray = new ListBuffer[Long]()
    timeArray ++= buffer.getAs[List[Long]](0).filter(x => x != null)
    val times = timeArray.toArray
    scala.util.Sorting.quickSort(times)
    var intStart = times(0) - 30*60
    var intEnd = times(0) + 30*60
    var seen = 0L
    for (t <- times) {
      if (t > intEnd + 30*60) {
        seen += (intEnd - intStart)
        intStart = t - 30*60
        intEnd = t + 30*60
      } else {
        intEnd = t + 30*60
      }
    }
    seen += intEnd - intStart
    math.min(seen.toDouble/(24*60*60), 1)
  }
}

I'm using Zeppelin 0.7.2 and Spark 2.0.1 (I think), so perhaps there is a version issue somewhere?
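For readers who want the smallest possible test of this workaround, here is a minimal, self-contained sketch of the same anonymous-object pattern that should compile in a single %spark paragraph. It assumes Spark 2.x; the SumY name, the schemas, and the df in the usage comment are illustrative only, not from this thread:

%spark
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Illustrative minimal UDAF: sums a double column. The point is the
// anonymous-object form (val ... = new UserDefinedAggregateFunction { ... }),
// which sidesteps the class-definition problem discussed below; the
// aggregation itself is deliberately trivial.
val SumY = new UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getDouble(0) + input.getDouble(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  def evaluate(buffer: Row): Double = buffer.getDouble(0)
}

// Hypothetical usage, assuming a DataFrame df with columns "foo" and "y":
// df.groupBy("foo").agg(SumY(col("y")).as("sum_y")).show()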
Paul Brenner
DATA SCIENTIST
(217) 390-3033
PlaceIQ <http://www.placeiq.com/>


On Tue, Feb 27, 2018 at 6:19 PM Vannson Raphael <raphael.vann...@thinkbiganalytics.com> wrote:

Hello,

I am having trouble defining a UDAF; the same code works fine in spark-shell's :paste mode.

Environment:
- Amazon EMR
- Apache Zeppelin version 0.7.3
- Spark version 2.2.1
- Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)

1) Is there a way to configure the Zeppelin %spark interpreter to do the equivalent of spark-shell's :paste mode?
2) If not, is there a workaround to be able to define UDAFs in Zeppelin's %spark interpreter?

Thanks!
Raphael

***PARAGRAPH INPUT:***

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer

class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {

  // Input schema
  override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)

  // Intermediate buffer schema
  override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)

  // Output schema
  override def dataType: DataType = StringType

  // Deterministic UDAF
  override def deterministic: Boolean = true

  // How to initialize the intermediate processing buffer for each group:
  // we simply create a List[Double] which will hold the observations (y)
  // of each group.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.emptyDoubleArray
  }

  // What to do with each new row within the group:
  // here we append each new observation of the group to a List[Double].
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Put the observations collected so far into a ListBuffer
    var values = new ListBuffer[Double]()
    values.appendAll(buffer.getAs[List[Double]](0))

    // Get the new value for the current row
    val newValue = input.getDouble(0)

    // Append the new value to the buffer and return it
    values.append(newValue)
    buffer.update(0, values)
  }

  // How to merge 2 buffers located on 2 separate executor hosts or JVMs:
  // simply append one List at the end of the other.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var values = new ListBuffer[Double]()
    values ++= buffer1.getAs[List[Double]](0)
    values ++= buffer2.getAs[List[Double]](0)
    buffer1.update(0, values)
  }

  // Return the number of observations collected for the group
  override def evaluate(buffer: Row): String = {
    val observations = buffer.getSeq[Double](0)
    observations.size.toString
  }
}

***PARAGRAPH OUTPUT:***

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer

:12: error: not found: type UserDefinedAggregateFunction
       class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
                                                   ^
:14: error: not found: type StructType
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                 ^
:14: error: not found: value StructType
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                              ^
:14: error: not found: value StructField
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                         ^
:14: error: not found: value DoubleType
       override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                                          ^
:17: error: not found: type StructType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                  ^
:17: error: not found: value StructType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                               ^
:17: error: not found: value StructField
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                          ^
:17: error: not found: value ArrayType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                      ^
:17: error: not found: value DoubleType
       override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                                ^
:20: error: not found: type DataType
       override def dataType: DataType = StringType
                              ^
:20: error: not found: value StringType
       override def dataType: DataType = StringType
                                         ^
:30: error: not found: type MutableAggregationBuffer
       override def initialize(buffer: MutableAggregationBuffer): Unit = {
                                       ^
:37: error: not found: type MutableAggregationBuffer
       override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                                   ^
:37: error: not found: type Row
       override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                                                                    ^
:39: error: not found: type ListBuffer
       var values = new ListBuffer[Double]()
                        ^
:53: error: not found: type MutableAggregationBuffer
       override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                                   ^
:53: error: not found: type Row
       override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                                                                      ^
:54: error: not found: type ListBuffer
       var values = new ListBuffer[Double]()
                        ^
:62: error: not found: type Row
       override def evaluate(buffer: Row): String = {
                                     ^
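All of the "not found" errors are for names brought in by the imports at the top of the same paragraph, which suggests the interpreter compiled the class definition in a wrapper where those imports were not visible. Besides the anonymous-object workaround above, a sketch of another approach that may avoid the problem is to remove the class's dependency on paragraph-level imports, by fully qualifying the type in the extends clause and moving the remaining imports inside the class body. This is an untested assumption, not verified on the Zeppelin 0.7.3 / Spark 2.2.1 setup from this thread:

%spark
// Hedged sketch: imports placed inside the class body are compiled together
// with the class, so they cannot be separated from it by REPL wrapping, and
// the extends-clause type is written fully qualified for the same reason.
class AggregatedChangepointAnalyzer
    extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction {

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.MutableAggregationBuffer
  import org.apache.spark.sql.types._

  override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
  override def dataType: DataType = StringType
  override def deterministic: Boolean = true

  // Start each group with an empty array of observations
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Array.emptyDoubleArray

  // Append the new observation to the buffered sequence
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getSeq[Double](0) :+ input.getDouble(0)

  // Concatenate the two partial sequences
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)

  // As in the original: return the number of observations as a String
  override def evaluate(buffer: Row): String = buffer.getSeq[Double](0).size.toString
}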