dylanwong250 commented on code in PR #52202:
URL: https://github.com/apache/spark/pull/52202#discussion_r2330685228
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -509,13 +515,19 @@ abstract class SymmetricHashJoinStateManager(
storeProviderId, keySchema, valueSchema,
NoPrefixKeyStateEncoderSpec(keySchema),
useColumnFamilies = useVirtualColumnFamilies, storeConf, hadoopConf,
useMultipleValuesPerKey = false, stateSchemaProvider = None)
- if (snapshotStartVersion.isDefined) {
+ if (handlerSnapshotOptions.isDefined) {
if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplay]) {
throw
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
stateStoreProvider.getClass.toString)
}
+ val opts = handlerSnapshotOptions.get
stateStoreProvider.asInstanceOf[SupportsFineGrainedReplay]
- .replayStateFromSnapshot(snapshotStartVersion.get,
stateInfo.get.storeVersion)
+ .replayStateFromSnapshot(
+ opts.snapshotVersion,
+ opts.endVersion,
+ readOnly = false,
Review Comment:
I initially set readOnly = false to match the default that
replayStateFromSnapshot used before the uniqueIds change. Looking at the
previous PR for snapshot reading
(https://github.com/apache/spark/pull/46944), the readOnly option did not
exist at the time. I changed it to readOnly = true and all the tests pass, so
I think it is better to switch to readOnly = true.
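As a minimal sketch of the pattern under discussion (simplified stand-in types, not Spark's actual API; the real replayStateFromSnapshot takes more parameters and the class names below are hypothetical), the guard-then-replay logic with readOnly = true looks like:

```scala
// Simplified model of the check in SymmetricHashJoinStateManager: the
// provider must support fine-grained replay before state can be rebuilt
// from a snapshot. All types here are illustrative stand-ins.
trait StateStoreProvider

trait SupportsFineGrainedReplay {
  // Hypothetical simplified signature for illustration only.
  def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean): String
}

class ReplayableProvider extends StateStoreProvider with SupportsFineGrainedReplay {
  def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean): String =
    s"replayed $snapshotVersion -> $endVersion (readOnly=$readOnly)"
}

def replayOrFail(
    provider: StateStoreProvider,
    snapshotVersion: Long,
    endVersion: Long): String =
  provider match {
    case p: SupportsFineGrainedReplay =>
      // Per the review discussion: snapshot replay for reading state does
      // not write, so readOnly = true is the safer choice.
      p.replayStateFromSnapshot(snapshotVersion, endVersion, readOnly = true)
    case other =>
      throw new UnsupportedOperationException(
        s"${other.getClass.getName} does not support fine-grained replay")
  }

println(replayOrFail(new ReplayableProvider, 3L, 7L))
```

The pattern match plays the role of the isInstanceOf check plus asInstanceOf cast in the diff above, which is the more idiomatic Scala form of the same guard.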
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]