Hello fellow Flinksters,

I am currently implementing Stale Synchronous Parallel iterations,
starting from the existing bulk iterations. I have replacement classes
for IterationHeadPactTask and IterationSynchronizationTask, plus the
corresponding event handlers. Among the generated events is
ClockTaskEvent, which inherits from IterationEventWithAggregators and
adds an int member. I have implemented its write and read methods
accordingly and written serialization tests modeled on
EventAggregatorsTest.java. The tests pass and everything runs well on
a local setup.
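
To make the pattern concrete, here is a simplified sketch of what the read and write methods do. It uses plain java.io streams rather than Flink's own input/output views, and BaseEvent is only a hypothetical stand-in for IterationEventWithAggregators (a single length-prefixed payload), not the actual Flink class. The names ClockEventSketch, ClockEvent and roundTrip are likewise made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class ClockEventSketch {

    // Hypothetical stand-in for IterationEventWithAggregators:
    // serializes one length-prefixed byte payload.
    static class BaseEvent {
        byte[] payload = new byte[0];

        void write(DataOutputStream out) throws IOException {
            out.writeInt(payload.length);
            out.write(payload);
        }

        void read(DataInputStream in) throws IOException {
            payload = new byte[in.readInt()];
            // readFully throws EOFException if fewer bytes are available
            in.readFully(payload);
        }
    }

    // Hypothetical stand-in for ClockTaskEvent: adds one int after the parent's data.
    static class ClockEvent extends BaseEvent {
        int clock;

        @Override
        void write(DataOutputStream out) throws IOException {
            super.write(out);      // parent fields first
            out.writeInt(clock);   // then the extra member
        }

        @Override
        void read(DataInputStream in) throws IOException {
            super.read(in);        // same order as in write
            clock = in.readInt();
        }
    }

    // Serializes the event to a byte array and reads it back into a fresh instance.
    static ClockEvent roundTrip(ClockEvent event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                event.write(out);
            }
            ClockEvent copy = new ClockEvent();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.read(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ClockEvent event = new ClockEvent();
        event.payload = new byte[] {1, 2, 3};
        event.clock = 7;
        ClockEvent copy = roundTrip(event);
        System.out.println("clock=" + copy.clock + ", payload length=" + copy.payload.length);
    }
}
```

A round trip like this is what my local serialization tests check: the subclass reads its fields in exactly the order it wrote them, after delegating to the parent.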

However, when I run the job on a cluster, I get the following error:

java.io.IOException: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 13 more
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
    at org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
    at org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
    ... 17 more

What am I missing here? Do I need to register the new ClockTaskEvent
with a serializer somewhere? Also, these two frames puzzle me:

    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)

Why does the call go through getNextBuffer, given that ClockTaskEvent
is an event and not a buffer?

Thanks and best regards,

Tran Nam-Luc

