anishshri-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1902064252


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -149,7 +149,8 @@ case class TransformWithStateInPandasExec(
     initialStateGroupingAttrs.map(SortOrder(_, Ascending)))
 
   override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]]
+  ): OperatorStateMetadata = {

Review Comment:
   nit: can this fit on the line above?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateMetricsImpl.scala:
##########
@@ -21,6 +21,10 @@ import 
org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj
 import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
StateStore}
 import org.apache.spark.sql.types._
 
+object ListStateMetricsImpl {

Review Comment:
   Let's combine this with some existing utils object?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -140,7 +140,13 @@ case class TransformWithStateExec(
    * after init is called.
    */
   override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = 
{
-    val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
+    val keySchema = keyExpressions.toStructType
+    val columnFamilySchemas = 
getDriverProcessorHandle().getColumnFamilySchemas ++
+      Map(
+        StateStore.DEFAULT_COL_FAMILY_NAME ->
+          StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,

Review Comment:
   The indentation seems off?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -135,6 +136,8 @@ class MicroBatchExecution(
   // operatorID -> (partitionID -> array of uniqueID)
   private val currentStateStoreCkptId = MutableMap[Long, 
Array[Array[String]]]()
 
+  private val stateSchemaMetadatas = MutableMap[Long, StateSchemaBroadcast]()

Review Comment:
   nit: let's add a comment here explaining what this is for.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -67,39 +69,117 @@ object StateStoreColumnFamilySchemaUtils {
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-   StateStoreColFamilySchema(
+      hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = {
+    val schemas = mutable.Map[String, StateStoreColFamilySchema]()
+
+    // Add main value state schema
+    schemas.put(stateName, StateStoreColFamilySchema(
       stateName,
+      keySchemaId = 0,
       keyEncoder.schema,
+      valueSchemaId = 0,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
-      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))))
+
+    // Add TTL index if needed
+    if (hasTtl) {
+      val ttlIndexSchema = StateStoreColFamilySchema(
+        getTtlColFamilyName(stateName), 0,
+        getTTLRowKeySchema(keyEncoder.schema), 0,
+        StructType(Array(StructField("__empty__", NullType))),
+        
Some(RangeKeyScanStateEncoderSpec(getTTLRowKeySchema(keyEncoder.schema), 
Seq(0))))
+      schemas.put(ttlIndexSchema.colFamilyName, ttlIndexSchema)
+    }
+
+    schemas.toMap
   }
 
   def getListStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-  StateStoreColFamilySchema(
+      hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = {
+    val schemas = mutable.Map[String, StateStoreColFamilySchema]()

Review Comment:
   Why do we need these maps?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -344,7 +347,7 @@ class StatefulProcessorHandleImpl(
  * the StatefulProcessor is initialized.
  */
 class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: 
ExpressionEncoder[Any])
-  extends StatefulProcessorHandleImplBase(timeMode, keyExprEnc) {
+  extends StatefulProcessorHandleImplBase(timeMode, keyExprEnc) with Logging {

Review Comment:
   Is this intentional?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -43,6 +43,15 @@ object TimerStateUtils {
       TimerStateUtils.PROC_TIMERS_STATE_NAME + 
TimerStateUtils.KEY_TO_TIMESTAMP_CF
     }
   }
+
+  def getTimerStateSecIndexName(timeMode: String): String = {

Review Comment:
   Could we combine this with the function above and just make a generic function?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -462,12 +468,13 @@ case class TransformWithStateExec(
       stateSchemaVersion: Int): List[StateSchemaValidationResult] = {
     val info = getStateInfo
     validateAndWriteStateSchema(hadoopConf, batchId, stateSchemaVersion,
-      info, session, operatorStateMetadataVersion)
+      info, session, operatorStateMetadataVersion, 
conf.stateStoreEncodingFormat)
   }
 
   /** Metadata of this stateful operator and its states stores. */
   override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]]
+  ): OperatorStateMetadata = {

Review Comment:
   nit: could this go on the line above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to