Hi,
I'm using beam 2.51.0
I'm trying to use UDF to transform float arrays and got the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot find
a matching Beam FieldType for Calcite type: REAL
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)


This is my code:

public static class TestUDF implements SerializableFunction<List<Float>, Float>
{
@Override
public Float apply(List<Float> fv) {
float sum = 0;
if(fv != null)
for (Float a : fv)
sum += a;
return sum;
}
}

public static void example6() {
System.out.println("Example 6");

Schema rowSchema = Schema.builder()
.addField("ind", Schema.FieldType.INT32)
.addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
.build();

Pipeline p = createPipeline();
p
.apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
.apply(ParDo.of(new DoFn<Integer, Row>() {
@ProcessElement
public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
Row.Builder rowBuilder = Row.withSchema(rowSchema);
List<Float> fv = new ArrayList<Float>();
fv.add(1f * ind);
fv.add(2f * ind);
Row row = rowBuilder
.addValue(ind)
.addValue(fv)
.build();
out.output(row);
}
}))
.setRowSchema(rowSchema)
.apply(
SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
.registerUdf("testUDF", new TestUDF())
)
.apply(new Print());
p.run().waitUntilFinish();
}


-- 

György Balogh
CTO
E gyorgy.bal...@ultinous.com <zsolt.sala...@ultinous.com>
M +36 30 270 8342 <+36%2030%20270%208342>
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com

Reply via email to