HeartSaVioR commented on code in PR #53930:
URL: https://github.com/apache/spark/pull/53930#discussion_r2862092236
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala:
##########
@@ -539,22 +539,19 @@ case class StreamingSymmetricHashJoinExec(
// the outer side (e.g., left side for left outer join) while
generating the outer "null"
// outputs. Now, we have to remove unnecessary state rows from the
other side (e.g., right
// side for the left outer join) if possible. In all cases, nothing
needs to be outputted,
- // hence the removal needs to be done greedily by immediately
consuming the returned
- // iterator.
+ // hence the removal needs to be done greedily.
//
// For full outer joins, we have already removed unnecessary states
from both sides, so
// nothing needs to be outputted here.
- val cleanupIter = joinType match {
- case Inner | LeftSemi => joinerManager.removeOldState()
- case LeftOuter => joinerManager.rightSideJoiner.removeOldState()
- case RightOuter => joinerManager.leftSideJoiner.removeOldState()
- case FullOuter => Iterator.empty
- case _ => throwBadJoinTypeException()
- }
- while (cleanupIter.hasNext) {
- cleanupIter.next()
- numRemovedStateRows += 1
- }
+ numRemovedStateRows += (
+ joinType match {
+ case Inner | LeftSemi => joinerManager.removeOldState()
+ case LeftOuter => joinerManager.rightSideJoiner.removeOldState()
+ case RightOuter => joinerManager.leftSideJoiner.removeOldState()
+ case FullOuter => 0L
Review Comment:
Yes; removeOldState simply performs the eviction by itself and returns the number
of state rows evicted. This doesn't impact anything, because prior to this change
we fully consumed the iterator at the very next line anyway.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala:
##########
@@ -792,6 +789,32 @@ case class StreamingSymmetricHashJoinExec(
joinStateManager.get(key)
}
+ /**
+ * Remove the old state key-value pairs from this joiner's state manager
based on the state
+ * watermark predicate, and return the number of removed rows.
+ */
+ def removeOldState(): Long = {
Review Comment:
It's in OneSideHashJoiner, while we have a caller in
OneSideHashJoinerManager. Also, the other methods at the same level are all public.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinValueRowConverter.scala:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.operators.stateful.join
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Literal, UnsafeProjection, UnsafeRow}
+import
org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager.ValueAndMatchPair
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * Converter between the value row stored in state store and the (actual
value, match) pair.
+ */
+trait StreamingSymmetricHashJoinValueRowConverter {
+ /** Defines the schema of the value row (the value side of K-V in state
store). */
+ def valueAttributes: Seq[Attribute]
+
+ /**
+ * Convert the value row to (actual value, match) pair.
+ *
+ * NOTE: implementations should ensure the result row is NOT reused during
execution, so
+ * that caller can safely read the value in any time.
+ */
+ def convertValue(value: UnsafeRow): ValueAndMatchPair
+
+ /**
+ * Build the value row from (actual value, match) pair. This is expected to
be called just
+ * before storing to the state store.
+ *
+ * NOTE: depending on the implementation, the result row "may" be reused
during execution
+ * (to avoid initialization of object), so the caller should ensure that the
logic doesn't
+ * affect by such behavior. Call copy() against the result row if needed.
+ */
+ def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
+}
+
+/**
+ * V1 implementation of the converter, which simply stores the actual value in
state store and
+ * treats the match status as false. Note that only state format version 1
uses this converter,
+ * and this is only for backward compatibility. There is known correctness
issue for outer join
+ * with this converter - see SPARK-26154 for more details.
+ */
+class StreamingSymmetricHashJoinValueRowConverterFormatV1(
+ inputValueAttributes: Seq[Attribute]) extends
StreamingSymmetricHashJoinValueRowConverter {
+ override val valueAttributes: Seq[Attribute] = inputValueAttributes
+
+ override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
+ if (value != null) ValueAndMatchPair(value, false) else null
Review Comment:
It's probably more a matter of preference than style guidance, but I don't mind
changing it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinValueRowConverter.scala:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.operators.stateful.join
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Literal, UnsafeProjection, UnsafeRow}
+import
org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager.ValueAndMatchPair
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * Converter between the value row stored in state store and the (actual
value, match) pair.
+ */
+trait StreamingSymmetricHashJoinValueRowConverter {
+ /** Defines the schema of the value row (the value side of K-V in state
store). */
+ def valueAttributes: Seq[Attribute]
+
+ /**
+ * Convert the value row to (actual value, match) pair.
+ *
+ * NOTE: implementations should ensure the result row is NOT reused during
execution, so
+ * that caller can safely read the value in any time.
+ */
+ def convertValue(value: UnsafeRow): ValueAndMatchPair
+
+ /**
+ * Build the value row from (actual value, match) pair. This is expected to
be called just
+ * before storing to the state store.
+ *
+ * NOTE: depending on the implementation, the result row "may" be reused
during execution
+ * (to avoid initialization of object), so the caller should ensure that the
logic doesn't
+ * affect by such behavior. Call copy() against the result row if needed.
+ */
+ def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
+}
+
+/**
+ * V1 implementation of the converter, which simply stores the actual value in
state store and
+ * treats the match status as false. Note that only state format version 1
uses this converter,
+ * and this is only for backward compatibility. There is known correctness
issue for outer join
+ * with this converter - see SPARK-26154 for more details.
+ */
+class StreamingSymmetricHashJoinValueRowConverterFormatV1(
+ inputValueAttributes: Seq[Attribute]) extends
StreamingSymmetricHashJoinValueRowConverter {
+ override val valueAttributes: Seq[Attribute] = inputValueAttributes
+
+ override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
+ if (value != null) ValueAndMatchPair(value, false) else null
+ }
+
+ override def convertToValueRow(value: UnsafeRow, matched: Boolean):
UnsafeRow = value
+}
+
+/**
+ * V2 implementation of the converter, which adds an extra boolean field to
store the match status
+ * in state store. This is the default implementation for state format version
2 and above, which
+ * fixes the correctness issue for outer join in V1 implementation.
+ */
+class StreamingSymmetricHashJoinValueRowConverterFormatV2(
+ inputValueAttributes: Seq[Attribute]) extends
StreamingSymmetricHashJoinValueRowConverter {
+ private val valueWithMatchedExprs = inputValueAttributes :+ Literal(true)
+ private val indexOrdinalInValueWithMatchedRow = inputValueAttributes.size
+
+ private val valueWithMatchedRowGenerator =
UnsafeProjection.create(valueWithMatchedExprs,
+ inputValueAttributes)
+
+ override val valueAttributes: Seq[Attribute] = inputValueAttributes :+
+ AttributeReference("matched", BooleanType)()
+
+ // Projection to generate key row from (value + matched) row
+ private val valueRowGenerator = UnsafeProjection.create(
+ inputValueAttributes, valueAttributes)
+
+ override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
+ if (value != null) {
Review Comment:
The if/else statement is too long to be readable without braces; IMHO it would
cause more confusion to omit the braces on the method while keeping braces inside
the method body. It's a matter of style preference, so I'd like to keep this as it
is unless you feel strongly about it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinValueRowConverter.scala:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.operators.stateful.join
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Literal, UnsafeProjection, UnsafeRow}
+import
org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager.ValueAndMatchPair
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * Converter between the value row stored in state store and the (actual
value, match) pair.
+ */
+trait StreamingSymmetricHashJoinValueRowConverter {
+ /** Defines the schema of the value row (the value side of K-V in state
store). */
+ def valueAttributes: Seq[Attribute]
+
+ /**
+ * Convert the value row to (actual value, match) pair.
+ *
+ * NOTE: implementations should ensure the result row is NOT reused during
execution, so
+ * that caller can safely read the value in any time.
+ */
+ def convertValue(value: UnsafeRow): ValueAndMatchPair
+
+ /**
+ * Build the value row from (actual value, match) pair. This is expected to
be called just
+ * before storing to the state store.
+ *
+ * NOTE: depending on the implementation, the result row "may" be reused
during execution
+ * (to avoid initialization of object), so the caller should ensure that the
logic doesn't
+ * affect by such behavior. Call copy() against the result row if needed.
+ */
+ def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
+}
+
+/**
+ * V1 implementation of the converter, which simply stores the actual value in
state store and
+ * treats the match status as false. Note that only state format version 1
uses this converter,
+ * and this is only for backward compatibility. There is known correctness
issue for outer join
+ * with this converter - see SPARK-26154 for more details.
+ */
+class StreamingSymmetricHashJoinValueRowConverterFormatV1(
+ inputValueAttributes: Seq[Attribute]) extends
StreamingSymmetricHashJoinValueRowConverter {
+ override val valueAttributes: Seq[Attribute] = inputValueAttributes
+
+ override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
+ if (value != null) ValueAndMatchPair(value, false) else null
+ }
+
+ override def convertToValueRow(value: UnsafeRow, matched: Boolean):
UnsafeRow = value
+}
+
+/**
+ * V2 implementation of the converter, which adds an extra boolean field to
store the match status
+ * in state store. This is the default implementation for state format version
2 and above, which
+ * fixes the correctness issue for outer join in V1 implementation.
+ */
+class StreamingSymmetricHashJoinValueRowConverterFormatV2(
+ inputValueAttributes: Seq[Attribute]) extends
StreamingSymmetricHashJoinValueRowConverter {
+ private val valueWithMatchedExprs = inputValueAttributes :+ Literal(true)
+ private val indexOrdinalInValueWithMatchedRow = inputValueAttributes.size
+
+ private val valueWithMatchedRowGenerator =
UnsafeProjection.create(valueWithMatchedExprs,
+ inputValueAttributes)
+
+ override val valueAttributes: Seq[Attribute] = inputValueAttributes :+
+ AttributeReference("matched", BooleanType)()
+
+ // Projection to generate key row from (value + matched) row
+ private val valueRowGenerator = UnsafeProjection.create(
+ inputValueAttributes, valueAttributes)
+
+ override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
+ if (value != null) {
+ ValueAndMatchPair(valueRowGenerator(value).copy(),
+ value.getBoolean(indexOrdinalInValueWithMatchedRow))
+ } else {
+ null
+ }
+ }
+
+ override def convertToValueRow(value: UnsafeRow, matched: Boolean):
UnsafeRow = {
+ val row = valueWithMatchedRowGenerator(value)
+ row.setBoolean(indexOrdinalInValueWithMatchedRow, matched)
+ row
+ }
+}
+
+/**
+ * The entry point to create the converter for value row in state store. The
converter is created
+ * based on the state format version.
+ */
+object StreamingSymmetricHashJoinValueRowConverter {
+ def create(
+ inputValueAttributes: Seq[Attribute],
+ stateFormatVersion: Int): StreamingSymmetricHashJoinValueRowConverter = {
+ stateFormatVersion match {
+ case 1 => new
StreamingSymmetricHashJoinValueRowConverterFormatV1(inputValueAttributes)
Review Comment:
The question is "where" — where do we mention it? The config is internal and
only a few users would adjust it. We may not want to deprecate v1 in a user-facing
way, since that would expose the internal config. Maybe mention it only in the config doc?
Btw, we fail the query explicitly when the state format version is 1 and the join
is an outer join.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -27,16 +27,729 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX,
STATE_STORE_ID}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, JoinedRow, Literal, SafeProjection,
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, JoinedRow, Literal, NamedExpression,
SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.execution.metric.SQLMetric
-import
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorStateInfo
-import
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOpStateStoreCheckpointInfo
+import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
import
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._
-import
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
KeyStateEncoderSpec, NoopStatePartitionKeyExtractor,
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast,
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema,
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics,
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay}
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField,
StructType}
+import
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
KeyStateEncoderSpec, NoopStatePartitionKeyExtractor,
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast,
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema,
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics,
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay,
TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeyStateEncoderSpec,
TimestampKeyStateEncoder}
+import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType,
StructField, StructType}
import org.apache.spark.util.NextIterator
+/**
+ * Base trait of the state manager for stream-stream symmetric hash join
operator.
+ *
+ * This defines the basic APIs for the state manager, except the methods for
eviction which are
+ * defined in separate traits - See [[SupportsEvictByCondition]] and
[[SupportsEvictByTimestamp]].
+ *
+ * Implementation classes are expected to inherit those traits as needed,
depending on the eviction
+ * strategy they support.
+ */
+trait SymmetricHashJoinStateManager {
+ import SymmetricHashJoinStateManager._
+
+ /** Append a new value to the given key, with the flag of matched or not. */
+ def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean): Unit
+
+ /**
+ * Retrieve all matched values from given key. This doesn't update the
matched flag for the
+ * values being returned, hence should be only used if appropriate for the
join type and the
+ * side.
+ */
+ def get(key: UnsafeRow): Iterator[UnsafeRow]
+
+ /**
+ * Retrieve all joined rows for the given key. The joined rows are generated
with the provided
+ * generateJoinedRow function and filtered with the provided predicate.
+ *
+ * For excludeRowsAlreadyMatched = true, the method will only return the
joined rows for the
+ * values which have not been marked as matched yet. The matched flag will
be updated to true
+ * for the values being returned, if it is semantically required to do so.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def getJoinedRows(
+ key: UnsafeRow,
+ generateJoinedRow: InternalRow => JoinedRow,
+ predicate: JoinedRow => Boolean,
+ excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow]
+
+ /**
+ * Provide all key-value pairs in the state manager.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def iterator: Iterator[KeyToValuePair]
+
+ /** Commit all the changes to all the state stores */
+ def commit(): Unit
+
+ /** Abort any changes to the state stores if needed */
+ def abortIfNeeded(): Unit
+
+ /** Provide the metrics. */
+ def metrics: StateStoreMetrics
+
+ /**
+ * Get state store checkpoint information of the two state stores for this
joiner, after
+ * they finished data processing.
+ */
+ def getLatestCheckpointInfo(): JoinerStateStoreCkptInfo
+}
+
+/**
+ * This trait is specific to help the old version of state manager
implementation (v1-v3) to work
+ * with existing tests which look up the state store with key with index.
+ */
+trait SupportsIndexedKeys {
+ def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow
+
+ protected[streaming] def updateNumValuesTestOnly(key: UnsafeRow, numValues:
Long): Unit
+}
+
+/**
+ * This trait is for state manager implementations that support eviction by
condition.
+ * This is for the state manager implementations which have to perform full
scan
+ * for eviction.
+ */
+trait SupportsEvictByCondition { self: SymmetricHashJoinStateManager =>
+ import SymmetricHashJoinStateManager._
+
+ /** Evict the state via condition on the key. Returns the number of values
evicted. */
+ def evictByKeyCondition(removalCondition: UnsafeRow => Boolean): Long
+
+ /**
+ * Evict the state via condition on the key, and return the evicted
key-value pairs.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def evictAndReturnByKeyCondition(
+ removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
+
+ /** Evict the state via condition on the value. Returns the number of values
evicted. */
+ def evictByValueCondition(removalCondition: UnsafeRow => Boolean): Long
+
+ /**
+ * Evict the state via condition on the value, and return the evicted
key-value pairs.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def evictAndReturnByValueCondition(
+ removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
+}
+
+/**
+ * This trait is for state manager implementations that support eviction by
timestamp. This is for
+ * the state manager implementations which maintain the state with event time
and can efficiently
+ * scan the keys with event time smaller than the given timestamp for eviction.
+ */
+trait SupportsEvictByTimestamp { self: SymmetricHashJoinStateManager =>
+ import SymmetricHashJoinStateManager._
+
+ /** Evict the state by timestamp. Returns the number of values evicted. */
+ def evictByTimestamp(endTimestamp: Long): Long
+
+ /**
+ * Evict the state by timestamp and return the evicted key-value pairs.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def evictAndReturnByTimestamp(endTimestamp: Long): Iterator[KeyToValuePair]
+}
+
+/**
+ * The version 4 of stream-stream join state manager implementation, which is
designed to optimize
+ * the eviction with watermark. Previous versions require full scan to find
the keys to evict,
+ * while this version only scans the keys with event time smaller than the
watermark.
+ *
+ * In this implementation, we no longer build a logical array of values;
instead, we store the
+ * (key, timestamp) -> values in the primary store, and maintain a secondary
index of
+ * (timestamp, key) to scan the keys to evict for each watermark. To retrieve
the values for a key,
+ * we perform prefix scan with the key to get all the (key, timestamp) ->
values.
+ *
+ * Refer to the [[KeyWithTsToValuesStore]] and [[TsWithKeyTypeStore]] for more
details.
Review Comment:
I just filed it - https://issues.apache.org/jira/browse/SPARK-55729
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -27,16 +27,729 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX,
STATE_STORE_ID}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, JoinedRow, Literal, SafeProjection,
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, JoinedRow, Literal, NamedExpression,
SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.execution.metric.SQLMetric
-import
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorStateInfo
-import
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOpStateStoreCheckpointInfo
+import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
import
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._
-import
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
KeyStateEncoderSpec, NoopStatePartitionKeyExtractor,
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast,
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema,
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics,
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay}
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField,
StructType}
+import
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
KeyStateEncoderSpec, NoopStatePartitionKeyExtractor,
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast,
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema,
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics,
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay,
TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeyStateEncoderSpec,
TimestampKeyStateEncoder}
+import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType,
StructField, StructType}
import org.apache.spark.util.NextIterator
+/**
+ * Base trait of the state manager for stream-stream symmetric hash join
operator.
+ *
+ * This defines the basic APIs for the state manager, except the methods for
eviction which are
+ * defined in separate traits - See [[SupportsEvictByCondition]] and
[[SupportsEvictByTimestamp]].
+ *
+ * Implementation classes are expected to inherit those traits as needed,
depending on the eviction
+ * strategy they support.
+ */
+trait SymmetricHashJoinStateManager {
+ import SymmetricHashJoinStateManager._
+
+ /** Append a new value to the given key, with the flag of matched or not. */
+ def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean): Unit
+
+ /**
+ * Retrieve all matched values from given key. This doesn't update the
matched flag for the
+ * values being returned, hence should be only used if appropriate for the
join type and the
+ * side.
+ */
+ def get(key: UnsafeRow): Iterator[UnsafeRow]
+
+ /**
+ * Retrieve all joined rows for the given key. The joined rows are generated
with the provided
+ * generateJoinedRow function and filtered with the provided predicate.
+ *
+ * For excludeRowsAlreadyMatched = true, the method will only return the
joined rows for the
+ * values which have not been marked as matched yet. The matched flag will
be updated to true
+ * for the values being returned, if it is semantically required to do so.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def getJoinedRows(
+ key: UnsafeRow,
+ generateJoinedRow: InternalRow => JoinedRow,
+ predicate: JoinedRow => Boolean,
+ excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow]
+
+ /**
+ * Provide all key-value pairs in the state manager.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def iterator: Iterator[KeyToValuePair]
+
+ /** Commit all the changes to all the state stores */
+ def commit(): Unit
+
+ /** Abort any changes to the state stores if needed */
+ def abortIfNeeded(): Unit
+
+ /** Provide the metrics. */
+ def metrics: StateStoreMetrics
+
+ /**
+ * Get state store checkpoint information of the two state stores for this
joiner, after
+ * they finished data processing.
+ */
+ def getLatestCheckpointInfo(): JoinerStateStoreCkptInfo
+}
+
+/**
+ * This trait is specific to help the old version of state manager
implementation (v1-v3) to work
+ * with existing tests which look up the state store with key with index.
+ */
+trait SupportsIndexedKeys {
+ def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow
+
+ protected[streaming] def updateNumValuesTestOnly(key: UnsafeRow, numValues:
Long): Unit
+}
+
+/**
+ * This trait is for state manager implementations that support eviction by
condition.
+ * This is for the state manager implementations which have to perform full
scan
+ * for eviction.
+ */
+trait SupportsEvictByCondition { self: SymmetricHashJoinStateManager =>
+ import SymmetricHashJoinStateManager._
+
+ /** Evict the state via condition on the key. Returns the number of values
evicted. */
+ def evictByKeyCondition(removalCondition: UnsafeRow => Boolean): Long
+
+ /**
+ * Evict the state via condition on the key, and return the evicted
key-value pairs.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def evictAndReturnByKeyCondition(
+ removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
+
+ /** Evict the state via condition on the value. Returns the number of values
evicted. */
+ def evictByValueCondition(removalCondition: UnsafeRow => Boolean): Long
+
+ /**
+ * Evict the state via condition on the value, and return the evicted
key-value pairs.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def evictAndReturnByValueCondition(
+ removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
+}
+
+/**
+ * This trait is for state manager implementations that support eviction by
timestamp. This is for
+ * the state manager implementations which maintain the state with event time
and can efficiently
+ * scan the keys with event time smaller than the given timestamp for eviction.
+ */
+trait SupportsEvictByTimestamp { self: SymmetricHashJoinStateManager =>
+ import SymmetricHashJoinStateManager._
+
+ /** Evict the state by timestamp. Returns the number of values evicted. */
+ def evictByTimestamp(endTimestamp: Long): Long
+
+ /**
+ * Evict the state by timestamp and return the evicted key-value pairs.
+ *
+ * It is caller's responsibility to consume the whole iterator.
+ */
+ def evictAndReturnByTimestamp(endTimestamp: Long): Iterator[KeyToValuePair]
+}
+
+/**
+ * The version 4 of stream-stream join state manager implementation, which is
designed to optimize
+ * the eviction with watermark. Previous versions require full scan to find
the keys to evict,
+ * while this version only scans the keys with event time smaller than the
watermark.
+ *
+ * In this implementation, we no longer build a logical array of values;
instead, we store the
+ * (key, timestamp) -> values in the primary store, and maintain a secondary
index of
+ * (timestamp, key) to scan the keys to evict for each watermark. To retrieve
the values for a key,
+ * we perform prefix scan with the key to get all the (key, timestamp) ->
values.
+ *
+ * Refer to the [[KeyWithTsToValuesStore]] and [[TsWithKeyTypeStore]] for more
details.
+ */
+class SymmetricHashJoinStateManagerV4(
+ joinSide: JoinSide,
+ inputValueAttributes: Seq[Attribute],
+ joinKeys: Seq[Expression],
+ stateInfo: Option[StatefulOperatorStateInfo],
+ storeConf: StateStoreConf,
+ hadoopConf: Configuration,
+ partitionId: Int,
+ keyToNumValuesStateStoreCkptId: Option[String],
+ keyWithIndexToValueStateStoreCkptId: Option[String],
+ stateFormatVersion: Int,
+ skippedNullValueCount: Option[SQLMetric] = None,
+ useStateStoreCoordinator: Boolean = true,
+ snapshotOptions: Option[SnapshotOptions] = None,
+ joinStoreGenerator: JoinStateManagerStoreGenerator)
+ extends SymmetricHashJoinStateManager with SupportsEvictByTimestamp with
Logging {
+
+ import SymmetricHashJoinStateManager._
+
  // Schema of the join key: one generated field per join key expression ("field0", "field1", ...).
  protected val keySchema = StructType(
    joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) })
  protected val keyAttributes = toAttributes(keySchema)

  // Index of the event time column within the input value row, if any. Used to derive the
  // timestamp under which a value is stored (see extractEventTimeFn below).
  private val eventTimeColIdxOpt = WatermarkSupport.findEventTimeColumnIndex(
    inputValueAttributes,
    // NOTE: This does not accept multiple event time columns. This is not the same as the
    // operator for which we offer backward compatibility, but it involves too many layers to
    // pass the information through. The information is in SQLConf.
    allowMultipleEventTimeColumns = false)

  // Fallback bucketing when no event time column exists: values are spread across a fixed
  // number of pseudo-timestamp buckets chosen at random (see extractEventTimeFn).
  // NOTE(review): the RNG is seeded per manager instance with the wall clock, so bucket
  // assignment is not deterministic across task retries — confirm this is acceptable for
  // the store layout.
  private val random = new scala.util.Random(System.currentTimeMillis())
  private val bucketCountForNoEventTime = 1024
  // Extracts the timestamp under which a value row is stored: the row's event time when an
  // event time column exists, otherwise a random bucket id so that values still spread over
  // a bounded pseudo-timestamp space.
  private val extractEventTimeFn: UnsafeRow => Long = { row =>
    eventTimeColIdxOpt match {
      case Some(idx) =>
        val attr = inputValueAttributes(idx)

        if (attr.dataType.isInstanceOf[StructType]) {
          // NOTE: We assume this is a window struct (two fields), the same assumption as
          // WatermarkSupport.watermarkExpression; field 1 is presumably the window end —
          // confirm against WatermarkSupport.
          row.getStruct(idx, 2).getLong(1)
        } else {
          row.getLong(idx)
        }

      case _ =>
        // When the event time column is not available, we use a random bucketing strategy to
        // decide where the new value will be stored. There is a trade-off between the bucket
        // count and the number of values in each bucket; we can make the bucket count
        // configurable if the current magic number turns out not to work well.
        random.nextInt(bucketCountForNoEventTime)
    }
  }
+
  // Index of the event time column within the join key, detected via the watermark delay
  // metadata (EventTimeWatermark.delayKey). Only the first matching column is used.
  private val eventTimeColIdxOptInKey: Option[Int] = {
    joinKeys.zipWithIndex.collectFirst {
      case (ne: NamedExpression, index)
        if ne.metadata.contains(EventTimeWatermark.delayKey) => index
    }
  }
+
+ private val extractEventTimeFnFromKey: UnsafeRow => Option[Long] = { row =>
+ eventTimeColIdxOptInKey.map { idx =>
+ val attr = keyAttributes(idx)
+ if (attr.dataType.isInstanceOf[StructType]) {
+ // NOTE: We assume this is window struct, as same as
WatermarkSupport.watermarkExpression
+ row.getStruct(idx, 2).getLong(1)
+ } else {
+ row.getLong(idx)
+ }
+ }
+ }
+
  // Placeholder schema for the default column family; the real column families are
  // registered separately with their own schemas.
  private val dummySchema = StructType(
    Seq(StructField("dummy", NullType, nullable = true))
  )

  // TODO: [SPARK-55628] The below two fields need to be handled properly during integration
  // with the operator.
  private val stateStoreCkptId: Option[String] = None
  private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None

  // Assigned only when this manager creates and owns the provider itself (i.e. when the
  // state store coordinator is not used, see getStateStore); remains null otherwise.
  private var stateStoreProvider: StateStoreProvider = _

  // We will use the dummy schema for the default CF (column family) since we will register
  // the CFs separately.
  private val stateStore = getStateStore(
    dummySchema, dummySchema, useVirtualColumnFamilies = true,
    NoPrefixKeyStateEncoderSpec(dummySchema), useMultipleValuesPerKey = false
  )
+
+ private def getStateStore(
+ keySchema: StructType,
+ valueSchema: StructType,
+ useVirtualColumnFamilies: Boolean,
+ keyStateEncoderSpec: KeyStateEncoderSpec,
+ useMultipleValuesPerKey: Boolean): StateStore = {
+ val storeName = StateStoreId.DEFAULT_STORE_NAME
+ val storeProviderId = StateStoreProviderId(stateInfo.get, partitionId,
storeName)
+ val store = if (useStateStoreCoordinator) {
+ assert(handlerSnapshotOptions.isEmpty, "Should not use state store
coordinator " +
+ "when reading state as data source.")
+ joinStoreGenerator.getStore(
+ storeProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ stateInfo.get.storeVersion, stateStoreCkptId, None,
useVirtualColumnFamilies,
+ useMultipleValuesPerKey, storeConf, hadoopConf)
+ } else {
+ // This class will manage the state store provider by itself.
+ stateStoreProvider = StateStoreProvider.createAndInit(
+ storeProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies = useVirtualColumnFamilies,
+ storeConf, hadoopConf, useMultipleValuesPerKey =
useMultipleValuesPerKey,
+ stateSchemaProvider = None)
+ if (handlerSnapshotOptions.isDefined) {
+ if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplay]) {
+ throw
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
+ stateStoreProvider.getClass.toString)
+ }
+ val opts = handlerSnapshotOptions.get
+ stateStoreProvider.asInstanceOf[SupportsFineGrainedReplay]
+ .replayStateFromSnapshot(
+ opts.snapshotVersion,
+ opts.endVersion,
+ readOnly = true,
+ opts.startStateStoreCkptId,
+ opts.endStateStoreCkptId)
+ } else {
+ stateStoreProvider.getStore(stateInfo.get.storeVersion,
stateStoreCkptId)
+ }
+ }
+ logInfo(log"Loaded store ${MDC(STATE_STORE_ID, store.id)}")
Review Comment:
This should only happen once per task. This is actually what we have in the
prior version of the state manager.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -512,8 +512,11 @@ class SymmetricHashJoinStateManagerV4(
key: UnsafeRow,
timestamp: Long,
valuesWithMatched: Seq[(UnsafeRow, Boolean)]): Unit = {
+ // copy() is required because convertToValueRow reuses its internal
UnsafeProjection output
+ // TODO: StateStore.putList should allow iterator to be passed in, so
that we don't need to
Review Comment:
@anishshri-db Friendly reminder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]