Great. Thanks.

On Mon, Jun 1, 2020 at 9:14 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Yes, I tested it with the cross-language transform (Java pipeline with
> Python external transform).
>
> On 1 Jun 2020, at 17:49, Chamikara Jayalath <chamik...@google.com> wrote:
>
> To clarify, is the error resolved with the cross-language transform as
> well? If not, please file a Jira.
>
> On Mon, Jun 1, 2020 at 8:24 AM Kyle Weaver <kcwea...@google.com> wrote:
>
>> > It would be useful to print out such errors at Error log level, I
>> think.
>>
>> I agree, using environment_type=PROCESS is difficult enough without
>> hiding the logs by default. I re-opened the issue.
>>
>> On Mon, Jun 1, 2020 at 11:01 AM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>>
>>> Thanks! It was an issue with setting up *virtualenv* for the worker
>>> console where it should be running.
>>>
>>> It would be useful to print out such errors at Error log level, I
>>> think.
>>>
>>> On 29 May 2020, at 18:55, Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>> That's probably a problem with your worker. You'll need to get
>>> additional logs to debug (see
>>> https://jira.apache.org/jira/browse/BEAM-8278)
>>>
>>> On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <
>>> aromanenko....@gmail.com> wrote:
>>>
>>>> Many thanks! It helped to avoid the error. I saw this option in the
>>>> xlang tests before, but I didn't add it because the name confused
>>>> me =)
>>>> Also, I think we need to add "--sdk_location=container" for the
>>>> Expansion Service.
>>>>
>>>> Finally, I've managed to run both the Java-only and the xlang pipeline
>>>> (with a Python external transform), and they work with the Docker
>>>> Harness (though I observe some new exceptions at runtime).
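Kyle's suggestion in this thread is to pass --experiments=beam_fn_api, which the Python SDK adds automatically but the Java SDK did not at the time. If the Java pipeline is launched from a script that assembles its command-line arguments, a minimal sketch of adding the flag only when it is missing could look like this. `with_experiment` is a hypothetical helper, not a Beam API, and treating the flag value as a comma-separated list is an assumption of the sketch.

```python
from typing import List


def with_experiment(args: List[str], experiment: str = "beam_fn_api") -> List[str]:
    """Return a copy of args with --experiments=<experiment> appended if absent.

    Hypothetical helper: a value after "--experiments=" is treated as a
    comma-separated list when checking whether the experiment is enabled.
    """
    for arg in args:
        if arg.startswith("--experiments="):
            enabled = arg.split("=", 1)[1].split(",")
            if experiment in enabled:
                return list(args)  # already enabled, nothing to add
    return list(args) + ["--experiments=" + experiment]


# Example: arguments for a Java pipeline submitted to a portable job server.
# The endpoint value is an assumption for illustration only.
java_args = with_experiment(["--runner=PortableRunner", "--jobEndpoint=localhost:8099"])
print(java_args)
```

Appending a separate flag leaves any existing --experiments values untouched.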
>>>>
>>>> On the other hand, with Process Harness it still fails with an error:
>>>>
>>>> 20/05/29 18:33:30 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory: Still waiting for startup of environment '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot' for worker id 1-10
>>>> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
>>>>
>>>> If it's an unknown issue, I'll create a Jira for that.
>>>>
>>>> On 29 May 2020, at 16:46, Kyle Weaver <kcwea...@google.com> wrote:
>>>>
>>>> Alexey, can you try adding --experiments=beam_fn_api to your pipeline
>>>> options? We add the option automatically in Python [1] but we don't in
>>>> Java.
>>>>
>>>> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you
>>>> can help with that.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
>>>> [2] https://jira.apache.org/jira/browse/BEAM-10151
>>>>
>>>> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <
>>>> aromanenko....@gmail.com> wrote:
>>>>
>>>>> Yes, I ran a Java-only pipeline with the Portable Runner and got the
>>>>> same error.
>>>>>
>>>>> Also, I did the same (without the cross-language component) against
>>>>> Beam 2.19 and 2.20.
>>>>> It works fine against Beam 2.19 (as expected, since I had already
>>>>> tested it before) and fails with the same kind of error against Beam 2.20:
>>>>>
>>>>> 20/05/29 15:59:23 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>>>>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>>>>>
>>>>> Do you think it's a bug, or am I missing something in the configuration?
>>>>>
>>>>> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>>>>>
>>>>> Can you try removing the cross-language component(s) from the
>>>>> pipeline and see if it still has the same error?
>>>>>
>>>>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>>
>>>>>> For testing purposes, it's just Create.of("Name1", "Name2", ...)
>>>>>>
>>>>>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>>>>>>
>>>>>> What source are you using?
>>>>>>
>>>>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm trying to run a cross-language pipeline (Beam 2.21, Java
>>>>>>> pipeline with an external Python transform) with a PROCESS SDK
>>>>>>> Harness and the Spark Portable Runner, but it fails.
>>>>>>> To do that, I have a running Spark Runner Job Server (Spark local)
>>>>>>> and a standalone Expansion Service (Python) which contains the code of
>>>>>>> my Python transform that should be called from the main Java pipeline.
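The GreedyPipelineFuser error quoted in this thread states a precondition: every root transform must be a runner-implemented Impulse or Read primitive, yet the failing root, Create.Values/Read(CreateSource), was assigned the Java SDK's Docker environment. A simplified Python model of that condition, based only on the wording of the error (the class and function names are hypothetical, the URN assigned to the failing root is an assumption, and the real check lives in the Java runner code), might look like this:

```python
from dataclasses import dataclass
from typing import List, Optional

IMPULSE_URN = "beam:transform:impulse:v1"
READ_URN = "beam:transform:read:v1"


@dataclass
class RootTransform:
    """Hypothetical, minimal view of a root transform in a pipeline."""
    name: str
    urn: str
    environment: Optional[str]  # SDK environment URN, or None if runner-implemented


def check_roots(roots: List[RootTransform]) -> None:
    """Raise ValueError for any root that is not a runner-implemented primitive."""
    for root in roots:
        is_primitive = root.urn in (IMPULSE_URN, READ_URN)
        if not is_primitive or root.environment is not None:
            raise ValueError(
                "root %s (%s) executes in environment %s"
                % (root.name, root.urn, root.environment))


# Shape of the failing case reported in the thread (URN assumed): a Read root
# that carries the Java SDK's Docker environment.
failing = RootTransform("Create.Values/Read(CreateSource)", READ_URN, "beam:env:docker:v1")
try:
    check_roots([failing])
except ValueError as err:
    print(err)
```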
>>>>>>>
>>>>>>> Once the job has been submitted to the Job Server and started running,
>>>>>>> it fails with this error:
>>>>>>>
>>>>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>>>> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>>>>>
>>>>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>>>>>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>>>>>> capabilities: "beam:coder:bytes:v1"
>>>>>>> ….
>>>>>>>
>>>>>>> Some code snippets of my pipeline that may be helpful:
>>>>>>>
>>>>>>> *Java transform:*
>>>>>>>
>>>>>>>   private static final String URN = "ml:genreclassifier:python:v1";
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>>>>     PCollection<KV<String, String>> output =
>>>>>>>         input.apply(
>>>>>>>             "ExternalGenreClassifier",
>>>>>>>             External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>>>>                 .<KV<String, String>>withOutputType());
>>>>>>>     return output;
>>>>>>>   }
>>>>>>>
>>>>>>>
>>>>>>> *expansion_service.py*
>>>>>>>
>>>>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>>>>> class GenreClassifier(ptransform.PTransform):
>>>>>>>     def __init__(self):
>>>>>>>         super(GenreClassifier, self).__init__()
>>>>>>>
>>>>>>>     def expand(self, pcoll):
>>>>>>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>>>>>
>>>>>>>     def to_runner_api_parameter(self, unused_context):
>>>>>>>         return 'ml:genreclassifier:python:v1', None
>>>>>>>
>>>>>>>     @staticmethod
>>>>>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter,
>>>>>>>                                   unused_context):
>>>>>>>         return GenreClassifier()
>>>>>>>
>>>>>>>
>>>>>>> def main(unused_argv):
>>>>>>>     ...
>>>>>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>>>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>>>>         expansion_service.ExpansionServiceServicer(
>>>>>>>             PipelineOptions.from_dictionary({
>>>>>>>                 'environment_type': 'PROCESS',
>>>>>>>                 'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>>>>>                 'sdk_location': 'container',
>>>>>>>             })
>>>>>>>         ), server
>>>>>>>     )
>>>>>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>>>>>     server.start()
>>>>>>>
>>>>>>> Does anyone have an idea what's wrong with my setup/pipeline and how
>>>>>>> to fix it?
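In the expansion-service snippet, environment_config is a hand-written JSON string, so a typographic quote slipping into it (easy to do when code passes through an email client) makes it invalid JSON. A minimal sketch that builds the same options dictionary with json.dumps, assuming the PROCESS environment reads the boot binary from a "command" key as in the snippet; `process_env_options` is a hypothetical helper, and its result would be passed to PipelineOptions.from_dictionary(...) as in the original code:

```python
import json
from typing import Dict


def process_env_options(boot_command: str) -> Dict[str, str]:
    """Hypothetical helper: expansion-service options for a PROCESS SDK harness."""
    return {
        "environment_type": "PROCESS",
        # json.dumps always emits straight double quotes, so the string parses.
        "environment_config": json.dumps({"command": boot_command}),
        "sdk_location": "container",
    }


opts = process_env_options(
    "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot")
print(opts["environment_config"])
```

Valid JSON is only half of it: the boot command also has to run where the Python SDK is installed. In this thread, the PROCESS failure turned out to be a *virtualenv* issue on the worker.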