Alexey, can you try adding --experiments=beam_fn_api to your pipeline options? We add the option automatically in Python [1] but we don't in Java.
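Since the Java SDK does not add the experiment for you, one option is to patch the command-line args before they reach `PipelineOptionsFactory`. Below is a minimal sketch of that idea. `FnApiExperimentArgs` and `withFnApiExperiment` are hypothetical names and are not part of Beam. The sketch assumes flags use the `--name=value` form and that `--experiments` accepts a comma-separated list.

```java
import java.util.Arrays;

/**
 * Hypothetical helper, not part of Beam: returns a copy of the pipeline's
 * command-line args that requests the beam_fn_api experiment, roughly
 * mirroring what the Python SDK does automatically for portable runners.
 */
final class FnApiExperimentArgs {
  private static final String FLAG = "--experiments=";
  private static final String EXPERIMENT = "beam_fn_api";

  private FnApiExperimentArgs() {}

  static String[] withFnApiExperiment(String[] args) {
    // Work on a copy so the caller's array is never modified.
    String[] result = Arrays.copyOf(args, args.length);

    // Already requested? Check every --experiments=a,b,c flag.
    for (String arg : result) {
      if (arg.startsWith(FLAG)) {
        for (String exp : arg.substring(FLAG.length()).split(",")) {
          if (exp.trim().equals(EXPERIMENT)) {
            return result;
          }
        }
      }
    }

    // Otherwise extend the first --experiments flag (assumed to accept a
    // comma-separated list) ...
    for (int i = 0; i < result.length; i++) {
      if (result[i].startsWith(FLAG)) {
        String value = result[i].substring(FLAG.length());
        result[i] = FLAG + (value.isEmpty() ? EXPERIMENT : value + "," + EXPERIMENT);
        return result;
      }
    }

    // ... or append a new flag if there is none.
    String[] extended = Arrays.copyOf(result, result.length + 1);
    extended[result.length] = FLAG + EXPERIMENT;
    return extended;
  }

  public static void main(String[] args) {
    // With Beam on the classpath, the result would be passed on, e.g.
    // PipelineOptionsFactory.fromArgs(withFnApiExperiment(args)).withValidation().create();
    System.out.println(String.join(" ", withFnApiExperiment(args)));
  }
}
```

For example, `--runner=PortableRunner` becomes `--runner=PortableRunner --experiments=beam_fn_api`, and `--experiments=foo` becomes `--experiments=foo,beam_fn_api`. If you build the options in code instead, the Java SDK's `ExperimentalOptions.addExperiment(...)` should serve the same purpose.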
I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can help with that.

[1] https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
[2] https://jira.apache.org/jira/browse/BEAM-10151

On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Yes, I ran only the Java pipeline with the Portable Runner and got the
> same error.
>
> I also did the same (without the cross-language component) against Beam
> 2.19 and 2.20.
> It works fine against Beam 2.19 (as expected, since I had already tested it
> before) and fails with the same kind of error against Beam 2.20:
>
> 20/05/29 15:59:23 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
> payload: "\n\033apache/beam_java_sdk:2.20.0"
>
> Do you think it’s a bug, or am I missing something in the configuration?
>
> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>
> Can you try removing the cross-language component(s) from the pipeline and
> see if it still has the same error?
>
> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> For testing purposes, it’s just Create.of("Name1", "Name2", ...).
>>
>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> What source are you using?
>>
>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I’m trying to run a cross-language pipeline (Beam 2.21, a Java pipeline
>>> with an external Python transform) with a PROCESS SDK harness and the
>>> Spark Portable Runner, but it fails.
>>> To do that, I have a running Spark Runner Job Server (Spark local) and a
>>> standalone Expansion Service (Python) that contains the code of my Python
>>> transform, which should be called from the main Java pipeline.
>>>
>>> Once the job has been submitted to the Job Server and started running,
>>> it fails with this error:
>>>
>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>
>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>> capabilities: "beam:coder:bytes:v1"
>>> …
>>>
>>>
>>> Here are some code snippets from my pipeline that may be helpful.
>>>
>>> Java transform:
>>>
>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>
>>> @Override
>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>   PCollection<KV<String, String>> output =
>>>       input.apply(
>>>           "ExternalGenreClassifier",
>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>               .<KV<String, String>>withOutputType());
>>>   return output;
>>> }
>>>
>>>
>>> expansion_service.py:
>>>
>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>> class GenreClassifier(ptransform.PTransform):
>>>   def __init__(self):
>>>     super(GenreClassifier, self).__init__()
>>>
>>>   def expand(self, pcoll):
>>>     return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>
>>>   def to_runner_api_parameter(self, unused_context):
>>>     return 'ml:genreclassifier:python:v1', None
>>>
>>>   @staticmethod
>>>   def from_runner_api_parameter(unused_ptransform, unused_parameter,
>>>                                 unused_context):
>>>     return GenreClassifier()
>>>
>>>
>>> def main(unused_argv):
>>>   ...
>>>   server = grpc.server(UnboundedThreadPoolExecutor())
>>>   beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>       expansion_service.ExpansionServiceServicer(
>>>           PipelineOptions.from_dictionary({
>>>               'environment_type': 'PROCESS',
>>>               'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>               'sdk_location': 'container',
>>>           })
>>>       ), server
>>>   )
>>>   server.add_insecure_port('localhost:{}'.format(options.port))
>>>   server.start()
>>>
>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to
>>> fix it?
>>>
>>
>