That's probably a problem with your worker. You'll need to get additional
logs to debug (see https://jira.apache.org/jira/browse/BEAM-8278)

On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Many thanks! It helped to avoid the error. I saw this option in the xlang
> tests before but I didn’t add it since I was confused because of the name =)
> Also, I think we need to added “*—**sdk_location=container*” for
> Expansion Service
>
> Finally, I've managed to only Java and xlang pipeline (with Python
> external) and it works for Docker Harness (though, I observe some new
> exceptions in the runtime).
>
> On the other hand, with Process Harness it still fails with an error:
>
> 20/05/29 18:33:30 INFO
> org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory:
> Still waiting for startup of
> environment 
> '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot'
> for worker id 1-10
> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in
> task 1.0 in stage 0.0 (TID 1)
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.IllegalStateException: Process died with exit code 1
>
> If it’s unknown issue, I’ll create a Jira for that.
>
> On 29 May 2020, at 16:46, Kyle Weaver <kcwea...@google.com> wrote:
>
> Alexey, can you try adding --experiments=beam_fn_api to your pipeline
> options? We add the option automatically in Python [1] but we don't in Java.
>
> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can
> help with that.
>
> [1]
> https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
> [2] https://jira.apache.org/jira/browse/BEAM-10151
>
> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <
> aromanenko....@gmail.com> wrote:
>
>> Yes, I did run only Java pipeline with Portable Runner and there is the
>> same error.
>>
>> Also, I did the same (without cross-language component) against Beam 2.19
>> and 2.20.
>> It works fine against Beam 2.19 (as expected, since I tested it already
>> before) and fails with kind the same error against Beam 2.20:
>>
>> 20/05/29 15:59:23 ERROR
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>> during job invocation
>> classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root
>> nodes to be runner-implemented beam:transform:impulse:v1 or
>> beam:transform:read:v1 primitives, but
>> transform Create.Values/Read(CreateSource) executes in environment
>> Optional[urn: "beam:env:docker:v1"
>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>>
>> Do you think it’s a bug or I miss something in configuration?
>>
>> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> Can you try removing the cross-language component(s) from the
>> pipeline and see if it still has the same error?
>>
>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>>
>>> For testing purposes, it’s just “Create.of(“Name1”, “Name2”, ...)"
>>>
>>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>> What source are you using?
>>>
>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>>> aromanenko....@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>>>> with an external Python transform) with a PROCESS SDK Harness and Spark
>>>> Portable Runner but it fails.
>>>> To do that I have a running Spark Runner Job Server (Spark local) and
>>>> standalone Expansion Service (Python) which contains a code of my Python
>>>> transform that should be called from main Java pipeline.
>>>>
>>>> Once job has been submitted on Job Server and started running, it fails
>>>> with this error:
>>>>
>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker:
>>>> Invoking job
>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>> 20/05/28 18:55:12 INFO
>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting
>>>> job invocation
>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>> 20/05/28 18:55:12 ERROR
>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>>> during job invocation
>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>>
>>>> *java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>>> beam:transform:read:v1 primitives, but
>>>> transform Create.Values/Read(CreateSource) executes in environment
>>>> Optional[urn: "beam:env:docker:v1"*payload:
>>>> "\n\033apache/beam_java_sdk:2.21.0"
>>>> capabilities: "beam:coder:bytes:v1”
>>>> ….
>>>>
>>>>
>>>> Some code snippets of my pipeline that can be helpful.
>>>>
>>>> *Java transform:*
>>>>
>>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>>
>>>> @Override
>>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>   PCollection<KV<String, String>> output =
>>>>       input.apply(
>>>>           "ExternalGenreClassifier",
>>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>               .<KV<String, String>>withOutputType());
>>>>   return output;
>>>> }
>>>>
>>>>
>>>> *expansion_service.py*
>>>>
>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>> class GenreClassifier(ptransform.PTransform):
>>>>     def __init__(self):
>>>>         super(GenreClassifier, self).__init__()
>>>>
>>>>     def expand(self, pcoll):
>>>>         return pcoll | "GenreClassifier" >> 
>>>> beam.ParDo(_GenreClassifierFn())
>>>>
>>>>     def to_runner_api_parameter(self, unused_context):
>>>>         return 'ml:genreclassifier:python:v1', None
>>>>
>>>>     @staticmethod
>>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, 
>>>> unused_context):
>>>>         return GenreClassifier()
>>>>
>>>>
>>>> def main(unused_argv):
>>>>     ...
>>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>         expansion_service.ExpansionServiceServicer(
>>>>             PipelineOptions.from_dictionary({
>>>>                 'environment_type': 'PROCESS',
>>>>                 'environment_config': *'{"command": 
>>>> **“/dev**/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}'*,
>>>>                 'sdk_location': 'container',
>>>>             })
>>>>         ), server
>>>>     )
>>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>>     server.start()
>>>>
>>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to
>>>> fix it?
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to