This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a87cdb2b6 [parquet] Minor refactor schemaCache in ParquetReaderFactory
2a87cdb2b6 is described below

commit 2a87cdb2b6eb8ae99b6ab17728654748f04d95c5
Author: JingsongLi <[email protected]>
AuthorDate: Fri Mar 20 09:53:43 2026 +0800

    [parquet] Minor refactor schemaCache in ParquetReaderFactory
---
 .../format/parquet/ParquetReaderFactory.java       | 73 +++++++++++++++-------
 1 file changed, 50 insertions(+), 23 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 42d926bc89..6da6ccc933 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -58,6 +58,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -82,7 +83,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
     @Nullable private final FilterCompat.Filter filter;
 
     /**
-     * Cache: fileSchema -> (requestedSchema, parquetFields).
+     * Cache: fileSchema -> requestedSchema.
      *
      * <p>Within one factory instance the readType is fixed, so the result of {@code
      * clipParquetSchema(fileSchema)} is deterministic for a given {@code fileSchema}. Most
@@ -91,7 +92,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
      * (rather than assuming all files have the same schema) keeps correctness for edge cases such
      * as externally-migrated Parquet files whose on-disk schema may vary.
      */
-    private final Map<MessageType, Pair<MessageType, List<ParquetField>>> schemaCache =
+    private final Map<MessageType, RequestedSchema> requestedSchemaCache =
             new ConcurrentHashMap<>();
 
     public ParquetReaderFactory(
@@ -117,46 +118,46 @@ public class ParquetReaderFactory implements FormatReaderFactory {
                         builder.build(),
                         context.selection());
         MessageType fileSchema = reader.getFileMetaData().getSchema();
-
-        // clipParquetSchema and buildFieldsList are pure functions of (readFields, fileSchema).
-        // Cache the result keyed by fileSchema so that files sharing the same on-disk schema
-        // within this factory instance avoid redundant computation. Keying by fileSchema (rather
-        // than a simple "compute once" flag) correctly handles edge cases where different files
-        // read by the same factory instance may have different on-disk schemas, e.g. externally
-        // migrated Parquet files.
-        Pair<MessageType, List<ParquetField>> cached =
-                schemaCache.computeIfAbsent(
-                        fileSchema,
-                        fs -> {
-                            MessageType rs = clipParquetSchema(fs);
-                            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(rs);
-                            List<ParquetField> f = buildFieldsList(readFields, columnIO, rs);
-                            return Pair.of(rs, f);
-                        });
-        MessageType requestedSchema = cached.getLeft();
-        List<ParquetField> fields = cached.getRight();
+        RequestedSchema requestedSchema = getOrCreateRequestedSchema(fileSchema);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Create reader of the parquet file {}, the fileSchema is {}, the requestedSchema is {}.",
                     context.filePath(),
                     fileSchema,
-                    requestedSchema);
+                    requestedSchema.messageType);
         }
 
-        reader.setRequestedSchema(requestedSchema);
+        reader.setRequestedSchema(requestedSchema.messageType);
         WritableColumnVector[] writableVectors = createWritableVectors();
 
         return new VectorizedParquetRecordReader(
                 context.filePath(),
                 reader,
                 fileSchema,
-                fields,
+                requestedSchema.fields,
                 writableVectors,
                 batchSize,
                 context.fileIO());
     }
 
+    private RequestedSchema getOrCreateRequestedSchema(MessageType fileSchema) {
+        // clipParquetSchema and buildFieldsList are pure functions of (readFields, fileSchema).
+        // Cache the result keyed by fileSchema so that files sharing the same on-disk schema
+        // within this factory instance avoid redundant computation. Keying by fileSchema (rather
+        // than a simple "compute once" flag) correctly handles edge cases where different files
+        // read by the same factory instance may have different on-disk schemas, e.g. externally
+        // migrated Parquet files.
+        return requestedSchemaCache.computeIfAbsent(fileSchema, this::createRequestedSchema);
+    }
+
+    private RequestedSchema createRequestedSchema(MessageType fileSchema) {
+        MessageType rs = clipParquetSchema(fileSchema);
+        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(rs);
+        List<ParquetField> f = buildFieldsList(readFields, columnIO, rs);
+        return new RequestedSchema(rs, f);
+    }
+
     /** Clips `parquetSchema` according to `fieldNames`. */
     private MessageType clipParquetSchema(GroupType parquetSchema) {
         Type[] types = new Type[readFields.length];
@@ -310,4 +311,30 @@ public class ParquetReaderFactory implements FormatReaderFactory {
         }
         return columns;
     }
+
+    private static class RequestedSchema {
+
+        private final MessageType messageType;
+        private final List<ParquetField> fields;
+
+        private RequestedSchema(MessageType messageType, List<ParquetField> fields) {
+            this.messageType = messageType;
+            this.fields = fields;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            RequestedSchema that = (RequestedSchema) o;
+            return Objects.equals(messageType, that.messageType)
+                    && Objects.equals(fields, that.fields);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(messageType, fields);
+        }
+    }
 }

Reply via email to