devdanylo opened a new issue, #3150: URL: https://github.com/apache/parquet-java/issues/3150
### Describe the bug, including details regarding any error messages, version, and platform.

Hello folks,

I'm currently working on removing the `hadoop-common` dependency from the runtime classpath of one of my projects. As part of this, I need to replace the Hadoop codec factory, which relies on classes from `hadoop-common`, with a codec factory that uses only classes from `parquet-hadoop`. One candidate is `DirectCodecFactory`, but it comes with a problem.

For demonstration purposes, I'm using a simple key-value, Snappy-compressed Parquet file containing 51,000 records. Here's a sample of the data:

<img width="308" alt="Image" src="https://github.com/user-attachments/assets/bbe7f94c-9904-4bbb-a157-a7d82e9af54f" />

When attempting to read this file with `DirectCodecFactory`, I run into two issues:

1. The file cannot be read completely. Near the end, reading fails with: `Can't read value in column [key] optional binary key (STRING) = 0 at value 49,534 out of 51,000, 9,534 out of 11,000 in currentPage. Repetition level: 0, definition level: 1.`
2. Starting at record 40,001, the key and value columns get mixed up: the key column unexpectedly contains a value.

**Note that with the Hadoop codec factory the same file can be read without any issues.**

Here are tests demonstrating that:

```java
package sandbox.parquet.reader;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

public class ParquetReaderTest {

    @Test
    void readParquetFileAndVerifyRecordCount_usingDirectCodeFactory() throws Exception {
        Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
        int recordCount = 0;
        try (ParquetReader<String[]> parquetReader = createReaderWithDirectCodeFactory(filePath)) {
            while (parquetReader.read() != null) {
                recordCount++;
            }
        }
        Assertions.assertEquals(51_000, recordCount);
    }

    @Test
    void readParquetFileAndVerifyContent_usingDirectCodeFactory() throws Exception {
        Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
        int recordCount = 0;
        try (ParquetReader<String[]> parquetReader = createReaderWithDirectCodeFactory(filePath)) {
            String[] record;
            while ((record = parquetReader.read()) != null) {
                Assertions.assertEquals("key_" + (recordCount + 1), record[0]);
                Assertions.assertEquals("value_" + (recordCount + 1), record[1]);
                recordCount++;
            }
        }
        Assertions.assertEquals(51_000, recordCount);
    }

    @Test
    void readParquetFileAndVerifyRecordCount_usingHadoopCodeFactory() throws Exception {
        Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
        int recordCount = 0;
        try (ParquetReader<String[]> parquetReader = createReaderWithHadoopCodeFactory(filePath)) {
            while (parquetReader.read() != null) {
                recordCount++;
            }
        }
        Assertions.assertEquals(51_000, recordCount);
    }

    @Test
    void readParquetFileAndVerifyContent_usingHadoopCodeFactory() throws Exception {
        Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
        int recordCount = 0;
        try (ParquetReader<String[]> parquetReader = createReaderWithHadoopCodeFactory(filePath)) {
            String[] record;
            while ((record = parquetReader.read()) != null) {
                Assertions.assertEquals("key_" + (recordCount + 1), record[0]);
                Assertions.assertEquals("value_" + (recordCount + 1), record[1]);
                recordCount++;
            }
        }
        Assertions.assertEquals(51_000, recordCount);
    }

    private ParquetReader<String[]> createReaderWithDirectCodeFactory(Path file) throws Exception {
        return new ParquetReaderBuilder(new LocalInputFile(file), new PlainParquetConfiguration())
                .withCodecFactory(CodecFactory.createDirectCodecFactory(
                        null, DirectByteBufferAllocator.getInstance(), ParquetProperties.DEFAULT_PAGE_SIZE))
                .build();
    }

    private ParquetReader<String[]> createReaderWithHadoopCodeFactory(Path file) throws Exception {
        // the Hadoop codec factory is created in ParquetReadOptions.Builder#build
        return new ParquetReaderBuilder(new LocalInputFile(file), new PlainParquetConfiguration())
                .build();
    }

    // Wires the custom ReadSupport into the ParquetReader builder.
    static class ParquetReaderBuilder extends ParquetReader.Builder<String[]> {

        ParquetReaderBuilder(InputFile file, ParquetConfiguration conf) {
            super(file, conf);
        }

        @Override
        protected ReadSupport<String[]> getReadSupport() {
            return new TestReadSupport();
        }
    }

    // Reads the full file schema and materializes each record as a String[] of column values.
    static class TestReadSupport extends ReadSupport<String[]> {

        @Override
        public ReadContext init(InitContext context) {
            return new ReadContext(context.getFileSchema());
        }

        @Override
        public RecordMaterializer<String[]> prepareForRead(
                Configuration configuration,
                Map<String, String> keyValueMetaData,
                MessageType fileSchema,
                ReadContext readContext) {
            return new TestRecordMaterializer(fileSchema);
        }

        @Override
        public RecordMaterializer<String[]> prepareForRead(
                ParquetConfiguration configuration,
                Map<String, String> keyValueMetaData,
                MessageType fileSchema,
                ReadContext readContext) {
            return new TestRecordMaterializer(fileSchema);
        }
    }

    static class TestRecordMaterializer extends RecordMaterializer<String[]> {

        private final TestRootGroupConverter root;

        TestRecordMaterializer(MessageType schema) {
            this.root = new TestRootGroupConverter(schema);
        }

        @Override
        public String[] getCurrentRecord() {
            return root.getCurrentRecord();
        }

        @Override
        public GroupConverter getRootConverter() {
            return root;
        }
    }

    // Collects the primitive column values of the current record into a String[].
    static class TestRootGroupConverter extends GroupConverter {

        private String[] currentRecord;
        private final Converter[] converters;

        TestRootGroupConverter(GroupType schema) {
            converters = new Converter[schema.getFieldCount()];
            for (int i = 0; i < converters.length; i++) {
                final Type type = schema.getType(i);
                if (type.isPrimitive()) {
                    converters[i] = new TestPrimitiveConverter(this, i);
                } else {
                    throw new RuntimeException("Nested records not supported!");
                }
            }
        }

        @Override
        public Converter getConverter(int fieldIndex) {
            return converters[fieldIndex];
        }

        @Override
        public void start() {
            currentRecord = new String[converters.length];
        }

        @Override
        public void end() {
        }

        String[] getCurrentRecord() {
            return currentRecord;
        }
    }

    static class TestPrimitiveConverter extends PrimitiveConverter {

        private final TestRootGroupConverter parent;
        private final int index;

        TestPrimitiveConverter(TestRootGroupConverter parent, int index) {
            this.parent = parent;
            this.index = index;
        }

        @Override
        public void addBinary(Binary value) {
            parent.getCurrentRecord()[index] = value.toStringUsingUTF8();
        }

        @Override
        public void addBoolean(boolean value) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addDouble(double value) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addFloat(float value) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addInt(int value) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addLong(long value) {
            throw new UnsupportedOperationException();
        }
    }
}
```

Attaching the parquet file and a demo application:

* [test.parquet.zip](https://github.com/user-attachments/files/18718126/test.parquet.zip)
* [parquet-reader.zip](https://github.com/user-attachments/files/18718132/parquet-reader.zip)

### Component(s)

Core
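In case it helps with narrowing this down, here is a condensed, standalone sketch that reads the same file once with each codec factory and reports the first record at which the two results diverge. It is not part of the attached demo: the `CompareCodecFactories` class name, the `readAll` helper, and the command-line argument are placeholders, and it reuses the package-private `ParquetReaderBuilder` from the test above, so it has to live in the same package.

```java
package sandbox.parquet.reader;

import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.LocalInputFile;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompareCodecFactories {

    public static void main(String[] args) throws Exception {
        // Path to test.parquet is expected as the first program argument.
        Path file = Paths.get(args[0]);
        List<String[]> hadoop = readAll(file, false);
        List<String[]> direct = readAll(file, true);

        int limit = Math.min(hadoop.size(), direct.size());
        for (int i = 0; i < limit; i++) {
            if (!Arrays.equals(hadoop.get(i), direct.get(i))) {
                System.out.printf("First divergence at record %d: hadoop=%s, direct=%s%n",
                        i + 1, Arrays.toString(hadoop.get(i)), Arrays.toString(direct.get(i)));
                return;
            }
        }
        System.out.printf("No divergence; records read: hadoop=%d, direct=%d%n",
                hadoop.size(), direct.size());
    }

    private static List<String[]> readAll(Path file, boolean useDirectCodecFactory) throws Exception {
        // Reuses the ParquetReaderBuilder (and therefore TestReadSupport) from the test above.
        ParquetReaderTest.ParquetReaderBuilder builder =
                new ParquetReaderTest.ParquetReaderBuilder(new LocalInputFile(file), new PlainParquetConfiguration());
        if (useDirectCodecFactory) {
            builder.withCodecFactory(CodecFactory.createDirectCodecFactory(
                    null, DirectByteBufferAllocator.getInstance(), ParquetProperties.DEFAULT_PAGE_SIZE));
        }

        List<String[]> records = new ArrayList<>();
        try (ParquetReader<String[]> reader = builder.build()) {
            String[] record;
            while ((record = reader.read()) != null) {
                records.add(record);
            }
        } catch (Exception e) {
            // With DirectCodecFactory the read fails before the end of the file;
            // keep whatever was read so far so the two results can still be compared.
            System.out.printf("Read failed after %d records: %s%n", records.size(), e);
        }
        return records;
    }
}
```

Given the behaviour described above, the divergence should show up around record 40,001, and the `DirectCodecFactory` read should fail around value 49,534.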