On Tue, Feb 28, 2023 at 5:14 PM Sahil Modak <smo...@paloaltonetworks.com>
wrote:

> The number of keys/data in BQ would not be constant and would grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5 KB per
> key. Both the number of keys and the size per key would be feature
> dependent (based on the upstream pipelines), and we won't have control over
> this in the future.
>
> Using the BigQuery client would mean we would have to run individual queries
> for each of these 300k keys from the BusinessLogic() DoFn, which operates on
> KVs in a global window.
>
> Also, the order of the data from BQ would not matter to us since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume pub/sub.
>

I was referring to order in general, across your whole data set as an
abstract concept. If order _really_ doesn't matter, then you wouldn't need
to read the BQ data first. You could just flatten them together and run the
pipeline like that. So I think there is some order-dependence that you want
to represent at the data level.
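
One way to picture "order at the data level" is a merge rule that gives the
same result whichever source arrives first. The sketch below is plain Java,
not Beam API; the class name LatestWinsState, the per-record timestamp, and
the tie-break rule are all assumptions for illustration and do not come from
your pipeline.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration (not Beam API): per-key state where the update
// with the newest timestamp wins, regardless of arrival order.
final class LatestWinsState {

    // A stored value together with the event time it was produced at.
    private static final class Versioned {
        final long timestampMillis;
        final String value;

        Versioned(long timestampMillis, String value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final Map<String, Versioned> state = new HashMap<>();

    // Apply an update from either source. Older updates never overwrite
    // newer ones; equal timestamps are broken by comparing the values so
    // the outcome does not depend on which update was seen first.
    void apply(String key, long timestampMillis, String value) {
        state.merge(key, new Versioned(timestampMillis, value),
            (current, incoming) -> {
                if (incoming.timestampMillis != current.timestampMillis) {
                    return incoming.timestampMillis > current.timestampMillis
                        ? incoming : current;
                }
                return incoming.value.compareTo(current.value) > 0
                    ? incoming : current;
            });
    }

    // Returns the current value for the key, or null if none was seen.
    String get(String key) {
        Versioned v = state.get(key);
        return v == null ? null : v.value;
    }

    public static void main(String[] args) {
        // Snapshot row first, then the live event.
        LatestWinsState snapshotFirst = new LatestWinsState();
        snapshotFirst.apply("k", 100L, "bq-snapshot");
        snapshotFirst.apply("k", 200L, "pubsub-update");

        // Same two updates in the opposite arrival order.
        LatestWinsState eventFirst = new LatestWinsState();
        eventFirst.apply("k", 200L, "pubsub-update");
        eventFirst.apply("k", 100L, "bq-snapshot");

        System.out.println(snapshotFirst.get("k").equals(eventFirst.get("k")));
    }
}
```

With a rule like this inside the stateful DoFn, the BQ snapshot and the
Pub/Sub stream could be flattened together without reading BQ first,
provided the BQ rows carry timestamps comparable to the Pub/Sub events.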

Kenn


> I will explore using Wait.on(bigquery) before pub/sub read since I am not
> sure if side input would be the best option here.
>
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out of order messages. So I want to
>> think about what are the data dependencies that result in the requirement
>> of order. Or if there are none and you just want to know that all the past
>> data has been processed, Niel's idea is one solution. It isn't parallel,
>> though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too large,
>>> you can read it from BQ and make it a side input into the DoFn.
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> We are trying to re-initialize our state specs in the BusinessLogic()
>>>> DoFn from BQ.
>>>> BQ has data about the state spec, and we would like to make sure that
>>>> the state specs in our BusinessLogic() dofn are initialized before it
>>>> starts consuming the pub/sub.
>>>>
>>>> This is for handling the case of redeployment of the dataflow jobs so
>>>> that the states are preserved and the BusinessLogic() can work seamlessly
>>>> as it was previously. All our dofns are operating in a global window and do
>>>> not perform any aggregation.
>>>>
>>>> We are currently using Redis to preserve the state spec information but
>>>> would like to explore using BQ as an alternative to Redis.
>>>>
>>>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> My suggestion is to try to solve the problem in terms of what you want
>>>>> to compute. Instead of trying to control the operational aspects like
>>>>> "read all the BQ before reading Pubsub" there is presumably some reason
>>>>> that the BQ data naturally "comes first", for example if its timestamps
>>>>> are earlier or if there is a join or an aggregation that must include it.
>>>>> Whenever you think you want to set up an operational dependency between
>>>>> two things that "happen" in a pipeline, it is often best to pivot your
>>>>> thinking to the data and what you are trying to compute, and the built-in
>>>>> dependencies will solve the ordering problems.
>>>>>
>>>>> So - is there a way to describe your problem in terms of the data and
>>>>> what you are trying to compute?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> First, PCollections are completely unordered, so there is no guarantee
>>>>>> on what order you'll see events in the flattened PCollection.
>>>>>>
>>>>>> There may be ways to process the BigQuery data in a separate transform
>>>>>> first, but it depends on the structure of the data. How large is the
>>>>>> BigQuery table? Are you doing any windowed aggregations here?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>>>>> smo...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Yes, this is a streaming pipeline.
>>>>>>>
>>>>>>> Some more details about the existing implementation vs. what we want
>>>>>>> to achieve.
>>>>>>>
>>>>>>> Current implementation:
>>>>>>> Reading from pub-sub:
>>>>>>>
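>>>>>>> Pipeline input = Pipeline.create(options);
>>>>>>>
>>>>>>> // readMessagesWithAttributesAndMessageId() yields PubsubMessage elements,
>>>>>>> // so a payload-to-String step (assumed here, UTF-8) is needed before
>>>>>>> // the result can be typed as PCollection<String>.
>>>>>>> PCollection<String> pubsubStream = input
>>>>>>>         .apply("Read From Pubsub", PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>>>>                 .fromSubscription(inputSubscriptionId))
>>>>>>>         .apply("Payload To String", MapElements.into(TypeDescriptors.strings())
>>>>>>>                 .via((PubsubMessage msg) -> new String(msg.getPayload(), StandardCharsets.UTF_8)));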
>>>>>>>
>>>>>>>
>>>>>>> Reading from bigquery:
>>>>>>>
>>>>>>> PCollection<String> bqStream = input
>>>>>>>         .apply("Read from BQ", BigQueryIO.readTableRows()
>>>>>>>                 .fromQuery(bqQuery).usingStandardSql())
>>>>>>>         .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>>>>
>>>>>>>
>>>>>>> Merge the inputs:
>>>>>>>
>>>>>>> PCollection<String> mergedInput = PCollectionList.of(pubsubStream).and(bqStream)
>>>>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Business Logic:
>>>>>>>
>>>>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The above logic is what we currently use in our pipeline.
>>>>>>>
>>>>>>> We want to make sure that we read from BigQuery first and pass the
>>>>>>> bqStream through our BusinessLogic() before we start consuming
>>>>>>> pubsubStream.
>>>>>>>
>>>>>>> Is there a way to achieve this?
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Sahil
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you explain this use case some more? Is this a streaming
>>>>>>>> pipeline? If so, how are you reading from BigQuery?
>>>>>>>>
>>>>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>>>>>> (PubSubIO) as well as BQ (BQIO).
>>>>>>>>>
>>>>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>>>>> start consuming the data from pub-sub. Is there a way to achieve
>>>>>>>>> this? Can you please help with some code samples?
>>>>>>>>>
>>>>>>>>> Currently, we read data from BigQuery using BigQueryIO into a
>>>>>>>>> PCollection and also read data from pubsub using PubsubIO. We then
>>>>>>>>> use the flatten transform in this manner.
>>>>>>>>>
>>>>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>>>>
>>>>>>>>> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>>>>>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sahil
>>>>>>>>>
>>>>>>>>>
