Reuven, thanks for the reply. The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and it uses the "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of), ByteArrayCoder.of)" coder. I can't paste the DoFn code due to company policy, but here is its structure:

//////////////////////////////////////
Pipeline.scala
-------------------------------
type InputType = KafkaRecord[Array[Byte], Array[Byte]]

//////////////////////////////////////
ParsePayloadDoFn.scala
-------------------------------
class ParsePayloadDoFn[InputType](
    inputAdaptor: RowAdaptor[InputType],
    ...
    deadLetterTag: TupleTag[KV[KeyType, ValueType]])
    extends DoFn[InputType, OutputType] {

  @Setup
  def setup(): Unit = inputAdaptor.setup()

  @ProcessElement
  def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
    Try {
      val result: Result = inputAdaptor.adapt(c.element()) // parse payload
      ...
      val deadLetterMessageFn: KV[KeyType, ValueType] => Unit = c.output(deadLetterTag, _)
      val outputPayloadFn: OutputType => Unit = c.output

      result.protocol match {
        case error: ErrorType =>
          deadLetterMessageFn(KV.of(..., ...))
        case payload: Payload =>
          // zipWithIndex yields (event, index) pairs, so both cases match on a tuple
          payload.events.zipWithIndex.foreach {
            case (failure: ParsingFailure, _) =>
              deadLetterMessageFn(KV.of(..., ...))
            case (message: Message, index: Int) =>
              // extract body from Message
              val body = ...
              // make a GET http call and compose output
              val output = ...
              outputPayloadFn(
                OutputType(
                  output,
                  ...
                  payload.header,
                  body,
                  index
                )
              )
          }
      }
    } match {
      case Failure(exception) =>
        error(
          s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
        )
      case Success(_) => ()
    }
}
//////////////////////////////////////

For reference, we are using Scio v0.11.5 and Beam v2.36.0.

Thank you,
Yuri Jin

On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:

> What is the type of the input - do you have a custom coder? Are you able
> to paste the code for your DoFn?
>
> In answer to your question - Direct runner tests for this, because it is a
> testing runner. This error scenario can cause random unexpected behavior in
> production runners, which is why the testing runner tries to explicitly
> detect it.
>
> Reuven
>
> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <yuri....@unity3d.com> wrote:
>
>> Hi Beam users,
>>
>> We have a DoFn that reads data from Kafka and parses an array byte
>> payload. It works fine with dataflow runner, but throws
>> IllegalMutationException with direct runner. It does not directly modify
>> the input value. Therefore, I am guessing that the output is different when
>> there are multiple input values.
>>
>> The detailed error is as follows.
>>
>> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException:
>> PTransform Parse Payload mutated value "DoFnOutputA" after it was output
>> (new value was "DoFnOutputB"). Values must not be mutated in any way after
>> being output.
>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>     at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
>> "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was
>> "Base64EncodedA", now "Base64EncodedB".
>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>     ... 10 more
>>
>> * "DoFnOutputA" and "DoFnOutputB" are the same, but "Base64EncodedA" is
>> different from "Base64EncodedB".
>>
>> I was wondering if you could give me some advice on the following
>> questions.
>>
>> 1. How can we find the problematic part? I did some unit tests, but I
>> couldn't reproduce them.
>> 2. Have you experienced the same error and solved it?
>> 3. Only Direct runner enforces immutability for DoFns. Is it safe to use
>> the "enforceImmutability=false" option?
>>
>>
>> Any comments would be appreciated.
>>
>> Thanks,
>> Yuri Jin
>>
>

--
Yuri Jin
Senior Software Developer, Data Platform
yuri....@unity3d.com
(+1) 778-858-3585
unity.com
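
On question 1 in the quoted message: the "Caused by" line reports that the value's encoding at output time differs from its encoding at commit time, even though the printed values look the same. The plain-Scala sketch below imitates that kind of encode-then-compare check. It is only an illustration: "Out", "encode" and "snapshot" are hypothetical stand-ins, not Beam or Scio APIs, and the shared-buffer cause it shows is an assumption, just one way to get this symptom (a coder whose encoding is not deterministic would look similar).

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.Arrays

// Hypothetical output type standing in for the real OutputType.
final case class Out(header: String, body: Array[Byte], index: Int)

object MutationCheckSketch {
  // Hypothetical stand-in for a coder: writes the fields in a fixed order.
  def encode(o: Out): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(o.header)
    out.writeInt(o.body.length)
    out.write(o.body)
    out.writeInt(o.index)
    out.flush()
    bytes.toByteArray
  }

  // Encode the value now and return a check that re-encodes it later
  // and reports whether the two encodings are still identical.
  def snapshot(o: Out): () => Boolean = {
    val before = encode(o)
    () => Arrays.equals(before, encode(o))
  }

  def main(args: Array[String]): Unit = {
    val shared = Array[Byte](1, 2, 3)
    val unsafe = Out("h", shared, 0)         // keeps a reference to `shared`
    val safe = Out("h", shared.clone(), 0)   // owns a private copy

    val unsafeUnchanged = snapshot(unsafe)
    val safeUnchanged = snapshot(safe)

    shared(0) = 9 // the buffer is reused after the values were "output"

    println(s"unsafe unchanged: ${unsafeUnchanged()}") // false
    println(s"safe unchanged: ${safeUnchanged()}")     // true
  }
}
```

If something like this is the cause, copying any array or buffer that the output shares with the input or with a reused object before calling c.output would be one thing to try. The same snapshot-and-compare idea can also be applied in a unit test to each field of the output separately to narrow down which field's encoding changes.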