This is an automated email from the ASF dual-hosted git repository.
chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new ab3317b8ef fix(AttributeTypeUtils): use Double.NEGATIVE_INFINITY
instead of Double.MIN_VALUE (#4145)
ab3317b8ef is described below
commit ab3317b8ef12e1b56205bf0f35c5ae1edde42d9d
Author: carloea2 <[email protected]>
AuthorDate: Sat Jan 10 14:01:24 2026 -0800
fix(AttributeTypeUtils): use Double.NEGATIVE_INFINITY instead of
Double.MIN_VALUE (#4145)
### What changes were proposed in this PR?
This PR fixes the value returned by `minValue` for
`AttributeType.DOUBLE`. The change is made in `AttributeTypeUtils.scala`;
the issue was originally reported against `AggregationOperation.scala`.
Previously, the code used `Double.MIN_VALUE`, which is the smallest
positive non-zero double, not the most-negative value.
https://github.com/apache/texera/blob/07c35d004a1185e4098b56591166b62cc9ab4856/common/workflow-operator/src/main/scala/org/apache/amber/operator/aggregate/AggregationOperation.scala#L335-L345
The fix replaces `Double.MIN_VALUE` with `Double.NEGATIVE_INFINITY` and
adds new tests.
### Any related issues, documentation, discussions?
Closes #4144
Related discussion: #4049 (Clarify minValue intent in
AggregationOperation)
### How was this PR tested?
- AggregateOpSpec.scala
- AttributeTypeUtilsSpec.scala
- Frontend Manual Test
### Was this PR authored or co-authored using generative AI tooling?
No
---------
Co-authored-by: Xiaozhen Liu <[email protected]>
---
.../amber/core/tuple/AttributeTypeUtils.scala | 4 +-
.../amber/core/tuple/AttributeTypeUtilsSpec.scala | 2 +-
.../amber/operator/aggregate/AggregateOpSpec.scala | 170 +++++++++++++++++++++
3 files changed, 173 insertions(+), 3 deletions(-)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
index 208f94d040..17c3dfef33 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
@@ -509,7 +509,7 @@ object AttributeTypeUtils extends Serializable {
)
}
- /** Returns the minimum possible value for a given attribute type. (note
Double.MIN_VALUE is > 0).
+ /** Returns the minimum possible value for a given attribute type.
* For BINARY under lexicographic order, the empty array is the global
minimum.
*/
@throws[UnsupportedOperationException]
@@ -517,7 +517,7 @@ object AttributeTypeUtils extends Serializable {
attrType match {
case AttributeType.INTEGER =>
java.lang.Integer.valueOf(Integer.MIN_VALUE)
case AttributeType.LONG =>
java.lang.Long.valueOf(java.lang.Long.MIN_VALUE)
- case AttributeType.DOUBLE =>
java.lang.Double.valueOf(java.lang.Double.MIN_VALUE)
+ case AttributeType.DOUBLE =>
java.lang.Double.valueOf(java.lang.Double.NEGATIVE_INFINITY)
case AttributeType.TIMESTAMP => new Timestamp(0L)
case AttributeType.BINARY => Array.emptyByteArray
case _ =>
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
index defa567480..446ce518fa 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
@@ -364,7 +364,7 @@ class AttributeTypeUtilsSpec extends AnyFunSuite {
assert(integerMin == Int.MinValue)
assert(longMin == Long.MinValue)
- assert(doubleMin == java.lang.Double.MIN_VALUE)
+ assert(doubleMin == java.lang.Double.NEGATIVE_INFINITY)
}
test("minValue returns timestamp epoch and empty binary array, and fails for
unsupported types") {
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
index e43f3398cc..170efee773 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
@@ -203,6 +203,176 @@ class AggregateOpSpec extends AnyFunSuite {
assert(result == 250L)
}
+ test(
+ "MIN aggregation (DOUBLE) is solid: empty/null/NaN, infinities, signed
zero, and many values"
+ ) {
+ val schema = makeSchema("temperature" -> AttributeType.DOUBLE)
+
+ val operation = makeAggregationOp(AggregationFunction.MIN, "temperature",
"min_temp")
+ val agg = operation.getAggFunc(AttributeType.DOUBLE)
+
+ // -----------------------
+ // 0) Empty input => null
+ // -----------------------
+ val emptyPartial = agg.init()
+ val emptyResult = agg.finalAgg(emptyPartial)
+ assert(emptyResult == null)
+
+ // ---------------------------------------------------------
+ // 1) Only nulls / only NaNs => null
+ // ---------------------------------------------------------
+ val onlyNulls = Seq(makeTuple(schema, null), makeTuple(schema, null),
makeTuple(schema, null))
+ var partialNulls = agg.init()
+ onlyNulls.foreach(tp => partialNulls = agg.iterate(partialNulls, tp))
+ assert(agg.finalAgg(partialNulls) == null)
+
+ val onlyNaNs = Seq(makeTuple(schema, Double.NaN), makeTuple(schema,
Double.NaN))
+ var partialNans = agg.init()
+ onlyNaNs.foreach(tp => partialNans = agg.iterate(partialNans, tp))
+ assert(agg.finalAgg(partialNans) == null)
+
+ // ---------------------------------------------------------
+ // 2) Basic decimals + negatives: should find the true minimum
+ // ---------------------------------------------------------
+ val basics = Seq(
+ makeTuple(schema, 10.25),
+ makeTuple(schema, -2.5),
+ makeTuple(schema, 5.0),
+ makeTuple(schema, -2.5000000001), // slightly smaller than -2.5
+ makeTuple(schema, 1.0e-12)
+ )
+
+ var partial = agg.init()
+ basics.foreach(tp => partial = agg.iterate(partial, tp))
+
+ val basicResult = agg.finalAgg(partial).asInstanceOf[Number].doubleValue()
+ assert(basicResult == -2.5000000001)
+
+ // ---------------------------------------------------------
+ // 3) NaN + null interleaving must not poison the result
+ // (especially if NaN appears first)
+ // ---------------------------------------------------------
+ val mixed = Seq(
+ makeTuple(schema, Double.NaN),
+ makeTuple(schema, null),
+ makeTuple(schema, 3.14159),
+ makeTuple(schema, -0.125),
+ makeTuple(schema, Double.NaN),
+ makeTuple(schema, -9999.0),
+ makeTuple(schema, null)
+ )
+
+ var partialMixed = agg.init()
+ mixed.foreach(tp => partialMixed = agg.iterate(partialMixed, tp))
+
+ val mixedResult =
agg.finalAgg(partialMixed).asInstanceOf[Number].doubleValue()
+ assert(mixedResult == -9999.0)
+
+ // ---------------------------------------------------------
+ // 4) Infinities: min should be -Infinity if present
+ // ---------------------------------------------------------
+ val infinities = Seq(
+ makeTuple(schema, Double.PositiveInfinity),
+ makeTuple(schema, 42.0),
+ makeTuple(schema, Double.NegativeInfinity),
+ makeTuple(schema, -1.0)
+ )
+
+ var partialInf = agg.init()
+ infinities.foreach(tp => partialInf = agg.iterate(partialInf, tp))
+
+ val infResult = agg.finalAgg(partialInf).asInstanceOf[Number].doubleValue()
+ assert(infResult.isNegInfinity)
+
+ // ---------------------------------------------------------
+ // 5) Signed zero: MIN(-0.0, +0.0) should be -0.0
+ // ---------------------------------------------------------
+ val signedZero = Seq(makeTuple(schema, 0.0), makeTuple(schema, -0.0),
makeTuple(schema, 0.0))
+ var partialZero = agg.init()
+ signedZero.foreach(tp => partialZero = agg.iterate(partialZero, tp))
+
+ val zeroValue =
agg.finalAgg(partialZero).asInstanceOf[Number].doubleValue()
+ val zeroBits = java.lang.Double.doubleToRawLongBits(zeroValue)
+ val negZeroBits = java.lang.Double.doubleToRawLongBits(-0.0)
+ assert(zeroBits == negZeroBits)
+
+ // ---------------------------------------------------------
+ // 6) Stress test: many values, compare against a reference min
+ // Reference rule here: ignore null and NaN; return null if none left.
+ // ---------------------------------------------------------
+ val rng = new scala.util.Random(1337)
+ val values: Seq[java.lang.Double] =
+ (1 to 10000).map { index =>
+ index % 250 match {
+ case 0 => null
+ case 1 => Double.NaN
+ case 2 => Double.PositiveInfinity
+ case 3 => Double.NegativeInfinity
+ case 4 => -0.0
+ case _ =>
+ // wide-ish range with some tiny magnitudes too
+ val sign = if (rng.nextBoolean()) 1.0 else -1.0
+ sign * (rng.nextDouble() * 1.0e6) / (if (rng.nextInt(20) == 0)
1.0e12 else 1.0)
+ }
+ }
+
+ val expected: java.lang.Double = {
+ var found = false
+ var currentMin = 0.0
+ values.foreach { x =>
+ if (x != null && !java.lang.Double.isNaN(x)) {
+ if (!found) {
+ currentMin = x; found = true
+ } else if (java.lang.Double.compare(x, currentMin) < 0) currentMin =
x
+ }
+ }
+ if (!found) null else currentMin
+ }
+
+ var partStress = agg.init()
+ values.foreach(v => partStress = agg.iterate(partStress, makeTuple(schema,
v)))
+ val gotAny = agg.finalAgg(partStress)
+
+ if (expected == null) {
+ assert(gotAny == null)
+ } else {
+ val got = gotAny.asInstanceOf[Number].doubleValue()
+ // exact match: should be one of the seen inputs, no tolerance needed
+ if (
+ expected == 0.0 && java.lang.Double.doubleToRawLongBits(expected) !=
java.lang.Double
+ .doubleToRawLongBits(got)
+ ) {
+ // If expected is -0.0, enforce it
+ assert(
+ java.lang.Double.doubleToRawLongBits(got) ==
java.lang.Double.doubleToRawLongBits(
+ expected
+ )
+ )
+ } else {
+ assert(got == expected)
+ }
+ }
+ }
+
+ test("MAX aggregation finds largest DOUBLE value") {
+ val maxValue = 99.144
+ val schema = makeSchema("debt" -> AttributeType.DOUBLE)
+ val tuple1 = makeTuple(schema, -100.123)
+ val tuple2 = makeTuple(schema, 50.12)
+ val tuple3 = makeTuple(schema, maxValue)
+
+ val operation = makeAggregationOp(AggregationFunction.MAX, "debt",
"nax_debt")
+ val agg = operation.getAggFunc(AttributeType.DOUBLE)
+
+ var partial = agg.init()
+ partial = agg.iterate(partial, tuple1)
+ partial = agg.iterate(partial, tuple2)
+ partial = agg.iterate(partial, tuple3)
+
+ val result =
agg.finalAgg(partial).asInstanceOf[java.lang.Double].doubleValue()
+ assert(result == maxValue)
+ }
+
test("AVERAGE aggregation ignores nulls and returns null when all values are
null") {
val schema = makeSchema("price" -> AttributeType.DOUBLE)
val tuple1 = makeTuple(schema, 10.0)