Yes, I did run only Java pipeline with Portable Runner and there is the same 
error.

Also, I did the same (without cross-language component) against Beam 2.19 and 
2.20. 
It works fine against Beam 2.19 (as expected, since I tested it already before) 
and fails with kind the same error against Beam 2.20:

20/05/29 15:59:23 ERROR 
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during 
job invocation 
classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes 
to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 
primitives, but transform Create.Values/Read(CreateSource) executes in 
environment Optional[urn: "beam:env:docker:v1"
payload: "\n\033apache/beam_java_sdk:2.20.0"

Do you think it’s a bug or I miss something in configuration?

> On 28 May 2020, at 22:25, Kyle Weaver <kcwea...@google.com> wrote:
> 
> Can you try removing the cross-language component(s) from the pipeline and 
> see if it still has the same error?
> 
> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com 
> <mailto:aromanenko....@gmail.com>> wrote:
> For testing purposes, it’s just “Create.of(“Name1”, “Name2”, ...)"
> 
>> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com 
>> <mailto:kcwea...@google.com>> wrote:
>> 
>> What source are you using?
>> 
>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com 
>> <mailto:aromanenko....@gmail.com>> wrote:
>> Hello,
>> 
>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline with 
>> an external Python transform) with a PROCESS SDK Harness and Spark Portable 
>> Runner but it fails.
>> To do that I have a running Spark Runner Job Server (Spark local) and 
>> standalone Expansion Service (Python) which contains a code of my Python 
>> transform that should be called from main Java pipeline.
>> 
>> Once job has been submitted on Job Server and started running, it fails with 
>> this error:
>> 
>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: 
>> Invoking job 
>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>> 20/05/28 18:55:12 INFO 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting 
>> job invocation 
>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>> 20/05/28 18:55:12 ERROR 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error 
>> during job invocation 
>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root 
>> nodes to be runner-implemented beam:transform:impulse:v1 or 
>> beam:transform:read:v1 primitives, but transform 
>> Create.Values/Read(CreateSource) executes in environment Optional[urn: 
>> "beam:env:docker:v1"
>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>> capabilities: "beam:coder:bytes:v1”
>> ….
>> 
>> 
>> Some code snippets of my pipeline that can be helpful.
>> 
>> Java transform:
>> private static final String URN = "ml:genreclassifier:python:v1";
>> @Override
>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>   PCollection<KV<String, String>> output =
>>       input.apply(
>>           "ExternalGenreClassifier",
>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>               .<KV<String, String>>withOutputType());
>>   return output;
>> }
>> 
>> expansion_service.py
>> 
>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>> class GenreClassifier(ptransform.PTransform):
>>     def __init__(self):
>>         super(GenreClassifier, self).__init__()
>> 
>>     def expand(self, pcoll):
>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>> 
>>     def to_runner_api_parameter(self, unused_context):
>>         return 'ml:genreclassifier:python:v1', None
>> 
>>     @staticmethod
>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, 
>> unused_context):
>>         return GenreClassifier()
>> 
>> def main(unused_argv):
>>     ...
>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>         expansion_service.ExpansionServiceServicer(
>>             PipelineOptions.from_dictionary({
>>                 'environment_type': 'PROCESS',
>>                 'environment_config': '{"command": 
>> “/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>                 'sdk_location': 'container',
>>             })
>>         ), server
>>     )
>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>     server.start()
>> 
>> Does anyone have an idea what’s wrong with my setup/pipeline and how to fix 
>> it?
>> 
>> 
> 

Reply via email to