Quick update: this test code works fine on both Dataflow and the
DirectRunner, so the problem appears to be specific to the FlinkRunner.
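To rerun the same repro on each runner without hand-editing the flag list, the flags can be built per runner. This is a minimal sketch: `beam_flags`, `COMMON_FLAGS` and `RUNNER_FLAGS` are hypothetical names, the Flink flags are the ones from the repro script, and Dataflow is left out because its required options (project, region, temp location) aren't part of this thread.

```python
from typing import Dict, List

# Flags shared by every run of the repro (copied from the original script).
COMMON_FLAGS: List[str] = ["--streaming", "--save_main_session"]

# Runner-specific flags; the Flink entries match the original script.
RUNNER_FLAGS: Dict[str, List[str]] = {
    "DirectRunner": [],
    "FlinkRunner": ["--flink_version=1.9", "--flink_submit_uber_jar"],
}


def beam_flags(runner: str) -> List[str]:
    """Build the flag list for one runner (hypothetical helper)."""
    if runner not in RUNNER_FLAGS:
        raise ValueError(f"no flags configured for runner {runner!r}")
    # --runner first, then runner-specific flags, then the shared ones.
    return [f"--runner={runner}", *RUNNER_FLAGS[runner], *COMMON_FLAGS]
```

The result drops straight into the existing call, e.g. `PipelineOptions(flags=beam_flags("DirectRunner"))`, so only the runner name changes between runs.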

On 2020/06/08 20:11:13, Pradip Thachile <pra...@thachile.com> wrote: 
> Hey folks, 
> 
> I posted this on the Flink user mailing list but didn't get any traction
> there (possibly because it's Beam-related). I have a Beam/Python pipeline
> that works on the DirectRunner, and I'm now trying to run it on a local dev
> Flink cluster. It fails right out of the gate with an error saying it is
> unable to deserialize the UnboundedSource (my PubSub source). I'm not sure
> how to debug this and would love feedback on how to solve it. Below is a
> simple example that reproduces the error.
> 
> Beam SDK: 2.19
> Flink: 1.9.3
> Python: 3.7
> Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', 
> '--flink_submit_uber_jar', '--streaming']
> (Stacktrace below)
> 
> #!/usr/bin/env python3
> import apache_beam as beam
> 
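> class DummyPipeline(beam.PTransform):
>     def expand(self, p):
>         # Read messages from Pub/Sub and print each one; return the
>         # resulting PCollection as the transform's output.
>         return (
>             p
>             | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
>                 topic="<valid topic>")
>             | beam.Map(print)
>         )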
> 
> def main():
>     beam_options = [
>         # "--runner=DirectRunner",
>         "--runner=FlinkRunner",
>         "--flink_version=1.9",
>         "--flink_submit_uber_jar",
>         "--streaming",
>         '--save_main_session',
>     ]
>     popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
>     p = beam.Pipeline(options=popts)
> 
>     (
>         p
>         | "Do It" >> DummyPipeline()
>     )
>     job = p.run()
>     job.wait_until_finish()
> 
> if __name__ == "__main__":
>     main()
> 
> -Pradip
> 
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>     ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> [flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
> java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
>     at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
>     at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Traceback (most recent call last):
>   File "bin/run-pipeline.py", line 70, in <module>
>     main()
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
>     return self.main(*args, **kwargs)
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
>     rv = self.invoke(ctx)
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
>     return ctx.invoke(self.callback, **ctx.params)
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
>     return callback(*args, **kwargs)
>   File "bin/run-pipeline.py", line 64, in main
>     job = runner.run(pipeline=pipeline)
>   File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
>     result = dag.run()
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
>     return super(FlinkRunner, self).run_pipeline(pipeline, options)
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
>     result.wait_until_finish()
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
>     self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> 
> 
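One possible reading of the `Caused by` frames: `SnappyInputStream.readHeader` calls straight into `readFully`. If I'm reading snappy-java correctly, that fallback is taken when the input does not start with snappy-java's stream magic header, so the whole payload is then decoded as a single raw Snappy block, which fails with FAILED_TO_UNCOMPRESS(5). That would suggest the UnboundedSource bytes reaching the Flink translator were not written by Java's `SerializableUtils` at all. Below is a stdlib-only sketch of that header check; it assumes the magic prefix is `0x82`, ASCII `SNAPPY`, then a NUL byte, the helper name is hypothetical, and pulling the raw source bytes out of the pipeline proto is not shown.

```python
import pickle
import zlib

# Assumed snappy-java stream magic: 0x82, ASCII "SNAPPY", then a NUL byte.
SNAPPY_JAVA_MAGIC = b"\x82SNAPPY\x00"


def has_snappy_java_header(payload: bytes) -> bool:
    """Return True if payload starts with the assumed snappy-java magic.

    Hypothetical helper: False means SnappyInputStream would skip its
    framed-stream path and treat the whole payload as one raw Snappy block.
    """
    return payload.startswith(SNAPPY_JAVA_MAGIC)


if __name__ == "__main__":
    # Magic followed by arbitrary bytes passes the check.
    print(has_snappy_java_header(SNAPPY_JAVA_MAGIC + b"\x00\x00\x00\x01"))  # True
    # Other encodings, e.g. pickle or zlib output, do not.
    print(has_snappy_java_header(pickle.dumps({"topic": "t"})))  # False
    print(has_snappy_java_header(zlib.compress(b"source")))  # False
```

This only tells you which decoding path snappy-java would take; it says nothing about where the bytes came from.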
