Hi, thanks for your reply. I have searched it in stackoverflow, and there is someone who has the some problem.
https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc <https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc> From your advice, I tried the code. env.getConfig().setAutoWatermarkInterval(3 * 1000); And it calls the getCurrentWaterMark function each 3 seconds, but still no result come out. From the outputs ('water mark1520049229163'), I could see that the add method is called. But the no result from the sink function. > On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote: > > Hi, > > for periodically generated watermarks, you should use > `ExecutionConfig.setAutoWatermarkInterval()` to set an interval. > > Hope that helps. > > Best, > Xingcan > >> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com >> <mailto:543950...@qq.com>> wrote: >> >> >> >> Hi, I got a problem in Flink and need your help. >> >> I tried to use TimeCharacteristic.EvenTime, but the sink function never be >> executed. >> >> public class StreamingJob { >> public static void main(String[] args) throws Exception { >> // set up the streaming execution environment >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> ObjectMapper jsonMapper = new ObjectMapper(); >> >> Properties properties = new Properties(); >> // String brokers = "172.27.138.8:9092"; >> String brokers = "localhost:9092"; >> properties.setProperty("bootstrap.servers", brokers); >> properties.setProperty("group.id <http://group.id/>", "test_fink"); >> String topic = "stream_test"; >> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> FlinkKafkaConsumer010<BitRate> myConsumer = >> new FlinkKafkaConsumer010(topic, new >> BitRate.BitRateDeserializtionSchema(), properties); >> >> DataStream<BitRate> stream = env.addSource(myConsumer) >> .assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); >> DataStream<BitRate> >> reduceItems = >> stream >> .keyBy(a -> a.gameId) >> .timeWindow(Time.seconds(10)) >> .reduce((a, b) -> a.add(b)); >> >> reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", >> (tuple) -> { >> try { >> tuple.end(); >> System.out.println(tuple.rate + "\t" + tuple.user); >> return jsonMapper.writeValueAsBytes(tuple); >> } catch (JsonProcessingException e) { >> e.printStackTrace(); >> return "".getBytes(); >> } >> })); >> >> env.execute("Flink Streaming Java API Skeleton"); >> } >> >> } >> >> >> Here is the CustomWatermarkEmitter. I tried to increase the lag number, but >> not worked. >> >> public class CustomWatermarkEmitter implements >> AssignerWithPeriodicWatermarks<BitRate> { >> >> private long currentMax = 0; >> private long lag = 3600 * 1000; //not worked ,even though the lag is very >> big >> >> @Nullable >> @Override >> public Watermark getCurrentWatermark() { >> long atLeastTime = currentMax - lag; >> System.out.println("water mark" + atLeastTime); >> return new Watermark(atLeastTime < 0 ? 0 : atLeastTime); >> } >> >> @Override >> public long extractTimestamp(BitRate bitRate, long l) { >> currentMax = Long.max(bitRate.eventTime, currentMax); >> return bitRate.eventTime; >> } >> } >> >> >> Here is the entity BitRate, the logs are generated in time , sample log >> `4281_783_1520047769115` >> >> >> public BitRate(long eventTime, long gameId, long rate, long user) { >> this.eventTime = eventTime; >> >> this.gameId = gameId; >> this.rate = rate; >> this.user = user; >> this.startTs = System.currentTimeMillis(); >> this.type = 0; >> } >> >> public void end() { >> this.endTs = System.currentTimeMillis(); >> } >> >> public BitRate add(BitRate b) { >> System.out.println("Add:" + b.rate); >> this.rate += b.rate; >> this.user += b.user; >> return this; >> } >> >