Hi Giriraj,

This looks like the deserialization of a String failed. Can you isolate the problem to a pair of sending and receiving tasks?
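Here is what "the deserialization of a String failed" means at the byte level. StringValue.readString, near the top of the trace, reads a length header and then the characters byte by byte. If the bytes the receiver sees end early, or are misaligned with what the sender wrote, readUnsignedByte runs past the end of the buffer and throws java.io.EOFException. The sketch below is only an analogy using plain java.io (writeUTF/readUTF, which use a 2-byte length prefix rather than Flink's StringValue format). The class TruncatedStringDemo, its helpers, and the sample value are hypothetical. It shows the kind of standalone round-trip check you could use when isolating a sending/receiving pair: write a record, read it back, and see whether the reader consumes exactly what the writer produced.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class TruncatedStringDemo {

    // Length-prefixed encoding via java.io (modified UTF-8), NOT Flink's StringValue format.
    static byte[] write(String s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s); // 2-byte length, then the encoded characters
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads one String back; returns null when the buffer ends before the declared length.
    static String tryRead(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        } catch (EOFException e) {
            return null; // same exception class as the one thrown in the sink's stack trace
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] full = write("topology-update");
        // Hypothetical mismatch: the reader sees 3 fewer bytes than the writer produced.
        byte[] truncated = Arrays.copyOf(full, full.length - 3);

        System.out.println(tryRead(full));      // topology-update
        System.out.println(tryRead(truncated)); // null
    }
}
```

The intact buffer decodes normally. The truncated one fails because the length header promises more bytes than are available, which matches the EOFException symptom in the job.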
Best,
Fabian

On Sun, 5 Apr 2020 at 20:18, Giriraj Chauhan <graj.chau...@gmail.com> wrote:
> Hi,
>
> We are submitting a Flink (1.9.1) job for data processing. It runs fine and
> processes data for some time, i.e. ~30 mins, and later it throws the following
> exception and the job gets killed.
>
> 2020-04-02 14:15:43,371 INFO org.apache.flink.runtime.taskmanager.Task - Sink: Unnamed (2/4) (45d01514f0fb99602883ca43e997e8f3) switched from RUNNING to FAILED.
> java.io.EOFException
>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>     at org.apache.flink.types.StringValue.readString(StringValue.java:769)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:91)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.base/java.lang.Thread.run(Unknown Source)
>
> Once the above exception occurs, we see the following runtime exception:
>
> java.lang.RuntimeException: Buffer pool is destroyed.
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.dell.emc.mars.topology.bl.DiffGenerator.handleCreate(DiffGenerator.java:519)
>     at com.dell.emc.mars.topology.bl.DiffGenerator.populateHTable(DiffGenerator.java:294)
>     at com.dell.emc.mars.topology.bl.DiffGenerator.compare(DiffGenerator.java:58)
>     at com.dell.emc.mars.topology.bl.CompareWithDatabase.flatMap(CompareWithDatabase.java:146)
>     at com.dell.emc.mars.topology.bl.CompareWithDatabase.flatMap(CompareWithDatabase.java:22)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.dell.emc.mars.topology.bl.CompareWithModel.flatMap(CompareWithModel.java:110)
>     at com.dell.emc.mars.topology.bl.CompareWithModel.flatMap(CompareWithModel.java:24)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.dell.emc.mars.topology.bl.DiscoveryHandler.handleDisHoldJobs(DiscoveryHandler.java:180)
>     at com.dell.emc.mars.topology.bl.DiscoveryHandler.lambda$handleEndMessage$0(DiscoveryHandler.java:138)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>     at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     ... 29 more
>
> We are trying to catch/handle the exception, but somehow the job is getting
> cancelled by the manager.
> Any opinion on what could be going wrong and how to get our arms around it?
>
> Best Regards,
> Giriraj