Timo and Dawid,

Registering my UDF via the deprecated *registerFunction()* instead of the
new *createTemporarySystemFunction()* worked. So it would appear there is
some incompatibility between my implementation and the new registration
system. I will wait for FLIP-136 to be completed and retry then. This
solution works for now.
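
For reference, the working change was just the registration call (a
sketch; the MapValue class name is a stand-in for my actual UDF class):

    // deprecated registration -- works with the old type system
    tableEnvironment.registerFunction("MAP_VALUE", new MapValue());

    // new registration -- this is the path that failed type validation
    // for my implementation:
    // tableEnvironment.createTemporarySystemFunction("MAP_VALUE", MapValue.class);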

Thanks,

Steve

On Mon, Nov 9, 2020 at 10:12 AM Timo Walther <twal...@apache.org> wrote:

> Sorry for jumping in so late. I think Dawid gave a nice summary.
>
> As he said, the DataStream <> Table integration is still
> under development. Until then I would suggest option 3), which means
> don't upgrade the functions and use the old registration method
> `registerFunction`. Everything should work as expected there with the
> old types.
>
> Let me know if you need more input.
>
> Regards,
> Timo
>
>
> On 09.11.20 10:28, Dawid Wysakowicz wrote:
> > Hi Steve,
> >
> > Unfortunately the information you posted still does not explain how you
> > ended up with *RAW('java.util.Map', ?)* for your input type. It would be
> > best if you could share an example that I could use to reproduce it.
> >
> > I tried putting down some potential approaches:
> >
> > I tested it with a class generated from an avsc:
> >
> >   {"namespace": "com.ververica.avro.generated",
> >   "type": "record",
> >   "name": "Address",
> >   "fields": [
> >       {"name": "num", "type": "int"},
> >       {"name": "street", "type": {
> >                            "type": "map",
> >                            "values" : "string",
> >                            "default": {}
> >                          }}
> >    ]
> > }
> >
> > which has two fields:
> >
> >    @Deprecated public int num;
> >    @Deprecated public java.util.Map<java.lang.String,java.lang.String> street;
> >
> > 1) From the description you posted the UrlParameters (street in my case)
> > field should have *LEGACY('RAW', 'ANY<java.util.Map>')* type.
> >
> > root
> >   |-- num: INT
> >   |-- street: LEGACY('RAW', 'ANY<java.util.Map>')
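> >
> > That listing can be reproduced directly on the registered view (a
> > minimal sketch, using the "test" view from the snippets below):
> >
> >     tEnv.from("test").printSchema();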
> >
> > 2) Using the new type system
> >
> > A more seamless DataStream <> Table integration is still under
> > development. You can check FLIP-136[1] for it. Until then
> > you'd need to adjust your types in the input DataStream. Bear in mind
> > this approach changes the way the type is serialized, from Avro-based
> > serialization to Flink's custom POJO serialization.
> >
> >      public static void main(String[] args) throws Exception {
> >          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >          Map<String, TypeInformation<?>> fieldTypes = new HashMap<>();
> >          fieldTypes.put("num", BasicTypeInfo.INT_TYPE_INFO);
> >          fieldTypes.put("street", Types.MAP(Types.STRING, Types.STRING));
> >          SingleOutputStreamOperator<Address> elements = env.fromElements(
> >                  Address.newBuilder()
> >                          .setNum(1)
> >                          .setStreet(new HashMap<>())
> >                          .build()
> >          )
> >          .returns(
> >                  Types.POJO(
> >                          Address.class,
> >                          fieldTypes
> >                  )
> >          );
> >          StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> >                  env,
> >                  EnvironmentSettings.newInstance().useBlinkPlanner().build());
> >
> >          tEnv.createTemporaryView("test", elements);
> >
> >          tEnv.from("test").select(call(Func.class, $("street"))).execute().print();
> >      }
> >
> >      public static class Func extends ScalarFunction {
> >          @FunctionHint(
> >                  input = {@DataTypeHint(value = "MAP<STRING, STRING>")},
> >                  output = @DataTypeHint("STRING")
> >          )
> >          public String eval(final Map<String, String> map) {
> >              // business logic
> >              return "ABC";
> >          }
> >      }
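> >
> > With the hints in place, the function could also be registered under a
> > name through the new API (a sketch, assuming the Func class above):
> >
> >     tEnv.createTemporarySystemFunction("mapValue", Func.class);
> >     tEnv.sqlQuery("SELECT mapValue(street) FROM test").execute().print();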
> >
> > 3) Using the legacy types approach you can query that field like this:
> >
> >      public static class LegacyFunc extends ScalarFunction {
> >          public String eval(final Map<String, String> map) {
> >              // business logic
> >              return "ABC";
> >          }
> >      }
> >
> >      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >      SingleOutputStreamOperator<Address> elements = env.fromElements(
> >              Address.newBuilder()
> >                      .setNum(1)
> >                      .setStreet(new HashMap<>())
> >                      .build()
> >      );
> >      StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> >              env,
> >              EnvironmentSettings.newInstance().useBlinkPlanner().build());
> >
> >      tEnv.createTemporaryView("test", elements);
> >      tEnv.registerFunction("legacyFunc", new LegacyFunc());
> >
> >      tEnv.from("test").select(call("legacyFunc", $("street"))).execute().print();
> >
> > Best,
> >
> > Dawid
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >
> > On 09/11/2020 02:30, Steve Whelan wrote:
> >> Hi Dawid,
> >>
> >> Just wanted to bump this thread in case you had any thoughts.
> >>
> >> Thanks,
> >>
> >> Steve
> >>
> >> On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <swhe...@jwplayer.com> wrote:
> >>
> >>     For some background, I am upgrading from Flink v1.9 to v1.11. So
> >>     what I am about to describe is our implementation on v1.9, which
> >>     worked. I am trying to achieve the same functionality on v1.11.
> >>
> >>     I have a DataStream whose type is an avro generated POJO, which
> >>     contains a field *UrlParameters* that is of type *Map<String,
> >>     String>*. I register this stream as a view so I can perform SQL
> >>     queries on it. One of the queries contains the UDF I have
> >>     previously posted. It appears that in the conversion to a view,
> >>     the type of *UrlParameters* is being converted into
> >>     *RAW('java.util.Map', ?)*.
> >>
> >>
> >>     *Code on v1.9*
> >>
> >>     DataStream pings = // a Kafka stream source deserialized into an
> >>     avro generated POJO
> >>     tableEnvironment.registerDataStream("myTable", pings);
> >>     table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> >>     // tablesinks...
> >>
> >>
> >>     /The produced type of my deserializer is:/
> >>
> >>     @Override
> >>     public TypeInformation<Ping> getProducedType() {
> >>         // Ping.class is an avro generated POJO
> >>         return TypeInformation.of(Ping.class);
> >>     }
> >>
> >>     /Scalar UDF MAP_VALUE:/
> >>
> >>     public static String eval(final Map<String, String> map, final String key) {
> >>         return map.get(key);
> >>     }
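> >>
> >>     (For reference, a sketch of how that eval method is wrapped for
> >>     registration; MapValue is a hypothetical class name:)
> >>
> >>     public class MapValue extends ScalarFunction {
> >>         public String eval(final Map<String, String> map, final String key) {
> >>             return map.get(key);
> >>         }
> >>     }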
> >>
> >>
> >>     I am using a UDF to access fields in the *UrlParameters* map
> >>     because if I try to access them directly in the SQL (i.e.
> >>     `*UrlParameters['some_key']*`), I get the below exception. This
> >>     Stack Overflow post[1] suggested the UDF as a workaround.
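> >>
> >>     That is, the direct access that triggers the exception below is:
> >>
> >>         SELECT UrlParameters['some_key'] FROM myTable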
> >>
> >>     Caused by: org.apache.flink.table.api.TableException: Type is not
> >>     supported: ANY
> >>         at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
> >>         at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
> >>         at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
> >>         at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
> >>         at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)
> >>
> >>
> >>     The above implementation worked successfully on v1.9. We use a
> >>     stream source instead of a table source because we do other
> >>     non-SQL processing with the stream.
> >>
> >>
> >>     *Code on v1.11*
> >>
> >>     The following is the implementation on v1.11, which does not work.
> >>     I was using the Old Planner on v1.9 but have switched to the Blink
> >>     Planner on v1.11, in case that has any relevance here.
> >>
> >>
> >>     DataStream pings = // a Kafka stream source deserialized into an
> >>     avro generated POJO object
> >>     tableEnvironment.createTemporaryView("myTable", pings);
> >>     table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> >>     // tablesinks...
> >>
> >>
> >>     The UDF referenced above produced the below error. So I assumed
> >>     adding DataTypeHints was the way to solve it, but I was unable to
> >>     get that to work. That is what prompted the initial email to the
> >>     mailing list.
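> >>
> >>     For completeness, the hinted variant I tried looked roughly like
> >>     this (a sketch of that attempt, not a working solution):
> >>
> >>     @FunctionHint(
> >>             input = {@DataTypeHint("MAP<STRING, STRING>"), @DataTypeHint("STRING")},
> >>             output = @DataTypeHint("STRING")
> >>     )
> >>     public static String eval(final Map<String, String> map, final String key) {
> >>         return map.get(key);
> >>     }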
> >>
> >>     Caused by: org.apache.flink.table.api.ValidationException: Invalid
> >>     input arguments. Expected signatures are:
> >>     MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
> >>         at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> >>         at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> >>         at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> >>     ... 50 more
> >>     Caused by: org.apache.flink.table.api.ValidationException: Invalid
> >>     argument type at position 0. Data type MAP<STRING, STRING>
> >>     expected but RAW('java.util.Map', ?) passed.
> >>         at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> >>         at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> >>         at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> >>     ... 51 more
> >>
> >>
> >>     I can try creating a concrete reproducible example if this
> >>     explanation isn't enough, though it's quite involved with the Avro
> >>     POJO and custom deserializer.
> >>
> >>
> >>     Thanks,
> >>
> >>     Steve
> >>
> >>
> >>     [1] https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types
> >>
>
>
