Yeah, I think it should be "ColumnVector.Array". I've already pinged @ueshin about this issue.
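For the record, my understanding (not verified against the scaladoc internals) is that javac resolves the bare name `Array` through inheritance from ColumnVector, while the scaladoc front end that scala-maven-plugin runs over the Java sources does not, so only the doc-jar step fails. A minimal Scala sketch of the qualified reference that works either way (the class name here is mine, for illustration only):

    import org.apache.spark.sql.execution.vectorized.ColumnVector

    class LoadBytesSketch {
      // The nested Java class referenced through its enclosing class;
      // this is the spelling the Java signature should use as well:
      //   public void loadBytes(ColumnVector.Array array) { ... }
      def loadBytes(array: ColumnVector.Array): Unit = ()
    }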
Jacek Laskowski wrote:
> Hi,
>
> Looks like the change has broken the build for me:
>
> [INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
> /Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243: error: not found: type Array
>   public void loadBytes(Array array) {
>                         ^
>
> ...
>
> 222 warnings found
> one error found
> [INFO] ------------------------------------------------------------------------
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM ........................... SUCCESS [  4.864 s]
> [INFO] Spark Project Tags ................................. SUCCESS [  5.689 s]
> [INFO] Spark Project Sketch ............................... SUCCESS [  4.646 s]
> [INFO] Spark Project Local DB ............................. SUCCESS [  6.074 s]
> [INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
> [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  7.355 s]
> [INFO] Spark Project Unsafe ............................... SUCCESS [  7.639 s]
> [INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
> [INFO] Spark Project Core ................................. SUCCESS [02:01 min]
> [INFO] Spark Project ML Local Library ..................... SUCCESS [  9.711 s]
> [INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
> [INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
> [INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
> [INFO] Spark Project SQL .................................. FAILURE [02:14 min]
>
> Is this only me, or do others suffer from it too?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> ---------- Forwarded message ----------
> From: <wenchen@...>
> Date: Thu, Jul 20, 2017 at 3:00 PM
> Subject: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
> To: <commits@...>
>
> Repository: spark
> Updated Branches:
>   refs/heads/master 5d1850d4b -> cb19880cd
>
>
> [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
>
> ## What changes were proposed in this pull request?
>
> Introducing `ArrowColumnVector` as a reader for Arrow vectors.
> It extends `ColumnVector`, so we will be able to use it with `ColumnarBatch` and its functionalities.
> Currently it supports primitive types and `StringType`, `ArrayType` and `StructType`.
>
> ## How was this patch tested?
>
> Added tests for `ArrowColumnVector` and existing tests.
>
> Author: Takuya UESHIN <ueshin@...>
>
> Closes #18680 from ueshin/issues/SPARK-21472.
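(For anyone who hasn't opened the patch yet: distilled from the new ArrowColumnVectorSuite quoted further down, usage looks roughly like this; the "example" allocator name is arbitrary.)

    import org.apache.arrow.vector.NullableIntVector
    import org.apache.spark.sql.execution.arrow.ArrowUtils
    import org.apache.spark.sql.execution.vectorized.ArrowColumnVector
    import org.apache.spark.sql.types.IntegerType

    // Build a small Arrow vector holding 0..9 plus a trailing null.
    val allocator = ArrowUtils.rootAllocator.newChildAllocator("example", 0, Long.MaxValue)
    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
      .createVector(allocator).asInstanceOf[NullableIntVector]
    vector.allocateNew()
    val mutator = vector.getMutator()
    (0 until 10).foreach(i => mutator.setSafe(i, i))
    mutator.setNull(10)
    mutator.setValueCount(11)

    // Wrap it and read through the familiar ColumnVector API.
    val columnVector = new ArrowColumnVector(vector)
    assert(columnVector.getInt(3) == 3)
    assert(columnVector.isNullAt(10))

    columnVector.close()
    allocator.close()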
> Project: http://git-wip-us.apache.org/repos/asf/spark/repo
> Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
> Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
> Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c
>
> Branch: refs/heads/master
> Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
> Parents: 5d1850d
> Author: Takuya UESHIN <ueshin@...>
> Authored: Thu Jul 20 21:00:30 2017 +0800
> Committer: Wenchen Fan <wenchen@...>
> Committed: Thu Jul 20 21:00:30 2017 +0800
>
> ----------------------------------------------------------------------
>  .../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
>  .../sql/execution/vectorized/ColumnVector.java  |  16 +-
>  .../vectorized/ReadOnlyColumnVector.java        | 251 ++++++++
>  .../sql/execution/arrow/ArrowConverters.scala   |  32 +-
>  .../spark/sql/execution/arrow/ArrowUtils.scala  | 109 ++++
>  .../execution/arrow/ArrowConvertersSuite.scala  |   2 +-
>  .../sql/execution/arrow/ArrowUtilsSuite.scala   |  65 ++
>  .../vectorized/ArrowColumnVectorSuite.scala     | 410 +++++++++++++
>  8 files changed, 1436 insertions(+), 39 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> new file mode 100644
> index 0000000..68e0abc
> --- /dev/null
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> @@ -0,0 +1,590 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized;
> +
> +import org.apache.arrow.vector.*;
> +import org.apache.arrow.vector.complex.*;
> +import org.apache.arrow.vector.holders.NullableVarCharHolder;
> +
> +import org.apache.spark.memory.MemoryMode;
> +import org.apache.spark.sql.execution.arrow.ArrowUtils;
> +import org.apache.spark.sql.types.*;
> +import org.apache.spark.unsafe.types.UTF8String;
> +
> +/**
> + * A column vector backed by Apache Arrow.
> + */
> +public final class ArrowColumnVector extends ReadOnlyColumnVector {
> +
> +  private final ArrowVectorAccessor accessor;
> +  private final int valueCount;
> +
> +  private void ensureAccessible(int index) {
> +    if (index < 0 || index >= valueCount) {
> +      throw new IndexOutOfBoundsException(
> +        String.format("index: %d, valueCount: %d", index, valueCount));
> +    }
> +  }
> +
> +  private void ensureAccessible(int index, int count) {
> +    if (index < 0 || index + count > valueCount) {
> +      throw new IndexOutOfBoundsException(
> +        String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
> +    }
> +  }
> +
> +  @Override
> +  public long nullsNativeAddress() {
> +    throw new RuntimeException("Cannot get native address for arrow column");
> +  }
> +
> +  @Override
> +  public long valuesNativeAddress() {
> +    throw new RuntimeException("Cannot get native address for arrow column");
> +  }
> +
> +  @Override
> +  public void close() {
> +    if (childColumns != null) {
> +      for (int i = 0; i < childColumns.length; i++) {
> +        childColumns[i].close();
> +      }
> +    }
> +    accessor.close();
> +  }
> +
> +  //
> +  // APIs dealing with nulls
> +  //
> +
> +  @Override
> +  public boolean isNullAt(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.isNullAt(rowId);
> +  }
> +
> +  //
> +  // APIs dealing with Booleans
> +  //
> +
> +  @Override
> +  public boolean getBoolean(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getBoolean(rowId);
> +  }
> +
> +  @Override
> +  public boolean[] getBooleans(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    boolean[] array = new boolean[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getBoolean(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Bytes
> +  //
> +
> +  @Override
> +  public byte getByte(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getByte(rowId);
> +  }
> +
> +  @Override
> +  public byte[] getBytes(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    byte[] array = new byte[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getByte(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Shorts
> +  //
> +
> +  @Override
> +  public short getShort(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getShort(rowId);
> +  }
> +
> +  @Override
> +  public short[] getShorts(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    short[] array = new short[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getShort(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Ints
> +  //
> +
> +  @Override
> +  public int getInt(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getInt(rowId);
> +  }
> +
> +  @Override
> +  public int[] getInts(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    int[] array = new int[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getInt(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  @Override
> +  public int getDictId(int rowId) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Longs
> +  //
> +
> +  @Override
> +  public long getLong(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getLong(rowId);
> +  }
> +
> +  @Override
> +  public long[] getLongs(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    long[] array = new long[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getLong(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with floats
> +  //
> +
> +  @Override
> +  public float getFloat(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getFloat(rowId);
> +  }
> +
> +  @Override
> +  public float[] getFloats(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    float[] array = new float[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getFloat(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with doubles
> +  //
> +
> +  @Override
> +  public double getDouble(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getDouble(rowId);
> +  }
> +
> +  @Override
> +  public double[] getDoubles(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    double[] array = new double[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getDouble(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Arrays
> +  //
> +
> +  @Override
> +  public int getArrayLength(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getArrayLength(rowId);
> +  }
> +
> +  @Override
> +  public int getArrayOffset(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getArrayOffset(rowId);
> +  }
> +
> +  @Override
> +  public void loadBytes(Array array) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Decimals
> +  //
> +
> +  @Override
> +  public Decimal getDecimal(int rowId, int precision, int scale) {
> +    ensureAccessible(rowId);
> +    return accessor.getDecimal(rowId, precision, scale);
> +  }
> +
> +  //
> +  // APIs dealing with UTF8Strings
> +  //
> +
> +  @Override
> +  public UTF8String getUTF8String(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getUTF8String(rowId);
> +  }
> +
> +  //
> +  // APIs dealing with Binaries
> +  //
> +
> +  @Override
> +  public byte[] getBinary(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getBinary(rowId);
> +  }
> +
> +  public ArrowColumnVector(ValueVector vector) {
> +    super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
> +      MemoryMode.OFF_HEAP);
> +
> +    if (vector instanceof NullableBitVector) {
> +      accessor = new BooleanAccessor((NullableBitVector) vector);
> +    } else if (vector instanceof NullableTinyIntVector) {
> +      accessor = new ByteAccessor((NullableTinyIntVector) vector);
> +    } else if (vector instanceof NullableSmallIntVector) {
> +      accessor = new ShortAccessor((NullableSmallIntVector) vector);
> +    } else if (vector instanceof NullableIntVector) {
> +      accessor = new IntAccessor((NullableIntVector) vector);
> +    } else if (vector instanceof NullableBigIntVector) {
> +      accessor = new LongAccessor((NullableBigIntVector) vector);
> +    } else if (vector instanceof NullableFloat4Vector) {
> +      accessor = new FloatAccessor((NullableFloat4Vector) vector);
> +    } else if (vector instanceof NullableFloat8Vector) {
> +      accessor = new DoubleAccessor((NullableFloat8Vector) vector);
> +    } else if (vector instanceof NullableDecimalVector) {
> +      accessor = new DecimalAccessor((NullableDecimalVector) vector);
> +    } else if (vector instanceof NullableVarCharVector) {
> +      accessor = new StringAccessor((NullableVarCharVector) vector);
> +    } else if (vector instanceof NullableVarBinaryVector) {
> +      accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
> +    } else if (vector instanceof ListVector) {
> +      ListVector listVector = (ListVector) vector;
> +      accessor = new ArrayAccessor(listVector);
> +
> +      childColumns = new ColumnVector[1];
> +      childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
> +      resultArray = new Array(childColumns[0]);
> +    } else if (vector instanceof MapVector) {
> +      MapVector mapVector = (MapVector) vector;
> +      accessor = new StructAccessor(mapVector);
> +
> +      childColumns = new ArrowColumnVector[mapVector.size()];
> +      for (int i = 0; i < childColumns.length; ++i) {
> +        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
> +      }
> +      resultStruct = new ColumnarBatch.Row(childColumns);
> +    } else {
> +      throw new UnsupportedOperationException();
> +    }
> +    valueCount = accessor.getValueCount();
> +    numNulls = accessor.getNullCount();
> +    anyNullsSet = numNulls > 0;
> +  }
> +
> +  private static abstract class ArrowVectorAccessor {
> +
> +    private final ValueVector vector;
> +    private final ValueVector.Accessor nulls;
> +
> +    private final int valueCount;
> +    private final int nullCount;
> +
> +    ArrowVectorAccessor(ValueVector vector) {
> +      this.vector = vector;
> +      this.nulls = vector.getAccessor();
> +      this.valueCount = nulls.getValueCount();
> +      this.nullCount = nulls.getNullCount();
> +    }
> +
> +    final boolean isNullAt(int rowId) {
> +      return nulls.isNull(rowId);
> +    }
> +
> +    final int getValueCount() {
> +      return valueCount;
> +    }
> +
> +    final int getNullCount() {
> +      return nullCount;
> +    }
> +
> +    final void close() {
> +      vector.close();
> +    }
> +
> +    boolean getBoolean(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    byte getByte(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    short getShort(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    int getInt(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    long getLong(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    float getFloat(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    double getDouble(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    Decimal getDecimal(int rowId, int precision, int scale) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    UTF8String getUTF8String(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    byte[] getBinary(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    int getArrayLength(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    int getArrayOffset(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +  }
> +
> +  private static class BooleanAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableBitVector.Accessor accessor;
> +
> +    BooleanAccessor(NullableBitVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final boolean getBoolean(int rowId) {
> +      return accessor.get(rowId) == 1;
> +    }
> +  }
> +
> +  private static class ByteAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableTinyIntVector.Accessor accessor;
> +
> +    ByteAccessor(NullableTinyIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final byte getByte(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class ShortAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableSmallIntVector.Accessor accessor;
> +
> +    ShortAccessor(NullableSmallIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final short getShort(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class IntAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableIntVector.Accessor accessor;
> +
> +    IntAccessor(NullableIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final int getInt(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class LongAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableBigIntVector.Accessor accessor;
> +
> +    LongAccessor(NullableBigIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final long getLong(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class FloatAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableFloat4Vector.Accessor accessor;
> +
> +    FloatAccessor(NullableFloat4Vector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final float getFloat(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class DoubleAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableFloat8Vector.Accessor accessor;
> +
> +    DoubleAccessor(NullableFloat8Vector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final double getDouble(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class DecimalAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableDecimalVector.Accessor accessor;
> +
> +    DecimalAccessor(NullableDecimalVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final Decimal getDecimal(int rowId, int precision, int scale) {
> +      if (isNullAt(rowId)) return null;
> +      return Decimal.apply(accessor.getObject(rowId), precision, scale);
> +    }
> +  }
> +
> +  private static class StringAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableVarCharVector.Accessor accessor;
> +    private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
> +
> +    StringAccessor(NullableVarCharVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final UTF8String getUTF8String(int rowId) {
> +      accessor.get(rowId, stringResult);
> +      if (stringResult.isSet == 0) {
> +        return null;
> +      } else {
> +        return UTF8String.fromAddress(null,
> +          stringResult.buffer.memoryAddress() + stringResult.start,
> +          stringResult.end - stringResult.start);
> +      }
> +    }
> +  }
> +
> +  private static class BinaryAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableVarBinaryVector.Accessor accessor;
> +
> +    BinaryAccessor(NullableVarBinaryVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final byte[] getBinary(int rowId) {
> +      return accessor.getObject(rowId);
> +    }
> +  }
> +
> +  private static class ArrayAccessor extends ArrowVectorAccessor {
> +
> +    private final UInt4Vector.Accessor accessor;
> +
> +    ArrayAccessor(ListVector vector) {
> +      super(vector);
> +      this.accessor = vector.getOffsetVector().getAccessor();
> +    }
> +
> +    @Override
> +    final int getArrayLength(int rowId) {
> +      return accessor.get(rowId + 1) - accessor.get(rowId);
> +    }
> +
> +    @Override
> +    final int getArrayOffset(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class StructAccessor extends ArrowVectorAccessor {
> +
> +    StructAccessor(MapVector vector) {
> +      super(vector);
> +    }
> +  }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> index 0c027f8..7796638 100644
> --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> @@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Returns the decimal for rowId.
>     */
> -  public final Decimal getDecimal(int rowId, int precision, int scale) {
> +  public Decimal getDecimal(int rowId, int precision, int scale) {
>      if (precision <= Decimal.MAX_INT_DIGITS()) {
>        return Decimal.createUnsafe(getInt(rowId), precision, scale);
>      } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> @@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    }
>
>
> -  public final void putDecimal(int rowId, Decimal value, int precision) {
> +  public void putDecimal(int rowId, Decimal value, int precision) {
>      if (precision <= Decimal.MAX_INT_DIGITS()) {
>        putInt(rowId, (int) value.toUnscaledLong());
>      } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> @@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Returns the UTF8String for rowId.
>     */
> -  public final UTF8String getUTF8String(int rowId) {
> +  public UTF8String getUTF8String(int rowId) {
>      if (dictionary == null) {
>        ColumnVector.Array a = getByteArray(rowId);
>        return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
> @@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Returns the byte array for rowId.
>     */
> -  public final byte[] getBinary(int rowId) {
> +  public byte[] getBinary(int rowId) {
>      if (dictionary == null) {
>        ColumnVector.Array array = getByteArray(rowId);
>        byte[] bytes = new byte[array.length];
> @@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Data type for this column.
>     */
> -  protected final DataType type;
> +  protected DataType type;
>
>    /**
>     * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
> @@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * If this is a nested type (array or struct), the column for the child data.
>     */
> -  protected final ColumnVector[] childColumns;
> +  protected ColumnVector[] childColumns;
>
>    /**
>     * Reusable Array holder for getArray().
>     */
> -  protected final Array resultArray;
> +  protected Array resultArray;
>
>    /**
>     * Reusable Struct holder for getStruct().
>     */
> -  protected final ColumnarBatch.Row resultStruct;
> +  protected ColumnarBatch.Row resultStruct;
>
>    /**
>     * The Dictionary for this column.
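Note how the constructor above is just a type-directed dispatch from concrete Arrow vector classes to typed accessors; the `final` removals in ColumnVector.java are what let ArrowColumnVector override getDecimal/getUTF8String/getBinary to route through those accessors. In Scala terms the dispatch reads as the following pattern match (my paraphrase for illustration, not code from the patch):

    import org.apache.arrow.vector._
    import org.apache.arrow.vector.complex._

    // Which accessor ArrowColumnVector picks for a given Arrow vector.
    def accessorFor(vector: ValueVector): String = vector match {
      case _: NullableBitVector       => "BooleanAccessor"
      case _: NullableTinyIntVector   => "ByteAccessor"
      case _: NullableSmallIntVector  => "ShortAccessor"
      case _: NullableIntVector       => "IntAccessor"
      case _: NullableBigIntVector    => "LongAccessor"
      case _: NullableFloat4Vector    => "FloatAccessor"
      case _: NullableFloat8Vector    => "DoubleAccessor"
      case _: NullableDecimalVector   => "DecimalAccessor"
      case _: NullableVarCharVector   => "StringAccessor"
      case _: NullableVarBinaryVector => "BinaryAccessor"
      case _: ListVector              => "ArrayAccessor (one child column)"
      case _: MapVector               => "StructAccessor (one child per struct field)"
      case other => throw new UnsupportedOperationException(other.getClass.getName)
    }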
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> new file mode 100644
> index 0000000..e9f6e7c
> --- /dev/null
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> @@ -0,0 +1,251 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized;
> +
> +import org.apache.spark.memory.MemoryMode;
> +import org.apache.spark.sql.types.*;
> +
> +/**
> + * An abstract class for read-only column vector.
> + */
> +public abstract class ReadOnlyColumnVector extends ColumnVector {
> +
> +  protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
> +    super(capacity, DataTypes.NullType, memMode);
> +    this.type = type;
> +    isConstant = true;
> +  }
> +
> +  //
> +  // APIs dealing with nulls
> +  //
> +
> +  @Override
> +  public final void putNotNull(int rowId) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putNull(int rowId) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putNulls(int rowId, int count) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putNotNulls(int rowId, int count) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Booleans
> +  //
> +
> +  @Override
> +  public final void putBoolean(int rowId, boolean value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putBooleans(int rowId, int count, boolean value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Bytes
> +  //
> +
> +  @Override
> +  public final void putByte(int rowId, byte value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putBytes(int rowId, int count, byte value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Shorts
> +  //
> +
> +  @Override
> +  public final void putShort(int rowId, short value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putShorts(int rowId, int count, short value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Ints
> +  //
> +
> +  @Override
> +  public final void putInt(int rowId, int value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putInts(int rowId, int count, int value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Longs
> +  //
> +
> +  @Override
> +  public final void putLong(int rowId, long value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putLongs(int rowId, int count, long value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with floats
> +  //
> +
> +  @Override
> +  public final void putFloat(int rowId, float value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putFloats(int rowId, int count, float value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with doubles
> +  //
> +
> +  @Override
> +  public final void putDouble(int rowId, double value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putDoubles(int rowId, int count, double value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Arrays
> +  //
> +
> +  @Override
> +  public final void putArray(int rowId, int offset, int length) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Byte Arrays
> +  //
> +
> +  @Override
> +  public final int putByteArray(int rowId, byte[] value, int offset, int count) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Decimals
> +  //
> +
> +  @Override
> +  public final void putDecimal(int rowId, Decimal value, int precision) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // Other APIs
> +  //
> +
> +  @Override
> +  public final void setDictionary(Dictionary dictionary) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final ColumnVector reserveDictionaryIds(int capacity) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  protected final void reserveInternal(int newCapacity) {
> +    throw new UnsupportedOperationException();
> +  }
> +}
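The contract here is simple: every writer-side method fails fast, so an Arrow-backed vector can only ever be read. A tiny sketch of what that looks like from the caller's side (the helper name is mine):

    import org.apache.spark.sql.execution.vectorized.ArrowColumnVector

    // ArrowColumnVector extends ReadOnlyColumnVector, so any put* call throws.
    def demonstrateReadOnly(cv: ArrowColumnVector): Unit = {
      try {
        cv.putInt(0, 42) // writer API: rejected on a read-only vector
      } catch {
        case _: UnsupportedOperationException =>
          println("read-only as expected")
      }
    }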
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> index 6af5c73..c913efe 100644
> --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> @@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
>  private[sql] object ArrowConverters {
>
>    /**
> -   * Map a Spark DataType to ArrowType.
> -   */
> -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
> -    dataType match {
> -      case BooleanType => ArrowType.Bool.INSTANCE
> -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
> -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
> -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
> -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> -      case ByteType => new ArrowType.Int(8, true)
> -      case StringType => ArrowType.Utf8.INSTANCE
> -      case BinaryType => ArrowType.Binary.INSTANCE
> -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
> -    }
> -  }
> -
> -  /**
> -   * Convert a Spark Dataset schema to Arrow schema.
> -   */
> -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
> -    val arrowFields = schema.fields.map { f =>
> -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
> -    }
> -    new Schema(arrowFields.toList.asJava)
> -  }
> -
> -  /**
>     * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
>     * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
>     */
> @@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
>        batch: ArrowRecordBatch,
>        schema: StructType,
>        allocator: BufferAllocator): Array[Byte] = {
> -    val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
> +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
>      val root = VectorSchemaRoot.create(arrowSchema, allocator)
>      val out = new ByteArrayOutputStream()
>      val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
> @@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
>     * Create an Arrow ColumnWriter given the type and ordinal of row.
>     */
>    def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
> -    val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
> +    val dtype = ArrowUtils.toArrowType(dataType)
>      dataType match {
>        case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
>        case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> new file mode 100644
> index 0000000..2caf1ef
> --- /dev/null
> +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> @@ -0,0 +1,109 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.arrow
> +
> +import scala.collection.JavaConverters._
> +
> +import org.apache.arrow.memory.RootAllocator
> +import org.apache.arrow.vector.types.FloatingPointPrecision
> +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
> +
> +import org.apache.spark.sql.types._
> +
> +object ArrowUtils {
> +
> +  val rootAllocator = new RootAllocator(Long.MaxValue)
> +
> +  // todo: support more types.
> +
> +  def toArrowType(dt: DataType): ArrowType = dt match {
> +    case BooleanType => ArrowType.Bool.INSTANCE
> +    case ByteType => new ArrowType.Int(8, true)
> +    case ShortType => new ArrowType.Int(8 * 2, true)
> +    case IntegerType => new ArrowType.Int(8 * 4, true)
> +    case LongType => new ArrowType.Int(8 * 8, true)
> +    case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> +    case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> +    case StringType => ArrowType.Utf8.INSTANCE
> +    case BinaryType => ArrowType.Binary.INSTANCE
> +    case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
> +    case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
> +  }
> +
> +  def fromArrowType(dt: ArrowType): DataType = dt match {
> +    case ArrowType.Bool.INSTANCE => BooleanType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
> +    case float: ArrowType.FloatingPoint
> +      if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
> +    case float: ArrowType.FloatingPoint
> +      if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
> +    case ArrowType.Utf8.INSTANCE => StringType
> +    case ArrowType.Binary.INSTANCE => BinaryType
> +    case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
> +    case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
> +  }
> +
> +  def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
> +    dt match {
> +      case ArrayType(elementType, containsNull) =>
> +        val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
> +        new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
> +      case StructType(fields) =>
> +        val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
> +        new Field(name, fieldType,
> +          fields.map { field =>
> +            toArrowField(field.name, field.dataType, field.nullable)
> +          }.toSeq.asJava)
> +      case dataType =>
> +        val fieldType = new FieldType(nullable, toArrowType(dataType), null)
> +        new Field(name, fieldType, Seq.empty[Field].asJava)
> +    }
> +  }
> +
> +  def fromArrowField(field: Field): DataType = {
> +    field.getType match {
> +      case ArrowType.List.INSTANCE =>
> +        val elementField = field.getChildren().get(0)
> +        val elementType = fromArrowField(elementField)
> +        ArrayType(elementType, containsNull = elementField.isNullable)
> +      case ArrowType.Struct.INSTANCE =>
> +        val fields = field.getChildren().asScala.map { child =>
> +          val dt = fromArrowField(child)
> +          StructField(child.getName, dt, child.isNullable)
> +        }
> +        StructType(fields)
> +      case arrowType => fromArrowType(arrowType)
> +    }
> +  }
> +
> +  def toArrowSchema(schema: StructType): Schema = {
> +    new Schema(schema.map { field =>
> +      toArrowField(field.name, field.dataType, field.nullable)
> +    }.asJava)
> +  }
> +
> +  def fromArrowSchema(schema: Schema): StructType = {
> +    StructType(schema.getFields.asScala.map { field =>
> +      val dt = fromArrowField(field)
> +      StructField(field.getName, dt, field.isNullable)
> +    })
> +  }
> +}
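The two conversion directions in ArrowUtils are symmetric, so a schema survives a round trip; this is exactly what the new ArrowUtilsSuite below asserts. For example:

    import org.apache.spark.sql.execution.arrow.ArrowUtils
    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("i", IntegerType)
      .add("arr", ArrayType(IntegerType, containsNull = true))

    // StructType -> Arrow Schema -> StructType preserves names,
    // types and nullability.
    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    assert(ArrowUtils.fromArrowSchema(arrowSchema) == schema)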
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> index 159328c..55b4655 100644
> --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> @@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
>      val allocator = new RootAllocator(Long.MaxValue)
>      val jsonReader = new JsonFileReader(jsonFile, allocator)
>
> -    val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
> +    val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
>      val jsonSchema = jsonReader.start()
>      Validator.compareSchemas(arrowSchema, jsonSchema)
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> new file mode 100644
> index 0000000..638619f
> --- /dev/null
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> @@ -0,0 +1,65 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.arrow
> +
> +import org.apache.spark.SparkFunSuite
> +import org.apache.spark.sql.types._
> +
> +class ArrowUtilsSuite extends SparkFunSuite {
> +
> +  def roundtrip(dt: DataType): Unit = {
> +    dt match {
> +      case schema: StructType =>
> +        assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
> +      case _ =>
> +        roundtrip(new StructType().add("value", dt))
> +    }
> +  }
> +
> +  test("simple") {
> +    roundtrip(BooleanType)
> +    roundtrip(ByteType)
> +    roundtrip(ShortType)
> +    roundtrip(IntegerType)
> +    roundtrip(LongType)
> +    roundtrip(FloatType)
> +    roundtrip(DoubleType)
> +    roundtrip(StringType)
> +    roundtrip(BinaryType)
> +    roundtrip(DecimalType.SYSTEM_DEFAULT)
> +  }
> +
> +  test("array") {
> +    roundtrip(ArrayType(IntegerType, containsNull = true))
> +    roundtrip(ArrayType(IntegerType, containsNull = false))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
> +  }
> +
> +  test("struct") {
> +    roundtrip(new StructType())
> +    roundtrip(new StructType().add("i", IntegerType))
> +    roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
> +    roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
> +    roundtrip(new StructType().add(
> +      "struct",
> +      new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
> +  }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> new file mode 100644
> index 0000000..d24a9e1
> --- /dev/null
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> @@ -0,0 +1,410 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized
> +
> +import org.apache.arrow.vector._
> +import org.apache.arrow.vector.complex._
> +
> +import org.apache.spark.SparkFunSuite
> +import org.apache.spark.sql.execution.arrow.ArrowUtils
> +import org.apache.spark.sql.types._
> +import org.apache.spark.unsafe.types.UTF8String
> +
> +class ArrowColumnVectorSuite extends SparkFunSuite {
> +
> +  test("boolean") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableBitVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === BooleanType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getBoolean(i) === (i % 2 == 0))
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("byte") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableTinyIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toByte)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === ByteType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getByte(i) === i.toByte)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("short") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableSmallIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toShort)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === ShortType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getShort(i) === i.toShort)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("int") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === IntegerType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getInt(i) === i)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getInts(0, 10) === (0 until 10))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("long") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableBigIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toLong)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === LongType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getLong(i) === i.toLong)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("float") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableFloat4Vector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toFloat)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === FloatType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getFloat(i) === i.toFloat)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("double") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableFloat8Vector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toDouble)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === DoubleType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getDouble(i) === i.toDouble)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("string") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableVarCharVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      val utf8 = s"str$i".getBytes("utf8")
> +      mutator.setSafe(i, utf8, 0, utf8.length)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === StringType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("binary") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      val utf8 = s"str$i".getBytes("utf8")
> +      mutator.setSafe(i, utf8, 0, utf8.length)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === BinaryType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("array") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
> +      .createVector(allocator).asInstanceOf[ListVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +    val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
> +    val elementMutator = elementVector.getMutator()
> +
> +    // [1, 2]
> +    mutator.startNewValue(0)
> +    elementMutator.setSafe(0, 1)
> +    elementMutator.setSafe(1, 2)
> +    mutator.endValue(0, 2)
> +
> +    // [3, null, 5]
> +    mutator.startNewValue(1)
> +    elementMutator.setSafe(2, 3)
> +    elementMutator.setNull(3)
> +    elementMutator.setSafe(4, 5)
> +    mutator.endValue(1, 3)
> +
> +    // null
> +
> +    // []
> +    mutator.startNewValue(3)
> +    mutator.endValue(3, 0)
> +
> +    elementMutator.setValueCount(5)
> +    mutator.setValueCount(4)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === ArrayType(IntegerType))
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    val array0 = columnVector.getArray(0)
> +    assert(array0.numElements() === 2)
> +    assert(array0.getInt(0) === 1)
> +    assert(array0.getInt(1) === 2)
> +
> +    val array1 = columnVector.getArray(1)
> +    assert(array1.numElements() === 3)
> +    assert(array1.getInt(0) === 3)
> +    assert(array1.isNullAt(1))
> +    assert(array1.getInt(2) === 5)
> +
> +    assert(columnVector.isNullAt(2))
> +
> +    val array3 = columnVector.getArray(3)
> +    assert(array3.numElements() === 0)
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("struct") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
> +    val schema = new StructType().add("int", IntegerType).add("long", LongType)
> +    val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableMapVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +    val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
> +    val intMutator = intVector.getMutator()
> +    val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
> +    val longMutator = longVector.getMutator()
> +
> +    // (1, 1L)
> +    mutator.setIndexDefined(0)
> +    intMutator.setSafe(0, 1)
> +    longMutator.setSafe(0, 1L)
> +
> +    // (2, null)
> +    mutator.setIndexDefined(1)
> +    intMutator.setSafe(1, 2)
> +    longMutator.setNull(1)
> +
> +    // (null, 3L)
> +    mutator.setIndexDefined(2)
> +    intMutator.setNull(2)
> +    longMutator.setSafe(2, 3L)
> +
> +    // null
> +    mutator.setNull(3)
> +
> +    // (5, 5L)
> +    mutator.setIndexDefined(4)
> +    intMutator.setSafe(4, 5)
> +    longMutator.setSafe(4, 5L)
> +
> +    intMutator.setValueCount(5)
> +    longMutator.setValueCount(5)
> +    mutator.setValueCount(5)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === schema)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    val row0 = columnVector.getStruct(0, 2)
> +    assert(row0.getInt(0) === 1)
> +    assert(row0.getLong(1) === 1L)
> +
> +    val row1 = columnVector.getStruct(1, 2)
> +    assert(row1.getInt(0) === 2)
> +    assert(row1.isNullAt(1))
> +
> +    val row2 = columnVector.getStruct(2, 2)
> +    assert(row2.isNullAt(0))
> +    assert(row2.getLong(1) === 3L)
> +
> +    assert(columnVector.isNullAt(3))
> +
> +    val row4 = columnVector.getStruct(4, 2)
> +    assert(row4.getInt(0) === 5)
> +    assert(row4.getLong(1) === 5L)
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +}

-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/