Hi, for periodically generated watermarks, you should use `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
Hope that helps. Best, Xingcan > On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote: > > > > Hi, I got a problem in Flink and need your help. > > I tried to use TimeCharacteristic.EvenTime, but the sink function never be > executed. > > public class StreamingJob { > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > ObjectMapper jsonMapper = new ObjectMapper(); > > Properties properties = new Properties(); > // String brokers = "172.27.138.8:9092"; > String brokers = "localhost:9092"; > properties.setProperty("bootstrap.servers", brokers); > properties.setProperty("group.id", "test_fink"); > String topic = "stream_test"; > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > FlinkKafkaConsumer010<BitRate> myConsumer = > new FlinkKafkaConsumer010(topic, new > BitRate.BitRateDeserializtionSchema(), properties); > > DataStream<BitRate> stream = env.addSource(myConsumer) > .assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); > DataStream<BitRate> > reduceItems = > stream > .keyBy(a -> a.gameId) > .timeWindow(Time.seconds(10)) > .reduce((a, b) -> a.add(b)); > > reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", > (tuple) -> { > try { > tuple.end(); > System.out.println(tuple.rate + "\t" + tuple.user); > return jsonMapper.writeValueAsBytes(tuple); > } catch (JsonProcessingException e) { > e.printStackTrace(); > return "".getBytes(); > } > })); > > env.execute("Flink Streaming Java API Skeleton"); > } > > } > > > Here is the CustomWatermarkEmitter. I tried to increase the lag number, but > not worked. > > public class CustomWatermarkEmitter implements > AssignerWithPeriodicWatermarks<BitRate> { > > private long currentMax = 0; > private long lag = 3600 * 1000; //not worked ,even though the lag is very > big > > @Nullable > @Override > public Watermark getCurrentWatermark() { > long atLeastTime = currentMax - lag; > System.out.println("water mark" + atLeastTime); > return new Watermark(atLeastTime < 0 ? 0 : atLeastTime); > } > > @Override > public long extractTimestamp(BitRate bitRate, long l) { > currentMax = Long.max(bitRate.eventTime, currentMax); > return bitRate.eventTime; > } > } > > > Here is the entity BitRate, the logs are generated in time , sample log > `4281_783_1520047769115` > > > public BitRate(long eventTime, long gameId, long rate, long user) { > this.eventTime = eventTime; > > this.gameId = gameId; > this.rate = rate; > this.user = user; > this.startTs = System.currentTimeMillis(); > this.type = 0; > } > > public void end() { > this.endTs = System.currentTimeMillis(); > } > > public BitRate add(BitRate b) { > System.out.println("Add:" + b.rate); > this.rate += b.rate; > this.user += b.user; > return this; > } >