Hi,

I suspect that this error is not caused by Flink code: our serializer stack
is fairly stable, so if this were a bug in Flink, there would be more users
reporting such issues.
In my experience, these issues are caused by broken serializer
implementations (e.g. a serializer instance being used by multiple threads
at once, or a serializer that is somehow not deterministic).
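To make the multi-threading failure mode concrete, here is a minimal, self-contained Java sketch. The `SimpleCoder` interface and both coder classes are hypothetical (loosely shaped like an encode/decode coder, not the real Beam or Scio API). The unsafe variant keeps a scratch buffer in a field, so one instance shared between threads can hand back another thread's bytes; the safe variant keeps all mutable state local to each call. The unsafe coder may or may not fail on any given run, which would be consistent with errors that disappear on the next attempt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadSafeCoderDemo {

    // Hypothetical minimal coder interface (NOT the real Beam Coder API).
    interface SimpleCoder<T> {
        byte[] encode(T value) throws IOException;
        T decode(byte[] bytes) throws IOException;
    }

    // Anti-pattern: a scratch buffer stored in a field and reused by every call.
    // When one instance is shared by several threads, one thread's reset() can
    // wipe another thread's record before it is copied out.
    static final class UnsafeStringCoder implements SimpleCoder<String> {
        private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();

        public byte[] encode(String value) throws IOException {
            scratch.reset();
            DataOutputStream out = new DataOutputStream(scratch);
            out.writeUTF(value);
            out.flush();
            return scratch.toByteArray();
        }

        public String decode(byte[] bytes) throws IOException {
            return new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
        }
    }

    // Safer: all mutable state is local to the call, so sharing the instance is fine.
    static final class SafeStringCoder implements SimpleCoder<String> {
        public byte[] encode(String value) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeUTF(value);
            out.flush();
            return buffer.toByteArray();
        }

        public String decode(byte[] bytes) throws IOException {
            return new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
        }
    }

    // Uses ONE shared coder instance from several threads and counts
    // round-trips whose decoded value differs from the input.
    static int countMismatches(SimpleCoder<String> coder, int threads, int perThread)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int id = t;
                results.add(pool.submit(() -> {
                    int bad = 0;
                    for (int i = 0; i < perThread; i++) {
                        String value = "thread-" + id + "-record-" + i;
                        try {
                            if (!value.equals(coder.decode(coder.encode(value)))) bad++;
                        } catch (IOException | RuntimeException e) {
                            bad++; // a garbled buffer may not decode at all
                        }
                    }
                    return bad;
                }));
            }
            int total = 0;
            for (Future<Integer> f : results) total += f.get();
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("safe mismatches: "
                + countMismatches(new SafeStringCoder(), 8, 10_000));
        // Result varies from run to run; the nondeterminism is the point.
        System.out.println("unsafe mismatches: "
                + countMismatches(new UnsafeStringCoder(), 8, 10_000));
    }
}
```

The same reasoning applies to any coder that caches buffers, readers or other mutable helpers in fields: either keep that state per call (or per thread), or make sure an instance is never shared.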

Maybe there's a bug in the "com.spotify.scio.coders.*" code? Have you
checked whether these errors are known issues there?
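To illustrate how a coder bug surfaces downstream, here is a minimal Java sketch under stated assumptions: the `Metric` class, its byte layout (length-prefixed name, a 0/1 presence flag, then an optional long) and the `skew` parameter are hypothetical, chosen only to loosely mimic an Option field prefixed by a boolean. If the decoder reads even one byte fewer than the encoder wrote, the next read lands inside the string and a strict boolean check fails. With the name `metric_label`, the misread byte is `'l'`, which is 108 in ASCII, the same value as in the second stack trace (whether that is what happened in your job is only a guess). Presumably, if a decoder instead reads too far, Flink's record framing reports it as "Serializer consumed more bytes than the record had", as in the first trace. A plain round-trip check, `decode(encode(x)).equals(x)`, catches such disagreements. For the real coders, Beam's `org.apache.beam.sdk.testing.CoderProperties` (with helpers such as `coderDecodeEncodeEqual` and `coderDeterministic`) is probably the natural place to run this kind of check.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class MisalignedDecodeDemo {

    // Hypothetical record with an optional field (null = absent).
    static final class Metric {
        final String name;
        final Long point;

        Metric(String name, Long point) { this.name = name; this.point = point; }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Metric)) return false;
            Metric other = (Metric) o;
            return name.equals(other.name) && Objects.equals(point, other.point);
        }

        @Override public int hashCode() { return Objects.hash(name, point); }
    }

    // Layout: 4-byte length, UTF-8 name, presence flag (0/1), 8-byte point if present.
    static byte[] encode(Metric m) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        byte[] utf8 = m.name.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
        out.writeBoolean(m.point != null); // writes 1 or 0
        if (m.point != null) out.writeLong(m.point);
        out.flush();
        return buffer.toByteArray();
    }

    // Strict flag reader: any byte other than 0 or 1 is rejected.
    static boolean readFlag(DataInputStream in) throws IOException {
        int b = in.readUnsignedByte();
        if (b != 0 && b != 1) throw new IOException("Expected 0 or 1, got " + b);
        return b == 1;
    }

    // skew simulates a decoder that disagrees with the encoder:
    // 0 = correct, -1 = reads one byte too few of the name.
    static Metric decode(byte[] bytes, int skew) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        byte[] utf8 = new byte[in.readInt() + skew];
        in.readFully(utf8);
        String name = new String(utf8, StandardCharsets.UTF_8);
        Long point = readFlag(in) ? in.readLong() : null;
        return new Metric(name, point);
    }

    // Round-trip check: decode(encode(m)) must equal m.
    static boolean roundTrips(Metric m, int skew) {
        try {
            return m.equals(decode(encode(m), skew));
        } catch (IOException e) {
            System.out.println("decode failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Metric m = new Metric("metric_label", 42L);
        System.out.println(roundTrips(m, 0));  // consistent encoder and decoder
        System.out.println(roundTrips(m, -1)); // decoder off by one byte
    }
}
```

Running `main` prints `true`, then `decode failed: Expected 0 or 1, got 108`, then `false`.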

On Tue, May 3, 2022 at 11:31 PM Yunus Olgun <yunol...@gmail.com> wrote:

> Hi,
>
> We're running a large Flink batch job, and it sometimes throws
> serialization errors in the middle of the job. It is always the same
> operator, but the error can differ. The following attempts then work.
> Sometimes all attempts are exhausted, and then we retry the whole job.
>
> The job basically reads a list of filenames, downloads the files from
> GCS, does a groupBy-reduce and then writes the result. The error happens
> at the reducing operator.
>
> We use Flink 1.13.6 and Beam 2.35.
>
> 1 - Do you know what may be going wrong here or how to debug it further?
> 2 - Attempts require reading all data again. Is there any way to speed up
> recovery in cases like this?
>
> Thanks,
>
> >>>>>>>>>>>>>> Example stacktrace 1:
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at groupByKey@{xxx}' , caused an error: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.WrappingRuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>   ... 4 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error obtaining the sorted input: Thread 
> 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown 
> Source)
>   at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
>   ... 7 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown
>  Source)
>   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
>  Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
> due to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-serialized type, check the corresponding Kryo 
> serializer.
>   at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>   at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:118)
>   at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:48)
>   at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:60)
>   at 
> org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:70)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 6 out of bounds 
> for length 3
>   at 
> org.apache.beam.sdk.transforms.windowing.PaneInfo$PaneInfoCoder$Encoding.fromTag(PaneInfo.java:313)
>   at 
> org.apache.beam.sdk.transforms.windowing.PaneInfo$PaneInfoCoder.decode(PaneInfo.java:362)
>   at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:620)
>   at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
>   at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
>   at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:117)
>   at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:130)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:160)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>   at 
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>   ... 8 more
> >>>>>>>>>>>>>> Example stacktrace 2:
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at groupByKey@{xxx}' , caused an error: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Expected 0 or 1, got 108
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.WrappingRuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Expected 0 or 1, got 108
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>   ... 4 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error obtaining the sorted input: Thread 
> 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, 
> got 108
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown 
> Source)
>   at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
>   ... 7 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 
> or 1, got 108
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown
>  Source)
>   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
>  Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
> due to an exception: Expected 0 or 1, got 108
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
> Caused by: java.io.IOException: Expected 0 or 1, got 108
>   at Due to Exception while trying to `decode` an instance of 
> com.xxx.MetricValue: Can't decode field point.(:0)
>   at org.apache.beam.sdk.coders.BooleanCoder.decode(BooleanCoder.java:48)
>   at 
> com.spotify.scio.coders.instances.OptionCoder.decode(ScalaCoders.scala:149)
>   at 
> com.spotify.scio.coders.instances.OptionCoder.decode(ScalaCoders.scala:140)
>   at 
> com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
>   at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
>   at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
>   at 
> com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply$mcV$sp(Coder.scala:344)
>   at 
> com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply(Coder.scala:344)
>   at 
> com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply(Coder.scala:344)
>   at com.spotify.scio.coders.RecordCoder.onErrorMsg(Coder.scala:314)
>   at com.spotify.scio.coders.RecordCoder.decode(Coder.scala:344)
>   at 
> com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
>   at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
>   at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
>   at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184)
>   at 
> com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
>   at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
>   at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
>   at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
>   at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
>   at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
>   at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:117)
>   at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:130)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:160)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>   at 
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>   at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:118)
>   at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:48)
>   at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:60)
>   at 
> org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:70)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
>   at ### Coder materialization stack ###.(:0)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:61)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:67)
>   at 
> com.spotify.scio.coders.CoderMaterializer$$anonfun$beamImpl$1.apply(CoderMaterializer.scala:74)
>   at 
> com.spotify.scio.coders.CoderMaterializer$$anonfun$beamImpl$1.apply(CoderMaterializer.scala:74)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at ### Coder materialization stack ###.(:0)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:76)
>   at com.spotify.scio.coders.LazyCoder.bcoder$lzycompute(Coder.scala:182)
>   at com.spotify.scio.coders.LazyCoder.bcoder(Coder.scala:182)
>   at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184)
>   at 
> com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
>   at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
>   at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
>   at ### Coder materialization stack ###.(:0)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:76)
>   at com.spotify.scio.coders.LazyCoder.bcoder$lzycompute(Coder.scala:182)
>   at com.spotify.scio.coders.LazyCoder.bcoder(Coder.scala:182)
>   at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184)
>   at 
> com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
>   at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
>   at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
>   at ### Coder materialization stack ###.(:0)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:61)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:49)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:35)
>   at 
> com.spotify.scio.coders.CoderMaterializer$.kvCoder(CoderMaterializer.scala:99)
>   at 
> com.spotify.scio.values.PairSCollectionFunctions.toKV(PairSCollectionFunctions.scala:62)
>   at 
> com.spotify.scio.values.PairSCollectionFunctions$$anonfun$applyPerKey$1.apply(PairSCollectionFunctions.scala:68)
>   at 
> com.spotify.scio.values.PairSCollectionFunctions$$anonfun$applyPerKey$1.apply(PairSCollectionFunctions.scala:71)
>   at 
> com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:262)
>   at 
> com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:262)
>   at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:271)
>
>
> >>>>>>>>>>>>>> Example stacktrace 3:
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at groupByKey@{xxx}' , caused an error: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Network stream corrupted: received incorrect magic number. 
> (connection to 'xxx/xxx:33127')
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.WrappingRuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Network stream corrupted: received incorrect magic number. 
> (connection to 'xxx/xxx:33127')
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>   ... 4 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error obtaining the sorted input: Thread 
> 'SortMerger Reading Thread' terminated due to an exception: Network stream 
> corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown 
> Source)
>   at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
>   ... 7 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Network 
> stream corrupted: received incorrect magic number. (connection to 
> 'xxx/xxx:33127')
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown
>  Source)
>   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
>  Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
> due to an exception: Network stream corrupted: received incorrect magic 
> number. (connection to 'xxx/xxx:33127')
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> Network stream corrupted: received incorrect magic number. (connection to 
> 'xxx/xxx:33127')
>   at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:201)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:381)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.IllegalStateException: Network stream corrupted: 
> received incorrect magic number.
>   at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.decodeFrameHeader(NettyMessageClientDecoderDelegate.java:141)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:118)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>   ... 12 more
>
