I haven't made any changes in this area since #11670, so nothing is missing from the release.
Also, the GreedyPipelineFuser has not been updated to support XLang, as it has some baked-in assumptions around flatten[1] and likely other issues. It has worked so far because the simple examples we have run haven't hit these kinds of cases.

The source seems to be the CreateSource, which should have been wrapped with the BoundedSourceSDFWrapper and expanded by the runner into the appropriate SDF steps. Unfortunately I don't have the time to try to produce a repro, since the error is cut off before we see the details of the transform/pipeline that was being executed.

Creating a JIRA with a link to a git branch and a gradle command that builds/launches the pipeline that produces the error would help the Beam community investigate further.

1: https://issues.apache.org/jira/browse/BEAM-10016?focusedCommentId=17117378&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17117378

On Thu, May 28, 2020 at 11:50 AM Chamikara Jayalath <chamik...@google.com> wrote:

> This might have to do with https://github.com/apache/beam/pull/11670.
> +Lukasz Cwik <lc...@google.com> was there a subsequent fix that was not
> included in the release ?
>
> On Thu, May 28, 2020 at 10:29 AM Kyle Weaver <kcwea...@google.com> wrote:
>
>> What source are you using?
>>
>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>>> with an external Python transform) with a PROCESS SDK Harness and Spark
>>> Portable Runner but it fails.
>>> To do that I have a running Spark Runner Job Server (Spark local) and
>>> standalone Expansion Service (Python) which contains a code of my Python
>>> transform that should be called from main Java pipeline.
>>>
>>> Once job has been submitted on Job Server and started running, it fails
>>> with this error:
>>>
>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>
>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>> beam:transform:read:v1 primitives, but
>>> transform Create.Values/Read(CreateSource) executes in environment
>>> Optional[urn: "beam:env:docker:v1"
>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>> capabilities: "beam:coder:bytes:v1"
>>> ….
>>>
>>>
>>> Some code snippets of my pipeline that can be helpful.
>>>
>>> *Java transform:*
>>>
>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>
>>> @Override
>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>   PCollection<KV<String, String>> output =
>>>       input.apply(
>>>           "ExternalGenreClassifier",
>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>               .<KV<String, String>>withOutputType());
>>>   return output;
>>> }
>>>
>>>
>>> *expansion_service.py*
>>>
>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>> class GenreClassifier(ptransform.PTransform):
>>>   def __init__(self):
>>>     super(GenreClassifier, self).__init__()
>>>
>>>   def expand(self, pcoll):
>>>     return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>
>>>   def to_runner_api_parameter(self, unused_context):
>>>     return 'ml:genreclassifier:python:v1', None
>>>
>>>   @staticmethod
>>>   def from_runner_api_parameter(unused_ptransform, unused_parameter,
>>>                                 unused_context):
>>>     return GenreClassifier()
>>>
>>>
>>> def main(unused_argv):
>>>   ...
>>>   server = grpc.server(UnboundedThreadPoolExecutor())
>>>   beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>       expansion_service.ExpansionServiceServicer(
>>>           PipelineOptions.from_dictionary({
>>>               'environment_type': 'PROCESS',
>>>               'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>               'sdk_location': 'container',
>>>           })
>>>       ), server
>>>   )
>>>   server.add_insecure_port('localhost:{}'.format(options.port))
>>>   server.start()
>>>
>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to
>>> fix it?
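
For anyone following the thread, the kind of check behind the IllegalArgumentException quoted above can be sketched roughly as follows. This is a simplified Python model and not the actual GreedyPipelineFuser code (which is Java). The dict layout, the `find_invalid_roots` name, the `Impulse` entry, the URN given to the Create transform, and the PROCESS environment string are all hypothetical and chosen only for illustration. The assumption is that a root transform (one with no inputs) is accepted only if it is an impulse or read primitive with no SDK environment attached.

```python
# Simplified, hypothetical model of the root-node check; not Beam's real API.
RUNNER_ROOT_URNS = {"beam:transform:impulse:v1", "beam:transform:read:v1"}


def find_invalid_roots(transforms):
    """Return the names of root transforms that the check would reject.

    `transforms` maps a transform name to a dict with the keys
    'urn' (str), 'inputs' (list of str) and 'environment' (str or None).
    """
    invalid = []
    for name, spec in transforms.items():
        if spec["inputs"]:
            continue  # Has inputs, so it is not a root node.
        # Assumption: "runner-implemented" means no SDK environment is set.
        runner_implemented = (
            spec["urn"] in RUNNER_ROOT_URNS and spec["environment"] is None
        )
        if not runner_implemented:
            invalid.append(name)
    return invalid


# Hypothetical pipeline resembling the one in the thread.
pipeline = {
    "Create.Values/Read(CreateSource)": {
        "urn": "beam:transform:read:v1",  # assumed URN, for illustration only
        "inputs": [],
        "environment": "beam:env:docker:v1",
    },
    "Impulse": {
        "urn": "beam:transform:impulse:v1",
        "inputs": [],
        "environment": None,
    },
    "ExternalGenreClassifier": {
        "urn": "ml:genreclassifier:python:v1",
        "inputs": ["Impulse.out"],
        "environment": "beam:env:process:v1",
    },
}

invalid_roots = find_invalid_roots(pipeline)
print(invalid_roots)
```

In this model the Create root is rejected because it still carries the Java SDK docker environment, which matches the shape of the reported error. The external Python transform is not flagged because it is not a root.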