Reuven, thanks for the reply.

The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
"KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
ByteArrayCoder.of)" coder.
I can't paste the code for the DoFn due to company policy, but here's its
structure:

//////////////////////////////////////
Pipeline.scala
-------------------------------
type InputType = KafkaRecord[Array[Byte], Array[Byte]]

//////////////////////////////////////
ParsePayloadDoFn.scala
-------------------------------
class ParsePayloadDoFn[InputType](
  inputAdaptor: RowAdaptor[InputType],
  ...
  deadLetterTag: TupleTag[KV[KeyType, ValueType]])
    extends DoFn[InputType, OutputType] {

  @Setup
  def setup(): Unit =
    inputAdaptor.setup()

  @ProcessElement
  def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
    Try {
      val result: Result = inputAdaptor.adapt(c.element()) // parse payload
      ...
      val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
        c.output(deadLetterTag, _)
      val outputPayloadFn: OutputType => Unit = c.output

      result.protocol match {
        case error: ErrorType =>
          deadLetterMessageFn(KV.of(..., ...))
        case payload: Payload =>
          payload.events.zipWithIndex.foreach {
            case (failure: ParsingFailure, _) =>
              deadLetterMessageFn(KV.of(..., ...))
            case (message: Message, index: Int) =>
              // extract body from Message
              val body = ...
              // make a GET http call and compose output
              val output = ...

              outputPayloadFn(
                OutputType(
                  output,
                  ...
                  payload.header,
                  body,
                  index
                )
              )
          }
      }
    } match {
      case Failure(exception) =>
        error(
          s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\n" +
            s"Stack trace: ${ExceptionUtils.getStackTrace(exception)}"
        )
      case Success(_) => ()
    }
}
//////////////////////////////////////
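Because `zipWithIndex` turns each event into an `(event, index)` tuple, every `case` inside that `foreach` has to destructure the tuple. A pattern written against the event alone, such as `case failure: ParsingFailure`, is tested against the whole tuple and can never match. A minimal sketch, where `Event`, `ParsingFailure`, `Message` and `route` are hypothetical stand-ins for the redacted types, not the real ones:

```scala
// Hypothetical stand-ins for the event types handled in ParsePayloadDoFn.
sealed trait Event
final case class ParsingFailure(reason: String) extends Event
final case class Message(body: String) extends Event

object ZipWithIndexMatch {
  // Routes each event either to the dead-letter list or to the outputs,
  // returning both so the routing is easy to check.
  def route(events: Seq[Event]): (Seq[String], Seq[(String, Int)]) = {
    val deadLetters = Seq.newBuilder[String]
    val outputs = Seq.newBuilder[(String, Int)]
    events.zipWithIndex.foreach {
      // Each element is an (event, index) tuple, so both cases destructure it.
      case (failure: ParsingFailure, _) => deadLetters += failure.reason
      case (message: Message, index)    => outputs += ((message.body, index))
    }
    (deadLetters.result(), outputs.result())
  }
}
```

With a sealed `Event`, the compiler can also warn if a tuple case for some event subtype is missing.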

For reference, we are using Scio v0.11.5 and Beam v2.36.0.
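One plausible cause, assuming `OutputType` holds an `Array[Byte]` (for example `body` or `output`) that is backed by a buffer reused across elements: the `CodedValueMutationDetector` in the stack trace quoted below encodes each value when it is output and encodes it again when the bundle is committed, so a later element overwriting a shared buffer changes an earlier output's encoding. That would also explain why a single-element unit test passes, and why the two printed values look identical: a case class prints an `Array[Byte]` field by identity (`[B@...`), not by its contents. The sketch below simulates that check in plain Scala; `Out`, `ReusingAdaptor`, `encode` and `mutatedOutputs` are hypothetical, and `encode` is a simplified stand-in for a real Beam coder.

```scala
import java.util.Arrays

// Hypothetical output record standing in for OutputType; the byte-array
// field plays the role of `body` (or `output`) in ParsePayloadDoFn.
final case class Out(body: Array[Byte], index: Int)

// Hypothetical adaptor that reuses one internal buffer across calls, the kind
// of state a helper initialized once in @Setup might hold.
final class ReusingAdaptor {
  private val buffer = new Array[Byte](4)

  def adapt(input: Array[Byte]): Array[Byte] = {
    Arrays.fill(buffer, 0.toByte)
    System.arraycopy(input, 0, buffer, 0, math.min(input.length, buffer.length))
    buffer // returns the shared buffer itself, not a copy
  }
}

object MutationDemo {
  // Simplified stand-in for a coder: turn the record's contents into bytes.
  def encode(o: Out): Vector[Byte] = o.body.toVector :+ o.index.toByte

  // Processes one "bundle": records each output's encoding when it is emitted,
  // then re-encodes every output at the end, roughly like the Direct runner's
  // check at commit time. Returns the indices whose encoding changed.
  def mutatedOutputs(inputs: Seq[Array[Byte]], copy: Boolean): List[Int] = {
    val adaptor = new ReusingAdaptor
    val emitted = inputs.toList.zipWithIndex.map { case (in, i) =>
      val raw = adaptor.adapt(in)
      // copy = true models the fix: give each output its own array.
      val out = Out(if (copy) raw.clone() else raw, i)
      (out, encode(out))
    }
    emitted.collect { case (out, snapshot) if encode(out) != snapshot => out.index }
  }
}
```

If this is the cause, copying the bytes with `clone()` before building the output, as the `copy = true` path does, keeps each emitted value independent of later elements.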

Thank you,
Yuri Jin


On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:

> What is the type of the input - do you have a custom coder? Are you able
> to paste the code for your DoFn?
>
> In answer to your question: the Direct runner tests for this because it is a
> testing runner. This error scenario can cause random, unexpected behavior in
> production runners, which is why the testing runner tries to detect it
> explicitly.
>
> Reuven
>
> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <yuri....@unity3d.com> wrote:
>
>> Hi Beam users,
>>
>> We have a DoFn that reads data from Kafka and parses a byte-array
>> payload. It works fine with the Dataflow runner, but throws
>> IllegalMutationException with the Direct runner. The DoFn does not directly
>> modify the input value, so I suspect the output differs when there are
>> multiple input values.
>>
>> The detailed error is as follows.
>>
>> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload mutated value "DoFnOutputA" after it was output (new value was "DoFnOutputB"). Values must not be mutated in any way after being output.
>>         at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>         at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>         at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>         at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>         at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>         at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was "Base64EncodedA", now "Base64EncodedB".
>>         at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>         at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>         at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>         at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>         ... 10 more
>>
>> * The printed values "DoFnOutputA" and "DoFnOutputB" are identical, but the
>> encodings "Base64EncodedA" and "Base64EncodedB" differ.
>>
>> I was wondering if you could give me some advice on the following
>> questions.
>>
>> 1. How can we find the problematic part? I wrote some unit tests, but I
>> couldn't reproduce the error.
>> 2. Have you experienced the same error and solved it?
>> 3. Only the Direct runner enforces immutability for DoFns. Is it safe to use
>> the "enforceImmutability=false" option?
>>
>>
>> Any comments would be appreciated.
>>
>> Thanks,
>> Yuri Jin
>>
>

-- 
Yuri Jin
Senior Software Developer, Data Platform
yuri....@unity3d.com
(+1) 778-858-3585
unity.com
