Could it be the same as FLINK-21028 [1] (titled "Streaming application didn't stop properly", fixed in 1.11.4, 1.12.2 and 1.13.0)?
[1]: https://issues.apache.org/jira/browse/FLINK-21028

Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:

Hi Thomas,

I tried but could not reproduce the exception yet. I have filed an issue for it first [1].

[1] https://issues.apache.org/jira/browse/FLINK-22928

------------------ Original Mail ------------------
Sender: Thomas Wang <w...@datability.io>
Send Date: Tue Jun 8 07:45:52 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Re: Re: Failed to cancel a job using the STOP rest API

This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink with the Parquet format. I'm using only Flink's APIs and nothing custom.

Thomas
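For reference, a minimal sketch of the kind of job Thomas describes (Kafka source, bulk-encoded Parquet StreamingFileSink writing to S3, on the Flink 1.11 APIs) could look roughly like the following. Topic, bucket, group id and the Event class are hypothetical; this is not Thomas's actual code.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaToS3ParquetJob {

    // Hypothetical record type; reflect-based Avro/Parquet writing needs a POJO.
    public static class Event {
        public String raw;
        public Event() {}
        public Event(String raw) { this.raw = raw; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats such as Parquet roll files on checkpoints, so checkpointing must be on.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "example-group");       // placeholder

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("example-topic", new SimpleStringSchema(), props);

        StreamingFileSink<Event> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://example-bucket/output"), // placeholder bucket
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .build();

        env.addSource(source)
           .map(new MapFunction<String, Event>() {
               @Override
               public Event map(String value) {
                   return new Event(value);
               }
           })
           .addSink(sink);

        env.execute("kafka-to-s3-parquet");
    }
}
```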
On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

Thanks very much for reporting the exceptions; this does not seem to be working as expected to me...
Could you also show us the DAG of the job? And do any operators in the source task use multiple threads to emit records?

Best,
Yun

------------------ Original Mail ------------------
Sender: Thomas Wang <w...@datability.io>
Send Date: Sun Jun 6 04:02:20 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Re: Failed to cancel a job using the STOP rest API

One thing I noticed is that if I set drain = true, the job can be stopped correctly. Maybe that's because I'm using a Parquet file sink, which is a bulk-encoded format and only writes to disk during checkpoints?

Thomas

On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:

Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```

On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

For querying the savepoint status, a GET request can be issued to /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and location of the savepoint. But if the job is running in some kind of per-job mode and the JobMaster is gone after the stop-with-savepoint, the request might not be available.

For the Kafka source, have you found any exceptions or messages in the TaskManager's log when the job could not be stopped?

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
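As an illustration of the status query Yun mentions, a sketch using Java 11's HttpClient might look like the following; the host, job id and trigger id are placeholders, and the trigger id is whatever "request-id" the stop call returned.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointStatusCheck {
    public static void main(String[] args) throws Exception {
        String base = "http://jobmanager:8081";                    // placeholder host:port
        String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";          // placeholder job id
        String triggerId = "0123456789abcdef0123456789abcdef";      // placeholder: the "request-id"

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // While the savepoint is still running the status is IN_PROGRESS; once it is
        // COMPLETED, the response body should also report the savepoint's result.
        System.out.println(response.body());
    }
}
```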
------------------ Original Mail ------------------
Sender: Thomas Wang <w...@datability.io>
Send Date: Sat Jun 5 00:47:47 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Failed to cancel a job using the STOP rest API

Hi Yun,

Thanks for your reply. We are not using any legacy source. For this specific job, there is only one source, which is using FlinkKafkaConsumer, and I assume it has the correct cancel() method implemented.

Also, could you suggest how I can use the "request-id" to get the savepoint location?

Thanks.

Thomas

On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

I think you are right that the CLI is using the same REST API underneath, and since the response of the REST API is OK and the savepoint is triggered successfully, I reckon it might not be due to the REST API processing, so we should still focus first on the stop-with-savepoint process.

Currently stop-with-savepoint first takes a savepoint, then cancels all the sources to stop the job. Are the sources all legacy sources (namely, ones using SourceFunction)? And does the source implement the cancel() method correctly?

Best,
Yun
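To illustrate the cancel() contract Yun is asking about, here is a sketch of a legacy SourceFunction that stops its run() loop promptly when cancelled. The built-in FlinkKafkaConsumer already implements this; the source below is purely hypothetical.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    // Volatile flag flipped by cancel(), checked by the emit loop in run().
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pollNextRecord();            // hypothetical poll with a timeout
            if (record != null) {
                // Emit under the checkpoint lock, as legacy sources are expected to do.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            }
        }
    }

    @Override
    public void cancel() {
        // Called when the job is cancelled or stopped: make run() return promptly.
        running = false;
    }

    private String pollNextRecord() throws InterruptedException {
        Thread.sleep(100);   // stand-in for real I/O
        return null;
    }
}
```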
------------------------------------------------------------------
From: Thomas Wang <w...@datability.io>
Send Time: 2021 Jun. 4 (Fri.) 12:55
To: user <user@flink.apache.org>
Subject: Failed to cancel a job using the STOP rest API

Hi, Flink community,

I'm trying to use the STOP REST API to cancel a job. So far, I'm seeing some inconsistent results: sometimes jobs can be cancelled successfully, while other times they can't. Either way, the POST request is accepted with a status code 202 and a "request-id".

From the Flink UI, I can see the savepoint being completed successfully. However, the job is still in a running state afterwards. The CLI command `flink stop <JOB ID>` works fine; I can use the CLI to stop the job and get the resulting savepoint location. If I understand this correctly, the CLI should be using the same REST API behind the scenes, shouldn't it?

Here is my POST request URL: `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.

Here is the BODY of the request: `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.

I'm using Flink 1.11.2, Commit ID: DeadD0d0.

Any suggestions on how I can debug this?

Another question: given the response "request-id", which endpoint should I query to get the status of the request? Most importantly, where can I get the expected savepoint location?

Thanks.

Thomas
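For completeness, a sketch of the stop call Thomas describes, again with placeholder host, job id and bucket. The "request-id" it returns is the trigger id for the GET /jobs/:jobid/savepoints/:triggerid query shown earlier, and once that query reports the operation as completed, its response should also carry the savepoint location.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopJobWithSavepoint {
    public static void main(String[] args) throws Exception {
        String base = "http://jobmanager:8081";                    // placeholder host:port
        String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";          // placeholder job id
        String body = "{\"drain\":false,"
                + "\"targetDirectory\":\"s3://example-bucket/flink-savepoint\"}"; // placeholder bucket

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // Expected: HTTP 202 with a body like {"request-id":"..."}; feed that id into the
        // /jobs/:jobid/savepoints/:triggerid query to track the savepoint.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```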