Shashank Agarwal created FLINK-7756:
---------------------------------------
Summary: RocksDB state backend Checkpointing (Async and
Incremental) is not working with CEP.
Key: FLINK-7756
URL: https://issues.apache.org/jira/browse/FLINK-7756
Project: Flink
Issue Type: Bug
Components: CEP, State Backends, Checkpointing, Streaming
Affects Versions: 1.3.2
Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
Reporter: Shashank Agarwal
When I try to use RocksDBStateBackend on my staging cluster (which uses
HDFS as its file system), the job crashes. When I use FsStateBackend on the
same staging cluster, it works fine.
Locally, with the local file system, it works fine in both cases.
Please check the attached logs. I have around 20-25 tasks in my app.
{code:java}
2017-09-29 14:21:31,639 INFO
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state to
restore for the BucketingSink (taskIdx=0).
2017-09-29 14:21:31,640 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend -
Initializing RocksDB keyed state backend from snapshot.
2017-09-29 14:21:32,020 INFO
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state to
restore for the BucketingSink (taskIdx=1).
2017-09-29 14:21:32,022 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend -
Initializing RocksDB keyed state backend from snapshot.
2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil
- Found Netty's native epoll transport in the classpath, using it
2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Co-Flat Map (1/2)
(b879f192c4e8aae6671cdafb3a24c00a).
2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Map (2/2)
(1ea5aef6ccc7031edc6b37da2912d90b).
2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Co-Flat Map (2/2)
(4bac8e764c67520d418a4c755be23d4d).
2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task
- Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched
from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2
for operator Co-Flat Map (1/2).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator
Co-Flat Map (1/2).
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed
keyed state future.
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
... 5 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
... 7 more
Caused by: java.lang.IllegalStateException
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
[CIRCULAR REFERENCE:java.lang.IllegalStateException]
2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Map (1/2)
(a06925261e74b4efdf50a30089e2b778).
2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Map (1/2)
(1747902c96e63fefd977ac4d4a01d2fa).
2017-09-29 14:21:34,180 INFO org.apache.flink.runtime.taskmanager.Task
- Map (1/2) (a06925261e74b4efdf50a30089e2b778) switched from
RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2
for operator Map (1/2).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator
Map (1/2).
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed
keyed state future.
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
... 5 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
... 7 more
Caused by: java.lang.IllegalStateException
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
[CIRCULAR REFERENCE:java.lang.IllegalStateException]
{code}
The same output was printed for around 12-13 tasks. Then the following logs were printed:
{code:java}
2017-09-29 14:21:35,039 INFO org.apache.flink.runtime.taskmanager.Task
- Ensuring all FileSystem streams are closed for task Source:
Custom Source (2/2) (77c896e2a2063e98f399244cae21c260) [CANCELED]
2017-09-29 14:21:35,041 WARN org.apache.hadoop.ipc.Client
- interrupted waiting to send rpc request to server
java.lang.InterruptedException
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at
org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
at org.apache.hadoop.ipc.Client.call(Client.java:1454)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy12.delete(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy13.delete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
at
org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
at
org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-09-29 14:21:35,042 WARN
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
not delete the checkpoint stream file
hdfs://static.175.87.9.5.clients.your-server.de:8020/flink/flink-checkpoints/rocksDB/events/e10dbe09aa2ecccb22737ddce8b4dc9f/chk-2/a28796de-978a-4f1a-8ff5-5f5c654b0ffc.
java.io.IOException: java.lang.InterruptedException
at org.apache.hadoop.ipc.Client.call(Client.java:1460)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy12.delete(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy13.delete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
at
org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
at
org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at
org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
at org.apache.hadoop.ipc.Client.call(Client.java:1454)
... 31 more
2017-09-29 14:21:35,054 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to cancel task KeyedCEPPatternOperator -> Flat Map ->
(Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee).
2017-09-29 14:21:35,055 INFO org.apache.flink.runtime.taskmanager.Task
- KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) (1/2)
(8c6eff62d47c4a624a7554065bac36ee) switched from RUNNING to CANCELING.
{code}
Then the same output was printed for 12-13 tasks.