Hi Reuven,

I overrode equals() and still got the same exception.
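Some context on why overriding equals() alone may not help. In Beam, Coder.structuralValue() returns the value itself only when the coder's consistentWithEquals() is true. By default it returns a wrapper around the encoded bytes. The Direct runner's mutation detector compares structural values first and then, roughly, the raw encodings. So if the output coder is not consistent with equals, the check never consults a custom equals() on the value type. The stdlib-only model below illustrates this. SimpleCoder, Bytes, Event, EventCoder and MutationCheck are hypothetical stand-ins, not Beam or Scio APIs. This thread also does not establish whether Scio's derived coder for OutputType reports consistentWithEquals.

```scala
import java.util.Arrays

// Hypothetical stand-in for a Beam Coder (NOT Beam's API): just enough surface
// to model how the mutation check compares two states of a value.
trait SimpleCoder[T] {
  def encode(value: T): Array[Byte]
  // Like Beam's Coder.consistentWithEquals(), this defaults to false.
  def consistentWithEquals: Boolean = false
  // Like Beam's default Coder.structuralValue(): use the value itself only when
  // the coder is consistent with equals; otherwise compare encoded bytes.
  def structuralValue(value: T): Any =
    if (consistentWithEquals) value else Bytes(encode(value))
}

// Byte array with content-based equality (plays the role of StructuralByteArray).
final case class Bytes(data: Array[Byte]) {
  override def equals(other: Any): Boolean = other match {
    case Bytes(d) => Arrays.equals(data, d)
    case _        => false
  }
  override def hashCode(): Int = Arrays.hashCode(data)
}

object MutationCheck {
  // True if the simplified check passes: structural values match or, failing
  // that, raw encodings match. Otherwise the runner would report a mutation.
  def passes[T](coder: SimpleCoder[T], before: T, after: T): Boolean =
    coder.structuralValue(before) == coder.structuralValue(after) ||
      Arrays.equals(coder.encode(before), coder.encode(after))
}

// Hypothetical output type: two Events are equal when name and tags are equal.
final case class Event(name: String, tags: Set[String])

// Encodes tags in iteration order. Small immutable Sets iterate in insertion
// order, so two *equal* Events can still encode to different bytes.
final class EventCoder(override val consistentWithEquals: Boolean) extends SimpleCoder[Event] {
  def encode(e: Event): Array[Byte] =
    (e.name +: e.tags.toSeq).mkString("|").getBytes("UTF-8")
}
```

Under these assumptions, equals() on the value type only matters once the coder routes the comparison through it. Beyond that, the options are making the encoding deterministic, or having the coder declare consistentWithEquals (or override structuralValue) only when that is actually true.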
After more testing, I suspect the step inside inputAdaptor.adapt() that parses the byte array into an HTTP request. We are using com.athaydes.rawhttp.rawhttp-core:
https://github.com/renatoathaydes/rawhttp/blob/core-2.0/rawhttp-core/src/main/java/rawhttp/core/RawHttp.java#L130

I'm thinking of implementing that parsing step differently. Any advice?

Thank you,
Yuri Jin

On Fri, May 6, 2022 at 2:49 PM Siyu Lin <siyu...@unity3d.com> wrote:
> Hi Reuven,
>
> Do you mean we should explicitly define coders for all input and output
> types in chained DoFns? Do we also need compareTo and equals defined?
>
> Thanks again!
> Siyu
>
> On May 6, 2022, at 12:23 PM, Reuven Lax <re...@google.com> wrote:
>
> Could be - I would check the implementation of inputAdaptor.
>
> On Fri, May 6, 2022 at 11:59 AM Yuri Jin <yuri....@unity3d.com> wrote:
>
>> Thanks, I'll check it out.
>>
>> I split inputAdaptor.adapt() into a separate DoFn for testing, and the new
>> DoFn threw the same exception. So I think the cause is in
>> inputAdaptor.adapt().
>>
>> On Fri, May 6, 2022 at 11:45 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I meant to say .equals(), not compareTo.
>>>
>>> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Unfortunately I'm not very familiar with Scio. However, this could also
>>>> be caused by an object that either doesn't properly implement the
>>>> compareTo method, or whose coder doesn't return such an object in
>>>> structuralValue.
>>>>
>>>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <yuri....@unity3d.com> wrote:
>>>>
>>>>> Reuven, thanks for the reply.
>>>>>
>>>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
>>>>> ByteArrayCoder.of)" coder.
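The exception shows a value that prints identically but encodes differently. One way a parsed value can do that is if it still holds a lazily read body stream. In that case the first encoding consumes the stream and the next encoding sees an empty body. Whether rawhttp's parsed request behaves this way is an assumption to verify, not something this thread shows. The sketch below contrasts such a lazy value with parsing eagerly into an immutable value. LazyRequest, EagerRequest, Streams and Parsing are hypothetical names. The simplified framing ("<request line>...\r\n\r\n<body>", with no chunked or Content-Length handling) is also an assumption. It is not a real HTTP parser.

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8

object Streams {
  // Reads a stream to the end (works on Java 8, unlike InputStream.readAllBytes).
  def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    out.toByteArray
  }
}

// Hypothetical parsed request whose body is read lazily from a stream.
// toString omits the body, so the value prints the same before and after.
final class LazyRequest(val requestLine: String, body: InputStream) {
  def bodyBytes(): Array[Byte] = Streams.readAll(body) // consumes the stream
  override def toString: String = s"LazyRequest($requestLine)"
}

// Hypothetical immutable alternative: the body is copied once, up front.
final case class EagerRequest(requestLine: String, body: Vector[Byte])

object Parsing {
  // Toy encoders standing in for a coder: request line followed by body bytes.
  def encodeLazy(r: LazyRequest): Array[Byte] = r.requestLine.getBytes(UTF_8) ++ r.bodyBytes()
  def encodeEager(r: EagerRequest): Array[Byte] = r.requestLine.getBytes(UTF_8) ++ r.body.toArray

  // Minimal eager parse under the simplified framing described above.
  def parseEager(raw: Array[Byte]): EagerRequest = {
    val sep = "\r\n\r\n".getBytes(UTF_8)
    val idx = raw.toSeq.indexOfSlice(sep.toSeq)
    require(idx >= 0, "no blank line separating headers from body")
    val head = new String(raw, 0, idx, UTF_8)
    val requestLine = head.takeWhile(_ != '\r')
    // Copy the body into an immutable Vector so later changes to `raw` cannot leak in.
    EagerRequest(requestLine, raw.drop(idx + sep.length).toVector)
  }
}
```

The idea is to convert whatever the parser returns into a plain immutable case class inside adapt(), before anything is output. That keeps lazy or mutable parser objects out of the element that the coder encodes.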
>>>>>
>>>>> I can't paste the code for the DoFn due to company policy, but here's the
>>>>> structure:
>>>>>
>>>>> //////////////////////////////////////
>>>>> Pipeline.scala
>>>>> -------------------------------
>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord
>>>>>
>>>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>>>
>>>>> //////////////////////////////////////
>>>>> ParsePayloadDoFn.scala
>>>>> -------------------------------
>>>>> import org.apache.beam.sdk.transforms.DoFn
>>>>> import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup}
>>>>> import org.apache.beam.sdk.values.{KV, TupleTag}
>>>>> import org.apache.commons.lang3.exception.ExceptionUtils
>>>>> import scala.util.{Failure, Success, Try}
>>>>>
>>>>> class ParsePayloadDoFn[InputType](
>>>>>     inputAdaptor: RowAdaptor[InputType],
>>>>>     ...
>>>>>     deadLetterTag: TupleTag[KV[KeyType, ValueType]]
>>>>> ) extends DoFn[InputType, OutputType] {
>>>>>
>>>>>   @Setup
>>>>>   def setup(): Unit =
>>>>>     inputAdaptor.setup()
>>>>>
>>>>>   @ProcessElement
>>>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>>>>>     Try {
>>>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>>>>>       ...
>>>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
>>>>>         c.output(deadLetterTag, _)
>>>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>>>
>>>>>       result.protocol match {
>>>>>         case error: ErrorType =>
>>>>>           deadLetterMessageFn(KV.of(..., ...))
>>>>>         case payload: Payload =>
>>>>>           payload.events.zipWithIndex.foreach {
>>>>>             // zipWithIndex yields (event, index) tuples, so match on the tuple
>>>>>             case (failure: ParsingFailure, _) =>
>>>>>               deadLetterMessageFn(KV.of(..., ...))
>>>>>             case (message: Message, index: Int) =>
>>>>>               // extract body from Message
>>>>>               val body = ...
>>>>>               // make a GET HTTP call and compose output
>>>>>               val output = ...
>>>>>
>>>>>               outputPayloadFn(
>>>>>                 OutputType(
>>>>>                   output,
>>>>>                   ...
>>>>>                   payload.header,
>>>>>                   body,
>>>>>                   index
>>>>>                 )
>>>>>               )
>>>>>           }
>>>>>       }
>>>>>     } match {
>>>>>       case Failure(exception) =>
>>>>>         error(
>>>>>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
>>>>>         )
>>>>>       case Success(_) => ()
>>>>>     }
>>>>> }
>>>>> //////////////////////////////////////
>>>>>
>>>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
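In the DoFn structure above, the emitted OutputType carries payload.header and body. Suppose either still points at state the parser keeps reusing, such as a shared buffer or a mutable map. Then the element changes after c.output even though the DoFn never modifies it directly. The model below assumes a hypothetical parser that reuses one buffer for every event. SharedOutput, CopiedOutput and Emit are illustrative names, and nothing in the thread confirms that inputAdaptor works this way.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical output that keeps a reference to a mutable array.
final class SharedOutput(val body: Array[Byte], val index: Int)
// Hypothetical output whose body is copied into an immutable Vector.
final case class CopiedOutput(body: Vector[Byte], index: Int)

object Emit {
  // Simulates a parser that decodes every event into the same reused buffer,
  // then "outputs" whatever `build` makes from that buffer.
  def run[O](events: Seq[String])(build: (Array[Byte], Int) => O): List[O] = {
    val buffer = new Array[Byte](16)
    val emitted = ArrayBuffer.empty[O]
    events.zipWithIndex.foreach { case (event, i) =>
      val bytes = event.getBytes("UTF-8")
      require(bytes.length <= buffer.length, "event too large for the toy buffer")
      java.util.Arrays.fill(buffer, 0.toByte)
      System.arraycopy(bytes, 0, buffer, 0, bytes.length)
      emitted += build(buffer, i) // the element is handed off here
    }
    emitted.toList
  }
}
```

Copying into a Vector[Byte] also gives the case class value-based equals. A case class field of type Array[Byte] compares by reference, so two outputs with identical bytes would not be equal.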
>>>>>
>>>>> Thank you,
>>>>> Yuri Jin
>>>>>
>>>>>
>>>>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> What is the type of the input - do you have a custom coder? Are you
>>>>>> able to paste the code for your DoFn?
>>>>>>
>>>>>> To answer your question: the Direct runner checks for this because it
>>>>>> is a testing runner. This error scenario can cause random, unexpected
>>>>>> behavior in production runners, which is why the testing runner tries to
>>>>>> detect it explicitly.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <yuri....@unity3d.com> wrote:
>>>>>>
>>>>>>> Hi Beam users,
>>>>>>>
>>>>>>> We have a DoFn that reads data from Kafka and parses a byte array
>>>>>>> payload. It works fine with the Dataflow runner, but throws
>>>>>>> IllegalMutationException with the Direct runner. The DoFn does not
>>>>>>> modify the input value directly, so my guess is that the output differs
>>>>>>> when there are multiple input values.
>>>>>>>
>>>>>>> The detailed error is as follows.
>>>>>>>
>>>>>>> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload mutated value "DoFnOutputA" after it was output (new value was "DoFnOutputB"). Values must not be mutated in any way after being output.
>>>>>>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>>>>>>     at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>>>>>>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>>>>>>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>>>>>>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>>>>>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was "Base64EncodedA", now "Base64EncodedB".
>>>>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>>>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>>>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>>>>>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>>>>>>     ... 10 more
>>>>>>>
>>>>>>> * The values "DoFnOutputA" and "DoFnOutputB" print identically, but the
>>>>>>> encodings "Base64EncodedA" and "Base64EncodedB" differ.
>>>>>>>
>>>>>>> I was wondering if you could give me some advice on the following
>>>>>>> questions.
>>>>>>>
>>>>>>> 1. How can we find the problematic part? I wrote some unit tests, but
>>>>>>>    couldn't reproduce the error.
>>>>>>> 2. Have you experienced the same error and solved it?
>>>>>>> 3. Only the Direct runner enforces immutability for DoFns. Is it safe to
>>>>>>>    use the "enforceImmutability=false" option?
>>>>>>>
>>>>>>> Any comments would be appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Yuri Jin
>>>>>>>
>>>>>
>>>>> --
>>>>> Yuri Jin
>>>>> Senior Software Developer, Data Platform
>>>>> yuri....@unity3d.com
>>>>> (+1) 778-858-3585
>>>>> unity.com
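On question 1 (locating the problem when unit tests pass): a plain unit test usually runs the logic once per element and checks the result right away. It never re-encodes earlier outputs after later elements have been processed, which is when the Direct runner notices the change. The hypothetical test helper below mimics that idea using only the standard library. It encodes each output twice at emit time to flag non-deterministic encoding, and once more after all inputs are processed to flag mutation. MutationProbe, Finding and the encode parameter are assumptions. In a real test, encode would wrap the pipeline's actual coder, for example via Beam's CoderUtils.encodeToByteArray.

```scala
import java.util.Arrays
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

sealed trait Finding
final case class NonDeterministicEncoding(outputIndex: Int) extends Finding
final case class MutatedAfterOutput(outputIndex: Int) extends Finding

object MutationProbe {
  // Runs `process` over all inputs. `process` receives an emit callback that
  // plays the role of c.output. Every emitted value is snapshotted as bytes
  // immediately, then re-encoded once all inputs have been processed.
  def check[I, O](inputs: Seq[I])(encode: O => Array[Byte])(
      process: (I, O => Unit) => Unit): List[Finding] = {
    val findings = ListBuffer.empty[Finding]
    val snapshots = ArrayBuffer.empty[(O, Array[Byte])]
    inputs.foreach { in =>
      process(in, { out =>
        val first = encode(out)
        // Two back-to-back encodings should be identical.
        if (!Arrays.equals(first, encode(out))) findings += NonDeterministicEncoding(snapshots.size)
        snapshots += ((out, first))
      })
    }
    // Later inputs must not have changed earlier outputs.
    snapshots.zipWithIndex.foreach { case ((out, before), i) =>
      if (!Arrays.equals(before, encode(out))) findings += MutatedAfterOutput(i)
    }
    findings.toList
  }
}
```

For example, a process function that reuses one StringBuilder across inputs is reported as MutatedAfterOutput(0). An encoder that depends on a call counter is reported as both non-deterministic and mutated.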