HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r2096687759


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +1023,67 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAtMicros", LongType, nullable = false)))
+
+  protected val extraOptionOnStateStore: Map[String, String] = Map.empty
+
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = {
+    val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow)
+    val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForValueRow))
+    Some(timeoutRow)
+  }
+
+  protected def putDupInfoIntoState(
+      store: StateStore,
+      data: UnsafeRow,
+      key: UnsafeRow,
+      reusedDupInfoRow: Option[UnsafeRow]): Unit = {
+    assert(reusedDupInfoRow.isDefined, "This should have a reused row.")
+    val timeoutRow = reusedDupInfoRow.get
+
+    // We expect the data type of the event time column to be TimestampType or
+    // TimestampNTZType, both of which are internally represented as Long.
+    val timestamp = data.getLong(eventTimeColOrdinal)
+    // Timestamps in Spark are in microseconds, so convert the delay threshold to micros.
+    val expiresAt = timestamp + DateTimeUtils.millisToMicros(delayThresholdMs)

Review Comment:
   Let's say you have a watermark delay of 2 hours, and the API guarantees to deduplicate events as long as the earliest and the latest duplicates are within 2 hours of each other.
   
   Say we have events at 12:00 and 13:59, which are within 2 hours of each other. If the 12:00 event arrives first, the operator must keep its state until at least 13:59 so that it can still account for the event at 13:59.
   
   Honestly, I have been wondering whether TTL-based eviction (extending expiresAt on each input) would be easier to understand...
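   
   For contrast, a hedged sketch of that TTL-style alternative (every input extends the deadline; `TtlDedupState` and `observe` are hypothetical names for illustration, not Spark APIs):
   
   ```scala
   // TTL-style eviction: each sighting of a key pushes its expiresAt forward.
   final class TtlDedupState(delayMicros: Long) {
     private var expiresAt: Option[Long] = None

     // Returns true if the event counts as a duplicate under TTL semantics.
     def observe(eventTimeMicros: Long): Boolean = {
       val isDup = expiresAt.exists(eventTimeMicros < _)
       // Extending on every input means a chain of duplicates spaced under the
       // delay keeps the key alive indefinitely; the fixed-expiry rule above
       // bounds state lifetime by the first event instead.
       expiresAt = Some(math.max(expiresAt.getOrElse(0L), eventTimeMicros + delayMicros))
       isDup
     }
   }
   ```
   
   The trade-off: TTL is arguably simpler to explain, but an unbounded chain of near-duplicates never lets the key expire, whereas the fixed rule gives a hard bound per key.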



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

