[ https://issues.apache.org/jira/browse/FLINK-30278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski closed FLINK-30278.
----------------------------------
    Fix Version/s: 1.17.0
       Resolution: Fixed

Merged to master as f38c2370e85

> Unexpected config mutation in SinkTransformationTranslator
> -----------------------------------------------------------
>
>                 Key: FLINK-30278
>                 URL: https://issues.apache.org/jira/browse/FLINK-30278
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration
>    Affects Versions: 1.17.0
>            Reporter: Anton Kalashnikov
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> If we forbid changing the configuration programmatically (`execution.program-config.enabled`) and try to use `FileSink`, the following exception occurs:
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig.
>     at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
>     at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
>     ... 16 more
> {noformat}
> This happens because `SinkTransformationTranslator` contains the following logic:
> * Remember the current parallelism
> * Set the parallelism to the default
> * Do the transformation
> * Set the parallelism back to the remembered value
> If the initial parallelism is already the default, we should do nothing. The current logic, however, explicitly writes the default value into the configuration, and that write is itself a programmatic config mutation (which is what we want to avoid).
> See org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
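
For illustration, the remember/reset/restore sequence described in the issue, and one way it could be guarded, can be sketched as below. This is not the patch merged as f38c2370e85 and it does not use Flink classes: `TrackedConfig`, `SinkExpansionSketch`, the `DEFAULT_PARALLELISM` value, and both method names are hypothetical stand-ins, chosen only to show why an unconditional write of the default value is observed as a mutation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a config object that records every explicit write. */
class TrackedConfig {
    // Assumed default, for illustration only.
    static final int DEFAULT_PARALLELISM = 1;
    private final Map<String, Integer> explicitValues = new HashMap<>();

    int getParallelism() {
        return explicitValues.getOrDefault("parallelism.default", DEFAULT_PARALLELISM);
    }

    void setParallelism(int parallelism) {
        // Any call is recorded as an explicit write, even if it stores the default.
        explicitValues.put("parallelism.default", parallelism);
    }

    boolean wasMutated() {
        return !explicitValues.isEmpty();
    }
}

class SinkExpansionSketch {
    /** The sequence described in the issue: always reset, then always restore. */
    static void expandUnconditionally(TrackedConfig config, Runnable transformation) {
        int remembered = config.getParallelism();
        config.setParallelism(TrackedConfig.DEFAULT_PARALLELISM);
        transformation.run();
        config.setParallelism(remembered);
    }

    /** Guarded variant: skip both writes when the config already holds the default. */
    static void expandGuarded(TrackedConfig config, Runnable transformation) {
        int remembered = config.getParallelism();
        boolean needsReset = remembered != TrackedConfig.DEFAULT_PARALLELISM;
        if (needsReset) {
            config.setParallelism(TrackedConfig.DEFAULT_PARALLELISM);
        }
        try {
            transformation.run();
        } finally {
            if (needsReset) {
                config.setParallelism(remembered);
            }
        }
    }
}
```

With a fresh `TrackedConfig`, `expandUnconditionally` leaves `wasMutated()` returning true although the effective parallelism never changed, which mirrors the kind of change the `MutatedConfigurationException` reports. `expandGuarded` leaves the same config untouched and still restores a non-default value set by the user.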