???????? ????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??8??4??(??????) ????1:13
??????: "[email protected]"<[email protected]>;
????: Re: flink-1.11 ????????
source??????????????In??????,????Out??????
????source??map??????operator chain??????task????disable????operator
chain,??map??????????????task????????map????????????In??Out????
????????????????????isBackpressured??????????????????operator????????????????????????????????
kcz <[email protected]> ??2020??8??4?????? ????12:41??????
> ???? yeah??ui??????????????????????????souce????????????????map
sleep???????????????????????? ??????????????100w????????
>
>
>
>
>
> ------------------ ???????? ------------------
> ??????: shizk233 <[email protected]&gt;
> ????????: 2020??8??3?? 23:03
> ??????: [email protected] <[email protected]&gt;
> ????: ??????flink-1.11 ????????
>
>
>
> source??????????????????????????????web ui??task??numOfRecordsIn??
>
> kcz <[email protected]&gt; ??2020??8??3?????? ????7:29??????
>
> &gt;
????????????????????????????kafka??????100w??????????source??????????????????????????????????????????
> &gt; public static void main(String[] args) throws Exception{
> &gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;
StreamExecutionEnvironment env =
> &gt; StreamExecutionEnvironment.getExecutionEnvironment();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;
env.enableCheckpointing(2000L,
> CheckpointingMode.EXACTLY_ONCE);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setStateBackend(new
MemoryStateBackend());
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setParallelism(4);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties =
getLocal();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;
properties.setProperty("group.id","test");
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;
FlinkKafkaConsumer<String&amp;gt; consumer =
> new
> &gt; FlinkKafkaConsumer<&amp;gt;("testOrderTopic", new
> SimpleStringSchema(),
> &gt; properties);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;
DataStream<String&amp;gt; stream = env
>
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .addSource(consumer);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.map(new
MapFunction<String,
> Tuple2<Integer,Integer&amp;gt;&amp;gt;() {
>
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
@Override
>
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
public
> Tuple2<Integer,Integer&amp;gt; map(String s) throws Exception {
>
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> Thread.sleep(1000*60*60*60);
>
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return new Tuple2(1,1);
>
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; }).keyBy(0).sum(0);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.print();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; //stream.map();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.execute();
> &gt;
> &gt; }