Hi Yun,

Thanks for the tips. Yes, I do see some exceptions, copied below. I'm not quite sure what they mean, though. Any hints?
Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```

On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Thomas,
>
> For querying the savepoint status, a GET request can be issued to
> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
> location of the savepoint. But if the job is running in some kind of
> per-job mode and the JobMaster is gone after the stop-with-savepoint,
> the request might not be available.
>
> For the Kafka source, have you found any exceptions or messages in the
> TaskManager's log when it could not be stopped?
>
> Best,
> Yun
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>
> ------------------ Original Mail ------------------
> Sender: Thomas Wang <w...@datability.io>
> Send Date: Sat Jun 5 00:47:47 2021
> Recipients: Yun Gao <yungao...@aliyun.com>
> CC: user <user@flink.apache.org>
> Subject: Re: Failed to cancel a job using the STOP rest API
>
>> Hi Yun,
>>
>> Thanks for your reply. We are not using any legacy source. For this
>> specific job, there is only one source, which uses FlinkKafkaConsumer,
>> and I assume it has the correct cancel() method implemented.
>>
>> Also, could you suggest how I could use the "request-id" to get the
>> savepoint location?
>>
>> Thanks.
>>
>> Thomas
>>
>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Thomas,
>>>
>>> I think you are right that the CLI is also using the same REST API
>>> underneath, and since the response of the REST API is OK and the
>>> savepoint is triggered successfully, I reckon it might not be due to
>>> the REST API process, and we might still first focus on the
>>> stop-with-savepoint process.
>>>
>>> Currently stop-with-savepoint would first take a savepoint, then
>>> cancel all the sources to stop the job. Thus, are the sources all
>>> legacy sources (namely the ones using SourceFunction), and does the
>>> source implement the cancel() method correctly?
>>>
>>> Best,
>>> Yun
>>>
>>> ------------------------------------------------------------------
>>> From: Thomas Wang <w...@datability.io>
>>> Send Time: 2021 Jun. 4 (Fri.) 12:55
>>> To: user <user@flink.apache.org>
>>> Subject: Failed to cancel a job using the STOP rest API
>>>
>>> Hi, Flink community,
>>>
>>> I'm trying to use the STOP REST API to cancel a job.
>>> So far, I'm seeing some inconsistent results: sometimes jobs can be
>>> cancelled successfully, while other times they can't. Either way, the
>>> POST request is accepted with a status code 202 and a "request-id".
>>>
>>> From the Flink UI, I can see the savepoint completing successfully.
>>> However, the job is still in the RUNNING state afterwards. The CLI
>>> command `flink stop <JOB ID>` works fine: I can use the CLI to stop
>>> the job and get the resulting savepoint location. If I understand this
>>> correctly, the CLI should be using the same REST API behind the
>>> scenes, shouldn't it?
>>>
>>> Here is my POST request URL:
>>> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>
>>> Here is the body of the request:
>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>
>>> I'm using Flink 1.11.2, Commit ID: DeadD0d0.
>>>
>>> Any suggestions on how I can debug this?
>>>
>>> Another question: given the response "request-id", which endpoint
>>> should I query to get the status of the request? Most importantly,
>>> where can I get the expected savepoint location?
>>>
>>> Thanks.
>>>
>>> Thomas
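For reference, below is a minimal sketch of the stop call described above, using plain Java 11 `java.net.http` and no Flink client dependency. The host, port, job id, and bucket are just the placeholders from the request above, and the comment about the response body is an assumption to be verified against the REST API docs linked as [1].

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopJobSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders copied from the original request -- substitute real values.
        String restBase = "http://<HIDDEN>.ec2.internal:33557";
        String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";

        // Same body as above: take a savepoint to S3, do not drain.
        String body = "{\"drain\":false,"
                + "\"targetDirectory\":\"s3://<BUCKET-NAME>/flink-savepoint\"}";

        HttpRequest stop = HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(stop, HttpResponse.BodyHandlers.ofString());

        // Expected: HTTP 202 plus a small JSON body containing "request-id";
        // that id is the savepoint trigger id used to poll for the result.
        System.out.println(resp.statusCode() + " " + resp.body());
    }
}
```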
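And a sketch of the follow-up Yun suggested: polling `/jobs/:jobid/savepoints/:savepointtriggerid` with the "request-id" returned by the stop call. The trigger id below is a placeholder, and the comments about the response fields reflect one reading of the docs in [1] and should be double-checked there.

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointStatusSketch {
    public static void main(String[] args) throws Exception {
        String restBase = "http://<HIDDEN>.ec2.internal:33557"; // placeholder
        String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";
        String triggerId = "<REQUEST-ID>"; // the "request-id" from the /stop response

        HttpRequest status = HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();

        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(status, HttpResponse.BodyHandlers.ofString());

        // While the savepoint is running, the status should report it as in
        // progress; once completed, the body should carry either the savepoint
        // location or a failure cause. Parse it with any JSON library.
        System.out.println(resp.body());
    }
}
```

As Yun noted above, this only works while the JobMaster that triggered the savepoint is still reachable; with a per-job cluster it may already be gone once stop-with-savepoint finishes.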
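On the legacy-source question: FlinkKafkaConsumer is itself a SourceFunction-based source whose cancel() is implemented by Flink, so there is nothing to add for it. The sketch below only illustrates the cancel() contract Yun was asking about for hand-written sources; the class name and the poll() helper are hypothetical.

```
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical legacy source, shown only to illustrate the cancel() contract:
// run() must observe the flag that cancel() flips, otherwise the task cannot
// leave the RUNNING state when the job is stopped.
public class PollingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = poll(); // fetch from the external system
            if (record != null) {
                // Emit under the checkpoint lock, as legacy sources are expected to do.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            } else {
                Thread.sleep(100); // back off when there is nothing to emit
            }
        }
    }

    @Override
    public void cancel() {
        // Called from a different thread; must make run() return promptly.
        running = false;
    }

    private String poll() {
        return null; // placeholder for real I/O
    }
}
```

If run() ignores that flag (for example, by blocking indefinitely inside poll()), the task never finishes cancelling and the job can stay in RUNNING even after the savepoint completes.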