[ https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=774738&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774738 ]
ASF GitHub Bot logged work on BEAM-14511: ----------------------------------------- Author: ASF GitHub Bot Created on: 25/May/22 18:04 Start Date: 25/May/22 18:04 Worklog Time Spent: 10m Work Description: riteshghorse commented on code in PR #17754: URL: https://github.com/apache/beam/pull/17754#discussion_r881966575 ########## sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go: ########## @@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} { func (tracker *Tracker) IsBounded() bool { return true } + +// RangeEndEstimator provides the estimated end offset of the range. Users must implement this interface to +// use the offsetrange.GrowableTracker. +type RangeEndEstimator interface { + // Estimate is called to get the end offset in TrySplit() or GetProgress() functions. + // + // The end offset is exclusive for the range. The estimated end is not required to + // monotonically increase as it will only be taken into consideration when the + // estimated end offset is larger than the current position. + // Returning math.MaxInt64 as the estimate implies the largest possible position for the range + // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be provided. + // + // Providing a good estimate is important for an accurate progress signal and will impact + // splitting decisions by the runner. + Estimate() int64 +} + +// GrowableTracker tracks a growable offset range restriction that can be represented as a range of integer values, +// for example for byte offsets in a file, or indices in an array. Note that this tracker makes +// no assumptions about the positions of blocks within the range, so users must handle validation +// of block positions if needed. +type GrowableTracker struct { + Tracker + rangeEndEstimator RangeEndEstimator +} + +// NewGrowableTracker is a constructor for an GrowableTracker given a start and RangeEndEstimator. 
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) (*GrowableTracker, error) { + if rangeEndEstimator == nil { + return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. Implementing offsetrange.RangeEndEstimator may be required") + } + return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: rest.End}), rangeEndEstimator}, nil +} + +// Start returns the starting range of the restriction tracked by a tracker. +func (tracker *GrowableTracker) Start() int64 { + return tracker.GetRestriction().(Restriction).Start +} + +// End returns the end range of the restriction tracked by a tracker. +func (tracker *GrowableTracker) End() int64 { + return tracker.GetRestriction().(Restriction).End +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} + +// TrySplit splits at the nearest integer greater than the given fraction of the remainder. If the +// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1. +func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) { + if tracker.stopped || tracker.IsDone() { + log.Infof(context.Background(), "Done in TrySplit(%f)", fraction) + return tracker.rest, nil, nil + } + + // If current tracking range is no longer growable, split it as a normal range. + if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() { + log.Infof(context.Background(), "Doing the normal OffsetTracker TrySplit(%f)", fraction) + return tracker.Tracker.TrySplit(fraction) + } + + // If current range has been done, there is no more space to split. 
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 { + return nil, nil, nil + } + + var cur int64 + if tracker.attempted != -1 { + cur = tracker.attempted + } else { + cur = tracker.Start() - 1 + } + + estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1) + + splitPt := cur + int64(math.Ceil(math.Max(1, float64(estimatedEnd-cur)*(fraction)))) + log.Infof(context.Background(), "Split using estimatedEnd, estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt) + if splitPt > estimatedEnd { + return tracker.rest, nil, nil + } + if splitPt < tracker.rest.Start { + return tracker.rest, nil, nil + } + residual = Restriction{Start: splitPt, End: tracker.End()} + tracker.rest.End = splitPt + return tracker.rest, residual, nil +} + +// GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction. +func (tracker *GrowableTracker) GetProgress() (done, remaining float64) { + log.Infof(context.Background(), "PROGRESS: tracker: %#v", tracker) + + // If current tracking range is no longer growable, split it as a normal range. + if tracker.End() != math.MaxInt64 || tracker.End() == tracker.Start() { + return tracker.Tracker.GetProgress() + } + + if tracker.attempted == -1 { + done = 0 + remaining = math.Max(float64(tracker.End())-float64(tracker.Start()), 0) + return done, remaining + } + + done = float64((tracker.claimed + 1) - tracker.Start()) + remaining = float64(tracker.rest.End - (tracker.claimed + 1)) Review Comment: Estimate should only be used in TrySplit. tracker.rest.End is modified there itself to the estimated value. GetProgress should only tell the difference in done and remaining in my opinion. 
Issue Time Tracking ------------------- Worklog Id: (was: 774738) Time Spent: 3.5h (was: 3h 20m) > Implement Growable Tracker for Go SDK > ------------------------------------- > > Key: BEAM-14511 > URL: https://issues.apache.org/jira/browse/BEAM-14511 > Project: Beam > Issue Type: Improvement > Components: sdk-go > Reporter: Ritesh Ghorse > Assignee: Ritesh Ghorse > Priority: P2 > Time Spent: 3.5h > Remaining Estimate: 0h > > Add a growable tracker for > [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60] > Restriction in Go SDK. This would be useful for streaming/unbounded > restrictions in SDF. -- This message was sent by Atlassian Jira (v8.20.7#820007)