Hi,

for periodically generated watermarks, you should use 
`ExecutionConfig.setAutoWatermarkInterval()` to set an interval.

Hope that helps.

Best,
Xingcan

> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
> 
> 
> 
> Hi, I got a problem in Flink  and need your help.
> 
> I tried to use TimeCharacteristic.EvenTime, but the sink function never be 
> executed.  
> 
> public class StreamingJob {
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     ObjectMapper jsonMapper = new ObjectMapper();
> 
>     Properties properties = new Properties();
> //    String brokers = "172.27.138.8:9092";
>     String brokers = "localhost:9092";
>     properties.setProperty("bootstrap.servers", brokers);
>     properties.setProperty("group.id", "test_fink");
>     String topic = "stream_test";
> 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     FlinkKafkaConsumer010<BitRate> myConsumer =
>         new FlinkKafkaConsumer010(topic, new 
> BitRate.BitRateDeserializtionSchema(), properties);
> 
>     DataStream<BitRate> stream = env.addSource(myConsumer)
>         .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>     DataStream<BitRate>
>         reduceItems =
>         stream
>             .keyBy(a -> a.gameId)
>             .timeWindow(Time.seconds(10))
>             .reduce((a, b) -> a.add(b));
> 
>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
> (tuple) -> {
>       try {
>         tuple.end();
>         System.out.println(tuple.rate + "\t" + tuple.user);
>         return jsonMapper.writeValueAsBytes(tuple);
>       } catch (JsonProcessingException e) {
>         e.printStackTrace();
>         return "".getBytes();
>       }
>     }));
> 
>     env.execute("Flink Streaming Java API Skeleton");
>   }
> 
> }
> 
> 
> Here is the CustomWatermarkEmitter. I tried to increase the lag number,  but 
> not worked.
> 
> public class CustomWatermarkEmitter implements 
> AssignerWithPeriodicWatermarks<BitRate> {
> 
>   private long currentMax = 0;
>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
> big
> 
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
>     long atLeastTime = currentMax - lag;
>     System.out.println("water mark" + atLeastTime);
>     return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>   }
> 
>   @Override
>   public long extractTimestamp(BitRate bitRate, long l) {
>     currentMax = Long.max(bitRate.eventTime, currentMax);
>     return bitRate.eventTime;
>   }
> }
> 
> 
> Here is the entity BitRate, the logs are generated in time , sample log   
> `4281_783_1520047769115`
> 
> 
> public BitRate(long eventTime, long gameId, long rate, long user) {
>   this.eventTime = eventTime;
> 
>   this.gameId = gameId;
>   this.rate = rate;
>   this.user = user;
>   this.startTs = System.currentTimeMillis();
>   this.type = 0;
> }
> 
> public void end() {
>   this.endTs = System.currentTimeMillis();
> }
> 
> public BitRate add(BitRate b) {
>   System.out.println("Add:" + b.rate);
>   this.rate += b.rate;
>   this.user += b.user;
>   return this;
> }
> 

Reply via email to