[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224744#comment-16224744 ]
ASF GitHub Bot commented on FLINK-7784: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147677068 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java --- @@ -35,60 +42,101 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link TwoPhaseCommitSinkFunction}. */ public class TwoPhaseCommitSinkFunctionTest { - TestContext context; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private FileBasedSinkFunction sinkFunction; + + private OneInputStreamOperatorTestHarness<String, Object> harness; + + private AtomicBoolean throwException = new AtomicBoolean(); + + private File targetDirectory; + + private File tmpDirectory; + + @Mock + private Clock mockClock; + + @Mock + private Logger mockLogger; --- End diff -- The test is asserting on the presence of the substring `This is close to or even exceeding the transaction timeout` in the log message. All your changes to the code would still pass the test except for the 2nd case because there is an assert on the elapsed time. Mockito is used here so that the argument passed to `.warn` can be captured. Imo this is not evil as no behavior is mocked, and actual effects are tested. > Don't fail TwoPhaseCommitSinkFunction when failing to commit > ------------------------------------------------------------ > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.4.0 > Reporter: Aljoscha Krettek > Assignee: Gary Yao > Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)