[ https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=775069&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775069 ]
ASF GitHub Bot logged work on BEAM-14511: ----------------------------------------- Author: ASF GitHub Bot Created on: 26/May/22 14:12 Start Date: 26/May/22 14:12 Worklog Time Spent: 10m Work Description: riteshghorse commented on code in PR #17754: URL: https://github.com/apache/beam/pull/17754#discussion_r882714391 ########## sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go: ########## @@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) { }) } } + +type offsetRangeEndEstimator struct { + EstimateRangeEnd int64 +} + +// Estimate provides the estimated end for unbounded offset range. +func (o *offsetRangeEndEstimator) Estimate() int64 { + return o.EstimateRangeEnd +} + +// SetEstimateRangeEnd sets the estimated end for unbounded offset range. +func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) { + o.EstimateRangeEnd = rangeEnd +} + +// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when wrong arguments are passed. +func TestNewGrowableTracker_Bad(t *testing.T) { + rest := Restriction{Start: 0, End: math.MaxInt64} + _, err := NewGrowableTracker(rest, nil) + if err == nil { + t.Errorf("NewGrowableTracker() expected to throw error.") + } +} + +// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker. +func TestGrowableTracker_TryClaim(t *testing.T) { + estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0} + rest := Restriction{Start: 0, End: math.MaxInt64} + tracker, err := NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + + if !tracker.TryClaim(int64(10)) { + t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true) + } + if !tracker.TryClaim(int64(100)) { + t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true) + } + if tracker.TryClaim(int64(math.MaxInt64)) { + t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", true, false, tracker.err) + } + if !tracker.IsDone() { + t.Errorf("tracker has done all work, but IsDone() returns false") + } +} + +// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for GrowableTracker +// before claiming anything. +func TestGrowableTracker_SplitBeforeStart(t *testing.T) { + estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0} + rest := Restriction{Start: 0, End: math.MaxInt64} + tracker, err := NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + estimator.SetEstimateRangeEnd(10) + p, r, _ := tracker.TrySplit(0) + + expected := Restriction{0, 0} + if p.(Restriction) != expected { + t.Errorf("wrong primaries after TrySplit(0), got: %v, want: %v", p.(Restriction), expected) + } + if tracker.GetRestriction().(Restriction) != expected { + t.Errorf("wrong restriction tracked by tracker after TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expected) + } + res := Restriction{0, math.MaxInt64} + if res != r.(Restriction) { + t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", r.(Restriction), expected) + } +} + +// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit +// for GrowableTracker. +func TestGrowableTracker_CheckpointJustStarted(t *testing.T) { + estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0} + rest := Restriction{Start: 0, End: math.MaxInt64} + tracker, err := NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + if !tracker.TryClaim(int64(5)) { + t.Fatal("tracker.TryClaim(int64(5)) should've claimed.") + } + estimator.SetEstimateRangeEnd(0) + p, r, _ := tracker.TrySplit(0) + if tracker.IsDone() { + t.Fatal("tracker not done yet, , but IsDone() returns true") + } + + expPr := Restriction{0, 6} + if p.(Restriction) != expPr { + t.Errorf("wrong primaries after TrySplit(0), got: %v, want: %v", p.(Restriction), expPr) + } + if tracker.GetRestriction().(Restriction) != expPr { + t.Errorf("wrong restriction tracked by tracker after TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr) + } + expRes := Restriction{6, math.MaxInt64} + if r.(Restriction) != expRes { + t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", r.(Restriction), expRes) + } + + tracker, err = NewGrowableTracker(rest, &estimator) + if err != nil { + t.Fatalf("error creating new GrowableTracker: %v", err) + } + if !tracker.TryClaim(int64(5)) { + t.Fatal("tracker.TryClaim(int64(5)) should've claimed.") + } + estimator.SetEstimateRangeEnd(20) + p, r, _ = tracker.TrySplit(0) + if tracker.IsDone() { + t.Fatal("tracker not done yet, , but IsDone() returns true") + } + if p.(Restriction) != expPr { + t.Errorf("wrong primaries after TrySplit(0), got: %v, want: %v", p.(Restriction), expPr) + } + if r.(Restriction) != expRes { + t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", r.(Restriction), expRes) + } +} + +// TestGrowableTracker_Split tests TrySplit method for GrowableTracker +// in regards to returned primary and residual. +func TestGrowableTracker_Split(t *testing.T) { Review Comment: Done Issue Time Tracking ------------------- Worklog Id: (was: 775069) Time Spent: 4h 10m (was: 4h) > Implement Growable Tracker for Go SDK > ------------------------------------- > > Key: BEAM-14511 > URL: https://issues.apache.org/jira/browse/BEAM-14511 > Project: Beam > Issue Type: Improvement > Components: sdk-go > Reporter: Ritesh Ghorse > Assignee: Ritesh Ghorse > Priority: P2 > Time Spent: 4h 10m > Remaining Estimate: 0h > > Add a growable tracker for > [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60] > Restriction in Go SDK. This would be useful for strreaming/unbounded > restrictions in SDF. -- This message was sent by Atlassian Jira (v8.20.7#820007)