谢波 created FLINK-20542:
--------------------------

             Summary: Flink 1.11.2 - savepoint fails for stream-stream join job
                 Key: FLINK-20542
                 URL: https://issues.apache.org/jira/browse/FLINK-20542
             Project: Flink
          Issue Type: Bug
         Environment: <flink.version>1.11.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>

tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

env.enableCheckpointing(2 * 60 * 1000L)
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
checkpointConfig.setTolerableCheckpointFailureNumber(2)

env.setStateBackend(
  new RocksDBStateBackend(checkpointPath, true)
    .asInstanceOf[StateBackend]
)
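
A self-contained version of the setup above might look roughly like the following; the imports, the environment/table-environment creation and the checkpointPath value are assumptions added for illustration and are not taken from the actual job:

import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Placeholder checkpoint location; the real path is not given in the report.
val checkpointPath = "hdfs:///flink/checkpoints"

// Idle join state is retained for at least 12h and cleaned up after at most 24h.
tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// Checkpoint every 2 minutes, allow up to 20 minutes per checkpoint,
// and tolerate up to 2 checkpoint failures before the job fails.
env.enableCheckpointing(2 * 60 * 1000L)
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointTimeout(20 * 60 * 1000)
checkpointConfig.setTolerableCheckpointFailureNumber(2)

// RocksDB state backend with incremental checkpoints enabled (second argument).
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true).asInstanceOf[StateBackend])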
            Reporter: 谢波


I have a SQL job that joins two streams. When a savepoint is triggered, the job fails.

I cannot trigger a savepoint manually at the moment, because the job dies as soon as the savepoint is triggered.

Can anyone help? Thanks.
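
For illustration only, the job is of roughly the following shape. The table and most column names below are placeholders; only the left outer join on transNumber (visible as Join(joinType=[LeftOuterJoin], where=[(transNumber = transNumber0)], ...) in the stack trace) is taken from the actual job:

// Hypothetical sketch of the stream-stream join; sales_stream and detail_stream
// are made-up names standing in for the two source tables of the real job.
val joined = tEnv.sqlQuery(
  """
    |SELECT s.transNumber, s.goodsId, s.retailQuantity,
    |       d.returnFlag, d.channelFlag, d.endTimestamp
    |FROM sales_stream AS s
    |LEFT JOIN detail_stream AS d
    |  ON s.transNumber = d.transNumber
    |""".stripMargin)

The savepoint was triggered from the CLI against the running job, i.e. something like "flink savepoint 749a7a45d6324f57ac859d3eab55a56c <targetDirectory>" (the exact command is an assumption; the job id and the CliFrontend.triggerSavepoint entry point are taken from the log below).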

 

!image-2020-12-09-10-59-58-757.png!

 

2020-12-09 10:46:21,801 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-xiebo.
2020-12-09 10:46:21,801 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-xiebo.
2020-12-09 10:46:21,801 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-xiebo.
2020-12-09 10:46:22,024 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/xiebo/module/flink/conf') already contains a LOG4J config file. If you want to use logback, then please delete or rename the log configuration file.
2020-12-09 10:46:22,414 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-12-09 10:46:22,575 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface shd183.yonghui.cn:45855 of application 'application_1594105654926_6844100'.
Triggering savepoint for job 749a7a45d6324f57ac859d3eab55a56c.
Waiting for response...

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job 749a7a45d6324f57ac859d3eab55a56c failed.
    at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:668)
    at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:646)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:643)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:934)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:764)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$0(CheckpointCoordinator.java:467)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:494)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1663)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:932)
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:827)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:493)
    ... 10 more
Caused by: java.lang.Exception: Could not materialize checkpoint 439 for operator Join(joinType=[LeftOuterJoin], where=[(transNumber =
transNumber0)], select=[transNumber, retailNumber, werks, businessDayDate, 
goodsId, retailQuantity, salesAmount, rtPlu, updateTime, keyBy, busi_grp_id, 
busi_grp_name, sales_dist, sales_dist_name, region_id_new, region_name_new, 
prov_code, prov_name, city_code, city_name, shop_name, vendor_id, vendor_name, 
firm_id, firm_name, div_id, dept_id, dept_name, catg_l_id, catg_l_name, 
catg_m_id, catg_m_name, catg_s_id, catg_s_name, goodsname, brand, brand_name, 
operation_type_id, operation_type_name, efct_sign_id, efct_sign_name, 
shop_goods_sts_id, shop_goods_sts_name, bravo_region_id, bravo_region_name, 
shop_belong, shop_belong_desc, zone_id, zone_name, bd_id, bd_name, firm_g1_id, 
firm_g1_name, firm_g2_id, firm_g2_name, pur_tax_rate, cost_price, 
vendor_name_new, vendor_id_new, transNumber0, returnFlag, channelFlag, 
commodityType, endTimestamp], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey]) -> Calc(select=[businessDayDate AS sdt, werks AS 
shopid, CAST(((CAST((endTimestamp IS NULL IF updateTime IF endTimestamp)) 
UNIX_TIMESTAMP _UTF-16LE'yyyy-MM-dd HH:mm:ss.SSS') * 1000)) AS updatetime, 
retailNumber AS serialid, transNumber AS sheetid, goodsId AS goodsid, keyBy AS 
key_by, CAST((endTimestamp IS NULL IF (FLAG(HOUR) EXTRACT updateTime) IF 
(FLAG(HOUR) EXTRACT endTimestamp))) AS timeframe, region_id_new AS regionid, 
region_name_new AS regionname, shop_name AS shopname, bravo_region_id AS 
serviceregionid, bravo_region_name AS serviceregionname, shop_belong AS 
shopbelongid, shop_belong_desc AS shopbelongname, sales_dist AS distid, 
sales_dist_name AS distname, prov_code AS provinceid, prov_name AS 
provincename, city_code AS cityid, city_name AS cityname, rtPlu AS pluid, 
salesAmount AS salevalue, commodityType AS commoditytype, 
CAST(_UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS memnum, 
busi_grp_id AS groupid, busi_grp_name AS groupname, div_id AS categoryid, 
goodsname, (((retailQuantity * cost_price) * (1 + (pur_tax_rate / 100))) ROUND 
2) AS cost, retailQuantity AS amount, CAST(channelFlag) AS channelflag, 
CAST(returnFlag) AS isreturn, firm_id AS firmid, firm_name AS firmname, dept_id 
AS deptid, dept_name AS deptname, brand, brand_name AS brandname, 
operation_type_id AS operationtypeid, operation_type_name AS operationtypename, 
efct_sign_id AS efctsignid, efct_sign_name AS efctsignname, shop_goods_sts_id 
AS shopgoodsstsid, shop_goods_sts_name AS shopgoodsstsname, vendor_id AS 
vendorid, vendor_name AS vendorname, vendor_id_new AS vendoridnew, 
vendor_name_new AS vendornamenew, catg_l_id AS catglid, catg_l_name AS 
catglname, catg_m_id AS catgmid, catg_m_name AS catgmname, catg_s_id AS 
catgsid, catg_s_name AS catgsname, zone_id AS zoneid, zone_name AS zonename, 
bd_id AS bdid, bd_name AS bdname, firm_g1_id AS firmg1id, firm_g1_name AS 
firmg1name, firm_g2_id AS firmg2id, firm_g2_name AS firmg2name, ((cost_price * 
(1 + (pur_tax_rate / 100))) ROUND 2) AS cycleunitprice, businessDayDate AS 
saledate]) (1/10).
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191)
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138)
    ... 3 more
Caused by: java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50)
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
    ... 3 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
