Hi,

We are using the Flink 1.17.0 Table API with RocksDB as the state backend to provide a service that lets our users run SQL queries. The tables are created from Avro schemas, and we also let users attach a Python UDF as a plugin. The plugin is downloaded when the table is built, and we update the StreamTableEnvironment configuration with python.files, python.archives, python.client.executable and python.executable.
We use restart strategies in case an unexpected failure happens. Recently we have been seeing cases where the job fails to recover with the error

> Caused by: java.nio.file.FileAlreadyExistsException: File already exists:
> /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip

and the only way for us to recover is to restart the cluster. Does anyone have an idea how to fix this?

This is the full stack trace:

java.lang.RuntimeException: An error occurred while copying the file.
    at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:158)
    at org.apache.flink.python.env.PythonDependencyInfo.create(PythonDependencyInfo.java:151)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager(AbstractExternalPythonFunctionOperator.java:124)
    at org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator.createPythonFunctionRunner(AbstractPythonStreamAggregateOperator.java:176)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
    at org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator.open(AbstractPythonStreamAggregateOperator.java:160)
    at org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamGroupAggregateOperator.open(AbstractPythonStreamGroupAggregateOperator.java:116)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.file.FileAlreadyExistsException: File already exists: /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:257)
    at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536)
    at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(FileCache.java:289)
    at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(FileCache.java:261)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
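
For anyone who wants to picture the failure outside Flink: our reading of the trace (an assumption, we have not confirmed it against the Flink source) is that the restarted task expands the cached archive into a directory that still holds the files from the previous attempt, and the write refuses to overwrite them. The plain-Python sketch below only imitates that behaviour. `expand_no_overwrite`, `reproduce` and the archive layout are hypothetical names chosen for illustration and are not Flink APIs.

```python
import os
import tempfile
import zipfile


def expand_no_overwrite(archive_path, target_dir):
    """Expand a zip archive, refusing to overwrite files that already exist."""
    with zipfile.ZipFile(archive_path) as archive:
        for info in archive.infolist():
            destination = os.path.join(target_dir, info.filename)
            if info.is_dir():
                os.makedirs(destination, exist_ok=True)
                continue
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            # Mode "xb" is an exclusive create: it raises FileExistsError when
            # the file is already present (our stand-in for a no-overwrite write).
            with open(destination, "xb") as out:
                out.write(archive.read(info))


def reproduce():
    """Expand the same archive twice into one directory.

    Returns (first_ok, second_error_name).
    """
    with tempfile.TemporaryDirectory() as work_dir:
        # Hypothetical archive layout mirroring the path in the error message.
        archive_path = os.path.join(work_dir, "plugin_directory.zip")
        with zipfile.ZipFile(archive_path, "w") as archive:
            archive.writestr("plugin_directory/plugin_name.zip", b"placeholder")

        cache_dir = os.path.join(work_dir, "dist-cache")
        os.makedirs(cache_dir)

        # First attempt: the cache directory is empty, so this succeeds.
        expand_no_overwrite(archive_path, cache_dir)
        first_ok = os.path.exists(
            os.path.join(cache_dir, "plugin_directory", "plugin_name.zip")
        )

        # Simulated restart: same target directory, nothing cleaned up.
        try:
            expand_no_overwrite(archive_path, cache_dir)
        except FileExistsError as error:
            return first_ok, type(error).__name__
        return first_ok, None
```

Calling `reproduce()` returns `(True, "FileExistsError")`: the first expansion succeeds and the second fails because the file left behind by the first attempt is still in the target directory. If the same thing is happening in our cluster, it would explain why only a full restart (which gives us a fresh /tmp/flink-dist-cache-* directory) clears the error.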