For testing purposes, it’s just Create.of("Name1", "Name2", ...).

> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
> 
> What source are you using?
> 
> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hello,
> 
> I’m trying to run a cross-language pipeline (Beam 2.21, a Java pipeline with an 
> external Python transform) with a PROCESS SDK Harness and the Spark Portable 
> Runner, but it fails.
> To do that, I have a running Spark Runner Job Server (Spark local) and a 
> standalone Expansion Service (Python) that contains the code of my Python 
> transform, which should be called from the main Java pipeline.
> 
> Once the job has been submitted to the Job Server and has started running, it 
> fails with this error:
> 
> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: 
> Invoking job 
> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
> 20/05/28 18:55:12 INFO 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job 
> invocation 
> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
> 20/05/28 18:55:12 ERROR 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during 
> job invocation 
> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root 
> nodes to be runner-implemented beam:transform:impulse:v1 or 
> beam:transform:read:v1 primitives, but transform 
> Create.Values/Read(CreateSource) executes in environment Optional[urn: 
> "beam:env:docker:v1"
> payload: "\n\033apache/beam_java_sdk:2.21.0"
> capabilities: "beam:coder:bytes:v1"
> ….
> 
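The rule quoted in the error message can be illustrated with a small toy model. This is only a sketch inferred from the error text, not Beam's actual GreedyPipelineFuser code: the dict layout, the helper name find_invalid_roots, and the URN assigned to the Create read are assumptions made for illustration.

```python
# Toy model of the rule stated in the error message; NOT Beam's implementation.
RUNNER_ROOT_URNS = {
    "beam:transform:impulse:v1",
    "beam:transform:read:v1",
}


def find_invalid_roots(transforms):
    """Return the names of root transforms that break the quoted rule.

    Each transform is a hypothetical dict with the keys 'name', 'urn',
    'inputs' (list of input names) and 'environment' (None when the
    runner implements the transform itself).
    """
    invalid = []
    for t in transforms:
        if t["inputs"]:
            continue  # has inputs, so it is not a root node
        runner_implemented = t["environment"] is None
        if t["urn"] not in RUNNER_ROOT_URNS or not runner_implemented:
            invalid.append(t["name"])
    return invalid


pipeline = [
    # Mirrors the transform named in the log: a root bound to a Docker
    # SDK environment (the URN here is assumed for illustration).
    {"name": "Create.Values/Read(CreateSource)",
     "urn": "beam:transform:read:v1",
     "inputs": [],
     "environment": "beam:env:docker:v1"},
    # A root that the runner implements itself.
    {"name": "Impulse",
     "urn": "beam:transform:impulse:v1",
     "inputs": [],
     "environment": None},
    # The external transform has an input, so it is not a root.
    {"name": "ExternalGenreClassifier",
     "urn": "ml:genreclassifier:python:v1",
     "inputs": ["Create.Values/Read(CreateSource)"],
     "environment": "beam:env:process:v1"},
]

print(find_invalid_roots(pipeline))
```

In this model only the Create read is reported, because it is a root that is tied to an SDK environment instead of being implemented by the runner.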
> 
> Here are some code snippets from my pipeline that may be helpful.
> 
> Java transform:
> private static final String URN = "ml:genreclassifier:python:v1";
> @Override
> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>   PCollection<KV<String, String>> output =
>       input.apply(
>           "ExternalGenreClassifier",
>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>               .<KV<String, String>>withOutputType());
>   return output;
> }
> 
> expansion_service.py
> 
> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
> class GenreClassifier(ptransform.PTransform):
>     def __init__(self):
>         super(GenreClassifier, self).__init__()
> 
>     def expand(self, pcoll):
>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
> 
>     def to_runner_api_parameter(self, unused_context):
>         return 'ml:genreclassifier:python:v1', None
> 
>     @staticmethod
>     def from_runner_api_parameter(unused_ptransform, unused_parameter,
>                                   unused_context):
>         return GenreClassifier()
> 
> def main(unused_argv):
>     ...
>     server = grpc.server(UnboundedThreadPoolExecutor())
>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>         expansion_service.ExpansionServiceServicer(
>             PipelineOptions.from_dictionary({
>                 'environment_type': 'PROCESS',
>                 'environment_config':
>                     '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>                 'sdk_location': 'container',
>             })
>         ), server
>     )
>     server.add_insecure_port('localhost:{}'.format(options.port))
>     server.start()
> 
> Does anyone have an idea what’s wrong with my setup/pipeline and how to fix 
> it?
> 
> 
