This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink with the Parquet format. I'm only using Flink's own APIs, nothing custom.
Thomas

On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <[email protected]> wrote:

> Hi Thomas,
>
> Many thanks for reporting the exceptions. This does not seem to be
> working as expected.
> Could you also show us the DAG of the job? And do any operators in the
> source task use multiple threads to emit records?
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <[email protected]>
> *Send Date:*Sun Jun 6 04:02:20 2021
> *Recipients:*Yun Gao <[email protected]>
> *CC:*user <[email protected]>
> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>
>> One thing I noticed is that if I set drain = true, the job could be
>> stopped correctly. Maybe that's because I'm using a Parquet file sink, which
>> is a bulk-encoded format and only writes to disk during checkpoints?
>>
>> Thomas
>>
>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <[email protected]> wrote:
>>
>>> Hi Yun,
>>>
>>> Thanks for the tips. Yes, I do see some exceptions, copied below. I'm
>>> not quite sure what they mean though. Any hints?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>> ```
>>> 2021-06-05 10:02:51
>>> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>>>     at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
>>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>>>     ... 9 more
>>> Caused by: java.lang.RuntimeException
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>>>     ... 21 more
>>> Caused by: java.lang.IllegalStateException
>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>>>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>>>     ... 25 more
>>> ```
>>>
>>> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <[email protected]> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> To query the savepoint status, you can issue a GET request to
>>>> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
>>>> location of the savepoint. But if the job is running in some kind of
>>>> per-job mode and the JobMaster is gone after the stop-with-savepoint,
>>>> the endpoint might no longer be available.
>>>>
>>>> For the Kafka source, did you find any exceptions or messages in the
>>>> TaskManager's log when the job could not be stopped?
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>>>>
>>>>
>>>> ------------------Original Mail ------------------
>>>> *Sender:*Thomas Wang <[email protected]>
>>>> *Send Date:*Sat Jun 5 00:47:47 2021
>>>> *Recipients:*Yun Gao <[email protected]>
>>>> *CC:*user <[email protected]>
>>>> *Subject:*Re: Failed to cancel a job using the STOP rest API
>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Thanks for your reply. We are not using any legacy source. For this
>>>>> specific job, there is only one source, which uses FlinkKafkaConsumer,
>>>>> and I assume it has the cancel() method implemented correctly.
>>>>>
>>>>> Also, could you suggest how I could use the "request-id" to get the
>>>>> savepoint location?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Thomas
>>>>>
>>>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <[email protected]> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> I think you are right that the CLI uses the same REST API underneath.
>>>>>> Since the REST API response is OK and the savepoint is triggered
>>>>>> successfully, I reckon the problem is probably not in the REST API
>>>>>> handling, so we should first focus on the stop-with-savepoint process.
>>>>>>
>>>>>> Currently, stop-with-savepoint first takes a savepoint and then cancels
>>>>>> all the sources to stop the job. So, are all the sources legacy sources
>>>>>> (namely the ones using SourceFunction)? And does the source implement
>>>>>> the cancel() method correctly?
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>> ------------------------------------------------------------------
>>>>>> From:Thomas Wang <[email protected]>
>>>>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>>>>> To:user <[email protected]>
>>>>>> Subject:Failed to cancel a job using the STOP rest API
>>>>>>
>>>>>> Hi, Flink community,
>>>>>>
>>>>>> I'm trying to use the STOP REST API to cancel a job. So far, I'm
>>>>>> seeing inconsistent results. Sometimes jobs are cancelled
>>>>>> successfully, while other times they are not. Either way, the POST
>>>>>> request is accepted with a status code 202 and a "request-id".
>>>>>>
>>>>>> From the Flink UI, I can see the savepoint being completed
>>>>>> successfully. However, the job is still in the running state afterwards.
>>>>>> The CLI command `flink stop <JOB ID>` works fine. I can use the CLI to
>>>>>> stop the job and get the resulting savepoint location. If I understand
>>>>>> this correctly, the CLI should be using the same REST API behind the
>>>>>> scenes, shouldn't it?
>>>>>>
>>>>>> Here is my POST request URL:
>>>>>> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>>>
>>>>>> Here is the BODY of the request:
>>>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>>>>
>>>>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>>>>
>>>>>> Any suggestions on how I can debug this?
>>>>>>
>>>>>> Another question: given the "request-id" in the response, which
>>>>>> endpoint should I query to get the status of the request? Most
>>>>>> importantly, where can I get the expected savepoint location?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Thomas
>>>>>>
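
On the question in the thread about turning the "request-id" into a savepoint location: a minimal Python sketch of polling the `/jobs/:jobid/savepoints/:savepointtriggerid` endpoint mentioned above might look like the following. The function names are hypothetical, and the response shape (`status.id` being `IN_PROGRESS` or `COMPLETED`, with `operation.location` or `operation.failure-cause`) is an assumption based on the Flink REST API documentation, so check it against the Flink version in use.

```python
import json
import time
import urllib.request


def savepoint_status_url(base_url, job_id, request_id):
    """Build the status URL from the job ID and the "request-id" of the stop call."""
    return f"{base_url.rstrip('/')}/jobs/{job_id}/savepoints/{request_id}"


def parse_savepoint_status(payload):
    """Return (state, location, failure) from a decoded status response.

    Assumed shape: {"status": {"id": ...}, "operation": {"location": ...}},
    where "operation" may be missing or null while the savepoint is running.
    """
    state = payload.get("status", {}).get("id")
    operation = payload.get("operation") or {}
    return state, operation.get("location"), operation.get("failure-cause")


def wait_for_savepoint(base_url, job_id, request_id, interval_s=2.0, max_polls=60):
    """Poll until the operation leaves IN_PROGRESS, then return the savepoint location."""
    url = savepoint_status_url(base_url, job_id, request_id)
    for _ in range(max_polls):
        with urllib.request.urlopen(url) as response:
            payload = json.load(response)
        state, location, failure = parse_savepoint_status(payload)
        if state == "COMPLETED":
            # A completed operation can still carry a failure instead of a location.
            if failure is not None:
                raise RuntimeError(f"stop-with-savepoint failed: {failure}")
            return location
        time.sleep(interval_s)
    raise TimeoutError("savepoint status was still IN_PROGRESS after polling")
```

Calling `wait_for_savepoint("http://<HIDDEN>.ec2.internal:33557", "5dce58f02d1f5e739ab12f88b2f5f031", "<request-id>")` would then return the savepoint path once the operation completes. As noted in the thread, in per-job mode this endpoint may disappear once the JobMaster is gone, so a connection error after a successful stop does not necessarily mean the savepoint failed.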
