[ 
https://issues.apache.org/jira/browse/FLINK-30278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-30278.
----------------------------------
    Fix Version/s: 1.17.0
       Resolution: Fixed

Merged to master as f38c2370e85

> Unexpected config mutation in SinkTransformationTranslator 
> -----------------------------------------------------------
>
>                 Key: FLINK-30278
>                 URL: https://issues.apache.org/jira/browse/FLINK-30278
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration
>    Affects Versions: 1.17.0
>            Reporter: Anton Kalashnikov
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> If we forbid changing the configuration programmatically 
> (`execution.program-config.enabled`) and try to use `FileSink`, the 
> following exception occurs:
> {noformat}
>   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig.
>    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
>    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
>    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
>    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
>    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
>    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
>    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
>    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig.
>    at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
>    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
>    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
>    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
>    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
>    ... 16 more
> {noformat}
> This happens because `SinkTransformationTranslator` contains the following 
> logic:
> * Remember the current parallelism
> * Set the parallelism to the default
> * Do the transformation
> * Set the parallelism back to the remembered value
> If the initial parallelism is already the default, nothing should be done. 
> The current logic, however, explicitly writes the default value to the 
> configuration, which is itself a programmatic config mutation (the very 
> thing we want to avoid).
> See 
> org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
>  
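
The remember/reset/restore pattern described in the issue can be illustrated with a small, self-contained sketch. It does not use Flink's classes and is not the code merged for this ticket: `TrackedConfig`, `translateAlwaysRestoring`, `translatePreservingUnset`, and the fallback value of 1 are all hypothetical names and values chosen for illustration. The sketch only shows why writing back a remembered default counts as an explicit mutation, and one possible way to avoid it by restoring the value only when it was explicitly set beforehand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ParallelismRestoreSketch {
    static final String KEY = "parallelism.default";
    // Assumed fallback used when the key was never set explicitly.
    static final int DEFAULT_PARALLELISM = 1;

    /** Hypothetical stand-in for a config object that records explicit writes. */
    static class TrackedConfig {
        private final Map<String, Integer> explicit = new HashMap<>();

        Optional<Integer> getExplicit(String key) {
            return Optional.ofNullable(explicit.get(key));
        }

        int get(String key, int fallback) {
            return explicit.getOrDefault(key, fallback);
        }

        void set(String key, int value) {
            explicit.put(key, value);
        }

        void unset(String key) {
            explicit.remove(key);
        }

        /** True if any key has been written explicitly. */
        boolean isMutated() {
            return !explicit.isEmpty();
        }
    }

    /** Problematic pattern: always writes the remembered value back. */
    static void translateAlwaysRestoring(TrackedConfig config) {
        int remembered = config.get(KEY, DEFAULT_PARALLELISM); // remember
        config.unset(KEY);                                     // reset to default
        // ... the transformation would run here ...
        config.set(KEY, remembered); // explicit write, even if it was the default
    }

    /** Alternative: restore only a value that was explicitly set before. */
    static void translatePreservingUnset(TrackedConfig config) {
        Optional<Integer> remembered = config.getExplicit(KEY); // remember
        config.unset(KEY);                                      // reset to default
        // ... the transformation would run here ...
        remembered.ifPresent(value -> config.set(KEY, value));  // no-op if unset
    }

    public static void main(String[] args) {
        TrackedConfig first = new TrackedConfig();
        translateAlwaysRestoring(first);
        System.out.println("always restoring, mutated: " + first.isMutated());

        TrackedConfig second = new TrackedConfig();
        translatePreservingUnset(second);
        System.out.println("preserving unset, mutated: " + second.isMutated());
    }
}
```

In this model the first variant leaves an explicit `parallelism.default` entry behind, which a mutation check would flag, while the second variant leaves an untouched configuration untouched and still restores a value the user had set.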



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
