Yes, good catch Kezhu: the IllegalStateException sounds very much like FLINK-21028.
Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't been released yet.)

Piotrek

On Tue, 8 Jun 2021 at 17:18, Kezhu Wang <kez...@gmail.com> wrote:

Could it be the same as FLINK-21028 [1] (titled "Streaming application didn't stop properly", fixed in 1.11.4, 1.12.2, 1.13.0)?

[1]: https://issues.apache.org/jira/browse/FLINK-21028

Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:

Hi Thomas,

I tried but have not reproduced the exception yet. I have filed an issue for the exception first [1].

[1] https://issues.apache.org/jira/browse/FLINK-22928

------------------ Original Mail ------------------
Sender: Thomas Wang <w...@datability.io>
Send Date: Tue Jun 8 07:45:52 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Re: Re: Failed to cancel a job using the STOP rest API

This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink w/ Parquet format. I'm only using Flink's API and nothing custom.

Thomas

On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

Many thanks for reporting the exceptions; it seems to not work as expected to me. Could you also show us the DAG of the job? And do any operators in the source task use multiple threads to emit records?

Best,
Yun

------------------ Original Mail ------------------
Sender: Thomas Wang <w...@datability.io>
Send Date: Sun Jun 6 04:02:20 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Re: Failed to cancel a job using the STOP rest API

One thing I noticed is that if I set drain = true, the job could be stopped correctly.
Maybe that's because I'm using a Parquet file sink, which is a bulk-encoded format and only writes to disk during checkpoints?

Thomas

On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:

Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```

On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

For querying the savepoint status, a GET request could be issued to /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and position of the savepoint. But if the job is running in some kind of per-job mode and the JobMaster is gone after the stop-with-savepoint, the request might not be available.
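[Editor's note: the status endpoint Yun describes can be sketched in a few lines of Python. This is a hedged illustration, not code from the thread; the host, job ID, and trigger ID below are placeholders, and only the endpoint path comes from the REST API page he links.]

```python
import json
import urllib.request


def savepoint_status_url(base_url: str, job_id: str, trigger_id: str) -> str:
    """Build the GET URL for the savepoint trigger status endpoint."""
    return f"{base_url}/jobs/{job_id}/savepoints/{trigger_id}"


def poll_savepoint(base_url: str, job_id: str, trigger_id: str) -> dict:
    """Fetch the trigger status once and return the parsed JSON body.
    On completion the response is expected to carry the savepoint location."""
    with urllib.request.urlopen(savepoint_status_url(base_url, job_id, trigger_id)) as r:
        return json.load(r)


# No network needed to see the URL shape (placeholder host and IDs):
print(savepoint_status_url("http://jobmanager:8081", "5dce58f02d1f5e739ab12f88b2f5f031", "abc123"))
```

On a live cluster, `poll_savepoint` would be called in a loop until the returned status indicates completion.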
For the Kafka source, have you ever found some exception or some messages in the TaskManager's log when it could not be stopped?

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid

------------------ Original Mail ------------------
Sender: Thomas Wang <w...@datability.io>
Send Date: Sat Jun 5 00:47:47 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Failed to cancel a job using the STOP rest API

Hi Yun,

Thanks for your reply. We are not using any legacy source. For this specific job, there is only one source, which is using FlinkKafkaConsumer, and I assume it has the correct cancel() method implemented.

Also, could you suggest how I could use the "request-id" to get the savepoint location?

Thanks.

Thomas

On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

I think you are right that the CLI is also using the same REST API underneath, and since the response of the REST API is OK and the savepoint is triggered successfully, I reckon that it might not be due to the REST API process, and we might still first focus on the stop-with-savepoint process.

Currently stop-with-savepoint would first take a savepoint, then cancel all the sources to stop the job. Thus, are the sources all legacy sources (namely those using SourceFunction)? And does the source implement the cancel() method correctly?
Best,
Yun

------------------------------------------------------------------
From: Thomas Wang <w...@datability.io>
Send Time: 2021 Jun. 4 (Fri.) 12:55
To: user <user@flink.apache.org>
Subject: Failed to cancel a job using the STOP rest API

Hi, Flink community,

I'm trying to use the STOP REST API to cancel a job. So far, I'm seeing some inconsistent results. Sometimes jobs can be cancelled successfully, while other times they can't. Either way, the POST request is accepted with a status code 202 and a "request-id".

From the Flink UI, I can see the savepoint being completed successfully. However, the job is still in a running state afterwards. The CLI command `flink stop <JOB ID>` works fine: I can use the CLI to stop the job and get the resulting savepoint location. If I understand this correctly, the CLI should be using the same REST API behind the scenes, shouldn't it?

Here is my POST request URL: `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.

Here is the BODY of the request: `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.

I'm using Flink 1.11.2, Commit ID: DeadD0d0.

Any suggestions on how I can debug this?

Another question is: given the response "request-id", which endpoint should I query to get the status of the request? Most importantly, where can I get the expected savepoint location?

Thanks.

Thomas
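[Editor's note: the stop call Thomas describes can be scripted the same way. A minimal Python sketch, with the host and bucket as placeholders; the endpoint path and JSON body mirror the request quoted above.]

```python
import json
import urllib.request


def stop_request(base_url: str, job_id: str, drain: bool, target_directory: str):
    """Build the POST for /jobs/:jobid/stop; returns (url, encoded JSON body)."""
    url = f"{base_url}/jobs/{job_id}/stop"
    body = json.dumps({"drain": drain,
                       "targetDirectory": target_directory}).encode("utf-8")
    return url, body


url, body = stop_request("http://jobmanager:8081",
                         "5dce58f02d1f5e739ab12f88b2f5f031",
                         False, "s3://my-bucket/flink-savepoint")
req = urllib.request.Request(url, data=body,
                             headers={"Content-Type": "application/json"})
# On a live cluster, urllib.request.urlopen(req) is expected to return 202
# with a JSON body containing the "request-id" to poll afterwards.
print(url)
```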