Hi devs:

I'm an engineer from ByteDance, and here I'd like to discuss "Scan mode in
Table Store for Flink Stream and Batch job".

Users can execute Flink Stream and Batch jobs on Table Store. In Table Store
0.2 there are two items which determine how the Stream and Batch jobs'
sources read data: StartupMode and config in Options.
1. StartupMode
  a) DEFAULT. Determines actual startup mode according to other table
properties. If "scan.timestamp-millis" is set, the actual startup mode
will be "from-timestamp" mode. Otherwise, the actual startup mode will be
"full".
  b) FULL. For streaming sources, read the latest snapshot on the table
upon first startup, and continue to read the latest changes. For batch
sources, just consume the latest snapshot but do not read new changes.
  c) LATEST. For streaming sources, continuously reads the latest changes
without reading a snapshot at the beginning.  For batch sources, behaves
the same as the "full" startup mode.
  d) FROM_TIMESTAMP. For streaming sources, continuously reads changes
starting from the timestamp specified by "scan.timestamp-millis", without
reading a snapshot at the beginning. For batch sources, read a snapshot at
the timestamp specified by "scan.timestamp-millis" but do not read new
changes.
2. Config in Options
  a) scan.timestamp-millis, log.scan.timestamp-millis. Optional timestamp
used in case of "from-timestamp" scan mode.
  b) read.compacted. Read the latest compact snapshots only.
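
To make the DEFAULT rule and the stream/batch differences above concrete,
here is a minimal Java sketch. The class StartupModeSketch and its methods
are hypothetical names used only for illustration, not the Table Store
implementation; only the mode names and the "scan.timestamp-millis" key come
from the description above.

```java
import java.util.Map;

/** Illustrative sketch only; names are assumptions, not Table Store code. */
final class StartupModeSketch {

    enum StartupMode { DEFAULT, FULL, LATEST, FROM_TIMESTAMP }

    static final String SCAN_TIMESTAMP_MILLIS = "scan.timestamp-millis";

    /** Resolves DEFAULT to the actual mode; other modes are returned as-is. */
    static StartupMode resolve(StartupMode configured, Map<String, String> options) {
        if (configured != StartupMode.DEFAULT) {
            return configured;
        }
        return options.containsKey(SCAN_TIMESTAMP_MILLIS)
                ? StartupMode.FROM_TIMESTAMP
                : StartupMode.FULL;
    }

    /** Whether a source in the given (already resolved) mode reads a snapshot. */
    static boolean readsSnapshot(StartupMode actual, boolean streaming) {
        switch (actual) {
            case FULL:
                return true;        // stream and batch both read a snapshot
            case LATEST:
                return !streaming;  // batch behaves the same as FULL
            case FROM_TIMESTAMP:
                return !streaming;  // only batch reads a snapshot at the timestamp
            default:
                throw new IllegalArgumentException("resolve DEFAULT first");
        }
    }
}
```

For example, resolve(DEFAULT, options) with "scan.timestamp-millis" set gives
FROM_TIMESTAMP, for which readsSnapshot is false in streaming but true in
batch. This is the kind of inconsistency discussed below.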

After discussing with @Jingsong Li and @Caizhi wen, we found that the
config in Options and StartupMode are not orthogonal. For example, it is
unclear how read.compacted combines with FROM_TIMESTAMP mode, and how that
combination should behave in Stream and Batch sources. We want to improve
StartupMode to unify the data reading mode of Stream and Batch jobs, and
add the following StartupMode item:
COMPACTED: For streaming sources, read a snapshot after the latest
compaction on the table upon first startup, and continue to read the latest
changes. For batch sources, just read a snapshot after the latest
compaction but do not read new changes.
The advantage is that for Stream and Batch jobs, we only need to determine
their behavior through StartupMode, but we also found two main problems:
1. The behaviors of some StartupModes in Stream and Batch jobs are
inconsistent, which may cause user misunderstanding. For example, in
FROM_TIMESTAMP mode a streaming job reads incremental data, while a batch
job reads full data.
2. StartupMode does not define all data reading modes. For example, there
is no mode in which a streaming job reads a snapshot according to a
timestamp and then reads incremental data.

To support all data reading modes in Table Store such as time travel, we
try to divide data reading into two orthogonal dimensions: data reading
range and startup position.
1. Data reading range
  a) Incremental, read delta data
  b) Full, read a snapshot first, then continue to read the incremental
data if the job type calls for it (Streaming job).
2. Startup position
  a) Latest, read the latest snapshot.
  b) From-timestamp, read a snapshot according to the timestamp.
  c) From-snapshot-id, read a snapshot according to the snapshot id.
  d) Compacted, read the latest compacted snapshot.
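
As a rough illustration of the two dimensions, the following Java sketch
models them as two independent enums and lists every combination. The class
and enum names here are hypothetical and chosen only for this example; they
are not existing Table Store options.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; names are assumptions, not Table Store code. */
final class ScanDimensionsSketch {

    /** Dimension 1: which data is read. */
    enum ReadingRange { INCREMENTAL, FULL }

    /** Dimension 2: where reading starts. */
    enum StartupPosition { LATEST, FROM_TIMESTAMP, FROM_SNAPSHOT_ID, COMPACTED }

    /** Enumerates every (range, position) pair as "RANGE/POSITION". */
    static List<String> allCombinations() {
        List<String> modes = new ArrayList<>();
        for (ReadingRange range : ReadingRange.values()) {
            for (StartupPosition position : StartupPosition.values()) {
                modes.add(range + "/" + position);
            }
        }
        return modes;
    }
}
```

With two ranges and four positions this gives eight combinations. One of
them, FULL/FROM_TIMESTAMP, corresponds to reading a snapshot at a timestamp
and then reading incremental data, which the current StartupMode cannot
express for streaming jobs.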

Then there are two questions:

Q1: Should we split these into two configuration items, or combine and
unify them in StartupMode?
  a) Unify them in StartupMode, which gives eight modes. The advantage is
that StartupMode alone clearly explains the behavior of stream and batch
jobs, but there are many defined items, some of which are meaningless.
  b) Use two configuration items. Users may need to understand the
different meanings of the two items, and for a given combination, stream
and batch jobs will have different processing logic.

Q2: In Table Store 0.2, there are some conflicting definitions between
Stream and Batch, e.g. the behavior in FROM_TIMESTAMP mode. Should Table
Store 0.3 be fully compatible, or be implemented directly according to the
new definitions?
  a) Fully compatible with the definitions in Table Store 0.2. Users can
directly run their 0.2 queries on 0.3, but the semantic conflicts in the
original definitions will persist in later versions.
  b) Only limited compatibility, with the default behavior implemented
according to the new definitions. A query that succeeds on 0.2 may then
fail on 0.3.

Looking forward to your feedback, thanks.

Best,
Shammon
