[
https://issues.apache.org/jira/browse/HUDI-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jiayu Shen updated HUDI-4219:
-----------------------------
Description:
{color:red}colored text{color}Merge SQL:
{code:java}
merge into target_table as t using source_table as s on t.primary_key =
s.primary_key
when matched then update set col0=s.col0, col1=s.col1, col2=s.col2+2;
// col2 is precombine field{code}
In MergeIntoHoodieTableCommand, the preCombine field expression is already processed:
{code:java}
target2SourcePreCombineFiled.foreach {
case (targetPreCombineField, sourceExpression)
if !isEqualToTarget(targetPreCombineField, sourceExpression) =>
sourceDF = sourceDF.withColumn(targetPreCombineField, new
Column(sourceExpression))
sourceDFOutput = sourceDFOutput :+
AttributeReference(targetPreCombineField, sourceExpression.dataType)()
case _=>
} {code}
When the assignment is "col=s.col", _isEqualToTarget_ returns true, but for "col=s.col+2" it
returns false.
In this case, "col=s.col+2" is added to sourceDF, and the result of
"s.col+2" overwrites the original column "col" after the optimizer runs.
So "col=s.col+2" has already been calculated in sourceDF, but it is also present in the
UpdateAction and is calculated again in the generated code, which then throws the exception below.
The exception detail follows; it can be reproduced in a test case.
{code:java}
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to
combine/merge new record with old value in storage, for new record
{HoodieRecord{key=HoodieKey { recordKey=id:8 partitionPath=dt=2021-03-21},
currentLocation='HoodieRecordLocation {instantTime=20220610022503112,
fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}',
newLocation='HoodieRecordLocation {instantTime=20220610022508224,
fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}'}}, old value
{{"_hoodie_commit_time": "20220610022453319", "_hoodie_commit_seqno":
"20220610022453319_0_1", "_hoodie_record_key": "id:8",
"_hoodie_partition_path": "dt=2021-03-21", "_hoodie_file_name":
"9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0_0-136-175_20220610022503112.parquet",
"id": 8, "name": "a8", "price": 80.0, "ts": 1008, "dt": "2021-03-21"}}
at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
at
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
at
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
... 3 more
Caused by: java.lang.RuntimeException: Error in execute expression:
org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer.
Expressions is: [boundreference() AS `id` 'sxx' AS `name` (boundreference() *
CAST(2 AS DOUBLE)) AS `price` (boundreference() + CAST(10000 AS BIGINT)) AS
`ts` boundreference() AS `dt`]
CodeBody is: {private Object[] references;
private String code;
private AvroSerializer serializer;public
ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520(Object
references, String code, AvroSerializer serializer) {
this.references = (Object[])references;
this.code = code;
this.serializer = serializer;
}public GenericRecord eval(IndexedRecord record) {
boolean isNull_0 = record.get(6) == null;
int value_0 = isNull_0 ?
-1 : ((Integer)record.get(6));boolean isNull_2 = true;
double value_2 = -1.0;
boolean isNull_3 = record.get(2) == null;
double value_3 = isNull_3 ?
-1.0 : ((Double)record.get(2));
if (!isNull_3) {
boolean isNull_4 = false;
double value_4 = -1.0;
if (!false) {
value_4 = (double) 2;
} isNull_2 = false; // resultCode could change nullability.
value_2 = value_3 * value_4;
}
boolean isNull_6 = true;
long value_6 = -1L;
boolean isNull_7 = record.get(3) == null;
long value_7 = isNull_7 ?
-1L : ((Long)record.get(3));
if (!isNull_7) {
boolean isNull_8 = false;
long value_8 = -1L;
if (!false) {
value_8 = (long) 10000;
} isNull_6 = false; // resultCode could change nullability.
value_6 = value_7 + value_8;
}
boolean isNull_10 = record.get(4) == null;
UTF8String value_10 = isNull_10 ?
null : ((UTF8String)record.get(4));
Object[] results = new Object[5];
if (isNull_0) {
results[0] = null;
} else {
results[0] = value_0;
}
if (false) {
results[1] = null;
} else {
results[1] = ((UTF8String) references[0] /* literal */);
}
if (isNull_2) {
results[2] = null;
} else {
results[2] = value_2;
}
if (isNull_6) {
results[3] = null;
} else {
results[3] = value_6;
}
if (isNull_10) {
results[4] = null;
} else {
results[4] = value_10;
}
InternalRow row = new GenericInternalRow(results);
return (GenericRecord) serializer.serialize(row);
}public String getCode() {
return code;
}
}
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:261)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4(ExpressionPayload.scala:109)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4$adapted(ExpressionPayload.scala:103)
at
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:103)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:77)
at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
... 9 more
Caused by: java.lang.ClassCastException:
org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at
org.apache.hudi.sql.payload.ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520.eval(Unknown
Source)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:259)
... 17 more{code}
was:
Merge sql:
{code:java}
merge into target_table as t using source_table as s on t.primary_key =
s.primary_key
when matched then update set col0=s.col0, col1=s.col1, col2=s.col2+2;
// col2 is precombine field{code}
In MergeIntoHoodieTableCommand, already process preCombine field expression.
{code:java}
target2SourcePreCombineFiled.foreach {
case (targetPreCombineField, sourceExpression)
if !isEqualToTarget(targetPreCombineField, sourceExpression) =>
sourceDF = sourceDF.withColumn(targetPreCombineField, new
Column(sourceExpression))
sourceDFOutput = sourceDFOutput :+
AttributeReference(targetPreCombineField, sourceExpression.dataType)()
case _=>
} {code}
When "col=s.col", _isEqualToTarget_ will return true, but "col=s.col+2" return
false.
In this case, "col=s.col+2" is added to sourceDF, and then the result of
"s.col+2" will overwrite origin column "col" after optimizer.
So, "col=s.col+2" has been calculated in sourceDF, but it is also in
UpdateAction and to be calculated in gencode, then throw exception as below.
There is exception detail, we can reproduce it in test case.
{code:java}
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to
combine/merge new record with old value in storage, for new record
{HoodieRecord{key=HoodieKey { recordKey=id:8 partitionPath=dt=2021-03-21},
currentLocation='HoodieRecordLocation {instantTime=20220610022503112,
fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}',
newLocation='HoodieRecordLocation {instantTime=20220610022508224,
fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}'}}, old value
{{"_hoodie_commit_time": "20220610022453319", "_hoodie_commit_seqno":
"20220610022453319_0_1", "_hoodie_record_key": "id:8",
"_hoodie_partition_path": "dt=2021-03-21", "_hoodie_file_name":
"9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0_0-136-175_20220610022503112.parquet",
"id": 8, "name": "a8", "price": 80.0, "ts": 1008, "dt": "2021-03-21"}}
at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
at
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
at
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
... 3 more
Caused by: java.lang.RuntimeException: Error in execute expression:
org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer.
Expressions is: [boundreference() AS `id` 'sxx' AS `name` (boundreference() *
CAST(2 AS DOUBLE)) AS `price` (boundreference() + CAST(10000 AS BIGINT)) AS
`ts` boundreference() AS `dt`]
CodeBody is: {private Object[] references;
private String code;
private AvroSerializer serializer;public
ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520(Object
references, String code, AvroSerializer serializer) {
this.references = (Object[])references;
this.code = code;
this.serializer = serializer;
}public GenericRecord eval(IndexedRecord record) {
boolean isNull_0 = record.get(6) == null;
int value_0 = isNull_0 ?
-1 : ((Integer)record.get(6));boolean isNull_2 = true;
double value_2 = -1.0;
boolean isNull_3 = record.get(2) == null;
double value_3 = isNull_3 ?
-1.0 : ((Double)record.get(2));
if (!isNull_3) {
boolean isNull_4 = false;
double value_4 = -1.0;
if (!false) {
value_4 = (double) 2;
} isNull_2 = false; // resultCode could change nullability.
value_2 = value_3 * value_4;
}
boolean isNull_6 = true;
long value_6 = -1L;
boolean isNull_7 = record.get(3) == null;
long value_7 = isNull_7 ?
-1L : ((Long)record.get(3));
if (!isNull_7) {
boolean isNull_8 = false;
long value_8 = -1L;
if (!false) {
value_8 = (long) 10000;
} isNull_6 = false; // resultCode could change nullability.
value_6 = value_7 + value_8;
}
boolean isNull_10 = record.get(4) == null;
UTF8String value_10 = isNull_10 ?
null : ((UTF8String)record.get(4));
Object[] results = new Object[5];
if (isNull_0) {
results[0] = null;
} else {
results[0] = value_0;
}
if (false) {
results[1] = null;
} else {
results[1] = ((UTF8String) references[0] /* literal */);
}
if (isNull_2) {
results[2] = null;
} else {
results[2] = value_2;
}
if (isNull_6) {
results[3] = null;
} else {
results[3] = value_6;
}
if (isNull_10) {
results[4] = null;
} else {
results[4] = value_10;
}
InternalRow row = new GenericInternalRow(results);
return (GenericRecord) serializer.serialize(row);
}public String getCode() {
return code;
}
}
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:261)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4(ExpressionPayload.scala:109)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4$adapted(ExpressionPayload.scala:103)
at
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:103)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:77)
at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
... 9 more
Caused by: java.lang.ClassCastException:
org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at
org.apache.hudi.sql.payload.ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520.eval(Unknown
Source)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:259)
... 17 more{code}
> Merge Into when update expression "col=s.col+2" on precombine cause exception
> -----------------------------------------------------------------------------
>
> Key: HUDI-4219
> URL: https://issues.apache.org/jira/browse/HUDI-4219
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Jiayu Shen
> Priority: Major
> Labels: pull-request-available
>
> {color:red}colored text{color}Merge SQL:
> {code:java}
> merge into target_table as t using source_table as s on t.primary_key =
> s.primary_key
> when matched then update set col0=s.col0, col1=s.col1, col2=s.col2+2;
> // col2 is precombine field{code}
>
> In MergeIntoHoodieTableCommand, the preCombine field expression is already processed:
> {code:java}
> target2SourcePreCombineFiled.foreach {
> case (targetPreCombineField, sourceExpression)
> if !isEqualToTarget(targetPreCombineField, sourceExpression) =>
> sourceDF = sourceDF.withColumn(targetPreCombineField, new
> Column(sourceExpression))
> sourceDFOutput = sourceDFOutput :+
> AttributeReference(targetPreCombineField, sourceExpression.dataType)()
> case _=>
> } {code}
> When the assignment is "col=s.col", _isEqualToTarget_ returns true, but for "col=s.col+2"
> it returns false.
> In this case, "col=s.col+2" is added to sourceDF, and the result of
> "s.col+2" overwrites the original column "col" after the optimizer runs.
> So "col=s.col+2" has already been calculated in sourceDF, but it is also present in the
> UpdateAction and is calculated again in the generated code, which then throws the exception below.
>
> The exception detail follows; it can be reproduced in a test case.
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to
> combine/merge new record with old value in storage, for new record
> {HoodieRecord{key=HoodieKey { recordKey=id:8 partitionPath=dt=2021-03-21},
> currentLocation='HoodieRecordLocation {instantTime=20220610022503112,
> fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}',
> newLocation='HoodieRecordLocation {instantTime=20220610022508224,
> fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}'}}, old value
> {{"_hoodie_commit_time": "20220610022453319", "_hoodie_commit_seqno":
> "20220610022453319_0_1", "_hoodie_record_key": "id:8",
> "_hoodie_partition_path": "dt=2021-03-21", "_hoodie_file_name":
> "9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0_0-136-175_20220610022503112.parquet",
> "id": 8, "name": "a8", "price": 80.0, "ts": 1008, "dt": "2021-03-21"}}
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
> at
> org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
> at
> org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
> at
> org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
> at
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
> at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> at java.util.concurrent.FutureTask.run(FutureTask.java)
> ... 3 more
> Caused by: java.lang.RuntimeException: Error in execute expression:
> org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer.
> Expressions is: [boundreference() AS `id` 'sxx' AS `name` (boundreference()
> * CAST(2 AS DOUBLE)) AS `price` (boundreference() + CAST(10000 AS BIGINT))
> AS `ts` boundreference() AS `dt`]
> CodeBody is: {private Object[] references;
> private String code;
> private AvroSerializer serializer;public
> ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520(Object
> references, String code, AvroSerializer serializer) {
> this.references = (Object[])references;
> this.code = code;
> this.serializer = serializer;
> }public GenericRecord eval(IndexedRecord record) {
> boolean isNull_0 = record.get(6) == null;
> int value_0 = isNull_0 ?
> -1 : ((Integer)record.get(6));boolean isNull_2 = true;
> double value_2 = -1.0;
> boolean isNull_3 = record.get(2) == null;
> double value_3 = isNull_3 ?
> -1.0 : ((Double)record.get(2));
> if (!isNull_3) {
> boolean isNull_4 = false;
> double value_4 = -1.0;
> if (!false) {
> value_4 = (double) 2;
> } isNull_2 = false; // resultCode could change nullability.
>
> value_2 = value_3 * value_4;
>
>
> }
> boolean isNull_6 = true;
> long value_6 = -1L;
> boolean isNull_7 = record.get(3) == null;
> long value_7 = isNull_7 ?
> -1L : ((Long)record.get(3));
> if (!isNull_7) {
> boolean isNull_8 = false;
> long value_8 = -1L;
> if (!false) {
> value_8 = (long) 10000;
> } isNull_6 = false; // resultCode could change nullability.
>
> value_6 = value_7 + value_8;
>
>
> }
> boolean isNull_10 = record.get(4) == null;
> UTF8String value_10 = isNull_10 ?
> null : ((UTF8String)record.get(4));
> Object[] results = new Object[5];
>
> if (isNull_0) {
> results[0] = null;
> } else {
> results[0] = value_0;
> }
> if (false) {
> results[1] = null;
> } else {
> results[1] = ((UTF8String) references[0] /* literal */);
> }
> if (isNull_2) {
> results[2] = null;
> } else {
> results[2] = value_2;
> }
> if (isNull_6) {
> results[3] = null;
> } else {
> results[3] = value_6;
> }
> if (isNull_10) {
> results[4] = null;
> } else {
> results[4] = value_10;
> }
>
> InternalRow row = new GenericInternalRow(results);
> return (GenericRecord) serializer.serialize(row);
> }public String getCode() {
> return code;
> }
>
> }
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:261)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4(ExpressionPayload.scala:109)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4$adapted(ExpressionPayload.scala:103)
> at
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:103)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:77)
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
> ... 9 more
> Caused by: java.lang.ClassCastException:
> org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
> at
> org.apache.hudi.sql.payload.ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520.eval(Unknown
> Source)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:259)
> ... 17 more{code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)