Hi zhijiang,

I checked this value and I haven't configured it, so it should be the default of 10 s. I also measured how long the flink cancel command took using the time command; it finished after 6 seconds.
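
For reference, the measurement was roughly this (the job ID is a placeholder):

    time flink cancel <jobID>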

After filtering the logs down to the messages of a single sink, it looks like the task is interrupted within milliseconds of the cancel request. Here are the logs from one TaskManager (standalone cluster):

06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task - Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed) switched from RUNNING to CANCELING.
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,503 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
06:53:16,503 ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync!
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.hflushOrSync(StreamWriterBase.java:72)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:131)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:146)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
06:53:16,504 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Sink: /tmp/flink/events_invalid HDFS sink (2/2)
06:53:16,504 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: /tmp/flink/events_invalid HDFS sink (477db6e41932ad9b60c72e14de4488ed)

Best,
Jürgen

On 13.04.2017 07:05, Zhijiang(wangzhijiang999) wrote:
Hi Jürgen,

You can set the timeout in the configuration with the key "akka.ask.timeout"; the current default value is 10 s. Hope it helps.
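
For example, in flink-conf.yaml (the 60 s value below is just an illustration, not a recommendation):

    akka.ask.timeout: 60 s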


cheers,
zhijiang


    ------------------------------------------------------------------
    From: Jürgen Thomann <juergen.thom...@innogames.com>
    Sent: Wednesday, April 12, 2017 19:04
    To: user <user@flink.apache.org>
    Subject: Changing timeout for cancel command

    Hi,

    We currently get the following exception if we cancel a job which
    writes to Hadoop:

    ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
    - Error while trying to hflushOrSync!
    java.io.InterruptedIOException: Interrupted while waiting for data to
    be acknowledged by pipeline

    This causes problems if we cancel a job with a savepoint and resubmit
    it, because the written file sometimes ends up smaller than the size
    recorded in the valid-length file.
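
    For reference, a downstream consumer is expected to truncate the part
    file to the length recorded in the valid-length file. A minimal sketch
    of such a reader follows (the file names, the "_<part>.valid-length"
    naming, and the plain-text number format are assumptions for
    illustration; check your Flink version's BucketingSink for the exact
    format it writes):

        // Hypothetical reader: consume only the bytes covered by the
        // valid-length file (local files used for illustration).
        import java.io.InputStream;
        import java.nio.charset.StandardCharsets;
        import java.nio.file.Files;
        import java.nio.file.Path;
        import java.nio.file.Paths;

        public class ValidLengthReader {
            public static void main(String[] args) throws Exception {
                Path part = Paths.get("part-0-0"); // hypothetical part file
                Path lenFile = part.resolveSibling(
                        "_" + part.getFileName() + ".valid-length");
                long valid = Long.parseLong(new String(
                        Files.readAllBytes(lenFile),
                        StandardCharsets.UTF_8).trim());
                long actual = Files.size(part);
                if (actual < valid) {
                    // The case described above: the interrupted flush left
                    // fewer bytes in the file than the valid length claims.
                    System.err.println("part file has " + actual
                            + " bytes, valid-length says " + valid);
                }
                try (InputStream in = Files.newInputStream(part)) {
                    byte[] buf = new byte[8192];
                    long remaining = Math.min(valid, actual);
                    int n;
                    while (remaining > 0 && (n = in.read(buf, 0,
                            (int) Math.min(buf.length, remaining))) != -1) {
                        // process buf[0..n) here
                        remaining -= n;
                    }
                }
            }
        }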

    Is there a way to increase the timeout during cancel to give the
    flush a bit more time? We currently lose events if this happens.

    Best,
    Jürgen

