lirui-apache commented on a change in pull request #15939: URL: https://github.com/apache/flink/pull/15939#discussion_r636891521
########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcArrayColumnVector.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.types.logical.ArrayType; + +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; + +/** This column vector is used to adapt hive's ListColumnVector to Flink's ArrayColumnVector. */ +public class OrcArrayColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.ArrayColumnVector { + + private ListColumnVector hiveVector; + private ArrayType type; Review comment: I don't see how this field is used ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcArrayColumnVector.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.types.logical.ArrayType; + +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; + +/** This column vector is used to adapt hive's ListColumnVector to Flink's ArrayColumnVector. 
*/ +public class OrcArrayColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.ArrayColumnVector { + + private ListColumnVector hiveVector; + private ArrayType type; + private ColumnVector flinkVector; Review comment: Make it final ########## File path: flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java ########## @@ -164,4 +167,40 @@ public void testOrcFilterPushDown() throws ExecutionException, InterruptedExcept "select x, y from orcFilterTable where g = timestamp '2020-01-01 05:20:00' and x = 'x10'", Collections.singletonList(Row.of("x10", "10"))); } + + @Test + public void testNestedTypes() throws ExecutionException, InterruptedException { + String path = + this.getClass().getClassLoader().getResource("test-data-nested.orc").getPath(); + super.tableEnv() + .executeSql( + String.format( + "create table orcNestedTypesTable (" + + "boolean1 boolean," + + "byte1 tinyint," + + "short1 smallint," + + "int1 int," + + "long1 bigint," + + "float1 float," + + "double1 double," + + "string1 string," + + "middle ROW<list ARRAY<ROW<int1 int,string1 string>>>," + + "list ARRAY<ROW<int1 int,string1 string>>," + + "map MAP<string,ROW<int1 int,string1 string>>" + + ") with (" + + "'connector' = 'filesystem'," + + "'format' = 'orc'," + + "'path' = '%s')", + path)); + + TableResult tableResult = super.tableEnv().executeSql("SELECT * FROM orcNestedTypesTable"); + List<Row> rows = new ArrayList<>(); + tableResult.collect().forEachRemaining(rows::add); + assertEquals( Review comment: Let's also verify the number of rows. ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcArrayColumnVector.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.types.logical.ArrayType; + +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; + +/** This column vector is used to adapt hive's ListColumnVector to Flink's ArrayColumnVector. */ +public class OrcArrayColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.ArrayColumnVector { + + private ListColumnVector hiveVector; Review comment: make it final ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcMapColumnVector.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ColumnarMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.types.logical.MapType; + +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; + +/** This column vector is used to adapt hive's MapColumnVector to Flink's MapColumnVector. */ +public class OrcMapColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.MapColumnVector { + + private MapColumnVector hiveVector; Review comment: It can be final ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcMapColumnVector.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ColumnarMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.types.logical.MapType; + +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; + +/** This column vector is used to adapt hive's MapColumnVector to Flink's MapColumnVector. */ +public class OrcMapColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.MapColumnVector { + + private MapColumnVector hiveVector; + private MapType type; Review comment: Is this needed? ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcRowColumnVector.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; + +/** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. */ +public class OrcRowColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.RowColumnVector { + + private StructColumnVector hiveVector; + private RowType type; + private VectorizedColumnBatch vectorizedColumnBatch; Review comment: Make it final ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcRowColumnVector.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; + +/** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. */ +public class OrcRowColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.RowColumnVector { + + private StructColumnVector hiveVector; + private RowType type; Review comment: Are these needed? ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/ColumnarMapData.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data; + +import org.apache.flink.table.data.vector.ColumnVector; + +/** Columnar map to support access to vector column data. 
*/ +public final class ColumnarMapData implements MapData { + + private final ColumnVector keyColumnVector; + private final ColumnVector valueColumnVector; + private final int offset; + private final int numElements; + + public ColumnarMapData( + ColumnVector keyColumnVector, + ColumnVector valueColumnVector, + int offset, + int numElements) { + this.keyColumnVector = keyColumnVector; + this.valueColumnVector = valueColumnVector; + this.offset = offset; + this.numElements = numElements; + } + + @Override + public int size() { + return numElements; + } + + @Override + public ArrayData keyArray() { + return new ColumnarArrayData(keyColumnVector, offset, numElements); + } + + @Override + public ArrayData valueArray() { + return new ColumnarArrayData(valueColumnVector, offset, numElements); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException( Review comment: Could you explain why we can't support `equals` and `hashCode`? ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcRowColumnVector.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; + +/** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. */ +public class OrcRowColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.RowColumnVector { + + private StructColumnVector hiveVector; + private RowType type; + private VectorizedColumnBatch vectorizedColumnBatch; + + public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) { + super(hiveVector); + this.hiveVector = hiveVector; + this.type = type; + int len = hiveVector.fields.length; + ColumnVector[] flinkVectors = new ColumnVector[len]; + for (int i = 0; i < len; i++) { + flinkVectors[i] = createFlinkVector(hiveVector.fields[i], type.getTypeAt(i)); + } + this.vectorizedColumnBatch = new VectorizedColumnBatch(flinkVectors); + } + + @Override + public ColumnarRowData getRow(int i) { + return new ColumnarRowData(this.vectorizedColumnBatch, i); Review comment: Can we cache the `ColumnarRowData` instance and return it with the updated rowId? ########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcMapColumnVector.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.vector; + +import org.apache.flink.table.data.ColumnarMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.types.logical.MapType; + +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; + +/** This column vector is used to adapt hive's MapColumnVector to Flink's MapColumnVector. */ +public class OrcMapColumnVector extends AbstractOrcColumnVector + implements org.apache.flink.table.data.vector.MapColumnVector { + + private MapColumnVector hiveVector; + private MapType type; + private ColumnVector keyFlinkVector; + private ColumnVector valueFlinkVector; Review comment: Make them final ########## File path: flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java ########## @@ -164,4 +167,40 @@ public void testOrcFilterPushDown() throws ExecutionException, InterruptedExcept "select x, y from orcFilterTable where g = timestamp '2020-01-01 05:20:00' and x = 'x10'", Collections.singletonList(Row.of("x10", "10"))); } + + @Test + public void testNestedTypes() throws ExecutionException, InterruptedException { + String path = + this.getClass().getClassLoader().getResource("test-data-nested.orc").getPath(); + super.tableEnv() + .executeSql( + String.format( + "create table orcNestedTypesTable (" + + "boolean1 boolean," + + "byte1 tinyint," + + "short1 smallint," + + "int1 int," + + "long1 bigint," + + "float1 float," + + "double1 double," + + "string1 string," + + "middle ROW<list ARRAY<ROW<int1 
int,string1 string>>>," + + "list ARRAY<ROW<int1 int,string1 string>>," + + "map MAP<string,ROW<int1 int,string1 string>>" + + ") with (" + + "'connector' = 'filesystem'," + + "'format' = 'orc'," + + "'path' = '%s')", + path)); + + TableResult tableResult = super.tableEnv().executeSql("SELECT * FROM orcNestedTypesTable"); + List<Row> rows = new ArrayList<>(); + tableResult.collect().forEachRemaining(rows::add); + assertEquals( + "+I[false, 1, 1024, 65536, 9223372036854775807, 1.0, -15.0, hi, +I[[+I[1, bye], +I[2, sigh]]], [+I[3, good], +I[4, bad]], {}]", Review comment: Can we also add test for empty arrays, and null values? ########## File path: flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java ########## @@ -164,4 +167,40 @@ public void testOrcFilterPushDown() throws ExecutionException, InterruptedExcept "select x, y from orcFilterTable where g = timestamp '2020-01-01 05:20:00' and x = 'x10'", Collections.singletonList(Row.of("x10", "10"))); } + + @Test + public void testNestedTypes() throws ExecutionException, InterruptedException { + String path = + this.getClass().getClassLoader().getResource("test-data-nested.orc").getPath(); + super.tableEnv() + .executeSql( + String.format( + "create table orcNestedTypesTable (" + + "boolean1 boolean," + + "byte1 tinyint," + + "short1 smallint," + + "int1 int," + + "long1 bigint," + + "float1 float," + + "double1 double," + + "string1 string," + + "middle ROW<list ARRAY<ROW<int1 int,string1 string>>>," + + "list ARRAY<ROW<int1 int,string1 string>>," + + "map MAP<string,ROW<int1 int,string1 string>>" + + ") with (" + + "'connector' = 'filesystem'," + + "'format' = 'orc'," + + "'path' = '%s')", + path)); + + TableResult tableResult = super.tableEnv().executeSql("SELECT * FROM orcNestedTypesTable"); + List<Row> rows = new ArrayList<>(); + tableResult.collect().forEachRemaining(rows::add); Review comment: Just use `CollectionUtil.iteratorToList` -- This is an automated message from the Apache 
Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org