I should also add one more thing. The PR I linked to above is a good way to introduce a clock, but it was pointed out in the sync that even if we had a service that provided synchronized timestamps, there is no guarantee that there isn't a race condition between committers getting timestamps and then committing. So we would still have an out-of-order problem. It is best not to rely on timestamps other than for inspecting tables to get a rough idea of when a node committed.
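To make that race concrete, here is a minimal illustration in plain Java (not Iceberg code) of one interleaving in which a shared, perfectly synchronized clock still produces out-of-order snapshot timestamps, because nothing orders the commits themselves:

    import java.time.Clock;

    public class TimestampRaceDemo {
        public static void main(String[] args) throws InterruptedException {
            Clock clock = Clock.systemUTC(); // stand-in for a synchronized time service

            long tsA = clock.millis();       // committer A reads its timestamp first
            Thread.sleep(5);
            long tsB = clock.millis();       // committer B reads a strictly later one

            // One possible interleaving: B wins the optimistic-commit race and
            // lands first, so the snapshot log records tsB before the smaller tsA.
            commit("clientB", tsB);
            commit("clientA", tsA);
        }

        private static void commit(String client, long ts) {
            System.out.printf("%s committed a snapshot with timestamp %d%n", client, ts);
        }
    }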
On Thu, Sep 10, 2020 at 12:14 PM Ryan Blue <rb...@netflix.com> wrote:

Thanks, Gautam! I think that's a good summary of the discussion.

On Thu, Sep 10, 2020 at 11:56 AM Gautam <gautamkows...@gmail.com> wrote:

Wanted to circle back on this thread. Linear timestamps were discussed during the sync, and the conclusion was that timestamp-based incremental reading is generally discouraged because it introduces correctness issues. Even if a custom clock is available, keeping timestamps atomic and monotonically increasing is going to be a problem for applications. Enforcing this in Iceberg (by blocking out-of-order timestamps) can cause problems of its own: for example, a client committing an erroneous timestamp that is far in the future would block all other clients from committing.

This is better handled by attaching a global transaction-id (e.g. a monotonically increasing UUID) to the snapshot metadata (Iceberg allows adding this to the summary). The incremental read application can then use the transaction-id as a key to the exact from/to snapshot-ids for incremental reading.

Hope I covered the points raised.

Regards,
-Gautam.

On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

Hi everyone, I'm putting this on the agenda for today's Iceberg sync.

Also, I want to point out John's recent PR that added a way to inject a Clock that is used for timestamp generation: https://github.com/apache/iceberg/pull/1389

That fits nicely with the requirements here and would be an easy way to inject your own time, synchronized by an external service.

On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid> wrote:

Quick questions about the proposed usage of the timestamp: do we want to use the timestamp as the real timestamp of the last change, or only as a monotonically increasing, more human-readable identifier? And do we want to compare this timestamp against some external source, or only against the timestamps in other snapshots of the same table?

On Sep 9, 2020, at 7:24 AM, Miao Wang <miw...@adobe.com.INVALID> wrote:

+1 on OpenInx's comment on implementation.

Unless we have an external time-synchronization service and enforce that all clients use it, timestamps from different clients are not comparable.

So, there are two asks: 1) whether to have a timestamp-based API for delta reading; 2) how to enforce and implement a service/protocol for timestamp sync among all clients.

1) +1 to have it, as Jingsong and Gautam suggested. Snapshot ID could be the source of truth in any case.

2) IMO, it should be an external package to Iceberg.

Miao

From: OpenInx <open...@gmail.com>
Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>
Date: Tuesday, September 8, 2020 at 7:55 PM
To: Iceberg Dev List <dev@iceberg.apache.org>
Subject: Re: Timestamp Based Incremental Reading in Iceberg ...

I agree that it's helpful to allow users to read incremental deltas based on a timestamp; as Jingsong said, a timestamp is more user-friendly.

My question is: how do we implement this?
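One concrete shape for the Clock injection mentioned above is a java.time.Clock backed by an external time service. The following is only a sketch: TimeService is a hypothetical client interface for whatever synchronized source is available, and the exact point where Iceberg accepts the injected Clock may differ from what the PR provides.

    import java.time.Clock;
    import java.time.Instant;
    import java.time.ZoneId;

    // A Clock whose readings come from an external, synchronized time service.
    public class SynchronizedClock extends Clock {
        private final TimeService service; // hypothetical external-service client
        private final ZoneId zone;

        public SynchronizedClock(TimeService service, ZoneId zone) {
            this.service = service;
            this.zone = zone;
        }

        @Override
        public Instant instant() {
            // one service call per reading; a real implementation would
            // likely cache or interpolate to avoid an RPC on every commit
            return Instant.ofEpochMilli(service.currentTimeMillis());
        }

        @Override
        public ZoneId getZone() {
            return zone;
        }

        @Override
        public Clock withZone(ZoneId newZone) {
            return new SynchronizedClock(service, newZone);
        }

        public interface TimeService {
            long currentTimeMillis();
        }
    }

Note that, as the first message in this thread points out, a synchronized Clock alone does not close the gap between reading the time and committing.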
If we just attach the client's timestamp to the Iceberg table when committing, then different clients may produce different timestamp values because of clock skew. In theory, those values are not strictly comparable and can only be compared within a margin of error.

On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <jingsongl...@gmail.com> wrote:

+1 for linear timestamps. In the implementation, the writer may only need to look at the previous snapshot's timestamp.

We're trying to think of Iceberg as a message queue. Take the popular queue Kafka as an example: Iceberg has a snapshotId and a timestamp; correspondingly, Kafka has an offset and a timestamp:
- offset: used for incremental reads, for example as the state of a checkpoint in a computing system.
- timestamp: explicitly specified by the user to set the scope of consumption, e.g. the start_timestamp of a read. A timestamp is the better user-facing interface; an offset/snapshotId is not human-readable or friendly.

So there are scenarios where a timestamp is used for incremental reads.

Best,
Jingsong

On Wed, Sep 9, 2020 at 12:45 AM Sud <sudssf2...@gmail.com> wrote:

We are using incremental reads for Iceberg tables that get quite a few appends (~500-1000 per hour), but instead of using timestamps we use snapshot ids and track the state of the last-read snapshot id. We use the timestamp as a fallback when that state is incorrect; as you mentioned, this works as expected only if timestamps are linear.

We also found that the incremental reader can be slow when dealing with more than ~2k snapshots in a range. We are currently testing a manifest-based incremental reader that looks at manifest entries instead of scanning the snapshot history and accessing each snapshot.

Is there any reason you can't use snapshot-based incremental reads?

On Tue, Sep 8, 2020 at 9:06 AM Gautam <gautamkows...@gmail.com> wrote:

Hello Devs,

We are looking into adding workflows that read data incrementally based on commit time: the ability to read deltas between start/end commit timestamps on a table, and the ability to resume reading from the last-read end timestamp. For that, we need the timestamps in the current active snapshot history to be linear (newer versions always have higher timestamps). Although the Iceberg commit flow ensures that versions are newer, there isn't a check to ensure that timestamps are linear.

For example, if two clients (clientA and clientB) whose clocks are slightly off (say, by a couple of seconds) are committing frequently, clientB might commit after clientA even though its new snapshot timestamp is out of order. I might be wrong, but I haven't found a check in HadoopTableOperations.commit() that prevents this.

On the other hand, restricting commits due to out-of-order timestamps can hurt commit throughput, so I can see why this isn't something Iceberg would want to enforce based on System.currentTimeMillis(). But if clients had a way to define their own globally synchronized timestamps (using an external service, or some monotonically increasing UUID), then Iceberg could expose an API to set that on the snapshot, or use it instead of System.currentTimeMillis().
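As a rough sketch of the transaction-id approach summarized at the top of this thread: stamp each commit with an application-generated id in the snapshot summary, then resolve the ids back to exact snapshot ids for a snapshot-based incremental read. The txn-id key is an arbitrary choice for illustration, not an Iceberg-defined property:

    import java.util.Map;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;

    public class TxnIdIncrementalRead {

        // Write side: record an application-supplied, monotonically
        // increasing transaction id in the snapshot summary.
        public static void commitWithTxnId(Table table, DataFile file, String txnId) {
            table.newAppend()
                .appendFile(file)
                .set("txn-id", txnId) // arbitrary summary key, not an Iceberg builtin
                .commit();
        }

        // Read side: resolve the ids back to snapshot ids, then read the
        // appended data between those exact snapshots.
        public static TableScan scanBetween(Table table, String fromTxn, String toTxn) {
            long fromId = -1L;
            long toId = -1L;
            for (Snapshot snap : table.snapshots()) {
                Map<String, String> summary = snap.summary();
                if (summary == null) {
                    continue;
                }
                if (fromTxn.equals(summary.get("txn-id"))) {
                    fromId = snap.snapshotId();
                }
                if (toTxn.equals(summary.get("txn-id"))) {
                    toId = snap.snapshotId();
                }
            }
            // appendsBetween reads from fromId (exclusive) to toId (inclusive)
            return table.newScan().appendsBetween(fromId, toId);
        }
    }

The lookup here scans the whole snapshot history; for tables with thousands of snapshots, keeping an external txn-id-to-snapshot-id index would be cheaper, in the same spirit as the manifest-based reader Sud describes.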
Iceberg exposes something similar with sequence numbers in the v2 format, which track deletes and appends.

Is this a concern others have? If so, how are folks handling it today, or are they not exposing such a feature at all because of the inherent distributed-timing problem? I would like to hear how others are thinking/going about this. Thoughts?

Cheers,
-Gautam.

--
Ryan Blue
Software Engineer
Netflix