JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402746198
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.table.dataformat.vector.ArrayColumnVector; +import org.apache.flink.table.dataformat.vector.BooleanColumnVector; +import org.apache.flink.table.dataformat.vector.ByteColumnVector; +import org.apache.flink.table.dataformat.vector.BytesColumnVector; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.dataformat.vector.DecimalColumnVector; +import org.apache.flink.table.dataformat.vector.DoubleColumnVector; +import org.apache.flink.table.dataformat.vector.FloatColumnVector; +import org.apache.flink.table.dataformat.vector.IntColumnVector; +import org.apache.flink.table.dataformat.vector.LongColumnVector; +import org.apache.flink.table.dataformat.vector.ShortColumnVector; +import org.apache.flink.table.dataformat.vector.TimestampColumnVector; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; 
+import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TinyIntType; + +/** + * Columnar array to support access to vector column data. + */ +public final class ColumnarArray implements BaseArray { + + private final DataType elementType; + private final ColumnVector data; + private final int offset; + private final int length; + + public ColumnarArray(DataType elementType, ColumnVector data, int offset, int length) { + this.elementType = elementType; + this.data = data; + this.offset = offset; + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public boolean isNullAt(int pos) { + return data.isNullAt(offset + pos); + } + + @Override + public void setNullAt(int pos) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public boolean getBoolean(int ordinal) { + return ((BooleanColumnVector) data).getBoolean(offset + ordinal); + } + + @Override + public byte getByte(int ordinal) { + return ((ByteColumnVector) data).getByte(offset + ordinal); + } + + @Override + public short getShort(int ordinal) { + return ((ShortColumnVector) data).getShort(offset + ordinal); + } + + @Override + public int getInt(int ordinal) { + return ((IntColumnVector) data).getInt(offset + ordinal); + } + + @Override + public long getLong(int ordinal) { + return ((LongColumnVector) data).getLong(offset + ordinal); + } + + @Override + public float getFloat(int ordinal) { + return ((FloatColumnVector) data).getFloat(offset + ordinal); + } + + @Override + public double getDouble(int ordinal) { + return ((DoubleColumnVector) 
data).getDouble(offset + ordinal); + } + + @Override + public BinaryString getString(int ordinal) { + BytesColumnVector.Bytes byteArray = getByteArray(ordinal); + return BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return ((DecimalColumnVector) data).getDecimal(offset + ordinal, precision, scale); + } + + @Override + public SqlTimestamp getTimestamp(int ordinal, int precision) { + return ((TimestampColumnVector) data).getTimestamp(offset + ordinal, precision); + } + + @Override + public <T> BinaryGeneric<T> getGeneric(int ordinal) { + throw new UnsupportedOperationException("GenericType is not supported."); + } + + @Override + public byte[] getBinary(int ordinal) { + BytesColumnVector.Bytes byteArray = getByteArray(ordinal); + if (byteArray.len == byteArray.data.length) { + return byteArray.data; + } else { + byte[] ret = new byte[byteArray.len]; + System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len); + return ret; + } + } + + @Override + public BaseArray getArray(int ordinal) { + return ((ArrayColumnVector) data).getArray(offset + ordinal); + } + + @Override + public BaseMap getMap(int ordinal) { + throw new UnsupportedOperationException("Map is not supported."); + } + + @Override + public BaseRow getRow(int ordinal, int numFields) { + throw new UnsupportedOperationException("Row is not supported."); + } + + @Override + public void setBoolean(int ordinal, boolean value) { Review comment: These annoyed "set" will be removed after FLIP-95. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services