Hi Andy,

Thanks for sharing the code snippet.
I am not sure if you missed something in the snippet, because some function signatures don't match, e.g.:

@Override
public StructType bufferSchema() {
    return new UserDefineType(schema, unboundedEncoder);
}

Maybe you defined a class UserDefineType that extends StructType.

Anyway, I noticed this line:

data.add(unboundedEncoder.toRow(input));

If you read the comment on "toRow", you will find it says:

    Note that multiple calls to toRow are allowed to return the same actual
    [[InternalRow]] object. Thus, the caller should copy the result before
    making another call if required.

I think that is why you get a list of the same entries. So you may need to change it to:

data.add(unboundedEncoder.toRow(input).copy());

(A short sketch at the end of this message illustrates the difference.)


Andy Dang wrote
> Hi Liang-Chi,
>
> The snippet of code is below. If I bind the encoder early (the schema
> doesn't change throughout the execution), the final result is a list of
> the same entries.
>
> @RequiredArgsConstructor
> public class UDAF extends UserDefinedAggregateFunction {
>
>     // Do not resolve and bind this expression encoder eagerly
>     private final ExpressionEncoder<Row> unboundedEncoder;
>     private final StructType schema;
>
>     @Override
>     public StructType inputSchema() {
>         return schema;
>     }
>
>     @Override
>     public StructType bufferSchema() {
>         return new UserDefineType(schema, unboundedEncoder);
>     }
>
>     @Override
>     public DataType dataType() {
>         return DataTypes.createArrayType(schema);
>     }
>
>     @Override
>     public void initialize(MutableAggregationBuffer buffer) {
>         buffer.update(0, new InternalRow[0]);
>     }
>
>     @Override
>     public void update(MutableAggregationBuffer buffer, Row input) {
>         UserDefineType data = buffer.getAs(0);
>
>         data.add(unboundedEncoder.toRow(input));
>
>         buffer.update(0, data);
>     }
>
>     @Override
>     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
>         // merge
>         buffer1.update(0, data1);
>     }
>
>     @Override
>     public Object evaluate(Row buffer) {
>         UserDefineType data = buffer.getAs(0);
>
>         // need to return Row here instead of InternalRow
>         return data.rows();
>     }
>
>     static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
>         val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
>                 .stream().map(Attribute::toAttribute).collect(Collectors.toList());
>         return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
>     }
> }
>
> // Wrap around a list of InternalRow
> class TopKDataType extends UserDefinedType<TopKDataType> {
>     private final ExpressionEncoder<Row> unboundedEncoder;
>     private final List<InternalRow> data;
>
>     public Row[] rows() {
>         val encoder = resolveAndBind(this.unboundedEncoder);
>
>         return data.stream().map(encoder::fromRow).toArray(Row[]::new);
>     }
> }
>
> -------
> Regards,
> Andy
>
> On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <viirya@> wrote:
>
>> Can you show how you use the encoder in your UDAF?
>>
>>
>> Andy Dang wrote
>> > One more question about the behavior of ExpressionEncoder<Row>.
>> >
>> > I have a UDAF that has ExpressionEncoder<Row> as a member variable.
>> >
>> > However, if I call resolveAndBind() eagerly on this encoder, it appears
>> > to break the UDAF. Basically somehow the deserialized rows are all the
>> > same during the merge step. Is it the expected behavior of Encoders?
>> >
>> > -------
>> > Regards,
>> > Andy
>> >
>> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
>> >
>> >> Perfect.
>> >> The API in Java is a bit clumsy, though.
>> >>
>> >> What I ended up doing in Java (the val is from lombok, if anyone's
>> >> wondering):
>> >>
>> >> val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
>> >>         .stream().map(Attribute::toAttribute).collect(Collectors.toList());
>> >> val encoder = RowEncoder.apply(schema)
>> >>         .resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
>> >>
>> >> -------
>> >> Regards,
>> >> Andy
>> >>
>> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
>> >>
>> >>> You need to resolve and bind the encoder.
>> >>>
>> >>> ExpressionEncoder<Row> enconder = RowEncoder.apply(struct).resolveAndBind();
>> >>>
>> >>>
>> >>> Andy Dang wrote
>> >>> > Hi all,
>> >>> > (cc-ing dev since I've hit a developer API corner)
>> >>> >
>> >>> > What's the best way to convert an InternalRow to a Row if I've got an
>> >>> > InternalRow and the corresponding schema?
>> >>> >
>> >>> > Code snippet:
>> >>> > @Test
>> >>> > public void foo() throws Exception {
>> >>> >     Row row = RowFactory.create(1);
>> >>> >     StructType struct = new StructType().add("id", DataTypes.IntegerType);
>> >>> >     ExpressionEncoder<Row> enconder = RowEncoder.apply(struct);
>> >>> >     InternalRow internalRow = enconder.toRow(row);
>> >>> >     System.out.println("Internal row size: " + internalRow.numFields());
>> >>> >     Row roundTrip = enconder.fromRow(internalRow);
>> >>> >     System.out.println("Round trip: " + roundTrip.size());
>> >>> > }
>> >>> >
>> >>> > The code fails at the line encoder.fromRow() with the exception:
>> >>> >
>> >>> >     Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
>> >>> >     expression: getcolumnbyordinal(0, IntegerType)
>> >>> >
>> >>> > -------
>> >>> > Regards,
>> >>> > Andy
>> >>>
>> >>> -----
>> >>> Liang-Chi Hsieh | @viirya
>> >>> Spark Technology Center
>> >>> http://www.spark.tc/
>>
>> -----
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/

-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
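
P.S. Here is a minimal, self-contained sketch of the toRow/copy difference, in case it helps. It is only a sketch written against the 2.x-era encoder API used in this thread; the single-column schema and the class and variable names (ToRowCopyDemo, withoutCopy, withCopy) are made up for illustration.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ToRowCopyDemo {
    public static void main(String[] args) {
        StructType schema = new StructType().add("id", DataTypes.IntegerType);
        ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);

        List<InternalRow> withoutCopy = new ArrayList<>();
        List<InternalRow> withCopy = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            Row input = RowFactory.create(i);
            // toRow is allowed to reuse one mutable InternalRow, so this list can
            // end up holding three references to the row produced by the last call.
            withoutCopy.add(encoder.toRow(input));
            // copy() materializes an independent InternalRow that is safe to keep.
            withCopy.add(encoder.toRow(input).copy());
        }

        // If toRow does reuse its result row (as its contract allows), the first
        // list prints the same (last) value for every entry; the second prints 0, 1, 2.
        for (InternalRow r : withoutCopy) {
            System.out.println("without copy -> " + r.getInt(0));
        }
        for (InternalRow r : withCopy) {
            System.out.println("with copy    -> " + r.getInt(0));
        }
    }
}

I think this is essentially what happens inside your update(): each call overwrites the InternalRow that earlier calls stored in the buffer, which would explain the list of identical entries you are seeing.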