Hi Reuven.

I have overridden equals() and still got the same exception.
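
For context, here is a minimal sketch of the kind of equals() override involved, assuming the output type carries a byte-array field. ParsedOutput and its fields are made-up names for illustration, not our actual class. A Scala Array[Byte] compares by reference, so the contents are compared with java.util.Arrays:

```scala
import java.util.Arrays

// Hypothetical stand-in for the real output type; all names are illustrative.
final class ParsedOutput(val header: String, val body: Array[Byte], val index: Int) {
  // Array[Byte] uses reference equality, so compare the contents explicitly.
  override def equals(other: Any): Boolean = other match {
    case that: ParsedOutput =>
      header == that.header && index == that.index && Arrays.equals(body, that.body)
    case _ => false
  }

  // Keep hashCode consistent with equals by hashing the array contents.
  override def hashCode(): Int =
    31 * (31 * header.hashCode + index) + Arrays.hashCode(body)
}

object EqualsSketch {
  def main(args: Array[String]): Unit = {
    val a = new ParsedOutput("h", Array[Byte](1, 2, 3), 0)
    val b = new ParsedOutput("h", Array[Byte](1, 2, 3), 0)
    println(a == b)           // true: content-based comparison
    println(a.body == b.body) // false: the raw arrays are different references
  }
}
```

Note that an override like this only makes equal contents compare as equal. It does not help if the object's state (and therefore its encoding) changes after it has been output.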

After more testing, I suspect the part of inputAdaptor.adapt() that parses the
byte array into an HTTP request.
We are using com.athaydes.rawhttp.rawhttp-core.
https://github.com/renatoathaydes/rawhttp/blob/core-2.0/rawhttp-core/src/main/java/rawhttp/core/RawHttp.java#L130
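
To illustrate the kind of problem I suspect, here is a minimal sketch (not the rawhttp or Beam code). ParsedRequest is a hypothetical wrapper that fills a cache on first access, and Java serialization stands in for whatever coder encodes the output. The printed value stays the same while the encoded bytes change, which matches the symptom in the error message:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.Arrays

// Hypothetical parsed-request wrapper; this is not rawhttp's actual class.
final class ParsedRequest(val raw: String) extends Serializable {
  // Cache filled on first access, so the object's state changes after construction.
  private var cachedMethod: String = null

  def method: String = {
    if (cachedMethod == null) cachedMethod = raw.takeWhile(_ != ' ')
    cachedMethod
  }

  // The printed form only depends on the raw text, not on the cache.
  override def toString: String = raw
}

object MutationSketch {
  // Java serialization is an assumption here, standing in for the real coder.
  def encode(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val request = new ParsedRequest("GET /events HTTP/1.1")
    val textBefore = request.toString
    val before = encode(request) // snapshot taken right after the value is produced
    println(request.method)      // a later access fills the cache
    val after = encode(request)  // snapshot taken again afterwards

    println(textBefore == request.toString) // true: printed value is unchanged
    println(Arrays.equals(before, after))   // false: encoded bytes differ
  }
}
```

If something like this is happening, forcing the lazy state before output, or copying the needed fields into a plain immutable type, might be one way to make the encoding stable.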

I'm thinking of implementing it in a different way. Any advice on that?

Thank you,
Yuri Jin

On Fri, May 6, 2022 at 2:49 PM Siyu Lin <siyu...@unity3d.com> wrote:

> Hi Reuven,
>
> Do you mean we should have coders explicitly defined for all input types
> and output types in chained DoFns? Do we also need to define compareTo and
> equals?
>
> thanks again!
> Siyu
>
> On May 6, 2022, at 12:23 PM, Reuven Lax <re...@google.com> wrote:
>
> 
> Could be - I would check the implementation of inputAdaptor.
>
> On Fri, May 6, 2022 at 11:59 AM Yuri Jin <yuri....@unity3d.com> wrote:
>
>> Thanks, I'll check it out.
>>
>> I split inputAdaptor.adapt() into different DoFn for testing and it threw
>> the same exception for the new DoFn. So I guess it's because of
>> inputAdaptor.adapt().
>>
>> On Fri, May 6, 2022 at 11:45 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I meant to say .equals() not compareTo.
>>>
>>> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Unfortunately I'm not very familiar with Scio. However this could also
>>>> be caused by an object that either doesn't properly implement the compareTo
>>>> method or the coder doesn't return such an object in structuralValue.
>>>>
>>>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <yuri....@unity3d.com> wrote:
>>>>
>>>>> Reuven, thanks for the reply.
>>>>>
>>>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
>>>>> ByteArrayCoder.of)" coder.
>>>>> I can't paste the code for DoFn due to company policy, but here's the
>>>>> structure:
>>>>>
>>>>> //////////////////////////////////////
>>>>> Pipeline.scala
>>>>> -------------------------------
>>>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>>>
>>>>> //////////////////////////////////////
>>>>> ParsePayloadDoFn.scala
>>>>> -------------------------------
>>>>> class ParsePayloadDoFn[InputType](
>>>>>   inputAdaptor: RowAdaptor[InputType],
>>>>>   ...
>>>>>   deadLetterTag: TupleTag[KV[KeyType, ValueType]]) extends DoFn[InputType, OutputType] {
>>>>>
>>>>>   @Setup
>>>>>   def setup(): Unit =
>>>>>     inputAdaptor.setup()
>>>>>
>>>>>   @ProcessElement
>>>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>>>>>     Try {
>>>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>>>>>       ...
>>>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit = c.output(deadLetterTag, _)
>>>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>>>
>>>>>       result.protocol match {
>>>>>         case error: ErrorType =>
>>>>>           deadLetterMessageFn(KV.of(..., ...))
>>>>>         case payload: Payload =>
>>>>>           payload.events.zipWithIndex.foreach {
>>>>>             case (failure: ParsingFailure, _) =>
>>>>>               deadLetterMessageFn(KV.of(..., ...))
>>>>>             case (message: Message, index: Int) =>
>>>>>               // extract body from Message
>>>>>               val body = ...
>>>>>               // make a GET http call and compose output
>>>>>               val output = ...
>>>>>
>>>>>               outputPayloadFn(
>>>>>                 OutputType(
>>>>>                   output,
>>>>>                   ...
>>>>>                   payload.header,
>>>>>                   body,
>>>>>                   index
>>>>>                 )
>>>>>               )
>>>>>           }
>>>>>       }
>>>>>     } match {
>>>>>       case Failure(exception) =>
>>>>>         error(
>>>>>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
>>>>>         )
>>>>>       case Success(_) => ()
>>>>>     }
>>>>> }
>>>>> //////////////////////////////////////
>>>>>
>>>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>>>>>
>>>>> Thank you,
>>>>> Yuri Jin
>>>>>
>>>>>
>>>>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> What is the type of the input - do you have a custom coder? Are you
>>>>>> able to paste the code for your DoFn?
>>>>>>
>>>>>> In answer to your question: the direct runner tests for this because it
>>>>>> is a testing runner. This error scenario can cause random unexpected
>>>>>> behavior in production runners, which is why the testing runner tries to
>>>>>> explicitly detect it.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <yuri....@unity3d.com> wrote:
>>>>>>
>>>>>>> Hi Beam users,
>>>>>>>
>>>>>>> We have a DoFn that reads data from Kafka and parses a byte array
>>>>>>> payload. It works fine with the Dataflow runner, but throws
>>>>>>> IllegalMutationException with the direct runner. The DoFn does not directly
>>>>>>> modify the input value, so I am guessing that the output is different
>>>>>>> when there are multiple input values.
>>>>>>>
>>>>>>> The detailed error is as follows.
>>>>>>>
>>>>>>> Exception in thread "main"
>>>>>>> org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload
>>>>>>> mutated value "DoFnOutputA" after it was output (new value was
>>>>>>> "DoFnOutputB"). Values must not be mutated in any way after being output.
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>>>>>>         at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
>>>>>>> "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was
>>>>>>> "Base64EncodedA", now "Base64EncodedB".
>>>>>>>         at
>>>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>>>>>>         at
>>>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>>>>>>         at
>>>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>>>>>>         at
>>>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>>>>>>         ... 10 more
>>>>>>>
>>>>>>> * "DoFnOutputA" and "DoFnOutputB" are the same, but "Base64EncodedA"
>>>>>>> is different from "Base64EncodedB".
>>>>>>>
>>>>>>> I was wondering if you could give me some advice on the following
>>>>>>> questions.
>>>>>>>
>>>>>>> 1. How can we find the problematic part? I wrote some unit tests, but
>>>>>>> I couldn't reproduce the error.
>>>>>>> 2. Have you experienced the same error and solved it?
>>>>>>> 3. Only the direct runner enforces immutability for DoFns. Is it safe to
>>>>>>> use the "enforceImmutability=false" option?
>>>>>>>
>>>>>>>
>>>>>>> Any comments would be appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Yuri Jin
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Yuri Jin
>>>>> Senior Software Developer, Data Platform
>>>>> yuri....@unity3d.com
>>>>> (+1) 778-858-3585 <(778)%20858-3585>
>>>>> unity.com
>>>>>
>>>>
>>
>>
>

-- 
Yuri Jin
Senior Software Developer, Data Platform
yuri....@unity3d.com
(+1) 778-858-3585
unity.com
