Hello!

I would like to run a legacy flink project on top of old-version Flink
(1.4.1) and I'm getting error when trying to cancel a job with savepoint.
Specifically, it reports the following error on requestBuffer:

My understanding would be that the save point operation probably requires
all outstanding messages to be processed, which somehow requires larger
buffer space (not entirely sure about this). However, it seems that my job
has no problem processing regular messages as long as I'm not cancelling it
with savepoint. And I have reduced the "web.backpressure.refresh-interval"
to 100 to force it to check back pressure frequently, but it still leads to
this error.

I am aware that I'd probably get more configuration knobs by running a
newer version of Flink but this particular version has some particular
modified functionalities I want to try. Any suggestions?


2023-03-21 23:04:59,718 WARN org.apache.flink.runtime.taskmanager.Task -
Task 'Source: bid-source -> Filter -> Flat Map -> flatmap-timestamp (10/32)'
did not react to cancelling signal, but is stuck in method:
java.lang.Object.wait(Native Method)
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(
LocalBufferPool.java:222)
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking
(LocalBufferPool.java:191)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(
RecordWriter.java:146)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
RecordWriter.java:92)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(
StreamRecordWriter.java:84)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(
RecordWriterOutput.java:106)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(
RecordWriterOutput.java:88)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(
RecordWriterOutput.java:43)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement
(TimestampsAndPunctuatedWatermarksOperator.java:52)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
TimestampedCollector.java:51)
ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2
.flatMap(KeyedHighestBidCount.java:245)
ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2
.flatMap(KeyedHighestBidCount.java:173)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(
StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.StreamFilter.processElement(
StreamFilter.java:40)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.collect(OperatorChain.java:527)
org.apache.flink.streaming.runtime.tasks.OperatorChain$
CopyingChainingOutput.collect(OperatorChain.java:507)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:830)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$
CountingOutput.collect(AbstractStreamOperator.java:808)
org.apache.flink.streaming.api.operators.StreamSourceContexts$
ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
org.apache.flink.streaming.api.operators.StreamSourceContexts$
WatermarkContext.collect(StreamSourceContexts.java:394)
source.NexmarkDynamicBatchSourceFunction.run(
NexmarkDynamicBatchSourceFunction.java:403)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:
86)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:
55)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
SourceStreamTask.java:94)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:
265)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:748)

Reply via email to