[ https://issues.apache.org/jira/browse/FLINK-20665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251486#comment-17251486 ]
zhuxiaoshang edited comment on FLINK-20665 at 12/18/20, 3:55 AM:
-----------------------------------------------------------------
[~lzljs3620320], I'd like to take on this task. BTW, I noticed that if some small files have already been compacted, they will be compacted again on restore. How can we ensure data consistency?

was (Author: zhushang):
[~lzljs3620320], I'd like to take on this task. BTW, I noticed that if some small files have already been compacted, they will be compacted again on restore.

> FileNotFoundException when restore from latest Checkpoint
> ---------------------------------------------------------
>
>                 Key: FLINK-20665
>                 URL: https://issues.apache.org/jira/browse/FLINK-20665
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.12.0
>            Reporter: zhuxiaoshang
>            Priority: Blocker
>             Fix For: 1.12.1
>
>
> Reproduce steps:
> 1. Run a Kafka-to-HDFS job with `auto-compaction` enabled.
> 2. Cancel the job after it has completed a successful checkpoint.
> 3. Restore from the latest checkpoint.
> 4. After the first checkpoint completes, the following exception appears:
> {code:java}
> 2020-12-18 10:40:58
> java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://xxxx/day=2020-12-18/hour=10/.uncompacted-part-84db54f8-eda9-4e01-8e85-672144041642-0-0
> 	at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:160)
> 	at org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:41)
> 	at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:169)
> 	at java.util.HashMap.forEach(HashMap.java:1289)
> 	at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:166)
> 	at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:147)
> 	at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:137)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxxx/day=2020-12-18/hour=10/.uncompacted-part-84db54f8-eda9-4e01-8e85-672144041642-0-0
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1441)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1434)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1434)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:64)
> 	at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:158)
> 	... 17 more
> {code}
> DDL:
> {code:java}
> CREATE TABLE cpc_bd_recall_log_hdfs (
>   log_timestamp BIGINT,
>   ip STRING,
>   `raw` STRING,
>   `day` STRING,
>   `hour` STRING
> ) PARTITIONED BY (`day`, `hour`) WITH (
>   'connector' = 'filesystem',
>   'path' = 'hdfs://xxx',
>   'format' = 'parquet',
>   'parquet.compression' = 'SNAPPY',
>   'sink.partition-commit.policy.kind' = 'success-file',
>   'auto-compaction' = 'true',
>   'compaction.file-size' = '128MB'
> );
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
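To illustrate the failure mode discussed above: the restored coordinator re-plans compaction over `.uncompacted-*` files that were already compacted and deleted before cancellation, so `getFileStatus` throws `FileNotFoundException`. The sketch below is NOT Flink's actual fix and does not use Flink's `FileSystem` API; it is a hypothetical, self-contained illustration (using `java.nio.file` and an invented class name `CompactionPlanSketch`) of one possible mitigation, namely dropping vanished inputs before planning, on the assumption that a missing input means it was already compacted.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Flink code: guard compaction planning against
// inputs that no longer exist after a restore.
public class CompactionPlanSketch {

    // Keep only the restored input files that still exist on disk.
    // A vanished file is assumed (in this sketch) to have been compacted
    // and deleted before the job was cancelled, so re-planning it would
    // reproduce the FileNotFoundException from the report.
    public static List<Path> survivingInputs(List<Path> restoredInputs) {
        return restoredInputs.stream()
                .filter(Files::exists)
                .collect(Collectors.toList());
    }
}
```

Note that an existence check alone does not answer the commenter's consistency question: a file that still exists may nonetheless have already been rolled into a compacted file, which is why the real resolution needs to make the restore path idempotent rather than merely tolerant of missing files.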