Jonny Serencsa created SPARK-18394:
--------------------------------------

             Summary: Executing the same query twice in a row results in 
CodeGenerator cache misses
                 Key: SPARK-18394
                 URL: https://issues.apache.org/jira/browse/SPARK-18394
             Project: Spark
          Issue Type: Bug
          Components: SQL
         Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
            Reporter: Jonny Serencsa


Executing the query:
{noformat}
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem_1_row
where
    l_shipdate <= date_sub('1998-12-01', '90')
group by
    l_returnflag,
    l_linestatus
;
{noformat}
twice in succession results in CodeGenerator cache misses in BOTH 
executions. Since the query is identical, I would expect the same code to be 
generated both times. 

It turns out that the generated code is not exactly the same between the two 
runs, which results in cache misses when performing the lookup in the 
CodeGenerator cache, even though the two versions of the code are equivalent. 

Below is a portion of the generated code for the two runs of the query:

run-1:
{noformat}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import scala.collection.Iterator;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;

public SpecificColumnarIterator generate(Object[] references) {
return new SpecificColumnarIterator();
}

class SpecificColumnarIterator extends 
org.apache.spark.sql.execution.columnar.ColumnarIterator {

private ByteOrder nativeOrder = null;
private byte[][] buffers = null;
private UnsafeRow unsafeRow = new UnsafeRow(7);
private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
private MutableUnsafeRow mutableRow = null;

private int currentRow = 0;
private int numRowsInBatch = 0;

private scala.collection.Iterator input = null;
private DataType[] columnTypes = null;
private int[] columnIndexes = null;

private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;

public SpecificColumnarIterator() {
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[7][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
}

public void initialize(Iterator input, DataType[] columnTypes, int[] 
columnIndexes) {
this.input = input;
this.columnTypes = columnTypes;
this.columnIndexes = columnIndexes;
}



public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
return false;
}

org.apache.spark.sql.execution.columnar.CachedBatch batch = 
(org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
accessor = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
accessor1 = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
accessor2 = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
accessor3 = new 
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
accessor4 = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
accessor5 = new 
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
accessor6 = new 
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));

return hasNext();
}

public InternalRow next() {
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
accessor.extractTo(mutableRow, 0);
accessor1.extractTo(mutableRow, 1);
accessor2.extractTo(mutableRow, 2);
accessor3.extractTo(mutableRow, 3);
accessor4.extractTo(mutableRow, 4);
accessor5.extractTo(mutableRow, 5);
accessor6.extractTo(mutableRow, 6);
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
}
{noformat}

run-2:
{noformat}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import scala.collection.Iterator;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;

public SpecificColumnarIterator generate(Object[] references) {
return new SpecificColumnarIterator();
}

class SpecificColumnarIterator extends 
org.apache.spark.sql.execution.columnar.ColumnarIterator {

private ByteOrder nativeOrder = null;
private byte[][] buffers = null;
private UnsafeRow unsafeRow = new UnsafeRow(7);
private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
private MutableUnsafeRow mutableRow = null;

private int currentRow = 0;
private int numRowsInBatch = 0;

private scala.collection.Iterator input = null;
private DataType[] columnTypes = null;
private int[] columnIndexes = null;

private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor1;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor6;

public SpecificColumnarIterator() {
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[7][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
}

public void initialize(Iterator input, DataType[] columnTypes, int[] 
columnIndexes) {
this.input = input;
this.columnTypes = columnTypes;
this.columnIndexes = columnIndexes;
}



public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
}
if (!input.hasNext()) {
return false;
}

org.apache.spark.sql.execution.columnar.CachedBatch batch = 
(org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
accessor = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
accessor1 = new 
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
accessor2 = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
accessor3 = new 
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
accessor4 = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
accessor5 = new 
org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
accessor6 = new 
org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));

return hasNext();
}

public InternalRow next() {
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
accessor.extractTo(mutableRow, 0);
accessor1.extractTo(mutableRow, 1);
accessor2.extractTo(mutableRow, 2);
accessor3.extractTo(mutableRow, 3);
accessor4.extractTo(mutableRow, 4);
accessor5.extractTo(mutableRow, 5);
accessor6.extractTo(mutableRow, 6);
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
}
{noformat}

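Diffing the two files reveals that the "accessor*" variable definitions are 
permuted: for example, accessor1 is a DoubleColumnAccessor in run-1 but a 
StringColumnAccessor in run-2, while accessor6 is a StringColumnAccessor in 
run-1 but a DoubleColumnAccessor in run-2. 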




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to