devdanylo opened a new issue, #3150:
URL: https://github.com/apache/parquet-java/issues/3150

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   Hello folks,
   
   I'm currently working on removing the `hadoop-common` dependency from the 
runtime in one of my projects.
   
   As part of this process, I need to replace the "Hadoop" codec factory, which 
relies on certain classes from `hadoop-common`, with a codec factory that 
exclusively uses classes from `parquet-hadoop`. One potential option is 
`DirectCodecFactory`, but it comes with a problem.
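
   Concretely, the swap I have in mind looks roughly like the following (a minimal sketch only; it reuses the `ParquetReaderBuilder` helper and the imports from the full reproducer further below):

   ```java
   // Sketch: plug a DirectCodecFactory into the reader builder instead of the
   // default Hadoop codec factory created by ParquetReadOptions.Builder#build.
   // filePath is a java.nio.file.Path pointing at the Parquet file.
   ParquetReader<String[]> reader = new ParquetReaderBuilder(
                   new LocalInputFile(filePath), new PlainParquetConfiguration())
           .withCodecFactory(CodecFactory.createDirectCodecFactory(
                   null, // no Hadoop Configuration available at runtime
                   DirectByteBufferAllocator.getInstance(),
                   ParquetProperties.DEFAULT_PAGE_SIZE))
           .build();
   ```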
   
   For demonstration purposes, I’m using a simple key-value Snappy-compressed 
parquet file containing 51,000 records. Here’s a sample of the data:
   <img width="308" alt="Image" 
src="https://github.com/user-attachments/assets/bbe7f94c-9904-4bbb-a157-a7d82e9af54f";
 />
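
   For reference, an equivalent file can be produced roughly as follows. This is a sketch only: the schema and row contents (`key_1`/`value_1` through `key_51000`/`value_51000`) are inferred from the assertions in the tests below, and the exact settings used to write the attached file may differ.

   ```java
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.example.data.Group;
   import org.apache.parquet.example.data.simple.SimpleGroupFactory;
   import org.apache.parquet.hadoop.ParquetWriter;
   import org.apache.parquet.hadoop.example.ExampleParquetWriter;
   import org.apache.parquet.hadoop.metadata.CompressionCodecName;
   import org.apache.parquet.schema.MessageType;
   import org.apache.parquet.schema.MessageTypeParser;

   public class TestFileGenerator {

       public static void main(String[] args) throws Exception {
           // Two optional string columns, matching the key and value columns read below.
           MessageType schema = MessageTypeParser.parseMessageType(
                   "message kv { optional binary key (STRING); optional binary value (STRING); }");
           SimpleGroupFactory groups = new SimpleGroupFactory(schema);

           try (ParquetWriter<Group> writer = ExampleParquetWriter
                   .builder(new Path("test.parquet"))
                   .withType(schema)
                   .withCompressionCodec(CompressionCodecName.SNAPPY)
                   .build()) {
               // 51,000 records: key_1/value_1 ... key_51000/value_51000
               for (int i = 1; i <= 51_000; i++) {
                   writer.write(groups.newGroup()
                           .append("key", "key_" + i)
                           .append("value", "value_" + i));
               }
           }
       }
   }
   ```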
   
   When attempting to read this file using `DirectCodecFactory`, I encounter 
two issues:
   1. The file cannot be read to completion. Near the end, it throws: 
`Can't read value in column [key] optional binary key (STRING) = 0 at value 
49,534 out of 51,000, 9,534 out of 11,000 in currentPage. Repetition level: 0, 
definition level: 1.`
   2. At record number 40,001, the key and value columns get mixed up, with the 
key column unexpectedly containing a value.
   
   **Note that with the Hadoop codec factory the file can be read without any 
issues.** Here are tests demonstrating that:
   ```java
   package sandbox.parquet.reader;
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.parquet.bytes.DirectByteBufferAllocator;
   import org.apache.parquet.column.ParquetProperties;
   import org.apache.parquet.conf.ParquetConfiguration;
   import org.apache.parquet.conf.PlainParquetConfiguration;
   import org.apache.parquet.hadoop.CodecFactory;
   import org.apache.parquet.hadoop.ParquetReader;
   import org.apache.parquet.hadoop.api.InitContext;
   import org.apache.parquet.hadoop.api.ReadSupport;
   import org.apache.parquet.io.InputFile;
   import org.apache.parquet.io.LocalInputFile;
   import org.apache.parquet.io.api.Binary;
   import org.apache.parquet.io.api.Converter;
   import org.apache.parquet.io.api.GroupConverter;
   import org.apache.parquet.io.api.PrimitiveConverter;
   import org.apache.parquet.io.api.RecordMaterializer;
   import org.apache.parquet.schema.GroupType;
   import org.apache.parquet.schema.MessageType;
   import org.apache.parquet.schema.Type;
   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.Test;
   
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.Map;
   
   public class ParquetReaderTest {
   
       @Test
       void readParquetFileAndVerifyRecordCount_usingDirectCodeFactory() throws Exception {
           Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
           int recordCount = 0;
           try (ParquetReader<String[]> parquetReader = createReaderWithDirectCodeFactory(filePath)) {
               while (parquetReader.read() != null) {
                   recordCount++;
               }
           }
   
           Assertions.assertEquals(51_000, recordCount);
       }
   
       @Test
       void readParquetFileAndVerifyContent_usingDirectCodeFactory() throws Exception {
           Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
           int recordCount = 0;
           try (ParquetReader<String[]> parquetReader = createReaderWithDirectCodeFactory(filePath)) {
               String[] record;
               while ((record = parquetReader.read()) != null) {
                   Assertions.assertEquals("key_" + (recordCount + 1), record[0]);
                   Assertions.assertEquals("value_" + (recordCount + 1), record[1]);
                   recordCount++;
               }
           }
   
           Assertions.assertEquals(51_000, recordCount);
       }
   
       @Test
       void readParquetFileAndVerifyRecordCount_usingHadoopCodeFactory() throws Exception {
           Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
           int recordCount = 0;
           try (ParquetReader<String[]> parquetReader = createReaderWithHadoopCodeFactory(filePath)) {
               while (parquetReader.read() != null) {
                   recordCount++;
               }
           }
   
           Assertions.assertEquals(51_000, recordCount);
       }
   
       @Test
       void readParquetFileAndVerifyContent_usingHadoopCodeFactory() throws Exception {
           Path filePath = Paths.get(ClassLoader.getSystemResource("test.parquet").toURI());
           int recordCount = 0;
           try (ParquetReader<String[]> parquetReader = createReaderWithHadoopCodeFactory(filePath)) {
               String[] record;
               while ((record = parquetReader.read()) != null) {
                   Assertions.assertEquals("key_" + (recordCount + 1), record[0]);
                   Assertions.assertEquals("value_" + (recordCount + 1), record[1]);
                   recordCount++;
               }
           }
   
           Assertions.assertEquals(51_000, recordCount);
       }
   
       private ParquetReader<String[]> createReaderWithDirectCodeFactory(Path file) throws Exception {
           return new ParquetReaderBuilder(new LocalInputFile(file), new PlainParquetConfiguration())
                   .withCodecFactory(CodecFactory.createDirectCodecFactory(null,
                           DirectByteBufferAllocator.getInstance(),
                           ParquetProperties.DEFAULT_PAGE_SIZE))
                   .build();
       }
   
       private ParquetReader<String[]> createReaderWithHadoopCodeFactory(Path file) throws Exception {
           // the Hadoop codec factory is created in ParquetReadOptions.Builder#build
           return new ParquetReaderBuilder(new LocalInputFile(file), new PlainParquetConfiguration())
                   .build();
       }
   
   
       static class ParquetReaderBuilder extends ParquetReader.Builder<String[]> {
   
           ParquetReaderBuilder(InputFile file, ParquetConfiguration conf) {
               super(file, conf);
           }
   
           @Override
           protected ReadSupport<String[]> getReadSupport() {
               return new TestReadSupport();
           }
       }
   
       static class TestReadSupport extends ReadSupport<String[]> {
   
   
           @Override
           public ReadContext init(InitContext context) {
               return new ReadContext(context.getFileSchema());
           }
   
           @Override
           public RecordMaterializer<String[]> prepareForRead(
                   Configuration configuration,
                   Map<String, String> keyValueMetaData,
                   MessageType fileSchema,
                   ReadContext readContext) {
               return new TestRecordMaterializer(fileSchema);
           }

           @Override
           public RecordMaterializer<String[]> prepareForRead(
                   ParquetConfiguration configuration,
                   Map<String, String> keyValueMetaData,
                   MessageType fileSchema,
                   ReadContext readContext) {
               return new TestRecordMaterializer(fileSchema);
           }
       }

       static class TestRecordMaterializer extends RecordMaterializer<String[]> {
   
           private final TestRootGroupConverter root;
   
           TestRecordMaterializer(MessageType schema) {
               this.root = new TestRootGroupConverter(schema);
           }
   
           @Override
           public String[] getCurrentRecord() {
               return root.getCurrentRecord();
           }
   
           @Override
           public GroupConverter getRootConverter() {
               return root;
           }
       }
   
       static class TestRootGroupConverter extends GroupConverter {
           private String[] currentRecord;
           private final Converter[] converters;
   
           TestRootGroupConverter(GroupType schema) {
               converters = new Converter[schema.getFieldCount()];
   
               for (int i = 0; i < converters.length; i++) {
                   final Type type = schema.getType(i);
                   if (type.isPrimitive()) {
                       converters[i] = new TestPrimitiveConverter(this, i);
                   } else {
                       throw new RuntimeException("Nested records not 
supported!");
                   }
               }
           }
   
           @Override
           public Converter getConverter(int fieldIndex) {
               return converters[fieldIndex];
           }
   
           @Override
           public void start() {
               currentRecord = new String[converters.length];
           }
   
           @Override
           public void end() {
           }
   
           String[] getCurrentRecord() {
               return currentRecord;
           }
       }
   
       static class TestPrimitiveConverter extends PrimitiveConverter {
   
           private final TestRootGroupConverter parent;
           private final int index;
   
           TestPrimitiveConverter(TestRootGroupConverter parent, int index) {
               this.parent = parent;
               this.index = index;
           }
   
           @Override
           public void addBinary(Binary value) {
               parent.getCurrentRecord()[index] = value.toStringUsingUTF8();
           }
   
           @Override
           public void addBoolean(boolean value) {
               throw new UnsupportedOperationException();
           }
   
           @Override
           public void addDouble(double value) {
               throw new UnsupportedOperationException();
           }
   
           @Override
           public void addFloat(float value) {
               throw new UnsupportedOperationException();
           }
   
           @Override
           public void addInt(int value) {
               throw new UnsupportedOperationException();
           }
   
           @Override
           public void addLong(long value) {
               throw new UnsupportedOperationException();
           }
       }
   }
   ```
   
   Attaching the parquet file and a demo application:
   * 
[test.parquet.zip](https://github.com/user-attachments/files/18718126/test.parquet.zip)
   * 
[parquet-reader.zip](https://github.com/user-attachments/files/18718132/parquet-reader.zip)
   
   ### Component(s)
   
   Core

