Timo and Dawid,

Registering my UDF via the deprecated *registerFunction()* instead of the
new *createTemporarySystemFunction()* worked. So it would appear there is
some incompatibility between my implementation and the new registration
system. I will wait for the FLIP (FLIP-136, referenced below) to be
completed and retry then. This solution works for now.
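For reference, a minimal sketch of the two registration paths for the
MAP_VALUE function discussed in this thread (the wrapping class and
method names are illustrative, not from the original code):

    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;

    import java.util.Map;

    public class RegistrationSketch {

        // Stand-in for the MAP_VALUE UDF posted earlier in the thread.
        public static class MapValue extends ScalarFunction {
            public String eval(final Map<String, String> map, final String key) {
                return map.get(key);
            }
        }

        // Old path (deprecated, legacy type system) -- the workaround
        // that worked here:
        public static void registerOldStyle(StreamTableEnvironment tableEnvironment) {
            tableEnvironment.registerFunction("MAP_VALUE", new MapValue());
        }

        // New path (new type inference) -- the one that rejected the
        // RAW('java.util.Map', ?) argument in this setup:
        public static void registerNewStyle(StreamTableEnvironment tableEnvironment) {
            tableEnvironment.createTemporarySystemFunction("MAP_VALUE", MapValue.class);
        }
    }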
Thanks,

Steve

On Mon, Nov 9, 2020 at 10:12 AM Timo Walther <twal...@apache.org> wrote:

> Sorry for jumping in so late. I think Dawid gave a nice summary.
>
> As he said, the DataStream <> Table integration is still under
> development. Until then I would suggest option 3), which means don't
> upgrade the functions and use the old registration function
> `registerFunction`. Everything should work as expected there with the
> old types.
>
> Let me know if you need more input.
>
> Regards,
> Timo
>
>
> On 09.11.20 10:28, Dawid Wysakowicz wrote:
> > Hi Steve,
> >
> > Unfortunately the information you posted still does not explain how
> > you ended up with *RAW('java.util.Map', ?)* for your input type. It
> > would be best if you could share an example that I could use to
> > reproduce it.
> >
> > I tried putting down some potential approaches. I tested them with a
> > class generated from an avsc:
> >
> >     {"namespace": "com.ververica.avro.generated",
> >      "type": "record",
> >      "name": "Address",
> >      "fields": [
> >          {"name": "num", "type": "int"},
> >          {"name": "street", "type": {
> >              "type": "map",
> >              "values": "string",
> >              "default": {}
> >          }}
> >      ]
> >     }
> >
> > which has two fields:
> >
> >     @Deprecated public int num;
> >     @Deprecated public java.util.Map<java.lang.String,java.lang.String> street;
> >
> > 1) From the description you posted, the UrlParameters (street in my
> > case) field should have the *LEGACY('RAW', 'ANY<java.util.Map>')* type:
> >
> >     root
> >      |-- num: INT
> >      |-- street: LEGACY('RAW', 'ANY<java.util.Map>')
> >
> > 2) Using the new type system
> >
> > A more seamless DataStream <> Table integration is still under
> > development. You can check FLIP-136[1] for it. Therefore you'd need
> > to adjust your types in the input DataStream. Bear in mind that this
> > approach changes the way the type is serialized, from Avro-based
> > serialization to Flink's custom POJO serialization.
> >
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment env =
> >             StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >         Map<String, TypeInformation<?>> fieldTypes = new HashMap<>();
> >         fieldTypes.put("num", BasicTypeInfo.INT_TYPE_INFO);
> >         fieldTypes.put("street", Types.MAP(Types.STRING, Types.STRING));
> >
> >         SingleOutputStreamOperator<Address> elements = env
> >             .fromElements(
> >                 Address.newBuilder()
> >                     .setNum(1)
> >                     .setStreet(new HashMap<>())
> >                     .build())
> >             .returns(Types.POJO(Address.class, fieldTypes));
> >
> >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> >             env,
> >             EnvironmentSettings.newInstance().useBlinkPlanner().build());
> >
> >         tEnv.createTemporaryView("test", elements);
> >
> >         tEnv.from("test")
> >             .select(call(Func.class, $("street")))
> >             .execute()
> >             .print();
> >     }
> >
> >     public static class Func extends ScalarFunction {
> >         @FunctionHint(
> >             input = {@DataTypeHint(value = "MAP<STRING, STRING>")},
> >             output = @DataTypeHint("STRING")
> >         )
> >         public String eval(final Map<String, String> map) {
> >             // business logic
> >             return "ABC";
> >         }
> >     }
> >
> > 3) Using the legacy types approach you can query that field like this:
> >
> >     public static class LegacyFunc extends ScalarFunction {
> >         public String eval(final Map<String, String> map) {
> >             // business logic
> >             return "ABC";
> >         }
> >     }
> >
> >     StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >     SingleOutputStreamOperator<Address> elements = env.fromElements(
> >         Address.newBuilder()
> >             .setNum(1)
> >             .setStreet(new HashMap<>())
> >             .build());
> >     StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> >         env,
> >         EnvironmentSettings.newInstance().useBlinkPlanner().build());
> >
> >     tEnv.createTemporaryView("test", elements);
> >     tEnv.registerFunction("legacyFunc", new LegacyFunc());
> >
> >     tEnv.from("test")
> >         .select(call("legacyFunc", $("street")))
> >         .execute()
> >         .print();
> >
> > Best,
> >
> > Dawid
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >
> > On 09/11/2020 02:30, Steve Whelan wrote:
> >> Hi Dawid,
> >>
> >> Just wanted to bump this thread in case you had any thoughts.
> >>
> >> Thanks,
> >>
> >> Steve
> >>
> >> On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <swhe...@jwplayer.com
> >> <mailto:swhe...@jwplayer.com>> wrote:
> >>
> >>     For some background, I am upgrading from Flink v1.9 to v1.11. So
> >>     what I am about to describe is our implementation on v1.9, which
> >>     worked. I am trying to achieve the same functionality on v1.11.
> >>
> >>     I have a DataStream whose type is an avro generated POJO, which
> >>     contains a field *UrlParameters* that is of type *Map<String,
> >>     String>*. I register this stream as a view so I can perform SQL
> >>     queries on it. One of the queries contains the UDF I have
> >>     previously posted. It appears that in the conversion to a view,
> >>     the type of *UrlParameters* is being converted into
> >>     *RAW('java.util.Map', ?)*.
> >>
> >>     *Code on v1.9*
> >>
> >>         DataStream pings = // a Kafka stream source deserialized into
> >>             // an avro generated POJO
> >>         tableEnvironment.registerDataStream("myTable", pings);
> >>         table = tableEnvironment.sqlQuery(
> >>             "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> >>         // tablesinks...
> >>
> >>     /The produced type of my deserializer is:/
> >>
> >>         @Override
> >>         public TypeInformation<Ping> getProducedType() {
> >>             // Ping.class is an avro generated POJO
> >>             return TypeInformation.of(Ping.class);
> >>         }
> >>
> >>     /Scalar UDF MAP_VALUE:/
> >>
> >>         public static String eval(final Map<String, String> map,
> >>                 final String key) {
> >>             return map.get(key);
> >>         }
> >>
> >>     I am using a UDF to access fields in the *UrlParameters* map
> >>     because if I try to access them directly in the SQL (i.e.
> >>     `*UrlParameters['some_key']*`), I get the below exception. This
> >>     stackoverflow post[1] had suggested the UDF as a workaround.
> >>
> >>         Caused by: org.apache.flink.table.api.TableException: Type is not supported: ANY
> >>             at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
> >>             at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
> >>             at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
> >>             at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
> >>             at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)
> >>
> >>     The above implementation worked successfully on v1.9. We use a
> >>     stream source instead of a table source b/c we do other non-SQL
> >>     type things with the stream.
> >>
> >>     *Code on v1.11*
> >>
> >>     The following is the implementation on v1.11, which does not
> >>     work. I was using the Old Planner on v1.9 but have switched to
> >>     the Blink Planner on v1.11, in case that has any relevance here.
> >>
> >>         DataStream pings = // a Kafka stream source deserialized into
> >>             // an avro generated POJO object
> >>         tableEnvironment.createTemporaryView("myTable", pings);
> >>         table = tableEnvironment.sqlQuery(
> >>             "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> >>         // tablesinks...
> >>
> >>     The UDF referenced above produced the below error. So I assumed
> >>     adding DataTypeHints was the way to solve it, but I was unable
> >>     to get that to work. That is what prompted the initial email to
> >>     the ML.
> >>
> >>         Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are:
> >>         MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
> >>             at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> >>             at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> >>             at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> >>             ... 50 more
> >>         Caused by: org.apache.flink.table.api.ValidationException: Invalid argument type at position 0. Data type MAP<STRING, STRING> expected but RAW('java.util.Map', ?) passed.
> >>             at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> >>             at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> >>             at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> >>             ... 51 more
> >>
> >>     I can try creating a concrete reproducible example if this
> >>     explanation isn't enough, though it's quite a bit with the avro
> >>     POJO and custom deserializer.
> >>
> >>     Thanks,
> >>
> >>     Steve
> >>
> >>     [1]
> >>     https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types
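Worth noting, on the original motivation for the UDF: once the
*UrlParameters* field carries a proper *MAP<STRING, STRING>* type (for
example via Dawid's approach 2 above) rather than RAW/ANY, direct element
access in SQL should work, making the MAP_VALUE workaround unnecessary.
A sketch, untested against this exact setup:

    // Assumes "myTable" was registered so that UrlParameters is typed as
    // MAP<STRING, STRING> (see approach 2 above), not RAW('java.util.Map', ?).
    // With a proper MAP type, the bracket syntax for element access should
    // no longer fail with "Type is not supported: ANY".
    Table table = tableEnvironment.sqlQuery(
        "SELECT UrlParameters['some_key'] FROM myTable");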