Kyle,

Thank you for the assistance.
By specifying "experiments" in PipelineOptions,

==========================================
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--experiments=beam_fn_api"
])
==========================================

I was able to submit the job successfully.

[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
[flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
[flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
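For anyone hitting the same error on Beam 2.15, the change boils down to making sure the experiment flag is in the option list before it is handed to PipelineOptions. Below is a minimal sketch of that step using only the standard library. The helper name `with_experiment` is hypothetical and not part of Apache Beam; the flag values are the ones from the snippet above.

```python
from typing import List, Sequence


def with_experiment(flags: Sequence[str], experiment: str = "beam_fn_api") -> List[str]:
    """Return a copy of `flags` that is guaranteed to enable `experiment`.

    Hypothetical helper for illustration only; it is not a Beam API.
    """
    wanted = "--experiments=" + experiment
    result = list(flags)
    # Append the flag only if it is not already present, so the call is idempotent.
    if wanted not in result:
        result.append(wanted)
    return result


# The flags from the original (failing) snippet.
base_flags = [
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
]
flags = with_experiment(base_flags)

# With apache-beam installed, the list would then be used as in the snippet above:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(flags)
```

The helper copies the list instead of mutating it, so the original flags stay usable once the workaround is no longer needed (per Kyle's note, from 2.16 on).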
Thanks,
Yu Watanabe

On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:

> Try adding "--experiments=beam_fn_api" to your pipeline options. (This is
> a known issue with Beam 2.15 that will be fixed in 2.16.)
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>
>> Hello.
>>
>> I am trying to spin up the Flink runner, but it looks like data
>> serialization is failing.
>> I would like to ask for help getting past this error.
>>
>> ========================================================================
>> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>         at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>>         at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>>         at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>         at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>         at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>         at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>         at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>         ... 13 more
>> ========================================================================
>>
>> My Beam version is below.
>>
>> =======================================================================
>> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>> apache-beam==2.15.0
>> =======================================================================
>>
>> I have my harness container ready on the registry.
>>
>> =======================================================================
>> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>> NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
>> beam/python3                 0
>> =======================================================================
>>
>> Flink is ready on a separate cluster.
>>
>> =======================================================================
>> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>> tcp   LISTEN   0   128   :::8081   :::*
>> =======================================================================
>>
>> My Debian version is below.
>>
>> =======================================================================
>> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>> 9.11
>> =======================================================================
>>
>> My code snippet is below.
>>
>> =======================================================================
>> options = PipelineOptions([
>>     "--runner=FlinkRunner",
>>     "--flink_version=1.8",
>>     "--flink_master_url=localhost:8081"
>> ])
>>
>> with beam.Pipeline(options=options) as p:
>>
>>     (p | beam.Create(["Hello World"]))
>> =======================================================================
>>
>> Are there any other settings I should look at?
>>
>> Thanks,
>> Yu Watanabe

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>