xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1170781624


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+    private static final String TIER_STORE_DIR = "tiered-store";
+
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
+
+    public static byte[] randomBytes(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        Random random = new Random();
+        byte[] bytes = new byte[length];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    public static String bytesToHexString(byte[] bytes) {

Review Comment:
   Why not just use `StringUtils#byteToHexString`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {

Review Comment:
   1. I'd suggest the name `TieredStorageBytesBasedDataIdentifier`.
   2. I'd suggest making this an abstract class.
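
   A minimal sketch of what the two suggestions could look like together. It is self-contained on purpose, so it omits the `TieredStorageDataIdentifier` interface and uses a plain null check instead of `checkArgument`; `ExampleTopicId` is a hypothetical subclass that exists only for illustration.

```java
import java.io.Serializable;
import java.util.Arrays;

/** Sketch of a bytes-based identifier base class; not the actual PR code. */
abstract class TieredStorageBytesBasedDataIdentifier implements Serializable {

    private static final long serialVersionUID = 1L;

    /** The bytes that make up this identifier. */
    protected final byte[] bytes;

    /** Pre-calculated hash code, computed once in the constructor. */
    private final int hashCode;

    protected TieredStorageBytesBasedDataIdentifier(byte[] bytes) {
        if (bytes == null) {
            throw new IllegalArgumentException("Must be not null.");
        }
        this.bytes = bytes;
        this.hashCode = Arrays.hashCode(bytes);
    }

    public byte[] getBytes() {
        return bytes;
    }

    @Override
    public boolean equals(Object that) {
        if (this == that) {
            return true;
        }
        if (that == null || getClass() != that.getClass()) {
            return false;
        }
        return Arrays.equals(bytes, ((TieredStorageBytesBasedDataIdentifier) that).bytes);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }
}

/** Hypothetical concrete subclass, only here to show how the base class is extended. */
final class ExampleTopicId extends TieredStorageBytesBasedDataIdentifier {

    private static final long serialVersionUID = 1L;

    ExampleTopicId(byte[] bytes) {
        super(bytes);
    }
}
```

   Making the base class abstract means callers can only create the concrete ID types, and because `equals` compares the runtime class, two different ID types never compare equal even when their bytes match.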



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
+
+    public static byte[] randomBytes(int length) {

Review Comment:
   Why do we need this? It seems never used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageJobId.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+
+/**
+ * Identifier of a job.
+ *
+ * <p>A job is equivalent to a job in Flink.
+ */
+public class TieredStorageJobId extends TieredStorageAbstractId {

Review Comment:
   I wonder why we need this. What does a job id mean for the tiered storage?
   Should we simply map Flink's JobId & PartitionId to a topic id in the tiered
   storage?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {

Review Comment:
   It took me a while to understand that this means the record should be
   broadcast while the result partition is not broadcast-only. This would be
   easier to understand if we had a JavaDoc for the public method that describes
   each of its arguments.
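
   A minimal sketch of the kind of JavaDoc I have in mind. The class below is a simplified, hypothetical stand-in: subpartition ids are plain `int`s, the `dataType` argument is dropped, and the fan-out inside the `if` branch is my assumption about the intended behavior, since the body is not visible in this hunk.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the producer client, used only to show the JavaDoc shape. */
final class ProducerClientDocSketch {

    private final int numSubpartitions;

    private final boolean isBroadcastOnly;

    /** Subpartition indexes that received a record, kept so the behavior is observable. */
    final List<Integer> receivedBy = new ArrayList<>();

    ProducerClientDocSketch(int numSubpartitions, boolean isBroadcastOnly) {
        this.numSubpartitions = numSubpartitions;
        this.isBroadcastOnly = isBroadcastOnly;
    }

    /**
     * Writes a record to the tiered storage.
     *
     * @param record the serialized record to write
     * @param subpartitionId index of the target subpartition; not used when the record is
     *     fanned out to all subpartitions
     * @param isBroadcast whether the record should be delivered to all subpartitions; this is
     *     a property of the record, unlike {@code isBroadcastOnly}, which is a property of
     *     the result partition
     */
    void write(ByteBuffer record, int subpartitionId, boolean isBroadcast) {
        if (isBroadcast && !isBroadcastOnly) {
            // Assumed behavior: a broadcast record in a partition that is not
            // broadcast-only is copied to every subpartition.
            for (int i = 0; i < numSubpartitions; i++) {
                receive(record.duplicate(), i);
            }
        } else {
            receive(record, subpartitionId);
        }
    }

    /** Records which subpartition received the data; the payload itself is ignored here. */
    private void receive(ByteBuffer record, int subpartitionId) {
        receivedBy.add(subpartitionId);
    }
}
```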



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final ResourceRegistry resourceRegistry;

Review Comment:
   This is never used outside the constructor of `NettyShuffleMaster`. Then
   why keep the reference?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {

Review Comment:
   And why would the shuffle register a resource at the storage? Do you mean
   adding a partition?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }
+
+    public static TierType[] memoryDiskTierTypes() {

Review Comment:
   And the method name confuses me:
   1. Why do we need a method that returns only memory and disk tiers?
   2. This implicitly assumes `DEFAULT_MEMORY_DISK_TIER_TYPES` is memory + 
disk, which is dangerous. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;
+
+    /** ID represented by a byte array. */
+    protected final byte[] id;
+
+    /** Pre-calculated hash-code for acceleration. */
+    protected final int hashCode;
+
+    public TieredStorageAbstractId(byte[] id) {
+        checkArgument(id != null, "Must be not null.");
+
+        this.id = id;
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public TieredStorageAbstractId(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        this.id = randomBytes(length);
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public byte[] getId() {

Review Comment:
   I'd suggest the name `getBytes()`. We may also modify the name of the field.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -29,6 +34,18 @@ public class TieredStorageUtils {
         '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
     };
 
+    public static List<TierFactory> getTierFactoriesFromConfiguration() {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                TieredStorageConfiguration.builder()
+                        .setTierTypes(TieredStorageConfiguration.memoryDiskTierTypes())
+                        .build();

Review Comment:
   This is weird.
   1. We get an array of types out of `TieredStorageConfiguration`, then set it
   back to `TieredStorageConfiguration#Builder`.
   2. The method `xxxFromConfiguration` does not accept a configuration as
   input. Instead, it creates a configuration internally.

   I assume 2 is a temporary state, which should be noted in a comment. In
   addition, how is `TieredStorageConfiguration` supposed to be generated in the
   final state?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+
+/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. */
+public class TieredStorageIdMappingUtils {
+
+    public static TieredStorageJobId convertId(JobID jobID) {
+        return new TieredStorageJobId(jobID.getBytes());
+    }
+
+    public static JobID convertId(TieredStorageJobId tieredStorageJobId) {
+        return new JobID(tieredStorageJobId.getId());
+    }
+
+    public static TieredStorageTopicId convertId(IntermediateDataSetID intermediateDataSetID) {
+        return new TieredStorageTopicId(intermediateDataSetID.getBytes());
+    }
+
+    public static IntermediateDataSetID convertId(TieredStorageTopicId topicId) {
+        return new IntermediateDataSetID(new AbstractID(topicId.getId()));
+    }
+
+    public static TieredStoragePartitionId convertId(ResultPartitionID resultPartitionId) {
+        ByteBuf byteBuf = Unpooled.buffer();

Review Comment:
   Better not to use netty classes.
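
   If the buffer is only used to assemble the bytes of the new ID, `java.nio.ByteBuffer` is probably enough. A minimal sketch, assuming the conversion boils down to concatenating the byte representations of two IDs; `IdBytesSketch` and `concat` are hypothetical names, not part of the PR.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper showing how two ID byte arrays can be combined without netty. */
final class IdBytesSketch {

    private IdBytesSketch() {}

    /** Concatenates the two arrays into one newly allocated array. */
    static byte[] concat(byte[] first, byte[] second) {
        // A heap buffer of exactly the combined size, so array() returns the full result.
        ByteBuffer buffer = ByteBuffer.allocate(first.length + second.length);
        buffer.put(first);
        buffer.put(second);
        return buffer.array();
    }
}
```

   This keeps the ID classes free of the shaded netty dependency and avoids any reference-counting concerns that come with `ByteBuf`.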



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;

Review Comment:
   The storage client probably should not understand the mapping between job 
and partition. It should be mapped in the shuffle components, e.g., shuffle 
master.
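
   A rough sketch of the split I have in mind, under heavy simplification: IDs are plain strings, and all class and method names (`MasterClientSketch`, `ShuffleMasterSketch`, `addPartition`, `releasePartition`, `registerPartition`, `unregisterJob`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Storage-side client: it only knows about partitions, not about jobs. */
final class MasterClientSketch {

    final Set<String> partitions = new HashSet<>();

    void addPartition(String partitionId) {
        partitions.add(partitionId);
    }

    void releasePartition(String partitionId) {
        partitions.remove(partitionId);
    }
}

/** Shuffle-side component: it owns the job-to-partition mapping. */
final class ShuffleMasterSketch {

    private final Map<String, List<String>> jobPartitionIds = new HashMap<>();

    private final MasterClientSketch storageClient;

    ShuffleMasterSketch(MasterClientSketch storageClient) {
        this.storageClient = storageClient;
    }

    void registerPartition(String jobId, String partitionId) {
        jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId);
        storageClient.addPartition(partitionId);
    }

    void unregisterJob(String jobId) {
        // Translate the job-level event into partition-level calls on the storage client.
        List<String> partitionIds = jobPartitionIds.remove(jobId);
        if (partitionIds != null) {
            partitionIds.forEach(storageClient::releasePartition);
        }
    }
}
```

   With this shape, the storage API stays expressed purely in terms of partitions, and job lifecycle handling lives in one place on the shuffle side.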



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }
+
+    public static TierType[] memoryDiskTierTypes() {

Review Comment:
   It doesn't make sense that a public method returns a private type.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;

Review Comment:
   All `serialVersionUID` should start at `1L`.
   
https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-serialization



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {

Review Comment:
   Shouldn't the resource also belong to a certain topic?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }

Review Comment:
   Need JavaDoc for enum values.
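
   Something along these lines would do. The descriptions are only my reading of the value names and should be replaced with whatever the tiers actually guarantee.

```java
/** Sketch of the tier types with per-value JavaDoc; the wording is an assumption. */
enum TierType {
    /** The tier that keeps data in memory. */
    IN_MEM,

    /** The tier that keeps data on local disk. */
    IN_DISK,

    /** The tier that keeps data in remote storage. */
    IN_REMOTE,
}
```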



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }

Review Comment:
   Returning an array here doesn't make sense to me. How about replacing this
   with `getNumTiers()`, so that the valid indexes would be `[0, getNumTiers())`?
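
   A minimal sketch of the alternative, using a simplified stand-in for the configuration class. `TieredStorageConfigurationSketch` and `getTierType` are hypothetical and only there to make the example self-contained.

```java
/** Simplified stand-in for the configuration class; only the tier list is modeled. */
final class TieredStorageConfigurationSketch {

    /** Nested copy of the tier types so that the sketch compiles on its own. */
    enum TierType {
        IN_MEM,
        IN_DISK,
        IN_REMOTE
    }

    private final TierType[] tierTypes;

    TieredStorageConfigurationSketch(TierType... tierTypes) {
        // Defensive copy so later changes to the caller's array do not leak in.
        this.tierTypes = tierTypes.clone();
    }

    /** Returns the number of tiers; valid indexes are 0 (inclusive) up to this value (exclusive). */
    int getNumTiers() {
        return tierTypes.length;
    }

    /** Returns the type of the tier at the given index. */
    TierType getTierType(int tierIndex) {
        return tierTypes[tierIndex];
    }
}
```

   Callers can then iterate with a plain `for (int i = 0; i < config.getNumTiers(); i++)` loop instead of allocating an index array on every call.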



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {
+        jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId);
+        resourceRegistry.registerResource(
+                partitionId, () -> tiers.forEach(TierMasterAgent::release));

Review Comment:
   This is probably correct, but counterintuitive: what is the resource being 
registered? What gets passed in is a release callback rather than a resource, 
so a releaser / cleaner pattern would be a better fit.
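
   Roughly what I mean by a releaser pattern, as a sketch only. `PartitionReleaser`, `ReleaserRegistrySketch` and the `String` partition id are hypothetical and stand in for whatever types the PR ends up using.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical callback type: knows how to release whatever is held for one partition. */
@FunctionalInterface
interface PartitionReleaser {
    void release();
}

/** Hypothetical registry keyed by partition id (a String here, purely for brevity). */
final class ReleaserRegistrySketch {

    private final Map<String, List<PartitionReleaser>> releasers = new HashMap<>();

    /** The name says what is stored: a releaser, not a resource. */
    void registerReleaser(String partitionId, PartitionReleaser releaser) {
        releasers.computeIfAbsent(partitionId, ignore -> new ArrayList<>()).add(releaser);
    }

    /** Runs and then forgets all releasers of the partition; unknown ids are a no-op. */
    void release(String partitionId) {
        List<PartitionReleaser> registered = releasers.remove(partitionId);
        if (registered != null) {
            registered.forEach(PartitionReleaser::release);
        }
    }
}
```

   With names like these, the call site reads as "register how to clean up this partition", which matches what the lambda actually does.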



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final ResourceRegistry resourceRegistry;

Review Comment:
   And if we need to keep them, it might be better to have a wrapper class that 
holds all the tiered-shuffle-related components.
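
   Something along these lines, purely as an illustration. The stub types and the holder name are hypothetical; the real fields would be whichever tiered storage components the shuffle master needs to keep.

```java
import java.util.Objects;

/** Placeholder for the real resource registry type (assumption for this sketch). */
final class ResourceRegistryStub {}

/** Placeholder for the real master client type (assumption for this sketch). */
final class MasterClientStub {}

/** Hypothetical holder, so the shuffle master keeps one field instead of several. */
final class TieredShuffleComponentsSketch {

    private final ResourceRegistryStub resourceRegistry;

    private final MasterClientStub masterClient;

    TieredShuffleComponentsSketch(
            ResourceRegistryStub resourceRegistry, MasterClientStub masterClient) {
        // Fail fast on missing components instead of at first use.
        this.resourceRegistry = Objects.requireNonNull(resourceRegistry);
        this.masterClient = Objects.requireNonNull(masterClient);
    }

    ResourceRegistryStub getResourceRegistry() {
        return resourceRegistry;
    }

    MasterClientStub getMasterClient() {
        return masterClient;
    }
}
```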



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {
+            for (int i = 0; i < numSubpartitions; ++i) {
+                bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+            }
+        } else {
+            bufferAccumulator.receive(record, subpartitionId, dataType);
+        }
+    }
+
+    public void close() {
+        bufferAccumulator.close();
+        tierProducerAgents.forEach(TierProducerAgent::close);
+    }
+
+    public void writeFinishedBuffers(

Review Comment:
   This should be private, and I'd suggest the name `writeAccumulatedBuffers`. 
Otherwise, we need to explain what `finished` means for a buffer.
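
   For what it's worth, passing the method as a callback does not require it to be public: a method reference to a private method is legal inside the declaring class. A small sketch with hypothetical types and a made-up parameter list, since the real signature is not what matters here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical accumulator that hands accumulated buffers to a callback. */
final class AccumulatorSketch {

    private BiConsumer<Integer, List<String>> bufferFlusher;

    void setup(BiConsumer<Integer, List<String>> bufferFlusher) {
        this.bufferFlusher = bufferFlusher;
    }

    void flush(int subpartition, List<String> buffers) {
        bufferFlusher.accept(subpartition, buffers);
    }
}

/** Hypothetical producer client; strings stand in for buffers. */
final class ProducerClientSketch {

    private final List<String> written = new ArrayList<>();

    ProducerClientSketch(AccumulatorSketch accumulator) {
        // The private method is visible here, so the reference compiles.
        accumulator.setup(this::writeAccumulatedBuffers);
    }

    private void writeAccumulatedBuffers(int subpartition, List<String> buffers) {
        for (String buffer : buffers) {
            written.add(subpartition + ":" + buffer);
        }
    }

    /** Test-only view of what the callback wrote. */
    List<String> writtenForTesting() {
        return written;
    }
}
```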



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

