source算子好像是没有In指标的,只有Out指标; 默认source和map算子会operator chain成一个task,你disable一下operator chain,把map算子作为单独的task,就能在map算子上观察到In和Out了。 背压的话,建议看一下isBackpressured这个指标,我记得是operator级别的,可以看到各个算子的状态。
kcz <[email protected]> 于2020年8月4日周二 上午12:41写道: > 嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。 > > > > > > ------------------ 原始邮件 ------------------ > 发件人: shizk233 <[email protected]> > 发送时间: 2020年8月3日 23:03 > 收件人: [email protected] <[email protected]> > 主题: 回复:flink-1.11 模拟背压 > > > > source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗 > > kcz <[email protected]> 于2020年8月3日周一 下午7:29写道: > > > 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 > > public static void main(String[] args) throws Exception{ > > > > StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.enableCheckpointing(2000L, > CheckpointingMode.EXACTLY_ONCE); > > env.setStateBackend(new MemoryStateBackend()); > > env.setParallelism(4); > > Properties properties = getLocal(); > > properties.setProperty("group.id","test"); > > FlinkKafkaConsumer<String&gt; consumer = > new > > FlinkKafkaConsumer<&gt;("testOrderTopic", new > SimpleStringSchema(), > > properties); > > DataStream<String&gt; stream = env > > > .addSource(consumer); > > stream.map(new MapFunction<String, > Tuple2<Integer,Integer&gt;&gt;() { > > @Override > > public > Tuple2<Integer,Integer&gt; map(String s) throws Exception { > > > Thread.sleep(1000*60*60*60); > > > return new Tuple2(1,1); > > } > > }).keyBy(0).sum(0); > > stream.print(); > > //stream.map(); > > env.execute(); > > > > }
