For testing purposes, it’s just `Create.of("Name1", "Name2", ...)`
> On 28 May 2020, at 19:29, Kyle Weaver <kcwea...@google.com> wrote:
>
> What source are you using?
>
> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko....@gmail.com <mailto:aromanenko....@gmail.com>> wrote:
>
> Hello,
>
> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline with an external Python transform) with a PROCESS SDK Harness and the Spark Portable Runner, but it fails.
> To do that, I have a running Spark Runner Job Server (Spark local) and a standalone Expansion Service (Python) which contains the code of my Python transform that should be called from the main Java pipeline.
>
> Once the job has been submitted on the Job Server and started running, it fails with this error:
>
> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
> payload: "\n\033apache/beam_java_sdk:2.21.0"
> capabilities: "beam:coder:bytes:v1"
> ….
>
> Some code snippets of my pipeline that can be helpful.
> Java transform:
>
> private static final String URN = "ml:genreclassifier:python:v1";
>
> @Override
> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>   PCollection<KV<String, String>> output =
>       input.apply(
>           "ExternalGenreClassifier",
>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>               .<KV<String, String>>withOutputType());
>   return output;
> }
>
> expansion_service.py:
>
> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
> class GenreClassifier(ptransform.PTransform):
>   def __init__(self):
>     super(GenreClassifier, self).__init__()
>
>   def expand(self, pcoll):
>     return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>
>   def to_runner_api_parameter(self, unused_context):
>     return 'ml:genreclassifier:python:v1', None
>
>   @staticmethod
>   def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
>     return GenreClassifier()
>
> def main(unused_argv):
>   ...
>   server = grpc.server(UnboundedThreadPoolExecutor())
>   beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>       expansion_service.ExpansionServiceServicer(
>           PipelineOptions.from_dictionary({
>               'environment_type': 'PROCESS',
>               'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>               'sdk_location': 'container',
>           })
>       ), server
>   )
>   server.add_insecure_port('localhost:{}'.format(options.port))
>   server.start()
>
> Does anyone have an idea what’s wrong with my setup/pipeline and how to fix it?
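One small thing worth double-checking, unrelated to the fuser error itself: in the snippet as pasted, the `environment_config` JSON string starts the boot path with a curly quote (“) instead of a straight quote, which would make the embedded JSON invalid. That may just be a mail-client artifact, but since `environment_config` for a PROCESS environment is a JSON object, a quick stdlib-only sanity check like the sketch below (the helper name is mine, not a Beam API) can catch it before the expansion service is started:

```python
import json


def check_environment_config(config: str) -> dict:
    """Parse an environment_config JSON string and require a 'command' key.

    Raises ValueError with a readable message if the string is not valid
    JSON, e.g. when a curly quote sneaks in from a mail client or editor.
    """
    try:
        parsed = json.loads(config)
    except json.JSONDecodeError as e:
        raise ValueError("environment_config is not valid JSON: %s" % e) from e
    if "command" not in parsed:
        raise ValueError("environment_config must contain a 'command' key")
    return parsed


# A well-formed config (straight quotes only) parses cleanly:
good = '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}'
print(check_environment_config(good)["command"])

# The variant with a curly opening quote, as in the pasted snippet, is rejected:
bad = '{"command": “/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}'
try:
    check_environment_config(bad)
except ValueError as e:
    print("rejected:", e)
```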