Hi Beam users,

We have a DoFn that reads data from Kafka and parses a byte array payload.
It works fine with the Dataflow runner, but throws an IllegalMutationException
with the Direct runner. The DoFn does not directly modify its input value, so
I am guessing that the output differs when there are multiple input values.

The detailed error is as follows.

Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload mutated value "DoFnOutputA" after it was output (new value was "DoFnOutputB"). Values must not be mutated in any way after being output.
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
        at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
        at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
        at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
        at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
        at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was "Base64EncodedA", now "Base64EncodedB".
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
        ... 10 more

* The printed values "DoFnOutputA" and "DoFnOutputB" are identical, but the
encoding "Base64EncodedA" differs from "Base64EncodedB".
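
To illustrate how a value can print identically while its encoding changes,
here is a minimal sketch in plain Java. It is only a guess at one possible
cause, not a diagnosis of our DoFn: the Payload class, its cachedText field,
and the use of Java serialization as the encoding are all assumptions made
up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; this is not the DoFn from this thread.
class EncodingDrift {

  // Hypothetical output type: toString() depends only on `data`, but Java
  // serialization also writes the lazily filled `cachedText` field.
  static class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] data;
    private String cachedText; // null until text() is first called

    Payload(byte[] data) {
      this.data = data;
    }

    String text() {
      if (cachedText == null) {
        cachedText = new String(data, StandardCharsets.UTF_8);
      }
      return cachedText;
    }

    @Override
    public String toString() {
      return "Payload" + Arrays.toString(data);
    }
  }

  // Encodes a value with plain Java serialization (assumed encoding).
  static byte[] encode(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) {
    Payload payload = new Payload("abc".getBytes(StandardCharsets.UTF_8));
    String printedBefore = payload.toString();
    byte[] encodedBefore = encode(payload);

    payload.text(); // fills the cache; the printed form is unaffected

    // Printed form is unchanged: prints true
    System.out.println(printedBefore.equals(payload.toString()));
    // Encoded bytes now include the cached string: prints false
    System.out.println(Arrays.equals(encodedBefore, encode(payload)));
  }
}
```

In this sketch nothing visible in toString() changes, yet the encoded bytes
differ before and after the call to text(). Hidden state of this kind, or an
encoding whose byte order is not stable, would be consistent with the
symptom above.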

I was wondering if you could give me some advice on the following questions.

1. How can we find the problematic part? I wrote some unit tests, but I
couldn't reproduce the error.
2. Have you experienced the same error and solved it?
3. Only the Direct runner enforces immutability for DoFns. Is it safe to use
the "enforceImmutability=false" option?
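
For reference, this is a minimal sketch of how the option in question 3 can
be set programmatically through DirectOptions (the command-line form is
--enforceImmutability=false). The class name and the empty pipeline are
placeholders for illustration, and it assumes the Direct runner artifact is
on the classpath. Note that this only turns the check off; it does not
remove whatever causes the encoding to change.

```java
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical entry point; the class name is a placeholder.
public class DirectRunnerWithoutImmutabilityCheck {
  public static void main(String[] args) {
    DirectOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);
    options.setRunner(DirectRunner.class);

    // Disables the Direct runner's mutation check on DoFn inputs and outputs.
    options.setEnforceImmutability(false);

    Pipeline pipeline = Pipeline.create(options);
    // Apply the Kafka read and the "Parse Payload" transform here.
    pipeline.run().waitUntilFinish();
  }
}
```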


Any comments would be appreciated.

Thanks,
Yuri Jin
