[ 
https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=775277&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775277
 ]

ASF GitHub Bot logged work on BEAM-14511:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/May/22 22:55
            Start Date: 26/May/22 22:55
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17754:
URL: https://github.com/apache/beam/pull/17754#discussion_r883132570


##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)

Review Comment:
   This defines a new variable `res` but then prints `expected` in the error 
message. As above, rename it to `want` and reassign the existing variable 
(`want = Restriction{0, math.MaxInt64}`) instead of declaring a new one. This 
convention is intended to help avoid copy-paste errors like the one here.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       fmt.Printf("tracker: %#v", tracker)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       tracker, err = NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(20)
+       p, r, _ = tracker.TrySplit(0)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+}
+
+func TestGrowableTracker_TrySplit(t *testing.T) {

Review Comment:
   It feels like the various tests that call TrySplit with various initial 
conditions and then a mid-condition adjustment should be able to become a 
table-driven test. Instead of having cases following each other, it should be 
equivalent to set the initial conditions manually (e.g. the estimate, various 
bits of claimed and not-claimed state), and check that the end results for each 
set of initial conditions are sound.
   
   Yes, good testing practice recommends that we get there as much as possible 
via the public APIs (e.g. TryClaim & TrySplit), but we don't need to start 
each case from the zero start, since we have well-defined midpoints that can be 
cases themselves.  
   
   This will simplify and consolidate the code for the tests.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       fmt.Printf("tracker: %#v", tracker)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }

Review Comment:
   Same thing here, prefer using `got` and `want` to follow the Go idioms.  
They can be used as prefixes as well.
   
   It's also possible to inline things in the `if` to make things clearer, and 
easier to use the "correct" variable and avoid copy pasta or repetition errors.
   
   ```
   if got, want := r.(Restriction), Restriction{5, math.MaxInt64}; got != want {
     t.Errorf( ..., got, want)
   }
   ```
   
   (naturally this isn't possible in all cases, such as the two instances of 
the primary needing to be checked, so there will be a `want` in the function 
scope)



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}

Review Comment:
   nit: prefer `want` instead of `expected`.
   
   Similarly, you could name the returned restrictions `gotP` and `gotR`, which 
