????????????????????????????
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
    return new RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING,Types.STRING))).getFieldTypes();
}




------------------ 原始邮件 ------------------
发件人: "Benchao Li"<[email protected]>;
发送时间: 2020年5月20日(星期三) 晚上7:39
收件人: "user-zh"<[email protected]>;

主题: Re: flink1.10.x ???? arrar<row> ????



????????????????????????`ScalarFunction#getParameterTypes(Class<?>[]
signature)`????????????????????????????????????
????????????Row[]??????????????Types.OBJECT_ARRAY(Types.ROW(Types.INT,
Types.STRING...))??Row??????????????????
??????????????

?????????????? <[email protected]> 于2020年5月20日周三 下午7:24写道：

&gt; udf???????????????? org.apache.flink.types.Row[]??????????????????
&gt;
&gt;
> ------------------ 原始邮件 ------------------
> 发件人: "Benchao Li"<[email protected]>;
> 发送时间: 2020年5月20日(星期三) ????6:51
> 收件人: "user-zh"<[email protected]>;
>
> 主题: Re: flink1.10.x ???? arrar<row> ????
&gt;
&gt;
&gt;
&gt; 
????????????UDF????????????????????????????????????????????????????????????????????????????????????
&gt;
> ?????????????? <[email protected]> 于2020年5月20日周三 下午4:25写道：
&gt;
&gt; &amp;gt; 1.blink_planner ????ddl????array??????????????select 
????????????????????
&gt; &amp;gt;&amp;nbsp; 2.blink_planner 
????????????????????????????????????????????????flink????????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 3.????????flink-planner????????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
> > CREATE TABLE sourceTable (
> >
> >  event_time_line array<ROW (
> >
> >   `rule_name` VARCHAR,
> >
> >   `count` VARCHAR
> >
> >  )>
> >
> > ) WITH (
> >
> >  'connector.type' = 'kafka',
> >
> >  'connector.version' = 'universal',
> >
> >  'connector.startup-mode' = 'earliest-offset',
> >
> >  'connector.topic' = 'topic_test_1',
> >
> >  'connector.properties.zookeeper.connect' = 'localhost:2181',
> >
> >  'connector.properties.bootstrap.servers' = 'localhost:9092',
> >
> >  'update-mode' = 'append',
> >
> >  'format.type' = 'json',
> >
> >  'format.derive-schema' = 'true'
> >
> > );
> >
> > --????????????
> >
> > select event_time_line from sourceTable ;
> >
> > --??????????????????????????value??????????????????????size??????????
> >
> > select type_change(event_time_line) from sourceTable ;
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt;
> > public class TypeChange extends ScalarFunction {
> >
> >     /**
> >
> >      * ??null??????????????????????????????????????????????????????????planner????????????????
> >
> >      * @param rows
> >
> >      * @return
> >
> >      */
> >
> >     public String eval(Row [] rows){
> >
> >         return JSONObject.toJSONString(rows);
> >
> >     }
> >
> >
> >
> > }
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Benchao Li
&gt; School of Electronics Engineering and Computer Science, Peking University
&gt; Tel:+86-15650713730
&gt; Email: [email protected]; [email protected]



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复