lindong28 commented on code in PR #248: URL: https://github.com/apache/flink-ml/pull/248#discussion_r1295838409
########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java: ########## @@ -106,7 +106,7 @@ public int getLength() { @Override public void serialize(IterationRecord<T> record, DataOutputView target) throws IOException { target.writeByte((byte) record.getType().ordinal()); - serializerNumber(record.getEpoch(), target); + target.writeInt(record.getEpoch()); Review Comment: It seems that this change will increase the average epoch overhead for common cases. Can you explain why this change is needed? ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SpillingBuffer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * * A queue that can spill the items to disks automatically when the memory buffer is full. + * + * @param <T> The element type. + */ +@Internal +final class SpillableFeedbackQueue<T> { + private final DataOutputSerializer output = new DataOutputSerializer(256); + private final TypeSerializer<T> serializer; + private final IOManager ioManager; + private final MemoryManager memoryManager; + private final int numPages; + + private List<MemorySegment> segments; + private ListMemorySegmentPool segmentPool; + + private SpillingBuffer target; + private long size = 0L; + + SpillableFeedbackQueue( + IOManager ioManager, + MemoryManager memoryManager, + TypeSerializer<T> serializer, + long inMemoryBufferSize, + long pageSize) Review Comment: It seems simpler to get pageSize via `memoryManager.getPageSize()`. ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SpillingBuffer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * * A queue that can spill the items to disks automatically when the memory buffer is full. + * + * @param <T> The element type. 
+ */ +@Internal +final class SpillableFeedbackQueue<T> { + private final DataOutputSerializer output = new DataOutputSerializer(256); + private final TypeSerializer<T> serializer; + private final IOManager ioManager; + private final MemoryManager memoryManager; + private final int numPages; + + private List<MemorySegment> segments; + private ListMemorySegmentPool segmentPool; + + private SpillingBuffer target; + private long size = 0L; + + SpillableFeedbackQueue( + IOManager ioManager, + MemoryManager memoryManager, + TypeSerializer<T> serializer, + long inMemoryBufferSize, + long pageSize) + throws MemoryAllocationException { + this.serializer = Objects.requireNonNull(serializer); + this.ioManager = Objects.requireNonNull(ioManager); + this.memoryManager = Objects.requireNonNull(memoryManager); + + this.numPages = (int) (inMemoryBufferSize / pageSize); + resetSpillingBuffer(); + } + + void add(T item) { + try { + output.clear(); + serializer.serialize(item, output); + target.write(output.getSharedBuffer(), 0, output.length()); + size++; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + MutableObjectIterator<T> iterate() { + try { + DataInputView input = target.flip(); + return new InputViewIterator<>(input, this.serializer); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + long size() { + return size; + } + + void reset() throws Exception { + size = 0; + close(); + resetSpillingBuffer(); + } + + void close() throws IOException { + output.clear(); + List<MemorySegment> toRelease = target.close(); + toRelease.addAll(segments); + memoryManager.release(toRelease); + segmentPool.clear(); + } + + private void resetSpillingBuffer() throws MemoryAllocationException { + this.segments = memoryManager.allocatePages(this, numPages); + this.segmentPool = new ListMemorySegmentPool(segments); Review Comment: Would it be simpler to use `ListMemorySegmentSource` (defined in flink-runtime) rather than ListMemorySegmentPool 
(defined in flink-table-runtime)? ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackChannel.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer; +import org.apache.flink.statefun.flink.core.feedback.SubtaskFeedbackKey; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.Preconditions; + +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Single producer, single consumer channel, which can spill the records to disk when the in-memory + * buffer is full. 
+ */ +@Internal +public final class SpillableFeedbackChannel<T> implements Closeable { + + /** The key that used to identify this channel. */ + private final SubtaskFeedbackKey<T> key; + + /** A single registered consumer. */ + private final AtomicReference<ConsumerTask<T>> consumerRef = new AtomicReference<>(); + + /** The underlying queue used to hold the feedback results. */ + private MpscQueue<T> queue; + + /** Whether the feedback channel is initialized. */ + private boolean initialized; Review Comment: Instead of adding this variable, would it be simpler to check whether `queue == null`? ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SpillingBuffer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * * A queue that can spill the items to disks automatically when the memory buffer is full. + * + * @param <T> The element type. + */ +@Internal +final class SpillableFeedbackQueue<T> { + private final DataOutputSerializer output = new DataOutputSerializer(256); + private final TypeSerializer<T> serializer; + private final IOManager ioManager; + private final MemoryManager memoryManager; + private final int numPages; + + private List<MemorySegment> segments; + private ListMemorySegmentPool segmentPool; + + private SpillingBuffer target; + private long size = 0L; + + SpillableFeedbackQueue( + IOManager ioManager, + MemoryManager memoryManager, + TypeSerializer<T> serializer, + long inMemoryBufferSize, + long pageSize) + throws MemoryAllocationException { + this.serializer = Objects.requireNonNull(serializer); + this.ioManager = Objects.requireNonNull(ioManager); + this.memoryManager = Objects.requireNonNull(memoryManager); + + this.numPages = (int) (inMemoryBufferSize / pageSize); + resetSpillingBuffer(); + } + + void add(T item) { + try { + output.clear(); + serializer.serialize(item, output); + 
target.write(output.getSharedBuffer(), 0, output.length()); + size++; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + MutableObjectIterator<T> iterate() { + try { + DataInputView input = target.flip(); + return new InputViewIterator<>(input, this.serializer); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + long size() { + return size; + } + + void reset() throws Exception { + size = 0; + close(); + resetSpillingBuffer(); + } + + void close() throws IOException { + output.clear(); + List<MemorySegment> toRelease = target.close(); + toRelease.addAll(segments); + memoryManager.release(toRelease); Review Comment: Would it be simpler to just do `memoryManager.release(segments)`? ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SpillingBuffer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * * A queue that can spill the items to disks automatically when the memory buffer is full. + * + * @param <T> The element type. + */ +@Internal +final class SpillableFeedbackQueue<T> { + private final DataOutputSerializer output = new DataOutputSerializer(256); + private final TypeSerializer<T> serializer; + private final IOManager ioManager; + private final MemoryManager memoryManager; + private final int numPages; + + private List<MemorySegment> segments; + private ListMemorySegmentPool segmentPool; + + private SpillingBuffer target; Review Comment: Would it be more readable to name it `buffer`? ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SpillingBuffer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * * A queue that can spill the items to disks automatically when the memory buffer is full. + * + * @param <T> The element type. 
+ */ +@Internal +final class SpillableFeedbackQueue<T> { + private final DataOutputSerializer output = new DataOutputSerializer(256); + private final TypeSerializer<T> serializer; + private final IOManager ioManager; + private final MemoryManager memoryManager; + private final int numPages; + + private List<MemorySegment> segments; + private ListMemorySegmentPool segmentPool; + + private SpillingBuffer target; + private long size = 0L; + + SpillableFeedbackQueue( + IOManager ioManager, + MemoryManager memoryManager, + TypeSerializer<T> serializer, + long inMemoryBufferSize, + long pageSize) + throws MemoryAllocationException { + this.serializer = Objects.requireNonNull(serializer); + this.ioManager = Objects.requireNonNull(ioManager); + this.memoryManager = Objects.requireNonNull(memoryManager); + + this.numPages = (int) (inMemoryBufferSize / pageSize); + resetSpillingBuffer(); + } + + void add(T item) { + try { + output.clear(); + serializer.serialize(item, output); + target.write(output.getSharedBuffer(), 0, output.length()); + size++; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + MutableObjectIterator<T> iterate() { + try { + DataInputView input = target.flip(); + return new InputViewIterator<>(input, this.serializer); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + long size() { + return size; + } + + void reset() throws Exception { + size = 0; + close(); + resetSpillingBuffer(); Review Comment: Instead of repeatedly releasing and re-allocating this list of buffers, can we re-use the allocated buffers before/after the reset, similar to what is done in `SpillingResettableIterator#reset`? ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SpillingBuffer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * * A queue that can spill the items to disks automatically when the memory buffer is full. + * + * @param <T> The element type. 
+ */ +@Internal +final class SpillableFeedbackQueue<T> { + private final DataOutputSerializer output = new DataOutputSerializer(256); + private final TypeSerializer<T> serializer; + private final IOManager ioManager; + private final MemoryManager memoryManager; + private final int numPages; + + private List<MemorySegment> segments; + private ListMemorySegmentPool segmentPool; + + private SpillingBuffer target; + private long size = 0L; + + SpillableFeedbackQueue( + IOManager ioManager, + MemoryManager memoryManager, + TypeSerializer<T> serializer, + long inMemoryBufferSize, + long pageSize) + throws MemoryAllocationException { + this.serializer = Objects.requireNonNull(serializer); + this.ioManager = Objects.requireNonNull(ioManager); + this.memoryManager = Objects.requireNonNull(memoryManager); + + this.numPages = (int) (inMemoryBufferSize / pageSize); + resetSpillingBuffer(); + } + + void add(T item) { + try { + output.clear(); + serializer.serialize(item, output); + target.write(output.getSharedBuffer(), 0, output.length()); + size++; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + MutableObjectIterator<T> iterate() { + try { + DataInputView input = target.flip(); + return new InputViewIterator<>(input, this.serializer); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + long size() { + return size; + } + + void reset() throws Exception { + size = 0; + close(); Review Comment: close() is generally used to release all resources used by this instance and there should be no further invocation of any method of this instance after close() is invoked. It would be useful to follow this pattern. ########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/MpscQueue.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.operator.feedback; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.util.EmptyMutableObjectIterator; +import org.apache.flink.statefun.flink.core.queue.Lock; +import org.apache.flink.statefun.flink.core.queue.Locks; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.Preconditions; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Multi producers single consumer fifo queue. + * + * @param <T> The element type. 
+ */ +@Internal +final class MpscQueue<T> implements Closeable { + private final Lock lock = Locks.spinLock(); + + private SpillableFeedbackQueue<T> activeQueue; + private SpillableFeedbackQueue<T> standByQueue; + + MpscQueue( + IOManager ioManager, + MemoryManager memoryManager, + TypeSerializer<T> serializer, + long inMemoryBufferSize, + long pageSize) + throws MemoryAllocationException { + this.activeQueue = + new SpillableFeedbackQueue<>( + ioManager, memoryManager, serializer, inMemoryBufferSize / 2, pageSize); + this.standByQueue = + new SpillableFeedbackQueue<>( + ioManager, memoryManager, serializer, inMemoryBufferSize / 2, pageSize); + } + + /** + * Adds an element to this (unbound) queue. + * + * @param element the element to add. + * @return the number of elements in the queue after the addition. + */ + long add(T element) { + Preconditions.checkState(element instanceof StreamRecord); + + lock.lockUninterruptibly(); Review Comment: It would be useful to still use `final Lock lock = this.lock` as was done in flink-statefun's MpscQueue. See https://stackoverflow.com/questions/13155860/why-does-jdk-sourcecode-take-a-final-copy-of-volatile-instances for an explanation. 
########## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java: ########## @@ -422,16 +430,44 @@ public void close() throws Exception { } } - private void registerFeedbackConsumer(Executor mailboxExecutor) { + private void registerFeedbackConsumer(Executor mailboxExecutor) + throws MemoryAllocationException { + StreamTask<?, ?> task = getContainingTask(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); int attemptNum = getRuntimeContext().getAttemptNumber(); FeedbackKey<StreamRecord<IterationRecord<?>>> feedbackKey = OperatorUtils.createFeedbackKey(iterationId, feedbackIndex); SubtaskFeedbackKey<StreamRecord<IterationRecord<?>>> key = feedbackKey.withSubTaskIndex(indexOfThisSubtask, attemptNum); - FeedbackChannelBroker broker = FeedbackChannelBroker.get(); - FeedbackChannel<StreamRecord<IterationRecord<?>>> channel = broker.getChannel(key); - OperatorUtils.registerFeedbackConsumer(channel, this, mailboxExecutor); + SpillableFeedbackChannelBroker broker = SpillableFeedbackChannelBroker.get(); + this.feedbackChannel = broker.getChannel(key); Review Comment: Would it be more readable to pass the initialization logic as a parameter of type `Consumer<SpillableFeedbackChannel>` to `getChannel(...)`? We can do the same for TailOperator. The consumer will be invoked iff a new channel needs to be created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org