Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector

2025-03-02 Thread Shengkai Fang
Hi, Gabor. Thanks for the FLIP. I have some questions about it:

1. State TTL for Value Columns
How can users retrieve the state TTL (Time-to-Live) for each value column?
From my understanding of the current design, it seems that this
functionality is not supported. Could you clarify if there are plans to
address this limitation?

2. Metadata Table vs. Metadata Column
The metadata information described in the FLIP appears to be intended to
describe the state files stored at a specific location. To me, this concept
aligns more closely with system tables like pg_tables in PostgreSQL [1] or
the INFORMATION_SCHEMA in MySQL [2].

If we opt to use metadata columns, every record in the table would end up
having identical values for these columns (please correct me if I’m
mistaken). On the other hand, the state connector requires users to specify
an operator UID or operator UID hash, after which it outputs user-defined
values in its records. This approach feels somewhat redundant to me.

3. Handling LIST and MAP States in the State Connector
I have concerns about how the current design handles LIST and MAP states.
Specifically, the state connector uses Flink SQL’s MAP and ARRAY types,
which implies that it attempts to load entire MAP or LIST states into
memory.

However, in many real-world scenarios, these states can grow very large.
Typically, the state API addresses this by providing an iterator to
traverse elements within the state incrementally. I’m unsure whether I’ve
missed something in FLIP-496 or FLIP-512, but it seems that the current
design might struggle with scalability in such cases.
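To illustrate the concern, here is a minimal Java sketch contrasting the two access patterns. A plain `Iterable` stands in for a list state whose elements are read lazily; the class and method names are hypothetical and not the state connector's API. `materialize` copies every element into one collection, which is what emitting the whole state as a single SQL `ARRAY` value implies, while `emitIncrementally` hands elements to a consumer one at a time, so extra memory does not grow with the state size.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Hypothetical sketch, not the state connector's API.
final class StateReadSketch {

    // Stand-in for a large list state: each iteration produces elements
    // lazily, the way a state backend iterator would read them from storage.
    static Iterable<Long> lazyListState(long size) {
        return () -> new Iterator<Long>() {
            private long next = 0;

            @Override
            public boolean hasNext() {
                return next < size;
            }

            @Override
            public Long next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return next++;
            }
        };
    }

    // Eager: the whole state ends up in memory as one collection.
    static List<Long> materialize(Iterable<Long> state) {
        List<Long> all = new ArrayList<>();
        for (Long value : state) {
            all.add(value);
        }
        return all;
    }

    // Incremental: each element is passed on (e.g. as one output row)
    // without keeping the previously seen elements around.
    static long emitIncrementally(Iterable<Long> state, Consumer<Long> rowSink) {
        long emitted = 0;
        for (Long value : state) {
            rowSink.accept(value);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        Iterable<Long> state = lazyListState(5);
        System.out.println(materialize(state)); // [0, 1, 2, 3, 4]
        long[] sum = {0};
        long rows = emitIncrementally(state, v -> sum[0] += v);
        System.out.println(rows + " rows, sum=" + sum[0]); // 5 rows, sum=10
    }
}
```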

Best,
Shengkai

[1] https://www.postgresql.org/docs/current/view-pg-tables.html
[2] https://dev.mysql.com/doc/refman/8.4/en/information-schema-tables-table.html

On Mon, Mar 3, 2025 at 02:00, Gabor Somogyi wrote:

> Hi Zakelly,
>
> To keep things simple, the target is to use `METADATA VIRTUAL` as the
> keywords for the definition. If it's not too complex, the latter can be
> added too.
>
> BR,
> G
>
>
> On Sun, Mar 2, 2025 at 3:37 PM Zakelly Lan  wrote:
>
> > Hi Gabor,
> >
> > +1 for this.
> >
> > Will the metadata column use `METADATA VIRTUAL` as the keywords for the
> > definition, or `METADATA FROM xxx VIRTUAL` for renaming, as the Kafka
> > table does?
> >
> >
> > Best,
> > Zakelly
> >
> > On Sat, Mar 1, 2025 at 1:31 PM Gabor Somogyi 
> > wrote:
> >
> > > Hi All,
> > >
> > > I'd like to start a discussion of FLIP-512: Add meta information to SQL
> > > state connector [1].
> > > Feel free to add your thoughts to make this feature better.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-512%3A+Add+meta+information+to+SQL+state+connector
> > >
> > > BR,
> > > G
> > >
> >
>


Re:Re: Re: FLIP-510: Drop ChangelogNormalize for operations which don't need it

2025-03-02 Thread Xuyang
Hi, Dawid.

Thanks for your response. I believe I've identified a key point, but I'm a bit
unclear about the following statement of yours. Could you please provide an
example to clarify?

```
The only missing information is if the external sink can consume deletes by key
and if a source produces full deletes or deletes by key.
```

From my understanding, for a sink, if its schema includes a primary key, we can
assume it has the ability to process delete messages (with '-D') and perform
deletions by key (PK). If it does not include a PK, we would implicitly treat it
as a log-structured table that supports full row deletions.

Given that you mentioned `PARTIAL_DELETE`, should I interpret it as referring to
a scenario similar to wide tables where, if the sink has a PK, some columns are
deleted (set to null or through other operations) while others remain
unchanged?
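A minimal Java sketch of the delete-handling part of this discussion, under stated assumptions: the flags `sourceProducesPartialDeletes` and `sinkSupportsPartialDeletes` are hypothetical stand-ins for the capability Dawid proposes to expose, and `LastRowPerKey` only models, in simplified form, how keeping the latest row per key lets a key-only delete be expanded into a full-row delete. It is not Flink's API and ignores the other reasons `ChangelogNormalize` may be required.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not Flink's ChangelogMode or ChangelogNormalize code.
final class PartialDeleteSketch {

    // Considering deletes only: a normalizing step is needed when the source
    // may emit key-only deletes but the sink needs the full row.
    static boolean needsNormalization(boolean sourceProducesPartialDeletes,
                                      boolean sinkSupportsPartialDeletes) {
        return sourceProducesPartialDeletes && !sinkSupportsPartialDeletes;
    }

    // Keeps the latest full row per primary key so that a delete carrying
    // only the key can be turned into a delete carrying the whole row.
    static final class LastRowPerKey {
        private final Map<String, List<Object>> lastRow = new HashMap<>();

        void upsert(String key, List<Object> row) {
            lastRow.put(key, row);
        }

        // Returns the full row to delete, or empty if the key is unknown.
        Optional<List<Object>> expandDelete(String key) {
            return Optional.ofNullable(lastRow.remove(key));
        }
    }

    public static void main(String[] args) {
        System.out.println(needsNormalization(true, false));  // true
        System.out.println(needsNormalization(false, false)); // false

        LastRowPerKey state = new LastRowPerKey();
        state.upsert("user-1", List.of("user-1", "Alice", 42));
        System.out.println(state.expandDelete("user-1")); // Optional[[user-1, Alice, 42]]
        System.out.println(state.expandDelete("user-1")); // Optional.empty
    }
}
```

The state kept per key is exactly the cost the FLIP tries to avoid when the sink can already consume deletes by key.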

Looking forward to your reply.

--

Best!
Xuyang

At 2025-02-28 19:16:12, "Dawid Wysakowicz"  wrote:
>Hey Xuyang,
>Ad. 1
>Yes, you're right, but we already do that for determining if we need
>UPDATE_BEFORE or not. FlinkChangelogModeInferenceProgram already deals with
>that.
>Ad. 2
>Unfortunately it is. This is also the only reason I need a FLIP. We can
>determine internally for every internal operator if we can work with
>partial deletes or if we need full deletes. The only missing information is
>if the external sink can consume deletes by key and if a source produces
>full deletes or deletes by key. Unfortunately this is information that
>comes from a connector implementation and thus needs to be provided via a
>public API.
>Ad. 3
>With ChangelogMode#kinds -> to some degree yes. We theoretically could
>split RowKind#DELETE to RowKind#DELETE_BY_KEY and RowKind#FULL_DELETE.
>However, that change would 1) be much more involved 2) we would need to
>encode that information in every single message, which I think is not
>necessary. I don't think it has much to do with PK.
>Ad. 4
>I don't think so. PK information is part of Schema not about the kind of
>messages. We don't have PK information for UPDATE_BEFORE/UPDATE_AFTER and
>they also apply per key. If the name containing `DELETE_BY_KEY` is
>confusing I am happy to rename it to e.g. PARTIAL_DELETE, therefore I'd add
>`supportsPartialDeletes`
>
>Best,
>Dawid
>
>On Fri, 28 Feb 2025 at 04:43, Xuyang  wrote:
>
>> Hi Dawid.
>>
>> Big +1 for this FLIP. After reading through it, I have a few questions and
>> would appreciate your responses:
>>
>> 1. IIUC, we only need to provide additional information in the
>> `FlinkChangelogModeInferenceProgram` to enable the inference program to
>> determine whether it is safe to remove `ChangelogNormalize`. My first
>> instinct is that we need to know whether all subsequent output-side nodes
>> consuming Upsert Keys include the Upsert Keys provided by the input-side
>> operator (source). If this condition is met, we can safely eliminate
>> `ChangelogNormalize`. Perhaps I have missed some important points, so
>> please feel free to correct me if necessary.
>>
>> 2. The introduction of `supportsDeleteByKey` in ChangelogMode seems to
>> exist solely as auxiliary information for the
>> `FlinkChangelogModeInferenceProgram`. If that's the case, it doesn't seem
>> necessary to expose it in the public API, does it?
>>
>> 3. If the purpose of introducing `supportsDeleteByKey` in ChangelogMode is
>> to facilitate support for `#fromChangelogStream` and `#toChangelogStream`,
>> it appears that `supportsDeleteByKey` might overlap with ChangelogMode#kinds
>> and Schema#PK to some extent, right?
>>
>> 4. Regarding supportsDeleteByKey, as part of a complete ChangelogMode
>> entity, should we also store the specific key information?
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> At 2025-02-28 04:27:19, "Martijn Visser" wrote:
>> >Hi Dawid,
>> >
>> >Thanks for the FLIP, it looks like a good improvement to me that will
>> >bring a lot of benefits. +1
>> >
>> >Best regards,
>> >
>> >Martijn
>> >
>> >On Tue, Feb 25, 2025 at 6:51 AM Sergey Nuyanzin 
>> wrote:
>> >
>> >> +1 for such improvement
>> >>
>> >> On Mon, Feb 24, 2025 at 12:01 PM Dawid Wysakowicz
>> >>  wrote:
>> >> >
>> >> > Hi everyone,
>> >> >
>> >> > I would like to initiate a discussion on FLIP-510 [1], which aims to
>> >> > optimise certain SQL use cases that currently add ChangelogNormalize
>> >> > but don't necessarily need it.
>> >> >
>> >> > Looking forward to hearing from you.
>> >> >
>> >> > [1] https://cwiki.apache.org/confluence/x/7o5EF
>> >>
>> >>
>> >>
>> >> --
>> >> Best regards,
>> >> Sergey
>> >>
>>


Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector

2025-03-02 Thread Gabor Somogyi
Hi Zakelly,

To keep things simple, the target is to use `METADATA VIRTUAL` as the keywords
for the definition. If it's not too complex, the latter can be added too.

BR,
G



Flink vs beam-flink-runner benchmark

2025-03-02 Thread Taher Koitawala
Hi All,
  Curious question! Has anyone benchmarked native Flink against the
beam-flink-runner?

I'd like to know whether there are differences in the following areas
(streaming only):
1. Network stack
2. Operators
3. Use cases that would be faster in Flink or in the Beam Flink runner
4. Chaining

I'm asking in order to assess which approach is better for us, as we want to
choose an engine for our stream processing platform.


Regards,
Taher Koitawala


[jira] [Created] (FLINK-37408) Update all docker-compose references to docker compose in documentation

2025-03-02 Thread Nil Madhab (Jira)
Nil Madhab created FLINK-37408:
--

 Summary: Update all docker-compose references to docker compose in documentation
 Key: FLINK-37408
 URL: https://issues.apache.org/jira/browse/FLINK-37408
 Project: Flink
  Issue Type: Improvement
Reporter: Nil Madhab


h2. Summary

Replace all instances of "docker-compose" with "docker compose" throughout the 
Flink documentation to align with Docker's current CLI naming convention.
h2. Description

Docker has officially deprecated the hyphenated "docker-compose" command in 
favor of "docker compose" (without the hyphen) as part of their CLI 
restructuring. The hyphenated version is still functional but considered 
legacy. We should update all references in the Flink documentation to follow 
this recommendation and avoid using deprecated terminology.


h2. Background

Docker introduced the compose command as a built-in subcommand starting with 
Docker v20.10.x, gradually moving away from the standalone docker-compose 
binary. Using "docker compose" is now the recommended approach, while 
"docker-compose" is maintained for backward compatibility.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector

2025-03-02 Thread Gabor Somogyi
Hi Shengkai,

Thanks for the questions. This week I'm off but will answer them when I'm
back.

G



[jira] [Created] (FLINK-37409) Catch throwable in BinlogSplitReader to exit reading binlog when error occurs

2025-03-02 Thread Xiao Huang (Jira)
Xiao Huang created FLINK-37409:
--

 Summary: Catch throwable in BinlogSplitReader to exit reading binlog when error occurs
 Key: FLINK-37409
 URL: https://issues.apache.org/jira/browse/FLINK-37409
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Xiao Huang


BinlogSplitReader currently catches *Exception* when executing
binlogSplitReaderTask, so it cannot catch *Errors* such as OutOfMemoryError.
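The difference is easy to reproduce in plain Java: `OutOfMemoryError` extends `Error`, not `Exception`, so a `catch (Exception e)` block lets it propagate. In the sketch below, the class and method names are hypothetical and not the Flink CDC code; a background read task records its failure so the reading side can stop, and only the `Throwable` variant records the simulated error.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the Flink CDC code: a read task records its
// failure so the reading side can stop instead of waiting for more data.
final class ReadTaskFailureSketch {

    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Behaviour described in the issue: only Exceptions are recorded.
    void runCatchingException(Runnable task) {
        try {
            task.run();
        } catch (Exception e) {
            failure.compareAndSet(null, e);
        }
    }

    // Proposed behaviour: Errors such as OutOfMemoryError are recorded too.
    void runCatchingThrowable(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            failure.compareAndSet(null, t);
        }
    }

    // Checked by the reading loop; a recorded failure ends reading.
    boolean hasFailed() {
        return failure.get() != null;
    }

    public static void main(String[] args) {
        Runnable oom = () -> {
            throw new OutOfMemoryError("simulated");
        };

        ReadTaskFailureSketch withException = new ReadTaskFailureSketch();
        try {
            withException.runCatchingException(oom);
        } catch (OutOfMemoryError escaped) {
            // The Error bypassed catch (Exception) and was never recorded.
            System.out.println("escaped, recorded=" + withException.hasFailed()); // escaped, recorded=false
        }

        ReadTaskFailureSketch withThrowable = new ReadTaskFailureSketch();
        withThrowable.runCatchingThrowable(oom);
        System.out.println("recorded=" + withThrowable.hasFailed()); // recorded=true
    }
}
```

When the error escapes, nothing is recorded for the reading side to check, which matches the symptom the issue title describes.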



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37410) Split-level Watermark Metrics

2025-03-02 Thread Efrat Levitan (Jira)
Efrat Levitan created FLINK-37410:
-

 Summary: Split-level Watermark Metrics
 Key: FLINK-37410
 URL: https://issues.apache.org/jira/browse/FLINK-37410
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Metrics
Reporter: Efrat Levitan
 Fix For: 2.1.0


Captures the work to provide split-level visibility into watermark progress in
sources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] Split-level Watermark Metrics

2025-03-02 Thread אפרת לויטן
Hey everyone!
I'd like to propose adding a few watermark-related metrics for better
visibility into split-level watermark alignment and idleness states.
In addition to the per-split watermark, I want to export split state timers
(active, idle and paused), similar to the taskIO busy/idle/backpressured time
reporting:

   - The idle clock starts ticking once a split is marked idle by idleness
   detection, and stops when the split emits a watermark (or is marked paused).
   - The paused clock measures the time from when sourceOperator adds a split
   to the pausedSplits list due to watermark alignment until the split is
   allowed to resume (or is marked idle).
   - Active time is the number of milliseconds the split was neither
   idle nor paused.
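A minimal Java sketch of the proposed per-split clocks, assuming a simple state machine with an injectable millisecond clock so it can be tested deterministically. `SplitStateTimer` and its methods are hypothetical names, not part of FLIP-513 or Flink's metrics API; each transition closes the interval spent in the previous state, so active, idle and paused time always sum to the elapsed time.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch, not the FLIP-513 implementation: accumulates how long
// a split spends active, idle, or paused.
final class SplitStateTimer {

    enum SplitState { ACTIVE, IDLE, PAUSED }

    private final LongSupplier clockMillis;
    private final Map<SplitState, Long> accumulated = new EnumMap<>(SplitState.class);
    private SplitState current = SplitState.ACTIVE;
    private long enteredAt;

    SplitStateTimer(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
        this.enteredAt = clockMillis.getAsLong();
        for (SplitState s : SplitState.values()) {
            accumulated.put(s, 0L);
        }
    }

    // Close the interval spent in the current state and switch to the next one.
    void transitionTo(SplitState next) {
        long now = clockMillis.getAsLong();
        accumulated.merge(current, now - enteredAt, Long::sum);
        current = next;
        enteredAt = now;
    }

    // Total time in a state, including the still-open interval if it is current.
    long timeInMillis(SplitState state) {
        long total = accumulated.get(state);
        if (state == current) {
            total += clockMillis.getAsLong() - enteredAt;
        }
        return total;
    }

    public static void main(String[] args) {
        long[] now = {0};
        SplitStateTimer timer = new SplitStateTimer(() -> now[0]);
        now[0] = 100;
        timer.transitionTo(SplitState.IDLE);   // idleness detected
        now[0] = 250;
        timer.transitionTo(SplitState.ACTIVE); // split emits a watermark
        now[0] = 300;
        timer.transitionTo(SplitState.PAUSED); // paused by watermark alignment
        now[0] = 400;
        System.out.println(timer.timeInMillis(SplitState.ACTIVE)); // 150
        System.out.println(timer.timeInMillis(SplitState.IDLE));   // 150
        System.out.println(timer.timeInMillis(SplitState.PAUSED)); // 100
    }
}
```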

For more details, please refer to the FLIP
https://cwiki.apache.org/confluence/display/FLINK/FLIP-513%3A+Split-level+Watermark+Metrics
Jira https://issues.apache.org/jira/browse/FLINK-37410
wdyt?


Re: [DISCUSS] FLIP-512: Add meta information to SQL state connector

2025-03-02 Thread Zakelly Lan
Hi Gabor,

+1 for this.

Will the metadata column use `METADATA VIRTUAL` as the keywords for the
definition, or `METADATA FROM xxx VIRTUAL` for renaming, as the Kafka table
does?


Best,
Zakelly
