On Fri, Feb 6, 2026 at 4:43 PM Danny McCormick <[email protected]>
wrote:

> On Fri, Feb 6, 2026 at 4:22 PM Joey Tran <[email protected]>
> wrote:
>
>>
>>
>> On Fri, Feb 6, 2026 at 1:34 PM Danny McCormick via dev <
>> [email protected]> wrote:
>>
>>> > But that is exactly the behavior our current DoFns have (if one does
>>> > not call with_outputs, the TaggedOutputs are ignored). And even if one
>>> > does, there's nothing that forces you to name all the outputs, or use
>>> > them even if they're named.
>>>
>>> I missed that this is our default behavior for DoFns. While I disagree
>>> with that choice, that ship has sailed long ago, and it could influence how
>>> I think about this - thanks for calling it out.
>>>
>>> Unifying our behavior across DoFns and composites matters more to me
>>> than avoiding the non-obvious dropping of a PCollection. But I don't think
>>> this proposal quite does that since we're not talking about changing the
>>> default behavior.
>>>
>>
>>
>> This is an interesting idea, though it does increase the scope of the
>> proposal quite a bit, at least in terms of implementation. We could extend
>> `PTransform` to have a `.with_outputs()` method so `ParDo`s and
>> `PTransforms` have the same interface. Then `PTransform.expand` can
>> optionally return a `PCollectionWithSideOutputs` (I agree with Robert that
>> `ForkedPCollections` doesn't really make sense).
>>
>> ```
>>   class MyFilter(beam.PTransform):
>>       def expand(self, pcoll):
>>           results = pcoll | beam.ParDo(FilterDoFn()).with_outputs(
>>               'filtered', main='main')
>>           return PCollectionWithSideOutputs(
>>               main=results.main, filtered=results.filtered)
>> ```
>> Then you'd have similar behavior for both ParDos and PTransforms:
>> ```
>>   # ParDo
>>   pcoll | ParDo(fn)
>>   pcoll | ParDo(fn).with_outputs('side', main='main')  # DoOutputsTuple
>>
>>   # Composite PTransform
>>   pcoll | MyTransform()                                # PCollection
>>   pcoll | MyTransform().with_outputs('filtered', main='main')  # PCollectionOutputsTuple
>> ```
>>
>> I recognize this still doesn't really address your concerns about making
>> it easier to hide dropped outputs though.
>>
>
> If we can unify things I'm much more amenable to this, because it at least
> helps with a different problem (inconsistency across transform
> experiences). I think we'd still have different default behaviors in this
> case, though, right? If we wanted to really unify things, we'd need to wrap
> things and always return a PCollectionWithSideOutputs, but that probably
> doesn't work since composites don't necessarily have a "main" PCollection
> in the same way. Arguably, it's always the first PCollection.
>
>

I don't quite follow what you mean about different default behaviors. The
default for both ParDo and MyTransform when they're applied to a
pcollection is to return a PCollection, regardless of whether or not they
had tagged/side outputs. Do you mean that ParDo().with_outputs() and
PTransform().with_outputs() return different types?


>
>>
>>> Secondarily, it is optimizing for writability over readability; you save
>>> a few lines of code, but the reader might not have any idea that there was
>>> a dropped output (whether that's a harmless Filter secondary collection or
>>> an error collection).
>>>
>>>
>> FWIW, much of the value of this proposal to me is the better readability
>> from not having to consider multiple versions of transforms and not having
>> to break up chains to extract main outputs. I appreciate though that we'd
>> be making a trade-off of readability of the "sad path" for readability of
>> the "happy path".
>>
>
> Yeah, that makes sense; what do you think of the other alternative
> mentioned as an option for optimizing for both kinds of readability?
> Specifically, allowing:
>
>    pcoll | Partition(...)['main'] | ChainedParDo()
>
> I guess the downside there is education (all pipeline authors need to know
> this is an option as opposed to only one expert transform author), but I'm
> curious if it is sufficient for your context.
>
>

Is the suggestion here to implement `__getitem__` on PTransform/ParDo so a
particular pcollection can be specified? This would definitely be an
improvement from the current state. I think one further improvement would
be if we could specify the pcollection by attribute rather than by
key/string, so `Partition(...).main` instead, but that risks collisions
between pcollection names and existing PTransform methods or attributes.
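
To check that we mean the same thing, here is a rough plain-Python sketch of
the idea. None of this is Beam code: `Transform`, `_Select`, `Outputs`, and
`Double` are made-up stand-ins, and plain lists stand in for pcollections. It
only illustrates that subscription binds tighter than `|`, so the tag can be
chosen on the transform and no parentheses are needed around the application:

```python
# Toy stand-ins, NOT the Beam API: just enough to show the operator mechanics.

class Outputs(dict):
    """Hypothetical multi-output result: tag -> list of elements."""


class Transform:
    # Stand-in for attributes that real transforms already carry.
    label = "toy-transform"

    def apply(self, elements):
        raise NotImplementedError

    def __ror__(self, elements):
        # `elements | transform` applies the transform.
        return self.apply(elements)

    def __getitem__(self, tag):
        # `transform[tag]` builds a new transform that keeps one tagged output.
        return _Select(self, tag)


class _Select(Transform):
    def __init__(self, inner, tag):
        self.inner, self.tag = inner, tag

    def apply(self, elements):
        return self.inner.apply(elements)[self.tag]


class Partition(Transform):
    def __init__(self, predicate):
        self.predicate = predicate

    def apply(self, elements):
        out = Outputs(main=[], filtered=[])
        for e in elements:
            out["main" if self.predicate(e) else "filtered"].append(e)
        return out


class Double(Transform):
    def apply(self, elements):
        return [2 * e for e in elements]


# The subscript is evaluated before `|`, so this chains without parentheses.
result = [1, 2, 3, 4] | Partition(lambda x: x % 2 == 0)["main"] | Double()
print(result)  # [4, 8]

# Attribute access is where collisions show up: `.label` is already taken,
# so an output tagged "label" could not be reached as `.label`.
print(Partition(lambda x: True).label)  # toy-transform
```

Key-based selection avoids that ambiguity because tags live in their own
namespace, which is the main reason I'd be fine with `['main']` even though
`.main` reads a little nicer.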

I'm still partial toward the other suggestions, particularly towards
implementing `PTransform.with_outputs`, but this is probably sufficient for
my context.



>>> > pcoll | Partition(...)['main'] | ChainedParDo()
>>>
>>> We definitely could introduce this syntax FWIW, and I'd be supportive if
>>> it solves the underlying problem sufficiently. It is a little less clean
>>> than the proposed syntax, but much less ambiguous in intent. It should be
>>> straightforward to expand it to SelectOutput('main').
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Fri, Feb 6, 2026 at 11:54 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Fri, Feb 6, 2026 at 7:21 AM Danny McCormick
>>>> <[email protected]> wrote:
>>>> >
>>>> > On Thu, Feb 5, 2026 at 6:06 PM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>> >>
>>>> >> On Thu, Feb 5, 2026 at 1:44 PM Joey Tran <[email protected]>
>>>> wrote:
>>>> >>>
>>>> >>> Thanks for such quick feedback!
>>>> >>>
>>>> >>>> On Thu, Feb 5, 2026 at 3:27 PM Danny McCormick via dev <
>>>> [email protected]> wrote:
>>>> >>>>>
>>>> >>>>> Would you mind opening up the doc for comments?
>>>> >>>>>
>>>> >>>>> At a high level, I'm skeptical of the pattern; it seems to me
>>>> like it moves the burden of choosing the correct behavior from authors to
>>>> consumers in non-obvious ways which range from harmless to potentially
>>>> causing silent data loss. I think if a user wants to drop a PCollection,
>>>> that should always be an active choice since the risk of data loss is much
>>>> greater than the EoU benefit of extra code.
>>>> >>>>>
>>>> >>> I think perhaps I poorly chose a few motivating examples, but it
>>>> was at least helpful in clarifying two distinct patterns.
>>>> >>>   - Filters/Samplers/Deduplicators
>>>> >>>   - Transforms that may run into issues with certain inputs
>>>> >>
>>>> >>
>>>> >> One use case that comes to mind is running some computation with a
(safely ignorable) side output that has statistics about what was
>>>> computed/encountered.
>>>> >>
>>>> >>>
>>>> >>>
>>>> >>>>>
>>>> >>>>> I'd argue that a better pattern than having a single transform
>>>> which handles this is to either have a Filter or a Partition transform
>>>> which a user can use as needed. These are different transforms because they
>>>> have different purposes/core functionalities.
>>>> >>>>>
>>>> >>> This can become unwieldy for a large library of filtering /
sampling / data processing transforms. At Schrodinger, for example, we
have maybe a dozen transforms, some of which...
>>>> >>>   - are samplers where most consumers will just be interested in
>>>> the "sample", while other consumers may be interested in both the sample
>>>> and remaining
>>>> >>>   - are data processing transforms with a concept of processed
>>>> outputs and "dropped for well-understood reason"
>>>> >>
>>>> >>
>>>> >> I prefer transforms that can be used multiple ways than distinct
>>>> transforms that do the same thing.
>>>> >>
>>>> >> FWIW, our existing DoFn has somewhat this behavior: by default one
>>>> consumes only the "main" output, but the caller can ask for the side
>>>> outputs if desired. This extends it a bit further in that
>>>> >>
>>>> >> 1. Composite operations are supported (though I guess technically
>>>> nothing is stopping someone from manually constructing a DoFnTuple), and
>>>> >> 2. Side outputs are always returned and available for consumption,
>>>> without inconveniencing use of the main output, rather than having to
>>>> explicitly switch modes at application time.
>>>> >>
>>>> >> This seems like a strict win to me.
>>>> >
>>>> >
>>>> > I guess fundamentally, the question is: should we make it easier to
>>>> drop outputs with no indication to a consumer (and more importantly IMO,
>>>> future readers of a consumer's code)?
>>>> >
>>>> > For example:
>>>> >
>>>> > pcoll | Filter(...) | ChainedParDo()
>>>> >
>>>> > gives no indication of there being multiple outputs from Partition.
>>>> In some cases, this is fine, but in others it is dangerous.
>>>>
>>>> But that is exactly the behavior our current DoFns have (if one does
>>>> not call with_outputs, the TaggedOutputs are ignored). And even if one
>>>> does, there's nothing that forces you to name all the outputs, or use
>>>> them even if they're named.
>>>>
>>>> Or, with the Filter example here, it's not considered "data loss" that
>>>> only some of the elements make it through, but unfortunate that you
>>>> can't peek at the elements that were dropped if you want. Making this
>>>> a "side output" that is optionally inspected seems strictly better.
>>>> (In fact, we could (should) update our Filter DoFn to do this today,
>>>> though this would force one to write
>>>> Filter(...).with_outputs('filtered', main='main').)
>>>>
>>>> >>>>> > A parser that routes malformed input to a dead-letter output
>>>> >>>>> > A validator that routes violations separately
>>>> >>>>> > An enrichment that routes lookup failures aside
>>>> >>>>>
>>>> >>>>> These are the ones I'm really worried about. In all of these
>>>> cases, we are silently dropping error output in a way that might be
>>>> non-obvious to a user. As a user, if I use a parser that returns a single
>>>> output, I would assume that any parse failures would lead to exceptions.
>>>> >>>>>
>>>> >>> I agree that it'd be an antipattern for these types of transforms
>>>> to silently capture and drop these erroneous records, but there is nothing
>>>> preventing an author of parser/validator/enrichment transform from doing
>>>> this today even without ForkedPCollections. With ForkedPCollections, I
>>>> think we can and still should discourage transform authors from silently
>>>> handling errors without some active user configuration (e.g. by requiring
as a keyword arg `error_handling_pcoll_name="failed"` to enable any error
>>>> capturing at all). e.g.
>>>> >>> ```
>>>> >>> parsed = pcoll | ParseData()
>>>> >>> # parsed.failed --> should not exist, ParseData should not
>>>> automatically do this
>>>> >>>
>>>> >>> parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>>> >>> # parsed.failed --> exists now but only with user intent
>>>> >>> ```
>>>> >>
>>>> >>
>>>> >> +1. Errors should fail the pipeline unless one explicitly requests
>>>> they be passed somewhere else.
>>>> >
>>>> >
>>>> > I'm glad we're generally aligned on failing the pipeline by default -
>>>> I'll note that this is exactly what with_exception_handling is doing, but
>>>> I'd still be hesitant to apply this to with_exception_handling, with
>>>> something like
>>>> >
>>>> >    parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>>> >
>>>> > Should we really be helping transform authors make it easier for
>>>> consumers to swallow unhandled error output?
>>>>
>>>> Yeah, for error handling, I would encourage using the existing error
>>>> handling mechanisms (including raising an exception rather than
>>>> emitting an output).
>>>>
>>>> >>>>> With all that said, I am aligned with the goal of making
>>>> pipelines like this easier to chain. Maybe an in between option would be
>>>> adding a DoFn utility like:
>>>> >>>>>
>>>> >>>>> ```
>>>> >>>>> pcoll | Partition(...).keep_tag('main') | ChainedParDo()
>>>> >>>>> ```
>>>> >>>>>
>>>> >>>>> Where `keep_tag` forces an expansion where all tags other than
>>>> main are dropped. What do you think?
>>>> >>
>>>> >>
>>>> >> One can already write
>>>> >>
>>>> >>     pcoll | Partition(...)['main'] | ChainedParDo()
>>>> >
>>>> > This is a good point, though as a nit I think it needs to be:
>>>> >
>>>> >     (pcoll | Partition(...))['main'] | ChainedParDo()
>>>> >
>>>> > since the tag is applied to the PCollection, not the ParDo. But this
>>>> does give us the expressivity we need here IMO.
>>>>
>>>> Good point. And that makes it a bit uglier. We could introduce a new
>>>> trivial transform
>>>>
>>>>     pcoll | Partition(...) | SelectOutput('main') | ChainedParDo
>>>>
>>>> but that is cumbersome to write compared to
>>>>
>>>>     pcoll | Filter(...) | ChainedParDo
>>>>
>>>> when 90% of the time you just want to filter out the elements, but 10%
>>>> of the time you might be interested in what was filtered out (and
>>>> why).
>>>>
>>>
