Thanks, Gautam! I think that's a good summary of the discussion.

On Thu, Sep 10, 2020 at 11:56 AM Gautam <gautamkows...@gmail.com> wrote:
> Wanted to circle back on this thread. Linear timestamps were discussed during the sync, and the conclusion was that timestamp-based incremental reading is generally discouraged, as it introduces correctness issues. Even if a custom clock is available, keeping timestamps atomic and monotonically increasing is going to be a problem for applications. Enforcing this in Iceberg (by blocking out-of-order timestamps) can create problems of its own: e.g. a client committing an erroneous timestamp that is far in the future would block all other clients from committing.
>
> This is better handled by attaching a global transaction-id (e.g. a monotonically increasing UUID) to the snapshot metadata (Iceberg allows adding this to the summary). The incremental read application can then use the transaction-id as a key to the exact from/to snapshot-ids for incremental reading.
>
> Hope I covered the points raised.
>
> Regards,
> -Gautam.
>
> On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>>
>> Also, I want to point out John's recent PR that added a way to inject a Clock that is used for timestamp generation: https://github.com/apache/iceberg/pull/1389
>>
>> That fits nicely with the requirements here and would be an easy way to inject your own time, synchronized by an external service.
>>
>> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid> wrote:
>>
>>> Quick question below about the proposed usage of the timestamp:
>>>
>>> On Sep 9, 2020, at 7:24 AM, Miao Wang <miw...@adobe.com.INVALID> wrote:
>>>
>>> +1 OpenInx's comment on implementation.
>>>
>>> Timestamps from different clients are comparable only if we have an external timing synchronization service and enforce that all clients use it.
>>>
>>> Do we want to use the timestamp as the real timestamp of the last change, or do we want to use it only as a monotonically increasing, more human-readable identifier?
>>> Do we want to compare this timestamp against some external source, or do we just want to compare it with the timestamps in other snapshots of the same table?
>>>
>>> So, there are two asks: 1) whether to have a timestamp-based API for delta reading; 2) how to enforce and implement a service/protocol for timestamp sync among all clients.
>>>
>>> 1) +1 to have it, as Jingsong and Gautam suggested. The snapshot ID could be the source of truth in any case.
>>>
>>> 2) IMO, it should be a package external to Iceberg.
>>>
>>> Miao
>>>
>>> From: OpenInx <open...@gmail.com>
>>> Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>
>>> Date: Tuesday, September 8, 2020 at 7:55 PM
>>> To: Iceberg Dev List <dev@iceberg.apache.org>
>>> Subject: Re: Timestamp Based Incremental Reading in Iceberg ...
>>>
>>> I agree that it's helpful to allow users to read the incremental delta based on a timestamp; as Jingsong said, a timestamp is more user-friendly.
>>>
>>> My question is how to implement this.
>>>
>>> If we just attach the client's timestamp to the Iceberg table when committing, then different clients may record different timestamp values because of clock skew. In theory, these time values are not strictly comparable and can only be compared within the margin of error.
>>> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>> +1 for keeping timestamps linear. In the implementation, the writer may only need to look at the previous snapshot's timestamp.
>>>
>>> We're trying to think of Iceberg as a message queue, so let's take the popular queue Kafka as an example. Iceberg has snapshotId and timestamp; correspondingly, Kafka has offset and timestamp:
>>> - offset: used for incremental reads, e.g. the state of a checkpoint in a computing system.
>>> - timestamp: explicitly specified by the user to define the scope of consumption, e.g. as the start_timestamp of a read. A timestamp is a better user-facing interface; an offset/snapshotId is neither human-readable nor friendly.
>>>
>>> So there are scenarios where a timestamp is used for incremental reads.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Sep 9, 2020 at 12:45 AM Sud <sudssf2...@gmail.com> wrote:
>>>
>>> We are using incremental reads for Iceberg tables that get quite a few appends (~500-1000 per hour), but instead of using timestamps we use snapshot ids and track the state of the last-read snapshot id.
>>> We use the timestamp as a fallback when that state is incorrect, but as you mentioned, this works as expected only if timestamps are linear.
>>> We also found that the incremental reader can be slow when dealing with more than 2k snapshots in a range; we are currently testing a manifest-based incremental reader that looks at manifest entries instead of scanning the snapshot history and accessing each snapshot.
>>>
>>> Is there any reason you can't use snapshot-based incremental reads?
>>>
>>> On Tue, Sep 8, 2020 at 9:06 AM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>> Hello Devs,
>>> We are looking into adding workflows that read data incrementally based on commit time: the ability to read deltas between start/end commit timestamps on a table, and the ability to resume reading from the last-read end timestamp. For that, we need timestamps to be linear in the current active snapshot history (newer versions always have higher timestamps). Although the Iceberg commit flow ensures that versions are newer, there isn't a check to ensure that timestamps are linear.
>>>
>>> Example flow: if two clients (clientA and clientB) whose time-clocks are slightly off (say by a couple of seconds) are committing frequently, clientB might get to commit after clientA even if its new snapshot's timestamp is out of order. I might be wrong, but I haven't found a check in HadoopTableOperations.commit() to ensure that this case does not happen.
>>>
>>> On the other hand, restricting commits due to out-of-order timestamps can hurt commit throughput, so I can see why this isn't something Iceberg might want to enforce based on System.currentTimeMillis(). However, if clients had a way to define their own globally synchronized timestamps (using an external service or some monotonically increasing UUID), then Iceberg could allow an API to set that on the snapshot, or use it instead of System.currentTimeMillis(). Iceberg exposes something similar with sequence numbers in the v2 format to track deletes and appends.
>>> Is this a concern others have? If so, how are folks handling it today, or are they not exposing such a feature at all due to the inherent distributed timing problem? I would like to hear how others are thinking about this. Thoughts?
>>>
>>> Cheers,
>>> -Gautam.
>>> >>> >>> >>> -- >>> Best, Jingsong Lee >>> >>> >>> >> >> -- >> Ryan Blue >> Software Engineer >> Netflix >> > -- Ryan Blue Software Engineer Netflix