Anyone seen this before:

Caused by: java.io.IOException: Received an event in channel 0 while still having data from a record. This indicates broken serialization logic. If you are using custom serialization code (Writable or Value types), check their serialization routines. In the case of Kryo, check the respective Kryo serializer.
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:98)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:190)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)
We're on Flink 1.1.4 right now. We're reading a Parquet file with code like this:

AvroParquetInputFormat<GenericRecord> inputFormat = new AvroParquetInputFormat<GenericRecord>();
AvroParquetInputFormat.setAvroReadSchema(job, getMergeSchema(storeName, datasetName));

// Get path of the input parquet file
DatasetHdfsInfo info = getLastCompleteMergedDatasetHDFSInfo(storeName, datasetName);
Path path = new Path(info.getRootDir());

DataSet<Tuple2<Void, GenericRecord>> d = getExecutionEnvironment().readHadoopFile(inputFormat, Void.class, GenericRecord.class, path.toString(), job);
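Since the message points at Kryo, one workaround we're considering is converting each GenericRecord into a Flink-friendly type immediately after the source, so GenericRecord itself never gets shipped between tasks. A rough, untested sketch (just rendering the record as JSON via GenericRecord#toString()):

// needs: import org.apache.flink.api.common.functions.MapFunction;
DataSet<String> asJson = d.map(new MapFunction<Tuple2<Void, GenericRecord>, String>() {
    @Override
    public String map(Tuple2<Void, GenericRecord> value) {
        // GenericRecord#toString() renders the record as JSON text,
        // which Flink handles with its built-in String serializer
        return value.f1.toString();
    }
});

Would something like that sidestep the problem, or is there a proper way to get Kryo to handle GenericRecord in 1.1.4?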