This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 7e7a6585c GH-2836: Support reading pure parquet files with cat (#3332)
7e7a6585c is described below
commit 7e7a6585c8acaea721a375a218fec9c791ff4d76
Author: Arnav Balyan <[email protected]>
AuthorDate: Mon Sep 29 11:40:19 2025 +0530
GH-2836: Support reading pure parquet files with cat (#3332)
---
.../apache/parquet/cli/commands/CatCommand.java | 109 +++++++++++++++------
.../parquet/cli/commands/CatCommandTest.java | 58 +++++++++++
2 files changed, 138 insertions(+), 29 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
index 285e8f50f..bba732a8b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
@@ -27,12 +27,14 @@ import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.util.Expressions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;
@Parameters(commandDescription = "Print the first N records from a file")
@@ -60,41 +62,90 @@ public class CatCommand extends BaseCommand {
public int run() throws IOException {
Preconditions.checkArgument(sourceFiles != null && !sourceFiles.isEmpty(),
"Missing file name");
- // Ensure all source files have the columns specified first
- Map<String, Schema> schemas = new HashMap<>();
for (String source : sourceFiles) {
- Schema schema = getAvroSchema(source);
- schemas.put(source, Expressions.filterSchema(schema, columns));
+ try {
+ runWithAvroSchema(source);
+ } catch (SchemaParseException e) {
+ console.debug(
+ "Avro schema conversion failed for {}, falling back to Group reader: {}",
+ source,
+ e.getMessage());
+ runWithGroupReader(source);
+ }
}
- for (String source : sourceFiles) {
- Schema projection = schemas.get(source);
- Iterable<Object> reader = openDataFile(source, projection);
- boolean threw = true;
- long count = 0;
- try {
- for (Object record : reader) {
- if (numRecords > 0 && count >= numRecords) {
- break;
- }
- if (columns == null || columns.size() != 1) {
- console.info(String.valueOf(record));
- } else {
- console.info(String.valueOf(select(projection, record, columns.get(0))));
- }
- count += 1;
+ return 0;
+ }
+
+ private void runWithAvroSchema(String source) throws IOException {
+ Schema schema = getAvroSchema(source);
+ Schema projection = Expressions.filterSchema(schema, columns);
+
+ Iterable<Object> reader = openDataFile(source, projection);
+ boolean threw = true;
+ long count = 0;
+ try {
+ for (Object record : reader) {
+ if (numRecords > 0 && count >= numRecords) {
+ break;
}
- threw = false;
- } catch (RuntimeException e) {
- throw new RuntimeException("Failed on record " + count + " in file " + source, e);
- } finally {
- if (reader instanceof Closeable) {
- Closeables.close((Closeable) reader, threw);
+ if (columns == null || columns.size() != 1) {
+ console.info(String.valueOf(record));
+ } else {
+ console.info(String.valueOf(select(projection, record, columns.get(0))));
}
+ count += 1;
+ }
+ threw = false;
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+ } finally {
+ if (reader instanceof Closeable) {
+ Closeables.close((Closeable) reader, threw);
}
}
+ }
- return 0;
+ private void runWithGroupReader(String source) throws IOException {
+ ParquetReader<Group> reader = ParquetReader.<Group>builder(new GroupReadSupport(), qualifiedPath(source))
+ .withConf(getConf())
+ .build();
+
+ boolean threw = true;
+ long count = 0;
+ try {
+ for (Group record = reader.read(); record != null; record = reader.read()) {
+ if (numRecords > 0 && count >= numRecords) {
+ break;
+ }
+
+ if (columns == null) {
+ console.info(record.toString());
+ } else {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < columns.size(); i++) {
+ String columnName = columns.get(i);
+ try {
+ Object value =
+ record.getValueToString(record.getType().getFieldIndex(columnName), 0);
+ if (i > 0) sb.append(", ");
+ sb.append(columnName).append(": ").append(value);
+ } catch (Exception e) {
+ console.warn("Column '{}' not found in file {}", columnName, source);
+ }
+ }
+ if (sb.length() > 0) {
+ console.info(sb.toString());
+ }
+ }
+ count += 1;
+ }
+ threw = false;
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+ } finally {
+ Closeables.close(reader, threw);
+ }
}
@Override
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index 4a781886a..b8aa4ac13 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -24,8 +24,15 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.test.TestProtobuf;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -96,6 +103,57 @@ public class CatCommandTest extends ParquetFileTest {
Assert.assertEquals(0, result);
}
+ @Test
+ public void testCatCommandWithHyphenatedFieldNames() throws Exception {
+ File hyphenFile = new File(getTempFolder(), "hyphenated_fields.parquet");
+ writeParquetWithHyphenatedFields(hyphenFile);
+
+ CatCommand cmd = new CatCommand(createLogger(), 1);
+ cmd.sourceFiles = Arrays.asList(hyphenFile.getAbsolutePath());
+ cmd.setConf(new Configuration());
+
+ int result = cmd.run();
+ Assert.assertEquals(0, result);
+ }
+
+ private static void writeParquetWithHyphenatedFields(File file) throws IOException {
+ MessageType schema = Types.buildMessage()
+ .required(PrimitiveTypeName.INT32)
+ .named("order_id")
+ .required(PrimitiveTypeName.BINARY)
+ .named("customer-name")
+ .required(PrimitiveTypeName.BINARY)
+ .named("product-category")
+ .required(PrimitiveTypeName.DOUBLE)
+ .named("sale-amount")
+ .required(PrimitiveTypeName.BINARY)
+ .named("region")
+ .named("SalesRecord");
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.getAbsolutePath()))
+ .withType(schema)
+ .build()) {
+
+ Group record1 = factory.newGroup()
+ .append("order_id", 1001)
+ .append("customer-name", "John Smith")
+ .append("product-category", "Electronics")
+ .append("sale-amount", 299.99)
+ .append("region", "North");
+ writer.write(record1);
+
+ Group record2 = factory.newGroup()
+ .append("order_id", 1002)
+ .append("customer-name", "Jane Doe")
+ .append("product-category", "Home-Garden")
+ .append("sale-amount", 149.50)
+ .append("region", "South");
+ writer.write(record2);
+ }
+ }
+
private static void writeProtoParquet(File file) throws Exception {
TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
.addRepeatedInt(1)