Sorry for jumping in so late. I think Dawid gave a nice summary.

As he said, the DataStream <> Table integration is still under development. Until then I would suggest option 3), i.e. don't upgrade the functions and use the old registration method `registerFunction`. Everything should work as expected there with the old types.
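
In code that is essentially the 1.9 path unchanged, roughly like this (a minimal sketch reusing the names from Steve's snippets below; `MapValueFunction` stands for whatever class holds the eval method, and Dawid's option 3 shows a complete example):

    // keep the old ScalarFunction as-is and register it via the deprecated API
    tableEnvironment.registerFunction("MAP_VALUE", new MapValueFunction());
    // then query it from SQL as before
    tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");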

Let me know if you need more input.

Regards,
Timo


On 09.11.20 10:28, Dawid Wysakowicz wrote:
Hi Steve,

Unfortunately the information you posted still does not explain how you ended up with *RAW('java.util.Map', ?)* for your input type. It would be best if you could share an example that I can use to reproduce it.

I tried putting down some potential approaches. I tested them with a class generated from the following avsc:

  {"namespace": "com.ververica.avro.generated",
  "type": "record",
  "name": "Address",
  "fields": [
      {"name": "num", "type": "int"},
      {"name": "street", "type": {
                           "type": "map",
                           "values" : "string",
                           "default": {}
                         }}
   ]
}

which has two fields:

  @Deprecated public int num;
  @Deprecated public java.util.Map<java.lang.String,java.lang.String> street;

1) From the description you posted, the UrlParameters field (street in my case) should have the *LEGACY('RAW', 'ANY<java.util.Map>')* type.

root
  |-- num: INT
  |-- street: LEGACY('RAW', 'ANY<java.util.Map>')
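
You can double-check what type the field actually gets by printing the view's schema, e.g. (a quick sketch using the variable names from the snippets below; the exact LEGACY/RAW string may differ slightly between versions):

     tEnv.createTemporaryView("test", elements);
     tEnv.from("test").printSchema();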

2) Using the new type system

A more seamless DataStream <> Table integration is still under development; you can check FLIP-136[1] for it. For now you'd need to adjust your types in the input DataStream. Bear in mind that this approach changes how the type is serialized, from Avro-based serialization to Flink's POJO serialization.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, TypeInformation<?>> fieldTypes = new HashMap<>();
        fieldTypes.put("num", BasicTypeInfo.INT_TYPE_INFO);
        fieldTypes.put("street", Types.MAP(Types.STRING, Types.STRING));

        SingleOutputStreamOperator<Address> elements = env
                .fromElements(
                        Address.newBuilder()
                                .setNum(1)
                                .setStreet(new HashMap<>())
                                .build())
                .returns(Types.POJO(Address.class, fieldTypes));

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().build());

        tEnv.createTemporaryView("test", elements);

        tEnv.from("test").select(call(Func.class, $("street"))).execute().print();
    }

    public static class Func extends ScalarFunction {
        @FunctionHint(
                input = {@DataTypeHint(value = "MAP<STRING, STRING>")},
                output = @DataTypeHint("STRING")
        )
        public String eval(final Map<String, String> map) {
            // business logic
            return "ABC";
        }
    }
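
If you prefer calling the function from SQL, as in your original job, you can register the new-style function by name first. A minimal sketch (the registered name is arbitrary):

     tEnv.createTemporarySystemFunction("MAP_VALUE", Func.class);
     tEnv.sqlQuery("SELECT MAP_VALUE(street) FROM test").execute().print();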

3) Using the legacy types approach, you can query that field like this:

    public static class LegacyFunc extends ScalarFunction {
        public String eval(final Map<String, String> map) {
            // business logic
            return "ABC";
        }
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<Address> elements = env.fromElements(
            Address.newBuilder()
                    .setNum(1)
                    .setStreet(new HashMap<>())
                    .build());

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().useBlinkPlanner().build());

    tEnv.createTemporaryView("test", elements);
    tEnv.registerFunction("legacyFunc", new LegacyFunc());

    tEnv.from("test").select(call("legacyFunc", $("street"))).execute().print();

Best,

Dawid

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

On 09/11/2020 02:30, Steve Whelan wrote:
Hi Dawid,

Just wanted to bump this thread in case you had any thoughts.

Thanks,

Steve

On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <swhe...@jwplayer.com> wrote:

    For some background, I am upgrading from Flink v1.9 to v1.11. So
    what I am about to describe is our implementation on v1.9, which
    worked. I am trying to achieve the same functionality on v1.11.

    I have a DataStream whose type is an avro generated POJO, which
    contains a field *UrlParameters* that is of type *Map<String,
    String>*. I register this stream as a view so I can perform SQL
    queries on it. One of the queries contains the UDF I have
    previously posted. It appears that in the conversion to a view,
    the type of *UrlParameters* is being converted into
    *RAW('java.util.Map', ?)*.


    *Code on v1.9*

    DataStream pings = // a Kafka stream source deserialized into an avro generated POJO
    tableEnvironment.registerDataStream("myTable", pings);
    table = tableEnvironment.sqlQuery(
            "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
    // table sinks...


    /The produced type of my deserializer is:/

    @Override
    public TypeInformation<Ping> getProducedType() {
        // Ping.class is an avro generated POJO
        return TypeInformation.of(Ping.class);
    }

    /Scalar UDF MAP_VALUE:/

    public static String eval(final Map<String, String> map, final String key) {
        return map.get(key);
    }


    I am using a UDF to access fields in the *UrlParameters* map
    because if I try to access them directly in SQL (i.e.
    `*UrlParameters['some_key']*`), I get the exception below. This
    Stack Overflow post[1] suggested the UDF as a workaround.

    Caused by: org.apache.flink.table.api.TableException: Type is not supported: ANY
        at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)


    The above implementation worked successfully on v1.9. We use a
    stream source instead of a table source because we do other
    non-SQL things with the stream.


    *Code on v1.11*

    The following is the implementation on v1.11, which does not work.
    I was using the old planner on v1.9 but have switched to the Blink
    planner on v1.11, in case that is relevant here.


    DataStream pings = // a Kafka stream source deserialized into an avro generated POJO object
    tableEnvironment.createTemporaryView("myTable", pings);
    table = tableEnvironment.sqlQuery(
            "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
    // table sinks...


    The UDF referenced above produced the error below. I assumed adding
    DataTypeHints was the way to solve it, but I was unable to get that
    to work. That is what prompted my initial email to the mailing list.
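
    (For reference, a hint-annotated eval matching the expected
    signature in the error below would look roughly like this sketch:)

        public String eval(
                @DataTypeHint("MAP<STRING, STRING>") final Map<String, String> map,
                final String key) {
            return map.get(key);
        }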

    Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are:
    MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
        at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
        at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
        at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
        ... 50 more
    Caused by: org.apache.flink.table.api.ValidationException: Invalid argument type at position 0. Data type MAP<STRING, STRING> expected but RAW('java.util.Map', ?) passed.
        at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
        at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
        at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
        ... 51 more


    I can try creating a concrete reproducible example if this
    explanation isn't enough, though it's quite a bit of work with the
    Avro POJO and custom deserializer.


    Thanks,

    Steve


    [1] https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types

