This issue is still present on v2.38.0 for us, and we're working around it 
using a custom sink and the upstream PubSub API client directly.
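
For reference, here is a minimal sketch of what that workaround looks like. It assumes the cloud.google.com/go/pubsub client, and the names and wiring are illustrative rather than our exact production code:

> // publishFn publishes each element with the upstream Pub/Sub API client
> // directly, bypassing pubsubio.Write. Illustrative sketch only.
> package sink
>
> import (
>         "context"
>         "reflect"
>
>         "cloud.google.com/go/pubsub"
>         "github.com/apache/beam/sdks/v2/go/pkg/beam"
> )
>
> func init() {
>         beam.RegisterType(reflect.TypeOf((*publishFn)(nil)).Elem())
> }
>
> type publishFn struct {
>         Project string
>         Topic   string
>
>         // Unexported fields are not serialized into the pipeline graph;
>         // the client is created once per worker in Setup.
>         client *pubsub.Client
>         topic  *pubsub.Topic
> }
>
> func (f *publishFn) Setup(ctx context.Context) error {
>         client, err := pubsub.NewClient(ctx, f.Project)
>         if err != nil {
>                 return err
>         }
>         f.client = client
>         f.topic = client.Topic(f.Topic)
>         return nil
> }
>
> func (f *publishFn) ProcessElement(ctx context.Context, data []byte) error {
>         // Block until the publish has been acknowledged.
>         _, err := f.topic.Publish(ctx, &pubsub.Message{Data: data}).Get(ctx)
>         return err
> }
>
> func (f *publishFn) Teardown() error {
>         f.topic.Stop()
>         return f.client.Close()
> }
>
> // Publish writes a PCollection<[]byte> to the given topic.
> func Publish(s beam.Scope, project, topic string, col beam.PCollection) {
>         s = s.Scope("customsink.Publish")
>         beam.ParDo0(s, &publishFn{Project: project, Topic: topic}, col)
> }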

I've now had a chance to experiment further and have made some progress.

Based on trial and error, it seems that the pubsubio.Write transform does not 
serialize messages into the format that the external transform expects.

Without modification I'm not able to write anything back to PubSub using the 
previous example, regardless of the pubsubio.ReadOptions.WithAttributes 
setting. However, when I modify the pubsubio.Write transform to pass 
PCollection<*pb.PubsubMessage> through verbatim and to wrap 
PCollection<[]byte> into PCollection<*pb.PubsubMessage>, everything starts 
to come through.

Here is the modification:

> diff --git a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> index 88b7395f44..e7d005f84f 100644
> --- a/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> +++ b/sdks/go/pkg/beam/io/pubsubio/pubsubio.go
> @@ -38,6 +38,7 @@ var (
>  func init() {
>          beam.RegisterType(reflect.TypeOf((*pb.PubsubMessage)(nil)).Elem())
>          beam.RegisterFunction(unmarshalMessageFn)
> +        beam.RegisterFunction(wrapFn)
>  }
> 
>  // ReadOptions represents options for reading from PubSub.
> @@ -81,6 +82,13 @@ func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
>          return &msg, nil
>  }
> 
> +func wrapFn(data []byte) *pb.PubsubMessage {
> +        m := pb.PubsubMessage{
> +                Data: data,
> +        }
> +        return &m
> +}
> +
>  // Write writes PubSubMessages or bytes to the given pubsub topic.
>  func Write(s beam.Scope, project, topic string, col beam.PCollection) {
>          s = s.Scope("pubsubio.Write")
> @@ -90,8 +98,8 @@ func Write(s beam.Scope, project, topic string, col beam.PCollection) {
>          }
> 
>          out := col
> -        if col.Type().Type() != reflectx.ByteSlice {
> -                out = beam.ParDo(s, proto.Marshal, col)
> +        if col.Type().Type() == reflectx.ByteSlice {
> +                out = beam.ParDo(s, wrapFn, col)
>          }
>          beam.External(s, writeURN, protox.MustEncode(payload), []beam.PCollection{out}, nil, false)
>  }
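
For completeness, here is how the patched Write would be wired up; a hypothetical fragment, with the project and topic names being placeholders mirroring the Python example below:

> // Hypothetical wiring; assumes s is the pipeline's root beam.Scope.
> func buildPipeline(s beam.Scope) {
>         msgs := pubsubio.Read(s, "xyz-123", "input", &pubsubio.ReadOptions{})
>         pubsubio.Write(s, "xyz-123", "output", msgs)
> }

Without WithAttributes, Read yields a PCollection<[]byte>, which is exactly the case the new wrapFn branch handles.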



________________________________
From: Hannes Gustafsson <han...@hoxtonanalytics.com>
Sent: 22 November 2021 09:12
To: user@beam.apache.org <user@beam.apache.org>; Robert Burke <r...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow

Just wanted to follow up to mention that the topic format is probably a red 
herring for this issue. I noticed that the Go documentation uses the same 
format for PubSubWritePayload and PubSubReadPayload [1] [2], and while that 
documentation may well need an update, the Python SDK, whose documentation 
also seems stale, still wants the projects/<project>/topics/<topic> format 
when it comes down to it:

> ValueError: PubSub topic must be in the form 
> "projects/<project>/topics/<topic>" (got '/topics/xyz-123/test').


I am seeing similar issues with the Python SDK, where no output is sent to the 
output topic, but the symptoms are slightly different, presumably because of 
differences between the SDKs. The following is the pipeline I've been testing 
with. I will investigate a bit further but will reach for the workaround in 
the near term.


> import argparse
> import logging
>
> import apache_beam as beam
>
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> #from apache_beam.io.external.gcp.pubsub import ReadFromPubSub, WriteToPubSub
> from apache_beam.io import ReadFromPubSub, WriteToPubSub
>
>
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     logging.getLogger().setLevel(logging.INFO)
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     with beam.Pipeline(options=options) as pipeline:
>         (pipeline
>         | ReadFromPubSub(topic="projects/xyz-123/topics/input")
>         | WriteToPubSub(topic="projects/xyz-123/topics/output"))


[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload

From: Luke Cwik <lc...@google.com>
Sent: 19 November 2021 19:30
To: user@beam.apache.org <user@beam.apache.org>; Robert Burke <r...@google.com>
Subject: Re: Go SDK pubsubio.Write does not output anything running on Dataflow

+Robert Burke

On Tue, Nov 16, 2021 at 10:41 AM Hannes Gustafsson <han...@hoxtonanalytics.com> 
wrote:
While trying to reproduce the pipeline using the Python SDK, I've noticed that 
the topic format differs between the write transform [1] and the read 
transform [2]: they appear to use /topics/<project>/<topic> and 
/projects/<project>/topics/<topic> respectively. This is also documented in 
the Python SDK documentation [3].

Note, though, that the doc string for PubSubWritePayload says

> // Topic format is: /topics/project_id/subscription_name

presumably meaning topic_name rather than subscription_name.

I'll try using the different format and report back.
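
Incidentally, if I'm reading the Go SDK source right, pubsubio builds the topic for its payloads with pubsubx.MakeQualifiedTopicName, which yields the projects/<project>/topics/<topic> form. A quick sketch to check the helper locally (assuming that package path is current):

> package main
>
> import (
>         "fmt"
>
>         "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
> )
>
> func main() {
>         // Expected to print "projects/xyz-123/topics/test".
>         fmt.Println(pubsubx.MakeQualifiedTopicName("xyz-123", "test"))
> }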

[1] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubWritePayload
[2] https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1#PubSubReadPayload
[3] https://beam.apache.org/releases/pydoc/2.34.0/apache_beam.io.external.gcp.pubsub.html
