Hi all. It seems that we've reached at an agreement. We'll rename "full" scan.mode to "latest-full" and "compacted" scan.mode to "compacted-full".
I've created a ticket about this [1] and will work on it soon. [1] https://issues.apache.org/jira/browse/FLINK-30410 Shammon FY <zjur...@gmail.com> 于2022年12月12日周一 16:00写道: > Hi jinsong, caizhi > > Thank you for your reply. > > I found that what really confused me was the starting position of the > streaming job to read data. As @caizhi mentioned in kafka, streaming jobs > only read incremental data and its definition is very clear. In the Table > Store, streaming jobs also have the mode of reading full data first, we > need special consideration for it. > > So I really agree with @jinsong's "Solution 3", it looks good to me. We > define that streaming and batch jobs have different read modes by default, > and read data according to the starting position defined by scan.mode. > Personally, I think it is easy for users to understand. For reading modes > beyond the default, for example, full reading for streaming jobs, add > suffixes such as "full" in scan.mode looks nice. In this way, the > definition will be clearer, and the read mode will be unified through > scan.mode. Thanks > > > Best, > Shammon > > > On Mon, Dec 12, 2022 at 2:49 PM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Thanks Shammon for bringing up the discussion. >> >> My opinion is to combine everything into scan.mode so that we don't >> have orthogonal options. >> >> You first mention that there are two disadvantages for this solution. >> >> *1. The behaviors of some StartupModes in Stream and Batch jobs are >> inconsistent* >> This is acceptable from my point of view, because streaming jobs and >> batch jobs are different after all. Streaming jobs should monitor new >> changes while batch jobs should only see the records when it is started. >> This behavior is expected by the users. >> >> *2. StartupMode does not define all data reading modes.* >> Not all reading modes are useful. For example, you mention a reading mode >> where user first want to read the snapshot at a timestamp, then read all >> incremental changes. Could you explain in what scenario a user will need >> this? >> >> About the solution you proposed, I think the biggest problem is the >> default value. That is, if we should read full snapshots by default. >> >> When user runs a streaming job with "from-timestamp", he expects to just >> read incremental changes after the timestamp, just like he is using a >> "from-timestamp" starting mode of Kafka. >> >> However, when user runs a streaming job without a starting timestamp, he >> expects the result to be correct. In order to provide correct result, we >> have to produce full snapshots first. >> >> So you can see that, for different streaming jobs, the default behavior >> about whether to read full snapshots is different. It is hard to pick a >> default value without disturbing the users. >> >> Jingsong Li <jingsongl...@gmail.com> 于2022年12月9日周五 17:33写道: >> >>> Thanks Shammon! >>> >>> Your summary was very good and very detailed. >>> >>> I thought about it again. >>> >>> ## Solution 1 >>> Actually, according to what you said, there should be so many modes in >>> theory. >>> - Runtime-mode: streaming or batch. >>> - Range: full or incremental. >>> - Position: Latest, timestamp, snapshot-id, compacted. >>> >>> Advantages: The disassembly is very detailed, and every action is very >>> clear. >>> Disadvantages: There are many combinations from orthogonality. In >>> combination with runtime-mode stream or batch, we can say that there >>> are 16 modes from orthogonality, many of which are meaningless. As you >>> said, default behavior is also a problem. >>> >>> ## Solution 2 >>> Currently [1]: >>> - The environment determines the runtime-mode whether it is streaming >>> or a batch. >>> - The `scan.mode` determines position. >>> - No specific option determines `range`, but it is determined by >>> runtime-mode. However, it is not completely determined by runtime >>> mode, such as `full` and `compacted`, which are also read in >>> full-range under the stream. >>> >>> Advantages: Simple. The default values of options are what we want for >>> streaming and batch. >>> Disadvantages: >>> 1. The semantics of from timestamp are different in the case of >>> streaming and batch. >>> 2. `full` and `compacted` are special. >>> >>> ## Solution 3 >>> >>> I understand that the core problem of solution2 may be more problem 2: >>> `full` and `compacted` are special. >>> How about: >>> - the runtime mode determines whether to read incremental only or full >>> data. >>> - `scan.mode` contains: Latest, timestamp, snapshot-id. >>> The default is full in batch mode and incremental in stream mode. >>> >>> However, we have two other choices for `scan.mode`: `latest-full`, >>> `compacted-full`. Regardless of the runtime-mode, the two choices >>> force full range to read. >>> >>> I think solution 3 is a compromise solution. It can also ensure the >>> availability of default values. Conceptually, it can at least explain >>> the current options. >>> >>> What do you think? >>> >>> [1] >>> https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/configuration/ >>> >>> Best, >>> Jingsong >>> >>> On Thu, Dec 8, 2022 at 4:11 PM Shammon FY <zjur...@gmail.com> wrote: >>> > >>> > Hi devs: >>> > >>> > I'm an engineer from ByteDance, and here I'd link to discuss "Scan >>> mode in Table Store for Flink Stream and Batch job". >>> > >>> > Users can execute Flink Steam and Batch jobs on Table Store. In Table >>> Store 0.2 there're two items which determine how the Stream and Batch jobs' >>> sources read data: StartupMode and config in Options. >>> > 1. StartupMode >>> > a) DEFAULT. Determines actual startup mode according to other table >>> properties. If \"scan.timestamp-millis\" is set, the actual startup mode >>> will be \"from-timestamp\" mode. Otherwise, the actual startup mode will be >>> \"full\". >>> > b) FULL. For streaming sources, read the latest snapshot on the >>> table upon first startup, and continue to read the latest changes. For >>> batch sources, just consume the latest snapshot but do not read new changes. >>> > c) LATEST. For streaming sources, continuously reads the latest >>> changes without reading a snapshot at the beginning. For batch sources, >>> behaves the same as the \"full\" startup mode. >>> > d) FROM_TIMESTAMP. For streaming sources, continuously reads changes >>> starting from timestamp specified by \"scan.timestamp-millis\", without >>> reading a snapshot at the beginning. For batch sources, read a snapshot at >>> timestamp specified by \"scan.timestamp-millis\" but do not read new >>> changes. >>> > 2. Config in Options >>> > a) scan.timestamp-millis, log.scan.timestamp-millis. Optional >>> timestamp used in case of \"from-timestamp\" scan mode. >>> > b) read.compacted. Read the latest compact snapshots only. >>> > >>> > After discussing with @Jingsong Li and @Caizhi wen, we found that the >>> config in Options and StartupMode are not orthogonal. For example, >>> read.compacted and FROM_TIMESTAMP mode and its behavior in Stream and Batch >>> sources. We want to improve StartupMode to unify the data reading mode of >>> Stream and Batch jobs, and add the following StartupMode item: >>> > COMPACTED: For streaming sources, read a snapshot after the latest >>> compaction on the table upon first startup, and continue to read the latest >>> changes. For batch sources, just read a snapshot after the latest >>> compaction but do not read new changes. >>> > The advantage is that for Stream and Batch jobs, we only need to >>> determine their behavior through StartupMode, but we also found two main >>> problems: >>> > 1. The behaviors of some StartupModes in Stream and Batch jobs are >>> inconsistent, which may cause user misunderstanding, such as FROM_ >>> TIMESTAMP: streaming job reads incremental data, while batch job reads full >>> data >>> > 2. StartupMode does not define all data reading modes. For example, >>> streaming jobs read snapshots according to timestamp, and then read >>> incremental data. >>> > >>> > To support all data reading modes in Table Store such as time travel, >>> we try to divide data reading into two orthogonal dimensions: data reading >>> range and startup position. >>> > 1. Data reading range >>> > a) Incremental, read delta data >>> > b) Full, read a snapshot first, then read the incremental data >>> according to different job types (Streaming job). >>> > 2. Startup position >>> > a) Latest, read the latest snapshot. >>> > b) From-timestamp,read a snapshot according to the timestamp. >>> > c) From-snapshot-id,read a snapshot according to the snapshot id. >>> > d) Compacted,read the latest compacted snapshot. >>> > >>> > Then there're two questions: >>> > >>> > Q1: Does it need to divide into two configuration items, or combine >>> and unify them in StartupMode? >>> > In StartupMode, it can be unified into eight modes. The advantage is >>> that through StartupMode, we can clearly explain the behavior of stream and >>> batch jobs, but there are many defined items, some of which are meaningless >>> > There can be two configurations, users may need to understand the >>> different meanings of the two configurations. After combination, stream and >>> batch jobs will have different processing logic. >>> > >>> > Q2: In Table Store 0.2, there are some conflicting definitions in >>> Stream and Batch. Does Table Store 0.3 need to be fully compatible or >>> directly implemented according to the new definitions? E.g. behavior in >>> FROM_TIMESTAMP mode. >>> > Fully compatible with the definition of Table Store 0.2, users can >>> directly execute queries in 0.2 on 0.3, but semantic conflicts in the >>> original definition will always exist in the next versions. >>> > Only limited compatibility is available, and the default behavior is >>> implemented according to the new definition, which may result in the query >>> error of 0.3 when the user queries on 0.2 successfully. >>> > >>> > Look forward to your feedback, thanks >>> > >>> > Best, >>> > Shammon >>> > >>> > >>> > >>> >>