Hi, I got a problem in Flink and need your help.
I tried to use TimeCharacteristic.EvenTime, but the sink function never be executed. public class StreamingJob { public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ObjectMapper jsonMapper = new ObjectMapper(); Properties properties = new Properties(); // String brokers = "172.27.138.8:9092"; String brokers = "localhost:9092"; properties.setProperty("bootstrap.servers", brokers); properties.setProperty("group.id", "test_fink"); String topic = "stream_test"; env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); FlinkKafkaConsumer010<BitRate> myConsumer = new FlinkKafkaConsumer010(topic, new BitRate.BitRateDeserializtionSchema(), properties); DataStream<BitRate> stream = env.addSource(myConsumer) .assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream<BitRate> reduceItems = stream .keyBy(a -> a.gameId) .timeWindow(Time.seconds(10)) .reduce((a, b) -> a.add(b)); reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> { try { tuple.end(); System.out.println(tuple.rate + "\t" + tuple.user); return jsonMapper.writeValueAsBytes(tuple); } catch (JsonProcessingException e) { e.printStackTrace(); return "".getBytes(); } })); env.execute("Flink Streaming Java API Skeleton"); } } Here is the CustomWatermarkEmitter. I tried to increase the lag number, but not worked. public class CustomWatermarkEmitter implements AssignerWithPeriodicWatermarks<BitRate> { private long currentMax = 0; private long lag = 3600 * 1000; //not worked ,even though the lag is very big @Nullable @Override public Watermark getCurrentWatermark() { long atLeastTime = currentMax - lag; System.out.println("water mark" + atLeastTime); return new Watermark(atLeastTime < 0 ? 0 : atLeastTime); } @Override public long extractTimestamp(BitRate bitRate, long l) { currentMax = Long.max(bitRate.eventTime, currentMax); return bitRate.eventTime; } } Here is the entity BitRate, the logs are generated in time , sample log `4281_783_1520047769115` public BitRate(long eventTime, long gameId, long rate, long user) { this.eventTime = eventTime; this.gameId = gameId; this.rate = rate; this.user = user; this.startTs = System.currentTimeMillis(); this.type = 0; } public void end() { this.endTs = System.currentTimeMillis(); } public BitRate add(BitRate b) { System.out.println("Add:" + b.rate); this.rate += b.rate; this.user += b.user; return this; }