[ https://issues.apache.org/jira/browse/FLINK-22073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363368#comment-17363368 ]
luoyuxia edited comment on FLINK-22073 at 6/15/21, 4:18 AM:
------------------------------------------------------------

From the detailed log, there are three streaming-writer subtasks. Two of them received endInput, but streaming-writer-2 is still running and waiting to checkpoint. When streaming-writer-2 then tries to checkpoint, the checkpoint fails, and so we see data loss.
{code:java}
# streaming-writer-3 is finished
22:55:01,666 [Filter -> DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> streaming-writer (3/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 received completion notification for checkpoint with id=9223372036854775807.
22:55:01,662 [Source: Custom Source (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source (1/1)#0 (77153182cb299b13c51028fb517baa23) switched from RUNNING to FINISHED.
22:55:01,662 [ Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 5 (type=CHECKPOINT) @ 1623452101659 for job ae7cdeec14ebc2cf7b67a971b51c0c1e.
22:55:01,660 [Filter -> DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> streaming-writer (2/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 received completion notification for checkpoint with id=4.
22:55:01,668 [Filter -> DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> streaming-writer (1/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=9223372036854775807 (max part counter=67).
# streaming-writer-1 is finished
22:55:01,668 [Filter -> DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> streaming-writer (1/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=9223372036854775807.
22:55:01,668 [Source: Custom Source (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source (1/1)#0 (77153182cb299b13c51028fb517baa23).
22:55:01,670 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (1/1)#0 77153182cb299b13c51028fb517baa23.
# checkpoint 5 fails, since streaming-writer-2 is not finished yet
22:55:01,673 [jobmanager-future-thread-6] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 5 by task 77153182cb299b13c51028fb517baa23 of job ae7cdeec14ebc2cf7b67a971b51c0c1e at 9892322e-f9dc-457f-b051-0d3f7c96b111 @ localhost (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: Custom Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
{code}
So, just like [~Leonard Xu] said in https://issues.apache.org/jira/browse/FLINK-21032?focusedCommentId=17268345&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17268345, I think that is the cause of this test failure: we may need to set the parallelism in the test to avoid producing multiple streaming writers. What do you think? [~dwysakowicz] [~lzljs3620320]

By the way, after diving into the code, I found that even when the input ends, the CompactFileWriter won't emit an EndCheckpoint with Long.MAX_VALUE, which will also cause data loss. I would like to create a Jira issue and fix it.
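To make the parallelism suggestion concrete, here is a minimal sketch of what pinning the parallelism in the test setup could look like. This is illustrative only: the exact call site in the ITCase and whether the SQL option applies to this sink are assumptions, not the actual test code.

```java
// Sketch: force a single streaming-writer subtask in the test, so the
// endInput / checkpoint ordering cannot race across multiple writers.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Alternatively, for the Table/SQL path, via configuration
// (assumed to take effect for this test's operators):
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().getConfiguration()
        .setString("table.exec.resource.default-parallelism", "1");
```

With parallelism 1 there is only one streaming-writer, so the "two finished, one still running" situation in the log above cannot occur.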
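For the CompactFileWriter issue mentioned above, the shape of the fix could look like the following. This is only a sketch based on my reading of the compact operator messages in flink-table; the message type, constructor arguments, and hook are assumptions and may not match the eventual patch.

```java
// Sketch: when the input ends, emit a final EndCheckpoint with Long.MAX_VALUE
// so the downstream compact coordinator/operator flushes and commits all
// files that are still pending, instead of waiting for a checkpoint that
// will never come.
@Override
public void endInput() throws Exception {
    int taskId = getRuntimeContext().getIndexOfThisSubtask();
    int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    output.collect(new StreamRecord<>(
            new CompactMessages.EndCheckpoint(Long.MAX_VALUE, taskId, numTasks)));
}
```

Note that the id=9223372036854775807 in the logs above is exactly Long.MAX_VALUE, the sentinel checkpoint id used for the final notification after the input has ended.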
> ParquetFileCompactionITCase fails on Azure
> ------------------------------------------
>
> Key: FLINK-22073
> URL: https://issues.apache.org/jira/browse/FLINK-22073
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.13.0
> Reporter: Dawid Wysakowicz
> Assignee: luoyuxia
> Priority: Minor
> Labels: auto-deprioritized-major, test-stability
> Fix For: 1.14.0
>
>
> {code}
> [INFO] Running org.apache.flink.formats.parquet.ParquetFsStreamingSinkITCase
> [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 17.612 s <<< FAILURE! - in org.apache.flink.formats.parquet.ParquetFileCompactionITCase
> [ERROR] testNonPartition(org.apache.flink.formats.parquet.ParquetFileCompactionITCase) Time elapsed: 1.509 s <<< FAILURE!
> java.lang.AssertionError: expected:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1], +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3],
> +I[4, 4, 4], +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], +I[7, 7, 7],
> +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[10, 0, 0], +I[11, 1, 1], +I[11, 1, 1],
> +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[13, 3, 3], +I[14, 4, 4], +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5],
> +I[16, 6, 6], +I[16, 6, 6], +I[17, 7, 7], +I[17, 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9],
> +I[20, 0, 0], +I[20, 0, 0], +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3],
> +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[26, 6, 6], +I[27, 7, 7], +I[27, 7, 7],
> +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1], +I[31, 1, 1],
> +I[32, 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[34, 4, 4], +I[35, 5, 5], +I[35, 5, 5],
> +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[37, 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9],
> +I[40, 0, 0], +I[40, 0, 0], +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3], +I[43, 3, 3],
> +I[44, 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[46, 6, 6], +I[47, 7, 7], +I[47, 7, 7],
> +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 0, 0], +I[50, 0, 0], +I[51, 1, 1], +I[51, 1, 1],
> +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5],
> +I[56, 6, 6], +I[56, 6, 6], +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[59, 9, 9],
> +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3],
> +I[64, 4, 4], +I[64, 4, 4], +I[65, 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7], +I[67, 7, 7],
> +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[70, 0, 0], +I[71, 1, 1], +I[71, 1, 1],
> +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], +I[73, 3, 3], +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5],
> +I[76, 6, 6], +I[76, 6, 6], +I[77, 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9], +I[79, 9, 9],
> +I[80, 0, 0], +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 3, 3], +I[83, 3, 3],
> +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], +I[87, 7, 7], +I[87, 7, 7],
> +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9], +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1],
> +I[92, 2, 2], +I[92, 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], +I[95, 5, 5],
> +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]>
> but was:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3],
> +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7],
> +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[11, 1, 1], +I[11, 1, 1],
> +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[14, 4, 4], +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5],
> +I[16, 6, 6], +I[17, 7, 7], +I[17, 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9],
> +I[20, 0, 0], +I[20, 0, 0], +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3],
> +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[26, 6, 6], +I[26, 6, 6], +I[27, 7, 7], +I[27, 7, 7],
> +I[28, 8, 8], +I[29, 9, 9], +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1],
> +I[32, 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[35, 5, 5], +I[35, 5, 5],
> +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9],
> +I[40, 0, 0], +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3],
> +I[44, 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[47, 7, 7], +I[47, 7, 7],
> +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[50, 0, 0], +I[50, 0, 0], +I[51, 1, 1], +I[51, 1, 1],
> +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5],
> +I[56, 6, 6], +I[56, 6, 6], +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[59, 9, 9], +I[59, 9, 9],
> +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[62, 2, 2], +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3],
> +I[64, 4, 4], +I[65, 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7],
> +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[71, 1, 1], +I[71, 1, 1],
> +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5],
> +I[76, 6, 6], +I[77, 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9],
> +I[80, 0, 0], +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[83, 3, 3], +I[83, 3, 3],
> +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], +I[87, 7, 7], +I[87, 7, 7],
> +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9], +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1],
> +I[92, 2, 2], +I[92, 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[95, 5, 5], +I[95, 5, 5],
> +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[98, 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:118)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.assertIterator(CompactionITCaseBase.java:134)
> at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.innerTestNonPartition(CompactionITCaseBase.java:109)
> at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.testNonPartition(CompactionITCaseBase.java:101)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15871&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=03dca39c-73e8-5aaf-601d-328ae5c35f20&l=13139

--
This message was sent by Atlassian Jira
(v8.3.4#803005)