Hi,

We're running a large Flink batch job that intermittently throws serialization errors partway through. The failure is always at the same operator, but the exact error varies. Subsequent attempts usually succeed; sometimes all attempts are exhausted and we retry the whole job.
The job basically reads a list of filenames, downloads the files from GCS, does a groupBy-reduce, and writes the result. The error happens at the reduce operator. We use Flink 1.13.6 and Beam 2.35.

1 - Do you know what may be going wrong here, or how we could debug it further?
2 - Each attempt has to read all the data again. Is there any way to speed up recovery in cases like this?

Thanks,

>>>>>>>>>>>>>> Example stacktrace 1:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at groupByKey@{xxx}', caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
    ... 7 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
    at org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:118)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:48)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:60)
    at org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:70)
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 6 out of bounds for length 3
    at org.apache.beam.sdk.transforms.windowing.PaneInfo$PaneInfoCoder$Encoding.fromTag(PaneInfo.java:313)
    at org.apache.beam.sdk.transforms.windowing.PaneInfo$PaneInfoCoder.decode(PaneInfo.java:362)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:620)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:117)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:130)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:160)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    ... 8 more

>>>>>>>>>>>>>> Example stacktrace 2:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at groupByKey@{xxx}', caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, got 108
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, got 108
    at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, got 108
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
    ... 7 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, got 108
    at org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
    at org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Expected 0 or 1, got 108
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
Caused by: java.io.IOException: Expected 0 or 1, got 108
    at Due to Exception while trying to `decode` an instance of com.xxx.MetricValue: Can't decode field point.(:0)
    at org.apache.beam.sdk.coders.BooleanCoder.decode(BooleanCoder.java:48)
    at com.spotify.scio.coders.instances.OptionCoder.decode(ScalaCoders.scala:149)
    at com.spotify.scio.coders.instances.OptionCoder.decode(ScalaCoders.scala:140)
    at com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
    at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
    at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
    at com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply$mcV$sp(Coder.scala:344)
    at com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply(Coder.scala:344)
    at com.spotify.scio.coders.RecordCoder$$anonfun$decode$3.apply(Coder.scala:344)
    at com.spotify.scio.coders.RecordCoder.onErrorMsg(Coder.scala:314)
    at com.spotify.scio.coders.RecordCoder.decode(Coder.scala:344)
    at com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
    at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
    at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
    at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184)
    at com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
    at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
    at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:117)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:130)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:160)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:118)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:48)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:60)
    at org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:70)
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
    at ### Coder materialization stack ###.(:0)
    at com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:61)
    at com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:67)
    at com.spotify.scio.coders.CoderMaterializer$$anonfun$beamImpl$1.apply(CoderMaterializer.scala:74)
    at com.spotify.scio.coders.CoderMaterializer$$anonfun$beamImpl$1.apply(CoderMaterializer.scala:74)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at ### Coder materialization stack ###.(:0)
    at com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:76)
    at com.spotify.scio.coders.LazyCoder.bcoder$lzycompute(Coder.scala:182)
    at com.spotify.scio.coders.LazyCoder.bcoder(Coder.scala:182)
    at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184)
    at com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
    at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
    at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
    at ### Coder materialization stack ###.(:0)
    at com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:76)
    at com.spotify.scio.coders.LazyCoder.bcoder$lzycompute(Coder.scala:182)
    at com.spotify.scio.coders.LazyCoder.bcoder(Coder.scala:182)
    at com.spotify.scio.coders.LazyCoder.decode(Coder.scala:184)
    at com.spotify.scio.coders.WrappedBCoder$$anonfun$decode$1.apply(Coder.scala:272)
    at com.spotify.scio.coders.WrappedBCoder.catching(Coder.scala:259)
    at com.spotify.scio.coders.WrappedBCoder.decode(Coder.scala:272)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
    at ### Coder materialization stack ###.(:0)
    at com.spotify.scio.coders.CoderMaterializer$.beamImpl(CoderMaterializer.scala:61)
    at com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:49)
    at com.spotify.scio.coders.CoderMaterializer$.beam(CoderMaterializer.scala:35)
    at com.spotify.scio.coders.CoderMaterializer$.kvCoder(CoderMaterializer.scala:99)
    at com.spotify.scio.values.PairSCollectionFunctions.toKV(PairSCollectionFunctions.scala:62)
    at com.spotify.scio.values.PairSCollectionFunctions$$anonfun$applyPerKey$1.apply(PairSCollectionFunctions.scala:68)
    at com.spotify.scio.values.PairSCollectionFunctions$$anonfun$applyPerKey$1.apply(PairSCollectionFunctions.scala:71)
    at com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:262)
    at com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:262)
    at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:271)

>>>>>>>>>>>>>> Example stacktrace 3:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at groupByKey@{xxx}', caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Network stream corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Network stream corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
    at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Network stream corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
    ... 7 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Network stream corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
    at org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
    at org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Network stream corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
    at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Network stream corrupted: received incorrect magic number. (connection to 'xxx/xxx:33127')
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:201)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:381)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Network stream corrupted: received incorrect magic number.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.decodeFrameHeader(NettyMessageClientDecoderDelegate.java:141)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:118)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    ... 12 more
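For context, here is roughly the shape of the job. This is a simplified, self-contained Scala sketch that models the dataflow with plain collections; `MetricValue`, its fields, and the merge logic below are hypothetical stand-ins for our actual types (the real job runs on Scio SCollections with GCS I/O):

```scala
// Hypothetical stand-in for our record type; the real com.xxx.MetricValue
// has more fields, including the Option field the coder fails on ("point").
case class MetricValue(key: String, count: Long, point: Option[Double])

// The per-group reduce applied after groupByKey (illustrative merge logic).
def merge(a: MetricValue, b: MetricValue): MetricValue =
  a.copy(count = a.count + b.count)

// Dataflow shape: read filenames -> download -> groupBy key -> reduce -> write.
// `download` stands in for the GCS fetch + parse step.
def runModel(filenames: Seq[String],
             download: String => Seq[MetricValue]): Map[String, MetricValue] =
  filenames
    .flatMap(download)  // download each file from GCS and parse into records
    .groupBy(_.key)     // shuffle boundary where the deserialization errors occur
    .map { case (k, vs) => k -> vs.reduce(merge) }  // the failing GroupReduce operator
```

Note that all three failures surface while deserializing the shuffled records feeding the reduce, not inside the reduce function itself.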