Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector
Hi Gabor,

Thanks for the FLIP. I have some questions about it:

1. State TTL for value columns
How can users retrieve the state TTL (time-to-live) for each value column? From my understanding of the current design, this is not supported. Could you clarify whether there are plans to address this limitation?

2. Metadata table vs. metadata column
The metadata described in the FLIP appears to be intended to describe the state files stored at a specific location. To me, this concept aligns more closely with system tables such as pg_tables in PostgreSQL [1] or INFORMATION_SCHEMA in MySQL [2].

If we opt for metadata columns, every record in the table would end up with identical values in these columns (please correct me if I'm mistaken). Meanwhile, the state connector requires users to specify an operator UID or operator UID hash, after which it outputs user-defined values in its records. Repeating the same file-level metadata on every such record feels somewhat redundant to me.

3. Handling LIST and MAP states in the state connector
I have concerns about how the current design handles LIST and MAP states. The state connector uses Flink SQL's MAP and ARRAY types, which implies that it attempts to load an entire MAP or LIST state into memory at once.

However, in many real-world scenarios these states can grow very large. The state API typically addresses this by providing an iterator that traverses the elements of a state incrementally. I'm not sure whether I've missed something in FLIP-496 or FLIP-512, but the current design may struggle to scale in such cases.

Best,
Shengkai

[1] https://www.postgresql.org/docs/current/view-pg-tables.html
[2] https://dev.mysql.com/doc/refman/8.4/en/information-schema-tables-table.html

Gabor Somogyi wrote on Mon, Mar 3, 2025 at 02:00:
> Hi Zakelly,
>
> In order to shoot for simplicity `METADATA VIRTUAL` as key words for
> definition is the target.
> When it's not super complex the latter can be added too.
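Shengkai's third point contrasts loading a whole LIST or MAP state into a single value (which mapping it to a SQL ARRAY or MAP implies) with traversing it through an iterator. A minimal plain-Java sketch of the two access patterns follows. `LargeListState` and `StateScanDemo` are hypothetical names, not Flink's state API, although Flink's `ListState#get()` does return an `Iterable` and `MapState` exposes `iterator()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical stand-in for a large LIST state: elements are produced on demand
 * instead of being stored, so only an iterator cursor needs to live in memory.
 */
final class LargeListState {
    private final long size;

    LargeListState(long size) {
        this.size = size;
    }

    /** Lazy access: yields one element at a time. */
    Iterator<Long> iterator() {
        return new Iterator<Long>() {
            private long cursor = 0;

            @Override
            public boolean hasNext() {
                return cursor < size;
            }

            @Override
            public Long next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return cursor++;
            }
        };
    }

    /** Eager access: copies every element into one List, so memory grows with the state. */
    List<Long> materialize() {
        List<Long> all = new ArrayList<>();
        Iterator<Long> it = iterator();
        while (it.hasNext()) {
            all.add(it.next());
        }
        return all;
    }
}

final class StateScanDemo {
    /** Aggregates incrementally without ever holding the full state in memory. */
    static long sumIncrementally(LargeListState state) {
        long sum = 0;
        Iterator<Long> it = state.iterator();
        while (it.hasNext()) {
            sum += it.next();
        }
        return sum;
    }
}
```

Both paths produce the same data; the difference is only peak memory, which for `materialize()` scales with the number of elements.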
Re:Re: Re: FLIP-510: Drop ChangelogNormalize for operations which don't need it
Hi Dawid,

Thanks for your response. I believe I've identified the key point, but I'm a bit unclear about the following statement of yours. Could you please provide an example to clarify it?

```
The only missing information is if the external sink can consume deletes by key and if a source produces full deletes or deletes by key.
```

From my understanding, for a sink, if its schema includes a primary key, we can assume it can process delete messages ('-D') and perform deletions by key (PK). If it does not include a PK, we would implicitly treat it as a log-structured table that supports full-row deletions.

Given that you mentioned `PARTIAL_DELETE`, should I interpret it as referring to a scenario similar to wide tables, where, if the sink has a PK, some columns are deleted (set to null or removed through other operations) while the others remain unchanged?

Looking forward to your reply.

--

Best!
Xuyang

At 2025-02-28 19:16:12, "Dawid Wysakowicz" wrote:
>Hey Xuyang,
>Ad. 1
>Yes, you're right, but we already do that for determining if we need
>UPDATE_BEFORE or not. FlinkChangelogModeInferenceProgram already deals with
>that.
>Ad. 2
>Unfortunately it is. This is also the only reason I need a FLIP. We can
>determine internally for every internal operator if we can work with
>partial deletes or if we need full deletes. The only missing information is
>if the external sink can consume deletes by key and if a source produces
>full deletes or deletes by key. Unfortunately this is information that
>comes from a connector implementation and thus needs to be provided via a
>public API.
>Ad. 3
>With ChangelogMode#kinds -> to some degree yes. We theoretically could
>split RowKind#DELETE to RowKind#DELETE_BY_KEY and RowKind#FULL_DELETE.
>However, that change would 1) be much more involved 2) we would need to
>encode that information in every single message, which I think is not
>necessary. I don't think it has much to do with PK.
>Ad.4
>I don't think so. PK information is part of Schema not about the kind of
>messages. We don't have PK information for UPDATE_BEFORE/UPDATE_AFTER and
>they also apply per key. If the name containing `DELETE_BY_KEY` is
>confusing I am happy to rename it to e.g. PARTIAL_DELETE, therefore I'd add
>`supportsPartialDeletes`.
>
>Best,
>Dawid
>
>On Fri, 28 Feb 2025 at 04:43, Xuyang wrote:
>
>> Hi Dawid.
>>
>> Big +1 for this FLIP. After reading through it, I have a few questions and
>> would appreciate your responses:
>>
>> 1. IIUC, we only need to provide additional information in the
>> `FlinkChangelogModeInferenceProgram` to enable the
>> inference program to determine whether it is safe to remove
>> `ChangelogNormalize`. My first instinct is that we need to
>> know if all subsequent output-side nodes consuming Upsert Keys include the
>> Upsert Keys provided by the input-side operator (source).
>> If this condition is met, we can safely eliminate `ChangelogNormalize`.
>> Perhaps I have missed some important points, so please feel
>> free to correct me if necessary.
>>
>> 2. The introduction of `supportsDeleteByKey` in ChangelogMode seems to
>> exist solely as auxiliary information for the
>> `FlinkChangelogModeInferenceProgram`. If that's the case, it doesn't seem
>> necessary to expose it in the public API, does it?
>>
>> 3. If the purpose of introducing `supportsDeleteByKey` in ChangelogMode is
>> to facilitate support for `#fromChangelogStream`
>> and `#toChangelogStream`, it appears that `supportsDeleteByKey` might
>> overlap with ChangelogMode#kinds and Schema#PK
>> to some extent, right?
>>
>> 4. Regarding supportsDeleteByKey, as part of a complete ChangelogMode
>> entity, should we also store the specific key information?
>>
>> --
>>
>> Best!
>> Xuyang
>>
>> On 2025-02-28 04:27:19, "Martijn Visser" wrote:
>> >Hi Dawid,
>> >
>> >Thanks for the FLIP, looks like a good improvement to me that will bring
>> >a lot of benefits. +1
>> >
>> >Best regards,
>> >
>> >Martijn
>> >
>> >On Tue, Feb 25, 2025 at 6:51 AM Sergey Nuyanzin wrote:
>> >
>> >> +1 for such improvement
>> >>
>> >> On Mon, Feb 24, 2025 at 12:01 PM Dawid Wysakowicz wrote:
>> >> >
>> >> > Hi everyone,
>> >> >
>> >> > I would like to initiate a discussion for the FLIP-510[1] below, which
>> >> > aims at optimising certain use cases in SQL which at the moment add
>> >> > ChangelogNormalize, but don't necessarily need to do it.
>> >> >
>> >> > Looking forward to hearing from you.
>> >> >
>> >> > [1] https://cwiki.apache.org/confluence/x/7o5EF
>> >>
>> >> --
>> >> Best regards,
>> >> Sergey
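The distinction Xuyang asks about, between a full delete and a delete by key (the PARTIAL_DELETE naming Dawid suggests), can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Flink's ChangelogMode API: `Row`, `KeyedSink`, and `LogSink` are illustrative names, and a delete message that carries only the key is represented by a `Row` whose non-key column is null. A sink that looks rows up by key can apply such a message. A log-structured sink that can only remove an exact match cannot, which is why, per Dawid, the connector has to declare which kind of delete it can consume.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical row: a key column plus one payload column (may be null). */
final class Row {
    final String key;
    final String value;

    Row(String key, String value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Row)) {
            return false;
        }
        Row other = (Row) o;
        return Objects.equals(key, other.key) && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }
}

/** Sink that indexes rows by key: a delete carrying only the key is enough. */
final class KeyedSink {
    final Map<String, Row> table = new HashMap<>();

    void upsert(Row row) {
        table.put(row.key, row);
    }

    /** Delete by key: the non-key columns of the delete message are ignored. */
    void deleteByKey(Row deleteMessage) {
        table.remove(deleteMessage.key);
    }
}

/** Log-structured sink without a key: a delete must match a complete row. */
final class LogSink {
    final List<Row> rows = new ArrayList<>();

    void insert(Row row) {
        rows.add(row);
    }

    /** Full delete: removes one row equal to the message; a key-only message matches nothing. */
    boolean fullDelete(Row deleteMessage) {
        return rows.remove(deleteMessage);
    }
}
```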
Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector
Hi Zakelly,

For simplicity, the target is to use `METADATA VIRTUAL` as the keywords for the definition. If it turns out not to be too complex, the latter (`METADATA FROM xxx VIRTUAL` for renaming) can be added too.

BR,
G

On Sun, Mar 2, 2025 at 3:37 PM Zakelly Lan wrote:
> Hi Gabor,
>
> +1 for this.
>
> Will the metadata column use `METADATA VIRTUAL` as key words for
> definition, or `METADATA FROM xxx VIRTUAL` for renaming, just like the
> Kafka table?
>
> Best,
> Zakelly
Flink vs beam-flink-runner benchmark
Hi All,

A curious question: has anyone benchmarked native Flink against the Beam Flink runner? I'd like to know whether there are differences in the following areas (considering streaming only):

1. Network stack
2. Operators
3. Use cases that would be faster in Flink or in the Beam Flink runner
4. Operator chaining

I'm asking in order to assess which is the better approach for us, as we want to choose an engine for our stream processing platform.

Regards,
Taher Koitawala
[jira] [Created] (FLINK-37408) Update all docker-compose references to docker compose in documentation
Nil Madhab created FLINK-37408:

Summary: Update all docker-compose references to docker compose in documentation
Key: FLINK-37408
URL: https://issues.apache.org/jira/browse/FLINK-37408
Project: Flink
Issue Type: Improvement
Reporter: Nil Madhab

h2. Summary
Replace all instances of "docker-compose" with "docker compose" throughout the Flink documentation to align with Docker's current CLI naming convention.

h2. Description
Docker has officially deprecated the hyphenated "docker-compose" command in favor of "docker compose" (without the hyphen) as part of its CLI restructuring. The hyphenated version still works but is considered legacy. We should update all references in the Flink documentation to follow this recommendation and avoid deprecated terminology.

h2. Background
Docker introduced compose as a built-in subcommand starting with Docker v20.10.x, gradually moving away from the standalone docker-compose binary. Using "docker compose" is now the recommended approach, while "docker-compose" is maintained for backward compatibility.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
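For FLINK-37408, most of the change is a text substitution, with one catch: Compose file names such as `docker-compose.yml` should keep their hyphen, since only the command is being renamed. A hypothetical Java sketch of a rewrite that respects this follows; the `ComposeRefRewriter` name and the exact pattern are assumptions, not part of the issue.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper that rewrites the legacy "docker-compose" command to
 * "docker compose" while leaving file names and paths untouched.
 */
final class ComposeRefRewriter {
    // Match "docker-compose" only when it is not preceded by a word character,
    // '.', '/' or '-' (e.g. inside a path) and not followed by a word character,
    // '-' or a ".yml"/".yaml" extension (e.g. the docker-compose.yml file name).
    private static final Pattern LEGACY_COMMAND =
            Pattern.compile("(?<![\\w./-])docker-compose(?![\\w-]|\\.ya?ml)");

    static String rewrite(String text) {
        return LEGACY_COMMAND.matcher(text).replaceAll("docker compose");
    }
}
```

With this pattern, a command inside a code span such as `docker-compose down` is rewritten, while a path such as `/usr/local/bin/docker-compose` is left alone because of the preceding `/`.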
Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector
Hi Shengkai,

Thanks for the questions. I'm off this week, but I will answer them when I'm back.

G

On Mon, Mar 3, 2025, 07:07 Shengkai Fang wrote:
> Hi, Gabor. Thanks for the FLIP. I have some questions about the FLIP:
[jira] [Created] (FLINK-37409) Catch throwable in BinlogSplitReader to exit reading binlog when error occurs
Xiao Huang created FLINK-37409:

Summary: Catch throwable in BinlogSplitReader to exit reading binlog when error occurs
Key: FLINK-37409
URL: https://issues.apache.org/jira/browse/FLINK-37409
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Xiao Huang

BinlogSplitReader currently catches *Exception* when executing binlogSplitReaderTask, so it cannot catch *Errors* such as OutOfMemoryError. As a result, the reader does not exit reading the binlog when such an error occurs.
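The failure mode in FLINK-37409 can be illustrated with a hypothetical sketch. It assumes the read task runs on a separate executor thread and reports failures through a shared field that the polling side checks; the class and method names are illustrative and do not mirror the actual BinlogSplitReader. Because OutOfMemoryError extends Error rather than Exception, a `catch (Exception e)` block would not record it, so under this assumption the poller would never learn that reading had stopped. Catching `Throwable` closes that gap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical reader whose read task runs on a background thread. */
final class BackgroundReader implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Written by the worker thread, read by the polling thread.
    private volatile Throwable readFailure;

    void start(Runnable readTask) {
        executor.execute(() -> {
            try {
                readTask.run();
            } catch (Throwable t) {
                // With catch (Exception e), an Error such as OutOfMemoryError
                // would skip this block and never be recorded.
                readFailure = t;
            }
        });
    }

    /** Called from the polling side: rethrows a recorded failure so reading stops. */
    void checkFailure() {
        Throwable t = readFailure;
        if (t != null) {
            throw new IllegalStateException("Read task failed", t);
        }
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Whether a real reader should try to continue after an Error is a separate question; the sketch only shows that catching `Throwable` lets the failure reach the code that decides to stop.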
[jira] [Created] (FLINK-37410) Split-level Watermark Metrics
Efrat Levitan created FLINK-37410:

Summary: Split-level Watermark Metrics
Key: FLINK-37410
URL: https://issues.apache.org/jira/browse/FLINK-37410
Project: Flink
Issue Type: New Feature
Components: Runtime / Metrics
Reporter: Efrat Levitan
Fix For: 2.1.0

Captures the work to provide split-level visibility into watermark progress on sources.
[DISCUSS] Split-level Watermark Metrics
Hey everyone!

I'd like to propose adding a few watermark-related metrics for better visibility into split-level watermark alignment and idleness states.

In addition to a per-split watermark, I want to export timers for the split state (active, idle and paused), similar to the taskIO busy/idle/backpressured time reporting:
- The idle clock starts ticking once a split is marked idle by idleness detection, and stops when the split emits a watermark (or is marked paused).
- The paused clock measures the time from when a split is added to the pausedSplits list by the SourceOperator due to watermark alignment until it is allowed to resume (or is marked idle).
- Active time is the number of milliseconds during which the split was neither idle nor paused.

For more details, please refer to:
FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-513%3A+Split-level+Watermark+Metrics
Jira: https://issues.apache.org/jira/browse/FLINK-37410

WDYT?
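The three split-state clocks described above can be sketched as a small per-split accumulator. This is a hypothetical illustration, not the FLIP's design. `SplitStateTimer` and its methods are made-up names, time is passed in explicitly in milliseconds to keep the example deterministic, and a real implementation would read a clock and might report per-second rates like the existing task busy/idle/backpressured metrics. Active time is simply whatever is not credited to IDLE or PAUSED.

```java
/**
 * Hypothetical per-split timer that credits elapsed time to the state the
 * split was in: ACTIVE, IDLE (idleness detection) or PAUSED (watermark alignment).
 */
final class SplitStateTimer {
    enum State { ACTIVE, IDLE, PAUSED }

    private final long[] accumulatedMs = new long[State.values().length];
    private State state = State.ACTIVE;
    private long stateSinceMs;

    SplitStateTimer(long nowMs) {
        this.stateSinceMs = nowMs;
    }

    /** Switches to the next state, crediting the elapsed interval to the previous one. */
    void transition(State next, long nowMs) {
        accumulatedMs[state.ordinal()] += nowMs - stateSinceMs;
        state = next;
        stateSinceMs = nowMs;
    }

    /** Total time spent in the given state, including the currently open interval. */
    long timeInMs(State s, long nowMs) {
        long total = accumulatedMs[s.ordinal()];
        if (s == state) {
            total += nowMs - stateSinceMs;
        }
        return total;
    }

    State current() {
        return state;
    }
}
```

For example, a split created at 0 ms, marked idle at 100 ms, emitting a watermark at 250 ms, paused by alignment at 300 ms and resumed at 400 ms has, at 500 ms, spent 250 ms active, 150 ms idle and 100 ms paused.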
Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector
Hi Gabor,

+1 for this.

Will the metadata column use `METADATA VIRTUAL` as the keywords for its definition, or `METADATA FROM xxx VIRTUAL` for renaming, just like the Kafka table?

Best,
Zakelly

On Sat, Mar 1, 2025 at 1:31 PM Gabor Somogyi wrote:
> Hi All,
>
> I'd like to start a discussion of FLIP-512: Add meta information to SQL
> state connector [1].
> Feel free to add your thoughts to make this feature better.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-512%3A+Add+meta+information+to+SQL+state+connector
>
> BR,
> G