Hi all,
I am working through a use case where I would like to write events from
PubsubIO to GCS. The events are protobuf messages, so I used a custom
FileIO.Sink, defined as:
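import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileIO;

// Delimited writer: each message is written as a varint length plus its bytes.
static class ProtobufFileIOSink<T extends Message> implements FileIO.Sink<T> {
  @Nullable private transient CodedOutputStream cos;

  @Override
  public void open(WritableByteChannel channel) {
    this.cos = CodedOutputStream.newInstance(Channels.newOutputStream(channel));
  }

  @Override
  public void write(T element) throws IOException {
    if (element == null || cos == null) {
      return;
    }
    // writeMessageNoTag prefixes the message with its size (the same framing as
    // writeDelimitedTo); plain writeTo would leave records impossible to split.
    cos.writeMessageNoTag(element);
  }

  @Override
  public void flush() throws IOException {
    if (cos != null) {
      cos.flush();
    }
  }
}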
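A delimited writer prefixes every message with its size, which is the framing protobuf's writeDelimitedTo / parseDelimitedFrom pair uses: an unsigned base-128 varint holding the length, then the message bytes. Messages written back to back with plain writeTo cannot be split apart again, because concatenated protobuf encodings parse as one merged message. A dependency-free sketch of that framing, with raw byte arrays standing in for serialized protos (the class and method names are hypothetical, not part of protobuf or Beam):

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: frames payloads the way protobuf's writeDelimitedTo
// does (unsigned base-128 varint length, then the bytes) and splits them back.
final class DelimitedFraming {
  private DelimitedFraming() {}

  // Concatenates payloads, each preceded by its varint-encoded length.
  static byte[] frame(List<byte[]> payloads) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] p : payloads) {
      int n = p.length;
      while ((n & ~0x7F) != 0) {
        out.write((n & 0x7F) | 0x80); // low 7 bits, "more bytes follow" bit set
        n >>>= 7;
      }
      out.write(n);              // last length byte, continuation bit clear
      out.write(p, 0, p.length); // the payload itself
    }
    return out.toByteArray();
  }

  // Splits a framed buffer back into its payloads.
  static List<byte[]> unframe(byte[] data) {
    List<byte[]> payloads = new ArrayList<>();
    int pos = 0;
    while (pos < data.length) {
      int size = 0;
      int shift = 0;
      while (true) {
        if (pos >= data.length) {
          throw new IllegalArgumentException("truncated length");
        }
        int b = data[pos++] & 0xFF;
        size |= (b & 0x7F) << shift;
        if ((b & 0x80) == 0) {
          break;
        }
        shift += 7;
        if (shift >= 35) {
          throw new IllegalArgumentException("length varint too long");
        }
      }
      if (size < 0 || size > data.length - pos) {
        throw new IllegalArgumentException("truncated payload");
      }
      byte[] p = new byte[size];
      System.arraycopy(data, pos, p, 0, size);
      pos += size;
      payloads.add(p);
    }
    return payloads;
  }
}
```

When reading an output file back, protobuf's generated parseDelimitedFrom(InputStream) consumes the same framing one message at a time and returns null at end of stream; for the .gz output the stream would first be wrapped in java.util.zip.GZIPInputStream.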
The pipeline that I am using is this:
p.apply(PubsubIO.readProtos(Billing.BillingMeasurement.class)
        .fromSubscription(input + "-billing.proto")
        .withIdAttribute("event_id"))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
    .apply(FileIO.<Billing.BillingMeasurement>write()
        .via(new ProtobufFileIOSink<>())
        .withCompression(Compression.GZIP)
        .to(output + "/billing.proto/")
        .withSuffix(".pb"));
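With FixedWindows.of(Duration.standardMinutes(1L)) and the default zero offset, an element with timestamp t effectively lands in the window that starts at t minus (t mod 60 s) and ends 60 s later. Since an hour is a whole number of minutes and Unix time puts UTC hour boundaries at multiples of 3,600,000 ms, every one-minute window sits inside a single hour, so the window start alone decides the hour directory. A stdlib sketch of that arithmetic, assuming epoch milliseconds in place of Beam's Joda Instant (FixedWindowMath is a hypothetical name):

```java
// Hypothetical helper mirroring fixed-window assignment with a zero offset.
final class FixedWindowMath {
  private FixedWindowMath() {}

  // Start of the fixed window of the given size that contains timestampMillis.
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for timestamps before the epoch as well.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  // Exclusive end of that window.
  static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }

  // True when no window of this size can straddle a UTC hour boundary.
  static boolean alignsWithHours(long sizeMillis) {
    return 3_600_000L % sizeMillis == 0;
  }
}
```

A 7-minute window size, for example, would not divide an hour, and some windows would then span two hour directories.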
In order to make sifting through the output easier, I would like the
resulting files to be organized by year/month/day/hour, so that the paths
look like:
gs://<bucket-name>/<prefixdir>/<year>/<month>/<day>/<hour>/[filename.pb.gz]
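FileIO.write() accepts a FileIO.Write.FileNaming through withNaming(...), and its getFilename(window, pane, numShards, shardIndex, compression) receives the window of each file; with the one-minute FixedWindows in the pipeline that window is an IntervalWindow whose start() could supply the <year>/<month>/<day>/<hour> part. A stdlib sketch of only that formatting step, assuming the hours should be in UTC and using epoch milliseconds instead of a Joda Instant (HourPartition is a hypothetical name):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns a window start into "yyyy/MM/dd/HH" in UTC.
final class HourPartition {
  private HourPartition() {}

  // Zero-padded fields, so directory names sort in time order.
  private static final DateTimeFormatter HOUR_PATH =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

  static String prefix(long windowStartMillis) {
    return HOUR_PATH.format(Instant.ofEpochMilli(windowStartMillis));
  }
}
```

In a real FileNaming this prefix would still need to be joined with a per-window, per-shard file name, so that files from different minutes of the same hour do not collide.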
I tried looking through FileIO.writeDynamic() and FileNaming, but I am not
sure if that is the right place. Is there an example or another
implementation that someone could point me to?
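As far as the FileIO javadoc goes, writeDynamic() takes a by(...) function that maps each element to a destination key and a withNaming(...) function that maps each key to a FileNaming; that route fits when the hour comes from a field of the message rather than from the window. The sketch below shows only the partitioning idea with plain collections; Event and its eventTimeMillis field are hypothetical stand-ins for BillingMeasurement and whatever timestamp it carries, and UTC is assumed.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for a message that carries its own event time.
final class Event {
  final String id;
  final long eventTimeMillis;

  Event(String id, long eventTimeMillis) {
    this.id = id;
    this.eventTimeMillis = eventTimeMillis;
  }
}

// Hypothetical helper: assigns each event to an hour "destination" key.
final class HourDestinations {
  private HourDestinations() {}

  private static final DateTimeFormatter HOUR_KEY =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

  // Destination function, playing the role of the by(...) argument.
  static final Function<Event, String> BY_HOUR =
      (Event e) -> HOUR_KEY.format(Instant.ofEpochMilli(e.eventTimeMillis));

  // Groups event ids per destination; keys sorted, input order kept per group.
  static Map<String, List<String>> partition(List<Event> events) {
    return events.stream()
        .collect(Collectors.groupingBy(
            BY_HOUR,
            TreeMap::new,
            Collectors.mapping((Event e) -> e.id, Collectors.toList())));
  }
}
```

In the Beam version each key would then feed a FileNaming, for instance one from FileIO.Write.defaultNaming(prefix, suffix) with the hour key as part of the prefix, which should place each hour's files under their own path.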
— Ankur Chauhan