Yes, I tested it with the cross-language transform (Java pipeline with Python 
external transform).
> On 1 Jun 2020, at 17:49, Chamikara Jayalath <chamik...@google.com> wrote:
> 
> To clarify, is the error resolved with the cross-language transform as well? 
> If not, please file a Jira.
> 
> On Mon, Jun 1, 2020 at 8:24 AM Kyle Weaver <kcwea...@google.com> wrote:
> > It would be useful to print out such errors with Error level log, I think.
> 
> I agree, using environment_type=PROCESS is difficult enough without hiding 
> the logs by default. I re-opened the issue.
> 
> On Mon, Jun 1, 2020 at 11:01 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Thanks! The problem was that the virtualenv had not been set up in the 
> console where the worker runs.
> 
> It would be useful to print out such errors with Error level log, I think.
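Since the failure came down to the virtualenv not being active where the worker starts, a small pre-flight check before submitting the job may catch it earlier. This is only a sketch using the Python standard library: `preflight_problems` and `in_virtualenv` are hypothetical helpers, not part of Beam, and the boot path is assumed to be whatever `environment_config` points at.

```python
import os
import sys


def in_virtualenv():
    """Return True when the running interpreter belongs to a virtual environment."""
    # venv sets sys.base_prefix; the older virtualenv tool sets sys.real_prefix.
    base = getattr(sys, "base_prefix", sys.prefix)
    return hasattr(sys, "real_prefix") or base != sys.prefix


def preflight_problems(boot_path, require_virtualenv=True):
    """Collect problems that could make a PROCESS worker die on startup."""
    problems = []
    if not os.path.isfile(boot_path):
        problems.append("boot executable not found: " + boot_path)
    elif not os.access(boot_path, os.X_OK):
        problems.append("boot executable is not executable: " + boot_path)
    if require_virtualenv and not in_virtualenv():
        problems.append("no virtualenv is active in this console")
    return problems
```

Running `preflight_problems(...)` in the same console that launches the worker and printing the result gives a quick hint before digging into the worker logs.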
> 
>> On 29 May 2020, at 18:55, Kyle Weaver <kcwea...@google.com> wrote:
>> 
>> That's probably a problem with your worker. You'll need to get additional 
>> logs to debug (see https://jira.apache.org/jira/browse/BEAM-8278)
>> 
>> On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> Many thanks! It helped to avoid the error. I saw this option in the xlang 
>> tests before, but I didn’t add it because the name confused me =)
>> Also, I think we need to add "--sdk_location=container" for the Expansion 
>> Service.
>> 
>> Finally, I've managed to run both the Java-only and the xlang pipeline (with 
>> Python external), and they work with the Docker Harness (though I observe 
>> some new exceptions at runtime).
>> 
>> On the other hand, with Process Harness it still fails with an error:
>> 
>> 20/05/29 18:33:30 INFO 
>> org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory: 
>> Still waiting for startup of environment 
>> '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot'
>>  for worker id 1-10
>> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in 
>> task 1.0 in stage 0.0 (TID 1)
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>  java.lang.IllegalStateException: Process died with exit code 1
>> 
>> If it’s an unknown issue, I’ll create a Jira for it.
>> 
>>> On 29 May 2020, at 16:46, Kyle Weaver <kcwea...@google.com> wrote:
>>> 
>>> Alexey, can you try adding --experiments=beam_fn_api to your pipeline 
>>> options? We add the option automatically in Python [1] but we don't in Java.
>>> 
>>> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can 
>>> help with that.
>>> 
>>> [1] https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
>>> [2] https://jira.apache.org/jira/browse/BEAM-10151
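The Python SDK adds this experiment automatically, while a Java pipeline has to receive it explicitly. One way to avoid forgetting it is to normalise the argument list before handing it to the Java pipeline. The sketch below is plain Python and calls no Beam API; `with_experiment` is a hypothetical helper name, and it assumes experiments are passed as `--experiments=...` flags, possibly comma-separated.

```python
def with_experiment(args, experiment="beam_fn_api"):
    """Return a copy of args that is guaranteed to request the given experiment."""
    prefix = "--experiments="
    for arg in args:
        # An existing flag may already list the experiment, alone or among others.
        if arg.startswith(prefix) and experiment in arg[len(prefix):].split(","):
            return list(args)
    # Not requested yet: append a separate flag rather than editing existing ones.
    return list(args) + [prefix + experiment]
```

For example, `with_experiment(["--runner=PortableRunner"])` returns the original arguments plus `--experiments=beam_fn_api`, and calling it a second time on the result changes nothing.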
>>> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>> Yes, I ran the Java-only pipeline with the Portable Runner and got the 
>>> same error.
>>> 
>>> Also, I did the same (without the cross-language component) against Beam 
>>> 2.19 and 2.20. 
>>> It works fine against Beam 2.19 (as expected, since I had already tested 
>>> it) and fails with much the same error against Beam 2.20:
>>> 
>>> 20/05/29 15:59:23 ERROR 
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error 
>>> during job invocation 
>>> classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root 
>>> nodes to be runner-implemented beam:transform:impulse:v1 or 
>>> beam:transform:read:v1 primitives, but transform 
>>> Create.Values/Read(CreateSource) executes in environment Optional[urn: 
>>> "beam:env:docker:v1"
>>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>>> 
>>> Do you think it’s a bug, or am I missing something in the configuration?
>>> 
>>>> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>>>> 
>>>> Can you try removing the cross-language component(s) from the pipeline and 
>>>> see if it still has the same error?
>>>> 
>>>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>> For testing purposes, it’s just Create.of("Name1", "Name2", ...)
>>>> 
>>>>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>>>>> 
>>>>> What source are you using?
>>>>> 
>>>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>> Hello,
>>>>> 
>>>>> I’m trying to run a cross-language pipeline (Beam 2.21, Java pipeline 
>>>>> with an external Python transform) with a PROCESS SDK Harness and the 
>>>>> Spark Portable Runner, but it fails.
>>>>> To do that, I have a running Spark Runner Job Server (Spark local) and a 
>>>>> standalone Expansion Service (Python) that contains the code of my Python 
>>>>> transform, which should be called from the main Java pipeline.
>>>>> 
>>>>> Once the job has been submitted to the Job Server and starts running, it 
>>>>> fails with this error:
>>>>> 
>>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: 
>>>>> Invoking job 
>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>> 20/05/28 18:55:12 INFO 
>>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting 
>>>>> job invocation 
>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>> 20/05/28 18:55:12 ERROR 
>>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error 
>>>>> during job invocation 
>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root 
>>>>> nodes to be runner-implemented beam:transform:impulse:v1 or 
>>>>> beam:transform:read:v1 primitives, but transform 
>>>>> Create.Values/Read(CreateSource) executes in environment Optional[urn: 
>>>>> "beam:env:docker:v1"
>>>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>>>> capabilities: "beam:coder:bytes:v1"
>>>>> ….
>>>>> 
>>>>> 
>>>>> Some code snippets of my pipeline that can be helpful.
>>>>> 
>>>>> Java transform:
>>>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>>> @Override
>>>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>>   PCollection<KV<String, String>> output =
>>>>>       input.apply(
>>>>>           "ExternalGenreClassifier",
>>>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>>               .<KV<String, String>>withOutputType());
>>>>>   return output;
>>>>> }
>>>>> 
>>>>> expansion_service.py
>>>>> 
>>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>>> class GenreClassifier(ptransform.PTransform):
>>>>>     def __init__(self):
>>>>>         super(GenreClassifier, self).__init__()
>>>>> 
>>>>>     def expand(self, pcoll):
>>>>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>>> 
>>>>>     def to_runner_api_parameter(self, unused_context):
>>>>>         return 'ml:genreclassifier:python:v1', None
>>>>> 
>>>>>     @staticmethod
>>>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
>>>>>         return GenreClassifier()
>>>>> 
>>>>> def main(unused_argv):
>>>>>     ...
>>>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>>         expansion_service.ExpansionServiceServicer(
>>>>>             PipelineOptions.from_dictionary({
>>>>>                 'environment_type': 'PROCESS',
>>>>>                 'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>>>                 'sdk_location': 'container',
>>>>>             })
>>>>>         ), server
>>>>>     )
>>>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>>>     server.start()
>>>>> 
>>>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to 
>>>>> fix it?
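The `environment_config` value is a JSON string embedded in a Python string, so a stray or mistyped quote character is easy to introduce. A minimal sketch of building the same options with `json.dumps` follows. `process_harness_options` is a hypothetical helper, not a Beam function, and the dictionary it returns is assumed to be passed to `PipelineOptions.from_dictionary` as in the snippet above.

```python
import json


def process_harness_options(boot_path):
    """Build the option dictionary for a PROCESS SDK harness."""
    return {
        "environment_type": "PROCESS",
        # json.dumps produces well-formed JSON with plain ASCII double quotes.
        "environment_config": json.dumps({"command": boot_path}),
        "sdk_location": "container",
    }


options = process_harness_options(
    "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot")
print(options["environment_config"])
```

Parsing the printed value back with `json.loads` is a cheap way to confirm that the config is valid before starting the expansion service.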
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
