Hi,
Is Google Pub/Sub supported with Beam running on Flink?
I'm having a hard time getting a pipeline that consumes from Google
Pub/Sub to work. It works perfectly with the Beam DirectRunner, but fails
with the FlinkRunner. After a lot of debugging, I've concluded that the
problem is triggered when the Flink runner translates my Python pipeline.

I've found that the job is not even being submitted to Flink, so as far
as I can tell the culprit is the Flink job server.
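
One way to confirm that nothing reaches the cluster is to list its jobs through Flink's REST API (`GET /jobs/overview` on the `--flink_master` address) while the pipeline is starting. A minimal standard-library sketch; the helper names are hypothetical, and it assumes the Flink job would carry the same `BeamApp-` prefix as the job invocation id in the log:

```python
import json
import urllib.request
from typing import Any, Dict, List


def fetch_jobs_overview(flink_rest: str = "http://localhost:8081") -> Dict[str, Any]:
    """Return the parsed body of Flink's GET /jobs/overview endpoint."""
    url = flink_rest.rstrip("/") + "/jobs/overview"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def job_names(overview: Dict[str, Any]) -> List[str]:
    """List the names of all jobs in a /jobs/overview response."""
    return [job.get("name", "") for job in overview.get("jobs", [])]


def beam_job_submitted(overview: Dict[str, Any], prefix: str = "BeamApp-") -> bool:
    """True if any Flink job name starts with the (assumed) Beam job-name prefix."""
    return any(name.startswith(prefix) for name in job_names(overview))
```

Calling `beam_job_submitted(fetch_jobs_overview())` would return False if the job never made it past the job server.
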
This is the full log when I run it locally:

$ python shares_pipeline.py -batch_size 10 --runner=FlinkRunner \
    --flink_master=localhost:8081 --flink_version=1.13 --streaming \
    --environment_type=LOOPBACK
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:33805
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.31.0
INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol scheme to flink_master parameter: http://localhost:8081
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.13-job-server/2.31.0/beam-runners-flink-1.13-job-server-2.31.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/comtom/.apache_beam/cache/jars/beam-runners-flink-1.13-job-server-2.31.0.jar' '--flink-master' 'http://localhost:8081' '--artifacts-dir' '/tmp/beam-temp3izv3a04/artifactsg943fqea' '--job-port' '48261' '--artifact-port' '0' '--expansion-port' '0']
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver createArtifactStagingService'
INFO:apache_beam.utils.subprocess_server:b'INFO: ArtifactStagingService started on localhost:41579'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver createExpansionService'
INFO:apache_beam.utils.subprocess_server:b'INFO: Java ExpansionService started on localhost:41357'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver createJobServer'
INFO:apache_beam.utils.subprocess_server:b'INFO: JobService started on localhost:48261'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver run'
INFO:apache_beam.utils.subprocess_server:b'INFO: Job server now running, terminate with Ctrl+C'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext'
INFO:apache_beam.utils.subprocess_server:b'INFO: Staging artifacts for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 resolveNextEnvironment'
INFO:apache_beam.utils.subprocess_server:b'INFO: Resolving artifacts for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.ref_Environment_default_environment_1.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext'
INFO:apache_beam.utils.subprocess_server:b'INFO: Getting 0 artifacts for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.null.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 finishStaging'
INFO:apache_beam.utils.subprocess_server:b'INFO: Artifacts fully staged for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.flink.FlinkJobInvoker invokeWithExecutor'
INFO:apache_beam.utils.subprocess_server:b'INFO: Invoking job BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250 with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@75975728'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:49 PM org.apache.beam.runners.jobsubmission.JobInvocation start'
INFO:apache_beam.utils.subprocess_server:b'INFO: Starting job invocation BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250'
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:49 PM org.apache.beam.runners.flink.FlinkPipelineRunner runPipelineWithTranslator'
INFO:apache_beam.utils.subprocess_server:b'INFO: Translating pipeline to Flink program.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:49 PM org.apache.beam.runners.jobsubmission.JobInvocation$1 onFailure'
INFO:apache_beam.utils.subprocess_server:b'SEVERE: Error during job invocation BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250.'
INFO:apache_beam.utils.subprocess_server:b'java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read From Pub/Sub/Read.None"'
INFO:apache_beam.utils.subprocess_server:b'coder_id: "ref_Coder_BytesCoder_1"'
INFO:apache_beam.utils.subprocess_server:b'is_bounded: UNBOUNDED'
INFO:apache_beam.utils.subprocess_server:b'windowing_strategy_id: "ref_Windowing_Windowing_1"'
INFO:apache_beam.utils.subprocess_server:b'}] were consumed but never produced'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:234)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:127)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:90)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:70)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:93)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:112)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)'
INFO:apache_beam.utils.subprocess_server:b'\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)'
INFO:apache_beam.utils.subprocess_server:b'\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)'
INFO:apache_beam.utils.subprocess_server:b'\tat java.base/java.lang.Thread.run(Thread.java:832)'
INFO:apache_beam.utils.subprocess_server:b''
ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read From Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "shares_pipeline.py", line 80, in <module>
    run(known_args, pipeline_args)
  File "shares_pipeline.py", line 51, in run
    p = create_pipeline(pipeline_options, known_args)
  File "shares_pipeline.py", line 38, in create_pipeline
    | "print" >> beam.Map(print)
  File "/home/comtom/.pyenv/versions/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 586, in __exit__
    self.result.wait_until_finish()
  File "/home/comtom/.pyenv/versions/venv/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 602, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250 failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read From Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
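
The stack trace shows the failure comes from a precondition in `QueryablePipeline.buildNetwork`, reached through `QueryablePipeline.forPrimitivesIn` while the Flink runner fuses the pipeline. Below is a simplified Python model of that check. It assumes, as the method name suggests, that the runner first keeps only the transforms it recognizes as primitives and then requires every PCollection they consume to be produced by one of them. `Transform`, `check_network` and the `example:pubsub_read` URN are hypothetical; `beam:transform:pardo:v1` is Beam's ParDo URN. This is not Beam's Java code. It only illustrates how a source transform the runner doesn't recognize could leave its output (here, `Read From Pub/Sub/Read.None`) "consumed but never produced":

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Transform:
    """Hypothetical stand-in for one transform in a portable pipeline."""
    urn: str
    inputs: List[str] = field(default_factory=list)   # PCollection ids consumed
    outputs: List[str] = field(default_factory=list)  # PCollection ids produced


def check_network(transforms: Dict[str, Transform], primitive_urns: Set[str]) -> None:
    """Keep only transforms with a recognized primitive URN, then require every
    PCollection they consume to be produced by one of the kept transforms."""
    kept = [t for t in transforms.values() if t.urn in primitive_urns]
    produced = {pc for t in kept for pc in t.outputs}
    consumed = {pc for t in kept for pc in t.inputs}
    missing = sorted(consumed - produced)
    if missing:
        raise ValueError(f"PCollections {missing} were consumed but never produced")


if __name__ == "__main__":
    pipeline = {
        # Hypothetical URN for a source the runner does not treat as a primitive.
        "Read From Pub/Sub/Read": Transform("example:pubsub_read", outputs=["pc1"]),
        "print": Transform("beam:transform:pardo:v1", inputs=["pc1"], outputs=["pc2"]),
    }
    try:
        check_network(pipeline, {"beam:transform:pardo:v1"})
    except ValueError as err:
        print(err)  # PCollections ['pc1'] were consumed but never produced
```

If the model holds, the question becomes whether the Flink runner recognizes the transform that the Python `ReadFromPubSub` emits at all.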


I've even simplified the pipeline to the following, and the same error is
still triggered:

import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=known_args.topic)
        | "print" >> beam.Map(print)
    )
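
The snippet relies on `known_args` and `pipeline_options`, which `shares_pipeline.py` builds elsewhere (the traceback shows `run(known_args, pipeline_args)`). A minimal sketch of the usual split with `argparse.ArgumentParser.parse_known_args`, in which `--topic` and `--batch_size` are hypothetical stand-ins for the script's real flags. Anything the parser doesn't recognize is returned untouched so it can be passed to `PipelineOptions`:

```python
import argparse
from typing import List, Optional, Tuple


def split_args(argv: Optional[List[str]] = None) -> Tuple[argparse.Namespace, List[str]]:
    """Split the script's own flags from the flags meant for Beam."""
    parser = argparse.ArgumentParser()
    # Hypothetical flags standing in for the ones shares_pipeline.py defines.
    parser.add_argument("--topic", default="projects/my-project/topics/my-topic")
    parser.add_argument("--batch_size", type=int, default=1)
    # Unrecognized flags (--runner, --flink_master, ...) come back as a list
    # instead of raising an error; they would then go to PipelineOptions.
    return parser.parse_known_args(argv)


if __name__ == "__main__":
    known_args, pipeline_args = split_args([
        "--batch_size", "10",
        "--runner=FlinkRunner", "--flink_master=localhost:8081",
        "--flink_version=1.13", "--streaming", "--environment_type=LOOPBACK",
    ])
    print(known_args.batch_size)  # 10
    print(pipeline_args)
```

With a parser like this, the single-dash `-batch_size 10` from the command above would not match `--batch_size`. Both tokens would fall through to `pipeline_args` and the default would be used. That is probably unrelated to the Pub/Sub error, but worth checking against the real script.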


Could you give me a hint on what to look for?
