Yes, I ran the Java-only pipeline with the Portable Runner and got the same error.
Also, I ran the same pipeline (without the cross-language component) against Beam 2.19 and 2.20. It works fine against Beam 2.19 (as expected, since I had already tested it before) and fails with the same kind of error against Beam 2.20:

20/05/29 15:59:23 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
payload: "\n\033apache/beam_java_sdk:2.20.0"

Do you think it's a bug, or am I missing something in the configuration?

> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>
> Can you try removing the cross-language component(s) from the pipeline and
> see if it still has the same error?
>
> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com
> <mailto:aromanenko....@gmail.com>> wrote:
> For testing purposes, it's just "Create.of("Name1", "Name2", ...)"
>
>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com
>> <mailto:kcwea...@google.com>> wrote:
>>
>> What source are you using?
>>
>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com
>> <mailto:aromanenko....@gmail.com>> wrote:
>> Hello,
>>
>> I'm trying to run a cross-language pipeline (Beam 2.21, a Java pipeline with
>> an external Python transform) with a PROCESS SDK Harness and the Spark Portable
>> Runner, but it fails.
>> To do that, I have a running Spark Runner Job Server (Spark local) and a
>> standalone Expansion Service (Python) which contains the code of my Python
>> transform that should be called from the main Java pipeline.
>>
>> Once the job has been submitted on the Job Server and started running, it fails
>> with this error:
>>
>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>> capabilities: "beam:coder:bytes:v1"
>> ....
>>
>> Some code snippets of my pipeline that may be helpful.
>>
>> Java transform:
>>
>> private static final String URN = "ml:genreclassifier:python:v1";
>>
>> @Override
>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>   PCollection<KV<String, String>> output =
>>       input.apply(
>>           "ExternalGenreClassifier",
>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>               .<KV<String, String>>withOutputType());
>>   return output;
>> }
>>
>> expansion_service.py:
>>
>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>> class GenreClassifier(ptransform.PTransform):
>>   def __init__(self):
>>     super(GenreClassifier, self).__init__()
>>
>>   def expand(self, pcoll):
>>     return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>
>>   def to_runner_api_parameter(self, unused_context):
>>     return 'ml:genreclassifier:python:v1', None
>>
>>   @staticmethod
>>   def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
>>     return GenreClassifier()
>>
>> def main(unused_argv):
>>   ...
>>   server = grpc.server(UnboundedThreadPoolExecutor())
>>   beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>       expansion_service.ExpansionServiceServicer(
>>           PipelineOptions.from_dictionary({
>>               'environment_type': 'PROCESS',
>>               'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>               'sdk_location': 'container',
>>           })
>>       ), server
>>   )
>>   server.add_insecure_port('localhost:{}'.format(options.port))
>>   server.start()
>>
>> Does anyone have an idea what's wrong with my setup/pipeline and how to fix it?
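[Editor's note] The GreedyPipelineFuser error above is triggered because Java's Create.Values expands to a Read(CreateSource) primitive, which the pipeline fuser in Beam 2.20+ only accepts as a root node when it is runner-implemented; here it is assigned to the Java SDK docker environment instead. A minimal sketch of one possible workaround, replacing Create.of(...) with an Impulse root followed by a ParDo that emits the elements (class and method names here are illustrative, and this assumes the Beam Java SDK on the classpath; it is not a confirmed fix from this thread):

```java
// Hedged sketch: build the test PCollection from an Impulse root instead of
// Create.of(...), so the pipeline's only root is the portable
// beam:transform:impulse:v1 primitive that GreedyPipelineFuser accepts.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class ImpulseCreate {
  // Illustrative helper: emits the given fixed values from an Impulse root.
  public static PCollection<String> elements(Pipeline p, final String... values) {
    return p
        .apply("Impulse", Impulse.create())  // single empty byte[] element, portable root
        .apply("EmitElements", ParDo.of(new DoFn<byte[], String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            for (String v : values) {  // fan the impulse out into the test values
              c.output(v);
            }
          }
        }));
  }
}
```

Usage would then be `elements(pipeline, "Name1", "Name2")` in place of `pipeline.apply(Create.of("Name1", "Name2"))`. Depending on the Beam version, the `beam_fn_api` experiment may also change how Create/Read is translated for portable runners, but whether that applies here is an assumption worth verifying against the runner documentation.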