Hi,

there is a (rather old and long) discussion of this for the Java SDK in [1]. That discussion resulted in the addition of the @RequiresTimeSortedInput annotation [2]. Unfortunately, this has probably not been carried over to the Python SDK.

I'll sum up the reasons why it was added:

 a) inputs to a stateful DoFn are naturally unsorted

 b) batch pipelines have two options for feeding data to a stateful DoFn:

  b1) unsorted, feeding data as it arrives

  b2) explicitly sorted by timestamp (or by a correlated field in the data, e.g. a sequential index, if provided)

In case b1) there is no way to advance the watermark before *all* data has been read from the input: advancing it earlier might produce late data with arbitrary lateness (which would be a consistency bug).

In case b2) it would be possible to advance the watermark (and thus fire event-time timers).
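
To make the difference between b1) and b2) concrete, here is a small plain-Python model. It is not Beam API: `count_late` and the rule "the watermark follows the largest timestamp seen so far" are assumptions made only for illustration. Any element older than the current watermark counts as late.

```python
def count_late(timestamps):
    """Model a runner that advances the watermark while elements arrive.

    Returns (number of late elements, worst lateness). Hypothetical helper,
    not part of Beam.
    """
    watermark = float("-inf")
    late = 0
    max_lateness = 0
    for ts in timestamps:
        if ts < watermark:
            # The watermark already passed this timestamp: late data.
            late += 1
            max_lateness = max(max_lateness, watermark - ts)
        else:
            watermark = ts
    return late, max_lateness


arrival_order = [3, 1, 4, 5, 2]  # timestamps from the example in this thread

unsorted_result = count_late(arrival_order)        # b1) with an eager watermark
sorted_result = count_late(sorted(arrival_order))  # b2)
print(unsorted_result, sorted_result)
```

With unsorted input the lateness is bounded only by the time span of the data set, which is why the watermark has to stay put in case b1).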

Option b2) was judged too restrictive to be added to the model as a requirement for batch pipelines, which is totally reasonable. We are therefore left with b1), which means that a use case like yours requires first reading the complete batch data into state, then sorting it manually, and only then processing the ordered data. This requires a lot of coding (which could be wrapped into a reusable PTransform, for sure), but it is also inefficient, because a pure batch runner is likely to perform merge-sort grouping anyway. It only needs to know that it should keep this guarantee for the DoFn (and add the timestamp to the sorting key). This is the reason why the annotation was introduced: to keep the Beam model as flexible as possible while enabling runners to make use of the sorting they already do anyway, in case the application logic needs it.
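
The point about merge-sort grouping can be illustrated with a plain-Python sketch. It models what a batch runner might do internally and is not Beam code; the two sorted runs, the helper `merge_grouped`, and the record layout `(key, timestamp, value)` are all assumptions.

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Two sorted runs, as a batch runner might produce them during a shuffle.
# Each record is (key, timestamp, value); sorting on the tuple orders by key
# first and by timestamp second.
run_a = sorted([("k1", 3, 3.1), ("k2", 1, 10.0), ("k1", 5, 5.4)])
run_b = sorted([("k1", 2, 2.3), ("k2", 4, 20.0), ("k1", 4, 4.2)])


def merge_grouped(*runs):
    """Merge sorted runs and yield (key, [(timestamp, value), ...]) per key."""
    merged = heapq.merge(*runs)  # streaming merge, keeps (key, timestamp) order
    for key, records in groupby(merged, key=itemgetter(0)):
        yield key, [(ts, value) for _, ts, value in records]


grouped = dict(merge_grouped(run_a, run_b))
print(grouped)
```

Because the timestamp is part of the sort key, every key's values come out of the merge already in time order, with no extra buffering on the consumer side.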

I think that until equivalent information can be provided to a DoFn in the Python SDK, the only option is buffering and manually sorting the complete data set (split per key).
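
As a rough illustration of what "buffering and manual sorting" means, here is a plain-Python model of the logic. It is not a Beam DoFn: `BufferingCumulativeSum`, `process`, and `flush` are hypothetical names, and the dict stands in for per-key state. In a real stateful DoFn the buffer would live in state, and the flush would have to run only after the whole input has been seen.

```python
from collections import defaultdict


class BufferingCumulativeSum:
    """Plain-Python stand-in for a stateful DoFn that buffers, then sorts."""

    def __init__(self):
        # Stand-in for per-key state: key -> list of (timestamp, value).
        self._buffer = defaultdict(list)

    def process(self, key, timestamp, value):
        # Input arrives unsorted, so nothing can be emitted yet.
        self._buffer[key].append((timestamp, value))

    def flush(self):
        # Called once all input has been read: sort each key by timestamp
        # and emit the running sum in event-time order.
        for key, items in self._buffer.items():
            running = 0.0
            for timestamp, value in sorted(items):
                running += value
                # Rounded only to keep the printed output readable.
                yield key, timestamp, round(running, 1)


fn = BufferingCumulativeSum()
for value, ts in [(3.1, 3), (1.5, 1), (4.2, 4), (5.4, 5), (2.3, 2)]:
    fn.process("sum_key", ts, value)

results = list(fn.flush())
print(results)
```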

 Jan

[1] https://lists.apache.org/thread/7ryqg3bm1c3bs7g1nk4krnrjxlkd7srn
[2] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

On 4/24/23 18:35, Guagliardo, Patrizio via user wrote:
OK great, so what I did in the end was:

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows


def cumulative_sums(key, timestamped_values):
    # Each element is a (value, timestamp) pair; sort by timestamp first.
    running = 0
    for x in sorted(timestamped_values, key=lambda x: x[1]):
        running += x[0]
        yield key, running


with beam.Pipeline() as p:
    sums = (p
        | 'Create' >> beam.Create([
            (3.1, 3),
            (4.2, 4),
            (5.4, 5),
            (2.3, 2),
            (1.5, 6)
        ])
        | 'AddTimestamps' >> beam.Map(
            lambda x: beam.transforms.window.TimestampedValue(x, x[1]))
        | beam.Map(lambda x: ('key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(11))
        | beam.GroupByKey()
        | beam.FlatMapTuple(cumulative_sums)
        | 'Print' >> beam.Map(print))

However, I am wondering whether there is a way to carry state from one window
to the next. I ask because I would also like to do other transformations where
a value is taken from one time step to the next, for example to calculate a
time difference or to fill in a missing value (using the value from the
previous time step). Can that be done? I have read something about looping
timers, but could not find the details for Python. Is there a way to do this?
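
One way to get this kind of carry-over in a batch pipeline, without state that spans windows, might be to bring all values for a key together (for example in a single global window), sort them, and carry the previous element along in an ordinary loop. The sketch below is plain Python; `time_diffs` and `forward_fill` are hypothetical helpers written in the same `(key, values)` shape as `cumulative_sums` above, so they could plausibly be passed to `beam.FlatMapTuple` after a `GroupByKey`. It assumes that all values for one key fit in memory and that each value is a `(value, timestamp)` pair.

```python
def time_diffs(key, timestamped_values):
    """Yield (key, timestamp, time elapsed since the previous element)."""
    previous_ts = None
    for _, ts in sorted(timestamped_values, key=lambda x: x[1]):
        # The first element has no predecessor, so its diff is None.
        diff = None if previous_ts is None else ts - previous_ts
        yield key, ts, diff
        previous_ts = ts


def forward_fill(key, timestamped_values):
    """Yield (key, timestamp, value), replacing None with the last seen value."""
    last_value = None
    for value, ts in sorted(timestamped_values, key=lambda x: x[1]):
        if value is None:
            value = last_value  # stays None if nothing was seen before
        else:
            last_value = value
        yield key, ts, value


values = [(3.1, 3), (None, 4), (5.4, 5), (2.3, 2), (1.5, 8)]
diffs = list(time_diffs("key", values))
filled = list(forward_fill("key", values))
print(diffs)
print(filled)
```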

Thanks a lot.

-----Original Message-----
From: Robert Bradshaw <rober...@google.com>
Sent: Monday, April 24, 2023 18:00
To: user@beam.apache.org; Guagliardo, Patrizio <patrizio.guaglia...@oliverwyman.com>
Subject: Re: [Question] - Time series - cumulative sum in right order with python api in a batch process


You are correct in that the data may arrive in an unordered way.
However, once a window finishes, you are guaranteed to have seen all the data 
up to that point (modulo late data) and can then confidently compute your 
ordered cumulative sum.

You could do something like this:

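def cumulative_sums(key, timestamped_values):
   running = 0
   # Sort the (timestamp, value) pairs by timestamp before summing.
   for _, x in sorted(timestamped_values):
     running += x
     yield key, running

sums = (timestamped_data
   # Pair each element with its timestamp; a single constant key is assumed.
   | beam.Map(lambda x, t=beam.DoFn.TimestampParam: ('key', (t, x)))
   | beam.WindowInto(...)
   | beam.GroupByKey()
   | beam.FlatMapTuple(cumulative_sums))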



On Mon, Apr 24, 2023 at 8:23 AM Guagliardo, Patrizio via user 
<user@beam.apache.org> wrote:
Hi all,

I want to compute a cumulative sum over a time series in bounded (batch)
processing in Apache Beam with the Python API. You can write a cumulative
sum with a stateful DoFn, but the problem remains that this does not work
when the data is unordered, which is the case in a PCollection. Is there a
way to compute the cumulative sum over time in a batch process? This is
what I did (without ordering):

import apache_beam as beam
from apache_beam import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec)
from apache_beam.transforms.window import FixedWindows, GlobalWindows


class TimestampedSumAccumulator(beam.DoFn):
    SUM_STATE = 'sum'

    def process(
        self, element,
        timestamp=beam.DoFn.TimestampParam,
        sum_state=beam.DoFn.StateParam(
            ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
    ):
        sum_value = sum_state.read() or 0.0
        # print(element)
        sum_value += element[1]
        sum_state.write(sum_value)
        yield beam.transforms.window.TimestampedValue(sum_value, timestamp)


with beam.Pipeline() as p:
    sums = (p
        | 'Create' >> beam.Create([
            (3.1, 3),
            (1.5, 1),
            (4.2, 4),
            (5.4, 5),
            (2.3, 2)
        ])
        | 'AddTimestamps' >> beam.Map(
            lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
        | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(10))
        | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
        | 'Print' >> beam.Map(print))



How can the cumulative sum be computed in the “right” order?



Thank you very much in advance.




________________________________
This e-mail and any attachments may be confidential or legally privileged. If 
you received this message in error or are not the intended recipient, you 
should destroy the e-mail message and any attachments or copies, and you are 
prohibited from retaining, distributing, disclosing or using any information 
contained herein. Please inform us of the erroneous delivery by return e-mail. 
Thank you for your cooperation. For more information on how we use your 
personal data please see our Privacy Notice.
