[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15230058#comment-15230058 ]
Konstantin Knauf commented on FLINK-3713:
-----------------------------------------

Just for my understanding: It says there

{quote}
Disposing custom state handles: Disposing an old savepoint does not work with custom state handles (if you are using a custom state backend), because the user code class loader is not available during disposal.
{quote}

We are using FsStateBackend, not a custom state backend. Or does "custom state backend" always mean user classes?

Let me give you some context, maybe we can come up with a different solution:

We have a streaming job that should always be running. Normally, Flink should handle failures, but we cannot rely on that alone (max restart attempts, some bug, ...). So we have a script that runs regularly and checks whether the job is still running. If the job has not been running for a period of x minutes (to allow for automatic recovery by Flink), it is restarted from a savepoint. Every time this script runs, a new savepoint is triggered and the old one is discarded. We need this regular savepoint to be able to do the "manual" recovery.

So we are triggering quite a lot of savepoints and need some way of cleaning up the old ones. Do you have a different idea of how to do this cleanup? Just deleting the directory in HDFS? If you have a completely different idea of how to handle this supervision, I am also happy about suggestions.
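To make the supervision loop described in the comment concrete, here is a minimal sketch of one pass of such a script. It is not the actual script from the comment: `JobControl`, `SavepointWatchdog`, and all method names are hypothetical, and how each `JobControl` call maps onto Flink (CLI, HDFS delete, or something else) is deliberately left open, because that is the question being asked.

```java
/** Hypothetical abstraction over whatever the script calls; this is not a Flink API. */
interface JobControl {
    boolean isRunning();
    String triggerSavepoint();              // returns the path of the new savepoint
    void disposeSavepoint(String path);     // the cleanup step this issue is about
    void restartFrom(String savepointPath);
}

/** One supervision pass per call to runOnce, following the description in the comment. */
final class SavepointWatchdog {
    private final JobControl job;
    private final long graceMillis;         // the "x minutes" left for automatic recovery
    private String latestSavepoint;         // null until the first savepoint was taken
    private long downSince = -1;            // -1 while the job is observed running

    SavepointWatchdog(JobControl job, long graceMillis) {
        this.job = job;
        this.graceMillis = graceMillis;
    }

    String latestSavepoint() {
        return latestSavepoint;
    }

    void runOnce(long nowMillis) {
        if (job.isRunning()) {
            downSince = -1;
            String fresh = job.triggerSavepoint();
            String old = latestSavepoint;
            latestSavepoint = fresh;        // switch to the new savepoint first ...
            if (old != null) {
                job.disposeSavepoint(old);  // ... then discard the previous one
            }
            return;
        }
        if (downSince < 0) {
            downSince = nowMillis;          // first pass that sees the job down
        }
        boolean graceExpired = nowMillis - downSince >= graceMillis;
        if (graceExpired && latestSavepoint != null) {
            job.restartFrom(latestSavepoint);
            downSince = -1;                 // start a fresh grace period after the restart
        }
    }
}
```

The sketch disposes the old savepoint only after the new one has been recorded, so a failed trigger never leaves the watchdog without a savepoint to restart from. Whether `disposeSavepoint` can be implemented reliably is exactly what depends on the class loader problem described in this issue.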
> DisposeSavepoint message uses system classloader to discard state
> -----------------------------------------------------------------
>
>                 Key: FLINK-3713
>                 URL: https://issues.apache.org/jira/browse/FLINK-3713
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>            Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO org.apache.flink.yarn.YarnJobManager - Disposing savepoint at 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN org.apache.flink.runtime.checkpoint.StateForTask - Failed to discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : SerializedValue
> java.lang.ClassNotFoundException: <some_package>.MetricsProcessor$CombinedKeysFoldFunction
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:270)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1184)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>     at org.apache.flink.runtime.checkpoint.StateForTask.discard(StateForTask.java:109)
>     at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discard(CompletedCheckpoint.java:85)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:635)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:627)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:627)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> The issue was reported by [~knaufk] and analyzed by [~aljoscha].

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
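The failure mode in the stack trace (a `ClassNotFoundException` raised from `resolveClass` while deserializing state) can be reproduced outside Flink. The following is a small stand-alone sketch, not Flink code: `UserFoldFunction`, `LoaderAwareInputStream`, and `ClassLoaderDemo` are hypothetical names, and the parent of the application class loader stands in for a class loader that cannot see the user jar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical stand-in for a user class that ends up inside checkpoint state. */
final class UserFoldFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name = "fold";
}

/** Resolves classes against an explicitly supplied class loader while deserializing. */
final class LoaderAwareInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Fails if 'loader' (or its parents) cannot see the serialized class.
        return Class.forName(desc.getName(), false, loader);
    }
}

final class ClassLoaderDemo {
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] state = serialize(new UserFoldFunction());
        ClassLoader userLoader = UserFoldFunction.class.getClassLoader();

        // The loader that defined the class can resolve it again.
        Object restored = deserialize(state, userLoader);
        System.out.println("user loader: restored " + restored.getClass().getSimpleName());

        // Its parent cannot see the class, as with the system class loader in the issue.
        try {
            deserialize(state, userLoader.getParent());
            System.out.println("parent loader: unexpectedly succeeded");
        } catch (ClassNotFoundException e) {
            System.out.println("parent loader: ClassNotFoundException for " + e.getMessage());
        }
    }
}
```

In terms of the issue, `savepoint.discard(getClass.getClassLoader)` corresponds to the second call: the loader passed in cannot see the user class, so deserialization fails before the state can be discarded.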