On Fri, Feb 6, 2026 at 4:22 PM Joey Tran <[email protected]> wrote:

>
>
> On Fri, Feb 6, 2026 at 1:34 PM Danny McCormick via dev <
> [email protected]> wrote:
>
>> > But that is exactly the behavior our current DoFns have (if one does
>> > not call with_outputs, the TaggedOutputs are ignored). And even if one
>> > does, there's nothing that forces you to name all the outputs, or use
>> > them even if they're named.
>>
>> I missed that this is our default behavior for DoFns. While I disagree
>> with that choice, that ship has sailed long ago, and it could influence how
>> I think about this - thanks for calling it out.
>>
>> Unifying our behavior across DoFns and composites matters more to me than
>> avoiding the non-obvious dropping of a PCollection. But I don't think this
>> proposal quite does that since we're not talking about changing the default
>> behavior.
>>
>
>
> This is an interesting idea, though it does increase the scope of the
> proposal quite a bit, at least in terms of implementation. We could extend
> `PTransform` to have a `.with_outputs()` method so `ParDo`s and
> `PTransforms` have the same interface. Then `PTransform.expand` can
> optionally return a `PCollectionWithSideOutputs` (I agree with Robert that
> `ForkedPCollections` doesn't really make sense).
>
> ```
>   class MyFilter(beam.PTransform):
>       def expand(self, pcoll):
>           results = pcoll | beam.ParDo(FilterDoFn()).with_outputs(
>               'filtered', main='main')
>           return PCollectionWithSideOutputs(
>               main=results.main, filtered=results.filtered)
> ```
> Then you'd have similar behavior for both ParDos and PTransforms:
> ```
>   # ParDo
>   pcoll | ParDo(fn)
>   pcoll | ParDo(fn).with_outputs('side', main='main')  # DoOutputsTuple
>
>   # Composite PTransform
>   pcoll | MyTransform()                                # PCollection
>   pcoll | MyTransform().with_outputs('filtered', main='main')  # PCollectionOutputsTuple
> ```
>
> I recognize this still doesn't really address your concerns about making
> it easier to hide dropped outputs though.
>

If we can unify things I'm much more amenable to this, because it at least
helps with a different problem (inconsistency across transform
experiences). I think we'd still have different default behaviors in this
case, though, right? If we wanted to really unify things, we'd need to wrap
things and always return a PCollectionWithSideOutputs, but that probably
doesn't work since composites don't necessarily have a "main" PCollection
in the same way. Arguably, it's always the first PCollection.
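
To make the "always wrap" option concrete, here is a toy model in plain Python. None of this is Beam API: `ToyOutputs` is a made-up stand-in for something like PCollectionWithSideOutputs, lists stand in for PCollections, and treating the first output as "main" is only the assumption floated above.

```python
class ToyOutputs:
    """Toy stand-in for a multi-output result; NOT a Beam class."""

    def __init__(self, outputs, main=None):
        if not outputs:
            raise ValueError("at least one output is required")
        self._outputs = dict(outputs)
        # Assumption: with no explicit main tag, the first output is "main".
        self.main_tag = main if main is not None else next(iter(self._outputs))
        if self.main_tag not in self._outputs:
            raise KeyError(self.main_tag)

    @property
    def main(self):
        return self._outputs[self.main_tag]

    def __getitem__(self, tag):
        # Other outputs stay reachable by tag.
        return self._outputs[tag]

    def __or__(self, fn):
        # Chaining applies the next step to the main output only.
        return fn(self.main)


# Lists stand in for PCollections, a plain function for the next transform.
result = ToyOutputs({'kept': [2, 4], 'filtered': [1, 3]})
doubled = result | (lambda xs: [x * 2 for x in xs])
```

With a wrapper like this, chaining keeps working on the main output while the other outputs stay reachable by tag, but which output counts as "main" for a composite is a convention rather than something the transform declares.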


>
>
>> Secondarily, it is optimizing for writability over readability; you save
>> a few lines of code, but the reader might not have any idea that there was
>> a dropped output (whether that's a harmless Filter secondary collection or
>> an error collection).
>>
>>
> FWIW, much of the value of this proposal to me is the better readability
> from not having to consider multiple versions of transforms and not having
> to break up chains to extract main outputs. I appreciate though that we'd
> be making a trade-off of readability of the "sad path" for readability of
> the "happy path"
>

Yeah, that makes sense; what do you think of the other alternative
mentioned as an option for optimizing for both kinds of readability?
Specifically, allowing:

   pcoll | Partition(...)['main'] | ChainedParDo()

I guess the downside there is education (all pipeline authors need to know
this is an option as opposed to only one expert transform author), but I'm
curious if it is sufficient for your context.
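
For what it's worth, the reason this needs support on the transform is plain Python precedence: subscription binds tighter than `|`, so the `['main']` is applied to the transform and not to the result of applying it. Below is a toy model of how that could work, in plain Python. It is not Beam code; `ToyTransform` and the `partition` helper are hypothetical names, and lists stand in for PCollections.

```python
class ToyTransform:
    """Toy stand-in for a transform; NOT a Beam class."""

    def __init__(self, fn):
        self.fn = fn

    def __ror__(self, value):
        # `value | transform` applies the transform to the value.
        return self.fn(value)

    def __getitem__(self, tag):
        # `transform[tag]` builds a new transform that applies this one
        # and then keeps only the output named `tag`.
        return ToyTransform(lambda value: self.fn(value)[tag])


def partition(values):
    # Two named outputs: even numbers are "main", the others are "rest".
    return {
        'main': [v for v in values if v % 2 == 0],
        'rest': [v for v in values if v % 2 != 0],
    }


Partition = ToyTransform(partition)
Double = ToyTransform(lambda values: [v * 2 for v in values])

# Parsed as [1, 2, 3, 4] | (Partition['main']) | Double
chained = [1, 2, 3, 4] | Partition['main'] | Double

# The spelling that needs no new support: subscript the applied result.
explicit = ([1, 2, 3, 4] | Partition)['main'] | Double
```

In this model both spellings produce the same result, and `transform[tag]` is just shorthand for applying the transform and then selecting one output, which is the role SelectOutput('main') would play.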


> > pcoll | Partition(...)['main'] | ChainedParDo()
>>
>> We definitely could introduce this syntax FWIW, and I'd be supportive if
>> it solves the underlying problem sufficiently. It is a little less clean
>> than the proposed syntax, but much less ambiguous in intent. It should be
>> straightforward to expand it to SelectOutput('main').
>>
>> Thanks,
>> Danny
>>
>> On Fri, Feb 6, 2026 at 11:54 AM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> On Fri, Feb 6, 2026 at 7:21 AM Danny McCormick
>>> <[email protected]> wrote:
>>> >
>>> > On Thu, Feb 5, 2026 at 6:06 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>> >>
>>> >> On Thu, Feb 5, 2026 at 1:44 PM Joey Tran <[email protected]>
>>> wrote:
>>> >>>
>>> >>> Thanks for such quick feedback!
>>> >>>
>>> >>>> On Thu, Feb 5, 2026 at 3:27 PM Danny McCormick via dev <
>>> [email protected]> wrote:
>>> >>>>>
>>> >>>>> Would you mind opening up the doc for comments?
>>> >>>>>
>>> >>>>> At a high level, I'm skeptical of the pattern; it seems to me like
>>> it moves the burden of choosing the correct behavior from authors to
>>> consumers in non-obvious ways which range from harmless to potentially
>>> causing silent data loss. I think if a user wants to drop a PCollection,
>>> that should always be an active choice since the risk of data loss is much
>>> greater than the EoU benefit of extra code.
>>> >>>>>
>>> >>> I think perhaps I poorly chose a few motivating examples, but it was
>>> at least helpful in clarifying two distinct patterns.
>>> >>>   - Filters/Samplers/Deduplicators
>>> >>>   - Transforms that may run into issues with certain inputs
>>> >>
>>> >>
>>> >> One use case that comes to mind is running some computation with a
>>> (safely ignorable) side output that has statistics about what was
>>> computed/encountered.
>>> >>
>>> >>>
>>> >>>
>>> >>>>>
>>> >>>>> I'd argue that a better pattern than having a single transform
>>> which handles this is to either have a Filter or a Partition transform
>>> which a user can use as needed. These are different transforms because they
>>> have different purposes/core functionalities.
>>> >>>>>
>>> >>> This can become unwieldy for a large library of filtering / sampling
>>> / data processing transforms. At Schrodinger for example, we may have maybe
>>> a dozen transforms some of which...
>>> >>>   - are samplers where most consumers will just be interested in the
>>> "sample", while other consumers may be interested in both the sample and
>>> remaining
>>> >>>   - are data processing transforms with a concept of processed
>>> outputs and "dropped for well-understood reason"
>>> >>
>>> >>
>>> >> I prefer transforms that can be used multiple ways than distinct
>>> transforms that do the same thing.
>>> >>
>>> >> FWIW, our existing DoFn has somewhat this behavior: by default one
>>> consumes only the "main" output, but the caller can ask for the side
>>> outputs if desired. This extends it a bit further in that
>>> >>
>>> >> 1. Composite operations are supported (though I guess technically
>>> nothing is stopping someone from manually constructing a DoFnTuple), and
>>> >> 2. Side outputs are always returned and available for consumption,
>>> without inconveniencing use of the main output, rather than having to
>>> explicitly switch modes at application time.
>>> >>
>>> >> This seems like a strict win to me.
>>> >
>>> >
>>> > I guess fundamentally, the question is: should we make it easier to
>>> drop outputs with no indication to a consumer (and more importantly IMO,
>>> future readers of a consumer's code)?
>>> >
>>> > For example:
>>> >
>>> > pcoll | Filter(...) | ChainedParDo()
>>> >
>>> > gives no indication of there being multiple outputs from Partition. In
>>> some cases, this is fine, but in others it is dangerous.
>>>
>>> But that is exactly the behavior our current DoFns have (if one does
>>> not call with_outputs, the TaggedOutputs are ignored). And even if one
>>> does, there's nothing that forces you to name all the outputs, or use
>>> them even if they're named.
>>>
>>> Or, with the Filter example here, it's not considered "data loss" that
>>> only some of the elements make it through, but unfortunate that you
>>> can't peek at the elements that were dropped if you want. Making this
>>> a "side output" that is optionally inspected seems strictly better.
>>> (In fact, we could (should) update our Filter DoFn to do this today,
>>> though this would force one to write
>>> Filter(...).with_outputs('filtered', main='main').)
>>>
>>> >>>>> > A parser that routes malformed input to a dead-letter output
>>> >>>>> > A validator that routes violations separately
>>> >>>>> > An enrichment that routes lookup failures aside
>>> >>>>>
>>> >>>>> These are the ones I'm really worried about. In all of these
>>> cases, we are silently dropping error output in a way that might be
>>> non-obvious to a user. As a user, if I use a parser that returns a single
>>> output, I would assume that any parse failures would lead to exceptions.
>>> >>>>>
>>> >>> I agree that it'd be an antipattern for these types of transforms to
>>> silently capture and drop these erroneous records, but there is nothing
>>> preventing an author of parser/validator/enrichment transform from doing
>>> this today even without ForkedPCollections. With ForkedPCollections, I
>>> think we can and still should discourage transform authors from silently
>>> handling errors without some active user configuration (e.g. by requiring
>>> >>> as a keyword arg `error_handling_pcoll_name="failed"` to enable any error
>>> capturing at all). e.g.
>>> >>> ```
>>> >>> parsed = pcoll | ParseData()
>>> >>> # parsed.failed --> should not exist; ParseData should not
>>> >>> # automatically do this
>>> >>>
>>> >>> parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>> >>> # parsed.failed --> exists now but only with user intent
>>> >>> ```
>>> >>
>>> >>
>>> >> +1. Errors should fail the pipeline unless one explicitly requests
>>> they be passed somewhere else.
>>> >
>>> >
>>> > I'm glad we're generally aligned on failing the pipeline by default -
>>> I'll note that this is exactly what with_exception_handling is doing, but
>>> I'd still be hesitant to apply this to with_exception_handling, with
>>> something like
>>> >
>>> >    parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>> >
>>> > Should we really be helping transform authors make it easier for
>>> consumers to swallow unhandled error output?
>>>
>>> Yeah, for error handling, I would encourage using the existing error
>>> handling mechanisms (including raising an exception rather than
>>> emitting an output).
>>>
>>> >>>>> With all that said, I am aligned with the goal of making pipelines
>>> like this easier to chain. Maybe an in between option would be adding a
>>> DoFn utility like:
>>> >>>>>
>>> >>>>> ```
>>> >>>>> pcoll | Partition(...).keep_tag('main') | ChainedParDo()
>>> >>>>> ```
>>> >>>>>
>>> >>>>> Where `keep_tag` forces an expansion where all tags other than
>>> main are dropped. What do you think?
>>> >>
>>> >>
>>> >> One can already write
>>> >>
>>> >>     pcoll | Partition(...)['main'] | ChainedParDo()
>>> >
>>> > This is a good point, though as a nit I think it needs to be:
>>> >
>>> >     (pcoll | Partition(...))['main'] | ChainedParDo()
>>> >
>>> > since the tag is applied to the PCollection, not the ParDo. But this
>>> does give us the expressivity we need here IMO.
>>>
>>> Good point. And that makes it a bit uglier. We could introduce a new
>>> trivial transform
>>>
>>>     pcoll | Partition(...) | SelectOutput('main') | ChainedParDo
>>>
>>> but that is cumbersome to write compared to
>>>
>>>     pcoll | Filter(...) | ChainedParDo
>>>
>>> when 90% of the time you just want to filter out the elements, but 10%
>>> of the time you might be interested in what was filtered out (and
>>> why).
>>>
>>
