[ https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=774746&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774746 ]
ASF GitHub Bot logged work on BEAM-14511: ----------------------------------------- Author: ASF GitHub Bot Created on: 25/May/22 18:12 Start Date: 25/May/22 18:12 Worklog Time Spent: 10m Work Description: riteshghorse commented on code in PR #17754: URL: https://github.com/apache/beam/pull/17754#discussion_r881972483 ########## sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go: ########## @@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} { func (tracker *Tracker) IsBounded() bool { return true } + +// RangeEndEstimator provides the estimated end offset of the range. Users must implement this interface to +// use the offsetrange.GrowableTracker. +type RangeEndEstimator interface { + // Estimate is called to get the end offset in TrySplit() or GetProgress() functions. + // + // The end offset is exclusive for the range. The estimated end is not required to + // monotonically increase as it will only be taken into consideration when the + // estimated end offset is larger than the current position. + // Returning math.MaxInt64 as the estimate implies the largest possible position for the range + // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be provided. + // + // Providing a good estimate is important for an accurate progress signal and will impact + // splitting decisions by the runner. + Estimate() int64 +} + +// GrowableTracker tracks a growable offset range restriction that can be represented as a range of integer values, +// for example for byte offsets in a file, or indices in an array. Note that this tracker makes +// no assumptions about the positions of blocks within the range, so users must handle validation +// of block positions if needed. +type GrowableTracker struct { + Tracker + rangeEndEstimator RangeEndEstimator +} + +// NewGrowableTracker is a constructor for an GrowableTracker given a start and RangeEndEstimator. 
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) (*GrowableTracker, error) { + if rangeEndEstimator == nil { + return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. Implementing offsetrange.RangeEndEstimator may be required") + } + return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: rest.End}), rangeEndEstimator}, nil +} + +// Start returns the starting range of the restriction tracked by a tracker. +func (tracker *GrowableTracker) Start() int64 { + return tracker.GetRestriction().(Restriction).Start +} + +// End returns the end range of the restriction tracked by a tracker. +func (tracker *GrowableTracker) End() int64 { + return tracker.GetRestriction().(Restriction).End +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} + +// TrySplit splits at the nearest integer greater than the given fraction of the remainder. If the +// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1. +func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) { + if tracker.stopped || tracker.IsDone() { + log.Infof(context.Background(), "Done in TrySplit(%f)", fraction) + return tracker.rest, nil, nil + } + + // If current tracking range is no longer growable, split it as a normal range. + if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() { + log.Infof(context.Background(), "Doing the normal OffsetTracker TrySplit(%f)", fraction) + return tracker.Tracker.TrySplit(fraction) + } + + // If current range has been done, there is no more space to split. 
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 { + return nil, nil, nil + } + + var cur int64 + if tracker.attempted != -1 { + cur = tracker.attempted + } else { + cur = tracker.Start() - 1 + } + + estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1) + + splitPt := cur + int64(math.Ceil(math.Max(1, float64(estimatedEnd-cur)*(fraction)))) + log.Infof(context.Background(), "Split using estimatedEnd, estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt) + if splitPt > estimatedEnd { + return tracker.rest, nil, nil + } + if splitPt < tracker.rest.Start { Review Comment: yeah, we don't need it Issue Time Tracking ------------------- Worklog Id: (was: 774746) Time Spent: 3h 40m (was: 3.5h) > Implement Growable Tracker for Go SDK > ------------------------------------- > > Key: BEAM-14511 > URL: https://issues.apache.org/jira/browse/BEAM-14511 > Project: Beam > Issue Type: Improvement > Components: sdk-go > Reporter: Ritesh Ghorse > Assignee: Ritesh Ghorse > Priority: P2 > Time Spent: 3h 40m > Remaining Estimate: 0h > > Add a growable tracker for > [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60] > Restriction in Go SDK. This would be useful for streaming/unbounded > restrictions in SDF. -- This message was sent by Atlassian Jira (v8.20.7#820007)