I'm interested in adding something like this. I could see these being generally useful in a number of cases (one that immediately comes to mind is partitioning datasets into train/test/validation sets and writing each to a different place).
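As a rough illustration of the train/test/validation case (a sketch only, not a proposed API): a stateless, per-element split function could hash each element into a weighted bucket. The names `split_index` and `SPLITS` and the 80/10/10 weights are made up for this example. The signature is chosen to fit the existing beam.Partition transform, which calls its function as fn(element, num_partitions). Because each element is decided independently, the split sizes are only approximately proportional to the weights.

```python
import hashlib

# Hypothetical split names and weights; the weights should sum to 1.0.
SPLITS = (("train", 0.8), ("test", 0.1), ("validation", 0.1))


def split_index(element, num_partitions, splits=SPLITS):
    """Return the index of the split that an element falls into.

    The decision depends only on a hash of the element, so it needs no
    state and gives the same answer every time the element is seen.
    """
    assert num_partitions == len(splits)
    digest = hashlib.sha256(repr(element).encode("utf-8")).digest()
    # Map the first 8 bytes of the digest onto the unit interval.
    point = int.from_bytes(digest[:8], "big") / 2**64
    cumulative = 0.0
    for index, (_name, weight) in enumerate(splits):
        cumulative += weight
        if point < cumulative:
            return index
    # Guard against floating-point rounding at the upper edge.
    return len(splits) - 1


# Assumed usage in a pipeline (requires apache_beam, not run here):
#   train, test, validation = pcoll | beam.Partition(split_index, len(SPLITS))
```

Hashing rather than calling a random number generator keeps the assignment stable across retries, at the cost of always sending duplicate elements to the same split.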
I'm assuming Top (or FixedSample) needs to be small enough to fit into state? I would also be interested in being able to do percentages as well (something like partitioners.Sample(percent=10)), though that might be much more challenging for an unbounded data set (maybe we could do something as simple as a probabilistic target_percentage).

Happy to help review a design doc or PR.

Thanks,
Danny

On Thu, Oct 19, 2023 at 10:06 AM Joey Tran <joey.t...@schrodinger.com> wrote:

> Hey all,
>
> While writing a few pipelines, I was surprised by how few partitioners
> there were in the python SDK. I wrote a couple that are pretty generic and
> possibly generally useful. Just wanted to do a quick poll to see if they
> seem useful enough to be in the sdk's library of transforms. If so, I can
> put together a PTransform Design Doc[1] for them. Just wanted to confirm
> before spending time on the doc.
>
> Here are the two that I wrote, I'll just paste the class names and
> docstrings:
>
> class FixedSample(beam.PTransform):
>     """
>     A PTransform that takes a PCollection and partitions it into two
>     PCollections.
>     The first PCollection is a random sample of the input PCollection, and
>     the second PCollection is the remaining elements of the input
>     PCollection.
>
>     This is useful for creating holdout / test sets in machine learning.
>
>     Example usage:
>
>         >>> with beam.Pipeline() as p:
>         ...     sample, remaining = (p
>         ...         | beam.Create(list(range(10)))
>         ...         | partitioners.FixedSample(3))
>         ...     # sample will contain three randomly selected elements
>         ...     # from the input PCollection
>         ...     # remaining will contain the remaining seven elements
>
>     """
>
> class Top(beam.PTransform):
>     """
>     A PTransform that takes a PCollection and partitions it into two
>     PCollections.
>     The first PCollection contains the largest n elements of the input
>     PCollection, and the second PCollection contains the remaining
>     elements of the input PCollection.
>
>     Parameters:
>         n: The number of elements to take from the input PCollection.
>         key: A function that takes an element of the input PCollection
>             and returns a value to compare for the purpose of determining
>             the top n elements, similar to Python's built-in sorted
>             function.
>         reverse: If True, the top n elements will be the n smallest
>             elements of the input PCollection.
>
>     Example usage:
>
>         >>> with beam.Pipeline() as p:
>         ...     top, remaining = (p
>         ...         | beam.Create(list(range(10)))
>         ...         | partitioners.Top(3))
>         ...     # top will contain [7, 8, 9]
>         ...     # remaining will contain [0, 1, 2, 3, 4, 5, 6]
>
>     """
>
> They're basically partitioner versions of the aggregationers Top and Sample
>
> Best,
> Joey
>
>
> [1]
> https://docs.google.com/document/d/1NpCipgvT6lMgf1nuuPPwZoKp5KsteplFancGqOgy8OY/edit#heading=h.x9snb54sjlu9
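
For reference, the semantics described in the two quoted docstrings can be pinned down with a small in-memory sketch. This is plain Python on lists, not Beam code and not the implementation from the thread; `partition_top` and `partition_fixed_sample` are hypothetical helper names, and the `seed` argument is an assumption added only to make the sample reproducible. Something like this might serve as a reference model when writing tests for the real transforms.

```python
import heapq
import random


def partition_top(elements, n, key=None, reverse=False):
    """Split elements into (top, remaining) following the Top docstring.

    With reverse=False the top holds the n largest elements; with
    reverse=True it holds the n smallest.
    """
    key = key or (lambda value: value)
    indexed = list(enumerate(elements))
    pick = heapq.nsmallest if reverse else heapq.nlargest
    chosen = pick(n, indexed, key=lambda pair: key(pair[1]))
    # Track positions, not values, so duplicates are split correctly.
    chosen_positions = {position for position, _ in chosen}
    top = [value for _, value in chosen]
    remaining = [v for i, v in indexed if i not in chosen_positions]
    return top, remaining


def partition_fixed_sample(elements, n, seed=None):
    """Split elements into (sample, remaining) with a sample of size n."""
    items = list(elements)
    rng = random.Random(seed)
    size = min(n, len(items))
    chosen_positions = set(rng.sample(range(len(items)), size))
    sample = [v for i, v in enumerate(items) if i in chosen_positions]
    remaining = [v for i, v in enumerate(items) if i not in chosen_positions]
    return sample, remaining


top, rest = partition_top(range(10), 3)
sample, others = partition_fixed_sample(range(10), 3, seed=0)
```

Both helpers hold the whole input in memory, which is exactly the constraint a distributed version would have to relax for everything except the n selected elements.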