Reuven's answer will result in a group by key (but not window) where no data is dropped and you get deltas for each key. Downstream consumers can recombine the deltas to get per-key aggregation. So instead of putting the time interval into the window, you put it into the key, and then you get the same grouped aggregation.
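To make the "interval in the key" idea concrete, here is a minimal plain-Java sketch. It does not use any Beam API; the class name, the `Event` type, the `key@bucketStart` composite-key format, and the 30-second interval are all assumptions made for illustration. It groups by (key, arrival-time interval) to produce per-key deltas, then shows how a downstream consumer could sum those deltas back into per-key totals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Beam APIs are used or implied here.
class ArrivalTimeBuckets {

    // Hypothetical element: a key, a numeric value, and its arrival time in millis.
    static final class Event {
        final String key;
        final long value;
        final long arrivalMillis;

        Event(String key, long value, long arrivalMillis) {
            this.key = key;
            this.value = value;
            this.arrivalMillis = arrivalMillis;
        }
    }

    // Start of the fixed-size arrival-time interval that contains arrivalMillis.
    static long bucketStart(long arrivalMillis, long intervalMillis) {
        return Math.floorDiv(arrivalMillis, intervalMillis) * intervalMillis;
    }

    // Group by (key, arrival interval). Each entry is one delta for one key.
    // No element is dropped: a late element simply lands in the interval
    // that covers its arrival time.
    static Map<String, Long> deltasPerInterval(List<Event> events, long intervalMillis) {
        Map<String, Long> deltas = new TreeMap<>();
        for (Event e : events) {
            String compositeKey = e.key + "@" + bucketStart(e.arrivalMillis, intervalMillis);
            deltas.merge(compositeKey, e.value, Long::sum);
        }
        return deltas;
    }

    // Downstream recombination: sum the deltas back into one total per original key.
    static Map<String, Long> recombine(Map<String, Long> deltas) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> d : deltas.entrySet()) {
            String compositeKey = d.getKey();
            String key = compositeKey.substring(0, compositeKey.lastIndexOf('@'));
            totals.merge(key, d.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", 1L, 5_000L),
            new Event("a", 2L, 29_000L),
            new Event("a", 4L, 31_000L),
            new Event("b", 10L, 61_000L));

        Map<String, Long> deltas = deltasPerInterval(events, 30_000L);
        System.out.println(deltas);            // {a@0=3, a@30000=4, b@60000=10}
        System.out.println(recombine(deltas)); // {a=7, b=10}
    }
}
```

In a real pipeline the grouping would be done by the runner rather than by an in-memory map; the sketch only shows the shape of the keys and the recombination step.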
There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to arrival time. I illustrated this in some older talks [2].

Kenn

[1] https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2] https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134

On Fri, Apr 23, 2021 at 8:32 AM Reuven Lax <re...@google.com> wrote:

> You can definitely group by processing time. The way to do this in Beam is
> as follows:
>
> Window.<T>into(new GlobalWindows())
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(30))))
>     .discardingFiredPanes();
>
> The syntax is a bit unfortunately wordy, but the idea is that you are
> creating a single event-time window that encompasses all time, and
> "triggering" an aggregation every 30 seconds based on processing time.
>
> On Fri, Apr 23, 2021 at 8:14 AM Tao Li <t...@zillow.com> wrote:
>
>> Thanks @Kenneth Knowles <k...@apache.org>. I understand we need to
>> specify a window for groupby so that the app knows when processing is
>> “done” to output result.
>>
>> Is it possible to specify an event arrival/processing time based window
>> for groupby? The purpose is to avoid dropping of late events. With an event
>> processing time based window, the app will periodically output the result
>> based on all events that arrived in that window, and a late arriving event
>> will fall into whatever window covers its arrival time and thus that late
>> data will not get lost.
>>
>> Does Beam support this kind of mechanism? Thanks.
>>
>> *From: *Kenneth Knowles <k...@apache.org>
>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Date: *Thursday, April 22, 2021 at 1:49 PM
>> *To: *user <user@beam.apache.org>
>> *Cc: *Kelly Smith <kell...@zillowgroup.com>, Lian Jiang <li...@zillowgroup.com>
>> *Subject: *Re: Question on late data handling in Beam streaming mode
>>
>> Hello!
>>
>> In a streaming app, you have two choices: wait forever and never have any
>> output OR use some method to decide that aggregation is "done".
>>
>> In Beam, the way you decide that aggregation is "done" is the watermark.
>> When the watermark predicts no more data for an aggregation, then the
>> aggregation is done. For example GROUP BY <minute> is "done" when no more
>> data will arrive for that minute. At this point, your result is produced.
>> More data may arrive, and it is ignored. The watermark is determined by the
>> IO connector to be the best heuristic available. You can configure "allowed
>> lateness" for an aggregation to allow out of order data.
>>
>> Kenn
>>
>> On Thu, Apr 22, 2021 at 1:26 PM Tao Li <t...@zillow.com> wrote:
>>
>> Hi Beam community,
>>
>> I am wondering if there is a risk of losing late data from a Beam stream
>> app due to watermarking?
>>
>> I just went through this design doc and noticed the “droppable”
>> definition there:
>> https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
>>
>> Can you please confirm if it’s possible for us to lose some data in a
>> stream app in practice? If that’s possible, what would be the best practice
>> to avoid data loss? Thanks!