signals to readers that these are the variables we'll be comparing against soon.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       fmt.Printf("tracker: %#v", tracker)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       tracker, err = NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(20)
+       p, r, _ = tracker.TrySplit(0)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+}
+
+func TestGrowableTracker_TrySplit(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error in creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(0)) {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", false, true)
+       }
+
+       estimator.SetEstimateRangeEnd(16)
+
+       p, r, _ := tracker.TrySplit(0.5)
+       expPr := Restriction{0, 8}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes := Restriction{8, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       estimator.SetEstimateRangeEnd(12)
+       p, r, _ = tracker.TrySplit(0.5)
+       expPr = Restriction{0, 4}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes = Restriction{4, 8}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       if tracker.TryClaim(int64(4)) {
+               t.Errorf("tracker.TryClaim(4) = %v, want: %v", true, false)
+       }
+}
+
+// TestGrowableTracker_Splits tests that TrySplit follows its contract, 
meaning that
+// splits don't lose any elements, split fractions are clamped to 0 or 1, and
+// that TrySplit always splits at the nearest integer greater than the given
+// fraction.
+func TestGrowableTracker_Splits(t *testing.T) {

Review Comment:
   This is exactly what I'm thinking of. What's missing from these cases that 
is being covered by the tests above? We likely can migrate everything else into 
this test's table.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       fmt.Printf("tracker: %#v", tracker)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       tracker, err = NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(20)
+       p, r, _ = tracker.TrySplit(0)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+}
+
+func TestGrowableTracker_TrySplit(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error in creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(0)) {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", false, true)
+       }
+
+       estimator.SetEstimateRangeEnd(16)
+
+       p, r, _ := tracker.TrySplit(0.5)
+       expPr := Restriction{0, 8}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes := Restriction{8, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       estimator.SetEstimateRangeEnd(12)
+       p, r, _ = tracker.TrySplit(0.5)
+       expPr = Restriction{0, 4}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes = Restriction{4, 8}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       if tracker.TryClaim(int64(4)) {
+               t.Errorf("tracker.TryClaim(4) = %v, want: %v", true, false)
+       }
+}
+
+// TestGrowableTracker_Splits tests that TrySplit follows its contract, 
meaning that
+// splits don't lose any elements, split fractions are clamped to 0 or 1, and
+// that TrySplit always splits at the nearest integer greater than the given
+// fraction.
+func TestGrowableTracker_Splits(t *testing.T) {
+       // rangeEndEstimator := offsetRangeEndEstimator{EstimateRangeEnd: 10}
+       tests := []struct {
+               rest              Restriction
+               claimed           int64
+               fraction          float64
+               rangeEndEstimator offsetRangeEndEstimator
+               // Index where we want the split to happen. This will be the end
+               // (exclusive) of the primary and first element of the residual.
+               splitPt int64
+       }{
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           0,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 10},
+                       splitPt:           5,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           100,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 100},
+                       splitPt:           101,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           5,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 0},
+                       splitPt:           6,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 10},
+                       claimed:           5,
+                       fraction:          -0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 10},
+                       splitPt:           5,
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("(split at %v of [%v, %v])",
+                       test.fraction, test.claimed, test.rest.End), func(t 
*testing.T) {
+                       rt, err := NewGrowableTracker(test.rest, 
&test.rangeEndEstimator)
+                       if err != nil {
+                               t.Fatalf("error in creating a new growable 
tracker: %v", err)
+                       }
+                       ok := rt.TryClaim(test.claimed)
+                       if !ok {
+                               t.Fatalf("tracker failed on initial claim: %v", 
test.claimed)
+                       }
+                       gotP, gotR, err := rt.TrySplit(test.fraction)
+                       if err != nil {
+                               t.Fatalf("tracker failed on split: %v", err)
+                       }
+                       var wantP interface{} = Restriction{Start: 
test.rest.Start, End: test.splitPt}
+                       var wantR interface{} = Restriction{Start: 
test.splitPt, End: test.rest.End}
+                       if test.splitPt == test.rest.End {
+                               wantR = nil // When residuals are empty we 
should get nil.
+                       }
+                       if !cmp.Equal(gotP, wantP) {
+                               t.Errorf("split got incorrect primary: got: %v, 
want: %v", gotP, wantP)
+                       }
+                       if !cmp.Equal(gotR, wantR) {
+                               t.Errorf("split got incorrect residual: got: 
%v, want: %v", gotR, wantR)
+                       }
+               })
+       }
+}
+
+// TestGrowableTracker_IsBounded tests IsBounded method for GrowableTracker.
+func TestGrowableTracker_IsBounded(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker is bounded, want unbounded 
initially.")
+       }
+
+       if !tracker.TryClaim(int64(0)) {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", false, true)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker should've been unbounded.")
+       }
+
+       estimator.SetEstimateRangeEnd(16)
+       tracker.TrySplit(0.5)
+       if !tracker.IsBounded() {
+               t.Errorf("tracker should've been bounded after split")
+       }
+}
+
+// TestGrowableTracker_Progress tests GetProgess method for GrowableTracker.
+func TestGrowableTracker_Progress(t *testing.T) {

Review Comment:
   We need a second case for Progress here for validating that we go down the 
branch to the non-growable GetProgress.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,299 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       fmt.Printf("tracker: %#v", tracker)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       tracker, err = NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(20)
+       p, r, _ = tracker.TrySplit(0)
+       if !tracker.IsDone() {
+               t.Fatal("tracker should have been done, but IsDone() returns 
false")
+       }
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+}
+
+func TestGrowableTracker_TrySplit(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error in creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(0)) {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", false, true)
+       }
+
+       estimator.SetEstimateRangeEnd(16)
+
+       p, r, _ := tracker.TrySplit(0.5)
+       expPr := Restriction{0, 8}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes := Restriction{8, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       estimator.SetEstimateRangeEnd(12)
+       p, r, _ = tracker.TrySplit(0.5)
+       expPr = Restriction{0, 4}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes = Restriction{4, 8}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       if tracker.TryClaim(int64(4)) {
+               t.Errorf("tracker.TryClaim(4) = %v, want: %v", true, false)
+       }
+}
+
+// TestGrowableTracker_Splits tests that TrySplit follows its contract, 
meaning that
+// splits don't lose any elements, split fractions are clamped to 0 or 1, and
+// that TrySplit always splits at the nearest integer greater than the given
+// fraction.
+func TestGrowableTracker_Splits(t *testing.T) {
+       // rangeEndEstimator := offsetRangeEndEstimator{EstimateRangeEnd: 10}
+       tests := []struct {
+               rest              Restriction
+               claimed           int64
+               fraction          float64
+               rangeEndEstimator offsetRangeEndEstimator
+               // Index where we want the split to happen. This will be the end
+               // (exclusive) of the primary and first element of the residual.
+               splitPt int64
+       }{
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           0,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 10},
+                       splitPt:           5,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           100,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 100},
+                       splitPt:           101,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           5,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 0},
+                       splitPt:           6,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 10},
+                       claimed:           5,
+                       fraction:          -0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 10},
+                       splitPt:           5,
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("(split at %v of [%v, %v])",
+                       test.fraction, test.claimed, test.rest.End), func(t 
*testing.T) {
+                       rt, err := NewGrowableTracker(test.rest, 
&test.rangeEndEstimator)
+                       if err != nil {
+                               t.Fatalf("error in creating a new growable 
tracker: %v", err)
+                       }
+                       ok := rt.TryClaim(test.claimed)
+                       if !ok {
+                               t.Fatalf("tracker failed on initial claim: %v", 
test.claimed)
+                       }
+                       gotP, gotR, err := rt.TrySplit(test.fraction)
+                       if err != nil {
+                               t.Fatalf("tracker failed on split: %v", err)
+                       }
+                       var wantP interface{} = Restriction{Start: 
test.rest.Start, End: test.splitPt}
+                       var wantR interface{} = Restriction{Start: 
test.splitPt, End: test.rest.End}
+                       if test.splitPt == test.rest.End {
+                               wantR = nil // When residuals are empty we 
should get nil.
+                       }
+                       if !cmp.Equal(gotP, wantP) {
+                               t.Errorf("split got incorrect primary: got: %v, 
want: %v", gotP, wantP)
+                       }
+                       if !cmp.Equal(gotR, wantR) {
+                               t.Errorf("split got incorrect residual: got: 
%v, want: %v", gotR, wantR)
+                       }
+               })
+       }
+}
+
+// TestGrowableTracker_IsBounded tests IsBounded method for GrowableTracker.
+func TestGrowableTracker_IsBounded(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker is bounded, want unbounded 
initially.")
+       }
+
+       if !tracker.TryClaim(int64(0)) {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", false, true)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker should've been unbounded.")
+       }
+
+       estimator.SetEstimateRangeEnd(16)
+       tracker.TrySplit(0.5)
+       if !tracker.IsBounded() {
+               t.Errorf("tracker should've been bounded after split")
+       }
+}
+
+// TestGrowableTracker_Progress tests GetProgess method for GrowableTracker.
+func TestGrowableTracker_Progress(t *testing.T) {
+       var start, cur int64
+       start = 10
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: start, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error in creating a new GrowableTracker: %v", err)
+       }
+       cur = 20
+
+       if !tracker.TryClaim(int64(cur)) {
+               t.Fatalf("couldn't claim tracker.TryClaim(int64(%v))", cur)
+       }
+
+       estimator.SetEstimateRangeEnd(5)
+       done, remaining := tracker.GetProgress()
+       if float64(cur+1-start) != done {
+               t.Errorf("wrong amount of work done, got: %v, want: %v", 
cur+1-start, done)
+       }
+       if math.MaxInt64-done != remaining {
+               t.Errorf("work remaining:%v, expected: %v", remaining, 
math.MaxInt64-done)
+       }

Review Comment:
   To avoid repeating the math in the message, assign them to `got` and `want`, 
and avoid the possibility of error entirely. (which saves me, as a reviewer, 
from having to check)
   
   e.g.
   ```
        if got, want := done, float64(cur+1-start); got != want {
                t.Errorf("wrong amount of work done, got: %v, want: %v", got, want)
        }
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 775277)
    Time Spent: 4.5h  (was: 4h 20m)

> Implement Growable Tracker for Go SDK
> -------------------------------------
>
>                 Key: BEAM-14511
>                 URL: https://issues.apache.org/jira/browse/BEAM-14511
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Ritesh Ghorse
>            Assignee: Ritesh Ghorse
>            Priority: P2
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Add a growable tracker for 
> [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60]
>  Restriction in Go SDK. This would be useful for streaming/unbounded 
> restrictions in SDF.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to