On Fri, Feb 6, 2026 at 7:21 AM Danny McCormick
<[email protected]> wrote:
>
> On Thu, Feb 5, 2026 at 6:06 PM Robert Bradshaw <[email protected]> wrote:
>>
>> On Thu, Feb 5, 2026 at 1:44 PM Joey Tran <[email protected]> wrote:
>>>
>>> Thanks for such quick feedback!
>>>
>>>> On Thu, Feb 5, 2026 at 3:27 PM Danny McCormick via dev 
>>>> <[email protected]> wrote:
>>>>>
>>>>> Would you mind opening up the doc for comments?
>>>>>
>>>>> At a high level, I'm skeptical of the pattern; it seems to me like it 
>>>>> moves the burden of choosing the correct behavior from authors to 
>>>>> consumers in non-obvious ways which range from harmless to potentially 
>>>>> causing silent data loss. I think if a user wants to drop a PCollection, 
>>>>> that should always be an active choice since the risk of data loss is 
>>>>> much greater than the EoU benefit of extra code.
>>>>>
>>> I think perhaps I poorly chose a few motivating examples, but it was at 
>>> least helpful in clarifying two distinct patterns.
>>>   - Filters/Samplers/Deduplicators
>>>   - Transforms that may run into issues with certain inputs
>>
>>
>> One use case that comes to mind is running some computation with a (safely 
>> ignorable) side output that has statistics about what was 
>> computed/encountered.
>>
>>>
>>>
>>>>>
>>>>> I'd argue that a better pattern than having a single transform which 
>>>>> handles this is to either have a Filter or a Partition transform which a 
>>>>> user can use as needed. These are different transforms because they have 
>>>>> different purposes/core functionalities.
>>>>>
>>> This can become unwieldy for a large library of filtering / sampling / data 
>>> processing transforms. At Schrodinger, for example, we have maybe a 
>>> dozen transforms, some of which...
>>>   - are samplers where most consumers will just be interested in the 
>>> "sample", while other consumers may be interested in both the sample and 
>>> remaining
>>>   - are data processing transforms with a concept of processed outputs and 
>>> "dropped for well-understood reason"
>>
>>
>> I prefer transforms that can be used multiple ways over distinct transforms 
>> that do the same thing.
>>
>> FWIW, our existing DoFn has somewhat this behavior: by default one consumes 
>> only the "main" output, but the caller can ask for the side outputs if 
>> desired. This extends it a bit further in that
>>
>> 1. Composite operations are supported (though I guess technically nothing is 
>> stopping someone from manually constructing a DoOutputsTuple), and
>> 2. Side outputs are always returned and available for consumption, without 
>> inconveniencing use of the main output, rather than having to explicitly 
>> switch modes at application time.
>>
>> This seems like a strict win to me.
>
>
> I guess fundamentally, the question is: should we make it easier to drop 
> outputs with no indication to a consumer (and more importantly IMO, future 
> readers of a consumer's code)?
>
> For example:
>
> pcoll | Filter(...) | ChainedParDo()
>
> gives no indication of there being multiple outputs from Filter. In some 
> cases, this is fine, but in others it is dangerous.

But that is exactly the behavior our current DoFns have (if one does
not call with_outputs, the TaggedOutputs are ignored). And even if one
does, there's nothing that forces you to name all the outputs, or use
them even if they're named.
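
To make that default concrete, here is a toy model in plain Python. It is not Beam code: `Tagged` and `run` are made-up names standing in for TaggedOutput and for applying a ParDo, and the sketch only assumes that unrequested tags are dropped.

```python
from collections import defaultdict


class Tagged:
    """Hypothetical stand-in for a tagged (side) output value."""

    def __init__(self, tag, value):
        self.tag = tag
        self.value = value


def split_even_odd(x):
    # Even numbers go to the main output; odd ones are tagged 'odd'.
    if x % 2 == 0:
        yield x
    else:
        yield Tagged('odd', x)


def run(fn, elements, with_outputs=None):
    """Toy model: without with_outputs, tagged values are silently ignored."""
    outputs = defaultdict(list)
    for element in elements:
        for out in fn(element):
            if isinstance(out, Tagged):
                outputs[out.tag].append(out.value)
            else:
                outputs['main'].append(out)
    if with_outputs is None:
        return outputs['main']
    # Only the requested tags are exposed; anything unnamed is still dropped.
    return {tag: outputs[tag] for tag in ('main', *with_outputs)}


main_only = run(split_even_odd, range(6))
both = run(split_even_odd, range(6), with_outputs=['odd'])
```

In this model the caller of `run(split_even_odd, ...)` gets no hint that an 'odd' output exists unless they ask for it, which is the behavior described above.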

Or, with the Filter example here, it's not considered "data loss" that
only some of the elements make it through, but unfortunate that you
can't peek at the elements that were dropped if you want. Making this
a "side output" that is optionally inspected seems strictly better.
(In fact, we could (should) update our Filter DoFn to do this today,
though this would force one to write
Filter(...).with_outputs('filtered', main='main').)
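
A rough sketch of the "optionally inspected side output" idea, again in plain Python rather than Beam. `Forked` and `filter_forked` are hypothetical names, and the attribute name `filtered` is just the tag used in this thread.

```python
class Forked:
    """Hypothetical result: acts like the main output, side outputs are attributes."""

    def __init__(self, main, **side_outputs):
        self._main = list(main)
        for name, values in side_outputs.items():
            setattr(self, name, list(values))

    def __iter__(self):
        # Iterating (i.e. "chaining") only ever sees the main output.
        return iter(self._main)


def filter_forked(predicate, elements):
    kept, dropped = [], []
    for element in elements:
        (kept if predicate(element) else dropped).append(element)
    return Forked(kept, filtered=dropped)


result = filter_forked(lambda x: x > 2, [1, 2, 3, 4])
chained = [x * 10 for x in result]  # common case: just use the main output
dropped = result.filtered           # occasional case: inspect what was dropped
```

The point of the design is that the common case reads exactly like a plain filter, while the dropped elements stay reachable for whoever wants them.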

>>>>> > A parser that routes malformed input to a dead-letter output
>>>>> > A validator that routes violations separately
>>>>> > An enrichment that routes lookup failures aside
>>>>>
>>>>> These are the ones I'm really worried about. In all of these cases, we 
>>>>> are silently dropping error output in a way that might be non-obvious to 
>>>>> a user. As a user, if I use a parser that returns a single output, I 
>>>>> would assume that any parse failures would lead to exceptions.
>>>>>
>>> I agree that it'd be an antipattern for these types of transforms to 
>>> silently capture and drop these erroneous records, but there is nothing 
>>> preventing an author of parser/validator/enrichment transform from doing 
>>> this today even without ForkedPCollections. With ForkedPCollections, I 
>>> think we can and still should discourage transform authors from silently 
>>> handling errors without some active user configuration (e.g. by requiring 
>>> as a keyword arg `error_handling_pcoll_name="failed"` to enable any error 
>>> capturing at all). e.g.
>>> ```
>>> parsed = pcoll | ParseData()
>>> # parsed.failed --> should not exist, ParseData should not automatically do
>>> # this
>>>
>>> parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>> # parsed.failed --> exists now but only with user intent
>>> ```
>>
>>
>> +1. Errors should fail the pipeline unless one explicitly requests they be 
>> passed somewhere else.
>
>
> I'm glad we're generally aligned on failing the pipeline by default - I'll 
> note that this is exactly what with_exception_handling is doing, but I'd 
> still be hesitant to apply this to with_exception_handling, with something 
> like
>
>    parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>
> Should we really be helping transform authors make it easier for consumers to 
> swallow unhandled error output?

Yeah, for error handling, I would encourage using the existing error
handling mechanisms (including raising an exception rather than
emitting an output).
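
For what "fail by default, capture only on explicit request" could look like, here is a minimal plain-Python sketch. `parse_all` and its `failed_tag` argument are hypothetical and only mirror the `failed_pcoll_tag` idea quoted above; this is not an existing Beam mechanism.

```python
import json


def parse_all(records, failed_tag=None):
    """Toy parser: raises on bad input unless the caller names a failure output."""
    parsed, failed = [], []
    for record in records:
        try:
            parsed.append(json.loads(record))
        except json.JSONDecodeError:
            if failed_tag is None:
                raise  # default: a bad record fails the whole run
            failed.append(record)
    if failed_tag is None:
        return {'main': parsed}
    return {'main': parsed, failed_tag: failed}


records = ['{"a": 1}', 'not json']

try:
    parse_all(records)
    raised = False
except json.JSONDecodeError:
    raised = True

# Only with explicit user intent do failures become an output.
opted_in = parse_all(records, failed_tag='failed')
```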

>>>>> With all that said, I am aligned with the goal of making pipelines like 
>>>>> this easier to chain. Maybe an in between option would be adding a DoFn 
>>>>> utility like:
>>>>>
>>>>> ```
>>>>> pcoll | Partition(...).keep_tag('main') | ChainedParDo()
>>>>> ```
>>>>>
>>>>> Where `keep_tag` forces an expansion where all tags other than main are 
>>>>> dropped. What do you think?
>>
>>
>> One can already write
>>
>>     pcoll | Partition(...)['main'] | ChainedParDo()
>
> This is a good point, though as a nit I think it needs to be:
>
>     (pcoll | Partition(...))['main'] | ChainedParDo()
>
> since the tag is applied to the PCollection, not the ParDo. But this does 
> give us the expressivity we need here IMO.

Good point. And that makes it a bit uglier. We could introduce a new
trivial transform

    pcoll | Partition(...) | SelectOutput('main') | ChainedParDo()

but that is cumbersome to write compared to

    pcoll | Filter(...) | ChainedParDo()

when 90% of the time you just want to filter out the elements, but 10%
of the time you might be interested in what was filtered out (and
why).
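
For comparison, the two explicit spellings can be mimicked in a few lines of plain Python. `Outputs`, `SelectOutput`, and `partition` here are hypothetical toy classes and functions, not Beam's; the sketch only shows that indexing and a trivial selecting transform pick out the same thing.

```python
class Outputs:
    """Hypothetical multi-output result keyed by tag."""

    def __init__(self, **tagged):
        self._tagged = tagged

    def __getitem__(self, tag):
        return self._tagged[tag]

    def __or__(self, transform):
        # Mimic pipe-style application: outputs | transform.
        return transform(self)


class SelectOutput:
    """Hypothetical trivial transform that keeps a single tagged output."""

    def __init__(self, tag):
        self.tag = tag

    def __call__(self, outputs):
        return outputs[self.tag]


def partition(predicate, elements):
    main, rest = [], []
    for element in elements:
        (main if predicate(element) else rest).append(element)
    return Outputs(main=main, rest=rest)


parts = partition(lambda x: x % 2 == 0, range(5))
via_index = parts['main']                     # like (pcoll | Partition(...))['main']
via_transform = parts | SelectOutput('main')  # like ... | SelectOutput('main')
```

Both are equivalent here; the difference is only how much the caller has to write in the common case.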
