becketqin commented on a change in pull request #8: URL: https://github.com/apache/flink-ml/pull/8#discussion_r720780104
########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/progresstrack/ProgressTrackerListener.java ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.iteration.progresstrack; + +import java.io.IOException; + +/** The listener of alignment. */ +public interface ProgressTrackerListener { Review comment: Maybe rename to `OperatorEpochWatermarkListener` ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/progresstrack/ProgressTrackerFactory.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.iteration.progresstrack; + +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +/** + * The factory of {@link ProgressTracker}. It analyze the inputs of an operator and create the + * corresponding progress tracker. + */ +public class ProgressTrackerFactory { + + public static ProgressTracker create( + StreamConfig streamConfig, + StreamTask<?, ?> containingTask, + ProgressTrackerListener progressTrackerListener) { + + int[] numberOfChannels; + if (!streamConfig.isChainStart()) { + numberOfChannels = new int[] {1}; + } else { + InputGate[] inputGates = containingTask.getEnvironment().getAllInputGates(); + List<StreamEdge> inEdges = + streamConfig.getInPhysicalEdges(containingTask.getUserCodeClassLoader()); + + // Mapping the edge type (input number) into a continuous sequence start from 0. 
+ TreeSet<Integer> edgeTypes = new TreeSet<>(); + inEdges.forEach(edge -> edgeTypes.add(edge.getTypeNumber())); + + Map<Integer, Integer> edgeTypeToIndices = new HashMap<>(); + for (int edgeType : edgeTypes) { + edgeTypeToIndices.put(edgeType, edgeTypeToIndices.size()); + } + + numberOfChannels = new int[edgeTypeToIndices.size()]; + for (int i = 0; i < inEdges.size(); ++i) { + numberOfChannels[edgeTypeToIndices.get(inEdges.get(i).getTypeNumber())] += Review comment: Why is it `+=` instead of `=` here? ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/allround/MultipleInputAllRoundWrapperOperator.java ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.iteration.operator.allround; + +import org.apache.flink.ml.iteration.IterationRecord; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.ArrayList; +import java.util.List; + +/** All-round wrapper for the multiple-inputs operator. */ +public class MultipleInputAllRoundWrapperOperator<OUT> + extends AbstractAllRoundWrapperOperator<OUT, MultipleInputStreamOperator<OUT>> + implements MultipleInputStreamOperator<IterationRecord<OUT>> { + + public MultipleInputAllRoundWrapperOperator( + StreamOperatorParameters<IterationRecord<OUT>> parameters, + StreamOperatorFactory<OUT> operatorFactory) { + super(parameters, operatorFactory); + } + + private <IN> void processElement( + int inputIndex, + Input<IN> input, + StreamRecord<IN> reusedInput, + StreamRecord<IterationRecord<IN>> element) + throws Exception { + switch (element.getValue().getType()) { + case RECORD: + reusedInput.replace(element.getValue().getValue(), element.getTimestamp()); + setIterationContextRound(element.getValue().getRound()); + input.processElement(reusedInput); + clearIterationContextRound(); + break; + case EPOCH_WATERMARK: + onEpochWatermarkEvent(inputIndex, element.getValue()); + break; + default: + throw new FlinkRuntimeException("Not supported iteration record type: " + element); + } + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public List<Input> getInputs() { + List<Input> proxyInputs = new ArrayList<>(); + for (int i = 0; i < 
wrappedOperator.getInputs().size(); ++i) { + // TODO: Note that here we relies on the assumption that the + // stream graph generator labels the input from 1 to n for + // the input array, which we map them from 0 to n - 1. + proxyInputs.add(new ProxyInput(i)); + } + return proxyInputs; + } + + public class ProxyInput<IN> implements Input<IterationRecord<IN>> { Review comment: Ideally the `ProxyInput` should stay with `ProxyOutput`. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/progresstrack/ProgressTracker.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.iteration.progresstrack; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Tracks the epoch watermark from each input. Once the minimum epoch watermark changed, it would + * notify the listener. 
+ */ +public class ProgressTracker { + + private final ProgressTrackerListener progressTrackerListener; + + private final List<InputStatus> inputStatuses; + + private final LowerBoundMaintainer allInputsLowerBound; + + public ProgressTracker( Review comment: The constructor can be package private. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/AbstractWrapperOperator.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.iteration.operator; + +import org.apache.flink.ml.iteration.IterationRecord; +import org.apache.flink.ml.iteration.broadcast.BroadcastOutput; +import org.apache.flink.ml.iteration.broadcast.BroadcastOutputFactory; +import org.apache.flink.ml.iteration.progresstrack.ProgressTracker; +import org.apache.flink.ml.iteration.progresstrack.ProgressTrackerFactory; +import org.apache.flink.ml.iteration.progresstrack.ProgressTrackerListener; +import org.apache.flink.ml.iteration.proxy.ProxyOutput; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The base class of all the wrapper operators. It provides the alignment functionality. */ +public abstract class AbstractWrapperOperator<T> + implements StreamOperator<IterationRecord<T>>, ProgressTrackerListener, BoundedOneInput { Review comment: Should probably leave the `BoundedOneInput` inheritance to `OneInputAllRoundWrapperOperator` instead. Also, what about `BoundedMultiInput`? Should the wrapper take care of that as well? 
Essentially we need to implement all the possible decorative interfaces of `StreamOperators`. I am not sure if we have covered all of them. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/AbstractWrapperOperator.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.iteration.operator; + +import org.apache.flink.ml.iteration.IterationRecord; +import org.apache.flink.ml.iteration.broadcast.BroadcastOutput; +import org.apache.flink.ml.iteration.broadcast.BroadcastOutputFactory; +import org.apache.flink.ml.iteration.progresstrack.ProgressTracker; +import org.apache.flink.ml.iteration.progresstrack.ProgressTrackerFactory; +import org.apache.flink.ml.iteration.progresstrack.ProgressTrackerListener; +import org.apache.flink.ml.iteration.proxy.ProxyOutput; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The base class of all the wrapper operators. It provides the alignment functionality. 
*/ +public abstract class AbstractWrapperOperator<T> + implements StreamOperator<IterationRecord<T>>, ProgressTrackerListener, BoundedOneInput { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractWrapperOperator.class); + + protected final StreamOperatorParameters<IterationRecord<T>> parameters; + + protected final StreamConfig streamConfig; + + protected final StreamTask<?, ?> containingTask; + + protected final Output<StreamRecord<IterationRecord<T>>> output; + + protected final StreamOperatorFactory<T> operatorFactory; + + // --------------- proxy --------------------------- + + protected final ProxyOutput<T> proxyOutput; + + protected final EpochWatermarkSupplier epochWatermarkSupplier; + + // --------------- Metrics --------------------------- + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + + // ------------- Iteration Related -------------------- + + protected final ProgressTracker progressTracker; + + protected final BroadcastOutput<IterationRecord<T>> eventBroadcastOutput; + + public AbstractWrapperOperator( + StreamOperatorParameters<IterationRecord<T>> parameters, + StreamOperatorFactory<T> operatorFactory) { + this.parameters = Objects.requireNonNull(parameters); + this.streamConfig = Objects.requireNonNull(parameters.getStreamConfig()); + this.containingTask = Objects.requireNonNull(parameters.getContainingTask()); + this.output = Objects.requireNonNull(parameters.getOutput()); + this.operatorFactory = Objects.requireNonNull(operatorFactory); + + this.proxyOutput = new ProxyOutput<>(output); + this.epochWatermarkSupplier = new EpochWatermarkSupplier(); + + this.metrics = createOperatorMetricGroup(containingTask.getEnvironment(), streamConfig); + + this.progressTracker = ProgressTrackerFactory.create(streamConfig, containingTask, this); + this.eventBroadcastOutput = + BroadcastOutputFactory.createBroadcastOutput( + output, metrics.getIOMetricGroup().getNumRecordsOutCounter()); + } + + 
protected void onEpochWatermarkEvent(int inputIndex, IterationRecord<?> iterationRecord) + throws IOException { + checkState( + iterationRecord.getType() == IterationRecord.Type.EPOCH_WATERMARK, + "The record " + iterationRecord + " is not epoch watermark."); + progressTracker.onEpochWatermark( + inputIndex, iterationRecord.getSender(), iterationRecord.getRound()); + } + + @Override + public void onEpochWatermarkIncrement(int epochWatermark) throws IOException { + eventBroadcastOutput.broadcastEmit( + new StreamRecord<>( + IterationRecord.newEpochWatermark( + epochWatermark, + OperatorUtils.getUniqueSenderId( Review comment: This value should probably be cached instead of reconstructed every time. Same for `HeadOperator` and `InputOperator`. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/allround/OneInputAllRoundWrapperOperator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.iteration.operator.allround; + +import org.apache.flink.ml.iteration.IterationRecord; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; + +/** All-round wrapper for the one-input operator. */ +public class OneInputAllRoundWrapperOperator<IN, OUT> + extends AbstractAllRoundWrapperOperator<OUT, OneInputStreamOperator<IN, OUT>> + implements OneInputStreamOperator<IterationRecord<IN>, IterationRecord<OUT>> { + + private final StreamRecord<IN> reusedInput; + + public OneInputAllRoundWrapperOperator( + StreamOperatorParameters<IterationRecord<OUT>> parameters, + StreamOperatorFactory<OUT> operatorFactory) { + super(parameters, operatorFactory); + this.reusedInput = new StreamRecord<>(null, 0); + } + + @Override + public void processElement(StreamRecord<IterationRecord<IN>> element) throws Exception { + switch (element.getValue().getType()) { + case RECORD: + reusedInput.replace(element.getValue().getValue(), element.getTimestamp()); + setIterationContextRound(element.getValue().getRound()); + wrappedOperator.processElement(reusedInput); + clearIterationContextRound(); + break; + case EPOCH_WATERMARK: + onEpochWatermarkEvent(0, element.getValue()); + break; + default: + throw new FlinkRuntimeException("Not supported iteration record type: " + element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + wrappedOperator.processWatermark(mark); + } + + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { + 
wrappedOperator.processLatencyMarker(latencyMarker); Review comment: In this case, will the latency marker be emitted to the output stream, or will it also participate in the iteration until the iteration terminates? The behavior does not seem to be well defined. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/allround/OneInputAllRoundWrapperOperator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.iteration.operator.allround; + +import org.apache.flink.ml.iteration.IterationRecord; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; + +/** All-round wrapper for the one-input operator.
*/ +public class OneInputAllRoundWrapperOperator<IN, OUT> + extends AbstractAllRoundWrapperOperator<OUT, OneInputStreamOperator<IN, OUT>> + implements OneInputStreamOperator<IterationRecord<IN>, IterationRecord<OUT>> { + + private final StreamRecord<IN> reusedInput; + + public OneInputAllRoundWrapperOperator( + StreamOperatorParameters<IterationRecord<OUT>> parameters, + StreamOperatorFactory<OUT> operatorFactory) { + super(parameters, operatorFactory); + this.reusedInput = new StreamRecord<>(null, 0); + } + + @Override + public void processElement(StreamRecord<IterationRecord<IN>> element) throws Exception { + switch (element.getValue().getType()) { + case RECORD: + reusedInput.replace(element.getValue().getValue(), element.getTimestamp()); + setIterationContextRound(element.getValue().getRound()); + wrappedOperator.processElement(reusedInput); + clearIterationContextRound(); + break; + case EPOCH_WATERMARK: + onEpochWatermarkEvent(0, element.getValue()); + break; + default: + throw new FlinkRuntimeException("Not supported iteration record type: " + element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { Review comment: It is not clear to me what the watermark is going to look like when there are repeated events. I am also not sure whether this will cause events to be dropped if a datastream with a large event-time span is replayed. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/progresstrack/ProgressTracker.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.iteration.progresstrack; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Tracks the epoch watermark from each input. Once the minimum epoch watermark changed, it would + * notify the listener. + */ +public class ProgressTracker { Review comment: Maybe rename to `OperatorEpochWatermarkTracker`. ########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/AbstractWrapperOperator.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.iteration.operator; + +import org.apache.flink.ml.iteration.IterationRecord; +import org.apache.flink.ml.iteration.broadcast.BroadcastOutput; +import org.apache.flink.ml.iteration.broadcast.BroadcastOutputFactory; +import org.apache.flink.ml.iteration.progresstrack.ProgressTracker; +import org.apache.flink.ml.iteration.progresstrack.ProgressTrackerFactory; +import org.apache.flink.ml.iteration.progresstrack.ProgressTrackerListener; +import org.apache.flink.ml.iteration.proxy.ProxyOutput; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The base class of all the wrapper operators. It provides the alignment functionality. 
*/ +public abstract class AbstractWrapperOperator<T> + implements StreamOperator<IterationRecord<T>>, ProgressTrackerListener, BoundedOneInput { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractWrapperOperator.class); + + protected final StreamOperatorParameters<IterationRecord<T>> parameters; + + protected final StreamConfig streamConfig; + + protected final StreamTask<?, ?> containingTask; + + protected final Output<StreamRecord<IterationRecord<T>>> output; + + protected final StreamOperatorFactory<T> operatorFactory; + + // --------------- proxy --------------------------- + + protected final ProxyOutput<T> proxyOutput; + + protected final EpochWatermarkSupplier epochWatermarkSupplier; + + // --------------- Metrics --------------------------- + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + + // ------------- Iteration Related -------------------- + + protected final ProgressTracker progressTracker; + + protected final BroadcastOutput<IterationRecord<T>> eventBroadcastOutput; + + public AbstractWrapperOperator( + StreamOperatorParameters<IterationRecord<T>> parameters, + StreamOperatorFactory<T> operatorFactory) { + this.parameters = Objects.requireNonNull(parameters); + this.streamConfig = Objects.requireNonNull(parameters.getStreamConfig()); + this.containingTask = Objects.requireNonNull(parameters.getContainingTask()); + this.output = Objects.requireNonNull(parameters.getOutput()); + this.operatorFactory = Objects.requireNonNull(operatorFactory); + + this.proxyOutput = new ProxyOutput<>(output); + this.epochWatermarkSupplier = new EpochWatermarkSupplier(); + + this.metrics = createOperatorMetricGroup(containingTask.getEnvironment(), streamConfig); + + this.progressTracker = ProgressTrackerFactory.create(streamConfig, containingTask, this); + this.eventBroadcastOutput = + BroadcastOutputFactory.createBroadcastOutput( + output, metrics.getIOMetricGroup().getNumRecordsOutCounter()); Review comment: 
I am not sure if we should count the EpochWatermarks as normal records. After all, they are not actual user events, so it would be confusing to users if they send only one event but the counter reports a larger number. Maybe they should be counted separately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org