I should also add one more thing. The PR I linked to above is a good way to
introduce a clock, but it was pointed out in the sync that even if we had a
service that provided synchronized timestamps, there is no guarantee that
there isn't a race condition between committers getting timestamps and then
committing. So we would still have an out-of-order problem. It is best not
to rely on timestamps other than for inspecting tables to get a rough idea
of when a node committed.
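
To make the race concrete, here is a minimal sketch in Python. It is a toy
model, not Iceberg code: ClockService and Table are hypothetical stand-ins
for a synchronized timestamp service and a table's commit log. Even with a
perfectly monotonic clock, the order in which timestamps are handed out need
not match the order in which commits land.

```python
import itertools


class ClockService:
    """Hypothetical global clock that hands out strictly increasing timestamps."""

    def __init__(self):
        self._counter = itertools.count(1)

    def now(self):
        return next(self._counter)


class Table:
    """Toy table: snapshots are recorded in the order commits succeed."""

    def __init__(self):
        self.snapshots = []  # (committer, timestamp) pairs in commit order

    def commit(self, committer, timestamp):
        self.snapshots.append((committer, timestamp))


clock = ClockService()
table = Table()

# Step 1: both committers fetch a timestamp from the shared clock.
ts_a = clock.now()  # committer A gets the earlier timestamp
ts_b = clock.now()  # committer B gets the later timestamp

# Step 2: B happens to finish its commit before A does.
table.commit("B", ts_b)
table.commit("A", ts_a)

# Timestamps read in commit order are no longer increasing.
timestamps = [ts for _, ts in table.snapshots]
out_of_order = any(
    later < earlier for earlier, later in zip(timestamps, timestamps[1:])
)
print(table.snapshots, out_of_order)
```

Closing that gap would mean making "get timestamp" and "commit" a single
atomic step, which is effectively what the commit itself already provides
through snapshot ordering.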

On Thu, Sep 10, 2020 at 12:14 PM Ryan Blue <rb...@netflix.com> wrote:

> Thanks, Gautam! I think that's a good summary of the discussion.
>
> On Thu, Sep 10, 2020 at 11:56 AM Gautam <gautamkows...@gmail.com> wrote:
>
>> Wanted to circle back on this thread. Linear timestamps were discussed
>> during the sync, and the conclusion was that timestamp-based incremental
>> reading is generally discouraged because it introduces correctness issues.
>> Even if a custom clock is available, keeping timestamps atomic and
>> monotonically increasing is going to be a problem for applications.
>> Enforcing this in Iceberg (by blocking out-of-order timestamps) can cause
>> problems of its own: for example, a client committing an erroneous
>> timestamp that is far in the future would block all other clients from
>> committing.
>>
>> This is better handled by attaching a global transaction-id (e.g. a UUID
>> that is monotonically increasing) to the snapshot metadata (Iceberg allows
>> adding this to the summary). The incremental read application can then use
>> the transaction-id as a key to look up the exact from/to snapshot-id for
>> incremental reading.
>>
>> Hope I covered the points raised.
>>
>> Regards,
>> -Gautam.
>>
>> On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>>>
>>> Also, I want to point out John's recent PR that added a way to inject a
>>> Clock that is used for timestamp generation:
>>> https://github.com/apache/iceberg/pull/1389
>>>
>>> That fits nicely with the requirements here and would be an easy way to
>>> inject your own time, synchronized by an external service.
>>>
>>> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid>
>>> wrote:
>>>
>>>> Quick question below about the proposed usage of the timestamp:
>>>>
>>>> On Sep 9, 2020, at 7:24 AM, Miao Wang <miw...@adobe.com.INVALID> wrote:
>>>>
>>>> +1 to OpenInx's comment on implementation.
>>>>
>>>> Unless we have an external time synchronization service and require
>>>> all clients to use it, timestamps from different clients are not
>>>> comparable.
>>>>
>>>>
>>>> Do we want to use the timestamp as the real timestamp of the last
>>>> change, or do we want to use it only as a monotonically increasing,
>>>> more human-readable identifier?
>>>> Do we want to compare this timestamp against some external source, or
>>>> do we just want to compare it with the timestamps of other snapshots
>>>> of the same table?
>>>>
>>>>
>>>> So, there are two asks: 1). Whether to have a timestamp based API for
>>>> delta reading; 2). How to enforce and implement a service/protocol for
>>>> timestamp sync among all clients.
>>>>
>>>> 1). +1 to have it as Jingsong and Gautam suggested. Snapshot ID could
>>>> be the source of truth in any case.
>>>>
>>>> 2). IMO, it should be an external package to Iceberg.
>>>>
>>>> Miao
>>>>
>>>> *From: *OpenInx <open...@gmail.com>
>>>> *Reply-To: *"dev@iceberg.apache.org" <dev@iceberg.apache.org>
>>>> *Date: *Tuesday, September 8, 2020 at 7:55 PM
>>>> *To: *Iceberg Dev List <dev@iceberg.apache.org>
>>>> *Subject: *Re: Timestamp Based Incremental Reading in Iceberg ...
>>>>
>>>> I agree that it's helpful to allow users to read the incremental delta
>>>> based on timestamp; as Jingsong said, timestamps are more user-friendly.
>>>>
>>>> My question is how to implement this?
>>>>
>>>> If we just attach the client's timestamp to the Iceberg table when
>>>> committing, then different clients may have different timestamp values
>>>> because of clock skew. In theory, these time values are not strictly
>>>> comparable, and can only be compared within the margin of error.
>>>>
>>>>
>>>> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>> +1 for linear timestamps. In the implementation, maybe the writer only
>>>> needs to look at the previous snapshot's timestamp.
>>>>
>>>> We're trying to think of Iceberg as a message queue. Let's take the
>>>> popular queue Kafka as an example.
>>>> Iceberg has snapshotId and timestamp; correspondingly, Kafka has offset
>>>> and timestamp:
>>>> - offset: It is used for incremental read, such as the state of a
>>>> checkpoint in a computing system.
>>>> - timestamp: It is explicitly specified by the user to define the
>>>> scope of consumption, e.g. as the start_timestamp of a read. A timestamp
>>>> is a more user-facing interface, whereas an offset/snapshotId is not
>>>> human-readable or friendly.
>>>>
>>>> So there are scenarios where timestamp is used for incremental read.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>>
>>>> On Wed, Sep 9, 2020 at 12:45 AM Sud <sudssf2...@gmail.com> wrote:
>>>>
>>>>
>>>> We are using incremental read for Iceberg tables which get quite a few
>>>> appends (~500-1000 per hour), but instead of using timestamps we use
>>>> snapshot IDs and track the state of the last read snapshot ID.
>>>> We are using the timestamp as a fallback when the state is incorrect,
>>>> but as you mentioned, it only works as expected if timestamps are linear.
>>>> We also found that the incremental reader might be slow when dealing with
>>>> more than 2k snapshots in range. We are currently testing a manifest-based
>>>> incremental reader which looks at manifest entries instead of scanning
>>>> the snapshot history and accessing each snapshot.
>>>>
>>>> Is there any reason you can't use snapshot based incremental read?
>>>>
>>>> On Tue, Sep 8, 2020 at 9:06 AM Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>> Hello Devs,
>>>> We are looking into adding workflows that read data incrementally based
>>>> on commit time: the ability to read deltas between start / end commit
>>>> timestamps on a table, and the ability to resume reading from the last
>>>> read end timestamp. In that regard, we need the timestamps to be
>>>> linear in the current active snapshot history (newer versions always have
>>>> higher timestamps). Although the Iceberg commit flow ensures the versions
>>>> are newer, there isn't a check to ensure timestamps are linear.
>>>>
>>>> Example flow: if two clients (clientA and clientB), whose clocks
>>>> are slightly off (say by a couple of seconds), are committing frequently,
>>>> clientB might get to commit after clientA even if its new snapshot
>>>> timestamp is out of order. I might be wrong, but I haven't found a check
>>>> in HadoopTableOperations.commit() to ensure this case does not happen.
>>>>
>>>>
>>>> On the other hand, restricting commits due to out-of-order timestamps
>>>> can hurt commit throughput, so I can see why this isn't something Iceberg
>>>> might want to enforce based on System.currentTimeMillis(). However, if
>>>> clients had a way to define their own globally synchronized timestamps
>>>> (using an external service or some monotonically increasing UUID), then
>>>> Iceberg could offer an API to set that on the snapshot or use that instead
>>>> of System.currentTimeMillis(). Iceberg exposes something similar using
>>>> sequence numbers in the v2 format to track deletes and appends.
>>>> Is this a concern others have? If so how are folks handling this today
>>>> or are they not exposing such a feature at all due to the inherent
>>>> distributed timing problem? Would like to hear how others are
>>>> thinking/going about this. Thoughts?
>>>>
>>>> Cheers,
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

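The transaction-id approach summarized in the quoted discussion above can be
sketched roughly as follows. This is a toy model in Python using plain dicts,
not the Iceberg API: the "txn-id" summary key, the helper names, and the
sample history are all hypothetical. The point is that the reader resolves
transaction-ids to snapshot-ids and then walks the history in commit order,
so out-of-order timestamps no longer matter.

```python
def snapshot_id_for_txn(history, txn_id):
    """Return the snapshot id whose summary carries the given transaction id."""
    for snap in history:
        # "txn-id" is an assumed, application-chosen summary key.
        if snap["summary"].get("txn-id") == txn_id:
            return snap["snapshot_id"]
    raise KeyError(txn_id)


def snapshots_between(history, from_txn, to_txn):
    """Snapshot ids committed after from_txn, up to and including to_txn."""
    ids = [snap["snapshot_id"] for snap in history]
    start = ids.index(snapshot_id_for_txn(history, from_txn))
    end = ids.index(snapshot_id_for_txn(history, to_txn))
    return ids[start + 1 : end + 1]


# Hypothetical history in commit order. Note that the timestamps of the
# second and third snapshots are out of order.
history = [
    {"snapshot_id": 9001, "timestamp_ms": 1000, "summary": {"txn-id": "txn-001"}},
    {"snapshot_id": 4217, "timestamp_ms": 3000, "summary": {"txn-id": "txn-002"}},
    {"snapshot_id": 7730, "timestamp_ms": 2000, "summary": {"txn-id": "txn-003"}},
    {"snapshot_id": 1188, "timestamp_ms": 4000, "summary": {"txn-id": "txn-004"}},
]

to_read = snapshots_between(history, "txn-001", "txn-003")
print(to_read)
```

A reader that last processed txn-001 and wants everything through txn-003
gets the two intermediate snapshots in commit order, regardless of what
their timestamps say.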

-- 
Ryan Blue
Software Engineer
Netflix
