Could it be same as FLINK-21028[1] (titled as “Streaming application didn’t
stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0) ?


[1]: https://issues.apache.org/jira/browse/FLINK-21028


Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:

Hi Thomas,

I tried but do not re-produce the exception yet. I have filed
an issue for the exception first [1].



[1] https://issues.apache.org/jira/browse/FLINK-22928


------------------Original Mail ------------------
*Sender:*Thomas Wang <w...@datability.io>
*Send Date:*Tue Jun 8 07:45:52 2021
*Recipients:*Yun Gao <yungao...@aliyun.com>
*CC:*user <user@flink.apache.org>
*Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API

> This is actually a very simple job that reads from Kafka and writes to S3
> using the StreamingFileSink w/ Parquet format. I'm all using Flink's API
> and nothing custom.
>
> Thomas
>
> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Thoms,
>>
>> Very thanks for reporting the exceptions, and it seems to be not work as
>> expected to me...
>> Could you also show us the dag of the job ? And does some operators in
>> the source task
>> use multiple-threads to emit records?
>>
>> Best,
>> Yun
>>
>>
>> ------------------Original Mail ------------------
>> *Sender:*Thomas Wang <w...@datability.io>
>> *Send Date:*Sun Jun 6 04:02:20 2021
>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>> *CC:*user <user@flink.apache.org>
>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>>
>>> One thing I noticed is that if I set drain = true, the job could be
>>> stopped correctly. Maybe that's because I'm using a Parquet file sink which
>>> is a bulk-encoded format and only writes to disk during checkpoints?
>>>
>>> Thomas
>>>
>>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:
>>>
>>>> Hi Yun,
>>>>
>>>> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
>>>> not quite sure what they mean though. Any hints?
>>>>
>>>> Thanks.
>>>>
>>>> Thomas
>>>>
>>>> ```
>>>> 2021-06-05 10:02:51
>>>> java.util.concurrent.ExecutionException:
>>>> org.apache.flink.streaming.runtime.tasks.
>>>> ExceptionInChainedOperatorException: Could not forward element to next
>>>> operator
>>>>     at java.util.concurrent.CompletableFuture.reportGet(
>>>> CompletableFuture.java:357)
>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture
>>>> .java:1928)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .close(StreamOperatorWrapper.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .close(StreamOperatorWrapper.java:134)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .close(StreamOperatorWrapper.java:80)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .closeOperators(OperatorChain.java:302)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(
>>>> StreamTask.java:576)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>> StreamTask.java:544)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>>> ExceptionInChainedOperatorException: Could not forward element to next
>>>> operator
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>>>>     at org.apache.flink.streaming.api.operators.CountingOutput
>>>> .emitWatermark(CountingOutput.java:41)
>>>>     at org.apache.flink.streaming.runtime.operators.
>>>> TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
>>>> TimestampsAndWatermarksOperator.java:165)
>>>>     at org.apache.flink.api.common.eventtime.
>>>> BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
>>>> BoundedOutOfOrdernessWatermarks.java:69)
>>>>     at org.apache.flink.streaming.runtime.operators.
>>>> TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
>>>> .java:125)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
>>>> .runThrowing(StreamTaskActionExecutor.java:92)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .closeOperator(StreamOperatorWrapper.java:203)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
>>>> .runThrowing(StreamTaskActionExecutor.java:92)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
>>>> .java:78)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>> MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>>>>     ... 9 more
>>>> Caused by: java.lang.RuntimeException
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput
>>>> .emitWatermark(RecordWriterOutput.java:123)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain
>>>> .java:762)
>>>>     at org.apache.flink.streaming.api.operators.CountingOutput
>>>> .emitWatermark(CountingOutput.java:41)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>>>> .processWatermark(AbstractStreamOperator.java:570)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>>>>     ... 21 more
>>>> Caused by: java.lang.IllegalStateException
>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions
>>>> .java:179)
>>>>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(
>>>> BufferBuilder.java:83)
>>>>     at org.apache.flink.runtime.io.network.api.serialization.
>>>> SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer
>>>> .java:90)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter
>>>> .copyFromSerializerToTargetChannel(RecordWriter.java:136)
>>>>     at org.apache.flink.runtime.io.network.api.writer.
>>>> ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter
>>>> .java:80)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput
>>>> .emitWatermark(RecordWriterOutput.java:121)
>>>>     ... 25 more
>>>> ```
>>>>
>>>> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> For querying the savepoint status, a get request could be issued to
>>>>>  /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
>>>>> position
>>>>> of the savepoint. But if the job is running with some kind of per-job
>>>>> mode and
>>>>> JobMaster is gone after the stop-with-savepoint, the request might not
>>>>> be
>>>>> available.
>>>>>
>>>>> For the kafka source, have you ever found some exception or some
>>>>> messages in the
>>>>> TaskManager's log when it could not be stopped ?
>>>>>
>>>>> Best,
>>>>> Yun
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>>>>>
>>>>>
>>>>>
>>>>> ------------------Original Mail ------------------
>>>>> *Sender:*Thomas Wang <w...@datability.io>
>>>>> *Send Date:*Sat Jun 5 00:47:47 2021
>>>>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>>>>> *CC:*user <user@flink.apache.org>
>>>>> *Subject:*Re: Failed to cancel a job using the STOP rest API
>>>>>
>>>>>> Hi Yun,
>>>>>>
>>>>>> Thanks for your reply. We are not using any legacy source. For this
>>>>>> specific job, there is only one source that is using FlinkKafkaConsumer
>>>>>> which I assume has the correct cancel() method implemented.
>>>>>>
>>>>>> Also could you suggest how I could use the "request-id" to get the
>>>>>> savepoint location?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>>>>
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> I think you are right that the CLI is also using the same rest API
>>>>>>> underlying, and since
>>>>>>> the response of the rest API is ok and the savepoint is triggered
>>>>>>> successfully, I reckon
>>>>>>> that it might not be due to rest API process, and we might still
>>>>>>> first focus on the
>>>>>>> stop-with-savepoint process.
>>>>>>>
>>>>>>> Currently stop-with-savepoint would first do a savepoint, then
>>>>>>> cancel all the sources to
>>>>>>> stop the job. Thus are the sources all legacy source (namely the one
>>>>>>> using SourceFunction) ?
>>>>>>> and does the source implement the cancel() method correctly ?
>>>>>>>
>>>>>>> Best,
>>>>>>> Yun
>>>>>>>
>>>>>>> ------------------------------------------------------------------
>>>>>>> From:Thomas Wang <w...@datability.io>
>>>>>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>>>>>> To:user <user@flink.apache.org>
>>>>>>> Subject:Failed to cancel a job using the STOP rest API
>>>>>>>
>>>>>>> Hi, Flink community,
>>>>>>>
>>>>>>> I'm trying to use the STOP rest API to cancel a job. So far, I'm
>>>>>>> seeing some inconsistent results. Sometimes, jobs could be cancelled
>>>>>>> successfully while other times, they couldn't. Either way, the POST 
>>>>>>> request
>>>>>>> is accepted with a status code 202 and a "request-id".
>>>>>>>
>>>>>>> From the Flink UI, I can see the savepoint being completed
>>>>>>> successfully. However the job is still in running state afterwards. The 
>>>>>>> CLI
>>>>>>> command `flink stop <JOB ID>` is working ok. I can use the CLI to stop 
>>>>>>> the
>>>>>>> job and get the resulting savepoint location. If I understand
>>>>>>> this correctly, the CLI should be using the same REST API behind the
>>>>>>> scenes, isn't it?
>>>>>>>
>>>>>>> Here is my POST request URL: `http://
>>>>>>> <HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>>>>
>>>>>>> Here is the BODY of the request:
>>>>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>>>>>
>>>>>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>>>>>
>>>>>>> Any suggestions on how I can debug this?
>>>>>>>
>>>>>>> Another question is, given the response "request-id", which endpoint
>>>>>>> should I query to get the status of the request? Most importantly, where
>>>>>>> can I get the expected savepoint location?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>>

Reply via email to