Hello,

I have a standalone Flink 1.8 cluster. The task manager processes run as systemd units with the always restart policy (Restart=always). An error occurred during execution of the JobGraph and caused the task manager to terminate. Task manager logs:
{"time":"2019-09-02 11:33:14.797","loglevel":"INFO","class":"org.apache.flink.runtime.taskmanager.Task","message":"Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e) switched from RUNNING to FAILED.","host":"clickstream-flink08"}
java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 22 more
{"time":"2019-09-02 11:33:14.797","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL - exception in resource cleanup of task Source: Custom Source (2/8) (0d0fd38e421b5f2ac389303787ea1f54).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.801","loglevel":"INFO","class":"org.apache.flink.runtime.taskmanager.Task","message":"Freeing task resources for Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.801","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL - exception in resource cleanup of task Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.803","loglevel":"ERROR","class":"org.apache.flink.runtime.taskexecutor.TaskExecutor","message":"FATAL - exception in resource cleanup of task Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.803","loglevel":"ERROR","class":"org.apache.flink.runtime.taskexecutor.TaskManagerRunner","message":"Fatal error occurred while executing the TaskManager. Shutting it down...","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.809","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Shutting down remote daemon.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.810","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remote daemon shut down; proceeding with flushing remote transports.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.827","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remoting shut down.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.827","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remoting shut down.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.836","loglevel":"INFO","class":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Stopped Akka RPC service.","host":"clickstream-flink08"}
But the task manager process is still alive:
flink 29078 423 7.0 49191076 27790920 ? Sl 10:13 828:28 java -Djava.net.preferIPv4Stack=true -Dlog.file=/opt/flink/log/flink--taskexecutor-0-clickstream-flink08.log -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml -classpath /opt/flink/lib/flink-cep_2.12-1.8.0.jar:/opt/flink/lib/flink-queryable-state-runtime_2.12-1.8.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.8.0.jar:/opt/flink/lib/flink-table_2.12-1.8.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.8.0.jar::: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf
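Is this acceptable behaviour? The TaskManagerRunner logs a fatal error and says it is shutting down, yet the JVM keeps running, so systemd never gets the chance to restart it.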
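For context, one general JVM rule can produce this symptom, although the logs above do not confirm that it is the cause here: a JVM exits on its own only after every non-daemon thread has finished (or when System.exit or Runtime.halt is called), and a systemd Restart=always policy only reacts once the process has actually exited. The Java sketch below is not Flink code; the class name LingeringThreadDemo and the thread name "hypothetical-worker" are made up for illustration. It starts one non-daemon thread that blocks on a latch and lists the live non-daemon threads using Thread.getAllStackTraces().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LingeringThreadDemo {

    /** Names of all live non-daemon threads, excluding the calling thread. */
    public static List<String> liveNonDaemonThreads() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);

        // Hypothetical worker standing in for any non-daemon thread that is
        // still blocked after a component has logged its own shutdown.
        Thread worker = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "hypothetical-worker");
        worker.start();

        // Prints the worker plus any other non-daemon threads this JVM has.
        System.out.println("non-daemon threads keeping the JVM alive: " + liveNonDaemonThreads());

        // Without this countDown(), main would return but the JVM would not
        // exit, because the worker is non-daemon and never finishes.
        release.countDown();
        worker.join();
    }
}
```

On the real process, a thread dump (for example jstack 29078, using the PID from the ps output) would show which non-daemon threads are still running after the "Shutting it down..." message.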
Best regards,
Anton Ustinov