Hi Dawid,

Just wanted to bump this thread in case you had any thoughts.

Thanks,

Steve

On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <swhe...@jwplayer.com> wrote:

> For some background, I am upgrading from Flink v1.9 to v1.11. So what I am
> about to describe is our implementation on v1.9, which worked. I am trying
> to achieve the same functionality on v1.11.
>
> I have a DataStream whose type is an avro generated POJO, which contains a
> field *UrlParameters* that is of type *Map<String, String>*. I register
> this stream as a view so I can perform SQL queries on it. One of the
> queries contains the UDF I have previously posted. It appears that in the
> conversion to a view, the type of *UrlParameters* is being converted into 
> *RAW('java.util.Map',
> ?)*.
>
>
> *Code on v1.9*
>
> DataStream<Ping> pings = // a Kafka stream source deserialized into an
> avro generated POJO
> tableEnvironment.registerDataStream("myTable", pings);
> Table table = tableEnvironment.sqlQuery(
>     "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> // table sinks...
>
>
> *The produced type of my deserializer is:*
>
> @Override
> public TypeInformation<Ping> getProducedType() {
>     // Ping.class is an avro generated POJO
>     return TypeInformation.of(Ping.class);
> }
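>
> For context, getProducedType() sits in our custom deserializer. A minimal
> sketch of its shape (PingDeserializer is an illustrative name, and the
> Avro decoding shown is a stand-in for our actual logic):
>
> import java.io.IOException;
> import org.apache.avro.io.DecoderFactory;
> import org.apache.avro.specific.SpecificDatumReader;
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
>
> public class PingDeserializer implements DeserializationSchema<Ping> {
>
>     @Override
>     public Ping deserialize(byte[] message) throws IOException {
>         // Decode the raw Kafka record bytes into the avro generated POJO
>         SpecificDatumReader<Ping> reader = new SpecificDatumReader<>(Ping.class);
>         return reader.read(null, DecoderFactory.get().binaryDecoder(message, null));
>     }
>
>     @Override
>     public boolean isEndOfStream(Ping nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<Ping> getProducedType() {
>         return TypeInformation.of(Ping.class);
>     }
> }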
>
> *Scalar UDF MAP_VALUE:*
>
> public static String eval(final Map<String, String> map, final String key) {
>     return map.get(key);
> }
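>
> For completeness, the eval method lives in a ScalarFunction subclass and
> is registered before the query runs (a sketch; MapValue is an illustrative
> class name):
>
> import java.util.Map;
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class MapValue extends ScalarFunction {
>     public static String eval(final Map<String, String> map, final String key) {
>         return map.get(key);
>     }
> }
>
> tableEnvironment.registerFunction("MAP_VALUE", new MapValue());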
>
>
> I am using a UDF to access fields in the *UrlParameters* map because if I
> try to access them directly in SQL (i.e. `*UrlParameters['some_key']*`), I
> get the exception below. This Stack Overflow post[1] suggested the UDF as
> a workaround.
>
> Caused by: org.apache.flink.table.api.TableException: Type is not supported: ANY
>     at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)
>
>
> The above implementation worked successfully on v1.9. We use a stream
> source instead of a table source because we do other, non-SQL work with
> the stream.
>
>
> *Code on v1.11*
>
> The following is the implementation on v1.11, which does not work. I was
> using the old planner on v1.9 but have switched to the Blink planner on
> v1.11, in case that is relevant here.
>
>
> DataStream<Ping> pings = // a Kafka stream source deserialized into an
> avro generated POJO
> tableEnvironment.createTemporaryView("myTable", pings);
> Table table = tableEnvironment.sqlQuery(
>     "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> // table sinks...
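>
> Printing the registered view's schema is how the RAW type shows up (a
> minimal sketch using the 1.11 Table API):
>
> // UrlParameters is reported as RAW('java.util.Map', ?) rather than MAP<STRING, STRING>
> tableEnvironment.from("myTable").printSchema();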
>
>
> The UDF referenced above produces the error below. I assumed adding
> DataTypeHints was the way to solve it, but I was unable to get that to
> work. That is what prompted my initial email to the mailing list.
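>
> For reference, this is roughly the hinted variant I tried (a minimal
> sketch; MapValue is an illustrative class name and the hint placement on
> the eval parameter is my assumption):
>
> import java.util.Map;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class MapValue extends ScalarFunction {
>     public static String eval(
>             @DataTypeHint("MAP<STRING, STRING>") final Map<String, String> map,
>             final String key) {
>         return map.get(key);
>     }
> }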
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
> arguments. Expected signatures are:
> MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
>     at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
>     at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
>     at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>     ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
> type at position 0. Data type MAP<STRING, STRING> expected but
> RAW('java.util.Map', ?) passed.
>     at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
>     at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
>     at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
>     ... 51 more
>
>
> I can try creating a concrete reproducible example if this explanation
> isn't enough, though it's quite involved with the avro POJO and custom
> deserializer.
>
>
> Thanks,
>
> Steve
>
>
> [1]
> https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types
>
