[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529627#comment-16529627 ]
ASF GitHub Bot commented on FLINK-9633:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6194#discussion_r199441371

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -842,6 +852,100 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
             }
         }

    +    @Test
    +    public void testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws Exception {
    +
    +        final long checkpointId = 42L;
    +        final long timestamp = 1L;
    +
    +        Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
    +        StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
    +
    +        // mock the operators
    +        StreamOperator<?> statelessOperator =
    +            mock(StreamOperator.class);
    +
    +        final OperatorID operatorID = new OperatorID();
    +        when(statelessOperator.getOperatorID()).thenReturn(operatorID);
    +
    +        // mock the returned empty snapshot result (all state handles are null)
    +        OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
    +        when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
    +            .thenReturn(statelessOperatorSnapshotResult);
    +
    +        // set up the task
    +        StreamOperator<?>[] streamOperators = {statelessOperator};
    +        OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
    +        when(operatorChain.getAllOperators()).thenReturn(streamOperators);
    +
    +        FileSystem checkpointFileSystem = mock(FileSystem.class);
    +        FileSystem savepointFileSystem = mock(FileSystem.class);
    +
    +        FileSystem.setFsFactories(new HashMap<String, FileSystemFactory>() {{
    +            this.put("file", new FileSystemFactory() {
    +
    +                @Override
    +                public String getScheme() {
    +                    return "file";
    +                }
    +
    +                @Override
    +                public void configure(Configuration config) {
    +
    +                }
    +
    +                @Override
    +                public FileSystem create(URI fsUri) throws IOException {
    +                    return savepointFileSystem;
    +                }
    +            });
    +            this.put("hdfs", new FileSystemFactory() {
    +                @Override
    +                public String getScheme() {
    +                    return "hdfs";
    +                }
    +
    +                @Override
    +                public void configure(Configuration config) {
    +
    +                }
    +
    +                @Override
    +                public FileSystem create(URI fsUri) throws IOException {
    +                    return checkpointFileSystem;
    +                }
    +            });
    +        }});
    +
    +        CheckpointStorage checkpointStorage = spy(new FsCheckpointStorage(new Path("hdfs://test1/"), new Path("file:///test2/"), new JobID(), 1024));
    +
    +        CheckpointStorageLocationReference locationReference = AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///test2/"));
    +
    +        when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId, locationReference)).then(new Answer<Object>() {
    +            @Override
    +            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    +                // valid
    +                FsCheckpointStorageLocation checkpointStorageLocation = (FsCheckpointStorageLocation) invocationOnMock.callRealMethod();
    +                assertEquals(savepointFileSystem, checkpointStorageLocation.getFileSystem());
    +                return checkpointStorageLocation;
    +            }
    +        });
    +
    +        Whitebox.setInternalState(streamTask, "isRunning", true);
    +        Whitebox.setInternalState(streamTask, "lock", new Object());
    +        Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
    +        Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
    +        Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
    +        Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
    +        Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
    +
    +        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
    +
    +        streamTask.triggerCheckpoint(
    +            checkpointMetaData,
    +            new CheckpointOptions(CheckpointType.SAVEPOINT, locationReference));
    +    }
    --- End diff --

    This test includes a lot of mocking, spying and whitebox testing.
    Usually these things are really hard to maintain. I would therefore suggest creating a unit test for the `FsCheckpointStorage#resolveCheckpointStorageLocation` method instead.


> Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-9633
>                 URL: https://issues.apache.org/jira/browse/FLINK-9633
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the Savepoint's filesystem to create the meta output
> stream in CheckpointCoordinator (JM side), but in StreamTask (TM side) it uses
> the Checkpoint's filesystem to create the checkpoint data output stream. When
> the Savepoint and the Checkpoint are on different filesystems, this causes
> problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
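
The review suggestion above (test the location resolution directly instead of driving a mocked StreamTask) can be illustrated with a small, self-contained sketch. Everything in it is a hypothetical stand-in: `SavepointFsSketch`, `FakeFileSystem`, `SketchStorage` and both `resolve...` methods are invented for illustration and are not Flink classes or Flink's actual fix. The sketch only models the idea from the issue description: the file system should follow the scheme of the savepoint target path, not the scheme of the checkpoint directory.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these types are Flink classes. */
final class SavepointFsSketch {

    /** Stand-in for a file system that only remembers the scheme it serves. */
    static final class FakeFileSystem {
        final String scheme;

        FakeFileSystem(String scheme) {
            this.scheme = scheme;
        }
    }

    /** Stand-in for a checkpoint storage with a default checkpoint directory. */
    static final class SketchStorage {
        private final URI checkpointDir;
        private final Map<String, FakeFileSystem> fileSystemsByScheme;

        SketchStorage(URI checkpointDir, Map<String, FakeFileSystem> fileSystemsByScheme) {
            this.checkpointDir = checkpointDir;
            this.fileSystemsByScheme = fileSystemsByScheme;
        }

        /** Models the behavior the issue describes: the target location is ignored. */
        FakeFileSystem resolveWithCheckpointFs(URI targetLocation) {
            return fileSystemsByScheme.get(checkpointDir.getScheme());
        }

        /** Models the intended behavior: the file system follows the target location. */
        FakeFileSystem resolveWithTargetFs(URI targetLocation) {
            return fileSystemsByScheme.get(targetLocation.getScheme());
        }
    }

    /** Builds a storage whose checkpoints live on "hdfs", with "file" registered as well. */
    static SketchStorage newStorage() {
        Map<String, FakeFileSystem> fileSystems = new HashMap<>();
        fileSystems.put("hdfs", new FakeFileSystem("hdfs"));
        fileSystems.put("file", new FakeFileSystem("file"));
        return new SketchStorage(URI.create("hdfs://test1/"), fileSystems);
    }

    public static void main(String[] args) {
        SketchStorage storage = newStorage();
        URI savepointLocation = URI.create("file:///test2/");

        // Prints "hdfs": the savepoint would be written through the checkpoint file system.
        System.out.println(storage.resolveWithCheckpointFs(savepointLocation).scheme);
        // Prints "file": the savepoint is written through its own file system.
        System.out.println(storage.resolveWithTargetFs(savepointLocation).scheme);
    }
}
```

A test written at this level needs no spies and no reflection on private fields: it constructs the storage, resolves one location, and asserts on the returned file system. A real test against `FsCheckpointStorage` would follow the same shape, but its exact setup depends on the Flink version and is not shown here.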