[ https://issues.apache.org/jira/browse/FLINK-20542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
谢波 closed FLINK-20542.
----------------------
Resolution: Not A Problem

> flink-1.11.2 - stream join stream savepoint fail
> ------------------------------------------------
>
>                 Key: FLINK-20542
>                 URL: https://issues.apache.org/jira/browse/FLINK-20542
>             Project: Flink
>          Issue Type: Bug
>         Environment: <flink.version>1.11.2</flink.version>
>                      <scala.binary.version>2.11</scala.binary.version>
>
> tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
>
> env.enableCheckpointing(2 * 60 * 1000L)
> val checkpointConfig = env.getCheckpointConfig
> checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
> checkpointConfig.setTolerableCheckpointFailureNumber(2)
>
> env.setStateBackend(
>   new RocksDBStateBackend(checkpointPath, true)
>     .asInstanceOf[StateBackend]
> )
>
>            Reporter: 谢波
>            Priority: Major
>
> I have a SQL job that joins two streams, but the job fails whenever a savepoint is taken.
> I cannot trigger a savepoint manually right now, because the job dies as soon as the savepoint is triggered.
> Can anyone help? Thanks.
>
> 2020-12-09 10:46:21,801 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-xiebo.
> 2020-12-09 10:46:22,024 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/xiebo/module/flink/conf') already contains a LOG4J config file. If you want to use logback, then please delete or rename the log configuration file.
> 2020-12-09 10:46:22,414 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed.
> Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar.
> 2020-12-09 10:46:22,575 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface shd183.yonghui.cn:45855 of application 'application_1594105654926_6844100'.
> Triggering savepoint for job 749a7a45d6324f57ac859d3eab55a56c.
> Waiting for response...
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 749a7a45d6324f57ac859d3eab55a56c failed.
>     at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:668)
>     at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:646)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>     at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:643)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:934)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>     at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:764)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$0(CheckpointCoordinator.java:467)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:494)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1663)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:932)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:827)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:493)
>     ... 10 more
> Caused by: java.lang.Exception: Could not materialize checkpoint 439 for operator Join(joinType=[LeftOuterJoin], where=[(transNumber = transNumber0)], select=[transNumber, retailNumber, werks, businessDayDate, goodsId, retailQuantity, salesAmount, rtPlu, updateTime, keyBy, busi_grp_id, busi_grp_name, sales_dist, sales_dist_name, region_id_new, region_name_new, prov_code, prov_name, city_code, city_name, shop_name, vendor_id, vendor_name, firm_id, firm_name, div_id, dept_id, dept_name, catg_l_id, catg_l_name, catg_m_id, catg_m_name, catg_s_id, catg_s_name, goodsname, brand, brand_name, operation_type_id, operation_type_name, efct_sign_id, efct_sign_name, shop_goods_sts_id, shop_goods_sts_name, bravo_region_id, bravo_region_name, shop_belong, shop_belong_desc, zone_id, zone_name, bd_id, bd_name, firm_g1_id, firm_g1_name, firm_g2_id, firm_g2_name, pur_tax_rate, cost_price, vendor_name_new, vendor_id_new, transNumber0, returnFlag, channelFlag, commodityType, endTimestamp], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[businessDayDate AS sdt, werks AS shopid, CAST(((CAST((endTimestamp IS NULL IF updateTime IF endTimestamp)) UNIX_TIMESTAMP _UTF-16LE'yyyy-MM-dd HH:mm:ss.SSS') * 1000)) AS updatetime, retailNumber AS serialid, transNumber AS sheetid, goodsId AS goodsid, keyBy AS key_by, CAST((endTimestamp IS NULL IF (FLAG(HOUR) EXTRACT updateTime) IF (FLAG(HOUR) EXTRACT endTimestamp))) AS timeframe, region_id_new AS regionid, region_name_new AS regionname, shop_name AS shopname, bravo_region_id AS serviceregionid, bravo_region_name AS serviceregionname, shop_belong AS shopbelongid, shop_belong_desc AS shopbelongname, sales_dist AS distid, sales_dist_name AS distname, prov_code AS provinceid, prov_name AS provincename, city_code AS cityid, city_name AS cityname, rtPlu AS pluid, salesAmount AS salevalue, commodityType AS commoditytype, CAST(_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS memnum, busi_grp_id AS groupid, busi_grp_name AS groupname, div_id AS categoryid, goodsname, (((retailQuantity * cost_price) * (1 + (pur_tax_rate / 100))) ROUND 2) AS cost, retailQuantity AS amount, CAST(channelFlag) AS channelflag, CAST(returnFlag) AS isreturn, firm_id AS firmid, firm_name AS firmname, dept_id AS deptid, dept_name AS deptname, brand, brand_name AS brandname, operation_type_id AS operationtypeid, operation_type_name AS operationtypename, efct_sign_id AS efctsignid, efct_sign_name AS efctsignname, shop_goods_sts_id AS shopgoodsstsid, shop_goods_sts_name AS shopgoodsstsname, vendor_id AS vendorid, vendor_name AS vendorname, vendor_id_new AS vendoridnew, vendor_name_new AS vendornamenew, catg_l_id AS catglid, catg_l_name AS catglname, catg_m_id AS catgmid, catg_m_name AS catgmname, catg_s_id AS catgsid, catg_s_name AS catgsname, zone_id AS zoneid, zone_name AS zonename, bd_id AS bdid, bd_name AS bdname, firm_g1_id AS firmg1id, firm_g1_name AS firmg1name, firm_g2_id AS firmg2id, firm_g2_name AS firmg2name, ((cost_price * (1 + (pur_tax_rate / 100))) ROUND 2) AS cycleunitprice, businessDayDate AS saledate]) (1/10).
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191)
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138)
>     ... 3 more
> Caused by: java.util.concurrent.CancellationException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
>     ... 3 more

--
This message was sent by Atlassian Jira
(v8.3.4#803005)