Hey folks,

I've got a Beam/Python pipeline that works on the DirectRunner, and I'm now
trying to run it on a local dev Flink cluster. It fails right out of the
gate with an error saying it is unable to deserialize UnboundedSource (my
PubSub source). I'm not sure how to debug this and would love some
feedback on how to solve it.

Beam SDK: 2.19
Flink: 1.9.3
Python: 3.7
Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', '--flink_submit_uber_jar', '--streaming']
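For context, a minimal sketch of how flags like these are passed to a Python pipeline that reads from Pub/Sub. It assumes the standard `PipelineOptions` and `beam.io.ReadFromPubSub` APIs from the Beam Python SDK; the `run` function and the topic path used below are hypothetical placeholders, not the actual pipeline code:

```python
from typing import List

# The runner flags from the setup above.
FLINK_ARGS: List[str] = [
    "--runner=FlinkRunner",
    "--flink_version=1.9",
    "--flink_submit_uber_jar",
    "--streaming",
]


def run(topic: str) -> None:
    """Hypothetical stand-in pipeline: read from Pub/Sub and print each message."""
    # Imported inside the function so the flag list can be inspected without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(FLINK_ARGS)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=topic)
            | "Print" >> beam.Map(print)
        )
```

Calling `run("projects/my-project/topics/my-topic")` (hypothetical topic) would submit the job; because the source is unbounded, the `with` block blocks until the job fails or is cancelled.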
(Stacktrace below)

-Pradip

[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
    ... 14 more
ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
[flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
    at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "bin/run-pipeline.py", line 70, in <module>
    main()
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "bin/run-pipeline.py", line 64, in main
    job = runner.run(pipeline=pipeline)
  File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
    result = dag.run()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
    return super(FlinkRunner, self).run_pipeline(pipeline, options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
    result.wait_until_finish()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
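One reading of the `Caused by` frames: `SnappyInputStream.readHeader` calls `readFully`, which then fails in `Snappy.uncompress`. Assuming snappy-java's `SnappyInputStream` falls back to raw Snappy decompression of the whole buffer when the input does not begin with the header that `SnappyOutputStream` writes, the bytes handed to `SerializableUtils.deserializeFromByteArray` may not have been produced by Java-side serialization at all. A small stdlib-only check for that header is sketched below. The header layout (8 magic bytes 0x82 "SNAPPY" 0x00, then two 4-byte version ints) is an assumption worth confirming against the snappy-java source, the function name is hypothetical, and extracting the source bytes from the pipeline proto is left out:

```python
# Assumed snappy-java stream header: 8 magic bytes followed by two
# 4-byte ints (format version, minimum compatible version).
XERIAL_SNAPPY_MAGIC = b"\x82SNAPPY\x00"
XERIAL_SNAPPY_HEADER_SIZE = len(XERIAL_SNAPPY_MAGIC) + 4 + 4


def looks_like_xerial_snappy_stream(payload: bytes) -> bool:
    """Return True if payload starts with a full snappy-java stream header.

    When this is False, SnappyInputStream would (under the assumption above)
    try to raw-uncompress the whole buffer, which matches the
    readHeader -> readFully -> Snappy.uncompress path in the trace.
    """
    return (
        len(payload) >= XERIAL_SNAPPY_HEADER_SIZE
        and payload.startswith(XERIAL_SNAPPY_MAGIC)
    )


if __name__ == "__main__":
    header = XERIAL_SNAPPY_MAGIC + (1).to_bytes(4, "big") * 2
    print(looks_like_xerial_snappy_stream(header))         # True
    print(looks_like_xerial_snappy_stream(b"not snappy"))  # False
```

A False result for the real source bytes would point at the payload format itself rather than at the Flink cluster setup.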
