Hi, thanks for your reply. 

I have searched it in stackoverflow, and there is someone who has the some 
problem.

https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
 
<https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc>


From your advice, I tried the code. 

 env.getConfig().setAutoWatermarkInterval(3 * 1000);

And it calls the getCurrentWaterMark function each 3 seconds,  but still no 
result come out.
From the outputs   ('water mark1520049229163'), I could see that the add method 
is called. But the no result from the sink function.




> On 3 Mar 2018, at 12:47, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi,
> 
> for periodically generated watermarks, you should use 
> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com 
>> <mailto:543950...@qq.com>> wrote:
>> 
>> 
>> 
>> Hi, I got a problem in Flink  and need your help.
>> 
>> I tried to use TimeCharacteristic.EvenTime, but the sink function never be 
>> executed.  
>> 
>> public class StreamingJob {
>>   public static void main(String[] args) throws Exception {
>>     // set up the streaming execution environment
>>     final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>     ObjectMapper jsonMapper = new ObjectMapper();
>> 
>>     Properties properties = new Properties();
>> //    String brokers = "172.27.138.8:9092";
>>     String brokers = "localhost:9092";
>>     properties.setProperty("bootstrap.servers", brokers);
>>     properties.setProperty("group.id <http://group.id/>", "test_fink");
>>     String topic = "stream_test";
>> 
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     FlinkKafkaConsumer010<BitRate> myConsumer =
>>         new FlinkKafkaConsumer010(topic, new 
>> BitRate.BitRateDeserializtionSchema(), properties);
>> 
>>     DataStream<BitRate> stream = env.addSource(myConsumer)
>>         .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>>     DataStream<BitRate>
>>         reduceItems =
>>         stream
>>             .keyBy(a -> a.gameId)
>>             .timeWindow(Time.seconds(10))
>>             .reduce((a, b) -> a.add(b));
>> 
>>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
>> (tuple) -> {
>>       try {
>>         tuple.end();
>>         System.out.println(tuple.rate + "\t" + tuple.user);
>>         return jsonMapper.writeValueAsBytes(tuple);
>>       } catch (JsonProcessingException e) {
>>         e.printStackTrace();
>>         return "".getBytes();
>>       }
>>     }));
>> 
>>     env.execute("Flink Streaming Java API Skeleton");
>>   }
>> 
>> }
>> 
>> 
>> Here is the CustomWatermarkEmitter. I tried to increase the lag number,  but 
>> not worked.
>> 
>> public class CustomWatermarkEmitter implements 
>> AssignerWithPeriodicWatermarks<BitRate> {
>> 
>>   private long currentMax = 0;
>>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
>> big
>> 
>>   @Nullable
>>   @Override
>>   public Watermark getCurrentWatermark() {
>>     long atLeastTime = currentMax - lag;
>>     System.out.println("water mark" + atLeastTime);
>>     return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>>   }
>> 
>>   @Override
>>   public long extractTimestamp(BitRate bitRate, long l) {
>>     currentMax = Long.max(bitRate.eventTime, currentMax);
>>     return bitRate.eventTime;
>>   }
>> }
>> 
>> 
>> Here is the entity BitRate, the logs are generated in time , sample log   
>> `4281_783_1520047769115`
>> 
>> 
>> public BitRate(long eventTime, long gameId, long rate, long user) {
>>   this.eventTime = eventTime;
>> 
>>   this.gameId = gameId;
>>   this.rate = rate;
>>   this.user = user;
>>   this.startTs = System.currentTimeMillis();
>>   this.type = 0;
>> }
>> 
>> public void end() {
>>   this.endTs = System.currentTimeMillis();
>> }
>> 
>> public BitRate add(BitRate b) {
>>   System.out.println("Add:" + b.rate);
>>   this.rate += b.rate;
>>   this.user += b.user;
>>   return this;
>> }
>> 
> 

Reply via email to