[ https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=777198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-777198 ]
ASF GitHub Bot logged work on BEAM-14511: ----------------------------------------- Author: ASF GitHub Bot Created on: 01/Jun/22 22:04 Start Date: 01/Jun/22 22:04 Worklog Time Spent: 10m Work Description: riteshghorse commented on code in PR #17754: URL: https://github.com/apache/beam/pull/17754#discussion_r887338524 ########## sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go: ########## @@ -299,3 +301,226 @@ func TestTracker_TrySplit(t *testing.T) { }) } } + +type offsetRangeEndEstimator struct { + EstimateRangeEnd int64 +} + +// Estimate provides the estimated end for unbounded offset range. +func (o *offsetRangeEndEstimator) Estimate() int64 { + return o.EstimateRangeEnd +} + +// SetEstimateRangeEnd sets the estimated end for unbounded offset range. +func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) { + o.EstimateRangeEnd = rangeEnd +} Review Comment: Done ########## sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go: ########## @@ -223,3 +227,108 @@ func (tracker *Tracker) GetRestriction() interface{} { func (tracker *Tracker) IsBounded() bool { return true } + +// RangeEndEstimator provides the estimated end offset of the range. Users must implement this interface to +// use the offsetrange.GrowableTracker. +type RangeEndEstimator interface { + // Estimate is called to get the end offset in TrySplit() functions. + // + // The end offset is exclusive for the range. The estimated end is not required to + // monotonically increase as it will only be taken into consideration when the + // estimated end offset is larger than the current position. + // Returning math.MaxInt64 as the estimate implies the largest possible position for the range + // is math.MaxInt64 - 1. + // + // Providing a good estimate is important for an accurate progress signal and will impact + // splitting decisions by the runner. 
+ Estimate() int64 +} + +// GrowableTracker tracks a growable offset range restriction that can be represented as a range of integer values, +// for example for byte offsets in a file, or indices in an array. Note that this tracker makes +// no assumptions about the positions of blocks within the range, so users must handle validation +// of block positions if needed. +type GrowableTracker struct { + Tracker + rangeEndEstimator RangeEndEstimator +} + +// NewGrowableTracker creates a GrowableTracker for handling a growable offset range. +// math.MaxInt64 is used as the end of the range to indicate infinity for an unbounded range. +// +// An OffsetRange is considered growable when the end offset could grow (or change) +// during execution time (e.g. Kafka topic partition offset, appended file, ...). +// +// The growable range is marked as done by claiming math.MaxInt64-1. +// +// For bounded restrictions, this tracker works the same as offsetrange.Tracker. +// Use that directly if you have no need of estimating the end of a bound. +func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) (*GrowableTracker, error) { + if rangeEndEstimator == nil { + return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. Implementing offsetrange.RangeEndEstimator may be required") + } + return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: rest.End}), rangeEndEstimator}, nil +} + +// Start returns the starting range of the restriction tracked by a tracker. +func (tracker *GrowableTracker) Start() int64 { + return tracker.GetRestriction().(Restriction).Start +} + +// End returns the end range of the restriction tracked by a tracker. +func (tracker *GrowableTracker) End() int64 { + return tracker.GetRestriction().(Restriction).End +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} + +// TrySplit splits at the nearest integer greater than the given fraction of the remainder. 
If the +// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1. +func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) { + if tracker.stopped || tracker.IsDone() { + return tracker.rest, nil, nil + } + + // If current tracking range is no longer growable, split it as a normal range. + if tracker.End() != math.MaxInt64 { + return tracker.Tracker.TrySplit(fraction) + } + + // If current range has been done, there is no more space to split. + if tracker.attempted == math.MaxInt64 { + return nil, nil, nil + } + + cur := max(tracker.attempted, tracker.Start()-1) + estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1) + + splitPt := cur + int64(math.Ceil(math.Max(1, float64(estimatedEnd-cur)*(fraction)))) + if splitPt > estimatedEnd { + return tracker.rest, nil, nil + } + residual = Restriction{Start: splitPt, End: tracker.End()} + tracker.rest.End = splitPt + return tracker.rest, residual, nil +} + +// GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction. +func (tracker *GrowableTracker) GetProgress() (done, remaining float64) { + // If current tracking range is no longer growable, get its progress as a normal range. + if tracker.End() != math.MaxInt64 { + return tracker.Tracker.GetProgress() + } + + done = float64((tracker.claimed + 1) - tracker.Start()) + remaining = float64(tracker.End() - (tracker.claimed + 1)) + return done, remaining Review Comment: Done. 
Issue Time Tracking ------------------- Worklog Id: (was: 777198) Time Spent: 5h 20m (was: 5h 10m) > Implement Growable Tracker for Go SDK > ------------------------------------- > > Key: BEAM-14511 > URL: https://issues.apache.org/jira/browse/BEAM-14511 > Project: Beam > Issue Type: Improvement > Components: sdk-go > Reporter: Ritesh Ghorse > Assignee: Ritesh Ghorse > Priority: P2 > Time Spent: 5h 20m > Remaining Estimate: 0h > > Add a growable tracker for > [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60] > Restriction in Go SDK. This would be useful for streaming/unbounded > restrictions in SDF. -- This message was sent by Atlassian Jira (v8.20.7#820007)