ferenc-csaky commented on code in PR #23211:
URL: https://github.com/apache/flink/pull/23211#discussion_r1300624234


##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java:
##########
@@ -18,135 +18,41 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
-import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
-import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 /** Tests for the recovery of files of a {@link BlobCacheService} from a HA store. */
-public class BlobCacheRecoveryTest extends TestLogger {
+public class BlobCacheRecoveryTest {
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tempDir;
 
     /**
      * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from
      * any participating BlobServer.
      */
     @Test
-    public void testBlobCacheRecovery() throws Exception {
+    void testBlobCacheRecovery() throws Exception {
         Configuration config = new Configuration();
         config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
         config.setString(
-                HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getPath());
+                HighAvailabilityOptions.HA_STORAGE_PATH, TempDirUtils.newFolder(tempDir).getPath());
 
         BlobStoreService blobStoreService = null;
 
         try {
             blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-            testBlobCacheRecovery(config, blobStoreService, TEMPORARY_FOLDER.newFolder());
+            TestingBlobHelpers.testBlobCacheRecovery(
+                    config, blobStoreService, TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
                 blobStoreService.closeAndCleanupAllData();
             }
         }
     }
-
-    /**
-     * Helper to test that the {@link BlobServer} recovery from its HA store 
works.
-     *
-     * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link 
BlobCacheService} and expects a
-     * second {@link BlobCacheService} to be able to retrieve them from a 
second {@link BlobServer}
-     * that is configured with the same HA store.
-     *
-     * @param config blob server configuration (including HA settings like 
{@link
-     *     HighAvailabilityOptions#HA_STORAGE_PATH} and {@link
-     *     HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up 
<tt>blobStore</tt>
-     * @param blobStore shared HA blob store to use
-     * @throws IOException in case of failures
-     */
-    public static void testBlobCacheRecovery(
-            final Configuration config, final BlobStore blobStore, final File 
blobStorage)
-            throws IOException {
-
-        final String clusterId = 
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
-        String storagePath =
-                config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + 
"/" + clusterId;
-        Random rand = new Random();
-
-        try (BlobServer server0 =
-                        new BlobServer(config, new File(blobStorage, 
"server0"), blobStore);
-                BlobServer server1 =
-                        new BlobServer(config, new File(blobStorage, 
"server1"), blobStore);
-                // use VoidBlobStore as the HA store to force download from 
each server's HA store
-                BlobCacheService cache0 =
-                        new BlobCacheService(
-                                config,
-                                new File(blobStorage, "cache0"),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server0.getPort()));
-                BlobCacheService cache1 =
-                        new BlobCacheService(
-                                config,
-                                new File(blobStorage, "cache1"),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server1.getPort()))) {
-
-            server0.start();
-            server1.start();
-
-            // Random data
-            byte[] expected = new byte[1024];
-            rand.nextBytes(expected);
-            byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
-
-            BlobKey[] keys = new BlobKey[2];
-            BlobKey nonHAKey;
-
-            // Put job-related HA data
-            JobID[] jobId = new JobID[] {new JobID(), new JobID()};
-            keys[0] = put(cache0, jobId[0], expected, PERMANENT_BLOB); // 
Request 1
-            keys[1] = put(cache0, jobId[1], expected2, PERMANENT_BLOB); // 
Request 2
-
-            // put non-HA data
-            nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
-            verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
-            verifyKeyDifferentHashEquals(keys[1], nonHAKey);
-
-            // check that the storage directory exists
-            final Path blobServerPath = new Path(storagePath, "blob");
-            FileSystem fs = blobServerPath.getFileSystem();
-            assertTrue("Unknown storage dir: " + blobServerPath, 
fs.exists(blobServerPath));
-
-            // Verify HA requests from cache1 (connected to server1) with no 
immediate access to the
-            // file
-            verifyContents(cache1, jobId[0], keys[0], expected);
-            verifyContents(cache1, jobId[1], keys[1], expected2);
-
-            // Verify non-HA file is not accessible from server1
-            verifyDeleted(cache1, jobId[0], nonHAKey);
-        }
-    }

Review Comment:
   This helper was moved to the `TestingBlobHelpers` class, for the same reason
   I mentioned in the other comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to