Hi Dawid, Just wanted to bump this thread in case you had any thoughts.
Thanks, Steve On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <swhe...@jwplayer.com> wrote: > For some background, I am upgrading from Flink v1.9 to v1.11. So what I am > about to describe is our implementation on v1.9, which worked. I am trying > to achieve the same functionality on v1.11. > > I have a DataStream whose type is an avro generated POJO, which contains a > field *UrlParameters* that is of type *Map<String, String>*. I register > this stream as a view so I can perform SQL queries on it. One of the > queries contains the UDF I have previously posted. It appears that in the > conversion to a view, the type of *UrlParameters* is being converted into > *RAW('java.util.Map', > ?)*. > > > *Code on v1.9* > > DataStream pings = // a Kafka stream source deserialized into an avro > generated POJO > tableEnvironment.registerDataStream("myTable", pings); > table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, > 'some_key') FROM myTable"); > // tablesinks... > > > *The produced type of my deserializer is:* > > @Override > public TypeInformation<Ping> getProducedType() { > // Ping.class is an avro generated POJO > return TypeInformation.of(Ping.class); > } > > *Scalar UDF MAP_VALUE:* > > public static String eval(final Map<String, String> map, final String key) > { > return map.get(key); > } > > > I an using a UDF to access fields in the *UrlParameters* map because if I > try to access them directly in the SQL (i.e. `*UrlParameters['some_key']*`), > I get the below exception. This stackoverflow[1] had suggested the UDF as a > work around. > > Caused by: org.apache.flink.table.api.TableException: Type is not > supported: ANY > at > org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:288) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490) > > > This above implementation worked successfully on v1.9. We use a stream > source instead of a table source b/c we do other non-SQL type things with > the stream. > > > *Code on v1.11* > > The following is the implementation on v1.11 which does not work. I was > using the Old Planner on v1.9 but have switched to the Blink Planner on > v1.11, in case that has any relevance here. > > > DataStream pings = // a Kafka stream source deserialized into an avro > generated POJO object > tableEnvironment.createTemporaryView("myTable", pings); > table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, > 'some_key') FROM myTable"); > // tablesinks... > > > The UDF referenced above produced the below error. So I assumed adding > DataTypeHints was the way to solve it but I was unable to get that to work. > That is what prompted the initial email to the ML. > > Caused by: org.apache.flink.table.api.ValidationException: Invalid input > arguments. Expected signatures are: > MAP_VALUE(map => MAP<STRING, STRING>, key => STRING) > at > org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) > ... 50 more > Caused by: org.apache.flink.table.api.ValidationException: Invalid > argument type at position 0. Data type MAP<STRING, STRING> expected but > RAW('java.util.Map', ?) passed. > at > org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137) > at > org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126) > ... 51 more > > > I can try creating a concrete reproducible example if this explanation > isn't enough though its quite a bit with the avro POJO and custom > deserializer. > > > Thanks, > > Steve > > > [1] > https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types > >>