>> * Don't allow arbitrary nestings returned during expansion; force
>> composite transforms to always provide an unambiguous name (either a
>> tuple of PCollections with unique tags, or a dictionary with untagged
>> PCollections, or a singular PCollection (the Java and Go SDKs do this)).
>
> I believe that aligning with Java and Go would be the right way to go
> here. I don't know if this would limit expressiveness.

Yeah, this sounds like a much more elegant way of handling this situation. I would lean towards this limiting expressiveness, because there would be a limit to nesting, but I think that the trade-off with reducing complexity is worth it.
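To make that restriction concrete, here is a rough sketch in plain Python of what validating an expand() result could look like under the Java/Go-style rule. The `PCollection` stand-in and `validate_expand_result` helper are hypothetical names for illustration only, not the actual SDK types:

```python
class PCollection:
    """Stand-in for beam.pvalue.PCollection; only the tag matters here."""
    def __init__(self, tag=None):
        self.tag = tag


def validate_expand_result(result):
    """Maps an expand() result to {local_name: PCollection}, or raises.

    Accepts only the unambiguous shapes: a single PCollection, a dict
    keyed by names, or a tuple of PCollections carrying unique tags.
    """
    if isinstance(result, PCollection):
        return {result.tag or 'None': result}
    if isinstance(result, dict):
        # Dict keys are the local names by construction.
        return dict(result)
    if isinstance(result, tuple):
        tags = [pc.tag for pc in result]
        if None in tags or len(set(tags)) != len(tags):
            raise ValueError('tuple outputs must carry unique tags: %r' % tags)
        return {pc.tag: pc for pc in result}
    raise TypeError('unsupported expand() result: %r' % (result,))
```

Under this rule, a tuple like `(PCollection('a'), PCollection('b'))` maps cleanly to names 'a' and 'b', while a tuple reusing tag '0' is rejected at expansion time instead of producing colliding output tags later.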
So in summary it could be:

    PTransform.expand: (...) -> Union[PValue,
                                      NamedTuple[str, PCollection],
                                      Tuple[str, PCollection],
                                      Dict[str, PCollection],
                                      DoOutputsTuple]

With the expectation that (pseudo-code):

    a_transform = ATransform()
    ATransform.from_runner_api(a_transform.to_runner_api()).outputs.keys() == a_transform.outputs.keys()

Since this changes the Python SDK composite transform API, what would be the next steps for the community to come to a consensus on this?

-Sam

On Thu, Mar 26, 2020 at 12:52 PM Udi Meiri <[email protected]> wrote:

> On Thu, Mar 26, 2020 at 10:13 AM Luke Cwik <[email protected]> wrote:
>
>> The issue seems to be that a PCollection can have a "tag" associated
>> with it, and PTransform expansion can return an arbitrarily nested
>> dictionary/tuple, yet we need to figure out what the user wanted as the
>> local name for the PCollection from all this information.
>>
>> Will this break people who rely on the generated PCollection output
>> tags?
>> One big question is whether a composite transform cares about the name
>> that is used. For primitive transforms such as ParDo, this is very much
>> a yes, because the pickled code likely references that name in some way.
>> Some composites could have the same need, where the payload that is
>> stored as part of the composite references these local names, and hence
>> we have to tell people how to instruct the SDK during transform
>> expansion about what name will be used unambiguously (as long as we
>> document and have tests around this, we can choose from many options).
>> Finally, in the XLang world, we need to preserve the names that were
>> provided to us and not change them; which is more about making the
>> Python SDK handle XLang transform expansion carefully.
>>
>> Am I missing edge cases?
>> Concatenation of strings leads to collisions if the delimiter character
>> is used within the tags or map keys.
>> You could use an escaping encoding to guarantee that the concatenation
>> always generates unique names.
>>
>> Some alternatives I thought about were:
>> * Don't allow arbitrary nestings returned during expansion; force
>> composite transforms to always provide an unambiguous name (either a
>> tuple of PCollections with unique tags, or a dictionary with untagged
>> PCollections, or a singular PCollection (the Java and Go SDKs do this)).
>
> I believe that aligning with Java and Go would be the right way to go
> here. I don't know if this would limit expressiveness.
>
>> * Have a "best effort" naming system (note the example I give can have
>> many of the "rules" re-ordered), e.g. if all the PCollection tags are
>> unique then use only them, followed by if a flat dictionary is returned
>> then use only the keys as names, followed by if a flat tuple is returned
>> then use indices, and finally fall back to the hierarchical naming
>> scheme.
>>
>> On Tue, Mar 24, 2020 at 1:07 PM Sam Rohde <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> *Problem*
>>> I would like to discuss BEAM-9322
>>> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-9322> and the
>>> correct way to set the output tags of a transform with nested
>>> PCollections, e.g. a dict of PCollections, or a tuple of dicts of
>>> PCollections. Before the fix for BEAM-1833
>>> <https://issues.apache.org/jira/browse/BEAM-1833>, the Python SDK, when
>>> applying a PTransform, would auto-generate the output tags for the
>>> output PCollections even if they were manually set by the user:
>>>
>>> class MyComposite(beam.PTransform):
>>>   def expand(self, pcoll):
>>>     a = PCollection.from_(pcoll)
>>>     a.tag = 'a'
>>>
>>>     b = PCollection.from_(pcoll)
>>>     b.tag = 'b'
>>>     return (a, b)
>>>
>>> would yield a PTransform with two output PCollections and output tags
>>> of 'None' and '0' instead of 'a' and 'b'. This was corrected for simple
>>> cases like this.
>>> However, this fails when the PCollections share the same output tag
>>> (of course). This can happen like so:
>>>
>>> class MyComposite(beam.PTransform):
>>>   def expand(self, pcoll):
>>>     partition_1 = beam.Partition(pcoll, ...)
>>>     partition_2 = beam.Partition(pcoll, ...)
>>>     return (partition_1[0], partition_2[0])
>>>
>>> With the new code, this leads to an error because both output
>>> PCollections have an output tag of '0'.
>>>
>>> *Proposal*
>>> When applying PTransforms to a pipeline (pipeline.py:550), we name the
>>> PCollections according to their position in the tree, concatenated with
>>> the PCollection tag and a delimiter. From the first example, the output
>>> PCollections of the applied transform will be '0.a' and '1.b' because
>>> it is a tuple of PCollections. In the second example, the outputs
>>> should be '0.0' and '1.0'. In the case of a dict of PCollections, it
>>> should simply be the keys of the dict.
>>>
>>> What do you think? Am I missing edge cases? Will this be unexpected to
>>> users? Will this break people who rely on the generated PCollection
>>> output tags?
>>>
>>> Regards,
>>> Sam
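As a footnote to the proposal quoted above, here is a rough sketch in plain Python of the hierarchical naming scheme it describes: tuple positions concatenated with the leaf tag ('0.a', '1.0'), and dict keys used as-is. The `FakePCollection` class and `hierarchical_names` helper are hypothetical names for illustration, not actual SDK code:

```python
class FakePCollection:
    """Stand-in for beam.pvalue.PCollection; only the tag matters here."""
    def __init__(self, tag=None):
        self.tag = tag


def hierarchical_names(result):
    """Returns {name: pcollection} for a nested tuple/dict of PCollections.

    Tuple elements get their positional index joined with the leaf tag by
    a '.' delimiter; dict keys are used directly as the names.
    """
    names = {}

    def walk(node, prefix, in_dict):
        if isinstance(node, dict):
            for key, value in node.items():
                walk(value, prefix + (str(key),), True)
        elif isinstance(node, tuple):
            for index, value in enumerate(node):
                walk(value, prefix + (str(index),), False)
        else:
            # Leaf PCollection: append its tag unless a dict key already
            # names it.
            tag = getattr(node, 'tag', None)
            parts = prefix if in_dict else prefix + (str(tag),)
            names['.'.join(parts)] = node

    walk(result, (), False)
    return names
```

With this sketch, the first example's tuple of tagged PCollections yields the names '0.a' and '1.b', the Partition example yields '0.0' and '1.0' (no collision), and a dict yields just its keys, matching the behavior described in the proposal.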
