Has anyone seen this before?

Caused by: java.io.IOException: Received an event in channel 0 while still 
having data from a record. This indicates broken serialization logic. If you 
are using custom serialization code (Writable or Value types), check their 
serialization routines. In the case of Kryo, check the respective Kryo 
serializer.
     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:98)
     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:190)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
     at java.lang.Thread.run(Thread.java:745)


We're on Flink 1.1.4 right now. We're reading a parquet file using code like this:

           AvroParquetInputFormat<GenericRecord> inputFormat = new AvroParquetInputFormat<GenericRecord>();
           AvroParquetInputFormat.setAvroReadSchema(job, getMergeSchema(storeName, datasetName));

           // Get path of the input parquet file
           DatasetHdfsInfo info = getLastCompleteMergedDatasetHDFSInfo(storeName, datasetName);

           Path path = new Path(info.getRootDir());

           DataSet<Tuple2<Void, GenericRecord>> d = getExecutionEnvironment().readHadoopFile(inputFormat, Void.class, GenericRecord.class, path.toString(), job);


