[ 
https://issues.apache.org/jira/browse/FLINK-22073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363368#comment-17363368
 ] 

luoyuxia edited comment on FLINK-22073 at 6/15/21, 4:18 AM:
------------------------------------------------------------

From the detailed log: there are three streaming-writer subtasks, and two of them received endInput, but streaming-writer-2 is still running and waiting to checkpoint. When streaming-writer-2 then tries to take a checkpoint, the checkpoint fails, so its pending data is never committed and we see data loss.
{code:java}
# streaming-writer-3 is finished
22:55:01,666 [Filter -> 
DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` 
INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> 
streaming-writer (3/3)#0] INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 
received completion notification for checkpoint with id=9223372036854775807.

22:55:01,662 [Source: Custom Source (1/1)#0] INFO 
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source (1/1)#0 
(77153182cb299b13c51028fb517baa23) switched from RUNNING to FINISHED. 

22:55:01,662 [ Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 5 (type=CHECKPOINT) @ 1623452101659 for job 
ae7cdeec14ebc2cf7b67a971b51c0c1e.

22:55:01,660 [Filter -> 
DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` 
INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> 
streaming-writer (2/3)#0] INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 
received completion notification for checkpoint with id=4.

22:55:01,668 [Filter -> 
DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` 
INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> 
streaming-writer (1/3)#0] INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=9223372036854775807 (max part counter=67).


# streaming-writer-1 is finished
22:55:01,668 [Filter -> 
DataSteamToTable(stream=default_catalog.default_database.my_table, type=ROW<`a` 
INT, `b` STRING, `c` STRING> NOT NULL, rowtime=false, watermark=false) -> 
streaming-writer (1/3)#0] INFO 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=9223372036854775807.

22:55:01,668 [Source: Custom Source (1/1)#0] INFO 
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
Source: Custom Source (1/1)#0 (77153182cb299b13c51028fb517baa23).

22:55:01,670 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FINISHED to JobManager for task Source: Custom 
Source (1/1)#0 77153182cb299b13c51028fb517baa23.


# checkpoint 5 fails because streaming-writer-2 is not finished yet
22:55:01,673 [jobmanager-future-thread-6] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline 
checkpoint 5 by task 77153182cb299b13c51028fb517baa23 of job 
ae7cdeec14ebc2cf7b67a971b51c0c1e at 9892322e-f9dc-457f-b051-0d3f7c96b111 @ 
localhost (dataPort=-1). org.apache.flink.util.SerializedThrowable: Task name 
with subtask : Source: Custom Source (1/1)#0 Failure reason: Checkpoint was 
declined (tasks not ready)
{code}
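To make the race easier to see, here is a minimal, Flink-free Java sketch (all class and method names are hypothetical stand-ins, not Flink APIs) of the pattern the log shows: a checkpoint can only complete if every participating task is still running, so once the source finishes, checkpoint 5 is declined and the still-running streaming-writer-2 never gets its data committed.

```java
import java.util.Map;

// Flink-free sketch of the race above; all names are hypothetical.
public class CheckpointRaceSketch {

    enum State { RUNNING, FINISHED }

    // A checkpoint can only be triggered if every participating task is RUNNING.
    static String triggerCheckpoint(long checkpointId, Map<String, State> tasks) {
        for (Map.Entry<String, State> e : tasks.entrySet()) {
            if (e.getValue() != State.RUNNING) {
                return "Decline checkpoint " + checkpointId
                        + ": task " + e.getKey() + " not ready";
            }
        }
        return "Completed checkpoint " + checkpointId;
    }

    public static void main(String[] args) {
        Map<String, State> tasks = new java.util.LinkedHashMap<>();
        tasks.put("source", State.RUNNING);
        tasks.put("streaming-writer-1", State.RUNNING);
        tasks.put("streaming-writer-2", State.RUNNING);
        tasks.put("streaming-writer-3", State.RUNNING);

        // endInput reaches writers 1 and 3, and the source finishes...
        tasks.put("source", State.FINISHED);

        // ...so checkpoint 5, triggered concurrently, is declined, and
        // streaming-writer-2's pending data is never committed.
        System.out.println(triggerCheckpoint(5, tasks));
    }
}
```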
So, just like [~Leonard Xu] said in 
https://issues.apache.org/jira/browse/FLINK-21032?focusedCommentId=17268345&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17268345,

I think that's the cause of this test failure; we may need to set the parallelism parameters to avoid producing multiple streaming-writer subtasks.
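Concretely, the kind of change I mean is just pinning the test job to a single subtask (sketch only; where exactly to set this depends on the test harness):

```java
// Sketch, not the actual test code: force a single streaming-writer
// subtask so endInput and the final checkpoint cannot interleave
// across writers.
env.setParallelism(1);
// or, equivalently, via table config:
// table.exec.resource.default-parallelism: 1
```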

What do you think? [~dwysakowicz] [~lzljs3620320]

 

By the way, after diving into the code, I found that even when the input ends, the CompactFileWriter won't emit an EndCheckpoint with Long.MAX_VALUE, which will also cause data loss. I would like to create a Jira issue and fix it.
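The fix I have in mind is roughly the following. This is a Flink-free sketch with hypothetical names standing in for CompactFileWriter's messages: on endInput, the writer should emit one final marker carrying Long.MAX_VALUE so the downstream compact operator knows no further checkpoints will arrive and can flush what remains.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of the intended fix; class and method names are
// hypothetical stand-ins, not the real Flink APIs.
public class EndCheckpointSketch {

    // Marker telling the downstream compactor which checkpoint just ended.
    record EndCheckpoint(long checkpointId) {}

    static class WriterSketch {
        final List<EndCheckpoint> emitted = new ArrayList<>();

        // Normal path: one marker per completed checkpoint.
        void notifyCheckpointComplete(long checkpointId) {
            emitted.add(new EndCheckpoint(checkpointId));
        }

        // The missing piece: when the input ends, emit a final sentinel
        // EndCheckpoint with Long.MAX_VALUE so the compactor flushes the
        // remaining in-progress files instead of waiting forever.
        void endInput() {
            emitted.add(new EndCheckpoint(Long.MAX_VALUE));
        }
    }
}
```

The Long.MAX_VALUE id matches what the logs above show for the subtasks that did shut down cleanly (id=9223372036854775807).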

 



> ParquetFileCompactionITCase fails on Azure
> ------------------------------------------
>
>                 Key: FLINK-22073
>                 URL: https://issues.apache.org/jira/browse/FLINK-22073
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.13.0
>            Reporter: Dawid Wysakowicz
>            Assignee: luoyuxia
>            Priority: Minor
>              Labels: auto-deprioritized-major, test-stability
>             Fix For: 1.14.0
>
>
> {code}
> [INFO] Running org.apache.flink.formats.parquet.ParquetFsStreamingSinkITCase
> [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 17.612 s <<< FAILURE! - in 
> org.apache.flink.formats.parquet.ParquetFileCompactionITCase
> [ERROR] 
> testNonPartition(org.apache.flink.formats.parquet.ParquetFileCompactionITCase)
>   Time elapsed: 1.509 s  <<< FAILURE!
> java.lang.AssertionError: expected:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1], 
> +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], +I[4, 4, 4], 
> +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], 
> +I[7, 7, 7], +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 
> 0], +I[10, 0, 0], +I[11, 1, 1], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], 
> +I[13, 3, 3], +I[13, 3, 3], +I[14, 4, 4], +I[14, 4, 4], +I[15, 5, 5], +I[15, 
> 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17, 7, 7], +I[17, 7, 7], +I[18, 8, 8], 
> +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0], +I[20, 0, 0], +I[21, 
> 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3], 
> +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[26, 
> 6, 6], +I[27, 7, 7], +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], 
> +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32, 
> 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[34, 4, 4], 
> +I[35, 5, 5], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[37, 
> 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0], 
> +I[40, 0, 0], +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 
> 3, 3], +I[43, 3, 3], +I[44, 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], 
> +I[46, 6, 6], +I[46, 6, 6], +I[47, 7, 7], +I[47, 7, 7], +I[48, 8, 8], +I[48, 
> 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 0, 0], +I[50, 0, 0], +I[51, 1, 1], 
> +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], +I[54, 
> 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[56, 6, 6], 
> +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[59, 
> 9, 9], +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], 
> +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65, 
> 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7], +I[67, 7, 7], 
> +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[70, 
> 0, 0], +I[71, 1, 1], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], 
> +I[73, 3, 3], +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 
> 6, 6], +I[76, 6, 6], +I[77, 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], 
> +I[79, 9, 9], +I[79, 9, 9], +I[80, 0, 0], +I[80, 0, 0], +I[81, 1, 1], +I[81, 
> 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 3, 3], +I[83, 3, 3], +I[84, 4, 4], 
> +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], +I[87, 
> 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9], 
> +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[92, 
> 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], 
> +I[95, 5, 5], +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 
> 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]> but was:<[+I[0, 0, 0], 
> +I[0, 0, 0], +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], 
> +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], 
> +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[11, 1, 
> 1], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[14, 4, 4], 
> +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5], +I[16, 6, 6], +I[17, 7, 7], +I[17, 
> 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9], +I[20, 0, 0], +I[20, 0, 0], 
> +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3], +I[24, 
> 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[26, 6, 6], +I[26, 6, 6], +I[27, 7, 7], 
> +I[27, 7, 7], +I[28, 8, 8], +I[29, 9, 9], +I[29, 9, 9], +I[30, 0, 0], +I[30, 
> 0, 0], +I[31, 1, 1], +I[32, 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], 
> +I[34, 4, 4], +I[35, 5, 5], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 
> 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0], 
> +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3], +I[44, 
> 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[47, 7, 7], 
> +I[47, 7, 7], +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[50, 0, 0], +I[50, 
> 0, 0], +I[51, 1, 1], +I[51, 1, 1], +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], 
> +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[56, 6, 6], +I[56, 6, 6], +I[57, 
> 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[59, 9, 9], +I[59, 9, 9], +I[60, 0, 0], 
> +I[60, 0, 0], +I[61, 1, 1], +I[62, 2, 2], +I[62, 2, 2], +I[63, 3, 3], +I[63, 
> 3, 3], +I[64, 4, 4], +I[65, 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], 
> +I[67, 7, 7], +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 
> 0, 0], +I[71, 1, 1], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], 
> +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 6, 6], +I[77, 
> 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9], +I[80, 0, 0], 
> +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[83, 3, 3], +I[83, 
> 3, 3], +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], 
> +I[87, 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9], +I[90, 
> 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[92, 2, 2], +I[92, 2, 2], +I[93, 3, 3], 
> +I[93, 3, 3], +I[94, 4, 4], +I[95, 5, 5], +I[95, 5, 5], +I[96, 6, 6], +I[96, 
> 6, 6], +I[97, 7, 7], +I[98, 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]>
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.failNotEquals(Assert.java:834)
>       at org.junit.Assert.assertEquals(Assert.java:118)
>       at org.junit.Assert.assertEquals(Assert.java:144)
>       at 
> org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.assertIterator(CompactionITCaseBase.java:134)
>       at 
> org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.innerTestNonPartition(CompactionITCaseBase.java:109)
>       at 
> org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.testNonPartition(CompactionITCaseBase.java:101)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>       at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15871&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=03dca39c-73e8-5aaf-601d-328ae5c35f20&l=13139



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
