[ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529627#comment-16529627
 ] 

ASF GitHub Bot commented on FLINK-9633:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6194#discussion_r199441371
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -842,6 +852,100 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
                }
        }
     
    +   @Test
    +   public void testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws Exception {
    +
    +           final long checkpointId = 42L;
    +           final long timestamp = 1L;
    +
    +           Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
    +           StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
    +
    +           // mock the operators
    +           StreamOperator<?> statelessOperator =
    +                   mock(StreamOperator.class);
    +
    +           final OperatorID operatorID = new OperatorID();
    +           when(statelessOperator.getOperatorID()).thenReturn(operatorID);
    +
    +           // mock the returned empty snapshot result (all state handles are null)
    +           OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
    +           when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
    +                   .thenReturn(statelessOperatorSnapshotResult);
    +
    +           // set up the task
    +           StreamOperator<?>[] streamOperators = {statelessOperator};
    +           OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
    +           when(operatorChain.getAllOperators()).thenReturn(streamOperators);
    +
    +           FileSystem checkpointFileSystem = mock(FileSystem.class);
    +           FileSystem savepointFileSystem = mock(FileSystem.class);
    +
    +           FileSystem.setFsFactories(new HashMap<String, FileSystemFactory>() {{
    +                   this.put("file", new FileSystemFactory() {
    +
    +                           @Override
    +                           public String getScheme() {
    +                                   return "file";
    +                           }
    +
    +                           @Override
    +                           public void configure(Configuration config) {
    +
    +                           }
    +
    +                           @Override
    +                           public FileSystem create(URI fsUri) throws IOException {
    +                                   return savepointFileSystem;
    +                           }
    +                   });
    +                   this.put("hdfs", new FileSystemFactory() {
    +                           @Override
    +                           public String getScheme() {
    +                                   return "hdfs";
    +                           }
    +
    +                           @Override
    +                           public void configure(Configuration config) {
    +
    +                           }
    +
    +                           @Override
    +                           public FileSystem create(URI fsUri) throws IOException {
    +                                   return checkpointFileSystem;
    +                           }
    +                   });
    +           }});
    +
    +           CheckpointStorage checkpointStorage = spy(new FsCheckpointStorage(new Path("hdfs://test1/"), new Path("file:///test2/"), new JobID(), 1024));
    +
    +           CheckpointStorageLocationReference locationReference = AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///test2/"));
    +
    +           when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId, locationReference)).then(new Answer<Object>() {
    +                   @Override
    +                   public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    +                           // valid
    +                           FsCheckpointStorageLocation checkpointStorageLocation = (FsCheckpointStorageLocation) invocationOnMock.callRealMethod();
    +                           assertEquals(savepointFileSystem, checkpointStorageLocation.getFileSystem());
    +                           return checkpointStorageLocation;
    +                   }
    +           });
    +
    +           Whitebox.setInternalState(streamTask, "isRunning", true);
    +           Whitebox.setInternalState(streamTask, "lock", new Object());
    +           Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
    +           Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
    +           Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
    +           Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
    +           Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
    +
    +           CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
    +
    +           streamTask.triggerCheckpoint(
    +                   checkpointMetaData,
    +                   new CheckpointOptions(CheckpointType.SAVEPOINT, locationReference));
    +   }
    --- End diff --
    
    This test involves a lot of mocking, spying and whitebox testing. Tests
    built this way are usually really hard to maintain. I would, therefore,
    suggest creating a unit test for the
    `FsCheckpointStorage#resolveCheckpointStorageLocation` method instead.


> Flink doesn't use the Savepoint path's filesystem to create the OutputStream 
> on Task.
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-9633
>                 URL: https://issues.apache.org/jira/browse/FLINK-9633
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the savepoint's filesystem to create the metadata output 
> stream in CheckpointCoordinator (JM side), but in StreamTask (TM side) it uses 
> the checkpoint's filesystem to create the checkpoint data output stream. When 
> the savepoint and the checkpoint are on different filesystems, this causes 
> problems.
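
The mismatch described in the issue can be sketched with a small, self-contained
model. This is not Flink code: the class `SavepointFsSketch`, its methods, and
the use of plain strings in place of file system objects are all hypothetical
stand-ins. Only the two paths (`hdfs://test1/` for checkpoints, `file:///test2/`
for the savepoint) are taken from the diff above. The sketch assumes file
systems are looked up by URI scheme and contrasts resolving a savepoint target
through the checkpoint directory's scheme with resolving it through the
target's own scheme.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in model; none of these names are part of Flink's API.
final class SavepointFsSketch {

    // Maps a URI scheme to a file system label (a string stands in for a real file system).
    private final Map<String, String> fileSystemsByScheme = new HashMap<>();

    // The configured checkpoint directory, e.g. hdfs://test1/.
    private final URI checkpointDir;

    SavepointFsSketch(URI checkpointDir) {
        this.checkpointDir = checkpointDir;
    }

    void register(String scheme, String fileSystemLabel) {
        fileSystemsByScheme.put(scheme, fileSystemLabel);
    }

    // Models the reported behaviour: the target path is ignored and the
    // checkpoint directory's file system is always used.
    String resolveWithCheckpointFs(URI target) {
        return lookup(checkpointDir);
    }

    // Models the intended behaviour: the file system is derived from the
    // scheme of the target path itself.
    String resolveWithTargetFs(URI target) {
        return lookup(target);
    }

    private String lookup(URI uri) {
        String fileSystem = fileSystemsByScheme.get(uri.getScheme());
        if (fileSystem == null) {
            throw new IllegalArgumentException("No file system registered for scheme: " + uri.getScheme());
        }
        return fileSystem;
    }

    public static void main(String[] args) {
        SavepointFsSketch sketch = new SavepointFsSketch(URI.create("hdfs://test1/"));
        sketch.register("hdfs", "checkpointFileSystem");
        sketch.register("file", "savepointFileSystem");

        URI savepointTarget = URI.create("file:///test2/");
        System.out.println("checkpoint-based lookup: " + sketch.resolveWithCheckpointFs(savepointTarget));
        System.out.println("target-based lookup:     " + sketch.resolveWithTargetFs(savepointTarget));
    }
}
```

Because the lookup is a plain function of the target path, it can be checked
directly with two assertions, without mocks, spies or reflection, which is the
kind of focused test the review comment asks for.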



