lindong28 commented on code in PR #248:
URL: https://github.com/apache/flink-ml/pull/248#discussion_r1295838409


##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java:
##########
@@ -106,7 +106,7 @@ public int getLength() {
     @Override
     public void serialize(IterationRecord<T> record, DataOutputView target) throws IOException {
         target.writeByte((byte) record.getType().ordinal());
-        serializerNumber(record.getEpoch(), target);
+        target.writeInt(record.getEpoch());

Review Comment:
   It seems that this change will increase the average serialized size of the 
   epoch in common cases (small epoch values), since `writeInt` always writes 
   4 bytes.
   
   Can you explain why this change is needed?
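
   To illustrate the concern, here is a minimal, self-contained sketch. It assumes the removed `serializerNumber(...)` used some variable-length encoding; the LEB128-style `encodeVarInt` below is a stand-in written for illustration, not the PR's actual format, and `EpochEncodingSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical sketch: compares a varint encoding with a fixed 4-byte int. */
public class EpochEncodingSketch {

    /** Encodes an int using 7 payload bits per byte (LEB128-style, assumed format). */
    public static byte[] encodeVarInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Emit 7 bits at a time; the high bit marks "more bytes follow".
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    /** Encodes an int the way a fixed-width writeInt would: always 4 bytes. */
    public static byte[] encodeFixedInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    public static void main(String[] args) {
        for (int epoch : new int[] {0, 5, 127, 128, 20000}) {
            System.out.println(
                    "epoch=" + epoch
                            + " varint=" + encodeVarInt(epoch).length + "B"
                            + " fixed=" + encodeFixedInt(epoch).length + "B");
        }
    }
}
```

   With an encoding like this one, epochs below 128 take a single byte, whereas the fixed-width form always takes four.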



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * * A queue that can spill the items to disks automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;
+    private long size = 0L;
+
+    SpillableFeedbackQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)

Review Comment:
   It seems simpler to get pageSize via `memoryManager.getPageSize()`.



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * * A queue that can spill the items to disks automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;
+    private long size = 0L;
+
+    SpillableFeedbackQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)
+            throws MemoryAllocationException {
+        this.serializer = Objects.requireNonNull(serializer);
+        this.ioManager = Objects.requireNonNull(ioManager);
+        this.memoryManager = Objects.requireNonNull(memoryManager);
+
+        this.numPages = (int) (inMemoryBufferSize / pageSize);
+        resetSpillingBuffer();
+    }
+
+    void add(T item) {
+        try {
+            output.clear();
+            serializer.serialize(item, output);
+            target.write(output.getSharedBuffer(), 0, output.length());
+            size++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    MutableObjectIterator<T> iterate() {
+        try {
+            DataInputView input = target.flip();
+            return new InputViewIterator<>(input, this.serializer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    long size() {
+        return size;
+    }
+
+    void reset() throws Exception {
+        size = 0;
+        close();
+        resetSpillingBuffer();
+    }
+
+    void close() throws IOException {
+        output.clear();
+        List<MemorySegment> toRelease = target.close();
+        toRelease.addAll(segments);
+        memoryManager.release(toRelease);
+        segmentPool.clear();
+    }
+
+    private void resetSpillingBuffer() throws MemoryAllocationException {
+        this.segments = memoryManager.allocatePages(this, numPages);
+        this.segmentPool = new ListMemorySegmentPool(segments);

Review Comment:
   Would it be simpler to use `ListMemorySegmentSource` (defined in 
   flink-runtime) rather than `ListMemorySegmentPool` (defined in 
   flink-table-runtime)?



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackChannel.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
+import org.apache.flink.statefun.flink.core.feedback.SubtaskFeedbackKey;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Single producer, single consumer channel, which can spill the records to disk when the in-memory
+ * buffer is full.
+ */
+@Internal
+public final class SpillableFeedbackChannel<T> implements Closeable {
+
+    /** The key that used to identify this channel. */
+    private final SubtaskFeedbackKey<T> key;
+
+    /** A single registered consumer. */
+    private final AtomicReference<ConsumerTask<T>> consumerRef = new AtomicReference<>();
+
+    /** The underlying queue used to hold the feedback results. */
+    private MpscQueue<T> queue;
+
+    /** Whether the feedback channel is initialized. */
+    private boolean initialized;

Review Comment:
   Instead of adding this variable, would it be simpler to check whether `queue 
== null`?
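
   A minimal sketch of what I mean, with a `StringBuilder` standing in for `MpscQueue` and hypothetical class and method names throughout. If the field is read from more than one thread, either approach needs the same visibility guarantees, so this only removes the redundant flag.

```java
/** Hypothetical sketch: the nullable field itself records whether initialization happened. */
public class LazyQueueHolderSketch {

    // Stand-in for the MpscQueue field; null means "not initialized yet".
    private StringBuilder queue;

    /** Creates the queue exactly once; a second call is treated as a programming error. */
    public void initialize() {
        if (queue != null) {
            throw new IllegalStateException("Channel is already initialized.");
        }
        queue = new StringBuilder();
    }

    /** Replaces the separate boolean flag with a null check. */
    public boolean isInitialized() {
        return queue != null;
    }
}
```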



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * * A queue that can spill the items to disks automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;
+    private long size = 0L;
+
+    SpillableFeedbackQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)
+            throws MemoryAllocationException {
+        this.serializer = Objects.requireNonNull(serializer);
+        this.ioManager = Objects.requireNonNull(ioManager);
+        this.memoryManager = Objects.requireNonNull(memoryManager);
+
+        this.numPages = (int) (inMemoryBufferSize / pageSize);
+        resetSpillingBuffer();
+    }
+
+    void add(T item) {
+        try {
+            output.clear();
+            serializer.serialize(item, output);
+            target.write(output.getSharedBuffer(), 0, output.length());
+            size++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    MutableObjectIterator<T> iterate() {
+        try {
+            DataInputView input = target.flip();
+            return new InputViewIterator<>(input, this.serializer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    long size() {
+        return size;
+    }
+
+    void reset() throws Exception {
+        size = 0;
+        close();
+        resetSpillingBuffer();
+    }
+
+    void close() throws IOException {
+        output.clear();
+        List<MemorySegment> toRelease = target.close();
+        toRelease.addAll(segments);
+        memoryManager.release(toRelease);

Review Comment:
   Would it be simpler to just do `memoryManager.release(segments)`?



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * * A queue that can spill the items to disks automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;

Review Comment:
   Would it be more readable to name it `buffer`?



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * * A queue that can spill the items to disks automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;
+    private long size = 0L;
+
+    SpillableFeedbackQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)
+            throws MemoryAllocationException {
+        this.serializer = Objects.requireNonNull(serializer);
+        this.ioManager = Objects.requireNonNull(ioManager);
+        this.memoryManager = Objects.requireNonNull(memoryManager);
+
+        this.numPages = (int) (inMemoryBufferSize / pageSize);
+        resetSpillingBuffer();
+    }
+
+    void add(T item) {
+        try {
+            output.clear();
+            serializer.serialize(item, output);
+            target.write(output.getSharedBuffer(), 0, output.length());
+            size++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    MutableObjectIterator<T> iterate() {
+        try {
+            DataInputView input = target.flip();
+            return new InputViewIterator<>(input, this.serializer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    long size() {
+        return size;
+    }
+
+    void reset() throws Exception {
+        size = 0;
+        close();
+        resetSpillingBuffer();

Review Comment:
   Instead of repeatedly releasing and re-allocating this list of buffers, can we 
   re-use the allocated buffers across resets, similar to what is done in 
   `SpillingResettableIterator#reset`?



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * * A queue that can spill the items to disks automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;
+    private long size = 0L;
+
+    SpillableFeedbackQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)
+            throws MemoryAllocationException {
+        this.serializer = Objects.requireNonNull(serializer);
+        this.ioManager = Objects.requireNonNull(ioManager);
+        this.memoryManager = Objects.requireNonNull(memoryManager);
+
+        this.numPages = (int) (inMemoryBufferSize / pageSize);
+        resetSpillingBuffer();
+    }
+
+    void add(T item) {
+        try {
+            output.clear();
+            serializer.serialize(item, output);
+            target.write(output.getSharedBuffer(), 0, output.length());
+            size++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    MutableObjectIterator<T> iterate() {
+        try {
+            DataInputView input = target.flip();
+            return new InputViewIterator<>(input, this.serializer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    long size() {
+        return size;
+    }
+
+    void reset() throws Exception {
+        size = 0;
+        close();

Review Comment:
   `close()` is generally used to release all resources held by an instance, and 
   no method of the instance should be invoked after `close()` has been called.
   
   It would be useful to follow this pattern here.
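
   As a rough sketch of the pattern (hypothetical names, with an in-memory list standing in for the spilling buffer and its memory segments): `reset()` clears state while keeping the instance usable, and `close()` is terminal.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reset() keeps the queue usable, close() is terminal. */
public class ResettableQueueSketch implements AutoCloseable {

    // Stand-in for the spilling buffer and its memory segments.
    private final List<String> items = new ArrayList<>();
    private boolean closed;

    public void add(String item) {
        ensureOpen();
        items.add(item);
    }

    public int size() {
        ensureOpen();
        return items.size();
    }

    /** Drops the queued items but keeps the underlying storage for later use. */
    public void reset() {
        ensureOpen();
        items.clear();
    }

    /** Releases everything; no other method may be called afterwards. */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        items.clear();
    }

    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("Queue is already closed.");
        }
    }
}
```

   Here `reset()` does not go through `close()`, so the two lifecycles stay separate.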



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/MpscQueue.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.statefun.flink.core.queue.Lock;
+import org.apache.flink.statefun.flink.core.queue.Locks;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Multi producers single consumer fifo queue.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class MpscQueue<T> implements Closeable {
+    private final Lock lock = Locks.spinLock();
+
+    private SpillableFeedbackQueue<T> activeQueue;
+    private SpillableFeedbackQueue<T> standByQueue;
+
+    MpscQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)
+            throws MemoryAllocationException {
+        this.activeQueue =
+                new SpillableFeedbackQueue<>(
+                        ioManager, memoryManager, serializer, inMemoryBufferSize / 2, pageSize);
+        this.standByQueue =
+                new SpillableFeedbackQueue<>(
+                        ioManager, memoryManager, serializer, inMemoryBufferSize / 2, pageSize);
+    }
+
+    /**
+     * Adds an element to this (unbound) queue.
+     *
+     * @param element the element to add.
+     * @return the number of elements in the queue after the addition.
+     */
+    long add(T element) {
+        Preconditions.checkState(element instanceof StreamRecord);
+
+        lock.lockUninterruptibly();

Review Comment:
   It would be useful to still use `final Lock lock = this.lock` as was done in 
flink-statefun's MpscQueue.
   
   See 
https://stackoverflow.com/questions/13155860/why-does-jdk-sourcecode-take-a-final-copy-of-volatile-instances
 for an explanation.
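
   For reference, a minimal sketch of the idiom. It uses `java.util.concurrent.locks.ReentrantLock` as a stand-in for the statefun `Lock`, and the class name is hypothetical. As I understand the linked discussion, the local copy is mainly a habit from `java.util.concurrent` that reads the field once per call; the measurable benefit may well be small.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of copying a lock field into a final local before using it. */
public class LocalLockCopySketch {

    private final ReentrantLock lock = new ReentrantLock();
    private long size;

    /** Increments the counter under the lock and returns the new size. */
    public long add() {
        // Read the field once; lock and unlock then go through the same local reference.
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return ++size;
        } finally {
            lock.unlock();
        }
    }
}
```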



##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java:
##########
@@ -422,16 +430,44 @@ public void close() throws Exception {
         }
     }
 
-    private void registerFeedbackConsumer(Executor mailboxExecutor) {
+    private void registerFeedbackConsumer(Executor mailboxExecutor)
+            throws MemoryAllocationException {
+        StreamTask<?, ?> task = getContainingTask();
+
         int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
         int attemptNum = getRuntimeContext().getAttemptNumber();
         FeedbackKey<StreamRecord<IterationRecord<?>>> feedbackKey =
                 OperatorUtils.createFeedbackKey(iterationId, feedbackIndex);
         SubtaskFeedbackKey<StreamRecord<IterationRecord<?>>> key =
                 feedbackKey.withSubTaskIndex(indexOfThisSubtask, attemptNum);
-        FeedbackChannelBroker broker = FeedbackChannelBroker.get();
-        FeedbackChannel<StreamRecord<IterationRecord<?>>> channel = broker.getChannel(key);
-        OperatorUtils.registerFeedbackConsumer(channel, this, mailboxExecutor);
+        SpillableFeedbackChannelBroker broker = SpillableFeedbackChannelBroker.get();
+        this.feedbackChannel = broker.getChannel(key);

Review Comment:
   Would it be more readable to pass the initialization logic as a parameter of 
type `Consumer<SpillableFeedbackChannel>` to `getChannel(...)`?
   
   We can do the same for TailOperator. The consumer will be invoked iff a new 
channel needs to be created.
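
   Roughly what I have in mind, as a self-contained sketch. `BrokerSketch`, its nested `Channel` class and the `String` key are hypothetical stand-ins for the broker, `SpillableFeedbackChannel` and `SubtaskFeedbackKey`; only the shape of `getChannel(key, initializer)` matters.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical sketch: the broker runs the initializer only when it creates a channel. */
public class BrokerSketch {

    /** Stand-in for SpillableFeedbackChannel. */
    public static final class Channel {
        public boolean initialized;
    }

    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();

    /**
     * Returns the channel for the key, creating and initializing it if absent.
     * The initializer is not invoked when the channel already exists.
     */
    public Channel getChannel(String key, Consumer<Channel> initializer) {
        return channels.computeIfAbsent(
                key,
                k -> {
                    Channel channel = new Channel();
                    initializer.accept(channel);
                    return channel;
                });
    }
}
```

   HeadOperator and TailOperator could then both call `getChannel(...)` with the same initialization logic, and whichever arrives first would trigger it. One caveat with `computeIfAbsent` is that the initializer should be short and must not touch the map itself.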



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
