On Thu, Feb 5, 2026 at 6:06 PM Robert Bradshaw <[email protected]> wrote:

> On Thu, Feb 5, 2026 at 1:44 PM Joey Tran <[email protected]>
> wrote:
>
>> Thanks for such quick feedback!
>>
>> On Thu, Feb 5, 2026 at 3:27 PM Danny McCormick via dev <
>>> [email protected]> wrote:
>>>
>>>> Would you mind opening up the doc for comments?
>>>>
>>>> At a high level, I'm skeptical of the pattern; it seems to me like it
>>>> moves the burden of choosing the correct behavior from authors to consumers
>>>> in non-obvious ways which range from harmless to potentially causing silent
>>>> data loss. I think if a user wants to drop a PCollection, that should
>>>> always be an active choice since the risk of data loss is much greater than
>>>> the ease-of-use benefit of avoiding a little extra code.
>>>>
>> I think perhaps I poorly chose a few motivating examples, but it was at
>> least helpful in clarifying two distinct patterns.
>>   - Filters/Samplers/Deduplicators
>>   - Transforms that may run into issues with certain inputs
>>
>
> One use case that comes to mind is running some computation with a
> (safely ignorable) side output that has statistics about what was
> computed/encountered.
>
>
>>
>>
>>>> I'd argue that a better pattern than having a single transform which
>>>> handles this is to either have a *Filter* or a *Partition* transform
>>>> which a user can use as needed. These are different transforms because they
>>>> have different purposes/core functionalities.
>>>>
>> This can become unwieldy for a large library of filtering / sampling /
>> data processing transforms. At Schrodinger, for example, we have maybe a
>> dozen transforms, some of which...
>>   - are samplers where most consumers will just be interested in the
>> "sample", while other consumers may be interested in both the sample and
>> remaining
>>   - are data processing transforms with a concept of processed outputs
>> and "dropped for well-understood reason"
>>
>
> I prefer transforms that can be used multiple ways over distinct
> transforms that do the same thing.
>
> FWIW, our existing DoFn already has something like this behavior: by default
> one consumes only the "main" output, but the caller can ask for the side
> outputs if desired. This extends it a bit further in that
>
> 1. Composite operations are supported (though I guess technically nothing
> is stopping someone from manually constructing a DoFnTuple), and
> 2. Side outputs are always returned and available for consumption, without
> inconveniencing use of the main output, rather than having to explicitly
> switch modes at application time.
>
> This seems like a strict win to me.
>
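
For readers following along, my reading of the existing behavior Robert
describes is roughly the following (a sketch; the DoFn, the tag names, and
`pcoll` are just illustrative):

```
import apache_beam as beam

class ParseFn(beam.DoFn):  # illustrative DoFn with a main and a side output
  def process(self, element):
    try:
      yield int(element)
    except ValueError:
      yield beam.pvalue.TaggedOutput('failed', element)

# By default, a consumer only sees the main output.
parsed_only = pcoll | 'ParseDefault' >> beam.ParDo(ParseFn())

# The side output has to be asked for explicitly at the call site.
results = pcoll | 'ParseWithFailed' >> beam.ParDo(ParseFn()).with_outputs(
    'failed', main='parsed')
parsed, failed = results.parsed, results.failed
```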

I guess fundamentally, the question is: should we make it easier to drop
outputs with no indication to a consumer (and more importantly IMO, future
readers of a consumer's code)?

For example:

pcoll | Partition(...) | ChainedParDo()

gives no indication that Partition produces multiple outputs. In some cases,
this is fine, but in others it is dangerous.

I think we can draw on existing language design principles here. IMO, this
is essentially the same as allowing:

   def foo():
      return out1, out2, out3

to be successfully unpacked with:

   out1, out2 = foo()

This is convenient if you only need out1/out2, but it makes mistakes less
obvious in the name of removing a line of code. In my experience, people
already find Beam's pipe operator surprising, and this seems like it would
make it worse - we're changing the meaning of the pipe operator with a
transform-level change.
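
(For what it's worth, plain Python rejects the partial unpack today, which is
exactly the guard rail I'd hate to lose:)

```
def foo():
    return 1, 2, 3

out1, out2 = foo()
# ValueError: too many values to unpack (expected 2)
```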

With all that said, my core claim is: *As much as possible, there should be
some indication in the code of a consumer if they are, or might be,
dropping output from a function or transformation.*


>
>> We'd likely need to double the size of our library in order to have both
>> Filter and Partition versions of these transforms.
>>
>>
>>> > A parser that routes malformed input to a dead-letter output
>>>> > A validator that routes violations separately
>>>> > An enrichment that routes lookup failures aside
>>>>
>>>> These are the ones I'm really worried about. In all of these cases, we
>>>> are silently dropping error output in a way that might be non-obvious to a
>>>> user. As a user, if I use a parser that returns a single output, I would
>>>> assume that any parse failures would lead to exceptions.
>>>>
>> I agree that it'd be an antipattern for these types of transforms to
>> silently capture and drop these erroneous records, but there is nothing
>> preventing the author of a parser/validator/enrichment transform from doing
>> this today even without ForkedPCollections. With ForkedPCollections, I
>> think we can and still should discourage transform authors from silently
>> handling errors without some active user configuration (e.g. by requiring
>> a keyword arg like `failed_pcoll_tag="failed"` to enable any error
>> capturing at all). e.g.
>> ```
>> parsed = pcoll | ParseData()
>> # parsed.failed --> should not exist; ParseData should not do this automatically
>>
>> parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>> # parsed.failed --> exists now but only with user intent
>> ```
>>
>
> +1. Errors should fail the pipeline unless one explicitly requests they be
> passed somewhere else.
>

I'm glad we're generally aligned on failing the pipeline by default - I'll
note that this is exactly what with_exception_handling does today. But I'd
still be hesitant to apply the ForkedPCollection pattern to
with_exception_handling-style error outputs, with something like

   parsed = pcoll | ParseData(failed_pcoll_tag="failed")

Should we really be helping transform authors make it easier for consumers
to swallow unhandled error output?
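
For comparison, the existing opt-in is already explicit and visible at the
call site; roughly (from memory, so the exact shape of the error output may
differ, and `parse` / `pcoll` are just stand-ins):

```
import apache_beam as beam

parse = int  # stand-in for a function that may raise

# Without opting in, an exception raised in parse() fails the pipeline.
parsed = pcoll | 'ParseStrict' >> beam.Map(parse)

# Opting in is an active, visible choice by the consumer.
good, bad = pcoll | 'ParseWithErrors' >> beam.Map(parse).with_exception_handling()
```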


>
>>
>>
>>
>>>> With all that said, I am aligned with the goal of making pipelines like
>>>> this easier to chain. Maybe an in-between option would be adding a DoFn
>>>> utility like:
>>>>
>>>> ```
>>>> pcoll | Partition(...).keep_tag('main') | ChainedParDo()
>>>> ```
>>>>
>>>> Where `keep_tag` forces an expansion where all tags other than main are
>>>> dropped. What do you think?
>>>>
>>>
> One can already write
>
>     pcoll | Partition(...)['main'] | ChainedParDo()
>

This is a good point, though as a nit I think it needs to be:

    (pcoll | Partition(...))['main'] | ChainedParDo()

since the tag is applied to the PCollection, not the ParDo. But this does
give us the expressivity we need here IMO.


>> This would help, but this solution would be limited to ParDos. If you have
>> a composite transform like a sophisticated `CompositeFilter` or
>> `CompositeSampler`, then you wouldn't be able to use `.keep_tag`.
>>
>
I don't think there's any reason we couldn't apply the same to composites.
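
For example, nothing stops a composite today from returning tagged outputs
from its expand and letting the consumer select one explicitly, just as with
Partition. A rough sketch (CompositeFilter, keep(), and the tag names are all
hypothetical):

```
import apache_beam as beam

class CompositeFilter(beam.PTransform):  # hypothetical composite
  def expand(self, pcoll):
    parts = pcoll | beam.Partition(
        lambda element, num_parts: 0 if keep(element) else 1, 2)
    # keep() is a stand-in predicate. Returning a dict of PCollections
    # makes both branches addressable by the consumer.
    return {'main': parts[0], 'dropped': parts[1]}

result = (pcoll | CompositeFilter())['main'] | ChainedParDo()
```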

Thanks,
Danny

>
