How were the Parquet files you are trying to read generated? Were they
written with the same library versions? I am successfully using the
following Scala code to read Parquet files via the HadoopInputFormat
wrapper. Maybe try that in Java?
val hadoopInputFormat =
    new HadoopInputFormat[Void, GenericRecord](
      new AvroParquetInputFormat, classOf[Void], classOf[GenericRecord], job)

AvroParquetInputFormat.setAvroReadSchema(job, EventOnlyRecord.getClassSchema)

// AvroParquetInputFormat extends ParquetInputFormat, which extends FileInputFormat (FIF);
// addInputPath is a static method on FIF.
val inputPath = new Path(input)
FileInputFormat.addInputPath(job, inputPath)

val rawEvents: DataSet[(Void, GenericRecord)] =
    env.createInput(hadoopInputFormat)
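
For reference, a rough Java translation of that snippet might look like the
following (untested sketch; it assumes Flink's mapreduce HadoopInputFormat
wrapper and the parquet-avro AvroParquetInputFormat, whose package is
parquet.avro in older parquet releases, and it takes the Avro read-schema
file and the input path as program arguments):

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat; // parquet.avro.* in older releases

public class ReadParquetExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Job job = Job.getInstance();

        // Wrap AvroParquetInputFormat in Flink's Hadoop mapreduce wrapper
        // instead of going through readHadoopFile().
        HadoopInputFormat<Void, GenericRecord> hadoopInputFormat =
            new HadoopInputFormat<>(
                new AvroParquetInputFormat<GenericRecord>(),
                Void.class, GenericRecord.class, job);

        // args[0]: Avro schema file used as the read schema (this stands in
        // for EventOnlyRecord.getClassSchema in the Scala snippet above).
        Schema readSchema = new Schema.Parser().parse(new File(args[0]));
        AvroParquetInputFormat.setAvroReadSchema(job, readSchema);

        // args[1]: input path of the Parquet data.
        FileInputFormat.addInputPath(job, new Path(args[1]));

        DataSet<Tuple2<Void, GenericRecord>> rawEvents =
            env.createInput(hadoopInputFormat);

        // Print a few records to verify the read works.
        rawEvents.first(10).print();
    }
}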
On 01/11/2017 03:16 PM, Newport, Billy wrote:
Anyone seen this before:
Caused by: java.io.IOException: Received an event in channel 0 while
still having data from a record. This indicates broken serialization
logic. If you are using custom serialization code (Writable or Value
types), check their serialization routines. In the case of Kryo, check
the respective Kryo serializer.
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:98)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:190)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)
We’re on 1.1.4 right now. We’re reading parquet files using code like this:

AvroParquetInputFormat<GenericRecord> inputFormat =
    new AvroParquetInputFormat<GenericRecord>();
AvroParquetInputFormat.setAvroReadSchema(job,
    getMergeSchema(storeName, datasetName));

// Get path of input parquet file
DatasetHdfsInfo info = getLastCompleteMergedDatasetHDFSInfo(storeName, datasetName);
Path path = new Path(info.getRootDir());

DataSet<Tuple2<Void, GenericRecord>> d =
    getExecutionEnvironment().readHadoopFile(inputFormat, Void.class,
        GenericRecord.class, path.toString(), job);