In the case that the data is too large for a side input, you could achieve the
same by reassigning the timestamps of the BQ input to
BoundedWindow.TIMESTAMP_MIN_VALUE (you would have to do that in a
stateful DoFn with a timer whose outputTimestamp is set to
TIMESTAMP_MIN_VALUE to hold the watermark, or using a splittable DoFn, or, if
BQ allows you to specify a timestamp function, using that directly).
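A rough sketch of the stateful-DoFn option (untested; the class name and the
KV<String, String> element type are illustrative only, since stateful DoFns
require keyed input; state/timer classes come from org.apache.beam.sdk.state,
Duration from org.joda.time):

  class ReassignToMinTimestamp
      extends DoFn<KV<String, String>, KV<String, String>> {

    @StateId("buffer")
    private final StateSpec<BagState<KV<String, String>>> bufferSpec =
        StateSpecs.bag();

    @TimerId("flush")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        @Element KV<String, String> element,
        @StateId("buffer") BagState<KV<String, String>> buffer,
        @TimerId("flush") Timer flush) {
      buffer.add(element);
      // The outputTimestamp holds the output watermark at the minimum; the
      // firing time just past the minimum means the timer fires once the
      // bounded input is exhausted and its watermark advances.
      flush
          .withOutputTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE)
          .set(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)));
    }

    @OnTimer("flush")
    public void onFlush(
        @StateId("buffer") BagState<KV<String, String>> buffer,
        OutputReceiver<KV<String, String>> out) {
      // Outputs from the timer callback carry the timer's output timestamp,
      // i.e. TIMESTAMP_MIN_VALUE here.
      for (KV<String, String> kv : buffer.read()) {
        out.output(kv);
      }
      buffer.clear();
    }
  }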
In your BusinessLogic() you would set a timer that waits for the
watermark to move (e.g. timer.offset(1).setRelative()) and buffer
everything until the timer fires. Because the input from BQ is bounded,
its watermark will eventually advance to TIMESTAMP_MAX_VALUE, which will
fire the timer and flush the buffer.
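The buffering side could look roughly like this (again untested, with
illustrative names; a ValueState flag marks that the flush has happened so
that elements arriving afterwards pass straight through):

  class BusinessLogic extends DoFn<KV<String, String>, KV<String, String>> {

    @StateId("buffer")
    private final StateSpec<BagState<KV<String, String>>> bufferSpec =
        StateSpecs.bag();

    @StateId("flushed")
    private final StateSpec<ValueState<Boolean>> flushedSpec = StateSpecs.value();

    @TimerId("watermarkMoved")
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        @Element KV<String, String> element,
        @StateId("buffer") BagState<KV<String, String>> buffer,
        @StateId("flushed") ValueState<Boolean> flushed,
        @TimerId("watermarkMoved") Timer timer,
        OutputReceiver<KV<String, String>> out) {
      if (Boolean.TRUE.equals(flushed.read())) {
        // The buffered BQ data has already been flushed; process directly.
        out.output(applyLogic(element));
        return;
      }
      buffer.add(element);
      // Relative to the input watermark, which stays at the minimum until
      // the bounded BQ input has been fully read.
      timer.offset(Duration.millis(1)).setRelative();
    }

    @OnTimer("watermarkMoved")
    public void onWatermarkMoved(
        @StateId("buffer") BagState<KV<String, String>> buffer,
        @StateId("flushed") ValueState<Boolean> flushed,
        OutputReceiver<KV<String, String>> out) {
      for (KV<String, String> kv : buffer.read()) {
        out.output(applyLogic(kv));
      }
      buffer.clear();
      flushed.write(true);
    }

    private KV<String, String> applyLogic(KV<String, String> element) {
      // ... the actual business logic ...
      return element;
    }
  }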
I think this pattern might be useful on its own, so if you decided to
implement it, it might be good to incorporate it into the core
transforms (we already have Wait.on(), which is somewhat similar; see the
snippet below). I can imagine a mini-workflow that would take a bounded
and an unbounded PCollection plus a DoFn, apply the DoFn first to the
elements of the bounded PCollection, and only after that start processing
the unbounded one.
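For reference, Wait.on() gates one PCollection on another. A minimal,
hypothetical usage, reusing the pubsubStream/bqStream names from the snippets
below (note it only fires once the signal's windows close, so by itself it
does not cover the global-window streaming case discussed here):

  // Defer processing of the pub/sub stream until the BQ "signal"
  // collection is complete (per window).
  PCollection<String> gated =
      pubsubStream.apply("Wait For BQ", Wait.on(bqStream));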
Jan
On 2/27/23 20:59, Reuven Lax via dev wrote:
How large is the state data stored in BQ? If the size isn't too
large, you can read it from BQ and make it a side input into the DoFn.
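A minimal sketch of that approach, assuming the BQ rows can be keyed into a
map (the column names "key" and "value" are placeholders, and the
input/bqQuery/pubsubStream names are reused from Sahil's snippet below; the
constructor argument to BusinessLogic is likewise illustrative):

  PCollectionView<Map<String, String>> initialState =
      input
          .apply("Read State From BQ",
              BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
          .apply("Key Rows",
              MapElements.into(
                      TypeDescriptors.kvs(
                          TypeDescriptors.strings(), TypeDescriptors.strings()))
                  .via(row -> KV.of(
                      (String) row.get("key"), (String) row.get("value"))))
          .apply("As Map", View.asMap());

  pubsubStream.apply("Business Logic",
      ParDo.of(new BusinessLogic(initialState)).withSideInputs(initialState));

Inside BusinessLogic's @ProcessElement you would then read the map via
c.sideInput(initialState).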
On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak
<smo...@paloaltonetworks.com> wrote:
We are trying to re-initialize our state specs in the
BusinessLogic() DoFn from BQ.
BQ has data about the state, and we would like to make sure
that the state specs in our BusinessLogic() DoFn are initialized
before it starts consuming from Pub/Sub.
This is to handle redeployment of the Dataflow jobs so that state
is preserved and BusinessLogic() can continue working seamlessly as
before. All our DoFns operate in a global window and do not perform
any aggregation.
We are currently using Redis to preserve the state spec
information but would like to explore using BQ as an alternative
to Redis.
On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org>
wrote:
My suggestion is to try to solve the problem in terms of what
you want to compute. Instead of trying to control the
operational aspects like "read all the BQ before reading
Pubsub" there is presumably some reason that the BQ data
naturally "comes first", for example if its timestamps are
earlier or if there is a join or an aggregation that must
include it. Whenever you think you want to set up an
operational dependency between two things that "happen" in a
pipeline, it is often best to pivot your thinking to the data
and what you are trying to compute, and the built-in
dependencies will solve the ordering problems.
So - is there a way to describe your problem in terms of the
data and what you are trying to compute?
Kenn
On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev
<dev@beam.apache.org> wrote:
First, PCollections are completely unordered, so there is
no guarantee on the order in which you'll see events in the
flattened PCollection.
There may be ways to process the BigQuery data in a
separate transform first, but it depends on the structure
of the data. How large is the BigQuery table? Are you
doing any windowed aggregations here?
Reuven
On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak
<smo...@paloaltonetworks.com> wrote:
Yes, this is a streaming pipeline.
Some more details about the existing implementation vs.
what we want to achieve.
Current implementation:
Reading from pub/sub:

  Pipeline input = Pipeline.create(options);

  PCollection<String> pubsubStream = input
      .apply("Read From Pubsub",
          PubsubIO.readMessagesWithAttributesAndMessageId()
              .fromSubscription(inputSubscriptionId))
      // readMessagesWithAttributesAndMessageId() yields PubsubMessage, so
      // the payload is converted to String before flattening with bqStream.
      .apply("To String", MapElements.into(TypeDescriptors.strings())
          .via(msg -> new String(msg.getPayload(), StandardCharsets.UTF_8)));

Reading from bigquery:

  PCollection<String> bqStream = input
      .apply("Read from BQ",
          BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
      .apply("JSON Transform", AsJsons.of(TableRow.class));

Merge the inputs:

  PCollection<String> mergedInput =
      PCollectionList.of(pubsubStream).and(bqStream)
          .apply("Merge Input", Flatten.pCollections());

Business logic:

  mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
The above logic is what we currently use in our pipeline.
We want to make sure that we read from BigQuery first
and pass the bqStream through our BusinessLogic() before
we start consuming pubsubStream.
Is there a way to achieve this?
Thanks,
Sahil
On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax
<re...@google.com> wrote:
Can you explain this use case some more? Is this a
streaming pipeline? If so, how are you reading
from BigQuery?
On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via
dev <dev@beam.apache.org> wrote:
Hi,
We have a requirement wherein we are consuming
input from Pub/Sub (PubsubIO) as well as BQ (BigQueryIO).
We want to make sure that we consume the BQ
stream first, before we start consuming the
data from Pub/Sub. Is there a way to achieve
this? Can you please help with some code samples?
Currently, we read data from BigQuery using
BigQueryIO into a PCollection and also read data
from Pub/Sub using PubsubIO. We then use the
Flatten transform in this manner:
  PCollection pubsubKvPairs = ...;   // reads from Pub/Sub using PubsubIO
  PCollection bigqueryKvPairs = ...; // reads from BigQuery using BigQueryIO

  kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
      .apply("Merge Input", Flatten.pCollections());
Thanks,
Sahil