[ https://issues.apache.org/jira/browse/FLINK-21427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290198#comment-17290198 ]

Kenzyme Le edited comment on FLINK-21427 at 2/24/21, 7:01 PM:
--------------------------------------------------------------

Hi [~roman_khachatryan],

I was able to reproduce this error. The issue is actually not with S3, but with the GenericWriteAheadSink class.

Based on my tests, stopping a job with a savepoint while using GenericWriteAheadSink doesn't save an important piece of information to external state (S3 in my case): the 
[id|https://github.com/apache/flink/blob/release-1.11.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java#L84] field. 
I think it should be written to external state 
[here|https://github.com/apache/flink/blob/release-1.11.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java#L124].
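
To illustrate what I mean, here is a rough sketch (not the actual Flink source; the state name "sink-id" and the standalone class are made up for the example) of how the generated id could be kept in operator state so that a restore from a savepoint reuses it instead of creating a fresh UUID:

{code:java}
import java.util.Collections;
import java.util.UUID;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;

// Sketch of what GenericWriteAheadSink could do; shown as a standalone class for brevity.
public class IdPreservingSinkSketch {

    private transient ListState<String> idState;
    private String id = UUID.randomUUID().toString();

    public void initializeState(StateInitializationContext context) throws Exception {
        idState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("sink-id", StringSerializer.INSTANCE));
        if (context.isRestored()) {
            for (String restoredId : idState.get()) {
                id = restoredId; // reuse the id from the savepoint instead of the new UUID
            }
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        idState.update(Collections.singletonList(id)); // persist the id in external state
    }
}
{code}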

This value is used by 
[CassandraCommitter|https://github.com/apache/flink/blob/release-1.11.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java#L143] 
to verify whether the previous commit was successful. When resuming from a savepoint, 
[isCheckpointCommitted()|https://github.com/apache/flink/blob/release-1.11.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java#L134] 
will [search in Cassandra for the last committed checkpoint|https://github.com/apache/flink/blob/release-1.11.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java#L134]. 
Since the id was not saved to external state, the sink uses a newly generated UUID value for the 
[*id*|https://github.com/apache/flink/blob/release-1.11.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java#L84]. 
Hence, the [SELECT query will return null|https://github.com/apache/flink/blob/release-1.11.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java#L152], 
the function will return *false* (the checkpoint is considered not committed to the 
external system), and the sink will proceed to commit already committed data.
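
Here is a minimal, self-contained illustration of that decision (simplified; the real logic lives in the CassandraCommitter lines linked above):

{code:java}
// With the sink id from before the savepoint, the SELECT finds a row and the
// checkpoint is recognised as committed. With a freshly generated id, the SELECT
// finds nothing, lastCommittedCheckpoint stays null, and the checkpoint is treated
// as uncommitted -- so already committed data gets replayed.
public final class CommitCheckIllustration {

    static boolean isCheckpointCommitted(Long lastCommittedCheckpoint, long checkpointId) {
        return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint;
    }

    public static void main(String[] args) {
        System.out.println(isCheckpointCommitted(42L, 42L));  // true: row found for the old id
        System.out.println(isCheckpointCommitted(null, 42L)); // false: no row for the new id
    }
}
{code}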

Since that condition passes, execution will [continue in the if block|https://github.com/apache/flink/blob/release-1.11.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java#L231]. 
Then, in the case of 
[CassandraTupleWriteAheadSink.sendValues()|https://github.com/apache/flink/blob/release-1.11.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java#L133], 
it will loop over the buffered values (which, in my case, were stored in S3). This is where I 
got the error about the key not existing in S3. Since I stopped with a savepoint as 
mentioned above, the job should already have committed the data to the external 
system, yet it still looks up data in S3 that no longer exists.
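
As I understand it, the replay path looks roughly like this (a compilable sketch with made-up helper interfaces, not the actual Flink code):

{code:java}
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.StreamStateHandle;

final class ReplaySketch {

    interface Committer {
        boolean isCheckpointCommitted(int subtaskId, long checkpointId);
    }

    interface Replayer {
        boolean sendValues(FSDataInputStream in, long checkpointId, long timestamp) throws Exception;
    }

    static void replayIfNeeded(
            Committer committer,
            Replayer replayer,
            StreamStateHandle bufferedElements, // handle to the S3 object from the savepoint
            int subtaskId,
            long checkpointId,
            long timestamp) throws Exception {

        if (!committer.isCheckpointCommitted(subtaskId, checkpointId)) {
            // With a fresh sink id this branch is always taken, even though the
            // data was already committed before the savepoint was taken.
            try (FSDataInputStream in = bufferedElements.openInputStream()) {
                // Opening/reading the handle is what fails with
                // "The specified key does not exist" once the S3 object is gone.
                replayer.sendValues(in, checkpointId, timestamp);
            }
        }
    }
}
{code}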

Please let me know if I made any incorrect assumptions or missed anything.

Thanks!



> Recovering from savepoint key does not exist in S3
> --------------------------------------------------
>
>                 Key: FLINK-21427
>                 URL: https://issues.apache.org/jira/browse/FLINK-21427
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.2
>            Reporter: Kenzyme Le
>            Priority: Major
>
> Hi,
> I was able to stop the job and generate a savepoint successfully, but resuming the 
> job caused repeated *specified key does not exist* errors from S3 in the logs.
> Plugin used: *flink-s3-fs-presto-1.11.2.jar*
>  
> {code:java}
> [] - Could not commit checkpoint.
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>  com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does 
> not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; 
> Request ID: D76810E62E37680C; S3 Extended Request ID: 
> h5MRYXOZj5UnPEVjYtMOnUqXUSeJ784eFz3PEQjdT8B7499ZvV+3DHNLII8WLVbVhJ1/ujPG7Bo=),
>  S3 Extended Request ID: 
> h5MRYXOZj5UnPEVjYtMOnUqXUSeJ784eFz3PEQjdT8B7499ZvV+3DHNLII8WLVbVhJ1/ujPG7Bo= 
> (Path: 
> s3p://app/flink/checkpoints/prod/613240ac4a3ebb2e1a428bbd1a973433/taskowned/7a252162-f002-4afc-a45a-04b0a622c204)
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:917)
>  ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
>  ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
>  ~[?:?]
>       at java.io.BufferedInputStream.fill(BufferedInputStream.java:252) ~[?:?]
>       at java.io.BufferedInputStream.read(BufferedInputStream.java:271) ~[?:?]
>       at java.io.FilterInputStream.read(FilterInputStream.java:83) ~[?:?]
>       at 
> org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
>  ~[?:?]
>       at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at java.io.DataInputStream.readByte(DataInputStream.java:270) ~[?:?]
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:435)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper.hasNext(ReusingMutableToRegularIteratorWrapper.java:61)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> com.app.streams.kernel.sinks.JdbcBatchWriteAheadSink.sendValues(JdbcBatchWriteAheadSink.java:52)
>  
> ~[blob_p-642a2d12ebc0fdfb4a406ab5e9ebff24a2edf335-29b861b5042a27f11426951b8a753b1f:?]
>       at 
> org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink.notifyCheckpointComplete(GenericWriteAheadSink.java:233)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:107)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:283)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:987)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:958)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:974)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) 
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:79)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runSynchronousSavepointMailboxLoop(StreamTask.java:406)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:881)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>  [flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  [flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>  [flink-dist_2.12-1.11.2.jar:1.11.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>  [flink-dist_2.12-1.11.2.jar:1.11.2]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> [flink-dist_2.12-1.11.2.jar:1.11.2]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> [flink-dist_2.12-1.11.2.jar:1.11.2]
>       at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified 
> key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: 
> NoSuchKey; Request ID: D76810E62E37680C; S3 Extended Request ID: 
> h5MRYXOZj5UnPEVjYtMOnUqXUSeJ784eFz3PEQjdT8B7499ZvV+3DHNLII8WLVbVhJ1/ujPG7Bo=)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544) ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524) ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054) 
> ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000) 
> ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1486) 
> ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
>  ~[?:?]
>       ... 41 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
