Yes, good catch Kezhu. The IllegalStateException sounds very much like
FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't
been released yet.)

Piotrek

On Tue, 8 Jun 2021 at 17:18, Kezhu Wang <kez...@gmail.com> wrote:

> Could it be the same as FLINK-21028 [1] (titled “Streaming application
> didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0)?
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>
>
> Best,
> Kezhu Wang
>
> On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
>
> Hi Thomas,
>
> I tried but could not reproduce the exception yet. I have filed
> an issue for it first [1].
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-22928
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Tue Jun 8 07:45:52 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API
>
>> This is actually a very simple job that reads from Kafka and writes to S3
>> using the StreamingFileSink with the Parquet format. I'm using only
>> Flink's APIs and nothing custom.
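>>
>> In case it helps, the pipeline looks roughly like the sketch below (a
>> simplified approximation rather than the exact code; the topic, bucket,
>> and Event POJO are placeholders):
>>
>> ```
>> import java.util.Properties;
>>
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>
>> // Hypothetical POJO standing in for the real record type.
>> class Event {
>>     public String raw;
>>     public long ingestTime;
>> }
>>
>> public class KafkaToParquetJob {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         // Bulk-encoded sinks (Parquet) roll part files on checkpoints.
>>         env.enableCheckpointing(60_000);
>>
>>         Properties props = new Properties();
>>         props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
>>         props.setProperty("group.id", "my-consumer-group");   // placeholder
>>
>>         FlinkKafkaConsumer<String> source =
>>                 new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
>>
>>         StreamingFileSink<Event> sink = StreamingFileSink
>>                 .forBulkFormat(new Path("s3://my-bucket/output"), // placeholder bucket
>>                                ParquetAvroWriters.forReflectRecord(Event.class))
>>                 .build();
>>
>>         env.addSource(source)
>>            .map(line -> {
>>                Event e = new Event(); // placeholder "parsing"
>>                e.raw = line;
>>                e.ingestTime = System.currentTimeMillis();
>>                return e;
>>            })
>>            .returns(Event.class)
>>            .addSink(sink);
>>
>>         env.execute("kafka-to-parquet");
>>     }
>> }
>> ```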
>>
>> Thomas
>>
>> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Thomas,
>>>
>>> Thanks very much for reporting the exceptions; this does not seem to be
>>> working as expected to me...
>>> Could you also show us the DAG of the job? And do any operators in the
>>> source task use multiple threads to emit records?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Thomas Wang <w...@datability.io>
>>> *Send Date:*Sun Jun 6 04:02:20 2021
>>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>>> *CC:*user <user@flink.apache.org>
>>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>>>
>>>> One thing I noticed is that if I set drain = true, the job can be
>>>> stopped correctly. Maybe that's because I'm using a Parquet file sink,
>>>> which is a bulk-encoded format and only writes to disk during checkpoints?
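>>>>
>>>> For reference, this is roughly how I'm issuing the stop request with
>>>> drain enabled (a sketch using Java 11's HttpClient rather than my exact
>>>> client; the host, port, and bucket name are placeholders):
>>>>
>>>> ```
>>>> import java.net.URI;
>>>> import java.net.http.HttpClient;
>>>> import java.net.http.HttpRequest;
>>>> import java.net.http.HttpResponse;
>>>>
>>>> public class StopWithDrain {
>>>>     public static void main(String[] args) throws Exception {
>>>>         // Placeholders: JobManager host/port and job id.
>>>>         String url = "http://jobmanager-host:8081/jobs/"
>>>>                 + "5dce58f02d1f5e739ab12f88b2f5f031/stop";
>>>>
>>>>         HttpRequest stop = HttpRequest.newBuilder()
>>>>                 .uri(URI.create(url))
>>>>                 .header("Content-Type", "application/json")
>>>>                 .POST(HttpRequest.BodyPublishers.ofString(
>>>>                         "{\"drain\": true,"
>>>>                         + " \"targetDirectory\": \"s3://my-bucket/flink-savepoint\"}"))
>>>>                 .build();
>>>>
>>>>         HttpResponse<String> resp = HttpClient.newHttpClient()
>>>>                 .send(stop, HttpResponse.BodyHandlers.ofString());
>>>>
>>>>         // Expect 202 Accepted with a body containing the "request-id".
>>>>         System.out.println(resp.statusCode() + " " + resp.body());
>>>>     }
>>>> }
>>>> ```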
>>>>
>>>> Thomas
>>>>
>>>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:
>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Thanks for the tips. Yes, I do see some exceptions as copied below.
>>>>> I'm not quite sure what they mean though. Any hints?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Thomas
>>>>>
>>>>> ```
>>>>> 2021-06-05 10:02:51
>>>>> java.util.concurrent.ExecutionException:
>>>>> org.apache.flink.streaming.runtime.tasks.
>>>>> ExceptionInChainedOperatorException: Could not forward element to
>>>>> next operator
>>>>>     at java.util.concurrent.CompletableFuture.reportGet(
>>>>> CompletableFuture.java:357)
>>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture
>>>>> .java:1928)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .close(StreamOperatorWrapper.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .close(StreamOperatorWrapper.java:134)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .close(StreamOperatorWrapper.java:80)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>>> .closeOperators(OperatorChain.java:302)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>> .afterInvoke(StreamTask.java:576)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>>> StreamTask.java:544)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>>>> ExceptionInChainedOperatorException: Could not forward element to
>>>>> next operator
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>>>>>     at org.apache.flink.streaming.api.operators.CountingOutput
>>>>> .emitWatermark(CountingOutput.java:41)
>>>>>     at org.apache.flink.streaming.runtime.operators.
>>>>> TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
>>>>> TimestampsAndWatermarksOperator.java:165)
>>>>>     at org.apache.flink.api.common.eventtime.
>>>>> BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
>>>>> BoundedOutOfOrdernessWatermarks.java:69)
>>>>>     at org.apache.flink.streaming.runtime.operators.
>>>>> TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
>>>>> .java:125)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
>>>>> .runThrowing(StreamTaskActionExecutor.java:92)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .closeOperator(StreamOperatorWrapper.java:203)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
>>>>> .runThrowing(StreamTaskActionExecutor.java:92)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
>>>>> .java:78)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>>> MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>>> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>>>>>     ... 9 more
>>>>> Caused by: java.lang.RuntimeException
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput
>>>>> .emitWatermark(RecordWriterOutput.java:123)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain
>>>>> .java:762)
>>>>>     at org.apache.flink.streaming.api.operators.CountingOutput
>>>>> .emitWatermark(CountingOutput.java:41)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>>>>> .processWatermark(AbstractStreamOperator.java:570)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>>>>>     ... 21 more
>>>>> Caused by: java.lang.IllegalStateException
>>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions
>>>>> .java:179)
>>>>>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder
>>>>> .append(BufferBuilder.java:83)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.
>>>>> SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer
>>>>> .java:90)
>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter
>>>>> .copyFromSerializerToTargetChannel(RecordWriter.java:136)
>>>>>     at org.apache.flink.runtime.io.network.api.writer.
>>>>> ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter
>>>>> .java:80)
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput
>>>>> .emitWatermark(RecordWriterOutput.java:121)
>>>>>     ... 25 more
>>>>> ```
>>>>>
>>>>> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> For querying the savepoint status, a GET request can be issued to
>>>>>> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
>>>>>> location of the savepoint. But if the job is running in some kind of
>>>>>> per-job mode and the JobMaster is gone after the stop-with-savepoint,
>>>>>> the request might no longer be available.
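>>>>>>
>>>>>> For example, something like the following sketch (host, port, and ids
>>>>>> are placeholders; the trigger id is the "request-id" returned by the
>>>>>> stop request):
>>>>>>
>>>>>> ```
>>>>>> import java.net.URI;
>>>>>> import java.net.http.HttpClient;
>>>>>> import java.net.http.HttpRequest;
>>>>>> import java.net.http.HttpResponse;
>>>>>>
>>>>>> public class SavepointStatus {
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         // Placeholders: JobManager host/port, job id and trigger id.
>>>>>>         String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";
>>>>>>         String triggerId = "REPLACE-WITH-REQUEST-ID";
>>>>>>
>>>>>>         HttpRequest status = HttpRequest.newBuilder()
>>>>>>                 .uri(URI.create("http://jobmanager-host:8081/jobs/"
>>>>>>                         + jobId + "/savepoints/" + triggerId))
>>>>>>                 .GET()
>>>>>>                 .build();
>>>>>>
>>>>>>         HttpResponse<String> resp = HttpClient.newHttpClient()
>>>>>>                 .send(status, HttpResponse.BodyHandlers.ofString());
>>>>>>
>>>>>>         // Once the status id is COMPLETED, the "operation" part of the
>>>>>>         // response should contain the savepoint location (or a failure cause).
>>>>>>         System.out.println(resp.body());
>>>>>>     }
>>>>>> }
>>>>>> ```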
>>>>>>
>>>>>> For the Kafka source, have you found any exceptions or messages in the
>>>>>> TaskManager's log when it could not be stopped?
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------Original Mail ------------------
>>>>>> *Sender:*Thomas Wang <w...@datability.io>
>>>>>> *Send Date:*Sat Jun 5 00:47:47 2021
>>>>>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>>>>>> *CC:*user <user@flink.apache.org>
>>>>>> *Subject:*Re: Failed to cancel a job using the STOP rest API
>>>>>>
>>>>>>> Hi Yun,
>>>>>>>
>>>>>>> Thanks for your reply. We are not using any legacy source. For this
>>>>>>> specific job, there is only one source, which uses FlinkKafkaConsumer,
>>>>>>> and I assume it has the correct cancel() method implemented.
>>>>>>>
>>>>>>> Also, could you suggest how I could use the "request-id" to get the
>>>>>>> savepoint location?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>>>>>
>>>>>>>> Hi Thomas,
>>>>>>>>
>>>>>>>> I think you are right that the CLI uses the same REST API underneath,
>>>>>>>> and since the response of the REST API is OK and the savepoint is
>>>>>>>> triggered successfully, I reckon the problem might not be in the REST
>>>>>>>> API processing, and we should first focus on the stop-with-savepoint
>>>>>>>> process itself.
>>>>>>>>
>>>>>>>> Currently, stop-with-savepoint first takes a savepoint and then cancels
>>>>>>>> all the sources to stop the job. Thus, are the sources all legacy
>>>>>>>> sources (namely, ones using SourceFunction)? And do the sources
>>>>>>>> implement the cancel() method correctly?
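>>>>>>>>
>>>>>>>> For reference, a legacy source is expected to look roughly like the
>>>>>>>> sketch below, so that cancel() actually makes run() return (a minimal
>>>>>>>> illustration, not taken from your job):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>>>>>
>>>>>>>> // Minimal sketch of a legacy source whose cancel() correctly stops
>>>>>>>> // run(); the record-fetching part is only a placeholder.
>>>>>>>> public class MyLegacySource implements SourceFunction<String> {
>>>>>>>>
>>>>>>>>     private volatile boolean running = true;
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void run(SourceContext<String> ctx) throws Exception {
>>>>>>>>         while (running) {
>>>>>>>>             String record = fetchNextRecord();
>>>>>>>>             // Emit under the checkpoint lock so emission and
>>>>>>>>             // checkpointing do not interleave.
>>>>>>>>             synchronized (ctx.getCheckpointLock()) {
>>>>>>>>                 ctx.collect(record);
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void cancel() {
>>>>>>>>         // Invoked from another thread; must make run() return promptly.
>>>>>>>>         running = false;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private String fetchNextRecord() throws InterruptedException {
>>>>>>>>         Thread.sleep(100); // placeholder for real I/O
>>>>>>>>         return "record";
>>>>>>>>     }
>>>>>>>> }
>>>>>>>> ```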
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yun
>>>>>>>>
>>>>>>>> ------------------------------------------------------------------
>>>>>>>> From:Thomas Wang <w...@datability.io>
>>>>>>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>>>>>>> To:user <user@flink.apache.org>
>>>>>>>> Subject:Failed to cancel a job using the STOP rest API
>>>>>>>>
>>>>>>>> Hi, Flink community,
>>>>>>>>
>>>>>>>> I'm trying to use the STOP REST API to cancel a job. So far, I'm
>>>>>>>> seeing some inconsistent results. Sometimes jobs can be cancelled
>>>>>>>> successfully, while other times they can't. Either way, the POST
>>>>>>>> request is accepted with a status code 202 and a "request-id".
>>>>>>>>
>>>>>>>> From the Flink UI, I can see the savepoint completing successfully.
>>>>>>>> However, the job is still in the running state afterwards. The CLI
>>>>>>>> command `flink stop <JOB ID>` works OK. I can use the CLI to stop the
>>>>>>>> job and get the resulting savepoint location. If I understand this
>>>>>>>> correctly, the CLI should be using the same REST API behind the
>>>>>>>> scenes, shouldn't it?
>>>>>>>>
>>>>>>>> Here is my POST request URL: `http://
>>>>>>>> <HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>>>>>
>>>>>>>> Here is the BODY of the request:
>>>>>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>>>>>>
>>>>>>>> I'm using Flink 1.11.2, Commit ID: DeadD0d0.
>>>>>>>>
>>>>>>>> Any suggestions on how I can debug this?
>>>>>>>>
>>>>>>>> Another question: given the "request-id" in the response, which
>>>>>>>> endpoint should I query to get the status of the request? Most
>>>>>>>> importantly, where can I get the expected savepoint location?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>>
