Just wanted to follow up to mention that the topic format is probably a red
herring for this issue. I noticed the Go documentation uses the same formats
for PubSubWritePayload and PubSubReadPayload [1] [2], and while it may need an
update, the Python SDK, whose documentation also seems stale, still requires
the projects/<project>/topics/<topic> format when it comes down to it:

> ValueError: PubSub topic must be in the form
> "projects/<project>/topics/<topic>" (got '/topics/xyz-123/test').
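For illustration, a rough Python sketch of the kind of check that produces this
error. The regex and the check_topic_path name are my own assumptions, not
copied from the Beam source; it only shows why a /topics/<project>/<topic> path
is rejected while a projects/<project>/topics/<topic> path is accepted.

```python
import re
from typing import Tuple

# Hypothetical re-creation of the check; the pattern and function name are
# assumptions for illustration, not copied from the Beam source.
_FULL_TOPIC = re.compile(r"^projects/([^/]+)/topics/([^/]+)$")


def check_topic_path(topic: str) -> Tuple[str, str]:
    """Return (project, topic_name), or raise ValueError for other layouts."""
    match = _FULL_TOPIC.match(topic)
    if match is None:
        raise ValueError(
            'PubSub topic must be in the form '
            '"projects/<project>/topics/<topic>" (got %r).' % topic)
    return match.group(1), match.group(2)


if __name__ == "__main__":
    print(check_topic_path("projects/xyz-123/topics/output"))  # ('xyz-123', 'output')
    try:
        check_topic_path("/topics/xyz-123/test")
    except ValueError as err:
        print(err)  # same wording as the error quoted above
```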


I am seeing similar issues with the Python SDK, where no output is sent to the
output topic, but the symptoms are slightly different, presumably because of
differences between the SDKs. The following is the pipeline I've been testing
with. I will investigate a bit further, but will reach for the workaround in
the near term.


> import argparse
> import logging
> 
> import apache_beam as beam
> 
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> # Alternative: the cross-language (external) Pub/Sub transforms.
> # from apache_beam.io.external.gcp.pubsub import ReadFromPubSub, WriteToPubSub
> from apache_beam.io import ReadFromPubSub, WriteToPubSub
> 
> 
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     parser = argparse.ArgumentParser()
>     # Leave unrecognized flags (--runner, --project, ...) to PipelineOptions.
>     _, pipeline_args = parser.parse_known_args()
>     options = PipelineOptions(pipeline_args)
>     # Run as a streaming pipeline.
>     options.view_as(StandardOptions).streaming = True
>     with beam.Pipeline(options=options) as pipeline:
>         (pipeline
>          # Topics use the projects/<project>/topics/<topic> form.
>          | ReadFromPubSub(topic="projects/xyz-123/topics/input")
>          | WriteToPubSub(topic="projects/xyz-123/topics/output"))


[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload

From: Luke Cwik <lc...@google.com>
Sent: 19 November 2021 19:30
To: user@beam.apache.org <user@beam.apache.org>; Robert Burke <r...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow 
 
+Robert Burke 

On Tue, Nov 16, 2021 at 10:41 AM Hannes Gustafsson <han...@hoxtonanalytics.com> wrote:
While trying to reproduce the pipeline using the Python SDK, I noticed that the
topic format for the write transform [1] differs from the one for the read
transform [2]: the write transform seems to use /topics/<project>/<topic>,
while the read transform uses /projects/<project>/topics/<topic>. The same
formats are also documented in the Python SDK documentation [3].

Note, though, that the doc string for PubSubWritePayload says

> // Topic format is: /topics/project_id/subscription_name

presumably meaning topic_name rather than subscription_name.
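A minimal sketch of mapping between the two layouts mentioned above
(/topics/<project>/<topic> and [/]projects/<project>/topics/<topic>), in case it
helps when testing either format. to_legacy and to_full are hypothetical
helpers, not part of any Beam SDK, and they assume neither the project ID nor
the topic name contains a "/".

```python
import re

# Hypothetical helpers; names and patterns are assumptions. Both assume that
# neither the project ID nor the topic name contains a '/'.
_FULL = re.compile(r"^/?projects/([^/]+)/topics/([^/]+)$")
_LEGACY = re.compile(r"^/topics/([^/]+)/([^/]+)$")


def to_legacy(full_topic: str) -> str:
    """Map [/]projects/<project>/topics/<topic> to /topics/<project>/<topic>."""
    match = _FULL.match(full_topic)
    if match is None:
        raise ValueError("not a projects/<p>/topics/<t> path: %r" % full_topic)
    return "/topics/%s/%s" % match.groups()


def to_full(legacy_topic: str) -> str:
    """Map /topics/<project>/<topic> to projects/<project>/topics/<topic>."""
    match = _LEGACY.match(legacy_topic)
    if match is None:
        raise ValueError("not a /topics/<p>/<t> path: %r" % legacy_topic)
    return "projects/%s/topics/%s" % match.groups()


if __name__ == "__main__":
    print(to_full("/topics/xyz-123/test"))              # projects/xyz-123/topics/test
    print(to_legacy("/projects/xyz-123/topics/input"))  # /topics/xyz-123/input
```

The full form is what the Python SDK's ValueError asks for, so to_full is the
direction that matters there; to_legacy only mirrors the write-side layout
described in the Go doc string.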

I'll try using the different format and report back.

[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload
[3] https://beam.apache.org/releases/pydoc/2.34.0/apache_beam.io.external.gcp.pubsub.html
