Abacn commented on code in PR #37691:
URL: https://github.com/apache/beam/pull/37691#discussion_r2854682144


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##########
@@ -303,6 +309,11 @@ public void asyncClose() throws Exception {
       pendingRemoves.clear();
     }
 
+    if (onlyBundleForKeys) {

Review Comment:
   Marking. This is another place where onlyBundleForKeys takes effect.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -246,7 +246,9 @@ private void addRunnerAndConsumersForPTransformRecursively(
       BundleFinalizer bundleFinalizer,
       Collection<BeamFnDataReadRunner<?>> channelRoots,
       Map<ApiServiceDescriptor, BeamFnDataOutboundAggregator> outboundAggregatorMap,
-      Set<String> runnerCapabilities)
+      Set<String> runnerCapabilities,

Review Comment:
   The number of parameters keeps growing...



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java:
##########
@@ -127,7 +133,7 @@ public void asyncClose() throws Exception {
       beamFnStateClient.handle(
           request.toBuilder().setClear(StateClearRequest.getDefaultInstance()));
     }
-    if (!newValues.isEmpty()) {
+    if (!onlyBundleForKeys && !newValues.isEmpty()) {

Review Comment:
   Marking. This is where the plumbed parameter takes effect.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java:
##########
@@ -677,6 +683,15 @@ static class LazyBlockingStateFetchingIterator implements PrefetchableIterator<B
       this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
     }
 
+    LazyBlockingStateFetchingIterator(
+        BeamFnStateClient beamFnStateClient,
+        StateRequest stateRequestForFirstChunk,
+        boolean hasNoState) {
+      this.beamFnStateClient = beamFnStateClient;
+      this.stateRequestForFirstChunk = stateRequestForFirstChunk;
+      this.continuationToken = hasNoState ? null : stateRequestForFirstChunk.getGet().getContinuationToken();

Review Comment:
   This mostly LGTM. Having to plumb these parameters all the way down to
where they take effect shows how difficult this kind of change has become in
an existing system.
   
   My only concern is that we now unconditionally trust the request's
hasNoState/onlyBundleForKeys parameters to skip overhead. Is it possible to add
some lightweight checks somewhere? For example, when we see that there actually
is state but hasNoState is set, we should throw IllegalArgumentException to
prevent data loss or corruption when the assumption does not hold (for example,
because of a runner bug).
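
   A minimal sketch of the kind of check meant here. `NoStateHintCheck` and
`checkHint` are hypothetical names, not existing Beam classes, and the sketch
assumes the caller already has a chunk of state values in hand (for example,
from a response that arrived anyway). Where such a check could live without
reintroducing the round trip the hint is meant to avoid is left open.

```java
/** Hypothetical helper (not part of Beam): validates a runner-supplied "no state" hint. */
final class NoStateHintCheck {
  private NoStateHintCheck() {}

  /**
   * Throws if the runner claimed there is no state but values were observed.
   *
   * @param hasNoState the hint taken from the request (assumed to be trusted otherwise)
   * @param observedValues any state values the harness happened to see for this key
   */
  static <T> void checkHint(boolean hasNoState, Iterable<T> observedValues) {
    if (hasNoState && observedValues.iterator().hasNext()) {
      // Fail loudly instead of silently dropping or overwriting existing state.
      throw new IllegalArgumentException(
          "hasNoState was set but state values were observed; possible runner bug");
    }
  }
}
```

   The check is a single `hasNext()` call, so it costs nothing beyond the data
already available to the caller.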


