Alexey, can you try adding --experiments=beam_fn_api to your pipeline
options? We add the option automatically in Python [1] but we don't in Java.

I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can
help with that.

[1]
https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
[2] https://jira.apache.org/jira/browse/BEAM-10151

On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Yes, I did run only Java pipeline with Portable Runner and there is the
> same error.
>
> Also, I did the same (without cross-language component) against Beam 2.19
> and 2.20.
> It works fine against Beam 2.19 (as expected, since I tested it already
> before) and fails with kind the same error against Beam 2.20:
>
> 20/05/29 15:59:23 ERROR
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
> during job invocation
> classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root
> nodes to be runner-implemented beam:transform:impulse:v1 or
> beam:transform:read:v1 primitives, but
> transform Create.Values/Read(CreateSource) executes in environment
> Optional[urn: "beam:env:docker:v1"
> payload: "\n\033apache/beam_java_sdk:2.20.0"
>
> Do you think it’s a bug or I miss something in configuration?
>
> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
>
> Can you try removing the cross-language component(s) from the pipeline and
> see if it still has the same error?
>
> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> For testing purposes, it’s just “Create.of(“Name1”, “Name2”, ...)"
>>
>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> What source are you using?
>>
>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>>> with an external Python transform) with a PROCESS SDK Harness and Spark
>>> Portable Runner but it fails.
>>> To do that I have a running Spark Runner Job Server (Spark local) and
>>> standalone Expansion Service (Python) which contains a code of my Python
>>> transform that should be called from main Java pipeline.
>>>
>>> Once job has been submitted on Job Server and started running, it fails
>>> with this error:
>>>
>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker:
>>> Invoking job
>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 INFO
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting
>>> job invocation
>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 ERROR
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>> during job invocation
>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>
>>> *java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>> beam:transform:read:v1 primitives, but
>>> transform Create.Values/Read(CreateSource) executes in environment
>>> Optional[urn: "beam:env:docker:v1"*payload:
>>> "\n\033apache/beam_java_sdk:2.21.0"
>>> capabilities: "beam:coder:bytes:v1”
>>> ….
>>>
>>>
>>> Some code snippets of my pipeline that can be helpful.
>>>
>>> *Java transform:*
>>>
>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>
>>> @Override
>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>   PCollection<KV<String, String>> output =
>>>       input.apply(
>>>           "ExternalGenreClassifier",
>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>               .<KV<String, String>>withOutputType());
>>>   return output;
>>> }
>>>
>>>
>>> *expansion_service.py*
>>>
>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>> class GenreClassifier(ptransform.PTransform):
>>>     def __init__(self):
>>>         super(GenreClassifier, self).__init__()
>>>
>>>     def expand(self, pcoll):
>>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>
>>>     def to_runner_api_parameter(self, unused_context):
>>>         return 'ml:genreclassifier:python:v1', None
>>>
>>>     @staticmethod
>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, 
>>> unused_context):
>>>         return GenreClassifier()
>>>
>>>
>>> def main(unused_argv):
>>>     ...
>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>         expansion_service.ExpansionServiceServicer(
>>>             PipelineOptions.from_dictionary({
>>>                 'environment_type': 'PROCESS',
>>>                 'environment_config': *'{"command": 
>>> **“/dev**/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}'*,
>>>                 'sdk_location': 'container',
>>>             })
>>>         ), server
>>>     )
>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>     server.start()
>>>
>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to
>>> fix it?
>>>
>>>
>>>
>>
>

Reply via email to