From: Kenneth Knowles <k...@apache.org>
Date: Wed, May 8, 2019 at 6:50 PM
To: dev
> This got pretty long, but I don't yet want to end it, because there's not
> quite yet a solution that will allow a user to treat timestamps from most
> systems as Beam timestamps.

+1, it'd be really nice to find a solution to this.

> I'm cutting pieces just to make inline replies easier to read.
>
> On Tue, Apr 23, 2019 at 9:03 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Tue, Apr 23, 2019 at 4:20 PM Kenneth Knowles <k...@apache.org> wrote:
>> > - WindowFn must receive exactly the data that came from the user's data
>> > source. So that cannot be rounded.
>> > - The user's WindowFn assigns to a window, so it can contain arbitrary
>> > precision, as it should be grouped as bytes.
>> > - End of window, timers, watermark holds, etc., are all treated only as
>> > bounds, so they can all be rounded based on their use as an upper or
>> > lower bound.
>> >
>> > We already do a lot of this - Pubsub publish timestamps are microsecond
>> > precision (you could say our current connector constitutes data loss), as
>> > are Windmill timestamps (since these are only combines of Beam
>> > timestamps, there is no data loss here). There are undoubtedly some
>> > corner cases I've missed, and naively this might look like duplicating
>> > timestamps, so that could be an unacceptable performance concern.
>>
>> If I understand correctly, in this scheme WindowInto assignment is
>> parameterized by a function that specifies how to parse/extract the
>> timestamp from the data element (maybe just a field specifier for
>> schema'd data) rather than storing the (exact) timestamp in a standard
>> place in the WindowedValue, and the window merging always goes back to
>> the SDK rather than the possibility of it being handled runner-side.
>
> This sounds promising. You could also store the extracted approximate
> timestamp somewhere, of course.
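A minimal sketch of the round-bounds-per-use idea discussed above, assuming a hypothetical `ExactTimestamp` type and a millisecond-granularity runner (none of these names are Beam APIs): the exact value rides with the element, and only the derived bounds are rounded — down for lower bounds such as watermark holds, up for upper bounds such as firing times.

```python
from dataclasses import dataclass

NANOS_PER_MILLI = 1_000_000

@dataclass(frozen=True)
class ExactTimestamp:
    """Hypothetical: the element's timestamp at full precision (nanos since epoch)."""
    nanos: int

def hold_bound_millis(ts: ExactTimestamp) -> int:
    """Lower bound (e.g. a watermark hold): round DOWN to runner granularity."""
    return ts.nanos // NANOS_PER_MILLI

def fire_bound_millis(ts: ExactTimestamp) -> int:
    """Upper bound (e.g. end-of-window, timer firing): round UP."""
    return -(-ts.nanos // NANOS_PER_MILLI)

ts = ExactTimestamp(nanos=1_555_520_400_123_456_789)
assert hold_bound_millis(ts) == 1_555_520_400_123  # floor
assert fire_bound_millis(ts) == 1_555_520_400_124  # ceiling
```

Because holds round down and firings round up, the coarse runner can only ever be conservative relative to the exact data, which is what makes the rounding safe.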
>
>> Even if the runner doesn't care about interpreting the window, I think
>> we'll want to have compatible window representations (and timestamp
>> representations, and windowing fns) across SDKs (especially for
>> cross-language), which favors choosing a consistent resolution.
>>
>> The end-of-window, for firing, can be approximate, but it seems it
>> should be exact for timestamp assignment of the result (and similarly
>> with the other timestamp combiners).
>
> I was thinking that the window itself should be stored as exact data, while
> just the firing itself is approximated, since it already is, because of
> watermarks and timers.

I think this works where we can compare encoded windows, but some portable
interpretation of windows is required for runner-side implementation of
merging windows (for example). There may also be issues if windows (or
timestamps) are assigned to a high precision in one SDK, then inspected/acted
on in another SDK, and then passed back to the original SDK, where the
truncation would be visible.

> You raise a good point that min/max timestamp combiners require actually
> understanding the higher-precision timestamp. I can think of a couple of
> things to do. One is the old "standardize all 3 or 4 precisions we need",
> and the other is that combiners other than EOW exist primarily to hold the
> watermark, and that hold does not require the original precision. Still,
> neither of these is that satisfying.

In the current model, the output timestamp is user-visible.

>> > A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown
>> > as proto (int64 seconds since epoch + int32 nanos within second). It has
>> > legacy classes that use milliseconds, and Joda itself now encourages
>> > moving back to Java's new Instant type. Nanoseconds should complicate the
>> > arithmetic only for the one person authoring the date library, which they
>> > have already done.
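For a concrete sense of the arithmetic involved: fixed-window assignment is a single subtraction and modulo on an integer timestamp, while a (seconds, nanos) pair has to be flattened to a single integer (or handled with explicit carry logic) first. A rough sketch, not Beam's actual implementation:

```python
NANOS_PER_SECOND = 1_000_000_000

def assign_fixed_window(ts_millis: int, size_millis: int, offset_millis: int = 0):
    """Integer-timestamp version: one subtraction and one modulo."""
    start = ts_millis - (ts_millis - offset_millis) % size_millis
    return (start, start + size_millis)

def assign_fixed_window_sn(seconds: int, nanos: int, size_nanos: int):
    """(seconds, nanos) version: flatten to one integer, compute, split back."""
    total = seconds * NANOS_PER_SECOND + nanos
    start = total - total % size_nanos
    return (divmod(start, NANOS_PER_SECOND),
            divmod(start + size_nanos, NANOS_PER_SECOND))

# 125,000 ms with 60,000 ms windows falls in [120000, 180000).
assert assign_fixed_window(125_000, 60_000) == (120_000, 180_000)
# The same instant expressed as (seconds, nanos), with 60 s windows.
assert assign_fixed_window_sn(125, 0, 60 * NANOS_PER_SECOND) == ((120, 0), (180, 0))
```

The flattening is harmless in Python, where ints are arbitrary precision; in a language with fixed-width 64-bit integers, flattening far-future timestamps to nanoseconds can overflow, which is part of why the representation choice matters.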
>>
>> The encoding and decoding need to be done in a language-consistent way
>> as well.
>
> I honestly am not sure what you mean by "language-consistent" here.

If we want to make reading and writing of timestamps and windows
cross-language, we can't rely on language-specific libraries to do the
encoding.

>> Also, most date libraries don't have division, etc. operators, so
>> we have to do that as well. Not that it should be *that* hard.
>
> If the libraries dedicated to time handling haven't found it needful, is
> there a specific reason you raise this? We do some simple math to find the
> window things fall into; is that it?

Yes. E.g.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
would be a lot messier if there were no mapping from date libraries to raw
ints that we can do arithmetic on. Writing this with the (seconds, nanos)
representation is painful. But I suppose we'd only have to do it once per SDK.

>> >> It would also be really nice to clean up the infinite-future being the
>> >> somewhat arbitrary max micros rounded to millis, and
>> >> end-of-global-window being infinite-future minus 1 hour (IIRC), etc.,
>> >> as well as the ugly logic in Python to cope with millis-micros
>> >> conversion.
>> >
>> > I actually don't have a problem with this. If you are trying to keep the
>> > representation compact, not add bytes on top of instants, then you just
>> > have to choose magic numbers, right?
>>
>> It's not about compactness, it's the (historically-derived?)
>> arbitrariness of the numbers.
>
> What I mean is that the only reason to fit them into an integer at all is
> compactness. Otherwise, you could use a proper disjoint union representing
> your intent directly, and all the fiddling goes away, like
> `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`.
> It costs a couple of bits.
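As an illustration of that disjoint union (hypothetical, not a proposed Beam API): an explicit rank field gives the ordering NegInf < ActualTime < EndOfGlobalWindow < PosInf directly, with no magic sentinel values.

```python
from dataclasses import dataclass
from functools import total_ordering

# Rank encodes the intended ordering:
# NegInf < ActualTime(...) < EndOfGlobalWindow < PosInf.
@total_ordering
@dataclass(frozen=True)
class Timestamp:
    rank: int
    nanos: int = 0  # only meaningful for ActualTime

    def __lt__(self, other):
        return (self.rank, self.nanos) < (other.rank, other.nanos)

NEG_INF = Timestamp(rank=0)

def actual(nanos: int) -> Timestamp:
    """An actual instant, assumed here to carry a nanosecond payload."""
    return Timestamp(rank=1, nanos=nanos)

END_OF_GLOBAL_WINDOW = Timestamp(rank=2)
POS_INF = Timestamp(rank=3)

# The special values sort correctly even against extreme real instants.
assert NEG_INF < actual(-10**30) < END_OF_GLOBAL_WINDOW < POS_INF
```

A real encoding would also need a coder for this type, which is exactly the extra "couple of bits" mentioned above.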
The other cost is not being able to use standard libraries to represent all
of your timestamps.

>> For example, the bounds are chosen to
>> fit within 64-bit micros despite milliseconds being the "chosen"
>> granularity, and care was taken that
>>
>>     WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>>
>> works, but
>>
>>     WindowInto(Global) | GBK | WindowInto(Day) | GBK
>>
>> may produce elements with timestamps greater than MaxTimestamp.
>
>> > Kenn
>> >
>> > [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
>> >
>> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <rob...@frantil.com> wrote:
>> >> >>
>> >> >> +1 for plan B. Nanosecond precision on windowing seems... a little
>> >> >> much for a system that's aggregating data over time. Even for
>> >> >> processing, say, particle supercollider data, they'd get away with
>> >> >> artificially increasing the granularity in batch settings.
>> >> >>
>> >> >> Now if they were streaming... they'd probably want femtoseconds
>> >> >> anyway. The point is, we should see if users demand it before adding
>> >> >> in the necessary work.
>> >> >>
>> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath
>> >> >> <chamik...@google.com> wrote:
>> >> >>>
>> >> >>> +1 for plan B as well. I think it's important to make timestamp
>> >> >>> precision consistent now without introducing surprising behaviors
>> >> >>> for existing users. But we should move towards a higher-granularity
>> >> >>> timestamp precision in the long run to support use cases that Beam
>> >> >>> users would otherwise miss out on (on a runner that supports such
>> >> >>> precision).
>> >> >>>
>> >> >>> - Cham
>> >> >>>
>> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>
>> >> >>>> I also like Plan B because, in the cross-language case, the
>> >> >>>> pipeline would simply not work unless every party (Runners & SDKs)
>> >> >>>> were aware of the new beam:coder:windowed_value:v2 coder.
>> >> >>>> Plan A has the
>> >> >>>> property where, if the SDK/Runner wasn't updated, it may start
>> >> >>>> truncating the timestamps unexpectedly.
>> >> >>>>
>> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>>
>> >> >>>>> Kenn, this discussion is about the precision of the timestamp in
>> >> >>>>> the user data. As you had mentioned, Runners need not have the
>> >> >>>>> same granularity as the user data as long as they correctly round
>> >> >>>>> the timestamp to guarantee that triggers are executed correctly,
>> >> >>>>> but the user data should have the same precision across SDKs;
>> >> >>>>> otherwise user data timestamps will be truncated in cross-language
>> >> >>>>> scenarios.
>> >> >>>>>
>> >> >>>>> Based on the systems that were listed, either microsecond or
>> >> >>>>> nanosecond would make sense. The issue with changing the precision
>> >> >>>>> is that all Beam runners except for possibly Beam Python on
>> >> >>>>> Dataflow are using millisecond precision, since they are all using
>> >> >>>>> the same Java Runner windowing/trigger logic.
>> >> >>>>>
>> >> >>>>> Plan A: Swap precision to nanosecond
>> >> >>>>> 1) Change the Python SDK to only expose millisecond precision
>> >> >>>>>    timestamps (do now)
>> >> >>>>> 2) Change the user data encoding to support nanosecond precision
>> >> >>>>>    (do now)
>> >> >>>>> 3) Swap runner libraries to be nanosecond-precision aware,
>> >> >>>>>    updating all window/triggering logic (do later)
>> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
>> >> >>>>>
>> >> >>>>> Plan B:
>> >> >>>>> 1) Change the Python SDK to only expose millisecond precision
>> >> >>>>>    timestamps and keep the data encoding as is (do now)
>> >> >>>>> (We could add greater precision later to Plan B by creating a new
>> >> >>>>> version, beam:coder:windowed_value:v2, which would be nanosecond
>> >> >>>>> and would require runners to correctly perform internal
>> >> >>>>> conversions for windowing/triggering.)
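The internal conversion mentioned for a hypothetical nanosecond v2 coder could follow the round-per-use rule from earlier in the thread: element timestamps truncate down, while end-of-window (an upper bound) rounds up, so a millisecond-granularity runner never fires a trigger before the true nanosecond end of the window. A sketch under those assumptions:

```python
NANOS_PER_MILLI = 1_000_000

def element_ts_to_millis(nanos: int) -> int:
    # Element timestamps truncate (round down): the coarse value never
    # overstates when the element actually happened.
    return nanos // NANOS_PER_MILLI

def window_end_to_millis(nanos: int) -> int:
    # End-of-window is an upper bound, so round up: a trigger keyed to the
    # millisecond value cannot fire before the true nanosecond window end.
    return -(-nanos // NANOS_PER_MILLI)

end_ns = 1_000_000_001  # a window ending just past the 1000 ms boundary
assert element_ts_to_millis(end_ns) == 1000
assert window_end_to_millis(end_ns) == 1001
```

The asymmetry is the point: rounding both directions the same way could let a trigger fire while nanosecond-timestamped data for the window is still on time.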
>> >> >>>>>
>> >> >>>>> I think we should go with Plan B, and when users request greater
>> >> >>>>> precision we can make that an explicit effort. What do people
>> >> >>>>> think?
>> >> >>>>>
>> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels
>> >> >>>>> <m...@apache.org> wrote:
>> >> >>>>>>
>> >> >>>>>> Hi,
>> >> >>>>>>
>> >> >>>>>> Thanks for taking care of this issue in the Python SDK, Thomas!
>> >> >>>>>>
>> >> >>>>>> It would be nice to have a uniform precision for timestamps but,
>> >> >>>>>> as Kenn pointed out, timestamps are extracted from systems that
>> >> >>>>>> have different precision.
>> >> >>>>>>
>> >> >>>>>> To add to the list: Flink - milliseconds
>> >> >>>>>>
>> >> >>>>>> After all, it doesn't matter as long as there is sufficient
>> >> >>>>>> precision and conversions are done correctly.
>> >> >>>>>>
>> >> >>>>>> I think we could improve the situation by at least adding a
>> >> >>>>>> "milliseconds" constructor to the Python SDK's Timestamp.
>> >> >>>>>>
>> >> >>>>>> Cheers,
>> >> >>>>>> Max
>> >> >>>>>>
>> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
>> >> >>>>>> > I am not so sure this is a good idea. Here are some systems and
>> >> >>>>>> > their precision:
>> >> >>>>>> >
>> >> >>>>>> > Arrow - microseconds
>> >> >>>>>> > BigQuery - microseconds
>> >> >>>>>> > New Java Instant - nanoseconds
>> >> >>>>>> > Firestore - microseconds
>> >> >>>>>> > Protobuf - nanoseconds
>> >> >>>>>> > Dataflow backend - microseconds
>> >> >>>>>> > PostgreSQL - microseconds
>> >> >>>>>> > Pubsub publish time - nanoseconds
>> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime about 3
>> >> >>>>>> > millis)
>> >> >>>>>> > Cassandra - milliseconds
>> >> >>>>>> >
>> >> >>>>>> > IMO it is important to be able to treat any of these as a Beam
>> >> >>>>>> > timestamp, even though they aren't all streaming.
>> >> >>>>>> > Who knows when
>> >> >>>>>> > we might be ingesting a streamed changelog, or using them for
>> >> >>>>>> > reprocessing an archived stream. I think for this purpose we
>> >> >>>>>> > should either standardize on nanoseconds or make the runner's
>> >> >>>>>> > resolution independent of the data representation.
>> >> >>>>>> >
>> >> >>>>>> > I've had some offline conversations about this. I think we can
>> >> >>>>>> > have higher-than-runner precision in the user data, and allow
>> >> >>>>>> > WindowFns and DoFns to operate on this higher-than-runner
>> >> >>>>>> > precision data, and still have consistent watermark treatment.
>> >> >>>>>> > Watermarks are just bounds, after all.
>> >> >>>>>> >
>> >> >>>>>> > Kenn
>> >> >>>>>> >
>> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <t...@apache.org>
>> >> >>>>>> > wrote:
>> >> >>>>>> >
>> >> >>>>>> >     The Python SDK currently uses timestamps in microsecond
>> >> >>>>>> >     resolution, while the Java SDK, as most would probably
>> >> >>>>>> >     expect, uses milliseconds.
>> >> >>>>>> >
>> >> >>>>>> >     This causes a few difficulties with portability (Python
>> >> >>>>>> >     coders need to convert to millis for WindowedValue and
>> >> >>>>>> >     Timers), which is related to a bug I'm looking into:
>> >> >>>>>> >
>> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-7035
>> >> >>>>>> >
>> >> >>>>>> >     As Luke pointed out, the issue was previously discussed:
>> >> >>>>>> >
>> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-1524
>> >> >>>>>> >
>> >> >>>>>> >     I'm not privy to the reasons why we decided to go with
>> >> >>>>>> >     micros in the first place, but would it be too big of a
>> >> >>>>>> >     change, or impractical for other reasons, to switch the
>> >> >>>>>> >     Python SDK to millis before it gets more users?
>> >> >>>>>> >
>> >> >>>>>> >     Thanks,
>> >> >>>>>> >     Thomas