suheng.cloud created FLINK-27823:
------------------------------------
Summary: Standalone job continuously restarts due to illegal
checkpointId check in PartitionTimeCommitTrigger when using FilesystemTableSink
Key: FLINK-27823
URL: https://issues.apache.org/jira/browse/FLINK-27823
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.13.6
Reporter: suheng.cloud
Hi, community
When I set up a standalone job that reads from a Kafka topic and sinks to HDFS, I
found that the job restarts continuously after running normally for 4 hours.
When the first restart occurred, the logs looked like this:
```
2022-05-28 00:24:04,861 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 26 (type=CHECKPOINT) @ 1653668644856 for job
00000000000000000000000000000000.
2022-05-28 00:34:04,861 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 26 of
job 00000000000000000000000000000000 expired before completing.
2022-05-28 00:34:04,866 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 27 (type=CHECKPOINT) @ 1653669244862 for job
00000000000000000000000000000000.
2022-05-28 00:41:02,208 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 27 for job 00000000000000000000000000000000 (117373 bytes in 417284
ms).
2022-05-28 00:41:18,517 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter
-> Sink: end (1/1) (7e16853a4d16a80f96a3e26e17f9d677) switched from RUNNING to
FAILED on 192.168.1.142:6122-0b54e0 @ 192.168.1.142 (dataPort=43131).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The
watermark information is:
{27=1653668944610}
.
at
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 00:41:18,524 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 00:41:18,525 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 50 tasks should be restarted to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 00:41:18,525 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx
(00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 00:41:18,526 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(24/24) (8d5ae185e722482d8b1ff4bc3ba60e86) switched from RUNNING to CANCELING.
2022-05-28 00:41:18,526 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(23/24) (369af96456d991046eb10cfee44df415) switched from RUNNING to CANCELING.
...
...
```
After that, the job restarts and successfully restores state from the checkpoint (using
state.checkpoint-storage=jobmanager), and the following checkpoints
(27/28/29/...) also complete successfully. But it seems the recovered
state tries to report the commit message of the old checkpoint 26 to the
PartitionCommitter, which continuously causes failures.
In the end the job restarts again and again, with the same error in the log:
```
2022-05-28 08:36:23,718 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter
-> Sink: end (1/1) (669dfe28f49ec9b08cb1f605b7e1af86) switched from RUNNING to
FAILED on 192.168.1.226:6122-ac5e98 @ 192.168.1.226 (dataPort=41827).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The
watermark information is:
{27=1653668944610, 28=1653669762385, 29=1653669973437, 30=1653670045517,
31=1653670584329, 32=1653671198834, 33=1653671316604, 34=1653671595057,
35=1653671632382, 36=1653671940262, 37=1653672247793, 38=1653672513421,
39=1653672626251, 40=1653672872425, 41=1653673029517, 42=1653673662173,
43=1653673843265, 44=1653674382981, 45=1653674739299, 46=1653674890522,
47=1653675402372, 48=1653675767340, 49=1653676205712, 50=1653676376692,
51=1653676762574, 52=1653677105303, 53=1653677254604, 54=1653677458683,
55=1653677651603, 57=1653678458691, 58=1653678931845, 59=1653679306742,
60=1653679845020, 61=1653680406114, 62=1653680981416, 63=1653681545056,
64=1653681584696, 65=1653681622029, 66=1653682017861, 67=1653682319529,
68=1653682404672, 69=1653682559904, 70=1653682804993, 71=1653682907991,
72=1653683279780, 73=1653683905573, 74=1653684156034, 75=1653684659397,
76=1653684975030, 77=1653685329183, 78=1653685862724, 79=1653686499090,
80=1653686636903, 81=1653686780782, 82=1653687053096, 83=1653687541953,
84=1653688012617, 85=1653688337464, 86=1653688832762, 87=1653689195316,
88=1653689330027, 89=1653689545859, 90=1653689957313, 91=1653690069643,
92=1653690689424, 93=1653690963316, 94=1653691164532, 95=1653691687307,
96=1653691885408, 97=1653692235231, 98=1653692428716, 99=1653692849146,
100=1653693274253, 101=1653693438601, 102=1653694097925, 103=1653694716179,
104=1653694770858, 105=1653695305421, 106=1653695464923, 107=1653695959050,
108=1653696465917, 109=1653696825723, 110=1653696841452, 111=1653697238699,
112=1653697882510}
.
at
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 08:36:23,718 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 50 tasks should be restarted to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx
(00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(24/24) (66177c2069b9aeef21376d7a780ceadb) switched from RUNNING to CANCELING.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(23/24) (7028f34c1756cd1fff5cf25dd12fd550) switched from RUNNING to CANCELING.
```
The job logic is very simple; the Flink SQL looks like this:
```
CREATE TEMPORARY TABLE filesystem_sink_table(....)
PARTITIONED BY(`dt`,`hour`,`topic`) WITH(
'connector'='filesystem',
'format'='textfile',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 hour',
'sink.partition-commit.policy.kind'='success-file',
'auto-compaction' = 'true'
...
);
CREATE TEMPORARY TABLE kafka_source_table ...
streamTableEnv.executeSql("INSERT INTO filesystem_sink_table SELECT ... FROM kafka_source_table");
```
I have looked at the source of PartitionTimeCommitTrigger, and what puzzles me is
that it seems the watermarks map should only remove the committed checkpointId
after it passes the validation:
```
...
if (!watermarks.containsKey(checkpointId)) {
    throw new IllegalArgumentException(
            String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
...
```
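To make the symptom easier to reason about, here is a minimal standalone sketch of the check quoted above. It is not Flink's actual class: the name `WatermarkCheckSketch` and the methods `snapshot` and `watermarkFor` are hypothetical, and the sketch assumes `watermarks` behaves like a `TreeMap<Long, Long>` keyed by checkpoint ID. It only models the situation seen in the first log, where the map holds an entry for checkpoint 27 and a commit request for checkpoint 26 arrives.

```java
import java.util.TreeMap;

/** Simplified, hypothetical model of the quoted check; not Flink's real implementation. */
final class WatermarkCheckSketch {
    // checkpointId -> watermark recorded when that checkpoint was snapshotted (assumed structure)
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    /** Records the watermark seen at snapshot time for the given checkpoint. */
    void snapshot(long checkpointId, long watermark) {
        watermarks.put(checkpointId, watermark);
    }

    /** Mirrors the quoted snippet: validate first, then drop all entries up to checkpointId. */
    long watermarkFor(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                            checkpointId, watermarks));
        }
        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();
        return watermark;
    }

    public static void main(String[] args) {
        WatermarkCheckSketch trigger = new WatermarkCheckSketch();
        // Only checkpoint 27 has a recorded watermark, as in the first log.
        trigger.snapshot(27L, 1653668944610L);
        try {
            // A commit request for checkpoint 26 fails the validation.
            trigger.watermarkFor(26L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // A commit request for checkpoint 27 passes and clears entries up to 27.
        System.out.println(trigger.watermarkFor(27L));
    }
}
```

In this model, a request for a checkpoint ID with no entry in the map always throws, no matter how many later checkpoints have been recorded, which is consistent with the repeated failure on checkpoint 26 in the logs above.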
So, did I misconfigure something, or is there some inconsistent state?
Thanks for any help.