我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new MemoryStateBackend());
env.setParallelism(4);
Properties properties = getLocal();
properties.setProperty("group.id","test");
FlinkKafkaConsumer<String> consumer = new
FlinkKafkaConsumer<>("testOrderTopic", new SimpleStringSchema(), properties);
DataStream<String> stream = env
.addSource(consumer);
stream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
@Override
public Tuple2<Integer,Integer> map(String s) throws Exception {
Thread.sleep(1000*60*60*60);
return new Tuple2(1,1);
}
}).keyBy(0).sum(0);
stream.print();
//stream.map();
env.execute();
}