Kimahriman commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r2096682405
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -980,3 +1023,67 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( + keyExpressions: Seq[Attribute], + child: SparkPlan, + stateInfo: Option[StatefulOperatorStateInfo] = None, + eventTimeWatermarkForLateEvents: Option[Long] = None, + eventTimeWatermarkForEviction: Option[Long] = None) + extends BaseStreamingDeduplicateExec { + + protected val schemaForValueRow: StructType = StructType( + Array(StructField("expiresAtMicros", LongType, nullable = false))) + + protected val extraOptionOnStateStore: Map[String, String] = Map.empty + + private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, + allowMultipleEventTimeColumns = false).get + private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + + protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = { + val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow) + val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForValueRow)) + Some(timeoutRow) + } + + protected def putDupInfoIntoState( + store: StateStore, + data: UnsafeRow, + key: UnsafeRow, + reusedDupInfoRow: Option[UnsafeRow]): Unit = { + assert(reusedDupInfoRow.isDefined, "This should have reused row.") + val timeoutRow = reusedDupInfoRow.get + + // We expect data type of event time column to be TimestampType or TimestampNTZType which both + // are internally represented as Long. + val timestamp = data.getLong(eventTimeColOrdinal) + // The unit of timestamp in Spark is microseconds, convert the delay threshold to micros. 
+ val expiresAt = timestamp + DateTimeUtils.millisToMicros(delayThresholdMs) Review Comment: Isn't that just the point of the watermark delay in the first place? Why double the watermark delay by adding it to the timestamp? For example, from the test: in a stream with a 2-second watermark delay, an event is added with event time 17, which leads to a watermark timestamp of 15 on the next batch and an expiresAt of 19 for the record. The watermark timestamp will increase to 19 when an event with event time 21 comes in, meaning the original record is kept for 4 seconds instead of 2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org