Can you try removing the cross-language component(s) from the pipeline and
see if it still has the same error?
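
For context on what the error message is complaining about, here is a rough sketch of the root-node check it describes. This is a simplified model written from the wording of the message, not Beam's actual GreedyPipelineFuser code. The dictionary layout, the check_roots helper, and the assumption that the Create read carries the beam:transform:read:v1 URN are all hypothetical and used only for illustration.

```python
# Simplified, hypothetical model of the root-node check described in the
# error message. It is NOT Beam's real implementation.
RUNNER_ROOT_URNS = {"beam:transform:impulse:v1", "beam:transform:read:v1"}


def check_roots(transforms):
    """Return a list of problems found on root transforms (those without inputs).

    `transforms` maps a transform name to a dict with the hypothetical keys
    "urn", "inputs" (list of names) and "environment" (str or None).
    """
    problems = []
    for name, t in transforms.items():
        if t["inputs"]:
            continue  # not a root node, so the check does not apply
        if t["urn"] not in RUNNER_ROOT_URNS:
            problems.append(f"{name}: unsupported root URN {t['urn']}")
        elif t["environment"] is not None:
            # A root bound to an SDK environment is not runner-implemented.
            problems.append(
                f"{name}: root executes in environment {t['environment']}")
    return problems


# A root read bound to an SDK environment, as in the reported error
# (the URN here is an assumption).
failing = {
    "Create.Values/Read(CreateSource)": {
        "urn": "beam:transform:read:v1",
        "inputs": [],
        "environment": "beam:env:docker:v1",
    },
}

# A root impulse with no environment, i.e. executed by the runner itself.
passing = {
    "Impulse": {
        "urn": "beam:transform:impulse:v1",
        "inputs": [],
        "environment": None,
    },
}

print(check_roots(failing))
print(check_roots(passing))
```

In this model the first call reports one problem and the second reports none, which mirrors the complaint in the log: the root transform is tied to the Java SDK docker environment instead of being left to the runner.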

On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> For testing purposes, it’s just Create.of("Name1", "Name2", ...)
>
> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>
> What source are you using?
>
> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Hello,
>>
>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>> with an external Python transform) with a PROCESS SDK Harness and Spark
>> Portable Runner but it fails.
>> To do that, I have a running Spark Runner Job Server (Spark local) and a
>> standalone Expansion Service (Python), which contains the code of my Python
>> transform that should be called from the main Java pipeline.
>>
>> Once the job has been submitted to the Job Server and started running, it
>> fails with this error:
>>
>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker:
>> Invoking job
>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>> 20/05/28 18:55:12 INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting
>> job invocation
>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>> 20/05/28 18:55:12 ERROR
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>> during job invocation
>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>
>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>> beam:transform:read:v1 primitives, but
>> transform Create.Values/Read(CreateSource) executes in environment
>> Optional[urn: "beam:env:docker:v1"
>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>> capabilities: "beam:coder:bytes:v1"
>> ….
>>
>>
>> Some code snippets of my pipeline that can be helpful.
>>
>> *Java transform:*
>>
>> private static final String URN = "ml:genreclassifier:python:v1";
>>
>> @Override
>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>   PCollection<KV<String, String>> output =
>>       input.apply(
>>           "ExternalGenreClassifier",
>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>               .<KV<String, String>>withOutputType());
>>   return output;
>> }
>>
>>
>> *expansion_service.py*
>>
>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>> class GenreClassifier(ptransform.PTransform):
>>     def __init__(self):
>>         super(GenreClassifier, self).__init__()
>>
>>     def expand(self, pcoll):
>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>
>>     def to_runner_api_parameter(self, unused_context):
>>         return 'ml:genreclassifier:python:v1', None
>>
>>     @staticmethod
>>     def from_runner_api_parameter(
>>             unused_ptransform, unused_parameter, unused_context):
>>         return GenreClassifier()
>>
>>
>> def main(unused_argv):
>>     ...
>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>         expansion_service.ExpansionServiceServicer(
>>             PipelineOptions.from_dictionary({
>>                 'environment_type': 'PROCESS',
>>                 'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>                 'sdk_location': 'container',
>>             })
>>         ), server
>>     )
>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>     server.start()
>>
>> Does anyone have an idea what’s wrong with my setup/pipeline and how to
>> fix it?
>>
>>
>>
>
