Hi, I suspect that this error is not caused by Flink code (because our serializer stack is fairly stable, there would be more users reporting such issues if it was a bug in Flink). In my experience, these issues are caused by broken serializer implementations (e.g. a serializer being used by multiple threads causing issues; or a serializer somebow not being deterministic).
Maybe there's a bug in the "com.spotify.scio.coders.*" code? Have you checked if these errors are known there? On Tue, May 3, 2022 at 11:31 PM Yunus Olgun <yunol...@gmail.com> wrote: > Hi, > > We're running a large Flink batch job and sometimes it throws > serialization errors in the middle of the job. It is always the same > operator but the error can be different. Then the following attempts work. > Or sometimes attempts get exhausted, then retrying the job. > > The job is basically reading a list of filenames, downloading them from > GCS, doing a groupBy- reduce and then writing it. The error happens at the > reducing operator. > > We use Flink 1.13.6 and Beam 2.35. > > 1 - Do you know what may be going wrong here or how to debug it further? > 2 - Attempts require reading all data again. Is there any way to fasten > the recovery time in cases like this? > > Thanks, > > >>>>>>>>>>>>>> Example stacktrace 1: > > java.lang.Exception: The data preparation for task 'CHAIN GroupReduce > (GroupReduce at groupByKey@{xxx}' , caused an error: > java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error > obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due > to an exception: Serializer consumed more bytes than the record had. This > indicates broken serialization. If you are using custom serialization types > (Value or Writable), check their serialization methods. If you are using a > Kryo-serialized type, check the corresponding Kryo serializer. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.WrappingRuntimeException: > java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error > obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due > to an exception: Serializer consumed more bytes than the record had. This > indicates broken serialization. If you are using custom serialization types > (Value or Writable), check their serialization methods. If you are using a > Kryo-serialized type, check the corresponding Kryo serializer. > at > org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260) > at > org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227) > at > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484) > ... 4 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Error obtaining the sorted input: Thread > 'SortMerger Reading Thread' terminated due to an exception: Serializer > consumed more bytes than the record had. This indicates broken serialization. > If you are using custom serialization types (Value or Writable), check their > serialization methods. If you are using a Kryo-serialized type, check the > corresponding Kryo serializer. > at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown > Source) > at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source) > at > org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257) > ... 7 more > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer > consumed more bytes than the record had. This indicates broken serialization. > If you are using custom serialization types (Value or Writable), check their > serialization methods. If you are using a Kryo-serialized type, check the > corresponding Kryo serializer. > at > org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254) > at > java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown > Source) > at > java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown > Source) > at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown > Source) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown > Source) > at > org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392) > at > org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121) > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75) > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated > due to an exception: Serializer consumed more bytes than the record had. This > indicates broken serialization. If you are using custom serialization types > (Value or Writable), check their serialization methods. If you are using a > Kryo-serialized type, check the corresponding Kryo serializer. > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80) > Caused by: java.io.IOException: Serializer consumed more bytes than the > record had. This indicates broken serialization. If you are using custom > serialization types (Value or Writable), check their serialization methods. > If you are using a Kryo-serialized type, check the corresponding Kryo > serializer. > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:118) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:48) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:60) > at > org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:70) > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73) > Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 6 out of bounds > for length 3 > at > org.apache.beam.sdk.transforms.windowing.PaneInfo$PaneInfoCoder$Encoding.fromTag(PaneInfo.java:313) > at > org.apache.beam.sdk.transforms.windowing.PaneInfo$PaneInfoCoder.decode(PaneInfo.java:362) > at > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:620) > at > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612) > at > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558) > at > org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:117) > at > org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:130) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:160) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37) > at > org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) > ... 8 more > >>>>>>>>>>>>>> Example stacktrace 2: > > java.lang.Exception: The data preparation for task 'CHAIN GroupReduce > (GroupReduce at groupByKey@{xxx}' , caused an error: > java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error > obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due > to an exception: Expected 0 or 1, got 108 > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.WrappingRuntimeException: > java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error > obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due > to an exception: Expected 0 or 1, got 108 > at > org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260) > at > org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227) > at > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484) > ... 4 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Error obtaining the sorted input: Thread > 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, > got 108 > at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown > Source) > at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source) > at > org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257) > ... 7 more > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 > or 1, got 108 > at > org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254) > at > java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown > Source) > at > java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown > Source) > at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown > Source) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown > Source) > at > org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392) > at > org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121) > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75) > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated > due to an exception: Expected 0 or 1, got 108 > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80) > Caused by: java.io.IOException: Expected 0 or 1, got 108 > at Due to Exception while trying to `decode` an instance of > com.xxx.MetricValue: Can't decode field point.(:0) > at org.apache.beam.sdk.coders.BooleanCoder.decode(BooleanCoder.java:48) > at > com.spotify.scio.coders.instances.OptionCoder.decode(ScalaCoders.scala:149) > at > com.spotify.scio.coders.instances.OptionCoder.decode(ScalaCoders.scala:140) > at > com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272) > at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259) > at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272) > at > com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply$mcV$sp(Coder.scala:344) > at > com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply(Coder.scala:344) > at > com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply(Coder.scala:344) > at com.spotify.scio.coders.RecordCoder.onErrorMsg(Coder.scala:314) > at com.spotify.scio.coders.RecordCoder.decode(Coder.scala:344) > at > com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272) > at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259) > at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272) > at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184) > at > com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272) > at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259) > at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272) > at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37) > at > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621) > at > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612) > at > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558) > at > org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:117) > at > org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:130) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:160) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37) > at > org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:118) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:48) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:60) > at > org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:70) > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73) > at ### Coder materialization stack ###.(:0) > at > com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:61) > at > com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:67) > at > com.spotify.scio.coders.CoderMaterializer$$anonfun$beamImpl$1.apply(CoderMaterializer.scala:74) > at > com.spotify.scio.coders.CoderMaterializer$$anonfun$beamImpl$1.apply(CoderMaterializer.scala:74) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at ### Coder materialization stack ###.(:0) > at > com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:76) > at com.spotify.scio.coders.LazyCoder.bcoder$lzycompute(Coder.scala:182) > at com.spotify.scio.coders.LazyCoder.bcoder(Coder.scala:182) > at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184) > at > com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272) > at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259) > at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272) > at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37) > at ### Coder materialization stack ###.(:0) > at > com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:76) > at com.spotify.scio.coders.LazyCoder.bcoder$lzycompute(Coder.scala:182) > at com.spotify.scio.coders.LazyCoder.bcoder(Coder.scala:182) > at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184) > at > com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272) > at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259) > at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272) > at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37) > at ### Coder materialization stack ###.(:0) > at > com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:61) > at > com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:49) > at > com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:35) > at > com.spotify.scio.coders.CoderMaterializer$.kvCoder(CoderMaterializer.scala:99) > at > com.spotify.scio.values.PairSCollectionFunctions.toKV(PairSCollectionFunctions.scala:62) > at > com.spotify.scio.values.PairSCollectionFunctions$$anonfun$applyPerKey$1.apply(PairSCollectionFunctions.scala:68) > at > com.spotify.scio.values.PairSCollectionFunctions$$anonfun$applyPerKey$1.apply(PairSCollectionFunctions.scala:71) > at > com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:262) > at > com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:262) > at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:271) > > > >>>>>>>>>>>>>> Example stacktrace 3: > > java.lang.Exception: The data preparation for task 'CHAIN GroupReduce > (GroupReduce at groupByKey@{xxx}' , caused an error: > java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error > obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due > to an exception: Network stream corrupted: received incorrect magic number. > (connection to 'xxx/xxx:33127') > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.util.WrappingRuntimeException: > java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error > obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due > to an exception: Network stream corrupted: received incorrect magic number. > (connection to 'xxx/xxx:33127') > at > org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260) > at > org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227) > at > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484) > ... 4 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Error obtaining the sorted input: Thread > 'SortMerger Reading Thread' terminated due to an exception: Network stream > corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127') > at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown > Source) > at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source) > at > org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257) > ... 7 more > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger Reading Thread' terminated due to an exception: Network > stream corrupted: received incorrect magic number. (connection to > 'xxx/xxx:33127') > at > org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254) > at > java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown > Source) > at > java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown > Source) > at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown > Source) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown > Source) > at > org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392) > at > org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121) > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75) > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated > due to an exception: Network stream corrupted: received incorrect magic > number. (connection to 'xxx/xxx:33127') > at > org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80) > Caused by: > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: > Network stream corrupted: received incorrect magic number. (connection to > 'xxx/xxx:33127') > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:201) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:381) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) > at > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: java.lang.IllegalStateException: Network stream corrupted: > received incorrect magic number. > at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > at > org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.decodeFrameHeader(NettyMessageClientDecoderDelegate.java:141) > at > org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:118) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > > ... 12 more >