Hey folks, 

I posted this on the Flink user mailing list but didn't get any traction there 
(possibly because this is Beam related?). I've got a Beam/Python pipeline that 
works on the DirectRunner, and I'm now trying to run it on a local dev Flink 
cluster. It fails right out of the gate with an error about not being able to 
deserialize UnboundedSource (my PubSub source). I'm not sure how to debug this 
and would love some feedback on how to solve it. I've also included a simple 
example that reproduces the error.

Beam SDK: 2.19
Flink: 1.9.3
Python: 3.7
Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', 
'--flink_submit_uber_jar', '--streaming']
(Stack trace below, after the example)

#!/usr/bin/env python3
import apache_beam as beam

class DummyPipeline(beam.PTransform):
    def expand(self, p):
        # Return the transform's output PCollection rather than the input.
        return (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
                topic="<valid topic>")
            | beam.Map(print)
        )

def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]
    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    p = beam.Pipeline(options=popts)

    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()

if __name__ == "__main__":
    main()

-Pradip

[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
    ... 14 more
ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
[flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
    at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "bin/run-pipeline.py", line 70, in <module>
    main()
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "bin/run-pipeline.py", line 64, in main
    job = runner.run(pipeline=pipeline)
  File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
    result = dag.run()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
    return super(FlinkRunner, self).run_pipeline(pipeline, options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
    result.wait_until_finish()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
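
In case it helps anyone reading the trace above: here is a minimal sketch (plain Python, standard library only, not part of Beam or of my pipeline) that lists the exception chain in a Java stack trace, so the root cause is easy to spot among the frames. The helper name exception_chain and the SAMPLE text are purely illustrative; the sample is two exception lines and two frames copied from the trace above.

```python
import re

# Hypothetical helper (not a Beam API): matches a Java exception header line,
# with or without a leading "Caused by: ", e.g.
#   java.io.IOException: FAILED_TO_UNCOMPRESS(5)
_EXC_LINE = re.compile(
    r"^(?:Caused by: )?"
    r"((?:[\w$]+\.)+[\w$]+(?:Exception|Error))"  # fully qualified class name
    r"(?::\s*(.*))?$"                            # optional message
)


def exception_chain(log_text):
    """Return (class, message) pairs, outermost exception first."""
    chain = []
    for line in log_text.splitlines():
        match = _EXC_LINE.match(line.strip())
        if match:
            chain.append((match.group(1), match.group(2) or ""))
    return chain


# Excerpt copied from the stack trace above.
SAMPLE = """\
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
"""

if __name__ == "__main__":
    for cls, msg in exception_chain(SAMPLE):
        print(cls, "->", msg)
```

The last entry in the returned list is the innermost cause, which here is the Snappy FAILED_TO_UNCOMPRESS(5) error raised while SerializableUtils tries to read the serialized source.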
