source算子好像是没有In指标的,只有Out指标;
默认source和map算子会operator chain成一个task,你disable一下operator
chain,把map算子作为单独的task,就能在map算子上观察到In和Out了。
背压的话,建议看一下isBackpressured这个指标,我记得是operator级别的,可以看到各个算子的状态。

kcz <[email protected]> 于2020年8月4日周二 上午12:41写道:

> 嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: shizk233 <[email protected]&gt;
> 发送时间: 2020年8月3日 23:03
> 收件人: [email protected] <[email protected]&gt;
> 主题: 回复:flink-1.11 模拟背压
>
>
>
> source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗
>
> kcz <[email protected]&gt; 于2020年8月3日周一 下午7:29写道:
>
> &gt; 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
> &gt; public static void main(String[] args) throws Exception{
> &gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env =
> &gt; StreamExecutionEnvironment.getExecutionEnvironment();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.enableCheckpointing(2000L,
> CheckpointingMode.EXACTLY_ONCE);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setStateBackend(new MemoryStateBackend());
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setParallelism(4);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties = getLocal();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; properties.setProperty("group.id","test");
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; FlinkKafkaConsumer<String&amp;gt; consumer =
> new
> &gt; FlinkKafkaConsumer<&amp;gt;("testOrderTopic", new
> SimpleStringSchema(),
> &gt; properties);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream = env
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .addSource(consumer);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.map(new MapFunction<String,
> Tuple2<Integer,Integer&amp;gt;&amp;gt;() {
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public
> Tuple2<Integer,Integer&amp;gt; map(String s) throws Exception {
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> Thread.sleep(1000*60*60*60);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return new Tuple2(1,1);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; }).keyBy(0).sum(0);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.print();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; //stream.map();
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.execute();
> &gt;
> &gt; }

回复