Hi Soheil,

I think the root cause is that, during cancellation, the task was stuck in

*org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)*


The TaskManager process exit is expected in this case: it enforces a failure
and recovery.
To be specific, when a task on the TM is canceled, a
*TaskCancelerWatchDog* is started to watch the cancellation.
If the cancellation times out, the watchdog triggers a fatal error to
force the TM to exit.
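
To illustrate the mechanism described above, here is a minimal, self-contained
sketch in plain Java. It is not Flink's actual TaskCancelerWatchDog code; the
class and method names (CancellationWatchdogSketch, cancelWithWatchdog,
startStuckTask) are hypothetical. It only shows the idea: interrupt the task,
wait up to a timeout, and escalate to a "fatal" handler if the task thread is
still alive.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellationWatchdogSketch {

    /**
     * Interrupts the task thread, waits up to timeoutMillis (must be > 0) for it
     * to finish, and runs onFatal if it is still alive afterwards.
     * Returns true if the task stopped within the timeout.
     */
    static boolean cancelWithWatchdog(Thread task, long timeoutMillis, Runnable onFatal) {
        task.interrupt();
        try {
            task.join(timeoutMillis);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and fall through to the liveness check.
            Thread.currentThread().interrupt();
        }
        if (task.isAlive()) {
            // In Flink this is where the fatal error would force the TM to exit.
            onFatal.run();
            return false;
        }
        return true;
    }

    /**
     * Starts a daemon thread that blocks for blockMillis and swallows interrupts,
     * standing in for a blocking call that does not react to cancellation.
     */
    static Thread startStuckTask(long blockMillis) {
        Thread t = new Thread(() -> {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(blockMillis);
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(remaining);
                } catch (InterruptedException ignored) {
                    // Ignore the interrupt and keep blocking until the deadline.
                }
            }
        }, "stuck-task");
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        // Task blocks for 2 s but the watchdog only waits 200 ms.
        AtomicBoolean fatal = new AtomicBoolean(false);
        boolean stopped = cancelWithWatchdog(startStuckTask(2000), 200, () -> fatal.set(true));
        System.out.println("short timeout: stopped=" + stopped + ", fatal=" + fatal.get());

        // Same kind of task, but the watchdog waits longer than the task blocks.
        AtomicBoolean fatal2 = new AtomicBoolean(false);
        boolean stopped2 = cancelWithWatchdog(startStuckTask(200), 5000, () -> fatal2.set(true));
        System.out.println("long timeout: stopped=" + stopped2 + ", fatal=" + fatal2.get());
    }
}
```

In the first call the blocking call outlives the timeout, so the fatal handler
runs; in the second the timeout is longer than the blocking call, so the
cancellation completes normally.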

I think you may need to diagnose why the PostgreSQL call took so long to
flush data.
Alternatively, if the long flush time is expected, you can increase
the cancellation timeout ("task.cancellation.timeout") to avoid this issue.
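
For example, the timeout could be raised in flink-conf.yaml on the
TaskManagers. The value is in milliseconds; 600000 (10 minutes) below is only
an illustrative number, not a recommendation, and should be chosen based on how
long the flush can legitimately take. The default is 180000, and a value of 0
deactivates the watchdog.

```yaml
# flink-conf.yaml
# Time (ms) a task cancellation may take before the watchdog
# triggers a fatal TaskManager error. Illustrative value only.
task.cancellation.timeout: 600000
```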

Thanks,
Zhu Zhu

Soheil Pourbafrani <soheil.i...@gmail.com> wrote on Sat, Feb 15, 2020 at 6:41 AM:

> Hi,
>
> I developed a single Flink job that reads a huge number of files and, after
> some simple preprocessing, sinks them into the database. I use the built-in
> JDBCOutputFormat for inserting records into the database. The problem is
> that when I cancel the job using either the WebUI or the command line, the
> job does not cancel completely and, finally, the taskmanager process crashes!
> Here are the taskmanager logs (generated continuously for some seconds):
>
> 2020-02-15 01:17:17,208 WARN
>>  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator
>>  - The reader is stuck in method:
>>  java.lang.Object.wait(Native Method)
>> org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
>> org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856)
>>
>> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
>>
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>>
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
>>
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>>
>> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
>>
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>>
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> 2020-02-15 01:17:17,224 INFO
>>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
>> shut down.
>> 2020-02-15 01:17:17,225 INFO
>>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
>> shut down.
>
>
> I'm using the
> Flink: 1.7.2,
> java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
>
> Any help will be appreciated.
>
> All the best,
> Soheil
>
