benwang li created FLINK-8815:
---------------------------------

             Summary: EventTime won't work as reduce
                 Key: FLINK-8815
                 URL: https://issues.apache.org/jira/browse/FLINK-8815
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.4.1
         Environment:
Main Code
{code:java}
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ObjectMapper jsonMapper = new ObjectMapper();

        Properties properties = new Properties();
        // String brokers = "172.27.138.8:9092";
        String brokers = "localhost:9092";
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("group.id", "test_fink");
        String topic = "stream_test";

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer010<BitRate> myConsumer =
                new FlinkKafkaConsumer010<>(topic, new BitRate.BitRateDeserializtionSchema(), properties);

        DataStream<BitRate> stream = env.addSource(myConsumer)
                .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

        DataStream<BitRate> reduceItems = stream
                .keyBy(a -> a.gameId)
                .timeWindow(Time.seconds(1))
                .reduce((a, b) -> a.add(b));

        reduceItems.print(); //never print

        reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> {
            try {
                tuple.end();
                System.out.println(tuple.rate + "\t" + tuple.user); //never print
                return jsonMapper.writeValueAsBytes(tuple);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return "".getBytes();
            }
        }));

        env.execute("Flink Streaming Java API Skeleton");
    }
}
{code}

The reduceItems stream never prints anything, but the BitRate add method does print its logs.

My log is simple, like this; all entries are generated in time.
{code:java}
4281_783_1519827320460
7347_939_1519827320460
3281_984_1519827320460
8225_810_1519827320460
3956_920_1519827320460
6594_815_1519827320460
5962_925_1519827320460
4028_854_1519827320460
811_875_1519827320460
3837_974_1519827320460
{code}

My Event BitRate
{code:java}
public class BitRate {
    public long eventTime;
    public long gameId;
    public long rate;
    public long user;
    public long startTs;
    public long endTs;
    public int type;

    public BitRate() {
    }

    public BitRate(long eventTime, long gameId, long rate, long user) {
        this.eventTime = eventTime;
        this.gameId = gameId;
        this.rate = rate;
        this.user = user;
        this.startTs = System.currentTimeMillis();
        this.type = 0;
    }

    public void end() {
        this.endTs = System.currentTimeMillis();
        // System.out.println("end" + this.user);
    }

    public BitRate add(BitRate b) {
        this.rate += b.rate;
        this.user += b.user;
        // System.out.println("add" + b.user);
        return this;
    }
}
{code}

My CustomWatermarkEmitter
{code:java}
public class CustomWatermarkEmitter implements AssignerWithPeriodicWatermarks<BitRate> {

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // System.out.println("get=>" + currentTs +maxTimeLag);
        return new Watermark(System.currentTimeMillis());
    }

    @Override
    public long extractTimestamp(BitRate bitRate, long l) {
        // System.out.println("extract"+bitRate.startTs + ":" + l);
        return bitRate.eventTime;
    }
}
{code}

            Reporter: benwang li


I use the EventTime option to do the window reduce operation, but the reduce result stream emits nothing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)