This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2a87cdb2b6 [parquet] Minor refactor schemaCache in ParquetReaderFactory
2a87cdb2b6 is described below
commit 2a87cdb2b6eb8ae99b6ab17728654748f04d95c5
Author: JingsongLi <[email protected]>
AuthorDate: Fri Mar 20 09:53:43 2026 +0800
[parquet] Minor refactor schemaCache in ParquetReaderFactory
---
.../format/parquet/ParquetReaderFactory.java | 73 +++++++++++++++-------
1 file changed, 50 insertions(+), 23 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 42d926bc89..6da6ccc933 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -58,6 +58,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,7 +83,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
@Nullable private final FilterCompat.Filter filter;
/**
- * Cache: fileSchema -> (requestedSchema, parquetFields).
+ * Cache: fileSchema -> requestedSchema.
*
* <p>Within one factory instance the readType is fixed, so the result of
{@code
* clipParquetSchema(fileSchema)} is deterministic for a given {@code
fileSchema}. Most
@@ -91,7 +92,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
* (rather than assuming all files have the same schema) keeps correctness
for edge cases such
* as externally-migrated Parquet files whose on-disk schema may vary.
*/
- private final Map<MessageType, Pair<MessageType, List<ParquetField>>>
schemaCache =
+ private final Map<MessageType, RequestedSchema> requestedSchemaCache =
new ConcurrentHashMap<>();
public ParquetReaderFactory(
@@ -117,46 +118,46 @@ public class ParquetReaderFactory implements FormatReaderFactory {
builder.build(),
context.selection());
MessageType fileSchema = reader.getFileMetaData().getSchema();
-
- // clipParquetSchema and buildFieldsList are pure functions of
(readFields, fileSchema).
- // Cache the result keyed by fileSchema so that files sharing the same
on-disk schema
- // within this factory instance avoid redundant computation. Keying by
fileSchema (rather
- // than a simple "compute once" flag) correctly handles edge cases
where different files
- // read by the same factory instance may have different on-disk
schemas, e.g. externally
- // migrated Parquet files.
- Pair<MessageType, List<ParquetField>> cached =
- schemaCache.computeIfAbsent(
- fileSchema,
- fs -> {
- MessageType rs = clipParquetSchema(fs);
- MessageColumnIO columnIO = new
ColumnIOFactory().getColumnIO(rs);
- List<ParquetField> f = buildFieldsList(readFields,
columnIO, rs);
- return Pair.of(rs, f);
- });
- MessageType requestedSchema = cached.getLeft();
- List<ParquetField> fields = cached.getRight();
+ RequestedSchema requestedSchema =
getOrCreateRequestedSchema(fileSchema);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Create reader of the parquet file {}, the fileSchema is
{}, the requestedSchema is {}.",
context.filePath(),
fileSchema,
- requestedSchema);
+ requestedSchema.messageType);
}
- reader.setRequestedSchema(requestedSchema);
+ reader.setRequestedSchema(requestedSchema.messageType);
WritableColumnVector[] writableVectors = createWritableVectors();
return new VectorizedParquetRecordReader(
context.filePath(),
reader,
fileSchema,
- fields,
+ requestedSchema.fields,
writableVectors,
batchSize,
context.fileIO());
}
+    /**
+     * Returns the {@link RequestedSchema} for {@code fileSchema}, computing it on first use and
+     * serving it from {@code requestedSchemaCache} afterwards.
+     */
+    private RequestedSchema getOrCreateRequestedSchema(MessageType fileSchema) {
+        // clipParquetSchema and buildFieldsList are pure functions of (readFields, fileSchema).
+        // Cache the result keyed by fileSchema so that files sharing the same on-disk schema
+        // within this factory instance avoid redundant computation. Keying by fileSchema (rather
+        // than a simple "compute once" flag) correctly handles edge cases where different files
+        // read by the same factory instance may have different on-disk schemas, e.g. externally
+        // migrated Parquet files.
+        //
+        // Try a plain get() first: on Java 8, ConcurrentHashMap#computeIfAbsent locks the hash
+        // bin even when the key is already present (JDK-8161372), so concurrent callers would
+        // contend on the common cache-hit path. The computed value is never null, so a null
+        // from get() always means "not cached yet".
+        RequestedSchema cached = requestedSchemaCache.get(fileSchema);
+        if (cached != null) {
+            return cached;
+        }
+        return requestedSchemaCache.computeIfAbsent(fileSchema, this::createRequestedSchema);
+    }
+
+    /**
+     * Builds the {@link RequestedSchema} for one file: clips {@code fileSchema} via {@link
+     * #clipParquetSchema} and derives the {@link ParquetField}s for the clipped schema from its
+     * column IO.
+     */
+    private RequestedSchema createRequestedSchema(MessageType fileSchema) {
+        MessageType clippedSchema = clipParquetSchema(fileSchema);
+        MessageColumnIO clippedColumnIO = new ColumnIOFactory().getColumnIO(clippedSchema);
+        return new RequestedSchema(
+                clippedSchema, buildFieldsList(readFields, clippedColumnIO, clippedSchema));
+    }
+
/** Clips `parquetSchema` according to `fieldNames`. */
private MessageType clipParquetSchema(GroupType parquetSchema) {
Type[] types = new Type[readFields.length];
@@ -310,4 +311,30 @@ public class ParquetReaderFactory implements FormatReaderFactory {
}
return columns;
}
+
+    /**
+     * Result of clipping one file schema to the read fields: the clipped {@link MessageType} that
+     * is handed to the Parquet reader, together with the {@link ParquetField}s built from it.
+     *
+     * <p>NOTE(review): instances are only stored as values of {@code requestedSchemaCache}, so
+     * {@link #equals(Object)} and {@link #hashCode()} are not required by the cache itself.
+     */
+    private static final class RequestedSchema {
+
+        private final MessageType messageType;
+        private final List<ParquetField> fields;
+
+        private RequestedSchema(MessageType messageType, List<ParquetField> fields) {
+            this.messageType = messageType;
+            this.fields = fields;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            // The class is final, so instanceof is equivalent to an exact-class check here.
+            if (!(other instanceof RequestedSchema)) {
+                return false;
+            }
+            RequestedSchema that = (RequestedSchema) other;
+            return Objects.equals(messageType, that.messageType)
+                    && Objects.equals(fields, that.fields);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(messageType, fields);
+        }
+    }
}