Hi, I have a problem with Flink and need your help.

I tried to use TimeCharacteristic.EventTime, but the sink function is never
executed.

public class StreamingJob {
  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    ObjectMapper jsonMapper = new ObjectMapper();

    Properties properties = new Properties();
//    String brokers = "172.27.138.8:9092";
    String brokers = "localhost:9092";
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty("group.id", "test_fink");
    String topic = "stream_test";

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer010<BitRate> myConsumer =
        new FlinkKafkaConsumer010<>(
            topic, new BitRate.BitRateDeserializtionSchema(), properties);

    DataStream<BitRate> stream = env.addSource(myConsumer)
        .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
    DataStream<BitRate>
        reduceItems =
        stream
            .keyBy(a -> a.gameId)
            .timeWindow(Time.seconds(10))
            .reduce((a, b) -> a.add(b));

    reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> {
      try {
        tuple.end();
        System.out.println(tuple.rate + "\t" + tuple.user);
        return jsonMapper.writeValueAsBytes(tuple);
      } catch (JsonProcessingException e) {
        e.printStackTrace();
        return "".getBytes();
      }
    }));

    env.execute("Flink Streaming Java API Skeleton");
  }

}


Here is the CustomWatermarkEmitter. I tried increasing the lag, but that did
not help.

public class CustomWatermarkEmitter
    implements AssignerWithPeriodicWatermarks<BitRate> {

  private long currentMax = 0;
  private long lag = 3600 * 1000; // did not help, even with a very large lag

  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
    long atLeastTime = currentMax - lag;
    System.out.println("water mark" + atLeastTime);
    return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
  }

  @Override
  public long extractTimestamp(BitRate bitRate, long l) {
    currentMax = Long.max(bitRate.eventTime, currentMax);
    return bitRate.eventTime;
  }
}
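
To make the effect of the lag concrete, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic involved. It assumes the default event-time trigger, which fires a window once the watermark reaches the window end minus 1 ms, and a single source partition driving the watermark. The class and method names (WatermarkLagSketch, windowEnd, windowFires, minTimestampToFire) are hypothetical and only for illustration; the timestamp is the one from my sample log.

```java
// Plain-Java sketch of the watermark arithmetic; not Flink API.
// All names here are hypothetical and used only for illustration.
final class WatermarkLagSketch {

  // Same formula as CustomWatermarkEmitter.getCurrentWatermark().
  static long watermark(long currentMax, long lag) {
    return Math.max(currentMax - lag, 0L);
  }

  // Exclusive end of the tumbling window that contains a non-negative
  // timestamp, assuming windows aligned to the epoch (zero offset).
  static long windowEnd(long timestamp, long sizeMs) {
    long start = timestamp - (timestamp % sizeMs);
    return start + sizeMs;
  }

  // Assumed firing rule: watermark >= last timestamp that belongs to the window.
  static boolean windowFires(long watermark, long windowEnd) {
    return watermark >= windowEnd - 1;
  }

  // Smallest event timestamp that must be seen before the window can fire,
  // given watermark = currentMax - lag.
  static long minTimestampToFire(long windowEnd, long lag) {
    return windowEnd - 1 + lag;
  }

  public static void main(String[] args) {
    long eventTime = 1520047769115L; // from the sample log line
    long lag = 3600 * 1000L;         // lag used in CustomWatermarkEmitter
    long size = 10 * 1000L;          // timeWindow(Time.seconds(10))

    long end = windowEnd(eventTime, size);
    long wm = watermark(eventTime, lag);

    System.out.println("window end:        " + end);
    System.out.println("watermark:         " + wm);
    System.out.println("fires already:     " + windowFires(wm, end));
    System.out.println("needs event >=     " + minTimestampToFire(end, lag));
  }
}
```

If these assumptions hold, a larger lag pushes the watermark further back, so a window only fires after an event arrives whose timestamp is at least one lag past the window end. Increasing the lag would therefore delay the sink rather than trigger it.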


Here is the BitRate entity. The logs are generated in time; a sample log line:
`4281_783_1520047769115`


public BitRate(long eventTime, long gameId, long rate, long user) {
  this.eventTime = eventTime;

  this.gameId = gameId;
  this.rate = rate;
  this.user = user;
  this.startTs = System.currentTimeMillis();
  this.type = 0;
}

public void end() {
  this.endTs = System.currentTimeMillis();
}

public BitRate add(BitRate b) {
  System.out.println("Add:" + b.rate);
  this.rate += b.rate;
  this.user += b.user;
  return this;
}
