+1 to OpenInx's comment on the implementation.

Unless we have an external time synchronization service and enforce that all
clients use it, timestamps from different clients are not comparable.

So there are two questions: 1) whether to have a timestamp-based API for delta
reading; 2) how to enforce and implement a service/protocol for timestamp
synchronization among all clients.

1) +1 to having it, as Jingsong and Gautam suggested. The snapshot ID could
remain the source of truth in all cases.

2) IMO, it should be a package external to Iceberg.

Miao

From: OpenInx <open...@gmail.com>
Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>
Date: Tuesday, September 8, 2020 at 7:55 PM
To: Iceberg Dev List <dev@iceberg.apache.org>
Subject: Re: Timestamp Based Incremental Reading in Iceberg ...

I agree that it's helpful to allow users to read the incremental delta based on
a timestamp; as Jingsong said, timestamps are more user-friendly.

My question is: how do we implement this?

If we just attach the client's timestamp to the Iceberg table when committing,
then different clients may record different timestamp values because of clock
skew. In theory, these time values are not strictly comparable, and can only be
compared within a margin of error.


On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <jingsongl...@gmail.com> wrote:
+1 for making timestamps linear; in the implementation, the writer may only
need to look at the previous snapshot's timestamp.
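The suggestion above can be sketched in a few lines. This is an illustrative
sketch, not Iceberg code; the class and method names are hypothetical. The
writer takes the max of its local clock and the previous snapshot's timestamp
plus one, so the resulting history is strictly increasing even when a client's
clock lags behind the last committer's.

```java
// Hypothetical sketch: a writer-side rule for assigning commit timestamps
// that stay linear, using only the previous snapshot's timestamp.
final class MonotonicTimestamps {
    // Returns a timestamp strictly greater than the previous snapshot's,
    // even if this client's wall clock is behind the last committer's.
    static long nextCommitTimestamp(long previousSnapshotTimestampMillis,
                                    long localClockMillis) {
        return Math.max(localClockMillis, previousSnapshotTimestampMillis + 1);
    }
}
```

Note this trades clock accuracy for linearity: a lagging client's commit gets a
slightly "borrowed" timestamp, but incremental readers see a consistent order.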

We're trying to think of Iceberg as a message queue. Taking the popular queue
Kafka as an example: Iceberg has snapshotId and timestamp; correspondingly,
Kafka has offset and timestamp:
- offset: used for incremental reads, for example to record the state of a
checkpoint in a computing system.
- timestamp: explicitly specified by the user to define the scope of
consumption, e.g. as the start_timestamp of a read. A timestamp is a better
user-facing interface, while an offset/snapshotId is neither human-readable
nor friendly.

So there are scenarios where a timestamp is used for incremental reads.
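The Kafka analogy above suggests what a timestamp-based read would do
internally: resolve the user-supplied start timestamp to the first snapshot at
or after it, much like Kafka's offsetsForTimes, and then read incrementally by
snapshot id as usual. A minimal sketch, using plain sorted arrays as a stand-in
for the table's snapshot history (not the real Iceberg API):

```java
import java.util.Arrays;

// Hypothetical sketch: map a user-facing start timestamp to the internal
// snapshot id, so the snapshot id stays the source of truth.
final class TimestampToSnapshot {
    // timestamps is sorted ascending; snapshotIds[i] was committed at
    // timestamps[i]. Returns -1 if no snapshot exists at or after startTs.
    static long firstSnapshotAtOrAfter(long[] timestamps, long[] snapshotIds,
                                       long startTs) {
        int i = Arrays.binarySearch(timestamps, startTs);
        if (i < 0) {
            i = -i - 1;                   // insertion point: first entry >= startTs
        }
        if (i == timestamps.length) {
            return -1;                    // start time is after the latest snapshot
        }
        return snapshotIds[i];
    }
}
```

This only gives correct delta boundaries if the timestamp column is linear,
which is exactly the property being discussed in this thread.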

Best,
Jingsong


On Wed, Sep 9, 2020 at 12:45 AM Sud <sudssf2...@gmail.com> wrote:

We are using incremental reads for Iceberg tables that get quite a few appends
(~500-1000 per hour), but instead of using timestamps we use snapshot ids and
track the state of the last read snapshot id.
We use timestamps as a fallback when that state is incorrect, but as you
mentioned, this only works as expected if timestamps are linear.
We also found that the incremental reader can be slow when dealing with >2k
snapshots in a range. We are currently testing a manifest-based incremental
reader that looks at manifest entries instead of scanning the snapshot history
and accessing each snapshot.

Is there any reason you can't use snapshot based incremental read?

On Tue, Sep 8, 2020 at 9:06 AM Gautam <gautamkows...@gmail.com> wrote:
Hello Devs,
We are looking into adding workflows that read data incrementally based on
commit time: the ability to read deltas between start/end commit timestamps on
a table, and the ability to resume reading from the last read end timestamp.
For that, we need the timestamps in the current active snapshot history to be
linear (newer versions always have higher timestamps). Although the Iceberg
commit flow ensures that versions are newer, there isn't a check to ensure
that timestamps are linear.

Example flow: if two clients (clientA and clientB) whose clocks are slightly
off (say by a couple of seconds) are committing frequently, clientB might get
to commit after clientA even if its new snapshot's timestamp is out of order.
I might be wrong, but I haven't found a check in
HadoopTableOperations.commit() to ensure this case does not happen.
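The race above is easy to reproduce on paper: a linearity check over the
snapshot history would catch it, and (per the observation above) no such check
appears to run at commit time today. A purely illustrative sketch, not Iceberg
code:

```java
// Hypothetical sketch: the linearity property a timestamp-based
// incremental reader needs from the snapshot history.
final class SnapshotTimestampCheck {
    // Returns true if every snapshot timestamp is strictly newer than
    // the one before it.
    static boolean isLinear(long[] snapshotTimestampsMillis) {
        for (int i = 1; i < snapshotTimestampsMillis.length; i++) {
            if (snapshotTimestampsMillis[i] <= snapshotTimestampsMillis[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```

For instance, if clientA commits with timestamp 10,000 ms and clientB commits
immediately after but its clock is two seconds behind (8,500 ms), the history
fails this check even though the commit order itself was valid.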

On the other hand, restricting commits due to out-of-order timestamps can hurt
commit throughput, so I can see why this isn't something Iceberg would want to
enforce based on System.currentTimeMillis(). But if clients had a way to
define their own globally synchronized timestamps (using an external service
or a monotonically increasing UUID), then Iceberg could offer an API to set
that on the snapshot, or use it instead of System.currentTimeMillis(). Iceberg
exposes something similar with sequence numbers in the v2 format to track
deletes and appends.
Is this a concern others have? If so, how are folks handling it today, or are
they not exposing such a feature at all because of the inherent distributed
timing problem? I'd like to hear how others are thinking about this. Thoughts?

Cheers,

-Gautam.


--
Best, Jingsong Lee
