Yes, I am just thinking about how to modify/rewrite this piece of code if I want to run my pipeline on the Dataflow runner.
Thanks & Regards
Rajnil Guha

On Wed, Jul 21, 2021 at 1:12 AM Robert Bradshaw <rober...@google.com> wrote:
> On Tue, Jul 20, 2021 at 12:33 PM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >
> > Hi,
> >
> > Thank you so much for your help, the collect() function works. I tried the code below and it prints correctly.
> >
> > is_empty_check = (dupe_records | "CountGloballyDupes" >> Count.Globally()
> >                   #| "IsEmptyCheck" >> beam.Map(lambda x: x == 0)
> >                   )
> >
> > is_empty_df = ib.collect(is_empty_check)
> > if(is_empty_df.iloc[0,0] == 0):
> >     print("Empty")
> > else:
> >     ib.show(is_empty_check)
>
> Yes, this should work fine.
>
> > I tried using Beam DataFrames as below, but it says iloc() is not supported for Beam DataFrames.
>
> Correct. iloc depends on ordering, and PCollections aren't in general ordered. However, see below.
>
> > is_empty_beam_df = beam.dataframe.convert.to_dataframe(is_empty_check)
> >
> > if (is_empty_beam_df.iloc[0, 0] == 0):
> >     print("Empty")
> > else:
> >     print(is_empty_beam_df)
> >
> > Is there any other way to implement similar checks using Beam DataFrames?
>
> It sounds like you're trying to get around using collect. The issue is, until "collect" (or similar) is called, dupe_records doesn't really have any contents to look into or compare against. All it is is a pointer to an expression describing how to compute it. When you call collect (or pipeline.run) the pipeline is actually executed and the result (0 or whatever) is computed.
>
> > Thanks & Regards
> > Rajnil Guha
> >
> > On Tue, Jul 20, 2021 at 3:52 AM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> On Mon, Jul 19, 2021 at 11:12 AM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > I know the Java API can handle default values for a combiner when the PCollection is empty (though this only works in the global window). I'm not sure offhand about Python.
> >> >
> >> > On Mon, Jul 19, 2021 at 10:32 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> I tried running some experiments on the Interactive Runner before actually running the pipeline on Dataflow. Below is what I did, though I am not sure if this is the correct way to do it:
> >> >>
> >> >> Let's say I want to check whether the PCollection named is_empty_check is empty or not, and take a decision based on that.
> >> >>
> >> >> is_empty_check = (dupe_records | "CountGlobally" >> Count.Globally()
> >> >>                   )
> >>
> >> is_empty_check now contains a PCollection with a single integer value in it (possibly 0).
> >>
> >> >> if is_empty_check != 0:
> >>
> >> is_empty_check is a PCollection, never equal to the int 0.
> >>
> >> >>     ib.show(is_empty_check)
> >> >> else:
> >> >>     print("Empty")
> >> >>
> >> >> The above code works fine when the PCollection is not empty, in which case it executes the statement under the if branch.
> >> >> But when the PCollection is empty, it does not execute the else branch and instead executes the if branch, i.e. it prints 0.
> >>
> >> It sounds like what you want to do is ib.collect(is_empty_check) and then see if its only element is 0.
> >>
> >> >> On Mon, Jul 19, 2021 at 12:32 AM Reuven Lax <re...@google.com> wrote:
> >> >>>
> >> >>> You could count the collection (with default value of zero).
> >> >>>
> >> >>> On Sun, Jul 18, 2021, 11:42 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >>>>
> >> >>>> Hi Reuven,
> >> >>>>
> >> >>>> Yes, for now this is a bounded PCollection.
> >> >>>>
> >> >>>> Thanks & Regards
> >> >>>> Rajnil Guha
> >> >>>>
> >> >>>> On Mon, Jul 19, 2021 at 12:02 AM Reuven Lax <re...@google.com> wrote:
> >> >>>>>
> >> >>>>> Is this a bounded collection?
> >> >>>>>
> >> >>>>> On Sun, Jul 18, 2021, 11:17 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Hi Beam Users,
> >> >>>>>>
> >> >>>>>> I have a use case where I need to check whether a PCollection is empty or not. If it's not empty, I need to write a message to a Pub/Sub topic. I am using the Python SDK and Dataflow to write and run my pipelines, respectively. I searched but could not find any concrete way to check whether a PCollection is empty using Python, or how to take action based on the check. Is there any way to implement this using Beam?
> >> >>>>>>
> >> >>>>>> Thanks & Regards
> >> >>>>>> Rajnil Guha