Kyle

Thank you for the assistance.

By specifying "experiments" in PipelineOptions ,
==========================================
        options = PipelineOptions([
                      "--runner=FlinkRunner",
                      "--flink_version=1.8",
                      "--flink_master_url=localhost:8081",
                      "--experiments=beam_fn_api"
                  ])
==========================================
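
For reference, the full snippet now looks like this (a minimal sketch that
combines the options above with the trivial Create pipeline from my earlier
mail):
==========================================
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Same options as above, with the beam_fn_api experiment enabled.
options = PipelineOptions([
              "--runner=FlinkRunner",
              "--flink_version=1.8",
              "--flink_master_url=localhost:8081",
              "--experiments=beam_fn_api"
          ])

# Trivial pipeline; it only creates a single element.
with beam.Pipeline(options=options) as p:
    (p | beam.Create(["Hello World"]))
==========================================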

I was able to submit the job successfully.

[grpc-default-executor-0] INFO
org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[grpc-default-executor-0] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting
job invocation
BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to
Flink program.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch
Execution Environment.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
Master URL localhost:8081.
[flink-runner-job-invoker] WARN
org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default
parallelism could be found. Defaulting to parallelism 1. Please set an
explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO
org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered
types and 0 default Kryo serializers
[flink-runner-job-invoker] INFO
org.apache.flink.configuration.Configuration - Config uses fallback
configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient -
Rest client endpoint started.
[flink-runner-job-invoker] INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting job
4e055a8878dda3f564a7b7c84d48510d (detached: false).

Thanks,
Yu Watanabe

On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:

> Try adding "--experiments=beam_fn_api" to your pipeline options. (This is
> a known issue with Beam 2.15 that will be fixed in 2.16.)
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com>
> wrote:
>
>> Hello.
>>
>> I am trying to spin up the Flink runner, but it looks like data
>> serialization is failing.
>> I would like to ask for help getting past this error.
>>
>> ========================================================================
>> [flink-runner-job-invoker] ERROR
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error
>> during job invocation
>> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>>         at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>         at
>> org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>         at
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>>         at
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>>         at
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>>         at
>> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>>         at
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>>         at
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>>         at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>         at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>         at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>         at
>> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>         at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>         at
>> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>         at
>> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>         at
>> org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>         at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>         ... 13 more
>> ========================================================================
>>
>> My Beam version is below.
>>
>> =======================================================================
>> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>> apache-beam==2.15.0
>> =======================================================================
>>
>> I have my harness container ready in the registry.
>>
>> =======================================================================
>> ywatanabe@debian-09-00:~$ docker search
>> ywatanabe-docker-apache.bintray.io/python3
>> NAME                DESCRIPTION         STARS               OFFICIAL            AUTOMATED
>> beam/python3                            0
>> =======================================================================
>>
>> Flink is running on a separate cluster.
>>
>> =======================================================================
>> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>> tcp    LISTEN     0      128      :::8081                 :::*
>> =======================================================================
>>
>> My Debian version is below.
>>
>> =======================================================================
>> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>> 9.11
>> =======================================================================
>>
>> My code snippet is below.
>>
>> =======================================================================
>>     options = PipelineOptions([
>>                   "--runner=FlinkRunner",
>>                   "--flink_version=1.8",
>>                   "--flink_master_url=localhost:8081"
>>               ])
>>
>>     with beam.Pipeline(options=options) as p:
>>
>>         (p | beam.Create(["Hello World"]))
>> =======================================================================
>>
>> Are there any other settings I should look at?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend freelancer who loves the challenge of building data platforms
>> yu.w.ten...@gmail.com
>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>> Twitter: https://twitter.com/yuwtennis
>>
>

-- 
Yu Watanabe
Weekend freelancer who loves the challenge of building data platforms
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis
