JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402760160
 
 

 ##########
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##########
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+       private final DataType elementType;
+       private final ColumnVector data;
+       private final int offset;
+       private final int length;
+
+       public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+               this.elementType = elementType;
+               this.data = data;
+               this.offset = offset;
+               this.length = length;
+       }
+
+       @Override
+       public int numElements() {
+               return length;
+       }
+
+       @Override
+       public boolean isNullAt(int pos) {
+               return data.isNullAt(offset + pos);
+       }
+
+       @Override
+       public void setNullAt(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public boolean getBoolean(int ordinal) {
+               return ((BooleanColumnVector) data).getBoolean(offset + 
ordinal);
+       }
+
+       @Override
+       public byte getByte(int ordinal) {
+               return ((ByteColumnVector) data).getByte(offset + ordinal);
+       }
+
+       @Override
+       public short getShort(int ordinal) {
+               return ((ShortColumnVector) data).getShort(offset + ordinal);
+       }
+
+       @Override
+       public int getInt(int ordinal) {
+               return ((IntColumnVector) data).getInt(offset + ordinal);
+       }
+
+       @Override
+       public long getLong(int ordinal) {
+               return ((LongColumnVector) data).getLong(offset + ordinal);
+       }
+
+       @Override
+       public float getFloat(int ordinal) {
+               return ((FloatColumnVector) data).getFloat(offset + ordinal);
+       }
+
+       @Override
+       public double getDouble(int ordinal) {
+               return ((DoubleColumnVector) data).getDouble(offset + ordinal);
+       }
+
+       @Override
+       public BinaryString getString(int ordinal) {
+               BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+               return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+       }
+
+       @Override
+       public Decimal getDecimal(int ordinal, int precision, int scale) {
+               return ((DecimalColumnVector) data).getDecimal(offset + 
ordinal, precision, scale);
+       }
+
+       @Override
+       public SqlTimestamp getTimestamp(int ordinal, int precision) {
+               return ((TimestampColumnVector) data).getTimestamp(offset + 
ordinal, precision);
+       }
+
+       @Override
+       public <T> BinaryGeneric<T> getGeneric(int ordinal) {
+               throw new UnsupportedOperationException("GenericType is not 
supported.");
+       }
+
+       @Override
+       public byte[] getBinary(int ordinal) {
+               BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+               if (byteArray.len == byteArray.data.length) {
+                       return byteArray.data;
+               } else {
+                       byte[] ret = new byte[byteArray.len];
+                       System.arraycopy(byteArray.data, byteArray.offset, ret, 
0, byteArray.len);
+                       return ret;
+               }
+       }
+
+       @Override
+       public BaseArray getArray(int ordinal) {
+               return ((ArrayColumnVector) data).getArray(offset + ordinal);
+       }
+
+       @Override
+       public BaseMap getMap(int ordinal) {
+               throw new UnsupportedOperationException("Map is not 
supported.");
+       }
+
+       @Override
+       public BaseRow getRow(int ordinal, int numFields) {
+               throw new UnsupportedOperationException("Row is not 
supported.");
+       }
+
+       @Override
+       public void setBoolean(int ordinal, boolean value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setByte(int ordinal, byte value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setShort(int ordinal, short value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setInt(int ordinal, int value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setLong(int ordinal, long value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setFloat(int ordinal, float value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setDouble(int ordinal, double value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setDecimal(int i, Decimal value, int precision) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setTimestamp(int ordinal, SqlTimestamp value, int 
precision) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNotNullAt(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullLong(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullInt(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullBoolean(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullByte(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullShort(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullFloat(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setNullDouble(int pos) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public boolean[] toBooleanArray() {
+               boolean[] res = new boolean[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getBoolean(i);
+               }
+               return res;
+       }
+
+       @Override
+       public byte[] toByteArray() {
+               byte[] res = new byte[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getByte(i);
+               }
+               return res;
+       }
+
+       @Override
+       public short[] toShortArray() {
+               short[] res = new short[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getShort(i);
+               }
+               return res;
+       }
+
+       @Override
+       public int[] toIntArray() {
+               int[] res = new int[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getInt(i);
+               }
+               return res;
+       }
+
+       @Override
+       public long[] toLongArray() {
+               long[] res = new long[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getLong(i);
+               }
+               return res;
+       }
+
+       @Override
+       public float[] toFloatArray() {
+               float[] res = new float[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getFloat(i);
+               }
+               return res;
+       }
+
+       @Override
+       public double[] toDoubleArray() {
+               double[] res = new double[length];
+               for (int i = 0; i < length; i++) {
+                       res[i] = getDouble(i);
+               }
+               return res;
+       }
+
+       public BaseArray copy() {
+               LogicalType elementLogicalType = elementType.getLogicalType();
+               if (elementLogicalType instanceof BooleanType) {
+                       return BinaryArray.fromPrimitiveArray(toBooleanArray());
+               } else if (elementLogicalType instanceof TinyIntType) {
+                       return BinaryArray.fromPrimitiveArray(toByteArray());
+               } else if (elementLogicalType instanceof SmallIntType) {
+                       return BinaryArray.fromPrimitiveArray(toShortArray());
+               } else if (elementLogicalType instanceof IntType || 
elementLogicalType instanceof DateType || elementLogicalType instanceof 
TimeType) {
+                       return BinaryArray.fromPrimitiveArray(toIntArray());
+               } else if (elementLogicalType instanceof BigIntType) {
+                       return BinaryArray.fromPrimitiveArray(toLongArray());
+               } else if (elementLogicalType instanceof FloatType) {
+                       return BinaryArray.fromPrimitiveArray(toFloatArray());
+               } else if (elementLogicalType instanceof DoubleType) {
+                       return BinaryArray.fromPrimitiveArray(toDoubleArray());
+               } else {
+                       DataFormatConverters.ObjectArrayConverter converter = 
new DataFormatConverters.ObjectArrayConverter(elementType);
 
 Review comment:
   I don't understand this. What we want here is a copy, not a conversion 
between the external and internal formats.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to