????????????????????????????
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
return new
RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING,Types.STRING))).getFieldTypes();
}
------------------ ???????? ------------------
??????: "Benchao Li"<[email protected]>;
????????: 2020??5??20??(??????) ????7:39
??????: "user-zh"<[email protected]>;
????: Re: flink1.10.x ???? arrar<row> ????
????????????????????????`ScalarFunction#getParameterTypes(Class<?>[]
signature)`????????????????????????????????????
????????????Row[]??????????????Types.OBJECT_ARRAY(Types.ROW(Types.INT,
Types.STRING...))??Row??????????????????
??????????????
?????????????? <[email protected]> ??2020??5??20?????? ????7:24??????
> udf???????????????? org.apache.flink.types.Row[]??????????????????
>
>
> ------------------&nbsp;????????&nbsp;------------------
> ??????:&nbsp;"Benchao Li"<[email protected]&gt;;
> ????????:&nbsp;2020??5??20??(??????) ????6:51
> ??????:&nbsp;"user-zh"<[email protected]&gt;;
>
> ????:&nbsp;Re: flink1.10.x ???? arrar<row&gt; ????
>
>
>
>
????????????UDF????????????????????????????????????????????????????????????????????????????????????
>
> ?????????????? <[email protected]&gt; ??2020??5??20?????? ????4:25??????
>
> &gt; 1.blink_planner ????ddl????array??????????????select
????????????????????
> &gt;&nbsp; 2.blink_planner
????????????????????????????????????????????????flink????????????
> &gt;
> &gt;
> &gt; 3.????????flink-planner????????????
> &gt;
> &gt;
> &gt;
> &gt; CREATE TABLE sourceTable (
> &gt;
> &gt; &amp;nbsp;event_time_line array<ROW (
> &gt;
> &gt; &amp;nbsp; `rule_name` VARCHAR,
> &gt;
> &gt; &amp;nbsp; `count` VARCHAR
> &gt;
> &gt; &amp;nbsp;)&amp;gt;
> &gt;
> &gt; ) WITH (
> &gt;
> &gt; &amp;nbsp;'connector.type' = 'kafka',
> &gt;
> &gt; &amp;nbsp;'connector.version' = 'universal',
> &gt;
> &gt; &amp;nbsp;'connector.startup-mode' = 'earliest-offset',
> &gt;
> &gt; &amp;nbsp;'connector.topic' = 'topic_test_1',
> &gt;
> &gt; &amp;nbsp;'connector.properties.zookeeper.connect' =
'localhost:2181',
> &gt;
> &gt; &amp;nbsp;'connector.properties.bootstrap.servers' =
'localhost:9092',
> &gt;
> &gt; &amp;nbsp;'update-mode' = 'append',
> &gt;
> &gt; &amp;nbsp;'format.type' = 'json',
> &gt;
> &gt; &amp;nbsp;'format.derive-schema' = 'true'
> &gt;
> &gt; );
> &gt;
> &gt; --????????????
> &gt;
> &gt; select event_time_line from sourceTable ;
> &gt;
> &gt;
--??????????????????????????value??????????????????????size??????????
> &gt;
> &gt; select type_change(event_time_line) from sourceTable ;
> &gt;
> &gt; &amp;nbsp;
> &gt;
> &gt; public class TypeChange extends ScalarFunction {
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; /**
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; *
> &gt;
??null??????????????????????????????????????????????????????????planner????????????????
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @param
rows
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @return
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; */
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; public String eval(Row
[] rows){
> &gt;
> &gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &amp;nbsp;return
> &gt; JSONObject.toJSONString(rows);
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; }
> &gt;
> &gt; &amp;nbsp;
> &gt;
> &gt; }
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [email protected]; [email protected]
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]