From: Kenneth Knowles <k...@apache.org>
Date: Wed, May 8, 2019 at 6:50 PM
To: dev

> This got pretty long, but I don't yet want to end it, because there's not 
> quite yet a solution that will allow a user to treat timestamps from most 
> systems as Beam timestamps.

+1, it'd be really nice to find a solution to this.

> I'm cutting pieces just to make inline replies easier to read.
>
> On Tue, Apr 23, 2019 at 9:03 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Tue, Apr 23, 2019 at 4:20 PM Kenneth Knowles <k...@apache.org> wrote:
>> >  -  WindowFn must receive exactly the data that came from the user's data 
>> > source. So that cannot be rounded.
>> >  - The user's WindowFn assigns to a window, so it can contain arbitrary 
>> > precision as it should be grouped as bytes.
>> >  - End of window, timers, watermark holds, etc, are all treated only as 
>> > bounds, so can all be rounded based on their use as an upper or lower 
>> > bound.
>> >
>> > We already do a lot of this - Pubsub publish timestamps are microsecond 
>> > precision (you could say our current connector constitutes data loss) as 
>> > are Windmill timestamps (since these are only combines of Beam timestamps 
>> > here there is no data loss). There are undoubtedly some corner cases I've 
>> > missed, and naively this might look like duplicating timestamps so that 
>> > could be an unacceptable performance concern.
>>
>> If I understand correctly, in this scheme WindowInto assignment is
>> parameterized by a function that specifies how to parse/extract the
>> timestamp from the data element (maybe just a field specifier for
>> schema'd data) rather than store the (exact) timestamp in a standard
>> place in the WindowedValue, and the window merging always goes back to
>> the SDK rather than the possibility of it being handled runner-side.
>
> This sounds promising. You could also store the extracted approximate 
> timestamp somewhere, of course.
>
>> Even if the runner doesn't care about interpreting the window, I think
>> we'll want to have compatible window representations (and timestamp
>> representations, and windowing fns) across SDKs (especially for
>> cross-language) which favors choosing a consistent resolution.
>>
>> The end-of-window, for firing, can be approximate, but it seems it
>> should be exact for timestamp assignment of the result (and similarly
>> with the other timestamp combiners).
>
> I was thinking that the window itself should be stored as exact data, while 
> just the firing itself is approximated, since it already is, because of 
> watermarks and timers.

I think this works where we can compare encoded windows, but some
portable interpretation of windows is required for runner-side
implementation of merging windows (for example).
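To make the merging point concrete: runner-side merging has to interpret windows as (start, end) pairs, not opaque encoded bytes. A toy sketch of that interval merging (illustrative Python, not Beam code):

```python
# Merging overlapping interval windows, session-style. A runner can only do
# this if it understands the window encoding as (start, end) timestamps;
# byte-equality comparison of encoded windows is not enough.
def merge_intervals(windows):
    merged = []
    for start, end in sorted(windows):
        if merged and start <= merged[-1][1]:
            # Overlaps the previous interval: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

assert merge_intervals([(0, 10), (5, 20), (30, 40)]) == [(0, 20), (30, 40)]
```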

There may also be issues if windows (or timestamps) are assigned to a
high precision in one SDK, then inspected/acted on in another SDK, and
then passed back to the original SDK where the truncation would be
visible.
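The cross-SDK truncation hazard can be shown in a few lines. This is a hypothetical sketch (the names are illustrative, not Beam APIs): a nanosecond timestamp passes through an SDK that only models milliseconds and comes back changed.

```python
NANOS_PER_MILLI = 1_000_000

def truncate_to_millis(ts_nanos: int) -> int:
    """Simulate an SDK that only models millisecond precision."""
    return (ts_nanos // NANOS_PER_MILLI) * NANOS_PER_MILLI

original = 1_557_360_000_123_456_789  # nanos since epoch
round_tripped = truncate_to_millis(original)

assert round_tripped != original             # precision silently lost
assert original - round_tripped == 456_789   # the dropped sub-milli part
```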

> You raise a good point that min/max timestamp combiners require actually 
> understanding the higher-precision timestamp. I can think of a couple things 
> to do. One is the old "standardize all 3 or 4 precisions we need" and the 
> other is that combiners other than EOW exist primarily to hold the watermark, 
> and that hold does not require the original precision. Still, neither of 
> these is that satisfying.

In the current model, the output timestamp is user-visible.

>> > A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown 
>> > as proto (int64 seconds since epoch + int32 nanos within second). It has 
>> > legacy classes that use milliseconds, and Joda itself now encourages 
>> > moving back to Java's new Instant type. Nanoseconds should complicate the 
>> > arithmetic only for the one person authoring the date library, which they 
>> > have already done.
>>
>> The encoding and decoding need to be done in a language-consistent way
>> as well.
>
> I honestly am not sure what you mean by "language-consistent" here.

If we want to make reading and writing of timestamps, windows
cross-language, we can't rely on language-specific libraries to do the
encoding.
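What "language-consistent" means in practice is a fixed byte layout every SDK agrees on. A minimal sketch, assuming a signed 64-bit big-endian count of microseconds since the epoch (one plausible choice, not the actual Beam wire format):

```python
# Any SDK that reads and writes this exact byte layout will agree on the
# timestamp, regardless of which date library it uses internally.
import struct

def encode_micros(ts_micros: int) -> bytes:
    return struct.pack(">q", ts_micros)   # 8 bytes, big-endian signed

def decode_micros(data: bytes) -> int:
    return struct.unpack(">q", data)[0]

ts = 1_557_360_000_000_000
assert decode_micros(encode_micros(ts)) == ts
assert len(encode_micros(ts)) == 8
```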

>> Also, most date libraries don't offer division, etc. operators, so
>> we have to do that as well. Not that it should be *that* hard.
>
> If the libraries dedicated to time handling haven't found it needful, is 
> there a specific reason you raise this? We do some simple math to find the 
> window things fall into; is that it?

Yes. E.g.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77

would be a lot messier if there were no mapping from date libraries to
raw ints that we can do arithmetic on. Writing this with the (seconds,
nanos) representation is painful. But I suppose we'd only have to do
it once per SDK.
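To illustrate, here is the fixed-window start computation (the same shape as the FixedWindows code linked above) done both ways: once on a raw integer, and once on a (seconds, nanos) pair. Names and constants are illustrative, not Beam code.

```python
NANOS_PER_SECOND = 1_000_000_000

def window_start_int(ts_nanos: int, size_nanos: int, offset_nanos: int = 0) -> int:
    # One line with a plain integer representation.
    return ts_nanos - ((ts_nanos - offset_nanos) % size_nanos)

def window_start_pair(seconds: int, nanos: int, size_nanos: int):
    # The same computation on a (seconds, nanos) pair: flatten to a single
    # integer, compute, then split back out.
    total = seconds * NANOS_PER_SECOND + nanos
    start = total - (total % size_nanos)
    return divmod(start, NANOS_PER_SECOND)

size = 60 * NANOS_PER_SECOND  # one-minute windows
assert window_start_int(125 * NANOS_PER_SECOND + 5, size) == 120 * NANOS_PER_SECOND
assert window_start_pair(125, 5, size) == (120, 0)
```

Languages without a 96-bit-friendly integer type would have to carry the seconds/nanos split through every step, which is the messiness being described.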

>> >> It would also be really nice to clean up the infinite-future being the
>> >> somewhat arbitrary max micros rounded to millis, and
>> >> end-of-global-window being infinite-future minus 1 hour (IIRC), etc.
>> >> as well as the ugly logic in Python to cope with millis-micros
>> >> conversion.
>> >
>> > I actually don't have a problem with this. If you are trying to keep the 
>> > representation compact, not add bytes on top of instants, then you just 
>> > have to choose magic numbers, right?
>>
>> It's not about compactness, it's the (historically-derived?)
>> arbitrariness of the numbers.
>
> What I mean is that the only reason to fit them into an integer at all is 
> compactness. Otherwise, you could use a proper disjoint union representing 
> your intent directly, and all fiddling goes away, like `Timestamp ::= PosInf 
> | NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a couple of 
> bits.

The other cost is not being able to use standard libraries to
represent all of your timestamps.
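The disjoint-union representation quoted above can be sketched with dataclasses. The names mirror the grammar in the quote; the ordering chosen here (NegInf < any actual time < EndOfGlobalWindow < PosInf) is an assumption for illustration, and none of this is actual Beam code.

```python
from dataclasses import dataclass
from typing import Union

@dataclass(frozen=True)
class PosInf: ...

@dataclass(frozen=True)
class NegInf: ...

@dataclass(frozen=True)
class EndOfGlobalWindow: ...

@dataclass(frozen=True)
class ActualTime:
    nanos_since_epoch: int

Timestamp = Union[PosInf, NegInf, EndOfGlobalWindow, ActualTime]

def compare_key(ts: Timestamp):
    # Total order: NegInf < ActualTime(..) < EndOfGlobalWindow < PosInf.
    if isinstance(ts, NegInf):
        return (0, 0)
    if isinstance(ts, ActualTime):
        return (1, ts.nanos_since_epoch)
    if isinstance(ts, EndOfGlobalWindow):
        return (2, 0)
    return (3, 0)

assert compare_key(ActualTime(10)) < compare_key(EndOfGlobalWindow())
assert compare_key(NegInf()) < compare_key(ActualTime(-10**30))
```

The fiddly magic numbers disappear, at the cost of a tag that standard date libraries can't represent directly.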

>> For example, the bounds are chosen to
>> fit within 64-bit micros despite milliseconds being the "chosen"
>> granularity, and care was taken that
>>
>>     WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>>
>> works, but
>>
>>     WindowInto(Global) | GBK | WindowInto(Day) | GBK
>>
>> may produce elements with timestamps greater than MaxTimestamp.
>>
>> >
>> > Kenn
>> >
>> > [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
>> >
>> >>
>> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <rob...@frantil.com> wrote:
>> >> >>
>> >> >> +1 for plan B. Nanosecond precision on windowing seems... a little 
>> >> >> much for a system that's aggregating data over time. Even for 
>> >> >> processing, say, particle super collider data, they'd get away with 
>> >> >> artificially increasing the granularity in batch settings.
>> >> >>
>> >> >> Now if they were streaming... they'd probably want femtoseconds anyway.
>> >> >> The point is, we should see if users demand it before adding in the 
>> >> >> necessary work.
>> >> >>
>> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath 
>> >> >> <chamik...@google.com> wrote:
>> >> >>>
>> >> >>> +1 for plan B as well. I think it's important to make timestamp 
>> >> >>> precision consistent now without introducing surprising behaviors for 
>> >> >>> existing users. But we should move towards a higher-granularity 
>> >> >>> timestamp precision in the long run to support use cases that Beam 
>> >> >>> users would otherwise miss out on (given a runner that supports such 
>> >> >>> precision).
>> >> >>>
>> >> >>> - Cham
>> >> >>>
>> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>
>> >> >>>> I also like Plan B because in the cross language case, the pipeline 
>> >> >>>> would not work since every party (Runners & SDKs) would have to be 
>> >> >>>> aware of the new beam:coder:windowed_value:v2 coder. Plan A has the 
>> >> >>>> property where if the SDK/Runner wasn't updated then it may start 
>> >> >>>> truncating the timestamps unexpectedly.
>> >> >>>>
>> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>>
>> >> >>>>> Kenn, this discussion is about the precision of the timestamp in 
>> >> >>>>> the user data. As you had mentioned, Runners need not have the same 
>> >> >>>>> granularity of user data as long as they correctly round the 
>> >> >>>>> timestamp to guarantee that triggers are executed correctly but the 
>> >> >>>>> user data should have the same precision across SDKs otherwise user 
>> >> >>>>> data timestamps will be truncated in cross language scenarios.
>> >> >>>>>
>> >> >>>>> Based on the systems that were listed, either microsecond or 
>> >> >>>>> nanosecond would make sense. The issue with changing the precision 
>> >> >>>>> is that all Beam runners except for possibly Beam Python on 
>> >> >>>>> Dataflow are using millisecond precision since they are all using 
>> >> >>>>> the same Java Runner windowing/trigger logic.
>> >> >>>>>
>> >> >>>>> Plan A: Swap precision to nanosecond
>> >> >>>>> 1) Change the Python SDK to only expose millisecond precision 
>> >> >>>>> timestamps (do now)
>> >> >>>>> 2) Change the user data encoding to support nanosecond precision 
>> >> >>>>> (do now)
>> >> >>>>> 3) Swap runner libraries to be nanosecond precision aware updating 
>> >> >>>>> all window/triggering logic (do later)
>> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
>> >> >>>>>
>> >> >>>>> Plan B:
>> >> >>>>> 1) Change the Python SDK to only expose millisecond precision 
>> >> >>>>> timestamps and keep the data encoding as is (do now)
>> >> >>>>> (We could add greater precision later to plan B by creating a new 
>> >> >>>>> version beam:coder:windowed_value:v2 which would be nanosecond and 
>> >> >>>>> would require runners to correctly perform internal conversions 
>> >> >>>>> for windowing/triggering.)
>> >> >>>>>
>> >> >>>>> I think we should go with Plan B and when users request greater 
>> >> >>>>> precision we can make that an explicit effort. What do people think?
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels 
>> >> >>>>> <m...@apache.org> wrote:
>> >> >>>>>>
>> >> >>>>>> Hi,
>> >> >>>>>>
>> >> >>>>>> Thanks for taking care of this issue in the Python SDK, Thomas!
>> >> >>>>>>
>> >> >>>>>> It would be nice to have a uniform precision for timestamps but, 
>> >> >>>>>> as Kenn
>> >> >>>>>> pointed out, timestamps are extracted from systems that have 
>> >> >>>>>> different
>> >> >>>>>> precision.
>> >> >>>>>>
>> >> >>>>>> To add to the list: Flink - milliseconds
>> >> >>>>>>
>> >> >>>>>> After all, it doesn't matter as long as there is sufficient 
>> >> >>>>>> precision
>> >> >>>>>> and conversions are done correctly.
>> >> >>>>>>
>> >> >>>>>> I think we could improve the situation by at least adding a
>> >> >>>>>> "milliseconds" constructor to the Python SDK's Timestamp.
>> >> >>>>>>
>> >> >>>>>> Cheers,
>> >> >>>>>> Max
>> >> >>>>>>
>> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
>> >> >>>>>> > I am not so sure this is a good idea. Here are some systems and 
>> >> >>>>>> > their
>> >> >>>>>> > precision:
>> >> >>>>>> >
>> >> >>>>>> > Arrow - microseconds
>> >> >>>>>> > BigQuery - microseconds
>> >> >>>>>> > New Java instant - nanoseconds
>> >> >>>>>> > Firestore - microseconds
>> >> >>>>>> > Protobuf - nanoseconds
>> >> >>>>>> > Dataflow backend - microseconds
>> >> >>>>>> > Postgresql - microseconds
>> >> >>>>>> > Pubsub publish time - nanoseconds
>> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime about 3 
>> >> >>>>>> > millis)
>> >> >>>>>> > Cassandra - milliseconds
>> >> >>>>>> >
>> >> >>>>>> > IMO it is important to be able to treat any of these as a Beam
>> >> >>>>>> > timestamp, even though they aren't all streaming. Who knows when 
>> >> >>>>>> > we
>> >> >>>>>> > might be ingesting a streamed changelog, or using them for 
>> >> >>>>>> > reprocessing
>> >> >>>>>> > an archived stream. I think for this purpose we either should
>> >> >>>>>> > standardize on nanoseconds or make the runner's resolution 
>> >> >>>>>> > independent
>> >> >>>>>> > of the data representation.
>> >> >>>>>> >
>> >> >>>>>> > I've had some offline conversations about this. I think we can 
>> >> >>>>>> > have
>> >> >>>>>> > higher-than-runner precision in the user data, and allow 
>> >> >>>>>> > WindowFns and
>> >> >>>>>> > DoFns to operate on this higher-than-runner precision data, and 
>> >> >>>>>> > still
>> >> >>>>>> > have consistent watermark treatment. Watermarks are just bounds, 
>> >> >>>>>> > after all.
>> >> >>>>>> >
>> >> >>>>>> > Kenn
>> >> >>>>>> >
>> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <t...@apache.org
>> >> >>>>>> > <mailto:t...@apache.org>> wrote:
>> >> >>>>>> >
>> >> >>>>>> >     The Python SDK currently uses timestamps in microsecond 
>> >> >>>>>> > resolution
>> >> >>>>>> >     while Java SDK, as most would probably expect, uses 
>> >> >>>>>> > milliseconds.
>> >> >>>>>> >
>> >> >>>>>> >     This causes a few difficulties with portability (Python 
>> >> >>>>>> > coders need
>> >> >>>>>> >     to convert to millis for WindowedValue and Timers, which is 
>> >> >>>>>> > related
>> >> >>>>>> >     to a bug I'm looking into:
>> >> >>>>>> >
>> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-7035
>> >> >>>>>> >
>> >> >>>>>> >     As Luke pointed out, the issue was previously discussed:
>> >> >>>>>> >
>> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-1524
>> >> >>>>>> >
>> >> >>>>>> >     I'm not privy to the reasons why we decided to go with 
>> >> >>>>>> > micros in
>> >> >>>>>> >     first place, but would it be too big of a change or 
>> >> >>>>>> > impractical for
>> >> >>>>>> >     other reasons to switch Python SDK to millis before it gets 
>> >> >>>>>> > more users?
>> >> >>>>>> >
>> >> >>>>>> >     Thanks,
>> >> >>>>>> >     Thomas
>> >> >>>>>> >
