Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not
quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```
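
(Editor's note: a minimal sketch of the stop-with-savepoint flow discussed in this thread — POST to `/jobs/:jobid/stop`, then poll `/jobs/:jobid/savepoints/:triggerid` with the returned "request-id". The host, port, and IDs below are placeholders, and the helpers only construct the request URL and body; they do not contact a real cluster.)

```python
import json


def build_stop_request(base_url: str, job_id: str, target_dir: str,
                       drain: bool = False):
    """Return the (url, body) pair for the STOP REST call."""
    url = f"{base_url}/jobs/{job_id}/stop"
    body = json.dumps({"drain": drain, "targetDirectory": target_dir})
    return url, body


def build_status_url(base_url: str, job_id: str, trigger_id: str) -> str:
    """Return the URL to poll savepoint status with the 202 response's request-id."""
    return f"{base_url}/jobs/{job_id}/savepoints/{trigger_id}"


if __name__ == "__main__":
    base = "http://example.ec2.internal:33557"  # placeholder host:port
    job = "5dce58f02d1f5e739ab12f88b2f5f031"
    url, body = build_stop_request(base, job, "s3://my-bucket/flink-savepoint")
    print(url)
    print(body)
    # "1234abcd" stands in for the request-id returned by the 202 response
    print(build_status_url(base, job, "1234abcd"))
```

When the polled status reaches COMPLETED, the response body should carry the resulting savepoint location.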

On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Thomas,
>
> For querying the savepoint status, a GET request can be issued to
> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
> location of the savepoint. However, if the job is running in some kind of
> per-job mode and the JobMaster is gone after the stop-with-savepoint, the
> request might not be available.
>
> For the Kafka source, have you found any exceptions or messages in the
> TaskManager's log when it could not be stopped?
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Sat Jun 5 00:47:47 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Failed to cancel a job using the STOP rest API
>
>> Hi Yun,
>>
>> Thanks for your reply. We are not using any legacy source. For this
>> specific job, there is only one source that is using FlinkKafkaConsumer
>> which I assume has the correct cancel() method implemented.
>>
>> Also could you suggest how I could use the "request-id" to get the
>> savepoint location?
>>
>> Thanks.
>>
>> Thomas
>>
>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Thomas,
>>>
>>> I think you are right that the CLI is using the same REST API
>>> underneath. Since the response of the REST API is OK and the savepoint
>>> is triggered successfully, I reckon the problem is not in the REST API
>>> processing, and we should first focus on the stop-with-savepoint
>>> process.
>>>
>>> Currently, stop-with-savepoint first takes a savepoint, then cancels
>>> all the sources to stop the job. Are the sources all legacy sources
>>> (namely, those using SourceFunction), and does each source implement
>>> the cancel() method correctly?
>>>
>>> Best,
>>> Yun
>>>
>>> ------------------------------------------------------------------
>>> From:Thomas Wang <w...@datability.io>
>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>> To:user <user@flink.apache.org>
>>> Subject:Failed to cancel a job using the STOP rest API
>>>
>>> Hi, Flink community,
>>>
>>> I'm trying to use the STOP REST API to cancel a job. So far, I'm seeing
>>> inconsistent results: sometimes jobs can be cancelled successfully,
>>> while other times they can't. Either way, the POST request is accepted
>>> with a status code 202 and a "request-id".
>>>
>>> From the Flink UI, I can see the savepoint completing successfully.
>>> However, the job is still in the RUNNING state afterwards. The CLI
>>> command `flink stop <JOB ID>` works fine; I can use the CLI to stop the
>>> job and get the resulting savepoint location. If I understand this
>>> correctly, the CLI should be using the same REST API behind the scenes,
>>> right?
>>>
>>> Here is my POST request URL: `http://
>>> <HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>
>>> Here is the BODY of the request:
>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>
>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>
>>> Any suggestions on how I can debug this?
>>>
>>> Another question is, given the response "request-id", which endpoint
>>> should I query to get the status of the request? Most importantly, where
>>> can I get the expected savepoint location?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>>
>>>
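
(Editor's note: Yun's question above about cancel() refers to the cooperative-cancellation contract of the legacy SourceFunction: the run() loop must observe a flag that cancel() flips from another thread, or stop-with-savepoint cannot terminate the source. Flink's real interface is Java; this Python sketch only mirrors the shape of the pattern and names like ToySource are made up.)

```python
import threading
import time


class ToySource:
    """Toy stand-in for a legacy source with a cooperative cancel flag."""

    def __init__(self):
        self._running = True  # in the Java version this field would be volatile

    def run(self, collect):
        # Emit records until cancel() flips the flag.
        i = 0
        while self._running:
            collect(i)
            i += 1
            time.sleep(0.001)

    def cancel(self):
        # Called from another thread; must make run() exit promptly.
        self._running = False


if __name__ == "__main__":
    src = ToySource()
    out = []
    worker = threading.Thread(target=src.run, args=(out.append,))
    worker.start()
    time.sleep(0.05)
    src.cancel()
    worker.join(timeout=1)
    print("stopped:", not worker.is_alive())
```

A source whose run() loop blocks without ever rechecking the flag would keep the job in RUNNING after the savepoint completes, which matches the symptom described in this thread.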
