Hi all,

I am working through a use case where I would like to write events from
PubsubIO to GCS. The events are protobuf messages, so I used a custom
FileIO.Sink, defined as:

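    import com.google.protobuf.CodedOutputStream;
    import com.google.protobuf.Message;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.io.FileIO;

    // Length-delimited writer: each message is prefixed with its varint size,
    // so records can be read back one at a time (e.g. with parseDelimitedFrom).
    static class ProtobufFileIOSink<T extends Message> implements FileIO.Sink<T> {
        @Nullable private transient CodedOutputStream cos;

        @Override
        public void open(WritableByteChannel channel) {
            this.cos = CodedOutputStream.newInstance(Channels.newOutputStream(channel));
        }

        @Override
        public void write(T element) throws IOException {
            if (element == null || cos == null) {
                return;
            }
            // Size prefix, then the message body (same bytes as writeDelimitedTo).
            cos.writeUInt32NoTag(element.getSerializedSize());
            element.writeTo(cos);
        }

        @Override
        public void flush() throws IOException {
            if (cos != null) {
                cos.flush();
            }
        }
    }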

The pipeline that I am using is this:

    p.apply(PubsubIO.readProtos(Billing.BillingMeasurement.class)
            .fromSubscription(input + "-billing.proto")
            .withIdAttribute("event_id"))
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
     .apply(FileIO.<Billing.BillingMeasurement>write()
            .via(new ProtobufFileIOSink<>())
            .withCompression(Compression.GZIP)
            .to(output + "/billing.proto/")
            .withSuffix(".pb"));

To make sifting through the output easier, I would like the resulting
files to be organized by year/month/day/hour, so that the paths look
like:

gs://<bucket-name>/<prefixdir>/<year>/<month>/<day>/<hour>/[filename.pb.gz]

I tried looking through FileIO.writeDynamic() and FileNaming, but I am not
sure whether that is the right place. Is there an example or another
implementation that someone can point me to?
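The core of the layout is mapping each window's start time to an hour directory. A minimal, stdlib-only sketch of that mapping, assuming UTC and a window start given in epoch milliseconds; the HourlyPath class, its methods, and the `<HHmm>-<shard>-of-<numShards>` file name pattern are hypothetical, not part of Beam:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not a Beam API): turns a window start time into a
// relative name of the form <year>/<month>/<day>/<hour>/<file>, in UTC.
final class HourlyPath {
    // Zero-padded fields so lexicographic order matches time order.
    private static final DateTimeFormatter HOUR_DIR =
            DateTimeFormatter.ofPattern("uuuu/MM/dd/HH").withZone(ZoneOffset.UTC);
    // Hour and minute of the window start, to tell 1-minute windows apart.
    private static final DateTimeFormatter WINDOW_TAG =
            DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);

    private HourlyPath() {}

    // Same result for every instant inside one UTC hour, e.g. "2018/03/05/14".
    static String directoryFor(long windowStartMillis) {
        return HOUR_DIR.format(Instant.ofEpochMilli(windowStartMillis));
    }

    // Full relative name; the shard fields keep parallel writers of the
    // same window from producing the same object name.
    static String fileNameFor(long windowStartMillis, int shardIndex, int numShards, String suffix) {
        Instant start = Instant.ofEpochMilli(windowStartMillis);
        return String.format("%s/%s-%05d-of-%05d%s",
                HOUR_DIR.format(start), WINDOW_TAG.format(start), shardIndex, numShards, suffix);
    }

    public static void main(String[] args) {
        long start = Instant.parse("2018-03-05T14:07:00Z").toEpochMilli();
        // prints "2018/03/05/14/1407-00000-of-00001.pb.gz"
        System.out.println(fileNameFor(start, 0, 1, ".pb.gz"));
    }
}
```

In Beam, this logic would presumably sit inside a custom FileIO.Write.FileNaming, whose getFilename(window, pane, numShards, shardIndex, compression) receives the window. With FixedWindows the window is an IntervalWindow, so ((IntervalWindow) window).start().getMillis() gives the start time, and ".pb" + compression.getSuggestedSuffix() gives the ".pb.gz" suffix. The naming would be passed via withNaming(...) in place of withSuffix(".pb"), with to(...) still acting as the base directory (GCS object names may contain "/"). Since each 1-minute window lies inside a single hour and the path depends only on the window, not on element contents, writeDynamic() should not be needed. If a window can fire more than once, pane.getIndex() would also need to go into the name.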

— Ankur Chauhan
