[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944826#comment-14944826 ]
ASF GitHub Bot commented on FLINK-2805: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1227#discussion_r41246319 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class BlobRecoveryITCase { + + private File recoveryDir; + + @Before + public void setUp() throws Exception { + recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir"); + if (!recoveryDir.exists() && !recoveryDir.mkdirs()) { + throw new IllegalStateException("Failed to create temp directory for test"); + } + } + + @After + public void cleanUp() throws Exception { + if (recoveryDir != null) { + FileUtils.deleteDirectory(recoveryDir); + } + } + + /** + * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any + * participating BlobServer. 
+ */ + @Test + public void testBlobServerRecovery() throws Exception { + Random rand = new Random(); + + BlobServer[] server = new BlobServer[2]; + InetSocketAddress[] serverAddress = new InetSocketAddress[2]; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath()); + + for (int i = 0; i < server.length; i++) { + server[i] = new BlobServer(config); + serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); + } + + client = new BlobClient(serverAddress[0]); + + // Random data + byte[] actual = new byte[1024]; + rand.nextBytes(actual); + + BlobKey[] keys = new BlobKey[2]; + + // Put data + keys[0] = client.put(actual); // Request 1 + keys[1] = client.put(actual, 32, 256); // Request 2 + + JobID[] jobId = new JobID[] { new JobID(), new JobID() }; + String[] testKey = new String[] { "test-key-1", "test-key-2" }; + + client.put(jobId[0], testKey[0], actual); // Request 3 + client.put(jobId[1], testKey[1], actual, 32, 256); // Request 4 + + // Close the client and connect to the other server + client.close(); + client = new BlobClient(serverAddress[1]); + + // Verify request 1 + try (InputStream is = client.get(keys[0])) { + byte[] expected = new byte[actual.length]; --- End diff -- Yes, I don't know what happened there > Make user jars available for all job managers to recover > -------------------------------------------------------- > > Key: FLINK-2805 > URL: https://issues.apache.org/jira/browse/FLINK-2805 > Project: Flink > Issue Type: Bug > Components: BlobManager, JobManager > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > > This is a bug in https://github.com/apache/flink/pull/1153. 
> In case of multiple job managers, the user jars need to be accessible by all > job managers (including those that join later). > Since #1153 requires the file state backend to be configured, the simplest > solution is to make the blob server aware of the configured recovery mode and > also store, retrieve, and delete the user jars in the file state backend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)