Thanks! It was an issue with setting up the virtualenv in the console where the worker should be running.
I think it would be useful to log such errors at ERROR level.

> On 29 May 2020, at 18:55, Kyle Weaver <kcwea...@google.com> wrote:
>
> That's probably a problem with your worker. You'll need to get additional
> logs to debug it (see https://jira.apache.org/jira/browse/BEAM-8278).
>
> On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Many thanks! It helped to avoid the error. I had seen this option in the xlang
> tests before, but I didn't add it because its name confused me =)
> Also, I think we need to add "--sdk_location=container" for the Expansion Service.
>
> Finally, I've managed to run both the Java-only and the xlang pipeline (with the
> Python external transform), and it works with the Docker harness (though I observe
> some new exceptions at runtime).
>
> On the other hand, with the Process harness it still fails with an error:
>
> 20/05/29 18:33:30 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory: Still waiting for startup of environment '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot' for worker id 1-10
> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
>
> If it's not a known issue, I'll create a Jira for it.
>
>> On 29 May 2020, at 16:46, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> Alexey, can you try adding --experiments=beam_fn_api to your pipeline
>> options? We add the option automatically in Python [1] but we don't in Java.
>>
>> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can
>> help with that.
>>
>> [1] https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
>> [2] https://jira.apache.org/jira/browse/BEAM-10151
>>
>> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> Yes, I did run a Java-only pipeline with the Portable Runner, and there is the
>> same error.
>>
>> I also ran the same pipeline (without the cross-language component) against
>> Beam 2.19 and 2.20.
>> It works fine against Beam 2.19 (as expected, since I had already tested it
>> before) and fails with a similar error against Beam 2.20:
>>
>> 20/05/29 15:59:23 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>>
>> Do you think it's a bug, or am I missing something in the configuration?
>>
>>> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>> Can you try removing the cross-language component(s) from the pipeline and
>>> see if it still has the same error?
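For reference, the fix suggested in this thread for the GreedyPipelineFuser error on Beam 2.20/2.21 was to pass --experiments=beam_fn_api to the Java pipeline, since the Python SDK adds that experiment automatically and the Java SDK does not. The sketch below is a hypothetical Python helper (not part of Beam) for a launcher script that assembles the Java pipeline's arguments: it merges any existing --experiments= flags into a single comma-separated flag and appends beam_fn_api if it is missing. It assumes the Java options parser accepts comma-separated values for --experiments; the --jobEndpoint value in the demo is likewise only an assumed local job server address.

```python
from typing import List

# Hypothetical helper, not a Beam API: it only rewrites a list of
# command-line flags destined for the Java pipeline.
FN_API_EXPERIMENT = "beam_fn_api"


def with_fn_api_experiment(args: List[str]) -> List[str]:
    """Return a copy of args that requests the beam_fn_api experiment.

    All --experiments=... flags are merged into one comma-separated flag
    (assumed to be accepted by the Java options parser), placed last.
    """
    experiments: List[str] = []
    others: List[str] = []
    for arg in args:
        if arg.startswith("--experiments="):
            # Keep every non-empty experiment name that was already requested.
            experiments.extend(v for v in arg.split("=", 1)[1].split(",") if v)
        else:
            others.append(arg)
    if FN_API_EXPERIMENT not in experiments:
        experiments.append(FN_API_EXPERIMENT)
    return others + ["--experiments=" + ",".join(experiments)]


if __name__ == "__main__":
    # Assumed values for illustration: a local portable job server endpoint.
    print(with_fn_api_experiment(["--runner=PortableRunner", "--jobEndpoint=localhost:8099"]))
```

The resulting list would then be handed to the Java pipeline's main(String[] args), for example through PipelineOptionsFactory.fromArgs(args). Merging into one flag keeps the experiment list free of duplicates regardless of how the original arguments were written.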
>>>
>>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>> For testing purposes, it's just Create.of("Name1", "Name2", ...)
>>>
>>>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>>>>
>>>> What source are you using?
>>>>
>>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>> Hello,
>>>>
>>>> I'm trying to run a cross-language pipeline (Beam 2.21, a Java pipeline with
>>>> an external Python transform) with a PROCESS SDK harness and the Spark
>>>> Portable Runner, but it fails.
>>>> To do that, I have a running Spark Runner Job Server (Spark local) and a
>>>> standalone Expansion Service (Python) which contains the code of my Python
>>>> transform that should be called from the main Java pipeline.
>>>>
>>>> Once the job has been submitted to the Job Server and started running, it
>>>> fails with this error:
>>>>
>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>>> capabilities: "beam:coder:bytes:v1"
>>>> ….
>>>>
>>>> Here are some snippets of my pipeline that may be helpful.
>>>>
>>>> Java transform:
>>>>
>>>>   private static final String URN = "ml:genreclassifier:python:v1";
>>>>
>>>>   @Override
>>>>   public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>     PCollection<KV<String, String>> output =
>>>>         input.apply(
>>>>             "ExternalGenreClassifier",
>>>>             External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>                 .<KV<String, String>>withOutputType());
>>>>     return output;
>>>>   }
>>>>
>>>> expansion_service.py:
>>>>
>>>> import grpc
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.portability.api import beam_expansion_api_pb2_grpc
>>>> from apache_beam.runners.portability import expansion_service
>>>> from apache_beam.transforms import ptransform
>>>> from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
>>>>
>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>> class GenreClassifier(ptransform.PTransform):
>>>>   def __init__(self):
>>>>     super(GenreClassifier, self).__init__()
>>>>
>>>>   def expand(self, pcoll):
>>>>     return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>>
>>>>   def to_runner_api_parameter(self, unused_context):
>>>>     return 'ml:genreclassifier:python:v1', None
>>>>
>>>>   @staticmethod
>>>>   def from_runner_api_parameter(unused_ptransform, unused_parameter,
>>>>                                 unused_context):
>>>>     return GenreClassifier()
>>>>
>>>> def main(unused_argv):
>>>>   ...
>>>>   server = grpc.server(UnboundedThreadPoolExecutor())
>>>>   beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>       expansion_service.ExpansionServiceServicer(
>>>>           PipelineOptions.from_dictionary({
>>>>               'environment_type': 'PROCESS',
>>>>               'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>>               'sdk_location': 'container',
>>>>           })
>>>>       ), server
>>>>   )
>>>>   server.add_insecure_port('localhost:{}'.format(options.port))
>>>>   server.start()
>>>>
>>>> Does anyone have an idea what's wrong with my setup/pipeline and how to
>>>> fix it?
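The environment_config value in the expansion service snippet is hand-written JSON inside a Python string, which is easy to break when code passes through an email client or editor that substitutes typographic quotes for straight ones. A minimal sketch, assuming the same three options shown in the snippet, that generates the JSON with json.dumps instead. The helper name process_environment_options is hypothetical, and its result would be passed to PipelineOptions.from_dictionary(...) in place of the literal dictionary.

```python
import json

# Boot binary path copied from the thread; adjust it to your own build output.
BOOT_COMMAND = "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"


def process_environment_options(boot_command: str) -> dict:
    """Build the options dictionary for PipelineOptions.from_dictionary.

    Hypothetical helper: json.dumps always emits straight ASCII double quotes
    and escapes special characters in the path, so environment_config is
    guaranteed to be valid JSON.
    """
    return {
        "environment_type": "PROCESS",
        "environment_config": json.dumps({"command": boot_command}),
        "sdk_location": "container",
    }


if __name__ == "__main__":
    opts = process_environment_options(BOOT_COMMAND)
    # Round-trip check: the config parses back to the original command.
    assert json.loads(opts["environment_config"])["command"] == BOOT_COMMAND
    print(opts["environment_config"])
```

Generating the string rather than typing it also means a path containing quotes or backslashes is escaped correctly without any manual effort.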