dawidwys commented on a change in pull request #12147: URL: https://github.com/apache/flink/pull/12147#discussion_r425850966
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A stream operator that may do one or both of the following: extract timestamps from + * events and generate watermarks. + * + * <p>These two responsibilities run in the same operator rather than in two different ones, + * because the implementation of the timestamp assigner and the watermark generator is + * frequently in the same class (and should be run in the same instance), even though the + * separate interfaces support the use of different classes. + * + * @param <T> The type of the input elements + */ +public class TimestampsAndWatermarksOperator<T> + extends AbstractStreamOperator<T> + implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + private final WatermarkStrategy<T> watermarkStrategy; + + /** The timestamp assigner. */ + private transient TimestampAssigner<T> timestampAssigner; + + /** The watermark generator, initialized during runtime. */ + private transient WatermarkGenerator<T> watermarkGenerator; + + /** The watermark output gateway, initialized during runtime. */ + private transient WatermarkOutput wmOutput; + + /** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */ + private transient long watermarkInterval; + + public TimestampsAndWatermarksOperator( + WatermarkStrategy<T> watermarkStrategy) { + + this.watermarkStrategy = checkNotNull(watermarkStrategy); + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + + timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); Review comment: When @kl0u was reviewing the `DeserializationSchema#open()` method he suggested that we add `"user"` metric group before user metrics, so it is easier to distinguish which are system and which are user metrics. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org