[ https://issues.apache.org/jira/browse/SPARK-18147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-18147.
---------------------------------
Resolution: Fixed
Fix Version/s: 2.1.0
Issue resolved by pull request 15807
[https://github.com/apache/spark/pull/15807]
> Broken Spark SQL Codegen
> ------------------------
>
> Key: SPARK-18147
> URL: https://issues.apache.org/jira/browse/SPARK-18147
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.1
> Reporter: koert kuipers
> Priority: Critical
> Fix For: 2.1.0
>
>
> This is a deliberate attempt on my part to break Spark SQL codegen and uncover potential issues, by creating arbitrarily complex data structures out of primitives, strings, basic collections (Map, Seq, Option), tuples, and case classes.
> First example: nested case classes.
> Code:
> {noformat}
> import scala.reflect.runtime.universe.TypeTag
>
> import org.apache.spark.sql.{Encoder, Row}
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
>
> // Aggregator with an arbitrarily complex result type C; reduce/merge are
> // no-ops because only the codegen for the result encoder matters here.
> class ComplexResultAgg[B: TypeTag, C: TypeTag](val zero: B, result: C) extends Aggregator[Row, B, C] {
>   override def reduce(b: B, input: Row): B = b
>   override def merge(b1: B, b2: B): B = b1
>   override def finish(reduction: B): C = result
>   override def bufferEncoder: Encoder[B] = ExpressionEncoder[B]()
>   override def outputEncoder: Encoder[C] = ExpressionEncoder[C]()
> }
>
> case class Struct2(d: Double = 0.0, s1: Seq[Double] = Seq.empty, s2: Seq[Long] = Seq.empty)
> case class Struct3(a: Struct2 = Struct2(), b: Struct2 = Struct2())
>
> import spark.implicits._ // assumes a SparkSession in scope as `spark`
>
> val df1 = Seq(("a", "aa"), ("a", "aa"), ("b", "b"), ("b", null)).toDF("x", "y")
>   .groupBy("x")
>   .agg(new ComplexResultAgg("boo", Struct3()).toColumn)
> df1.printSchema
> df1.show
> {noformat}
> The result is:
> {noformat}
> [info] Cause: java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 33, Column 12: Expression "isNull1" is not an rvalue
> [info] /* 001 */ public java.lang.Object generate(Object[] references) {
> [info] /* 002 */   return new SpecificMutableProjection(references);
> [info] /* 003 */ }
> [info] /* 004 */
> [info] /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
> [info] /* 006 */
> [info] /* 007 */   private Object[] references;
> [info] /* 008 */   private MutableRow mutableRow;
> [info] /* 009 */   private Object[] values;
> [info] /* 010 */   private java.lang.String errMsg;
> [info] /* 011 */   private Object[] values1;
> [info] /* 012 */   private java.lang.String errMsg1;
> [info] /* 013 */   private boolean[] argIsNulls;
> [info] /* 014 */   private scala.collection.Seq argValue;
> [info] /* 015 */   private java.lang.String errMsg2;
> [info] /* 016 */   private boolean[] argIsNulls1;
> [info] /* 017 */   private scala.collection.Seq argValue1;
> [info] /* 018 */   private java.lang.String errMsg3;
> [info] /* 019 */   private java.lang.String errMsg4;
> [info] /* 020 */   private Object[] values2;
> [info] /* 021 */   private java.lang.String errMsg5;
> [info] /* 022 */   private boolean[] argIsNulls2;
> [info] /* 023 */   private scala.collection.Seq argValue2;
> [info] /* 024 */   private java.lang.String errMsg6;
> [info] /* 025 */   private boolean[] argIsNulls3;
> [info] /* 026 */   private scala.collection.Seq argValue3;
> [info] /* 027 */   private java.lang.String errMsg7;
> [info] /* 028 */   private boolean isNull_0;
> [info] /* 029 */   private InternalRow value_0;
> [info] /* 030 */
> [info] /* 031 */   private void apply_1(InternalRow i) {
> [info] /* 032 */
> [info] /* 033 */     if (isNull1) {
> [info] /* 034 */       throw new RuntimeException(errMsg3);
> [info] /* 035 */     }
> [info] /* 036 */
> [info] /* 037 */     boolean isNull24 = false;
> [info] /* 038 */     final com.tresata.spark.sql.Struct2 value24 = isNull24 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 039 */     isNull24 = value24 == null;
> [info] /* 040 */
> [info] /* 041 */     boolean isNull23 = isNull24;
> [info] /* 042 */     final scala.collection.Seq value23 = isNull23 ? null : (scala.collection.Seq) value24.s2();
> [info] /* 043 */     isNull23 = value23 == null;
> [info] /* 044 */     argIsNulls1[0] = isNull23;
> [info] /* 045 */     argValue1 = value23;
> [info] /* 046 */
> [info] /* 047 */
> [info] /* 048 */
> [info] /* 049 */     boolean isNull22 = false;
> [info] /* 050 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 051 */       if (argIsNulls1[idx]) { isNull22 = true; break; }
> [info] /* 052 */     }
> [info] /* 053 */
> [info] /* 054 */     final ArrayData value22 = isNull22 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue1);
> [info] /* 055 */     if (isNull22) {
> [info] /* 056 */       values1[2] = null;
> [info] /* 057 */     } else {
> [info] /* 058 */       values1[2] = value22;
> [info] /* 059 */     }
> [info] /* 060 */   }
> [info] /* 061 */
> [info] /* 062 */
> [info] /* 063 */   private void apply1_1(InternalRow i) {
> [info] /* 064 */
> [info] /* 065 */     if (isNull1) {
> [info] /* 066 */       throw new RuntimeException(errMsg7);
> [info] /* 067 */     }
> [info] /* 068 */
> [info] /* 069 */     boolean isNull41 = false;
> [info] /* 070 */     final com.tresata.spark.sql.Struct2 value41 = isNull41 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 071 */     isNull41 = value41 == null;
> [info] /* 072 */
> [info] /* 073 */     boolean isNull40 = isNull41;
> [info] /* 074 */     final scala.collection.Seq value40 = isNull40 ? null : (scala.collection.Seq) value41.s2();
> [info] /* 075 */     isNull40 = value40 == null;
> [info] /* 076 */     argIsNulls3[0] = isNull40;
> [info] /* 077 */     argValue3 = value40;
> [info] /* 078 */
> [info] /* 079 */
> [info] /* 080 */
> [info] /* 081 */     boolean isNull39 = false;
> [info] /* 082 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 083 */       if (argIsNulls3[idx]) { isNull39 = true; break; }
> [info] /* 084 */     }
> [info] /* 085 */
> [info] /* 086 */     final ArrayData value39 = isNull39 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue3);
> [info] /* 087 */     if (isNull39) {
> [info] /* 088 */       values2[2] = null;
> [info] /* 089 */     } else {
> [info] /* 090 */       values2[2] = value39;
> [info] /* 091 */     }
> [info] /* 092 */   }
> [info] /* 093 */
> [info] /* 094 */
> [info] /* 095 */   private void apply_0(InternalRow i) {
> [info] /* 096 */
> [info] /* 097 */     if (isNull1) {
> [info] /* 098 */       throw new RuntimeException(errMsg1);
> [info] /* 099 */     }
> [info] /* 100 */
> [info] /* 101 */     boolean isNull16 = false;
> [info] /* 102 */     final com.tresata.spark.sql.Struct2 value16 = isNull16 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 103 */     isNull16 = value16 == null;
> [info] /* 104 */
> [info] /* 105 */     boolean isNull15 = isNull16;
> [info] /* 106 */     final double value15 = isNull15 ? -1.0 : value16.d();
> [info] /* 107 */     if (isNull15) {
> [info] /* 108 */       values1[0] = null;
> [info] /* 109 */     } else {
> [info] /* 110 */       values1[0] = value15;
> [info] /* 111 */     }
> [info] /* 112 */     if (isNull1) {
> [info] /* 113 */       throw new RuntimeException(errMsg2);
> [info] /* 114 */     }
> [info] /* 115 */
> [info] /* 116 */     boolean isNull20 = false;
> [info] /* 117 */     final com.tresata.spark.sql.Struct2 value20 = isNull20 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 118 */     isNull20 = value20 == null;
> [info] /* 119 */
> [info] /* 120 */     boolean isNull19 = isNull20;
> [info] /* 121 */     final scala.collection.Seq value19 = isNull19 ? null : (scala.collection.Seq) value20.s1();
> [info] /* 122 */     isNull19 = value19 == null;
> [info] /* 123 */     argIsNulls[0] = isNull19;
> [info] /* 124 */     argValue = value19;
> [info] /* 125 */
> [info] /* 126 */
> [info] /* 127 */
> [info] /* 128 */     boolean isNull18 = false;
> [info] /* 129 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 130 */       if (argIsNulls[idx]) { isNull18 = true; break; }
> [info] /* 131 */     }
> [info] /* 132 */
> [info] /* 133 */     final ArrayData value18 = isNull18 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue);
> [info] /* 134 */     if (isNull18) {
> [info] /* 135 */       values1[1] = null;
> [info] /* 136 */     } else {
> [info] /* 137 */       values1[1] = value18;
> [info] /* 138 */     }
> [info] /* 139 */   }
> [info] /* 140 */
> [info] /* 141 */
> [info] /* 142 */   private void apply1_0(InternalRow i) {
> [info] /* 143 */
> [info] /* 144 */     if (isNull1) {
> [info] /* 145 */       throw new RuntimeException(errMsg5);
> [info] /* 146 */     }
> [info] /* 147 */
> [info] /* 148 */     boolean isNull33 = false;
> [info] /* 149 */     final com.tresata.spark.sql.Struct2 value33 = isNull33 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 150 */     isNull33 = value33 == null;
> [info] /* 151 */
> [info] /* 152 */     boolean isNull32 = isNull33;
> [info] /* 153 */     final double value32 = isNull32 ? -1.0 : value33.d();
> [info] /* 154 */     if (isNull32) {
> [info] /* 155 */       values2[0] = null;
> [info] /* 156 */     } else {
> [info] /* 157 */       values2[0] = value32;
> [info] /* 158 */     }
> [info] /* 159 */     if (isNull1) {
> [info] /* 160 */       throw new RuntimeException(errMsg6);
> [info] /* 161 */     }
> [info] /* 162 */
> [info] /* 163 */     boolean isNull37 = false;
> [info] /* 164 */     final com.tresata.spark.sql.Struct2 value37 = isNull37 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 165 */     isNull37 = value37 == null;
> [info] /* 166 */
> [info] /* 167 */     boolean isNull36 = isNull37;
> [info] /* 168 */     final scala.collection.Seq value36 = isNull36 ? null : (scala.collection.Seq) value37.s1();
> [info] /* 169 */     isNull36 = value36 == null;
> [info] /* 170 */     argIsNulls2[0] = isNull36;
> [info] /* 171 */     argValue2 = value36;
> [info] /* 172 */
> [info] /* 173 */
> [info] /* 174 */
> [info] /* 175 */     boolean isNull35 = false;
> [info] /* 176 */     for (int idx = 0; idx < 1; idx++) {
> [info] /* 177 */       if (argIsNulls2[idx]) { isNull35 = true; break; }
> [info] /* 178 */     }
> [info] /* 179 */
> [info] /* 180 */     final ArrayData value35 = isNull35 ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(argValue2);
> [info] /* 181 */     if (isNull35) {
> [info] /* 182 */       values2[1] = null;
> [info] /* 183 */     } else {
> [info] /* 184 */       values2[1] = value35;
> [info] /* 185 */     }
> [info] /* 186 */   }
> [info] /* 187 */
> [info] /* 188 */
> [info] /* 189 */   public SpecificMutableProjection(Object[] references) {
> [info] /* 190 */     this.references = references;
> [info] /* 191 */     mutableRow = new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
> [info] /* 192 */     this.values = null;
> [info] /* 193 */     this.errMsg = (java.lang.String) references[1];
> [info] /* 194 */     this.values1 = null;
> [info] /* 195 */     this.errMsg1 = (java.lang.String) references[2];
> [info] /* 196 */     argIsNulls = new boolean[1];
> [info] /* 197 */
> [info] /* 198 */     this.errMsg2 = (java.lang.String) references[3];
> [info] /* 199 */     argIsNulls1 = new boolean[1];
> [info] /* 200 */
> [info] /* 201 */     this.errMsg3 = (java.lang.String) references[4];
> [info] /* 202 */     this.errMsg4 = (java.lang.String) references[5];
> [info] /* 203 */     this.values2 = null;
> [info] /* 204 */     this.errMsg5 = (java.lang.String) references[6];
> [info] /* 205 */     argIsNulls2 = new boolean[1];
> [info] /* 206 */
> [info] /* 207 */     this.errMsg6 = (java.lang.String) references[7];
> [info] /* 208 */     argIsNulls3 = new boolean[1];
> [info] /* 209 */
> [info] /* 210 */     this.errMsg7 = (java.lang.String) references[8];
> [info] /* 211 */     this.isNull_0 = true;
> [info] /* 212 */     this.value_0 = null;
> [info] /* 213 */   }
> [info] /* 214 */
> [info] /* 215 */   public org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection target(MutableRow row) {
> [info] /* 216 */     mutableRow = row;
> [info] /* 217 */     return this;
> [info] /* 218 */   }
> [info] /* 219 */
> [info] /* 220 */   /* Provide immutable access to the last projected row. */
> [info] /* 221 */   public InternalRow currentValue() {
> [info] /* 222 */     return (InternalRow) mutableRow;
> [info] /* 223 */   }
> [info] /* 224 */
> [info] /* 225 */   public java.lang.Object apply(java.lang.Object _i) {
> [info] /* 226 */     InternalRow i = (InternalRow) _i;
> [info] /* 227 */
> [info] /* 228 */
> [info] /* 229 */
> [info] /* 230 */     Object obj = ((Expression) references[0]).eval(null);
> [info] /* 231 */     org.apache.spark.sql.expressions.Aggregator value2 = (org.apache.spark.sql.expressions.Aggregator) obj;
> [info] /* 232 */
> [info] /* 233 */     boolean isNull4 = i.isNullAt(0);
> [info] /* 234 */     UTF8String value4 = isNull4 ? null : (i.getUTF8String(0));
> [info] /* 235 */
> [info] /* 236 */     boolean isNull3 = isNull4;
> [info] /* 237 */     final java.lang.String value3 = isNull3 ? null : (java.lang.String) value4.toString();
> [info] /* 238 */     isNull3 = value3 == null;
> [info] /* 239 */     boolean isNull1 = false || isNull3;
> [info] /* 240 */     final com.tresata.spark.sql.Struct3 value1 = isNull1 ? null : (com.tresata.spark.sql.Struct3) value2.finish(value3);
> [info] /* 241 */     isNull1 = value1 == null;
> [info] /* 242 */
> [info] /* 243 */     boolean isNull5 = false;
> [info] /* 244 */     InternalRow value5 = null;
> [info] /* 245 */     if (!false && isNull1) {
> [info] /* 246 */
> [info] /* 247 */       final InternalRow value7 = null;
> [info] /* 248 */       isNull5 = true;
> [info] /* 249 */       value5 = value7;
> [info] /* 250 */     } else {
> [info] /* 251 */
> [info] /* 252 */       boolean isNull8 = false;
> [info] /* 253 */       this.values = new Object[2];
> [info] /* 254 */       if (isNull1) {
> [info] /* 255 */         throw new RuntimeException(errMsg);
> [info] /* 256 */       }
> [info] /* 257 */
> [info] /* 258 */       boolean isNull11 = false;
> [info] /* 259 */       final com.tresata.spark.sql.Struct2 value11 = isNull11 ? null : (com.tresata.spark.sql.Struct2) value1.a();
> [info] /* 260 */       isNull11 = value11 == null;
> [info] /* 261 */       boolean isNull9 = false;
> [info] /* 262 */       InternalRow value9 = null;
> [info] /* 263 */       if (!false && isNull11) {
> [info] /* 264 */
> [info] /* 265 */         final InternalRow value13 = null;
> [info] /* 266 */         isNull9 = true;
> [info] /* 267 */         value9 = value13;
> [info] /* 268 */       } else {
> [info] /* 269 */
> [info] /* 270 */         boolean isNull14 = false;
> [info] /* 271 */         values1 = new Object[3];apply_0(i);
> [info] /* 272 */         apply_1(i);
> [info] /* 273 */         final InternalRow value14 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values1);
> [info] /* 274 */         this.values1 = null;
> [info] /* 275 */         isNull9 = isNull14;
> [info] /* 276 */         value9 = value14;
> [info] /* 277 */       }
> [info] /* 278 */       if (isNull9) {
> [info] /* 279 */         values[0] = null;
> [info] /* 280 */       } else {
> [info] /* 281 */         values[0] = value9;
> [info] /* 282 */       }
> [info] /* 283 */       if (isNull1) {
> [info] /* 284 */         throw new RuntimeException(errMsg4);
> [info] /* 285 */       }
> [info] /* 286 */
> [info] /* 287 */       boolean isNull28 = false;
> [info] /* 288 */       final com.tresata.spark.sql.Struct2 value28 = isNull28 ? null : (com.tresata.spark.sql.Struct2) value1.b();
> [info] /* 289 */       isNull28 = value28 == null;
> [info] /* 290 */       boolean isNull26 = false;
> [info] /* 291 */       InternalRow value26 = null;
> [info] /* 292 */       if (!false && isNull28) {
> [info] /* 293 */
> [info] /* 294 */         final InternalRow value30 = null;
> [info] /* 295 */         isNull26 = true;
> [info] /* 296 */         value26 = value30;
> [info] /* 297 */       } else {
> [info] /* 298 */
> [info] /* 299 */         boolean isNull31 = false;
> [info] /* 300 */         values2 = new Object[3];apply1_0(i);
> [info] /* 301 */         apply1_1(i);
> [info] /* 302 */         final InternalRow value31 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values2);
> [info] /* 303 */         this.values2 = null;
> [info] /* 304 */         isNull26 = isNull31;
> [info] /* 305 */         value26 = value31;
> [info] /* 306 */       }
> [info] /* 307 */       if (isNull26) {
> [info] /* 308 */         values[1] = null;
> [info] /* 309 */       } else {
> [info] /* 310 */         values[1] = value26;
> [info] /* 311 */       }
> [info] /* 312 */       final InternalRow value8 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values);
> [info] /* 313 */       this.values = null;
> [info] /* 314 */       isNull5 = isNull8;
> [info] /* 315 */       value5 = value8;
> [info] /* 316 */     }
> [info] /* 317 */     this.isNull_0 = isNull5;
> [info] /* 318 */     this.value_0 = value5;
> [info] /* 319 */
> [info] /* 320 */     // copy all the results into MutableRow
> [info] /* 321 */
> [info] /* 322 */     if (!this.isNull_0) {
> [info] /* 323 */       mutableRow.update(0, this.value_0);
> [info] /* 324 */     } else {
> [info] /* 325 */       mutableRow.setNullAt(0);
> [info] /* 326 */     }
> [info] /* 327 */
> [info] /* 328 */     return mutableRow;
> [info] /* 329 */   }
> [info] /* 330 */ }
> {noformat}
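>
> The compile error is visible in the trace itself: the split-out helpers (apply_1 at generated line 33, and likewise apply_0, apply1_0, and apply1_1) reference isNull1 and value1, but both are declared as locals of apply() (generated lines 239-240), so they are not in scope in the helpers and Janino rejects the first such reference ("Expression "isNull1" is not an rvalue"). Below is a minimal sketch of the same scoping problem, with hypothetical names, not Spark's actual codegen:
> {noformat}
> // Hypothetical sketch of the scoping bug: codegen split apply() into
> // helper methods, but isNull1 stayed a local of apply() instead of
> // being promoted to a field that the helpers could see.
> class SpecificMutableProjectionSketch {
>   private Object[] values1 = new Object[1];
>
>   private void apply_1(Object input) {
>     // The generated apply_1 effectively does this, which cannot compile:
>     // isNull1 is a local of apply() and is unresolved here ("not an rvalue").
>     // if (isNull1) { throw new RuntimeException("null input"); }
>     values1[0] = input;
>   }
>
>   public Object apply(Object input) {
>     boolean isNull1 = (input == null); // local, never hoisted to a field
>     apply_1(input);                    // the helper cannot see isNull1
>     return values1[0];
>   }
> }
> {noformat}
> Per the resolution header above, this is fixed for 2.1.0 by pull request 15807.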