>
> * Don't allow arbitrary nestings returned during expansion, force
>> composite transforms to always provide an unambiguous name (either a tuple
>> with PCollections with unique tags or a dictionary with untagged
>> PCollections or a singular PCollection (Java and Go SDKs do this)).
>>
>
> I believe that aligning with Java and Go would be the right way to go
> here. I don't know if this would limit expressiveness.
>
Yeah, this sounds like a much more elegant way of handling this situation. I
would lean towards saying this does limit expressiveness, since arbitrary
nesting would no longer be possible, but I think the trade-off of reduced
complexity is worth it.

So in summary, the signature could be (pseudo-typing):

PTransform.expand: (...) -> Union[PValue, NamedTuple[str, PCollection],
    Tuple[str, PCollection], Dict[str, PCollection], DoOutputsTuple]
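To make that contract concrete, here is a toy sketch of how the SDK could map an expand() return value to unambiguous local names. This is NOT real Beam SDK code; the PCollection stand-in class and the local_names helper are hypothetical, and the semantics (reject tuples with duplicate or missing tags instead of auto-naming them) are just the proposed rule as I understand it:

```python
class PCollection:
    """Minimal stand-in for beam.pvalue.PCollection with a .tag attribute."""
    def __init__(self, tag=None):
        self.tag = tag


def local_names(expanded):
    """Map unambiguous local names to the outputs of a composite expand()."""
    if isinstance(expanded, dict):
        # Dict keys are the local names.
        return dict(expanded)
    if isinstance(expanded, tuple):
        tags = [p.tag for p in expanded]
        if None in tags or len(set(tags)) != len(tags):
            # Instead of generating '0', '1', ... the SDK would reject this.
            raise ValueError('tuple outputs must carry unique, non-None tags')
        return {p.tag: p for p in expanded}
    # Singular output keeps the default (None) name.
    return {None: expanded}


a, b = PCollection('a'), PCollection('b')
print(sorted(local_names((a, b))))  # unique tags are used as-is

try:
    # The Partition example: both outputs are tagged '0'.
    local_names((PCollection('0'), PCollection('0')))
except ValueError as e:
    print('rejected:', e)
```

The key design point is that ambiguity becomes an error at expansion time rather than something the SDK papers over with generated names.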

With the expectation that (pseudo-code):

a_transform = ATransform()
ATransform.from_runner_api(a_transform.to_runner_api()).outputs.keys()
    == a_transform.outputs.keys()
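That round-trip expectation can be modeled with a toy serializer. Again, this is NOT the real runner API (the FakeTransform class and its dict "proto" are invented for illustration); it only demonstrates the property being asked for, namely that output names survive to_runner_api()/from_runner_api() unchanged:

```python
class FakeTransform:
    """Stand-in for a composite PTransform with named outputs."""
    def __init__(self, outputs):
        self.outputs = outputs  # {local_name: payload}

    def to_runner_api(self):
        # Serialize only the output names; real protos carry much more.
        return {'output_names': list(self.outputs)}

    @classmethod
    def from_runner_api(cls, proto):
        return cls({name: None for name in proto['output_names']})


a_transform = FakeTransform({'a': 1, 'b': 2})
round_tripped = FakeTransform.from_runner_api(a_transform.to_runner_api())

# The invariant from the summary above: local names are preserved exactly.
assert round_tripped.outputs.keys() == a_transform.outputs.keys()
print(sorted(round_tripped.outputs))
```

Any naming scheme that mangles or regenerates names during translation would break this assertion, which is why XLang expansion in particular has to pass names through untouched.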

Since this changes the Python SDK composite transform API, what would be
the next steps for the community to come to a consensus on this?

-Sam


On Thu, Mar 26, 2020 at 12:52 PM Udi Meiri <[email protected]> wrote:

>
>
> On Thu, Mar 26, 2020 at 10:13 AM Luke Cwik <[email protected]> wrote:
>
>> The issue seems to be that a PCollection can have a "tag" associated with
>> it and PTransform expansion can return an arbitrary nested dictionary/tuple
>> yet we need to figure out what the user wanted as the local name for the
>> PCollection from all this information.
>>
>> Will this break people who rely on the generated PCollection output tags?
>> One big question is whether a composite transform cares about the name
>> that is used. For primitive transforms such as ParDo, this is very much a
>> yes because the pickled code likely references that name in some way. Some
>> composites could have the same need where the payload that is stored as
>> part of the composite references these local names and hence we have to
>> tell people how to instruct the SDK during transform expansion about what
>> name will be used unambiguously (as long as we document and have tests
>> around this we can choose from many options). Finally, in the XLang world,
>> we need to preserve the names that were provided to us and not change them;
>> which is more about making the Python SDK handle XLang transform expansion
>> carefully.
>>
>> Am I missing edge cases?
>> Concatenation of strings leads to collisions if the delimiter character
>> is used within the tags or map keys. You could use an escaping encoding to
>> guarantee that the concatenation always generates unique names.
>>
>> Some alternatives I thought about were:
>> * Don't allow arbitrary nestings returned during expansion, force
>> composite transforms to always provide an unambiguous name (either a tuple
>> with PCollections with unique tags or a dictionary with untagged
>> PCollections or a singular PCollection (Java and Go SDKs do this)).
>>
>
> I believe that aligning with Java and Go would be the right way to go
> here. I don't know if this would limit expressiveness.
>
>
>> * Have a "best" effort naming system (note the example I give can have
>> many of the "rules" re-ordered) e.g. if all the PCollection tags are unique
>> then use only them, followed by if a flat dictionary is returned then use
>> only the keys as names, followed by if a flat tuple is returned then use
>> indices, and finally fallback to the hierarchical naming scheme.
>>
>>
>> On Tue, Mar 24, 2020 at 1:07 PM Sam Rohde <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> *Problem*
>>> I would like to discuss BEAM-9322
>>> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-9322> and the
>>> correct way to set the output tags of a transform with nested PCollections,
>>> e.g. a dict of PCollections, a tuple of dicts of PCollections. Before the
>>> fixing of BEAM-1833 <https://issues.apache.org/jira/browse/BEAM-1833>,
>>> the Python SDK when applying a PTransform would auto-generate the output
>>> tags for the output PCollections even if they are manually set by the user:
>>>
>>> class MyComposite(beam.PTransform):
>>>   def expand(self, pcoll):
>>>     a = PCollection.from_(pcoll)
>>>     a.tag = 'a'
>>>
>>>     b = PCollection.from_(pcoll)
>>>     b.tag = 'b'
>>>     return (a, b)
>>>
>>> would yield a PTransform with two output PCollections and output tags of
>>> 'None' and '0' instead of 'a' and 'b'. This was corrected for simple
>>> cases like this. However, this fails when the PCollections share the same
>>> output tag (of course). This can happen like so:
>>>
>>> class MyComposite(beam.PTransform):
>>>   def expand(self, pcoll):
>>>     partition_1 = beam.Partition(pcoll, ...)
>>>     partition_2 = beam.Partition(pcoll, ...)
>>>     return (partition_1[0], partition_2[0])
>>>
>>> With the new code, this leads to an error because both output
>>> PCollections have an output tag of '0'.
>>>
>>> *Proposal*
>>> When applying PTransforms to a pipeline (pipeline.py:550) we name the
>>> PCollections according to their position in the tree concatenated with the
>>> PCollection tag and a delimiter. From the first example, the output
>>> PCollections of the applied transform will be: '0.a' and '1.b' because it
>>> is a tuple of PCollections. In the second example, the outputs should be:
>>> '0.0' and '1.0'. In the case of a dict of PCollections, it should simply be
>>> the keys of the dict.
>>>
>>> What do you think? Am I missing edge cases? Will this be unexpected to
>>> users? Will this break people who rely on the generated PCollection output
>>> tags?
>>>
>>> Regards,
>>> Sam
>>>
>>
