Hi,

We are using Flink 1.17.0 with the Table API and RocksDB as the state
backend to provide a service that lets our users run SQL queries. The
tables are created from Avro schemas, and we also let users attach Python
UDFs as a plugin. The plugin is downloaded when the table is built, and we
update the StreamTableEnvironment with the python.files, python.archives,
python.client.executable and python.executable options.
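
For reference, here is a minimal sketch of how the four options are applied.
It is written against the set(key, value) method that PyFlink's TableConfig
offers; the helper names, paths and interpreter locations are hypothetical
placeholders, not our real values. The function accepts any object with a
set(key, value) method, so it can be tried without a cluster.

```python
from typing import Dict, Protocol


class SupportsSet(Protocol):
    """Anything exposing set(key, value), e.g. PyFlink's TableConfig."""

    def set(self, key: str, value: str) -> object: ...


def python_plugin_options(plugin_zip: str, env_archive: str,
                          client_python: str, worker_python: str) -> Dict[str, str]:
    """Build the four Python-related options mentioned above."""
    return {
        "python.files": plugin_zip,
        "python.archives": env_archive,
        "python.client.executable": client_python,
        "python.executable": worker_python,
    }


def apply_options(config: SupportsSet, options: Dict[str, str]) -> None:
    """Copy every option onto the given configuration object."""
    for key, value in options.items():
        config.set(key, value)


class RecordingConfig:
    """Stand-in for a real table config; it only records what was set."""

    def __init__(self) -> None:
        self.values: Dict[str, str] = {}

    def set(self, key: str, value: str) -> "RecordingConfig":
        self.values[key] = value
        return self


# All four values below are hypothetical placeholders.
options = python_plugin_options(
    plugin_zip="/path/to/plugin_directory/plugin_name.zip",
    env_archive="/path/to/python_env.zip",
    client_python="/usr/bin/python3",
    worker_python="/usr/bin/python3",
)
recorder = RecordingConfig()
apply_options(recorder, options)
```

In a PyFlink job, assuming t_env is the StreamTableEnvironment, the same call
would be apply_options(t_env.get_config(), options).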

We use a restart strategy in case an unexpected failure happens. Recently
we have been seeing the job fail to recover with this error:

> Caused by: java.nio.file.FileAlreadyExistsException: File already exists:
> /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip

The only way we have found to recover is to restart the cluster.

Does anyone have an idea how to fix this?
This is the full stack trace:

java.lang.RuntimeException: An error occurred while copying the file.
    at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:158)
    at org.apache.flink.python.env.PythonDependencyInfo.create(PythonDependencyInfo.java:151)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager(AbstractExternalPythonFunctionOperator.java:124)
    at org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator.createPythonFunctionRunner(AbstractPythonStreamAggregateOperator.java:176)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
    at org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator.open(AbstractPythonStreamAggregateOperator.java:160)
    at org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamGroupAggregateOperator.open(AbstractPythonStreamGroupAggregateOperator.java:116)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.file.FileAlreadyExistsException: File already exists: /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:257)
    at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536)
    at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(FileCache.java:289)
    at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(FileCache.java:261)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
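
Reading the bottom of the trace, the exception is raised when
FileUtils.expandDirectory calls LocalFileSystem.create for a file that is
already present. The sketch below is not Flink code. It only mimics that
pattern in plain Python, under the assumption (which we have not confirmed)
that an earlier attempt left files behind in the dist-cache directory. The
function name, archive name and directory names are made up for illustration.

```python
import os
import tempfile
import zipfile


def expand_no_overwrite(archive_path: str, target_dir: str) -> None:
    """Extract every file entry, refusing to overwrite existing files."""
    with zipfile.ZipFile(archive_path) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue
            destination = os.path.join(target_dir, info.filename)
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            # Mode "xb" is exclusive creation: it raises FileExistsError
            # when the destination file is already there.
            with open(destination, "xb") as out:
                out.write(archive.read(info))


with tempfile.TemporaryDirectory() as work_dir:
    # Build a small archive whose layout resembles the path in the error.
    archive_path = os.path.join(work_dir, "bundle.zip")
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("plugin_directory/plugin_name.zip", b"placeholder")

    target_dir = os.path.join(work_dir, "dist-cache")
    expand_no_overwrite(archive_path, target_dir)  # first attempt succeeds

    try:
        # Second attempt into the same directory, with the files still there.
        expand_no_overwrite(archive_path, target_dir)
        second_attempt_failed = False
    except FileExistsError:
        second_attempt_failed = True
```

If this assumption holds, it would also be consistent with a cluster restart
clearing the problem, since that removes the leftover directory.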
