source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗
kcz <[email protected]> 于2020年8月3日周一 下午7:29写道: > 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); > env.setStateBackend(new MemoryStateBackend()); > env.setParallelism(4); > Properties properties = getLocal(); > properties.setProperty("group.id","test"); > FlinkKafkaConsumer<String> consumer = new > FlinkKafkaConsumer<>("testOrderTopic", new SimpleStringSchema(), > properties); > DataStream<String> stream = env > .addSource(consumer); > stream.map(new MapFunction<String, Tuple2<Integer,Integer>>() { > @Override > public Tuple2<Integer,Integer> map(String s) throws Exception { > Thread.sleep(1000*60*60*60); > return new Tuple2(1,1); > } > }).keyBy(0).sum(0); > stream.print(); > //stream.map(); > env.execute(); > > }
