Hi devs, I'm an engineer from ByteDance, and I'd like to discuss "Scan mode in Table Store for Flink Stream and Batch jobs".
Users can execute Flink Stream and Batch jobs on Table Store. In Table Store 0.2 there are two items which determine how the Stream and Batch jobs' sources read data: StartupMode and config in Options.

1. StartupMode
a) DEFAULT. Determines the actual startup mode according to other table properties. If "scan.timestamp-millis" is set, the actual startup mode will be "from-timestamp"; otherwise, it will be "full".
b) FULL. For streaming sources, reads the latest snapshot on the table upon first startup, then continues to read the latest changes. For batch sources, just consumes the latest snapshot without reading new changes.
c) LATEST. For streaming sources, continuously reads the latest changes without reading a snapshot at the beginning. For batch sources, behaves the same as the "full" startup mode.
d) FROM_TIMESTAMP. For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without reading a snapshot at the beginning. For batch sources, reads a snapshot at the timestamp specified by "scan.timestamp-millis" without reading new changes.

2. Config in Options
a) scan.timestamp-millis, log.scan.timestamp-millis. Optional timestamp used in case of the "from-timestamp" scan mode.
b) read.compacted. Read the latest compact snapshots only.

After discussing with @Jingsong Li and @Caizhi wen, we found that the config in Options and StartupMode are not orthogonal; for example, it is unclear how read.compacted combined with FROM_TIMESTAMP mode should behave in Stream and Batch sources. We want to improve StartupMode to unify the data reading mode of Stream and Batch jobs, and add the following StartupMode item:

COMPACTED. For streaming sources, reads a snapshot after the latest compaction on the table upon first startup, then continues to read the latest changes. For batch sources, just reads a snapshot after the latest compaction without reading new changes.
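To make the DEFAULT resolution above concrete, here is a minimal Java sketch; the class, enum, and method names are illustrative only and are not Table Store's actual API.

```java
// Illustrative sketch (not Table Store's real code): how a DEFAULT
// startup mode could be resolved from the table options.
public class StartupModes {

    enum StartupMode { DEFAULT, FULL, LATEST, FROM_TIMESTAMP, COMPACTED }

    // Resolve DEFAULT to a concrete mode; any explicitly configured
    // mode is kept as-is.
    static StartupMode resolve(StartupMode configured, Long scanTimestampMillis) {
        if (configured != StartupMode.DEFAULT) {
            return configured;
        }
        // "scan.timestamp-millis" set => from-timestamp; otherwise full.
        return scanTimestampMillis != null
                ? StartupMode.FROM_TIMESTAMP
                : StartupMode.FULL;
    }
}
```

Under this sketch, a table configured with a scan timestamp resolves to FROM_TIMESTAMP and one without resolves to FULL, matching the DEFAULT item described above.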
The advantage is that for Stream and Batch jobs we only need to determine their behavior through StartupMode, but we also found two main problems:

1. The behaviors of some StartupModes in Stream and Batch jobs are inconsistent, which may cause user misunderstanding. For example, in FROM_TIMESTAMP mode a streaming job reads incremental data while a batch job reads full data.
2. StartupMode does not cover all data reading modes. For example, a streaming job cannot read a snapshot according to a timestamp and then read the incremental data.

To support all data reading modes in Table Store, such as time travel, we try to divide data reading into two orthogonal dimensions: data reading range and startup position.

1. Data reading range
a) Incremental: read delta data only.
b) Full: read a snapshot first, then read the incremental data according to the job type (Streaming job).

2. Startup position
a) Latest: read the latest snapshot.
b) From-timestamp: read a snapshot according to the timestamp.
c) From-snapshot-id: read a snapshot according to the snapshot id.
d) Compacted: read the latest compacted snapshot.

Then there are two questions:

Q1: Should these be two separate configuration items, or combined and unified in StartupMode? Unified in StartupMode, there would be eight modes. The advantage is that each StartupMode clearly explains the behavior of stream and batch jobs, but there are many defined items, some of which are meaningless. With two separate configurations, users may need to understand the different meanings of the two items, and after combining them stream and batch jobs will have different processing logic.

Q2: In Table Store 0.2 there are some conflicting definitions between Stream and Batch, e.g. the behavior in FROM_TIMESTAMP mode. Does Table Store 0.3 need to be fully compatible with them, or should it be implemented directly according to the new definitions?
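The two orthogonal dimensions above can be sketched as a pair of enums; this is a hypothetical illustration (the names ScanSpec, ReadRange, and StartupPosition are mine, not Table Store's), and it also shows where the eight combined modes in Q1 come from.

```java
// Illustrative sketch (not Table Store's real code): data reading as
// two orthogonal dimensions, reading range x startup position.
public class ScanSpec {

    enum ReadRange { INCREMENTAL, FULL }

    enum StartupPosition { LATEST, FROM_TIMESTAMP, FROM_SNAPSHOT_ID, COMPACTED }

    final ReadRange range;
    final StartupPosition position;

    ScanSpec(ReadRange range, StartupPosition position) {
        this.range = range;
        this.position = position;
    }

    // 2 reading ranges x 4 startup positions = 8 combined modes,
    // which is where the "eight modes" in Q1 come from.
    static int combinedModeCount() {
        return ReadRange.values().length * StartupPosition.values().length;
    }
}
```

For example, the legacy streaming FROM_TIMESTAMP behavior would be (INCREMENTAL, FROM_TIMESTAMP) while the legacy batch behavior would be (FULL, FROM_TIMESTAMP), which makes the inconsistency in problem 1 explicit.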
If 0.3 stays fully compatible with the definitions of Table Store 0.2, users can directly execute their 0.2 queries on 0.3, but the semantic conflicts in the original definitions will persist into all later versions. If only limited compatibility is provided and the default behavior follows the new definitions, queries that succeed on 0.2 may fail on 0.3.

Looking forward to your feedback, thanks.

Best,
Shammon