suheng.cloud created FLINK-27823:
------------------------------------

             Summary: Standalone Job continuously restarts due to illegal 
checkpointId check in PartitionTimeCommitTrigger when using FilesystemTableSink
                 Key: FLINK-27823
                 URL: https://issues.apache.org/jira/browse/FLINK-27823
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.6
            Reporter: suheng.cloud


Hi, community,

When I set up a standalone job that reads from a Kafka topic and sinks to HDFS, I 
found that the job restarts continuously after running normally for 4 hours.
When the first restart occurs, the logs look like this:

```
2022-05-28 00:24:04,861 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 26 (type=CHECKPOINT) @ 1653668644856 for job 
00000000000000000000000000000000.
2022-05-28 00:34:04,861 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 26 of 
job 00000000000000000000000000000000 expired before completing.
2022-05-28 00:34:04,866 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 27 (type=CHECKPOINT) @ 1653669244862 for job 
00000000000000000000000000000000.
2022-05-28 00:41:02,208 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
checkpoint 27 for job 00000000000000000000000000000000 (117373 bytes in 417284 
ms).
2022-05-28 00:41:18,517 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter 
-> Sink: end (1/1) (7e16853a4d16a80f96a3e26e17f9d677) switched from RUNNING to 
FAILED on 192.168.1.142:6122-0b54e0 @ 192.168.1.142 (dataPort=43131).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The 
watermark information is:

{27=1653668944610}

.
at 
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
 ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
 ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
 ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) 
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 00:41:18,524 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 00:41:18,525 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 50 tasks should be restarted to recover the failed task 
cc0206f9bd17ee99dc4565713cd749d7_0. 
2022-05-28 00:41:18,525 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx 
(00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 00:41:18,526 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
(24/24) (8d5ae185e722482d8b1ff4bc3ba60e86) switched from RUNNING to CANCELING.
2022-05-28 00:41:18,526 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
(23/24) (369af96456d991046eb10cfee44df415) switched from RUNNING to CANCELING.
...
...
```
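
As a rough cross-check of the timestamps in the log above, the following sketch recomputes the gap between the two checkpoint triggers and the completion time of checkpoint 27 from the values printed by the CheckpointCoordinator. The class name `CheckpointTimeline` and the UTC+8 offset are assumptions made for illustration; the offset is inferred by comparing the epoch values with the wall-clock times in the log. The gap of roughly 10 minutes would be consistent with Flink's default checkpoint timeout of 10 minutes, although the job's actual checkpoint settings are not shown here.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class CheckpointTimeline {
    // Values copied from the "Triggering checkpoint" and "Completed checkpoint" log lines.
    static final long TRIGGER_26_MS = 1653668644856L;
    static final long TRIGGER_27_MS = 1653669244862L;
    static final long DURATION_27_MS = 417284L;

    /** Time between the trigger of checkpoint 26 and the trigger of checkpoint 27. */
    static Duration gapBetweenTriggers() {
        return Duration.ofMillis(TRIGGER_27_MS - TRIGGER_26_MS);
    }

    /** Trigger time of checkpoint 27 plus its reported duration. */
    static Instant completionOf27() {
        return Instant.ofEpochMilli(TRIGGER_27_MS + DURATION_27_MS);
    }

    public static void main(String[] args) {
        // Gap between the two triggers, in milliseconds and whole minutes.
        System.out.println(gapBetweenTriggers().toMillis() + " ms");
        System.out.println(gapBetweenTriggers().toMinutes() + " min");
        // Assumption: the log timestamps are local time at UTC+8.
        System.out.println(
                completionOf27().atOffset(ZoneOffset.ofHours(8)).toLocalDateTime());
    }
}
```

The computed completion time falls within a fraction of a second of the logged "Completed checkpoint 27" line, so checkpoint 27 appears to have been triggered right after checkpoint 26 expired and to have completed on its own.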

After that, the job restarts and successfully restores its state from the 
checkpoint (using state.checkpoint-storage=jobmanager), and the following 
checkpoints (27/28/29/...) also complete successfully. However, it seems that 
the recovered state tries to report the commit message of the old checkpoint 26 
to the PartitionCommitter, which causes repeated failures.
In the end the job restarts again and again, with the same error in the log:

```
2022-05-28 08:36:23,718 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter 
-> Sink: end (1/1) (669dfe28f49ec9b08cb1f605b7e1af86) switched from RUNNING to 
FAILED on 192.168.1.226:6122-ac5e98 @ 192.168.1.226 (dataPort=41827).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The 
watermark information is:

{27=1653668944610, 28=1653669762385, 29=1653669973437, 30=1653670045517, 
31=1653670584329, 32=1653671198834, 33=1653671316604, 34=1653671595057, 
35=1653671632382, 36=1653671940262, 37=1653672247793, 38=1653672513421, 
39=1653672626251, 40=1653672872425, 41=1653673029517, 42=1653673662173, 
43=1653673843265, 44=1653674382981, 45=1653674739299, 46=1653674890522, 
47=1653675402372, 48=1653675767340, 49=1653676205712, 50=1653676376692, 
51=1653676762574, 52=1653677105303, 53=1653677254604, 54=1653677458683, 
55=1653677651603, 57=1653678458691, 58=1653678931845, 59=1653679306742, 
60=1653679845020, 61=1653680406114, 62=1653680981416, 63=1653681545056, 
64=1653681584696, 65=1653681622029, 66=1653682017861, 67=1653682319529, 
68=1653682404672, 69=1653682559904, 70=1653682804993, 71=1653682907991, 
72=1653683279780, 73=1653683905573, 74=1653684156034, 75=1653684659397, 
76=1653684975030, 77=1653685329183, 78=1653685862724, 79=1653686499090, 
80=1653686636903, 81=1653686780782, 82=1653687053096, 83=1653687541953, 
84=1653688012617, 85=1653688337464, 86=1653688832762, 87=1653689195316, 
88=1653689330027, 89=1653689545859, 90=1653689957313, 91=1653690069643, 
92=1653690689424, 93=1653690963316, 94=1653691164532, 95=1653691687307, 
96=1653691885408, 97=1653692235231, 98=1653692428716, 99=1653692849146, 
100=1653693274253, 101=1653693438601, 102=1653694097925, 103=1653694716179, 
104=1653694770858, 105=1653695305421, 106=1653695464923, 107=1653695959050, 
108=1653696465917, 109=1653696825723, 110=1653696841452, 111=1653697238699, 
112=1653697882510}

.
at 
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
 ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
 ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
 ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
 ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) 
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 08:36:23,718 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 08:36:23,719 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 50 tasks should be restarted to recover the failed task 
cc0206f9bd17ee99dc4565713cd749d7_0. 
2022-05-28 08:36:23,719 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx 
(00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 08:36:23,719 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
(24/24) (66177c2069b9aeef21376d7a780ceadb) switched from RUNNING to CANCELING.
2022-05-28 08:36:23,719 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
(23/24) (7028f34c1756cd1fff5cf25dd12fd550) switched from RUNNING to CANCELING.
```

The job logic is very simple; the Flink SQL looks like this:

```
CREATE TEMPORARY TABLE filesystem_sink_table(....)
PARTITIONED BY(`dt`,`hour`,`topic`) WITH(
  'connector'='filesystem',
  'format'='textfile',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 hour',
  'sink.partition-commit.policy.kind'='success-file',
  'auto-compaction' = 'true'
  ...
);

CREATE TEMPORARY TABLE kafka_source_table ...

streamTableEnv.executeSql("INSERT INTO filesystem_sink_table SELECT ... FROM kafka_source_table");
```

I have looked at the source of PartitionTimeCommitTrigger, and what puzzles me 
is that the watermarks map should only remove the committed checkpointId after 
it passes the validation:

```
...

if (!watermarks.containsKey(checkpointId)) {
    throw new IllegalArgumentException(
            String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
}

long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();

...
```
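
To make the question concrete, here is a minimal, self-contained model of the check quoted above. It is only a sketch: `WatermarkCheckSketch`, `snapshot` and `watermarkFor` are hypothetical names, and the assumption that an entry is added to the map only when a checkpoint is snapshot is mine, not something confirmed from the Flink source. It replays the state seen in the first failure, where the map holds an entry for checkpoint 27 but none for 26.

```java
import java.util.TreeMap;

/** Simplified stand-in for the watermark bookkeeping; not the Flink class itself. */
public class WatermarkCheckSketch {
    // checkpointId -> watermark recorded when that checkpoint was snapshot (assumed).
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    /** Hypothetical hook: remember the watermark for a snapshot checkpoint. */
    public void snapshot(long checkpointId, long watermark) {
        watermarks.put(checkpointId, watermark);
    }

    /** Same validation and cleanup as in the quoted snippet. */
    public long watermarkFor(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                            checkpointId, watermarks));
        }
        long watermark = watermarks.get(checkpointId);
        // Drops this checkpoint and every older one from the map.
        watermarks.headMap(checkpointId, true).clear();
        return watermark;
    }

    public static void main(String[] args) {
        WatermarkCheckSketch trigger = new WatermarkCheckSketch();
        // Only checkpoint 27 is present, as in the first error message.
        trigger.snapshot(27L, 1653668944610L);
        try {
            trigger.watermarkFor(26L); // commit request for the expired checkpoint
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // The failed lookup did not modify the map, so 27 is still resolvable.
        System.out.println(trigger.watermarkFor(27L));
    }
}
```

In this model the lookup for checkpoint 26 fails before the `headMap(...).clear()` line is reached, so the failure itself removes nothing. If a commit message for checkpoint 26 is replayed after every restore, the same exception would be raised each time, which would match the restart loop in the logs.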

So, did I misconfigure something, or is there some inconsistent state?

Thanks for any help.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
