Hi, I'm using Beam 2.51.0. I'm trying to use a UDF to transform float arrays and got the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot find a matching Beam FieldType for Calcite type: REAL
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
    at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
    at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)

This is my code:

public static class TestUDF implements SerializableFunction<List<Float>, Float> {
    @Override
    public Float apply(List<Float> fv) {
        float sum = 0;
        if(fv != null)
            for (Float a : fv)
                sum += a;
        return sum;
    }
}

public static void example6() {
    System.out.println("Example 6");
    Schema rowSchema = Schema.builder()
        .addField("ind", Schema.FieldType.INT32)
        .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
        .build();
    Pipeline p = createPipeline();
    p
        .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
        .apply(ParDo.of(new DoFn<Integer, Row>() {
            @ProcessElement
            public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
                Row.Builder rowBuilder = Row.withSchema(rowSchema);
                List<Float> fv = new ArrayList<Float>();
                fv.add(1f * ind);
                fv.add(2f * ind);
                Row row = rowBuilder
                    .addValue(ind)
                    .addValue(fv)
                    .build();
                out.output(row);
            }
        }))
        .setRowSchema(rowSchema)
        .apply(
            SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
                .registerUdf("testUDF", new TestUDF())
        )
        .apply(new Print());
    p.run().waitUntilFinish();
}

--
György Balogh
CTO
E gyorgy.bal...@ultinous.com <zsolt.sala...@ultinous.com>
M +36 30 270 8342 <+36%2030%20270%208342>
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com