siying commented on code in PR #50344:
URL: https://github.com/apache/spark/pull/50344#discussion_r2010668281


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -60,23 +60,25 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
  *
  * @note This class is not thread-safe, so use it only from one thread.
  * @see [[RocksDBFileManager]] to see how the files are laid out in local disk and DFS.
- * @param dfsRootDir  Remote directory where checkpoints are going to be written
  * @param conf         Configuration for RocksDB
+ * @param stateStoreId StateStoreId for the state store
  * @param localRootDir Root directory in local disk that is used to working and checkpointing dirs
  * @param hadoopConf   Hadoop configuration for talking to the remote file system
- * @param loggingId    Id that will be prepended in logs for isolating concurrent RocksDBs
  */
 class RocksDB(
-    dfsRootDir: String,
     val conf: RocksDBConf,
+    stateStoreId: StateStoreId,

Review Comment:
   Is this refactoring necessary for the fix, or related to it at all?
   I think we should separate refactoring changes from the bug fix itself. That 
makes it easier to trace the cause if something goes wrong, and easier to 
revert a change if necessary. 
   
   Regarding the refactoring itself, I feel the previous approach was better. 
The more higher-layer information (in this case the StateStoreId) we pass into 
the lower layer, the less likely we are to end up with a strong abstraction. It 
also has the side effect of making unit tests harder to write. For example, to 
test class RocksDB, we now need to fake a whole StateStoreId rather than 
passing two strings.
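
To make the testing concern concrete, here is a minimal sketch of the two constructor shapes. Every type in it (`FakeConf`, `FakeStateStoreId`, `DbWithPrimitives`, `DbWithStoreId`) is a hypothetical stand-in, not the real Spark class, and the way the DFS path is derived from the id is an assumption made only for illustration.

```scala
// Hypothetical stand-ins; these are NOT the real Spark classes.
final case class FakeConf(compactOnCommit: Boolean = false)
final case class FakeStateStoreId(
    checkpointRootLocation: String,
    operatorId: Long,
    partitionId: Int,
    storeName: String = "default")

// Shape A (as before the PR): the low-level class takes only the
// primitives it actually uses.
class DbWithPrimitives(dfsRootDir: String, val conf: FakeConf, loggingId: String = "") {
  def describe: String = s"[$loggingId] root=$dfsRootDir"
}

// Shape B (as in the PR): the low-level class depends on a higher-layer type.
// The path layout below is assumed, purely for illustration.
class DbWithStoreId(val conf: FakeConf, stateStoreId: FakeStateStoreId) {
  private val dfsRootDir =
    s"${stateStoreId.checkpointRootLocation}/${stateStoreId.operatorId}/${stateStoreId.partitionId}"
  def describe: String = s"[${stateStoreId.storeName}] root=$dfsRootDir"
}

object ConstructorShapes {
  def main(args: Array[String]): Unit = {
    // A test of shape A needs two strings and a conf.
    val a = new DbWithPrimitives("/tmp/ckpt/0/0", FakeConf(), loggingId = "test")
    // A test of shape B must first build a full id object.
    val b = new DbWithStoreId(FakeConf(), FakeStateStoreId("/tmp/ckpt", 0L, 0))
    println(a.describe)
    println(b.describe)
  }
}
```

With shape A, a unit test can pass any directory string directly. With shape B, the test has to know how the id maps to a directory before it can control where files land.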



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -335,8 +337,12 @@ private[sql] class RocksDBStateStoreProvider
     }
 
     override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = {
-      val checkpointInfo = rocksDB.getLatestCheckpointInfo(id.partitionId)
-      checkpointInfo
+      if (checkpointInfo == null) {

Review Comment:
   I don't see a clear reason to use null here rather than Scala's Option/None. 
Can you change this to use Option, or explain why null is needed?
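
A minimal sketch of what the `Option`-based version could look like. `FakeCheckpointInfo` and `FakeStore` are hypothetical stand-ins, and since the body of the null branch is not visible in this hunk, throwing when no info is available is an assumption rather than the PR's actual behavior.

```scala
// Hypothetical stand-in for the checkpoint info type; not the real Spark class.
final case class FakeCheckpointInfo(partitionId: Int, batchVersion: Long)

class FakeStore(partitionId: Int) {
  // None means "no checkpoint info captured yet"; this replaces a nullable var.
  private var checkpointInfo: Option[FakeCheckpointInfo] = None

  // Assumed for illustration: the info is captured at commit time.
  def commit(version: Long): Unit = {
    checkpointInfo = Some(FakeCheckpointInfo(partitionId, version))
  }

  // The absent case is handled explicitly instead of via a null check.
  def getCheckpointInfo(): FakeCheckpointInfo =
    checkpointInfo.getOrElse(
      throw new IllegalStateException("checkpoint info requested before commit"))
}

object OptionInsteadOfNull {
  def main(args: Array[String]): Unit = {
    val store = new FakeStore(partitionId = 3)
    store.commit(5L)
    println(store.getCheckpointInfo())
  }
}
```

If the value comes from an API that may return null, wrapping it as `Option(value)` converts null to `None` at the boundary, so the rest of the code never needs a null check.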



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

