I've gone ahead and filed a JIRA issue and a GitHub PR to follow up on this suggestion and make it more concrete:
https://issues.apache.org/jira/browse/BEAM-5413
https://github.com/apache/beam/pull/6414

On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas wrote:

> Hello all, I'm a data engineer at Mozilla working on a first project using
> Beam. I've been impressed with the usability of the API, as there are good
> built-in solutions for handling many simple transformation cases with
> minimal code, and I wanted to discuss one bit of ergonomics that seems to
> be missing.
>
> It appears that none of the existing PTransform factories are generic
> enough to take in or output a PCollectionTuple, but we've found many use
> cases where it's convenient to apply a few transforms on a PCollectionTuple
> in a lambda expression.
>
> For example, we've defined several PTransforms that return main and error
> output streams bundled in a PCollectionTuple. We defined a
> CompositeTransform interface so that we could handle the error output in a
> lambda expression like:
>
>     pipeline
>         .apply("attempt to deserialize messages", new MyDeserializationTransform())
>         .apply("write deserialization errors",
>             CompositeTransform.of((PCollectionTuple input) -> {
>               input.get(errorTag).apply(new MyErrorOutputTransform());
>               return input.get(mainTag);
>             }))
>         .apply("more processing on the deserialized messages", new MyOtherTransform());
>
> I'd be interested in contributing a patch to add this functionality,
> perhaps as a static method PTransform.compose(). Would that patch be
> welcome? Are there other thoughts on naming?
>
> The full code of the CompositeTransform interface we're currently using is
> included below.
>
>     import org.apache.beam.sdk.transforms.PTransform;
>     import org.apache.beam.sdk.values.PInput;
>     import org.apache.beam.sdk.values.POutput;
>
>     public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>       OutputT expand(InputT input);
>
>       /**
>        * The public factory method that serves as the entry point for users to
>        * create a composite PTransform.
>        */
>       static <InputT extends PInput, OutputT extends POutput>
>           PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>         return new PTransform<InputT, OutputT>() {
>           @Override
>           public OutputT expand(InputT input) {
>             return transform.expand(input);
>           }
>         };
>       }
>     }
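The adapter in the quoted message is a general Java idiom: a functional interface whose static factory wraps a lambda in an anonymous subclass of an abstract class, so a lambda can be used wherever that class is expected. The sketch below reproduces the idiom in plain Java so it runs without Beam on the classpath. Transform, Composite, compose, Tuple, DESERIALIZE and run are all hypothetical stand-ins (for PTransform, CompositeTransform, the proposed PTransform.compose(), PCollectionTuple and the example pipeline), not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the lambda-to-transform adapter idea.
// Every type here is a hypothetical stand-in; none of them are Beam classes.
final class ComposeSketch {

  /** Hypothetical stand-in for Beam's abstract PTransform class. */
  abstract static class Transform<InputT, OutputT> {
    public abstract OutputT expand(InputT input);
  }

  /** Plays the role of the CompositeTransform functional interface. */
  @FunctionalInterface
  interface Composite<InputT, OutputT> {
    OutputT expand(InputT input);
  }

  /** Hypothetical compose(): wraps a lambda in an anonymous Transform subclass. */
  static <InputT, OutputT> Transform<InputT, OutputT> compose(Composite<InputT, OutputT> fn) {
    return new Transform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        return fn.expand(input);
      }
    };
  }

  /** Hypothetical stand-in for a PCollectionTuple with a main and an error output. */
  static final class Tuple {
    final List<Integer> main = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
  }

  /** Parses strings as integers; unparseable inputs go to the error output. */
  static final Transform<List<String>, Tuple> DESERIALIZE =
      compose(messages -> {
        Tuple out = new Tuple();
        for (String m : messages) {
          try {
            out.main.add(Integer.parseInt(m));
          } catch (NumberFormatException e) {
            out.errors.add(m);
          }
        }
        return out;
      });

  /** Mirrors the quoted pipeline: deserialize, write errors aside, keep the main output. */
  static List<Integer> run(List<String> messages, List<String> errorSink) {
    Transform<Tuple, List<Integer>> writeErrors =
        compose((Tuple input) -> {
          errorSink.addAll(input.errors); // stands in for MyErrorOutputTransform
          return input.main;              // only the main output continues downstream
        });
    return writeErrors.expand(DESERIALIZE.expand(messages));
  }

  public static void main(String[] args) {
    List<String> errors = new ArrayList<>();
    List<Integer> parsed = run(Arrays.asList("1", "two", "3"), errors);
    System.out.println(parsed + " " + errors); // prints [1, 3] [two]
  }
}
```

One design point worth weighing for a real Beam version: PTransform implements Serializable, so a compose() factory might prefer a lambda type that also extends Serializable (Beam's SerializableFunction is one such type). The sketch above ignores serialization entirely.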
