This issue is still present on v2.38.0 for us, and we're working around it using a custom sink and the upstream PubSub API client directly.
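In rough shape, the workaround is a sink DoFn that publishes each element itself instead of going through pubsubio.Write. The sketch below is illustrative, not our actual code: the publish call is factored out behind a function value so it runs without GCP credentials, and the real wiring (creating a *pubsub.Client from cloud.google.com/go/pubsub in Setup, calling topic.Publish(ctx, &pubsub.Message{Data: data}) in ProcessElement, and registering the DoFn with the Beam Go SDK) is indicated in comments only.

```go
package main

import "fmt"

// publishSink is the shape of the workaround DoFn: each element is handed
// to a publish function rather than to the pubsubio.Write transform.
//
// In the real pipeline, Setup creates a *pubsub.Client
// (cloud.google.com/go/pubsub) and publish wraps
// topic.Publish(ctx, &pubsub.Message{Data: data}).
type publishSink struct {
	publish func(data []byte) error
}

// ProcessElement publishes one element and surfaces any error to the runner.
func (s *publishSink) ProcessElement(data []byte) error {
	return s.publish(data)
}

func main() {
	// Stand-in publisher so the sketch is runnable without GCP credentials.
	var published [][]byte
	sink := &publishSink{publish: func(data []byte) error {
		published = append(published, data)
		return nil
	}}
	_ = sink.ProcessElement([]byte("hello"))
	fmt.Println(len(published)) // 1
}
```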
I've now had a chance to experiment further and have made some progress. Based on trial and error, it seems that the pubsubio.Write transform does not serialize messages into the format that the external transform expects. Without modification I'm not able to write anything back to PubSub using the previous example, regardless of pubsubio.ReadOptions.WithAttributes. However, when modifying the pubsubio.Write transform to pass along PCollection<*pb.PubsubMessage> verbatim and turn PCollection<[]byte> into PCollection<*pb.PubsubMessage>, it all starts to come through. Here is the modification:

> diff --git a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> index 88b7395f44..e7d005f84f 100644
> --- a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> +++ b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> @@ -38,6 +38,7 @@ var (
>  func init() {
>  	beam.RegisterType(reflect.TypeOf((*pb.PubsubMessage)(nil)).Elem())
>  	beam.RegisterFunction(unmarshalMessageFn)
> +	beam.RegisterFunction(wrapFn)
>  }
>
>  // ReadOptions represents options for reading from PubSub.
> @@ -81,6 +82,13 @@ func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
>  	return &msg, nil
>  }
>
> +func wrapFn(data []byte) *pb.PubsubMessage {
> +	m := pb.PubsubMessage{
> +		Data: data,
> +	}
> +	return &m
> +}
> +
>  // Write writes PubSubMessages or bytes to the given pubsub topic.
>  func Write(s beam.Scope, project, topic string, col beam.PCollection) {
>  	s = s.Scope("pubsubio.Write")
> @@ -90,8 +98,8 @@ func Write(s beam.Scope, project, topic string, col beam.PCollection) {
>  	}
>
>  	out := col
> -	if col.Type().Type() != reflectx.ByteSlice {
> -		out = beam.ParDo(s, proto.Marshal, col)
> +	if col.Type().Type() == reflectx.ByteSlice {
> +		out = beam.ParDo(s, wrapFn, col)
>  	}
>  	beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
>  }

________________________________
From: Hannes Gustafsson <han...@hoxtonanalytics.com>
Sent: 22 November 2021 09:12
To: user@beam.apache.org <user@beam.apache.org>; Robert Burke <r...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow

Just wanted to follow up to mention that the topic format is probably a red herring for this issue. I noticed the Go documentation uses the same formats for PubSubWritePayload and PubSubReadPayload [1] [2], and while it may be that it needs an update, the Python SDK, also seemingly with stale documentation, still wants the projects/<project>/topics/<topic> format when it comes down to it:

> ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got '/topics/xyz-123/test').

I am seeing similar issues with the Python SDK where no output is sent to the output topic, but the symptoms are slightly different, presumably because of SDK differences. The following is the pipeline I've been testing with. I will be investigating a bit further but will reach for the workaround in the near term.
> import argparse
> import logging
>
> import apache_beam as beam
>
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> #from apache_beam.io.external.gcp.pubsub import ReadFromPubSub, WriteToPubSub
> from apache_beam.io import ReadFromPubSub, WriteToPubSub
>
>
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     logging.getLogger().setLevel(logging.INFO)
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     with beam.Pipeline(options=options) as pipeline:
>         (pipeline
>          | ReadFromPubSub(topic="projects/xyz-123/topics/input")
>          | WriteToPubSub(topic="projects/xyz-123/topics/output"))

[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload

________________________________
From: Luke Cwik <lc...@google.com>
Sent: 19 November 2021 19:30
To: user@beam.apache.org <user@beam.apache.org>; Robert Burke <r...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow

+Robert Burke

On Tue, Nov 16, 2021 at 10:41 AM Hannes Gustafsson <han...@hoxtonanalytics.com> wrote:

While trying to reproduce the pipeline using the Python SDK, I've noticed that the topic format is different for the write transform [1] compared to the read transform [2]. It seems they use /topics/<project>/<topic> and projects/<project>/topics/<topic> respectively. This is also documented in the Python SDK documentation [3]. Although note the doc string for PubSubWritePayload says

> // Topic format is: /topics/project_id/subscription_name

presumably meaning topic_name rather than subscription_name. I'll try using the different format and report back.
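For what it's worth, the accepted shape can be checked mechanically before submitting a pipeline; a quick sketch (the regex is mine, mirroring the projects/<project>/topics/<topic> format from the Python SDK's error message, not anything taken from the SDKs themselves):

```go
package main

import (
	"fmt"
	"regexp"
)

// topicRe mirrors the fully qualified topic format the Python SDK validates:
// projects/<project>/topics/<topic>.
var topicRe = regexp.MustCompile(`^projects/[^/]+/topics/[^/]+$`)

func main() {
	fmt.Println(topicRe.MatchString("projects/xyz-123/topics/test")) // true
	fmt.Println(topicRe.MatchString("/topics/xyz-123/test"))         // false
}
```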
[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload
[3] https://beam.apache.org/releases/pydoc/2.34.0/apache_beam.io.external.gcp.pubsub.html