Jonny Serencsa created SPARK-18394:
--------------------------------------
Summary: Executing the same query twice in a row results in
CodeGenerator cache misses
Key: SPARK-18394
URL: https://issues.apache.org/jira/browse/SPARK-18394
Project: Spark
Issue Type: Bug
Components: SQL
Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
Reporter: Jonny Serencsa
Executing the query:
{noformat}
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem_1_row
where
l_shipdate <= date_sub('1998-12-01', '90')
group by
l_returnflag,
l_linestatus
;
{noformat}
twice (in succession) results in CodeGenerator cache misses in BOTH
executions. Since the query is identical, I would expect the same code to be
generated, and therefore the second execution to hit the cache.
It turns out that the generated code is not exactly the same between runs,
which results in cache misses when performing the lookup in the CodeGenerator
cache, even though the code is equivalent.
Below is a portion of the generated code for the two runs of the query:
run-1:
{noformat}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import scala.collection.Iterator;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
public SpecificColumnarIterator generate(Object[] references) {
return new SpecificColumnarIterator();
}
class SpecificColumnarIterator extends
org.apache.spark.sql.execution.columnar.ColumnarIterator {
private ByteOrder nativeOrder = null;
private byte[][] buffers = null;
private UnsafeRow unsafeRow = new UnsafeRow(7);
private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
private MutableUnsafeRow mutableRow = null;
private int currentRow = 0;
private int numRowsInBatch = 0;
private scala.collection.Iterator input = null;
private DataType[] columnTypes = null;
private int[] columnIndexes = null;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;
public SpecificColumnarIterator() {
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[7][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
}
public void initialize(Iterator input, DataType[] columnTypes, int[]
columnIndexes) {
this.input = input;
this.columnTypes = columnTypes;
this.columnIndexes = columnIndexes;
}
public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
return false;
}
org.apache.spark.sql.execution.columnar.CachedBatch batch =
(org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
accessor = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
accessor1 = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
accessor2 = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
accessor3 = new
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
accessor4 = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
accessor5 = new
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
accessor6 = new
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));
return hasNext();
}
public InternalRow next() {
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
accessor.extractTo(mutableRow, 0);
accessor1.extractTo(mutableRow, 1);
accessor2.extractTo(mutableRow, 2);
accessor3.extractTo(mutableRow, 3);
accessor4.extractTo(mutableRow, 4);
accessor5.extractTo(mutableRow, 5);
accessor6.extractTo(mutableRow, 6);
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
}
{noformat}
run-2:
{noformat}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import scala.collection.Iterator;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
public SpecificColumnarIterator generate(Object[] references) {
return new SpecificColumnarIterator();
}
class SpecificColumnarIterator extends
org.apache.spark.sql.execution.columnar.ColumnarIterator {
private ByteOrder nativeOrder = null;
private byte[][] buffers = null;
private UnsafeRow unsafeRow = new UnsafeRow(7);
private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
private MutableUnsafeRow mutableRow = null;
private int currentRow = 0;
private int numRowsInBatch = 0;
private scala.collection.Iterator input = null;
private DataType[] columnTypes = null;
private int[] columnIndexes = null;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor1;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor6;
public SpecificColumnarIterator() {
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[7][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
}
public void initialize(Iterator input, DataType[] columnTypes, int[]
columnIndexes) {
this.input = input;
this.columnTypes = columnTypes;
this.columnIndexes = columnIndexes;
}
public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
return false;
}
org.apache.spark.sql.execution.columnar.CachedBatch batch =
(org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
accessor = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
accessor1 = new
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
accessor2 = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
accessor3 = new
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
accessor4 = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
accessor5 = new
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
accessor6 = new
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));
return hasNext();
}
public InternalRow next() {
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
accessor.extractTo(mutableRow, 0);
accessor1.extractTo(mutableRow, 1);
accessor2.extractTo(mutableRow, 2);
accessor3.extractTo(mutableRow, 3);
accessor4.extractTo(mutableRow, 4);
accessor5.extractTo(mutableRow, 5);
accessor6.extractTo(mutableRow, 6);
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
}
{noformat}
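Diffing the two files reveals that the types of the "accessor*" variable
definitions are permuted between runs: for example, accessor1 is a
DoubleColumnAccessor in run-1 but a StringColumnAccessor in run-2, while
accessor6 is a StringColumnAccessor in run-1 but a DoubleColumnAccessor in
run-2.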
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]