[ https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=774766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774766 ]
ASF GitHub Bot logged work on BEAM-14511: ----------------------------------------- Author: ASF GitHub Bot Created on: 25/May/22 18:40 Start Date: 25/May/22 18:40 Worklog Time Spent: 10m Work Description: riteshghorse commented on code in PR #17754: URL: https://github.com/apache/beam/pull/17754#discussion_r881994891 ########## sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go: ########## @@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) { }) } } + +type offsetRangeEndEstimator struct { + EstimateRangeEnd int64 +} + +// Estimate provides the estimated end for unbounded offset range. +func (o *offsetRangeEndEstimator) Estimate() int64 { + return o.EstimateRangeEnd +} + +// SetEstimateRangeEnd sets the estimated end for unbounded offset range. +func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) { + o.EstimateRangeEnd = rangeEnd +} + +// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when wrong arguments are passed. +func TestNewGrowableTracker_Bad(t *testing.T) { + rest := Restriction{Start: 0, End: math.MaxInt64} + _, err := NewGrowableTracker(rest, nil) + if err == nil { + t.Errorf("NewGrowableTracker() expected to throw error.") + } +} + +// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker. 
+func TestGrowableTracker_TryClaim(t *testing.T) { + estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0} + rest := Restriction{Start: 0, End: math.MaxInt64} + tracker, err := NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + + if !tracker.TryClaim(int64(10)) { + t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true) + } + if !tracker.TryClaim(int64(100)) { + t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true) + } + if tracker.TryClaim(int64(math.MaxInt64)) { + t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", true, false, tracker.err) + } + if !tracker.IsDone() { + t.Errorf("tracker has done all work, but IsDone() returns false") + } +} + +// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for GrowableTracker +// before claiming anything. +func TestGrowableTracker_SplitBeforeStart(t *testing.T) { + estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0} + rest := Restriction{Start: 0, End: math.MaxInt64} + tracker, err := NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + estimator.SetEstimateRangeEnd(10) + p, r, _ := tracker.TrySplit(0) + + expected := Restriction{0, 0} + if p.(Restriction) != expected { + t.Errorf("wrong primaries after TrySplit(0), got: %v, want: %v", p.(Restriction), expected) + } + if tracker.GetRestriction().(Restriction) != expected { + t.Errorf("wrong restriction tracked by tracker after TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expected) + } + res := Restriction{0, math.MaxInt64} + if res != r.(Restriction) { + t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", r.(Restriction), expected) + } +} + +// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit +// for GrowableTracker. 
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) { + estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0} + rest := Restriction{Start: 0, End: math.MaxInt64} + tracker, err := NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + if !tracker.TryClaim(int64(5)) { + t.Fatal("tracker.TryClaim(int64(5)) should've claimed.") + } + estimator.SetEstimateRangeEnd(0) + p, r, _ := tracker.TrySplit(0) + if tracker.IsDone() { + t.Fatal("tracker not done yet, , but IsDone() returns true") + } + + expPr := Restriction{0, 6} + if p.(Restriction) != expPr { + t.Errorf("wrong primaries after TrySplit(0), got: %v, want: %v", p.(Restriction), expPr) + } + if tracker.GetRestriction().(Restriction) != expPr { + t.Errorf("wrong restriction tracked by tracker after TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr) + } + expRes := Restriction{6, math.MaxInt64} + if r.(Restriction) != expRes { + t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", r.(Restriction), expRes) + } Review Comment: Oh this might have caught one bug in IsDone(). Technically our offset restriction has exclusive end. So IsDone() should actually be checking `tracker.claimed+1 >= tracker.rest.End` as it does for calculating progress. 
https://github.com/apache/beam/blob/83c85a5799377eee0410f4c492965749155370d1/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L213 Issue Time Tracking ------------------- Worklog Id: (was: 774766) Time Spent: 3h 50m (was: 3h 40m) > Implement Growable Tracker for Go SDK > ------------------------------------- > > Key: BEAM-14511 > URL: https://issues.apache.org/jira/browse/BEAM-14511 > Project: Beam > Issue Type: Improvement > Components: sdk-go > Reporter: Ritesh Ghorse > Assignee: Ritesh Ghorse > Priority: P2 > Time Spent: 3h 50m > Remaining Estimate: 0h > > Add a growable tracker for > [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60] > Restriction in Go SDK. This would be useful for streaming/unbounded > restrictions in SDF. -- This message was sent by Atlassian Jira (v8.20.7#820007)