[ https://issues.apache.org/jira/browse/FLINK-20542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

谢波 closed FLINK-20542.
----------------------
    Resolution: Not A Problem

> flink-1.11.2 - stream join stream savepoint fail
> ------------------------------------------------
>
>                 Key: FLINK-20542
>                 URL: https://issues.apache.org/jira/browse/FLINK-20542
>             Project: Flink
>          Issue Type: Bug
>         Environment: <flink.version>1.11.2</flink.version>
> <scala.binary.version>2.11</scala.binary.version>
>  
> tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
>  
> env.enableCheckpointing(2 * 60 * 1000L)
> val checkpointConfig = env.getCheckpointConfig
> checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
> checkpointConfig.setTolerableCheckpointFailureNumber(2)
>  
> env.setStateBackend(
>   new RocksDBStateBackend(checkpointPath, true)
>     .asInstanceOf[StateBackend]
> )
> (see the self-contained sketch after the issue fields below)
>            Reporter: 谢波
>            Priority: Major
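> 
> For reference, a minimal, self-contained sketch of the configuration quoted in the Environment field above. The imports, the env/tEnv construction, and the checkpointPath value are assumptions and are not part of the original ticket:
> 
> import org.apache.flink.api.common.time.Time
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
> import org.apache.flink.runtime.state.StateBackend
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> 
> // Keep join state for at least 12 hours and at most 24 hours before it may be cleaned up.
> tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
> 
> // Checkpoint every 2 minutes, with a 20-minute timeout and tolerance for 2 checkpoint failures.
> env.enableCheckpointing(2 * 60 * 1000L)
> val checkpointConfig = env.getCheckpointConfig
> checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
> checkpointConfig.setTolerableCheckpointFailureNumber(2)
> 
> // RocksDB state backend with incremental checkpoints enabled.
> // The checkpoint URI is a placeholder; the ticket does not give the real path.
> val checkpointPath = "hdfs:///flink/checkpoints"
> env.setStateBackend(new RocksDBStateBackend(checkpointPath, true).asInstanceOf[StateBackend])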
>
> I have a SQL job that joins two streams. When a savepoint is triggered, the job fails.
> I cannot manually trigger a savepoint right now, because the job dies as soon as the savepoint is triggered.
> Can anyone help? Thanks.
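> 
> For context, the failure below occurred while triggering a savepoint from the Flink CLI against the YARN session. A command of roughly the following shape reproduces the trigger; the target directory is an assumption, while the job ID and YARN application ID are the ones shown in the log:
> 
> # trigger a savepoint for the running job on the YARN session cluster
> ./bin/flink savepoint 749a7a45d6324f57ac859d3eab55a56c hdfs:///flink/savepoints -yid application_1594105654926_6844100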
>  
>  
> 2020-12-09 10:46:21,801 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-xiebo.
> 2020-12-09 10:46:22,024 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/xiebo/module/flink/conf') already contains a LOG4J config file. If you want to use logback, then please delete or rename the log configuration file.
> 2020-12-09 10:46:22,414 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-12-09 10:46:22,575 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface shd183.yonghui.cn:45855 of application 'application_1594105654926_6844100'.
> Triggering savepoint for job 749a7a45d6324f57ac859d3eab55a56c.
> Waiting for response...
> ------------------------------------------------------------
> The program finished with the following exception:
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 749a7a45d6324f57ac859d3eab55a56c failed.
>     at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:668)
>     at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:646)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>     at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:643)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:934)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>     at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:764)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$0(CheckpointCoordinator.java:467)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:494)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1663)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:932)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:827)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:493)
>     ... 10 more
> Caused by: java.lang.Exception: Could not materialize checkpoint 439 for operator Join(joinType=[LeftOuterJoin], where=[(transNumber = transNumber0)], select=[transNumber, retailNumber, werks, businessDayDate, goodsId, retailQuantity, salesAmount, rtPlu, updateTime, keyBy, busi_grp_id, busi_grp_name, sales_dist, sales_dist_name, region_id_new, region_name_new, prov_code, prov_name, city_code, city_name, shop_name, vendor_id, vendor_name, firm_id, firm_name, div_id, dept_id, dept_name, catg_l_id, catg_l_name, catg_m_id, catg_m_name, catg_s_id, catg_s_name, goodsname, brand, brand_name, operation_type_id, operation_type_name, efct_sign_id, efct_sign_name, shop_goods_sts_id, shop_goods_sts_name, bravo_region_id, bravo_region_name, shop_belong, shop_belong_desc, zone_id, zone_name, bd_id, bd_name, firm_g1_id, firm_g1_name, firm_g2_id, firm_g2_name, pur_tax_rate, cost_price, vendor_name_new, vendor_id_new, transNumber0, returnFlag, channelFlag, commodityType, endTimestamp], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[businessDayDate AS sdt, werks AS shopid, CAST(((CAST((endTimestamp IS NULL IF updateTime IF endTimestamp)) UNIX_TIMESTAMP _UTF-16LE'yyyy-MM-dd HH:mm:ss.SSS') * 1000)) AS updatetime, retailNumber AS serialid, transNumber AS sheetid, goodsId AS goodsid, keyBy AS key_by, CAST((endTimestamp IS NULL IF (FLAG(HOUR) EXTRACT updateTime) IF (FLAG(HOUR) EXTRACT endTimestamp))) AS timeframe, region_id_new AS regionid, region_name_new AS regionname, shop_name AS shopname, bravo_region_id AS serviceregionid, bravo_region_name AS serviceregionname, shop_belong AS shopbelongid, shop_belong_desc AS shopbelongname, sales_dist AS distid, sales_dist_name AS distname, prov_code AS provinceid, prov_name AS provincename, city_code AS cityid, city_name AS cityname, rtPlu AS pluid, salesAmount AS salevalue, commodityType AS commoditytype, CAST(_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS memnum, busi_grp_id AS groupid, busi_grp_name AS groupname, div_id AS categoryid, goodsname, (((retailQuantity * cost_price) * (1 + (pur_tax_rate / 100))) ROUND 2) AS cost, retailQuantity AS amount, CAST(channelFlag) AS channelflag, CAST(returnFlag) AS isreturn, firm_id AS firmid, firm_name AS firmname, dept_id AS deptid, dept_name AS deptname, brand, brand_name AS brandname, operation_type_id AS operationtypeid, operation_type_name AS operationtypename, efct_sign_id AS efctsignid, efct_sign_name AS efctsignname, shop_goods_sts_id AS shopgoodsstsid, shop_goods_sts_name AS shopgoodsstsname, vendor_id AS vendorid, vendor_name AS vendorname, vendor_id_new AS vendoridnew, vendor_name_new AS vendornamenew, catg_l_id AS catglid, catg_l_name AS catglname, catg_m_id AS catgmid, catg_m_name AS catgmname, catg_s_id AS catgsid, catg_s_name AS catgsname, zone_id AS zoneid, zone_name AS zonename, bd_id AS bdid, bd_name AS bdname, firm_g1_id AS firmg1id, firm_g1_name AS firmg1name, firm_g2_id AS firmg2id, firm_g2_name AS firmg2name, ((cost_price * (1 + (pur_tax_rate / 100))) ROUND 2) AS cycleunitprice, businessDayDate AS saledate]) (1/10).
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191)
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138)
>     ... 3 more
> Caused by: java.util.concurrent.CancellationException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
>     ... 3 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
