If the data is too large for a side input, you could achieve the same by reassigning the timestamps of the BQ input to BoundedWindow.TIMESTAMP_MIN_VALUE. You would have to do that in a stateful DoFn with a timer whose outputTimestamp is set to TIMESTAMP_MIN_VALUE to hold the watermark, or in a splittable DoFn, or, if the BQ source lets you specify a timestamp function, by using that directly.
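
For illustration, a rough sketch of that re-stamping DoFn (untested; it assumes keyed input, since state and timers require it, and it relies on the deprecated allowed-skew override to move timestamps backwards):

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Re-stamp all (bounded) BQ elements to TIMESTAMP_MIN_VALUE while a
    // timer's output timestamp holds the downstream watermark there.
    class ReassignToMinTimestamp extends DoFn<KV<String, String>, KV<String, String>> {

      @TimerId("hold")
      private final TimerSpec holdSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(ProcessContext ctx, @TimerId("hold") Timer hold) {
        // the timer's output timestamp holds the watermark at MIN until
        // the bounded input finishes and the timer fires
        hold.withOutputTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE)
            .set(GlobalWindow.INSTANCE.maxTimestamp());
        ctx.outputWithTimestamp(ctx.element(), BoundedWindow.TIMESTAMP_MIN_VALUE);
      }

      @OnTimer("hold")
      public void onHold() {
        // no-op: the timer firing releases the watermark hold
      }

      @Override
      public Duration getAllowedTimestampSkew() {
        // deprecated, but currently needed to output timestamps earlier
        // than the input element's
        return Duration.millis(Long.MAX_VALUE);
      }
    }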

In your BusinessLogic() you would set a timer that waits for the watermark to move (e.g. timer.offset(1).setRelative()) and buffer everything until the timer fires. Because the input from BQ is bounded, its watermark will eventually advance to TIMESTAMP_MAX_VALUE, which will fire the timer and flush the buffer.
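
A minimal sketch of that buffering pattern (state and timer names are mine; the actual business logic is elided):

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class BusinessLogic extends DoFn<KV<String, String>, String> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

      @StateId("flushed")
      private final StateSpec<ValueState<Boolean>> flushedSpec = StateSpecs.value();

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext ctx,
          @StateId("buffer") BagState<String> buffer,
          @StateId("flushed") ValueState<Boolean> flushed,
          @TimerId("flush") Timer flush) {
        if (Boolean.TRUE.equals(flushed.read())) {
          // the bounded input was already flushed; process live data directly
          ctx.output(ctx.element().getValue());
          return;
        }
        // fires once the watermark moves past the bounded input
        flush.offset(Duration.millis(1)).setRelative();
        buffer.add(ctx.element().getValue());
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext ctx,
          @StateId("buffer") BagState<String> buffer,
          @StateId("flushed") ValueState<Boolean> flushed) {
        // drain the buffered elements, then switch to pass-through mode
        for (String element : buffer.read()) {
          ctx.output(element);
        }
        buffer.clear();
        flushed.write(true);
      }
    }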

I think this pattern might be useful on its own, so if you decide to implement it, it might be good to incorporate it into the core transforms (we already have Wait.on(), which is somewhat similar). I can imagine a mini-workflow that takes a bounded PCollection, an unbounded PCollection, and a DoFn, applies the DoFn first to the elements of the bounded PCollection, and only after that starts processing the unbounded one.
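
For comparison, Wait.on() today gates one PCollection on the completion of another (names here are placeholders):

    // emit mainInput elements only once the corresponding windows of
    // the signal PCollection are complete
    PCollection<String> gated = mainInput.apply(Wait.on(signal));

The mini-workflow would generalize this to ordering the processing itself.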

 Jan

On 2/27/23 20:59, Reuven Lax via dev wrote:
How large is this state spec stored in BQ? If the size isn't too large, you can read it from BQ and make it a side input into the DoFn.
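
Something along these lines, roughly (a sketch only; extractKey() is a placeholder for however you key your state):

    // materialize the BQ rows as a map-valued side input
    PCollectionView<Map<String, String>> stateView =
        bqStream
            .apply("Key by id",
                MapElements
                    .into(TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via(json -> KV.of(extractKey(json), json)))
            .apply(View.asMap());

    pubsubStream.apply("Business Logic",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            Map<String, String> initialState = ctx.sideInput(stateView);
            // initialize the state spec from initialState, then process
          }
        }).withSideInputs(stateView));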

On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <smo...@paloaltonetworks.com> wrote:

    We are trying to re-initialize our state specs in the
    BusinessLogic() DoFn from BQ.
    BQ has data about the state spec, and we would like to make sure
    that the state specs in our BusinessLogic() DoFn are initialized
    before it starts consuming from Pub/Sub.

    This is to handle redeployment of the Dataflow jobs so that state
    is preserved and BusinessLogic() can keep working seamlessly, as
    it did previously. All our DoFns operate in a global window and
    do not perform any aggregation.

    We are currently using Redis to preserve the state spec
    information but would like to explore using BQ as an alternative
    to Redis.

    On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org>
    wrote:

        My suggestion is to try to solve the problem in terms of what
        you want to compute. Instead of trying to control the
        operational aspects like "read all the BQ before reading
        Pubsub" there is presumably some reason that the BQ data
        naturally "comes first", for example if its timestamps are
        earlier or if there is a join or an aggregation that must
        include it. Whenever you think you want to set up an
        operational dependency between two things that "happen" in a
        pipeline, it is often best to pivot your thinking to the data
        and what you are trying to compute, and the built-in
        dependencies will solve the ordering problems.

        So - is there a way to describe your problem in terms of the
        data and what you are trying to compute?

        Kenn

        On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev
        <dev@beam.apache.org> wrote:

            First, PCollections are completely unordered, so there is
            no guarantee of the order in which you'll see events in
            the flattened PCollection.

            There may be ways to process the BigQuery data in a
            separate transform first, but it depends on the structure
            of the data. How large is the BigQuery table? Are you
            doing any windowed aggregations here?

            Reuven

            On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak
            <smo...@paloaltonetworks.com> wrote:

                Yes, this is a streaming pipeline.

                 Some more details about the existing implementation
                 vs. what we want to achieve.

                 Current implementation:

                 Reading from Pub/Sub:

                 Pipeline input = Pipeline.create(options);

                 PCollection<String> pubsubStream =
                     input.apply("Read From Pubsub",
                         PubsubIO.readMessagesWithAttributesAndMessageId()
                             .fromSubscription(inputSubscriptionId));
                     // (readMessagesWithAttributesAndMessageId() actually
                     // yields PubsubMessage, so a conversion step to
                     // String is elided here)

                 Reading from BigQuery:

                 PCollection<String> bqStream =
                     input
                         .apply("Read from BQ",
                             BigQueryIO.readTableRows()
                                 .fromQuery(bqQuery)
                                 .usingStandardSql())
                         .apply("JSON Transform", AsJsons.of(TableRow.class));

                 Merge the inputs:

                 PCollection<String> mergedInput =
                     PCollectionList.of(pubsubStream).and(bqStream)
                         .apply("Merge Input", Flatten.pCollections());

                 Business Logic:

                 mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));

                 The above is what we currently use in our pipeline.

                 We want to make sure that we read from BigQuery first
                 and pass bqStream through our BusinessLogic() before
                 we start consuming pubsubStream.

                Is there a way to achieve this?

                Thanks,

                Sahil


                On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax
                <re...@google.com> wrote:

                    Can you explain this use case some more? Is this a
                    streaming pipeline? If so, how are you reading
                    from BigQuery?

                    On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via
                    dev <dev@beam.apache.org> wrote:

                        Hi,

                        We have a requirement wherein we are consuming
                        input from Pub/Sub (PubsubIO) as well as BQ
                        (BigQueryIO).

                        We want to make sure that we consume the BQ
                        stream first before we start consuming the
                        data from Pub/Sub. Is there a way to achieve
                        this? Could you please help with some code samples?

                        Currently, we read data from BigQuery using
                        BigQueryIO into a PCollection and also read
                        data from Pub/Sub using PubsubIO. We then use
                        the Flatten transform in this manner.

                        PCollection pubsubKvPairs = reads from Pub/Sub using PubsubIO
                        PCollection bigqueryKvPairs = reads from BigQuery using BigQueryIO

                        kvPairs =
                            PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
                                .apply("Merge Input", Flatten.pCollections());

                        Thanks,
                        Sahil
