On Tue, Feb 28, 2023 at 5:14 PM Sahil Modak <smo...@paloaltonetworks.com> wrote:
> The number of keys/data in BQ would not be constant and would grow with
> time.
>
> A rough estimate would be around 300k keys with an average size of 5kb
> per key. Both the count of the keys and the size of each key would be
> feature dependent (based on the upstream pipelines), and we won't have
> control over this in the future.
>
> Using the BigQuery client would mean we would have to run individual
> queries for each of these 300k keys from the BusinessLogic() DoFn, which
> operates on KVs in a global window.
>
> Also, the order of the data from BQ would not matter to us, since the
> only thing we are trying to solve here is regaining the state spec
> information before starting to consume Pub/Sub.

I was referring to order in general, across your whole data set, as an
abstract concept. If order _really_ doesn't matter, then you wouldn't need
to read the BQ data first. You could just flatten the two together and run
the pipeline like that. So I think there is some order-dependence that you
want to represent at the data level.

Kenn

> I will explore using Wait.on(bigquery) before the Pub/Sub read, since I
> am not sure if a side input would be the best option here.
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out-of-order messages. So I want to
>> think about what the data dependencies are that result in the
>> requirement of order. Or, if there are none and you just want to know
>> that all the past data has been processed, Niel's idea is one solution.
>> It isn't parallel, though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too
>>> large, you can read it from BQ and make it a side input into the DoFn.
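A minimal sketch of the side-input approach Reuven describes above, assuming the BQ rows carry `key` and `value` columns and that BusinessLogic() is a stateful DoFn over KV<String, String>. All column names, variable names, and the state layout here are placeholder assumptions for illustration, not the actual pipeline:

```java
import java.util.Map;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import com.google.api.services.bigquery.model.TableRow;

// Read the state snapshot from BQ and expose it as a map-valued side input.
PCollectionView<Map<String, String>> stateSeedView =
    input
        .apply("Read seed from BQ",
            BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
        .apply("To KV", MapElements.via(
            new SimpleFunction<TableRow, KV<String, String>>() {
              @Override
              public KV<String, String> apply(TableRow row) {
                // "key"/"value" column names are assumptions for this sketch.
                return KV.of((String) row.get("key"), (String) row.get("value"));
              }
            }))
        .apply("As map", View.asMap());

class BusinessLogic extends DoFn<KV<String, String>, String> {
  private final PCollectionView<Map<String, String>> seedView;

  BusinessLogic(PCollectionView<Map<String, String>> seedView) {
    this.seedView = seedView;
  }

  @StateId("state")
  private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext c,
                      @StateId("state") ValueState<String> state) {
    if (state.read() == null) {
      // First element for this key: seed the state from the BQ snapshot.
      String seed = c.sideInput(seedView).get(c.element().getKey());
      if (seed != null) {
        state.write(seed);
      }
    }
    // ... existing per-key business logic goes here ...
    c.output(c.element().getValue());
  }
}

pubsubStream.apply("Business Logic",
    ParDo.of(new BusinessLogic(stateSeedView)).withSideInputs(stateSeedView));
```

With this shape the runner holds main-input elements for a window until that window's side input is ready, so the DoFn should not see Pub/Sub elements before the bounded BQ read has produced the map; ordering of the Pub/Sub elements themselves is still arbitrary, per Kenn's point above.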
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> We are trying to re-initialize our state specs in the BusinessLogic()
>>>> DoFn from BQ.
>>>> BQ has data about the state spec, and we would like to make sure that
>>>> the state specs in our BusinessLogic() DoFn are initialized before it
>>>> starts consuming Pub/Sub.
>>>>
>>>> This is for handling the case of redeployment of the Dataflow jobs, so
>>>> that the states are preserved and BusinessLogic() can work seamlessly
>>>> as it did previously. All our DoFns are operating in a global window
>>>> and do not perform any aggregation.
>>>>
>>>> We are currently using Redis to preserve the state spec information,
>>>> but would like to explore using BQ as an alternative to Redis.
>>>>
>>>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> My suggestion is to try to solve the problem in terms of what you
>>>>> want to compute. Instead of trying to control operational aspects
>>>>> like "read all the BQ before reading Pubsub", there is presumably
>>>>> some reason that the BQ data naturally "comes first", for example if
>>>>> its timestamps are earlier or if there is a join or an aggregation
>>>>> that must include it. Whenever you think you want to set up an
>>>>> operational dependency between two things that "happen" in a
>>>>> pipeline, it is often best to pivot your thinking to the data and
>>>>> what you are trying to compute, and the built-in dependencies will
>>>>> solve the ordering problems.
>>>>>
>>>>> So - is there a way to describe your problem in terms of the data and
>>>>> what you are trying to compute?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> First, PCollections are completely unordered, so there is no
>>>>>> guarantee on what order you'll see events in the flattened
>>>>>> PCollection.
>>>>>>
>>>>>> There may be ways to process the BigQuery data in a separate
>>>>>> transform first, but it depends on the structure of the data. How
>>>>>> large is the BigQuery table? Are you doing any windowed aggregations
>>>>>> here?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>>>>> smo...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Yes, this is a streaming pipeline.
>>>>>>>
>>>>>>> Some more details about the existing implementation vs. what we
>>>>>>> want to achieve.
>>>>>>>
>>>>>>> Current implementation:
>>>>>>>
>>>>>>> Reading from Pub/Sub:
>>>>>>>
>>>>>>> Pipeline input = Pipeline.create(options);
>>>>>>>
>>>>>>> PCollection<String> pubsubStream = input.apply("Read From Pubsub",
>>>>>>>     PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>>>>         .fromSubscription(inputSubscriptionId));
>>>>>>>
>>>>>>> Reading from BigQuery:
>>>>>>>
>>>>>>> PCollection<String> bqStream = input.apply("Read from BQ",
>>>>>>>     BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>>>>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>>>>
>>>>>>> Merging the inputs:
>>>>>>>
>>>>>>> PCollection<String> mergedInput =
>>>>>>>     PCollectionList.of(pubsubStream).and(bqStream)
>>>>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>>>>
>>>>>>> Business logic:
>>>>>>>
>>>>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>>>>>>>
>>>>>>> The above is what we currently use in our pipeline.
>>>>>>>
>>>>>>> We want to make sure that we read from BigQuery first & pass the
>>>>>>> bqStream through our BusinessLogic() before we start consuming
>>>>>>> pubsubStream.
>>>>>>>
>>>>>>> Is there a way to achieve this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sahil
>>>>>>>
>>>>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you explain this use case some more?
>>>>>>>> Is this a streaming pipeline? If so, how are you reading from
>>>>>>>> BigQuery?
>>>>>>>>
>>>>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have a requirement wherein we are consuming input from Pub/Sub
>>>>>>>>> (PubsubIO) as well as BQ (BigQueryIO).
>>>>>>>>>
>>>>>>>>> We want to make sure that we consume the BQ stream first, before
>>>>>>>>> we start consuming the data from Pub/Sub. Is there a way to
>>>>>>>>> achieve this? Can you please help with some code samples?
>>>>>>>>>
>>>>>>>>> Currently, we read data from BigQuery using BigQueryIO into a
>>>>>>>>> PCollection & also read data from Pub/Sub using PubsubIO. We then
>>>>>>>>> use the Flatten transform in this manner:
>>>>>>>>>
>>>>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>>>>
>>>>>>>>> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>>>>>>>>     .apply("Merge Input", Flatten.pCollections());
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sahil
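Earlier in the thread Sahil mentions trying Wait.on(bigquery). A minimal, untested sketch of how that might be wired from the snippets in this thread; the payload decoding and all variable names are assumptions, and the exact hold-back behavior depends on the runner (Wait.on releases the main input for a window only once the signal collection is done in that window, which for a bounded BQ read in the global window means after the read completes):

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.extensions.jackson.AsJsons;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import com.google.api.services.bigquery.model.TableRow;

// Bounded read: its global window finishes once the query result is consumed.
PCollection<String> bqStream = input
    .apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("JSon Transform", AsJsons.of(TableRow.class));

PCollection<String> pubsubStream = input
    .apply("Read From Pubsub",
        PubsubIO.readMessagesWithAttributesAndMessageId()
            .fromSubscription(inputSubscriptionId))
    .apply("Payload", MapElements.into(TypeDescriptors.strings())
        .via(msg -> new String(msg.getPayload(), StandardCharsets.UTF_8)))
    // Hold back Pub/Sub elements until bqStream is complete.
    .apply("Wait for BQ", Wait.on(bqStream));

PCollection<String> mergedInput = PCollectionList.of(bqStream).and(pubsubStream)
    .apply("Merge Input", Flatten.pCollections());

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
```

Note that this only delays processing of the Pub/Sub branch; once released, its elements are still unordered relative to one another, consistent with Reuven's and Kenn's points about PCollections being unordered.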