Hello,
Thanks for the research! Good to know the cause.
Greetings,
Frank
On 03.02.22 17:18, Dawid Wysakowicz wrote:
I looked into the code again and unfortunately I have bad news :(
Indeed, we treat S3 as if it always injects entropy, even if the
entropy key is not specified (which effectively means entropy
injection is disabled). I created a JIRA ticket[1] to fix it.
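To illustrate the problem, here is a rough Scala sketch (made-up names, not the actual Flink code): the relocatability decision only looks at the type of the filesystem, not at whether an entropy key is actually configured, so for S3 we always end up with absolute paths in the _metadata.

object EntropySketch {
  // Hypothetical stand-in for an entropy-capable filesystem; entropyKey == None
  // means entropy injection is effectively disabled.
  trait EntropyAwareFileSystem {
    def entropyKey: Option[String]
  }

  // Current behaviour as described above: any entropy-capable filesystem makes
  // the savepoint non-relocatable, even when no entropy key is configured.
  def isRelocatable(fs: Any): Boolean = fs match {
    case _: EntropyAwareFileSystem => false
    case _                         => true
  }

  // Roughly what I would expect the fix to do: only give up relocatability
  // when an entropy key is actually set.
  def isRelocatableFixed(fs: Any): Boolean = fs match {
    case e: EntropyAwareFileSystem => e.entropyKey.isEmpty
    case _                         => true
  }
}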
Best,
Dawid
[1] https://issues.apache.org/jira/browse/FLINK-25952
On 03/02/2022 17:02, Frank Dekervel wrote:
Hello,
I didn't know about entropy injection. I have checked, and there is
no entropy injection configured in my flink-conf.yaml. This is the
relevant section:
s3.access-key: ???
s3.endpoint: http://minio/
s3.path.style.access: true
s3.secret-key: ???
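For comparison, as far as I understand, enabling entropy injection would look something like this in flink-conf.yaml (example values), and neither option is present in my config:
s3.entropy.key: _entropy_
s3.entropy.length: 4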
I see that there are still S3 paths referenced in the _metadata file:
kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ cat _metadata | strings | grep s3
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/a6d59334-2769-4a6e-b582-d38d58352021
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f627c959-d69d-41a1-9732-748795efb9ad
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f9a03af4-2868-4797-a950-10257282ed1e
...
Not all of those paths exist locally:
kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ l
b81e4e28-eabd-499e-9561-b98137084a9c _metadata
Thanks!
Greetings,
Frank
On 03.02.22 16:38, Dawid Wysakowicz wrote:
Hi Frank.
Do you use entropy injection by chance? I am afraid savepoints are
not relocatable in combination with entropy injection as described
here[1].
Best,
Dawid
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints
On 03/02/2022 14:44, Frank Dekervel wrote:
Hello,
I'm trying to inspect a savepoint that was stored on s3://flink/
(on a MinIO server); I downloaded it to my laptop for inspection. I
have two KeyedProcessFunctions (their state is in the same savepoint)
and, strangely enough, one works perfectly while the other one doesn't.
The code is fairly simple:
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.state.api.Savepoint
import scala.collection.JavaConverters._

val savepoint = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend())

// first one
val ti = createTypeInformation[AlarmMessageKey]
val tia = createTypeInformation[AlmState]
val ds = savepoint.readKeyedState("alm-1", new AlmStateReader(), ti, tia)
val valss = ds.collect().asScala

// now the second one:
val savepoint2 = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend())
val ds_sup = savepoint2.readKeyedState("ags-1", new SupStateReader())
// here the state is (de)serialized with Kryo rather than the Scala case class
// serializer. No idea why, but that's how it is in the savepoint.
val vals_sup = ds_sup.collect().asScala
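For reference, a minimal KeyedStateReaderFunction looks roughly like this (the state name here is just a placeholder); my actual readers follow the same pattern:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// AlarmMessageKey and AlmState are the job's own case classes (not shown here).
class AlmStateReader extends KeyedStateReaderFunction[AlarmMessageKey, AlmState] {
  private var state: ValueState[AlmState] = _

  override def open(parameters: Configuration): Unit = {
    // "alm-state" is a placeholder; the descriptor must match the one used in the job
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[AlmState]("alm-state", classOf[AlmState]))
  }

  override def readKey(
      key: AlarmMessageKey,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[AlmState]): Unit =
    out.collect(state.value())
}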
The second one seems to fail because it wants to access the
savepoint at its original path on S3 (which my laptop doesn't have
access to). I thought savepoints were supposed to be relocatable.
Weirdly enough, the first one works just fine.
This is the exception I get:
[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; Proxy: null), S3 Extended Request ID: U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1093)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1078)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1071)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$1(PrestoS3FileSystem.java:1015)
[error] at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:1014)
[error] at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
[error] at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
[error] at java.io.FilterInputStream.read(FilterInputStream.java:83)
[error] at org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
[error] at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
[error] at java.io.DataInputStream.readInt(DataInputStream.java:387)
[error] at org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:46)
[error] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:139)
[error] at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.readMetaData(FullSnapshotRestoreOperation.java:194)
[error] at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.restoreKeyGroupsInStateHandle(FullSnapshotRestoreOperation.java:171)
[error] at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.access$100(FullSnapshotRestoreOperation.java:113)
[error] at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:158)
[error] at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:140)
[error] at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:115)
[error] at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
[error] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174)
[error] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
[error] at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
[error] at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
[error] at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
[error] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
[error] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
[error] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
[error] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
[error] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
[error] at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:187)
[error] at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:155)
[error] at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
[error] at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
[error] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
[error] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
[error] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
[error] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
[error] at java.lang.Thread.run(Thread.java:748)
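I guess I could work around this by making the s3:// paths from the _metadata resolvable from my laptop, e.g. by registering the S3 filesystem locally with the same MinIO settings before calling Savepoint.load. Untested sketch (endpoint and credentials are placeholders, and it needs e.g. flink-s3-fs-presto on the classpath):

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

val s3Conf = new Configuration()
s3Conf.setString("s3.endpoint", "http://my-minio-host:9000/") // placeholder endpoint
s3Conf.setString("s3.path.style.access", "true")
s3Conf.setString("s3.access-key", "???")
s3Conf.setString("s3.secret-key", "???")
// Register the filesystem settings so absolute s3:// paths in _metadata resolve.
FileSystem.initialize(s3Conf)

But I would still expect the downloaded savepoint to be readable without any S3 access at all.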
Does anybody know what I'm doing wrong?
Thanks!
Frank