[jira] [Created] (FLINK-37622) Exactly once Kafka sink does not produce any records in batch mode

2025-04-07 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-37622:
---

 Summary: Exactly once Kafka sink does not produce any records in 
batch mode
 Key: FLINK-37622
 URL: https://issues.apache.org/jira/browse/FLINK-37622
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-3.4.0
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: kafka-4.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36648) Release flink-connector-kafka v4.0.0 for Flink 2.0

2025-04-07 Thread Arvid Heise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise reassigned FLINK-36648:
---

Assignee: Arvid Heise  (was: Yanquan Lv)

> Release flink-connector-kafka v4.0.0 for Flink 2.0
> --
>
> Key: FLINK-36648
> URL: https://issues.apache.org/jira/browse/FLINK-36648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.4.0
>Reporter: Yanquan Lv
>Assignee: Arvid Heise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-4.0.0
>
>
> Flink already has a release for the 2.0-preview1 version. Kafka, as one of 
> the most commonly used connectors, needs to be bumped to this version as 
> soon as possible for users and developers to use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] fix ci failure due to code style [flink-cdc]

2025-04-07 Thread via GitHub


MOBIN-F commented on PR #3980:
URL: https://github.com/apache/flink-cdc/pull/3980#issuecomment-2785142817

   CI failed. I'll check again later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36684][cdc-connector][mysql] Support read changelog as append only mode [flink-cdc]

2025-04-07 Thread via GitHub


ruanhang1993 commented on code in PR #3708:
URL: https://github.com/apache/flink-cdc/pull/3708#discussion_r2032439969


##
docs/content/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -426,6 +426,19 @@ During a snapshot operation, the connector will query each 
included table to pro
 Experimental option, defaults to false.
   
 
+
+  scan.read-changelog-as-append-only.enabled
+  optional
+  false
+  Boolean
+  
+Whether to convert the changelog stream to an append-only stream.
+This feature targets special scenarios where upstream deletion messages must 
be preserved downstream, for example logical (soft) deletion, where the downstream 
is not allowed to physically delete rows. In that case this option is used together 
with the row_kind metadata field: the downstream first stores every change 
record and then uses the row_kind field to decide whether a row should be 
treated as logically deleted.
+The option values are as follows:
+  true: All types of messages (including INSERT, DELETE, 
UPDATE_BEFORE, and UPDATE_AFTER) will be converted into INSERT messages.
+  false (default): All types of messages are sent as is.
+  
+

Review Comment:
   The row_kind part needs to be updated as in content.zh.
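
   For reference, a minimal usage sketch of the documented option. This is illustrative only: the table layout and connection values are assumptions, and the `row_kind` metadata key is taken from the description above rather than verified docs.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AppendOnlyChangelogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Declare a mysql-cdc table with the new option and the row_kind metadata column.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  id BIGINT,"
                        + "  status STRING,"
                        + "  row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'localhost', 'port' = '3306',"
                        + "  'username' = 'user', 'password' = 'pass',"
                        + "  'database-name' = 'db', 'table-name' = 'orders',"
                        + "  'scan.read-changelog-as-append-only.enabled' = 'true'"
                        + ")");
        // Every change event now arrives as an INSERT; row_kind tells the downstream
        // whether it originally was an insert, update, or delete.
        tEnv.executeSql("SELECT id, status, row_kind FROM orders").print();
    }
}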



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36456] [Runtime/Configuration] Improve Security in DatadogHttpReporterFactory by providing ENV alternative to retrieve API key [flink]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #25470:
URL: https://github.com/apache/flink/pull/25470#issuecomment-2785362064

   This PR is being marked as stale since it has not had any activity in the 
last 90 days. 
   If you would like to keep this PR alive, please leave a comment asking for a 
review. 
   If the PR has merge conflicts, update it with the latest from the base 
branch.
   
   If you are having difficulty finding a reviewer, please reach out to the 
   community, contact details can be found here: 
https://flink.apache.org/what-is-flink/community/
   
   If this PR is no longer valid or desired, please feel free to close it. 
   If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add timestamp_mapping.legacy option to avro filesystem and avro-confluent format [flink]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #25439:
URL: https://github.com/apache/flink/pull/25439#issuecomment-2785361786

   This PR is being marked as stale since it has not had any activity in the 
last 90 days. 
   If you would like to keep this PR alive, please leave a comment asking for a 
review. 
   If the PR has merge conflicts, update it with the latest from the base 
branch.
   
   If you are having difficulty finding a reviewer, please reach out to the 
   community, contact details can be found here: 
https://flink.apache.org/what-is-flink/community/
   
   If this PR is no longer valid or desired, please feel free to close it. 
   If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [runtime-web] chore: update Angular to v15 [flink]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #25450:
URL: https://github.com/apache/flink/pull/25450#issuecomment-2785361905

   This PR is being marked as stale since it has not had any activity in the 
last 90 days. 
   If you would like to keep this PR alive, please leave a comment asking for a 
review. 
   If the PR has merge conflicts, update it with the latest from the base 
branch.
   
   If you are having difficulty finding a reviewer, please reach out to the 
   community, contact details can be found here: 
https://flink.apache.org/what-is-flink/community/
   
   If this PR is no longer valid or desired, please feel free to close it. 
   If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Updated readme with stream-processing [flink]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #25455:
URL: https://github.com/apache/flink/pull/25455#issuecomment-2785361975

   This PR is being marked as stale since it has not had any activity in the 
last 90 days. 
   If you would like to keep this PR alive, please leave a comment asking for a 
review. 
   If the PR has merge conflicts, update it with the latest from the base 
branch.
   
   If you are having difficulty finding a reviewer, please reach out to the 
   community, contact details can be found here: 
https://flink.apache.org/what-is-flink/community/
   
   If this PR is no longer valid or desired, please feel free to close it. 
   If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]

2025-04-07 Thread via GitHub


Zakelly opened a new pull request, #26415:
URL: https://github.com/apache/flink/pull/26415

   ## What is the purpose of the change
   
   There is a concurrency issue with reference counting in the ForSt file cache, 
which can lead to read errors in some special scenarios (e.g. extremely 
frequent cache thrashing).
   
   
   ## Brief change log
   
   Two problems:
   
- All the read interfaces in `CachedDataInputStream` should release the 
reference **only when** the stream being read is not the `originalStream` (see 
the sketch below).
- The stream-retrieving methods in task threads should account for the latency 
of the status switch.
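
A hypothetical sketch of the first point; the method and field names here are assumptions for illustration, not the actual patch:

// Hypothetical sketch, not the actual CachedDataInputStream code: a read should
// release the cache entry's reference only when the stream it used is a cached
// stream; reads served by the originalStream never acquired a reference.
public int read(byte[] buffer) throws IOException {
    FSDataInputStream stream = getStream(); // may return a cached stream or originalStream
    try {
        return stream.read(buffer);
    } finally {
        if (stream != originalStream) {
            cacheEntry.release(); // balances the reference taken when the cached stream was handed out
        }
    }
}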
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`ForStFlinkFileSystemTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Rename registeredSlots to allocatedSlots [flink]

2025-04-07 Thread via GitHub


beliefer opened a new pull request, #26417:
URL: https://github.com/apache/flink/pull/26417

   ## What is the purpose of the change
   
   This PR aims to improve the readability of `DefaultAllocatedSlotPool`.
   The original name `registeredSlots` does not convey any useful information to 
developers. I think `allocatedSlots` is a better fit than `registeredSlots`.
   
   
   ## Brief change log
   
   Rename `registeredSlots` to `allocatedSlots`.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37553) Make ForStKeyedStateBackend consider namespaceSerializer compatibility when restoring state

2025-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37553:
---
Labels: pull-request-available  (was: )

> Make ForStKeyedStateBackend consider namespaceSerializer compatibility when 
> restoring state
> ---
>
> Key: FLINK-37553
> URL: https://issues.apache.org/jira/browse/FLINK-37553
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 2.0.0
>Reporter: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.1.0
>
>
> ForStKeyedStateBackend currently does not check namespaceSerializer 
> compatibility when restoring state. As a result, the namespaceSerializer may be 
> replaced without knowing whether the new serializer is compatible with the old 
> one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]

2025-04-07 Thread via GitHub


fredia commented on code in PR #26415:
URL: https://github.com/apache/flink/pull/26415#discussion_r2032341068


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##
@@ -102,62 +102,81 @@ private FSDataInputStream getStream() throws IOException {
 if (isFlinkThread()) {
 cacheEntry.touch();
 }
-FSDataInputStream stream = tryGetCacheStream();
-if (stream != null) {
-fileBasedCache.incHitCounter();
-return stream;
-}
-
-if (streamStatus == StreamStatus.CACHED_CLOSED
-|| streamStatus == StreamStatus.CACHED_CLOSING) {
-if (streamStatus == StreamStatus.CACHED_CLOSING) {
-try {
-semaphore.acquire(1);
-} catch (InterruptedException e) {
-throw new RuntimeException(e);
-}
-originalStream.seek(position);
-position = -1;
-LOG.trace(
-"Stream {} status from {} to {}",
-cacheEntry.cachePath,
-streamStatus,
-StreamStatus.CACHED_CLOSED);
-streamStatus = StreamStatus.CACHED_CLOSED;
-}
-// try reopen
-tryReopen();
-stream = tryGetCacheStream();
+// Repeat until get a stream
+while (true) {
+// Firstly, we try to get cache stream
+FSDataInputStream stream = tryGetCacheStream();
 if (stream != null) {
 fileBasedCache.incHitCounter();
 return stream;
 }
-fileBasedCache.incMissCounter();
-return originalStream;
-} else if (streamStatus == StreamStatus.ORIGINAL) {
-fileBasedCache.incMissCounter();
-return originalStream;
-} else {
-if (streamStatus == StreamStatus.CACHED_OPEN) {
-stream = tryGetCacheStream();
+
+// No cache stream, so is it closing?
+if (streamStatus == StreamStatus.CACHED_CLOSED
+|| streamStatus == StreamStatus.CACHED_CLOSING) {
+if (streamStatus == StreamStatus.CACHED_CLOSING) {
+// if closing, update the position
+try {
+semaphore.acquire(1);
+} catch (InterruptedException e) {
+throw new RuntimeException(e);
+}
+originalStream.seek(position);
+position = -1;
+LOG.trace(
+"Stream {} status from {} to {}",
+cacheEntry.cachePath,
+streamStatus,
+StreamStatus.CACHED_CLOSED);
+streamStatus = StreamStatus.CACHED_CLOSED;
+}
+// try reopen
+stream = tryReopen();
 if (stream != null) {
 fileBasedCache.incHitCounter();
 return stream;
 }
+fileBasedCache.incMissCounter();

Review Comment:
   Add `streamStatus = StreamStatus.ORIGINAL` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-37553] Make ForStKeyedStateBackend update NamespaceSerializer during restoring state [flink]

2025-04-07 Thread via GitHub


mayuehappy opened a new pull request, #26416:
URL: https://github.com/apache/flink/pull/26416

   …during restoring state
   
   
   
   ## What is the purpose of the change
   
   Make ForStKeyedStateBackend update NamespaceSerializer during restoring state
   
   
   ## Brief change log
   
   - add `updateNamespaceSerializer` in `RegisteredKeyValueStateBackendMetaInfo`
   - invoke RegisteredKeyValueStateBackendMetaInfo#updateNamespaceSerializer 
when creating State
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework
   
   This change added tests and can be verified as follows:
   
   add unit test ForStStateMigrationTest#testStateNamespaceSerializerChanged
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (checkpointing)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26415:
URL: https://github.com/apache/flink/pull/26415#issuecomment-2785105538

   
   ## CI report:
   
   * 7b963ba791e89acf37bb95e08dcfdc724dee6624 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-37458) Forbid enableAsyncState() for synchronous operators

2025-04-07 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-37458:
--

Assignee: Yanfei Lei

> Forbid enableAsyncState() for synchronous operators
> ---
>
> Key: FLINK-37458
> URL: https://issues.apache.org/jira/browse/FLINK-37458
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-37458) Forbid enableAsyncState() for synchronous operators

2025-04-07 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941759#comment-17941759
 ] 

Yanfei Lei commented on FLINK-37458:


Merged into release-2.0 via 14e85eced10e98bc75870ac0360bad67d0722697

> Forbid enableAsyncState() for synchronous operators
> ---
>
> Key: FLINK-37458
> URL: https://issues.apache.org/jira/browse/FLINK-37458
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix] fix ci failure due to code style [flink-cdc]

2025-04-07 Thread via GitHub


MOBIN-F opened a new pull request, #3980:
URL: https://github.com/apache/flink-cdc/pull/3980

   Code style verifier has been updated to JUnit 5
   
![image](https://github.com/user-attachments/assets/316c7713-b6af-41a7-9efd-5850c9ac5f19)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] fix ci failure due to code style [flink-cdc]

2025-04-07 Thread via GitHub


leonardBang commented on PR #3980:
URL: https://github.com/apache/flink-cdc/pull/3980#issuecomment-2785104152

   Thanks @MOBIN-F for the quickfix


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37628) Wrong reference counting in ForSt file cache

2025-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37628:
---
Labels: pull-request-available  (was: )

> Wrong reference counting in ForSt file cache
> 
>
> Key: FLINK-37628
> URL: https://issues.apache.org/jira/browse/FLINK-37628
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> There is a concurrency issue for reference counting in ForSt file cache, 
> which could lead to a read error in some special scenarios (e.g. extremely 
> frequent cache thrashing)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37553] Make ForStKeyedStateBackend update NamespaceSerializer during restoring state [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26416:
URL: https://github.com/apache/flink/pull/26416#issuecomment-2785176177

   
   ## CI report:
   
   * 8f4cfd9ee17624cda10a40d69b8891bd1edd4593 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Rename registeredSlots to allocatedSlots [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26417:
URL: https://github.com/apache/flink/pull/26417#issuecomment-2785177001

   
   ## CI report:
   
   * 7fd9bfbbd3106b4c43f3f14d69c056550fa66aff UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [postgres] fix when backfill will scan all table schame which match config [flink-cdc]

2025-04-07 Thread via GitHub


hql0312 commented on PR #3979:
URL: https://github.com/apache/flink-cdc/pull/3979#issuecomment-2785178413

   
   
   
   > Please check the code style. @hql0312
   
   OK, done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Unify the name for the collection of SlotOffer with slotOffers [flink]

2025-04-07 Thread via GitHub


beliefer opened a new pull request, #26418:
URL: https://github.com/apache/flink/pull/26418

   ## What is the purpose of the change
   
   This PR aims to unify the name for the collection of `SlotOffer`.
   Currently, there are two variable names for collections of `SlotOffer`: 
one is `slots` and the other is `offers`.
   I think `slotOffers` is more readable than either.
   
   ## Brief change log
   
   Unify the name for the collection of `SlotOffer` with `slotOffers`.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]

2025-04-07 Thread via GitHub


Zakelly commented on code in PR #26415:
URL: https://github.com/apache/flink/pull/26415#discussion_r2032354803


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##
@@ -102,62 +102,81 @@ private FSDataInputStream getStream() throws IOException {
 if (isFlinkThread()) {
 cacheEntry.touch();
 }
-FSDataInputStream stream = tryGetCacheStream();
-if (stream != null) {
-fileBasedCache.incHitCounter();
-return stream;
-}
-
-if (streamStatus == StreamStatus.CACHED_CLOSED
-|| streamStatus == StreamStatus.CACHED_CLOSING) {
-if (streamStatus == StreamStatus.CACHED_CLOSING) {
-try {
-semaphore.acquire(1);
-} catch (InterruptedException e) {
-throw new RuntimeException(e);
-}
-originalStream.seek(position);
-position = -1;
-LOG.trace(
-"Stream {} status from {} to {}",
-cacheEntry.cachePath,
-streamStatus,
-StreamStatus.CACHED_CLOSED);
-streamStatus = StreamStatus.CACHED_CLOSED;
-}
-// try reopen
-tryReopen();
-stream = tryGetCacheStream();
+// Repeat until get a stream
+while (true) {
+// Firstly, we try to get cache stream
+FSDataInputStream stream = tryGetCacheStream();
 if (stream != null) {
 fileBasedCache.incHitCounter();
 return stream;
 }
-fileBasedCache.incMissCounter();
-return originalStream;
-} else if (streamStatus == StreamStatus.ORIGINAL) {
-fileBasedCache.incMissCounter();
-return originalStream;
-} else {
-if (streamStatus == StreamStatus.CACHED_OPEN) {
-stream = tryGetCacheStream();
+
+// No cache stream, so is it closing?
+if (streamStatus == StreamStatus.CACHED_CLOSED
+|| streamStatus == StreamStatus.CACHED_CLOSING) {
+if (streamStatus == StreamStatus.CACHED_CLOSING) {
+// if closing, update the position
+try {
+semaphore.acquire(1);
+} catch (InterruptedException e) {
+throw new RuntimeException(e);
+}
+originalStream.seek(position);
+position = -1;
+LOG.trace(
+"Stream {} status from {} to {}",
+cacheEntry.cachePath,
+streamStatus,
+StreamStatus.CACHED_CLOSED);
+streamStatus = StreamStatus.CACHED_CLOSED;
+}
+// try reopen
+stream = tryReopen();
 if (stream != null) {
 fileBasedCache.incHitCounter();
 return stream;
 }
+fileBasedCache.incMissCounter();

Review Comment:
   I suggest not. `ORIGINAL` means this stream can only be read via the 
original stream, with no cache mechanism enabled. It's a stabilized status without 
transition. There is an `else if` branch below this that handles the `ORIGINAL` 
condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-37623) Async state support for `process()` in Datastream API

2025-04-07 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-37623:
---

Assignee: Yanfei Lei  (was: Zakelly Lan)

> Async state support for `process()`  in Datastream API
> --
>
> Key: FLINK-37623
> URL: https://issues.apache.org/jira/browse/FLINK-37623
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
> Fix For: 2.1.0, 2.0.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Unify the name for the collection of SlotOffer with slotOffers [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26418:
URL: https://github.com/apache/flink/pull/26418#issuecomment-2785194374

   
   ## CI report:
   
   * 43a54812fb035d279a21137de5f65d667500142d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Fixing flaky tests in flink-table-planner /hints [flink]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #25486:
URL: https://github.com/apache/flink/pull/25486#issuecomment-2785362205

   This PR is being marked as stale since it has not had any activity in the 
last 90 days. 
   If you would like to keep this PR alive, please leave a comment asking for a 
review. 
   If the PR has merge conflicts, update it with the latest from the base 
branch.
   
   If you are having difficulty finding a reviewer, please reach out to the 
   community, contact details can be found here: 
https://flink.apache.org/what-is-flink/community/
   
   If this PR is no longer valid or desired, please feel free to close it. 
   If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-36683) Support metadata 'row_kind' virtual column for Mongo CDC Connector

2025-04-07 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36683.

Resolution: Resolved

Resolved via master: 2e4abdb68ed960f09610cb8be332b770a07ba53e

> Support metadata 'row_kind' virtual column for Mongo CDC Connector
> --
>
> Key: FLINK-36683
> URL: https://issues.apache.org/jira/browse/FLINK-36683
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Runkang He
>Assignee: Runkang He
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.4.0
>
>
> 'row_kind' metadata is very useful in real user scenarios; the two main 
> scenarios are:
> 1. Save all upstream messages: the downstream saves every message, including 
> delete messages from upstream. To achieve this, we should convert the full 
> changelog into append-only messages and use the row_kind metadata to 
> represent the changelog kind.
> 2. Ignore upstream delete messages: to save storage space, the upstream CDC 
> source often deletes historical data regularly and only retains data within 
> seven days. However, the business requires the downstream OLAP system to 
> retain the full history, so the delete messages from the source must be 
> ignored. A reasonable way is to use the row_kind metadata to filter out these 
> delete messages.
> So I think we should support 'row_kind' metadata in the Mongo CDC Connector.
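
A minimal sketch of scenario 2 above. This is illustrative only: it assumes the source table already declares a `row_kind` metadata column, that the sink `olap_sink` exists, and that the metadata values follow Flink's RowKind short strings such as '-D' for deletes.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreDeletesSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'products' is assumed to be a mongodb-cdc table declared elsewhere with
        // a column: row_kind STRING METADATA FROM 'row_kind' VIRTUAL
        // Keep the full history in the downstream OLAP sink by dropping deletes.
        tEnv.executeSql(
                "INSERT INTO olap_sink "
                        + "SELECT _id, name FROM products WHERE row_kind <> '-D'");
    }
}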



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37583] Upgrade to Kafka 4.0.0 client. [flink-connector-kafka]

2025-04-07 Thread via GitHub


tomncooper commented on code in PR #161:
URL: 
https://github.com/apache/flink-connector-kafka/pull/161#discussion_r2030882248


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java:
##
@@ -40,7 +40,7 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
 private static final String INIT_KAFKA_RETRIES = "0";
 private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
 private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
-private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
+private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";

Review Comment:
   Sorry, I should have put a comment in the PR description about this. The new 
Kafka client now enforces, when the producer is created, the requirement that the 
delivery timeout be longer than the request timeout (which is part of the overall 
delivery timeout).
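
   For context, a small illustrative sketch of the relationship between the two settings; the values mirror the test constants above.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliveryTimeoutSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
        // delivery.timeout.ms must cover request.timeout.ms + linger.ms,
        // hence 1500 rather than 1000 in the test constants above.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1500");
        return props;
    }
}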



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-37626) Flaky test: ForStFlinkFileSystemTest.testSstFileInCache

2025-04-07 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-37626:
---

Assignee: Zakelly Lan

> Flaky test: ForStFlinkFileSystemTest.testSstFileInCache
> ---
>
> Key: FLINK-37626
> URL: https://issues.apache.org/jira/browse/FLINK-37626
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Gabor Somogyi
>Assignee: Zakelly Lan
>Priority: Major
>
> https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041
> {code:java}
> Apr 07 09:38:18 09:38:18.871 [ERROR] 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache 
> -- Time elapsed: 0.049 s <<< FAILURE!
> Apr 07 09:38:18 org.opentest4j.AssertionFailedError: 
> Apr 07 09:38:18 
> Apr 07 09:38:18 expected: 89
> Apr 07 09:38:18  but was: 0
> Apr 07 09:38:18   at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache(ForStFlinkFileSystemTest.java:332)
> Apr 07 09:38:18   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Apr 07 09:38:18 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26412:
URL: https://github.com/apache/flink/pull/26412#issuecomment-2782507746

   
   ## CI report:
   
   * f818133916a63feec3aba9361323b2c2231d0afd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]

2025-04-07 Thread via GitHub


davidradl commented on code in PR #26412:
URL: https://github.com/apache/flink/pull/26412#discussion_r2030769398


##
docs/layouts/shortcodes/generated/forst_configuration.html:
##
@@ -116,6 +116,12 @@
 String
 The primary directory where ForSt puts its SST files. By 
default, it will be the same as the checkpoint directory. Recognized shortcut 
name is 'checkpoint-dir', which means that ForSt shares the directory with 
checkpoint, and 'local-dir', which means that ForSt will use the local 
directory of TaskManager.
 
+
+state.backend.forst.sync.enforce-local
+false
+Boolean
+Whether to enforce local state for operators in synchronous 
mode when enabling disaggregated state. This is useful in cases where both 
synchronous operators and asynchronous operators are used in the same job.

Review Comment:
   nit: is this true?
same job. -> same local job.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37596][metrics] Close metric group of a finished split [flink]

2025-04-07 Thread via GitHub


pnowojski commented on code in PR #26388:
URL: https://github.com/apache/flink/pull/26388#discussion_r2030767818


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java:
##
@@ -191,4 +191,17 @@ public Boolean isIdle() {
 public Boolean isActive() {
 return !isPaused() && !isIdle();
 }
+
+public void onSplitFinished() {
+if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) {
+((AbstractMetricGroup) splitWatermarkMetricGroup).close();
+} else {
+LOG.warn("Split watermark metric group can not be closed");

Review Comment:
   nit: log what's the instance type of the metric group?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Update copyright NOTICE year to 2025 [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26413:
URL: https://github.com/apache/flink/pull/26413#issuecomment-2782943869

   
   ## CI report:
   
   * 15cc4c070ca4a56aa1057071d421c0a95a10dd89 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37598][table] Support list and map state in PTFs [flink]

2025-04-07 Thread via GitHub


twalthr commented on PR #26396:
URL: https://github.com/apache/flink/pull/26396#issuecomment-2783419929

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36090) Bug with IngestDB restore operation for priority queue state in backend

2025-04-07 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-36090:
--

Assignee: Maxim Vershinin

> Bug with IngestDB restore operation for priority queue state in backend
> ---
>
> Key: FLINK-36090
> URL: https://issues.apache.org/jira/browse/FLINK-36090
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 2.0.0
>Reporter: Maxim Vershinin
>Assignee: Maxim Vershinin
>Priority: Major
>  Labels: pull-request-available
>
> *Summary:* Incorrect handling of priority queue states in IngestDB during 
> restoring due to missing {{equals()}} and {{hashCode()}} methods in 
> {{{}RegisteredPriorityQueueStateBackendMetaInfo{}}}.
> *Problem Description:*
> During the restoring by IngestDB in my Flink project, an issue was identified 
> where the priority queue states are not managed correctly in the backend. The 
> problem stems from the absence of {{equals()}} and {{hashCode()}} methods in 
> the {{RegisteredPriorityQueueStateBackendMetaInfo}} class.
> In particular, within the {{exportColumnFamiliesWithSstDataInKeyGroupsRange}} 
> method of the {{RocksDBIncrementalRestoreOperation}} class, if the state is a 
> priority queue, identical states from different subtasks are erroneously 
> treated as distinct states within the {{exportedColumnFamiliesOut}} map. This 
> leads to inconsistent behavior and errors during the restoring process.
> *Proposed Solution:*
> To address this issue, add {{equals()}} and {{hashCode()}} methods to the 
> {{RegisteredPriorityQueueStateBackendMetaInfo}} class. Implementing these 
> methods will ensure that priority queue states are accurately recognized and 
> handled across different subtasks, thereby preventing errors during restoring 
> by IngestDB.
>  
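
A sketch of what the proposed methods could look like; the accessor names are assumptions for illustration, not taken from the actual class.

// Illustrative sketch only; accessor names are assumed, not the actual Flink code.
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (!(o instanceof RegisteredPriorityQueueStateBackendMetaInfo)) {
        return false;
    }
    RegisteredPriorityQueueStateBackendMetaInfo<?> other =
            (RegisteredPriorityQueueStateBackendMetaInfo<?>) o;
    return getName().equals(other.getName())
            && getElementSerializer().equals(other.getElementSerializer());
}

@Override
public int hashCode() {
    return java.util.Objects.hash(getName(), getElementSerializer());
}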



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37511][rest] Use Jackson serialization in JobPlanInfo.Plan [flink]

2025-04-07 Thread via GitHub


akalash commented on code in PR #26320:
URL: https://github.com/apache/flink/pull/26320#discussion_r2031317302


##
docs/layouts/shortcodes/generated/rest_v1_dispatcher.html:
##
@@ -2879,7 +3009,72 @@
   "properties" : {
 "plan" : {
   "type" : "object",
-  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson"
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:Plan",

Review Comment:
   I see there are still several open questions (cc @davidradl):
   
   >  this appears to be an issue with the open api and rest docs - that are 
not generating correctly.
  
   Kind of: RawJson didn't generate proper docs, and the rest_api page indeed 
looked unreadable with RawJson.
   
   > the fix seems to be to change the way we do job serialization
   
   Yes, we used to serialize it manually, and now Jackson does it for us based on 
the POJO class. In fact, the serialization remains the same (it's still Jackson); 
it just happens at a different time and place.
   
   > on the face of it this seems a large change to a core part of Flink, that 
could introduce other considerations for example how are existing job 
serializations effected.
   
   It doesn't look like a big change since, as we discussed in 
https://github.com/apache/flink/pull/26320/files#r2022598714, the resulting JSON 
hasn't changed. The only change is that the internal representation moved from 
String to Object, and the documentation now has more details.
   
   @Efrat19 maybe you have something to add.
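
   To illustrate the point about the serialization staying Jackson-based, a self-contained toy example; the class and field names are stand-ins, not Flink's actual JobPlanInfo.

import com.fasterxml.jackson.databind.ObjectMapper;

public class PlanSerializationSketch {
    // Toy stand-in for the plan POJO; field names are illustrative only.
    public static class Plan {
        public String jid = "1234";
        public String name = "example-job";
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Previously the plan travelled through the REST layer as a pre-serialized
        // String embedded as raw JSON; now Jackson serializes the POJO directly,
        // so the wire format stays the same but docs can be generated from the class.
        System.out.println(mapper.writeValueAsString(new Plan()));
        // {"jid":"1234","name":"example-job"}
    }
}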
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-36090) Bug with IngestDB restore operation for priority queue state in backend

2025-04-07 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter resolved FLINK-36090.

  Assignee: (was: Maxim Vershinin)
Resolution: Duplicate

Already fixed in FLINK-35580, 122c7ee628ba0a670894aeea7b0bb48f3c06d4f6

> Bug with IngestDB restore operation for priority queue state in backend
> ---
>
> Key: FLINK-36090
> URL: https://issues.apache.org/jira/browse/FLINK-36090
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 2.0.0
>Reporter: Maxim Vershinin
>Priority: Major
>  Labels: pull-request-available
>
> *Summary:* Incorrect handling of priority queue states in IngestDB during 
> restoring due to missing {{equals()}} and {{hashCode()}} methods in 
> {{{}RegisteredPriorityQueueStateBackendMetaInfo{}}}.
> *Problem Description:*
> During the restoring by IngestDB in my Flink project, an issue was identified 
> where the priority queue states are not managed correctly in the backend. The 
> problem stems from the absence of {{equals()}} and {{hashCode()}} methods in 
> the {{RegisteredPriorityQueueStateBackendMetaInfo}} class.
> In particular, within the {{exportColumnFamiliesWithSstDataInKeyGroupsRange}} 
> method of the {{RocksDBIncrementalRestoreOperation}} class, if the state is a 
> priority queue, identical states from different subtasks are erroneously 
> treated as distinct states within the {{exportedColumnFamiliesOut}} map. This 
> leads to inconsistent behavior and errors during the restoring process.
> *Proposed Solution:*
> To address this issue, add {{equals()}} and {{hashCode()}} methods to the 
> {{RegisteredPriorityQueueStateBackendMetaInfo}} class. Implementing these 
> methods will ensure that priority queue states are accurately recognized and 
> handled across different subtasks, thereby preventing errors during restoring 
> by IngestDB.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36683][cdc-connector][mongo] Support metadata 'row_kind' virtual column [flink-cdc]

2025-04-07 Thread via GitHub


leonardBang merged PR #3705:
URL: https://github.com/apache/flink-cdc/pull/3705


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.kafka:kafka-clients from 3.4.0 to 3.7.1 [flink-connector-kafka]

2025-04-07 Thread via GitHub


prshnt commented on PR #135:
URL: 
https://github.com/apache/flink-connector-kafka/pull/135#issuecomment-2782506369

   @dependabot rebase


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made on idle source causing high CPU utilisation and throttling [flink-connector-aws]

2025-04-07 Thread via GitHub


gguptp commented on code in PR #195:
URL: 
https://github.com/apache/flink-connector-aws/pull/195#discussion_r2030973955


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java:
##
@@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader(
 this.kinesis = kinesisProxy;
 this.configuration = configuration;
 this.maxRecordsToGet = 
configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
+this.getRecordsIntervalMillis = 
configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis();
+this.idleSourceGetRecordsIntervalMillis =
+
configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis();
 }
 
 @Override
-protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+protected RecordBatch fetchRecords(KinesisShardSplitState splitState) 
throws IOException {
+if (skipUntilScheduledGetRecordTime(splitState)) {
+return null;
+}
+
 GetRecordsResponse getRecordsResponse =
 kinesis.getRecords(
 splitState.getStreamArn(),
 splitState.getShardId(),
 splitState.getNextStartingPosition(),
 this.maxRecordsToGet);
+
+scheduleNextGetRecord(splitState, getRecordsResponse);
+
 boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
 return new RecordBatch(
 getRecordsResponse.records(), 
getRecordsResponse.millisBehindLatest(), isCompleted);
 }
 
+private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState 
splitState)
+throws IOException {
+if (scheduledGetRecordTimes.containsKey(splitState)
+&& scheduledGetRecordTimes.get(splitState) > 
System.currentTimeMillis()) {
+try {
+Thread.sleep(1);

Review Comment:
   If you don't add this, Flink calls the fetch function in a tight loop, causing 100% CPU.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made on idle source causing high CPU utilisation and throttling [flink-connector-aws]

2025-04-07 Thread via GitHub


darenwkt commented on code in PR #195:
URL: 
https://github.com/apache/flink-connector-aws/pull/195#discussion_r2030971648


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java:
##
@@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader(
 this.kinesis = kinesisProxy;
 this.configuration = configuration;
 this.maxRecordsToGet = 
configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
+this.getRecordsIntervalMillis = 
configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis();
+this.idleSourceGetRecordsIntervalMillis =
+
configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis();
 }
 
 @Override
-protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+protected RecordBatch fetchRecords(KinesisShardSplitState splitState) 
throws IOException {
+if (skipUntilScheduledGetRecordTime(splitState)) {
+return null;
+}
+
 GetRecordsResponse getRecordsResponse =
 kinesis.getRecords(
 splitState.getStreamArn(),
 splitState.getShardId(),
 splitState.getNextStartingPosition(),
 this.maxRecordsToGet);
+
+scheduleNextGetRecord(splitState, getRecordsResponse);
+
 boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
 return new RecordBatch(
 getRecordsResponse.records(), 
getRecordsResponse.millisBehindLatest(), isCompleted);
 }
 
+private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState 
splitState)
+throws IOException {
+if (scheduledGetRecordTimes.containsKey(splitState)
+&& scheduledGetRecordTimes.get(splitState) > 
System.currentTimeMillis()) {
+try {
+Thread.sleep(1);

Review Comment:
   Q: Why do we add a 1ms sleep here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37618][table-planner] Fix PTFs INTERVAL argument [flink]

2025-04-07 Thread via GitHub


juntaozhang commented on code in PR #26410:
URL: https://github.com/apache/flink/pull/26410#discussion_r2031022714


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java:
##
@@ -538,6 +539,11 @@ private CastAvoidanceChecker(LogicalType sourceType) {
 this.sourceType = sourceType;
 }
 
+@Override
+public Boolean visit(DayTimeIntervalType targetType) {
+return true;

Review Comment:
   Hi @davidradl, thanks for the kind reminder. Already fixed it, and also fixed 
the `INTERVAL_YEAR_MONTH` case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35466) Cannot pass all columns to SQL UDFs using `*`

2025-04-07 Thread Lorenzo Affetti (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941590#comment-17941590
 ] 

Lorenzo Affetti commented on FLINK-35466:
-

I would like to take this one

> Cannot pass all columns to SQL UDFs using `*`
> -
>
> Key: FLINK-35466
> URL: https://issues.apache.org/jira/browse/FLINK-35466
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Gyula Fora
>Priority: Major
>
> The SQL API does not allow calling UDFs using all the columns of a given 
> table using * notation such as:
> {noformat}
> tableEnv.executeSql("SELECT MyFun(Orders.*) FROM Orders").print();{noformat}
> The above call fails with the following error:
> {noformat}
> org.apache.flink.table.api.ValidationException: SQL validation failed. At 
> line 1, column 21: Unknown field '*'
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>  at 
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
>at com.apple.pie.flink.utils.testing.Test2.test(Test2.java:29)  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
> at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>  at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>  at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
> at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>  at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>at 
> org.jun

[jira] [Commented] (FLINK-37621) EnableAsyncState doesn't seem to do anything on DataStream API and misleading error message

2025-04-07 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941475#comment-17941475
 ] 

Zakelly Lan commented on FLINK-37621:
-

It seems we lack support for `process()` with async state in the DataStream API. Will add this.

And the error message should be more precise for the unsupported scenarios.


Thanks for reporting this [~gyfora]

> EnableAsyncState doesn't seem to do anything on DataStream API and misleading 
> error message
> ---
>
> Key: FLINK-37621
> URL: https://issues.apache.org/jira/browse/FLINK-37621
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 2.0.0
>Reporter: Gyula Fora
>Priority: Major
>
> eventStream
> .keyBy(e -> e.key)
> .enableAsyncState()
> .process(new EventHistoryProcessor(params))
> .enableAsyncState();
> Leads to:
> Caused by: java.lang.IllegalStateException: Current operator integrates the 
> async processing logic, thus only supports state v2 APIs. Please use 
> StateDescriptor under 'org.apache.flink.runtime.state.v2'.
> The error is misleading because the v2 APIs are used, but the check is combined 
> with the async-enabled check. We need to split the error reporting.
> Replacing with:
> eventStream
> .keyBy(e -> e.key)
> .transform(
> "Event History",
> BasicTypeInfo.LONG_TYPE_INFO,
> new AsyncKeyedProcessOperator<>(new EventHistoryProcessor(params)));
> Would fix the problem but that doesn't seem right



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-37621) EnableAsyncState doesn't seem to do anything on DataStream API and misleading error message

2025-04-07 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-37621:
---

Assignee: Zakelly Lan

> EnableAsyncState doesn't seem to do anything on DataStream API and misleading 
> error message
> ---
>
> Key: FLINK-37621
> URL: https://issues.apache.org/jira/browse/FLINK-37621
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 2.0.0
>Reporter: Gyula Fora
>Assignee: Zakelly Lan
>Priority: Major
>
> eventStream
> .keyBy(e -> e.key)
> .enableAsyncState()
> .process(new EventHistoryProcessor(params))
> .enableAsyncState();
> Leads to:
> Caused by: java.lang.IllegalStateException: Current operator integrates the 
> async processing logic, thus only supports state v2 APIs. Please use 
> StateDescriptor under 'org.apache.flink.runtime.state.v2'.
> The error is misleading because the v2 APIs are used, but the check is combined 
> with the async-enabled check. We need to split the error reporting.
> Replacing with:
> eventStream
> .keyBy(e -> e.key)
> .transform(
> "Event History",
> BasicTypeInfo.LONG_TYPE_INFO,
> new AsyncKeyedProcessOperator<>(new EventHistoryProcessor(params)));
> Would fix the problem but that doesn't seem right



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Avoid copy Set for variable implementedRpcGateways [flink]

2025-04-07 Thread via GitHub


1996fanrui merged PR #26372:
URL: https://github.com/apache/flink/pull/26372


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37624) Support enableAsyncState and switch operator after datastream transformation

2025-04-07 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-37624:
---

 Summary: Support enableAsyncState and switch operator after 
datastream transformation
 Key: FLINK-37624
 URL: https://issues.apache.org/jira/browse/FLINK-37624
 Project: Flink
  Issue Type: Improvement
Affects Versions: 2.0.0
Reporter: Zakelly Lan
 Fix For: 2.1.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37623) Async state support for `process()` in Datastream API

2025-04-07 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-37623:
---

 Summary: Async state support for `process()`  in Datastream API
 Key: FLINK-37623
 URL: https://issues.apache.org/jira/browse/FLINK-37623
 Project: Flink
  Issue Type: Improvement
Affects Versions: 2.0.0
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 2.1.0, 2.0.1






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Avoid duplicate fetch the size of memory segment [flink]

2025-04-07 Thread via GitHub


1996fanrui merged PR #26367:
URL: https://github.com/apache/flink/pull/26367


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37407][state] Add savepoint metadata SQL built-in process function [flink]

2025-04-07 Thread via GitHub


Zakelly commented on code in PR #26393:
URL: https://github.com/apache/flink/pull/26393#discussion_r2030677692


##
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+@Internal
+@FunctionHint(
+output =
+@DataTypeHint(
+"ROW"))
+public class SavepointMetadataTableFunction extends TableFunction {
+public 
SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) 
{}
+
+public void eval(String savepointPath) {
+try {
+CheckpointMetadata checkpointMetadata =
+SavepointLoader.loadSavepointMetadata(savepointPath);
+
+for (OperatorState operatorState : 
checkpointMetadata.getOperatorStates()) {
+Row row = Row.withNames();
+row.setField("checkpoint-id", 
checkpointMetadata.getCheckpointId());
+row.setField("operator-name", 
operatorState.getOperatorName().orElse(null));
+row.setField("operator-uid", 
operatorState.getOperatorUid().orElse(null));
+row.setField("operator-uid-hash", 
operatorState.getOperatorID().toHexString());
+row.setField("operator-parallelism", 
operatorState.getParallelism());
+row.setField("operator-max-parallelism", 
operatorState.getMaxParallelism());
+row.setField("operator-subtask-state-count", 
operatorState.getStates().size());

Review Comment:
   Thus I suggest adding a more detailed description for 
`operator-subtask-state-count` in the doc. It represents the state partition 
count divided by the operator's parallelism and might be 0 if the state is not 
partitioned. That's my understanding, correct me if I'm wrong.
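   
   For readers following along, a minimal, hypothetical sketch of how such a savepoint metadata function might be queried from the Table API. The SQL entry point `savepoint_metadata` and the `TABLE(...)` invocation style are assumptions made purely for illustration; this PR only shows the internal `TableFunction`, so check the merged documentation for the actual name and syntax.
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class SavepointMetadataExample {
       public static void main(String[] args) {
           // Batch mode is enough for a one-off metadata query.
           TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
           // 'savepoint_metadata' is an assumed registration name, not confirmed by this PR.
           tEnv.executeSql(
                           "SELECT `operator-name`, `operator-subtask-state-count` "
                                   + "FROM TABLE(savepoint_metadata('file:///path/to/savepoint'))")
                   .print();
       }
   }
   ```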



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-37143) Update version of flink-connector-kafka to 4.0-SNAPSHOT

2025-04-07 Thread Arvid Heise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise resolved FLINK-37143.
-
  Assignee: Yanquan Lv
Resolution: Fixed

> Update version of flink-connector-kafka to 4.0-SNAPSHOT
> ---
>
> Key: FLINK-37143
> URL: https://issues.apache.org/jira/browse/FLINK-37143
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: kafka-4.0.0
>Reporter: Yanquan Lv
>Assignee: Yanquan Lv
>Priority: Minor
> Fix For: kafka-4.0.0
>
>
> As [https://lists.apache.org/thread/rl7prqop7wfn2o8j2j9fd96dgr1bjjnx] 
> discuss, next version after kafka-3.4.0 would be kafka-4.0.0 to support Flink 
> 2.0-preview and Flink 2.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37407][state] Add savepoint metadata SQL built-in process function [flink]

2025-04-07 Thread via GitHub


gaborgsomogyi commented on code in PR #26393:
URL: https://github.com/apache/flink/pull/26393#discussion_r2030693310


##
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+@Internal
+@FunctionHint(
+output =
+@DataTypeHint(
+"ROW"))
+public class SavepointMetadataTableFunction extends TableFunction {
+public 
SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) 
{}
+
+public void eval(String savepointPath) {
+try {
+CheckpointMetadata checkpointMetadata =
+SavepointLoader.loadSavepointMetadata(savepointPath);
+
+for (OperatorState operatorState : 
checkpointMetadata.getOperatorStates()) {
+Row row = Row.withNames();
+row.setField("checkpoint-id", 
checkpointMetadata.getCheckpointId());
+row.setField("operator-name", 
operatorState.getOperatorName().orElse(null));
+row.setField("operator-uid", 
operatorState.getOperatorUid().orElse(null));
+row.setField("operator-uid-hash", 
operatorState.getOperatorID().toHexString());
+row.setField("operator-parallelism", 
operatorState.getParallelism());
+row.setField("operator-max-parallelism", 
operatorState.getMaxParallelism());
+row.setField("operator-subtask-state-count", 
operatorState.getStates().size());

Review Comment:
   Yeah, this is a hidden gem and worth adding some more info. Your understanding is correct, so I would add your sentence as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37583] Upgrade to Kafka 4.0.0 client. [flink-connector-kafka]

2025-04-07 Thread via GitHub


tomncooper commented on PR #161:
URL: 
https://github.com/apache/flink-connector-kafka/pull/161#issuecomment-2782791845

   > @tomncooper @MartijnVisser @AHeise I suggest we move to the Kafka 4.0.0 client when we do Kafka connector v4 for Flink 2. WDYT? (assuming we can sort out the tests)
   > 
   > I think there is a case for not backporting Kafka client v4.0.0 support to Kafka connector v3.3 or 3.4, in case there are old Kafka clusters that we would not want to break on the v3 stream.
   
   @davidradl As per the 
[discussion](https://lists.apache.org/thread/l0kzx2wb2qz7ntcv6wkbbfwx688y0o69) 
on the dev mailing list, we are going to move ahead with a Connector 4.0 
release with Flink 2.0 and Kafka 3.9.0. We can then do point release updates.
   
   I still think that, given this PR would drop support for older Kafka versions, it should be part of a further major version bump (i.e. 5.0), but we can have that discussion when the time comes to merge this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]

2025-04-07 Thread via GitHub


Zakelly commented on code in PR #26412:
URL: https://github.com/apache/flink/pull/26412#discussion_r2030928114


##
docs/content.zh/docs/ops/state/disaggregated_state.md:
##
@@ -150,6 +150,18 @@ state.backend.forst.primary-dir: 
s3://your-bucket/forst-state
 checkpoint and fast recovery, since the ForSt will perform file copy between 
the primary
 storage location and the checkpoint directory during checkpointing and 
recovery.
 
+ ForSt Local Storage Location
+
+ForSt stores the state of synchronous operators in the local disk by default, 
to avoid the high

Review Comment:
   ```suggestion
   By default, ForSt will **ONLY** disaggregate state when asynchronous APIs 
(State V2) are used. When using synchronous state APIs in Datastream and SQL 
jobs, ForSt will only serve as a **local state store**. Since a job
    may contain multiple ForSt instances with mixed API usage, synchronous local state access along with asynchronous remote state access could help achieve better overall throughput. If you ... 
   ```
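   
   As a side note for readers, a minimal configuration sketch of the setup discussed here, under stated assumptions: the `state.backend.forst.primary-dir` key is taken from the quoted docs, while `state.backend.type: forst` is assumed to be the switch that selects the ForSt backend; verify both against the released documentation.
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class ForStConfigSketch {
       public static void main(String[] args) throws Exception {
           Configuration conf = new Configuration();
           // Assumed option value selecting the ForSt state backend.
           conf.setString("state.backend.type", "forst");
           // Remote (disaggregated) primary storage location, as in the quoted docs.
           conf.setString("state.backend.forst.primary-dir", "s3://your-bucket/forst-state");
           StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.getExecutionEnvironment(conf);
           // Build the job as usual; the PR under review is about letting synchronous
           // state access also use the remote storage location instead of only local disk.
           env.fromSequence(0, 10).print();
           env.execute("forst-config-sketch");
       }
   }
   ```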



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Update copyright NOTICE year to 2025 [flink]

2025-04-07 Thread via GitHub


Zakelly opened a new pull request, #26413:
URL: https://github.com/apache/flink/pull/26413

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37626) Flaky test: ForStFlinkFileSystemTest.testSstFileInCache

2025-04-07 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-37626:
-

 Summary: Flaky test: ForStFlinkFileSystemTest.testSstFileInCache
 Key: FLINK-37626
 URL: https://issues.apache.org/jira/browse/FLINK-37626
 Project: Flink
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Gabor Somogyi


https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37626) Flaky test: ForStFlinkFileSystemTest.testSstFileInCache

2025-04-07 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-37626:
--
Description: 
https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041
{code:java}
Apr 07 09:38:18 09:38:18.871 [ERROR] 
org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache -- 
Time elapsed: 0.049 s <<< FAILURE!
Apr 07 09:38:18 org.opentest4j.AssertionFailedError: 
Apr 07 09:38:18 
Apr 07 09:38:18 expected: 89
Apr 07 09:38:18  but was: 0
Apr 07 09:38:18 at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache(ForStFlinkFileSystemTest.java:332)
Apr 07 09:38:18 at 
java.base/java.lang.reflect.Method.invoke(Method.java:568)
Apr 07 09:38:18 at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
Apr 07 09:38:18 at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
Apr 07 09:38:18 at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
Apr 07 09:38:18 at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
Apr 07 09:38:18 at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Apr 07 09:38:18 
{code}


  
was:https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041


> Flaky test: ForStFlinkFileSystemTest.testSstFileInCache
> ---
>
> Key: FLINK-37626
> URL: https://issues.apache.org/jira/browse/FLINK-37626
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041
> {code:java}
> Apr 07 09:38:18 09:38:18.871 [ERROR] 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache 
> -- Time elapsed: 0.049 s <<< FAILURE!
> Apr 07 09:38:18 org.opentest4j.AssertionFailedError: 
> Apr 07 09:38:18 
> Apr 07 09:38:18 expected: 89
> Apr 07 09:38:18  but was: 0
> Apr 07 09:38:18   at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache(ForStFlinkFileSystemTest.java:332)
> Apr 07 09:38:18   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 07 09:38:18   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Apr 07 09:38:18 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]

2025-04-07 Thread via GitHub


fredia commented on PR #26412:
URL: https://github.com/apache/flink/pull/26412#issuecomment-2783112395

   > Should we also use ResourceGuard like 
[FLINK-37597](https://issues.apache.org/jira/browse/FLINK-37597) when using 
sync mode to access remote storage ?
   
   `ResourceGuard` is used to ensure that the snapshot threads are completed before `ForStSyncKeyedStateBackend` is disposed, and `ForStSyncKeyedStateBackend#dispose()` already achieves this.
   
   IIUC, remote storage access in `ForStSyncKeyedStateBackend` is done in the main task thread, so the extra work may not be necessary; correct me if I'm wrong :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36813][cdc-connectors][mysql] support mysql sync part columns [flink-cdc]

2025-04-07 Thread via GitHub


JNSimba commented on PR #3767:
URL: https://github.com/apache/flink-cdc/pull/3767#issuecomment-2783640913

   > @JNSimba Could we provide this feature by `SupportsProjectionPushDown`? 
`debezium.column.include.list` and `debezium.column.exclude.list` is hard for 
users to understand and use. We could use `SupportsProjectionPushDown` 
interface to automatically generate the debezium setting. In this way we don't 
need to validate whether there is a conflict between the provided schema and 
the debezium setting.
   
   @ruanhang1993 `SupportsProjectionPushDown` is indeed an elegant approach, but some of the CDC logic seems to have problems in the push-down phase, for example, in the code below.
   
   If you run the `MySqlConnectorITCase.testConsumingAllEvents` case, the chunk key will be adjusted here. For example, if the projected columns do not include the chunk key, an error will be reported
   
   
https://github.com/apache/flink-cdc/blob/b437a49e67a2ef84c59cde09f2bfc372f45b31fa/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java#L267C1-L274C37
   ```java
   Object[] chunkKey =
   RecordUtils.getSplitKey(
   splitKeyType, 
statefulTaskContext.getSchemaNameAdjuster(), target);
   for (FinishedSnapshotSplitInfo splitInfo : 
finishedSplitsInfo.get(tableId)) {
   if (RecordUtils.splitKeyRangeContains(
   chunkKey, splitInfo.getSplitStart(), 
splitInfo.getSplitEnd())
   && 
position.isAfter(splitInfo.getHighWatermark())) {
   return true;
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs] Translate "Table ID" Page for Flink CDC Chinese Documentation [flink-cdc]

2025-04-07 Thread via GitHub


leonardBang merged PR #3888:
URL: https://github.com/apache/flink-cdc/pull/3888


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34554] Introduce transaction strategies [flink-connector-kafka]

2025-04-07 Thread via GitHub


fapaul commented on code in PR #154:
URL: 
https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2031443184


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Describes the ownership model of transactional ids and with that ownership 
of the transactions.
+ *
+ * A subtask that owns a transactional id is responsible for committing and 
aborting the
+ * transactions having that id. Only that subtask may create new ids.
+ *
+ * Transactional ids have the form transactionalIdPrefix + "-" + 
subtaskId + "-" + counter
+ * . The prefix is given by the user, the subtask id is defined through 
the ownership model
+ * and the counter through the {@link
+ * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}.
+ *
+ * For all strategies ownership is extrapolated for subtask ids beyond the 
currently known
+ * subtasks. This is necessary to support cases of intermediate upscaling 
where no checkpoint has
+ * been taken. Consider an application that runs with 3 subtasks and 
checkpointed. Later, its
+ * upscaled to 5 but then a failure happens. We need to have at least 5 open 
transactions. If the
+ * application is finally resumed from the checkpoint with 3 subtasks again. 
These 3 subtasks need
+ * to assume ownership of the remaining 2.
+ */
+@Internal
+public enum TransactionOwnership {
+/**
+ * The ownership is determined by the current subtask ID. Ownership is 
extrapolated by
+ * extracting the original subtask id of the ongoing transactions and 
applying modulo on the
+ * current parallelism.
+ */
+IMPLICIT_BY_SUBTASK_ID {
+@Override
+public int[] getOwnedSubtaskIds(
+int currentSubtaskId,
+int currentParallelism,
+Collection recoveredStates) {
+if (!recoveredStates.isEmpty()) {
+checkForMigration(currentSubtaskId, recoveredStates);
+}
+
+return new int[] {currentSubtaskId};
+}
+
+private void checkForMigration(
+int currentSubtaskId, Collection 
recoveredStates) {
+TransactionOwnership oldOwnership =
+recoveredStates.stream()
+.map(KafkaWriterState::getTransactionOwnership)
+.findFirst()
+.orElseThrow();
+if (oldOwnership == this) {
+return;
+}
+
+Set ownedSubtaskIds =
+recoveredStates.stream()
+.mapToInt(KafkaWriterState::getOwnedSubtaskId)
+.boxed()
+.collect(Collectors.toSet());
+if (!ownedSubtaskIds.contains(currentSubtaskId)
+&& !ownedSubtaskIds.equals(Set.of(UNKNOWN))) {
+int numShares =
+recoveredStates.stream()
+.mapToInt(KafkaWriterState::getOwnedSubtaskId)
+.findFirst()
+.orElse(UNKNOWN);
+throw new IllegalStateException(
+"Attempted to switch back to INCREMENTING from a 
transaction naming strategy that uses the new writer state. A possible way to 
safely do it is to scale back to the maximum parallelism of "
++ numShares);
+}
+}
+
+@Override
+public int getTotalNumberOfOwnedSubtasks(
+int currentSubtaskId,
+int currentParallelism,
+Collection recoveredStates) {
+return currentParallelism;

[jira] [Assigned] (FLINK-36611) Add schema info to output of Kafka sink

2025-04-07 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-36611:
--

Assignee: MOBIN

> Add schema info to output of Kafka sink  
> -
>
> Key: FLINK-36611
> URL: https://issues.apache.org/jira/browse/FLINK-36611
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: Yanquan Lv
>Assignee: MOBIN
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.4.0
>
>
> Currently, the output of Kafka sink in debezium format looks like this:
> {code:java}
> {
>   "before": {
> "id": 4,
> "name": "John",
> "address": "New York",
> "phone_number": "",
> "age": 12
>   },
>   "after": {
> "id": 4,
> "name": "John",
> "address": "New York",
> "phone_number": "1234",
> "age": 12
>   },
>   "op": "u",
>   "source": {
> "db": null,
> "table": "customers"
>   }
> } {code}
> It contains record data with full before/after and db info, but schema info 
> wasn't included. 
> However, in some scenarios, we need this information to determine the type of 
> data. For example, Paimon's Kafka CDC source requires this type information, 
> otherwise all types are considered String, refer to 
> [https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/#supported-formats.]
> Considering that this will increase the data load, I suggest adding a 
> parameter to configure whether to enable it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36611) Add schema info to output of Kafka sink

2025-04-07 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36611.

Resolution: Implemented

Implemented via master: 3457a922b5da331243fb3a4a1bc26a1b58e81b36

> Add schema info to output of Kafka sink  
> -
>
> Key: FLINK-36611
> URL: https://issues.apache.org/jira/browse/FLINK-36611
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: Yanquan Lv
>Assignee: MOBIN
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.4.0
>
>
> Currently, the output of Kafka sink in debezium format looks like this:
> {code:java}
> {
>   "before": {
> "id": 4,
> "name": "John",
> "address": "New York",
> "phone_number": "",
> "age": 12
>   },
>   "after": {
> "id": 4,
> "name": "John",
> "address": "New York",
> "phone_number": "1234",
> "age": 12
>   },
>   "op": "u",
>   "source": {
> "db": null,
> "table": "customers"
>   }
> } {code}
> It contains record data with full before/after and db info, but schema info 
> wasn't included. 
> However, in some scenarios, we need this information to determine the type of 
> data. For example, Paimon's Kafka CDC source requires this type information, 
> otherwise all types are considered String, refer to 
> [https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/#supported-formats.]
> Considering that this will increase the data load, I suggest adding a 
> parameter to configure whether to enable it.
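
For illustration only: in Kafka Connect / Debezium conventions, "schema info" usually means wrapping each record in a top-level schema block next to the payload, roughly as sketched below. This is the generic Debezium-style envelope with schemas enabled, not necessarily the exact layout the CDC Kafka sink emits; the field names shown are illustrative.

{code:java}
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "before", "type": "struct", "optional": true},
      {"field": "after", "type": "struct", "optional": true},
      {"field": "op", "type": "string", "optional": false},
      {"field": "source", "type": "struct", "optional": true}
    ]
  },
  "payload": {
    "before": {"id": 4, "phone_number": ""},
    "after": {"id": 4, "phone_number": "1234"},
    "op": "u",
    "source": {"db": null, "table": "customers"}
  }
}
{code}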



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36611][pipeline-connector][kafka] Add schema info to output of Kafka sink [flink-cdc]

2025-04-07 Thread via GitHub


leonardBang merged PR #3791:
URL: https://github.com/apache/flink-cdc/pull/3791


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-cli] fix duplicated option in CliFrontendOptions [flink-cdc]

2025-04-07 Thread via GitHub


leonardBang merged PR #3848:
URL: https://github.com/apache/flink-cdc/pull/3848


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [minor][pipeline-connectors][doris]Fix deprecated method usage in DorisSchemaChangeManager [flink-cdc]

2025-04-07 Thread via GitHub


leonardBang merged PR #3959:
URL: https://github.com/apache/flink-cdc/pull/3959


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34554] Introduce transaction strategies [flink-connector-kafka]

2025-04-07 Thread via GitHub


AHeise commented on code in PR #154:
URL: 
https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2031458928


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Describes the ownership model of transactional ids and with that ownership 
of the transactions.
+ *
+ * A subtask that owns a transactional id is responsible for committing and 
aborting the
+ * transactions having that id. Only that subtask may create new ids.
+ *
+ * Transactional ids have the form transactionalIdPrefix + "-" + 
subtaskId + "-" + counter
+ * . The prefix is given by the user, the subtask id is defined through 
the ownership model
+ * and the counter through the {@link
+ * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}.
+ *
+ * For all strategies ownership is extrapolated for subtask ids beyond the 
currently known
+ * subtasks. This is necessary to support cases of intermediate upscaling 
where no checkpoint has
+ * been taken. Consider an application that runs with 3 subtasks and 
checkpointed. Later, its
+ * upscaled to 5 but then a failure happens. We need to have at least 5 open 
transactions. If the
+ * application is finally resumed from the checkpoint with 3 subtasks again. 
These 3 subtasks need
+ * to assume ownership of the remaining 2.
+ */
+@Internal
+public enum TransactionOwnership {
+/**
+ * The ownership is determined by the current subtask ID. Ownership is 
extrapolated by
+ * extracting the original subtask id of the ongoing transactions and 
applying modulo on the
+ * current parallelism.
+ */
+IMPLICIT_BY_SUBTASK_ID {
+@Override
+public int[] getOwnedSubtaskIds(
+int currentSubtaskId,
+int currentParallelism,
+Collection recoveredStates) {
+if (!recoveredStates.isEmpty()) {
+checkForMigration(currentSubtaskId, recoveredStates);
+}
+
+return new int[] {currentSubtaskId};
+}
+
+private void checkForMigration(
+int currentSubtaskId, Collection 
recoveredStates) {
+TransactionOwnership oldOwnership =
+recoveredStates.stream()
+.map(KafkaWriterState::getTransactionOwnership)
+.findFirst()
+.orElseThrow();
+if (oldOwnership == this) {
+return;
+}
+
+Set ownedSubtaskIds =
+recoveredStates.stream()
+.mapToInt(KafkaWriterState::getOwnedSubtaskId)
+.boxed()
+.collect(Collectors.toSet());
+if (!ownedSubtaskIds.contains(currentSubtaskId)
+&& !ownedSubtaskIds.equals(Set.of(UNKNOWN))) {
+int numShares =
+recoveredStates.stream()
+.mapToInt(KafkaWriterState::getOwnedSubtaskId)
+.findFirst()
+.orElse(UNKNOWN);
+throw new IllegalStateException(
+"Attempted to switch back to INCREMENTING from a 
transaction naming strategy that uses the new writer state. A possible way to 
safely do it is to scale back to the maximum parallelism of "
++ numShares);
+}
+}
+
+@Override
+public int getTotalNumberOfOwnedSubtasks(
+int currentSubtaskId,
+int currentParallelism,
+Collection recoveredStates) {
+return currentParallelism;

Re: [PR] [FLINK-36648] Bump Flink version to Flink 2.0.0 [flink-connector-kafka]

2025-04-07 Thread via GitHub


FranMorilloAWS commented on PR #140:
URL: 
https://github.com/apache/flink-connector-kafka/pull/140#issuecomment-2783947758

   When is this being released?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031778448


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ModelDescriptor.java:
##
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Describes a {@link CatalogModel} representing a model.
+ *
+ * A {@link ModelDescriptor} is a template for creating a {@link 
CatalogModel} instance. It
+ * closely resembles the "CREATE MODEL" SQL DDL statement, containing input 
schema, output schema,
+ * and other characteristics.
+ *
+ * This can be used to register a Model in the Table API.
+ */
+@PublicEvolving
+public class ModelDescriptor {
+private final @Nullable Schema inputSchema;
+private final @Nullable Schema outputSchema;
+private final Map modelOptions;
+private final @Nullable String comment;
+
+protected ModelDescriptor(
+@Nullable Schema inputSchema,
+@Nullable Schema outputSchema,
+Map modelOptions,
+@Nullable String comment) {
+this.inputSchema = inputSchema;
+this.outputSchema = outputSchema;
+this.modelOptions = modelOptions;
+this.comment = comment;
+}
+
+/** Converts this descriptor into a {@link CatalogModel}. */
+public CatalogModel toCatalogModel() {
+final Schema inputSchema =
+getInputSchema()
+.orElseThrow(
+() ->
+new ValidationException(
+"Input schema missing in 
ModelDescriptor. Input schema cannot be null."));
+final Schema outputSchema =
+getOutputSchema()
+.orElseThrow(
+() ->
+new ValidationException(
+"Output schema missing in 
ModelDescriptor. Output schema cannot be null."));
+return CatalogModel.of(inputSchema, outputSchema, modelOptions, 
comment);
+}
+
+/** Converts this immutable instance into a mutable {@link Builder}. */
+public Builder toBuilder() {
+return new Builder(this);
+}
+
+/**
+ * Returns a map of string-based model options.
+ *
+ * @return options of the model.
+ */
+Map getOptions() {
+return modelOptions;
+}
+
+/**
+ * Get the unresolved input schema of the model.
+ *
+ * @return unresolved input schema of the model.
+ */

Review Comment:
   Do we really need such duplicating javadocs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made on idle source causing high CPU utilisation and throttling [flink-connector-aws]

2025-04-07 Thread via GitHub


leekeiabstraction commented on code in PR #195:
URL: 
https://github.com/apache/flink-connector-aws/pull/195#discussion_r2031062113


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java:
##
@@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader(
 this.kinesis = kinesisProxy;
 this.configuration = configuration;
 this.maxRecordsToGet = 
configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
+this.getRecordsIntervalMillis = 
configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis();
+this.idleSourceGetRecordsIntervalMillis =
+
configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis();
 }
 
 @Override
-protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+protected RecordBatch fetchRecords(KinesisShardSplitState splitState) 
throws IOException {
+if (skipUntilScheduledGetRecordTime(splitState)) {
+return null;
+}
+
 GetRecordsResponse getRecordsResponse =
 kinesis.getRecords(
 splitState.getStreamArn(),
 splitState.getShardId(),
 splitState.getNextStartingPosition(),
 this.maxRecordsToGet);
+
+scheduleNextGetRecord(splitState, getRecordsResponse);
+
 boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
 return new RecordBatch(
 getRecordsResponse.records(), 
getRecordsResponse.millisBehindLatest(), isCompleted);
 }
 
+private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState 
splitState)
+throws IOException {
+if (scheduledGetRecordTimes.containsKey(splitState)
+&& scheduledGetRecordTimes.get(splitState) > 
System.currentTimeMillis()) {
+try {
+Thread.sleep(1);

Review Comment:
   To elaborate on Abhi's point further: this is to prevent 
KinesisShardSplitReaderBase from immediately looping through fetch() for each 
split in the case where we have idle sources, causing very high CPU usage.
   
   Adding a 1ms pause ensures lower CPU usage at the cost of at most 1ms wait 
time per split. There is an edge case where a source with a large number of idle 
assigned splits will not loop through all its assigned splits in a timely manner, 
adding latency to GetRecords for a minority of non-idle shards, but this would 
require upwards of 1000 assigned splits to incur a 1 second delay. Assigning 
upwards of 1000 active splits to a source subtask is an unlikely and 
impractical scenario (the app should really have much higher parallelism).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]

2025-04-07 Thread via GitHub


masteryhx commented on PR #26412:
URL: https://github.com/apache/flink/pull/26412#issuecomment-2783084846

   Thanks for the PR.
   Just an extra comment: Should we also use ResourceGuard like 
[FLINK-37597](https://issues.apache.org/jira/browse/FLINK-37597) when using 
sync mode to access remote storage ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37616) PyFlink incorrectly unpickles Row fields within a Row

2025-04-07 Thread Mika Naylor (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mika Naylor updated FLINK-37616:

Summary: PyFlink incorrectly unpickles Row fields within a Row  (was: 
PyFlink incorrectly unpickles Row fields)

> PyFlink incorrectly unpickles Row fields within a Row
> -
>
> Key: FLINK-37616
> URL: https://issues.apache.org/jira/browse/FLINK-37616
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Mika Naylor
>Assignee: Mika Naylor
>Priority: Minor
>  Labels: pull-request-available
>
> If you call {{TableEnvironment.from_elements}} where one of the fields in the 
> row contains a {{Row}} Type, for example where one of the values you pass in 
> is:
> {code:java}
> [
>     Row("pyflink1A", "pyflink2A", "pyflink3A"),
>     Row("pyflink1B", "pyflink2B", "pyflink3B"),
>     Row("pyflink1C", "pyflink2C", "pyflink3C"),
> ],{code}
> where the schema for the field is:
> {code:java}
> DataTypes.ARRAY(
>     DataTypes.ROW(
>         [
>             DataTypes.FIELD("a", DataTypes.STRING()),
>             DataTypes.FIELD("b", DataTypes.STRING()),
>             DataTypes.FIELD("c", DataTypes.STRING()),
>         ]
>     )
> ),{code}
> When you call {{execute().collect()}} on the table, the array is returned as:
> {code:java}
> [
> ,
> ,
> 
> ]{code}
> Instead of each {{Row}} having 3 values, the collected row only has 1 value, 
> which is now a list of the actual values in the row. The input and output 
> rows are no longer equal (as their internal _values collections are no longer 
> equal, one being a list of strings and the other being a list of a list of 
> strings). The len() of the source Row is correctly returned as 3, but the 
> collected row incorrectly reports a len() of 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] [docs] Fix typo in docker.md [flink]

2025-04-07 Thread via GitHub


DanRoscigno commented on PR #25194:
URL: https://github.com/apache/flink/pull/25194#issuecomment-2783710846

   Please can this be reviewed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37628) Wrong reference counting in ForSt file cache

2025-04-07 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-37628:
---

 Summary: Wrong reference counting in ForSt file cache
 Key: FLINK-37628
 URL: https://issues.apache.org/jira/browse/FLINK-37628
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Zakelly Lan
Assignee: Zakelly Lan


There is a concurrency issue with reference counting in the ForSt file cache, which 
could lead to a read error in some special scenarios (e.g. extremely frequent 
cache thrashing).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37407][state] Add savepoint metadata SQL built-in process function [flink]

2025-04-07 Thread via GitHub


gaborgsomogyi merged PR #26393:
URL: https://github.com/apache/flink/pull/26393


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37627) Restarting from a checkpoint/savepoint which coincides with shard split causes data loss

2025-04-07 Thread Keith Lee (Jira)
Keith Lee created FLINK-37627:
-

 Summary: Restarting from a checkpoint/savepoint which coincides 
with shard split causes data loss
 Key: FLINK-37627
 URL: https://issues.apache.org/jira/browse/FLINK-37627
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: aws-connector-5.0.0
Reporter: Keith Lee


Similar to DDB stream connector's issue 
https://issues.apache.org/jira/browse/FLINK-37416

This is less likely to happen on the Kinesis connector due to the much lower frequency 
of re-sharding / assigning new splits, but it is technically possible, so we'd like to 
fix this to avoid data loss.

The scenario is as follows:

- A checkpoint started
- KinesisStreamsSourceEnumerator takes a checkpoint (shard was assigned here)
- KinesisStreamsSourceEnumerator sends checkpoint event to reader
- Before taking reader checkpoint, a SplitFinishedEvent came up in reader
- Reader takes checkpoint
- Now, just after checkpoint complete, job restarted

This can lead to a shard lineage getting lost because the shard is in 
ASSIGNED state in the enumerator and not part of any task manager state.

See DDB Connector issue's PR for reference fix: 
https://issues.apache.org/jira/browse/FLINK-37416



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments [flink]

2025-04-07 Thread via GitHub


autophagy opened a new pull request, #26414:
URL: https://github.com/apache/flink/pull/26414

   ## What is the purpose of the change
   
   When creating a table using `TableEnvironment.from_elements`, the Table API 
skips type validation on any Row elements that were created using positional 
arguments, rather than keyword arguments. 
   
   For example, take a table with a single column, whose type is an array of 
Rows. These rows have 2 columns, `a VARCHAR` and `b BOOLEAN`. If we create a 
table with elements where one of these rows has columns with incorrect 
datatypes:
   
   ```python
   schema = DataTypes.ROW(
   [
   DataTypes.FIELD(
   "col",
   DataTypes.ARRAY(
   DataTypes.ROW(
   [
   DataTypes.FIELD("a", DataTypes.STRING()),
   DataTypes.FIELD("b", DataTypes.BOOLEAN()),
   ]
   )
   ),
   ),
   ]
   ) 
   elements = [(
   [("pyflink", True), ("pyflink", False), (True, "pyflink")],
   )] 
   table = self.t_env.from_elements(elements, schema)
   table_result = list(table.execute().collect())
   ```
   
   This results in a type validation error:
   
   ```
   TypeError: field a in element in array field col: VARCHAR can not accept 
object True in type 
   ```
   
   In an example where we use Row instead of tuples, but with column arguments:
   
   ```
   elements = [(
   [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a=True, 
b="pyflink")],
   )]
   ```
   
   We also get the same type validation error. However, when we use Row with 
positional arguments:
   
   ```
   elements = [(
   [Row("pyflink", True), Row("pyflink", False), Row(True, "pyflink")],
   )]
   ```
   
   the type validation is skipped, leading to an unpickling error when 
collecting:
   
   ```
   >   data = pickle.loads(data)
   E   EOFError: Ran out of input 
   ```
   
   The type validator skips this by stating that [the order in the row could be 
different to the order of the datatype 
fields](https://github.com/apache/flink/blob/master/flink-python/pyflink/table/types.py#L2156),
 but I don't think this is true. Rows made from both tuples and lists are type 
verified positionally against the positions of the DataType fields, and in the 
case of the `Row` class the order of the row's internal values is preserved. 
Similarly, `Row` class equality is also evaluated positionally when both rows 
are created with positional arguments.
   
   
   ## Brief change log
   
 - *Change the type validation logic used by 
`TableEnvironment.from_elements` so that `Row`s constructed with positional 
arguments are not skipped.*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added a test to ensure consistent type validation behaviour with rows 
constructed from tuples, lists, `Row`s with keyword arguments and `Row`s with 
positional arguments*
 - 
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments [flink]

2025-04-07 Thread via GitHub


flinkbot commented on PR #26414:
URL: https://github.com/apache/flink/pull/26414#issuecomment-2783296378

   
   ## CI report:
   
   * 92e7cc3e8e13c1c011ae107921c0b43b31cbc1d7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37601] Remove Unirest dependency in PrometheusReporterTest [flink]

2025-04-07 Thread via GitHub


hlteoh37 merged PR #26387:
URL: https://github.com/apache/flink/pull/26387


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031879610


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ModelDescriptor.java:
##
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Describes a {@link CatalogModel} representing a model.
+ *
+ * A {@link ModelDescriptor} is a template for creating a {@link 
CatalogModel} instance. It
+ * closely resembles the "CREATE MODEL" SQL DDL statement, containing input 
schema, output schema,
+ * and other characteristics.
+ *
+ * This can be used to register a Model in the Table API.
+ */
+@PublicEvolving
+public class ModelDescriptor {
+private final @Nullable Schema inputSchema;
+private final @Nullable Schema outputSchema;
+private final Map<String, String> modelOptions;
+private final @Nullable String comment;
+
+protected ModelDescriptor(
+@Nullable Schema inputSchema,
+@Nullable Schema outputSchema,
+Map<String, String> modelOptions,
+@Nullable String comment) {
+this.inputSchema = inputSchema;
+this.outputSchema = outputSchema;
+this.modelOptions = modelOptions;
+this.comment = comment;
+}
+
+/** Converts this descriptor into a {@link CatalogModel}. */
+public CatalogModel toCatalogModel() {
+final Schema inputSchema =
+getInputSchema()
+.orElseThrow(
+() ->
+new ValidationException(
+"Input schema missing in 
ModelDescriptor. Input schema cannot be null."));
+final Schema outputSchema =
+getOutputSchema()
+.orElseThrow(
+() ->
+new ValidationException(
+"Output schema missing in 
ModelDescriptor. Output schema cannot be null."));
+return CatalogModel.of(inputSchema, outputSchema, modelOptions, 
comment);
+}
+
+/** Converts this immutable instance into a mutable {@link Builder}. */
+public Builder toBuilder() {
+return new Builder(this);
+}
+
+/**
+ * Returns a map of string-based model options.
+ *
+ * @return options of the model.
+ */
+Map<String, String> getOptions() {
+return modelOptions;
+}
+
+/**
+ * Get the unresolved input schema of the model.
+ *
+ * @return unresolved input schema of the model.
+ */
+Optional<Schema> getInputSchema() {
+return Optional.ofNullable(inputSchema);
+}
+
+/**
+ * Get the unresolved output schema of the model.
+ *
+ * @return unresolved output schema of the model.
+ */
+Optional<Schema> getOutputSchema() {
+return Optional.ofNullable(outputSchema);
+}
+
+/**
+ * Get comment of the model.
+ *
+ * @return comment of the model.
+ */
+Optional<String> getComment() {
+return Optional.ofNullable(comment);
+}
+
+/**
+ * Creates a new {@link Builder} for the model with the given provider 
option.
+ *
+ * @param provider string value of provider for the model.
+ */
+public static Builder forProvider(String provider) {
+Preconditions.checkNotNull(provider, "Model descriptors require a 
provider value.");
+final Builder descriptorBuilder = new Builder();
+descriptorBuilder.option(FactoryUtil.MODEL_PROVIDER, provider);
+return descriptorBuilder;
+}
+
+@Override
+

Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031900150


##
flink-python/pyflink/table/table_environment.py:
##
@@ -1071,6 +1128,88 @@ def create_view(self,
 
 self._j_tenv.createView(view_path, table, ignore_if_exists)
 
+def create_model(self,
+ model_path: str,
+ model_descriptor: ModelDescriptor,
+ ignore_if_exists: Optional[bool] = False):
+"""
+Registers the given :class:`~pyflink.table.ModelDescriptor` as a 
catalog model
+similar to SQL models.
+
+The ModelDescriptor is converted into a CatalogModel and stored in the 
catalog.
+
+If the model should not be permanently stored in a catalog, use
+:func:`create_temporary_model` instead.
+
+Examples:
+::
+
+>>> table_env.create_model("MyModel", 
ModelDescriptor.for_provider("OPENAI")
+... .input_schema(Schema.new_builder()
+...   .column("f0", DataTypes.STRING())
+...   .build())
+... .output_schema(Schema.new_builder()
+...   .column("label", DataTypes.STRING())
+...   .build())
+... .option("task", "regression")
+... .option("type", "remote")
+... .
+... .
+... .build(),
+...  True)
+
+:param model_path: The path under which the model will be registered.
+:param model_descriptor: Template for creating a CatalogModel instance.
+:param ignore_if_exists: If a model exists under the given path and 
this flag is set,
+   no operation is executed. An exception is 
thrown otherwise.
+
+.. versionadded:: 2.0.1

Review Comment:
   this is the wrong branch if we are talking about 2.0.1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031904769


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##
@@ -343,12 +343,14 @@ public void testModelModificationListener() throws 
Exception {
 assertThat(alterEvent.ignoreIfNotExists()).isFalse();
 
 // Drop a model
-catalogManager.dropModel(
-ObjectIdentifier.of(
-catalogManager.getCurrentCatalog(),
-catalogManager.getCurrentDatabase(),
-"model1"),
-true);
+assertThat(
+catalogManager.dropModel(
+ObjectIdentifier.of(
+catalogManager.getCurrentCatalog(),
+catalogManager.getCurrentDatabase(),
+"model1"),

Review Comment:
   We can extract the `ObjectIdentifier` into a variable to avoid the formatting issue.
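
   For illustration, one possible shape of that extraction (a fragment of the
   quoted test; the variable name and the trailing `isTrue()` are assumptions):

   ```java
   // Extracting the identifier keeps the dropModel call short enough that the
   // formatter no longer has to split the nested arguments.
   ObjectIdentifier modelIdentifier =
           ObjectIdentifier.of(
                   catalogManager.getCurrentCatalog(),
                   catalogManager.getCurrentDatabase(),
                   "model1");
   assertThat(catalogManager.dropModel(modelIdentifier, true)).isTrue();
   ```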



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-37627) Restarting from a checkpoint/savepoint which coincides with shard split causes data loss

2025-04-07 Thread Arun Lakshman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941693#comment-17941693
 ] 

Arun Lakshman edited comment on FLINK-37627 at 4/7/25 7:19 PM:
---

Can you please assign this issue to me? I can work on this issue.


was (Author: arunlakshman):
Can you please this to me. I can work on this issue

> Restarting from a checkpoint/savepoint which coincides with shard split 
> causes data loss
> 
>
> Key: FLINK-37627
> URL: https://issues.apache.org/jira/browse/FLINK-37627
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-5.0.0
>Reporter: Keith Lee
>Priority: Major
>
> Similar to DDB stream connector's issue 
> https://issues.apache.org/jira/browse/FLINK-37416
> This is less likely to happen on Kinesis connector due to much lower 
> frequency of re-sharding / assigning new split but technically possible so 
> we'd like to fix this to avoid data
> loss.
> The scenario is as follow:
> - A checkpoint started
> - KinesisStreamsSourceEnumerator takes a checkpoint (shard was assigned here)
> - KinesisStreamsSourceEnumerator sends checkpoint event to reader
> - Before taking reader checkpoint, a SplitFinishedEvent came up in reader
> - Reader takes checkpoint
> - Now, just after checkpoint complete, job restarted
> This can lead to a shard lineage getting lost because of a shard being in 
> ASSIGNED state in enumerator and not being part of any task manager state.
> See DDB Connector issue's PR for reference fix: 
> https://issues.apache.org/jira/browse/FLINK-37416



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Avoid copy Set for variable implementedRpcGateways [flink]

2025-04-07 Thread via GitHub


beliefer commented on PR #26372:
URL: https://github.com/apache/flink/pull/26372#issuecomment-2782469397

   @1996fanrui @davidradl Thank you all!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37620) ForSt Sync mode support remote storage and provide configurable options

2025-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37620:
---
Labels: pull-request-available  (was: )

> ForSt Sync mode support remote storage and provide configurable options
> ---
>
> Key: FLINK-37620
> URL: https://issues.apache.org/jira/browse/FLINK-37620
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.1.0, 2.0.1
>
>
> Currently, the ForSt state backend only serves with local state when using 
> the old state api (the synchronous ones). We should also enable the 
> disaggregated state capabilities in this scenario. Additionally, given the 
> different API set could be mixed used in one job across different tasks, we 
> could provide options for users to enforce local state in sync mode when 
> enabling disaggregated state (Such as 
> 'state.backend.forst.sync.enforce-local'). This is useful for complex SQL 
> jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-37585) The data of the newly added table cannot be read

2025-04-07 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941499#comment-17941499
 ] 

Xin Gong commented on FLINK-37585:
--

You can see this discussion: 
[https://github.com/apache/flink-cdc/discussions/3338].

> The data of the newly added table cannot be read
> 
>
> Key: FLINK-37585
> URL: https://issues.apache.org/jira/browse/FLINK-37585
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.1
>Reporter: jeff-zou
>Priority: Major
>  Labels: pull-request-available
>
> After starting Flink CDC, when a new table is created in MySQL, Flink can 
> read the table structure of this table, but cannot read the data in it.
>  
> The method to reproduce the bug as follows:
> 1.Start Flink CDC for sync Mysql 
> {code:java}
> // code placeholder
>  MySqlSource<String> mySqlSource =
> MySqlSource.<String>builder()
> .port(3306)
> .hostname("10.11.69.176")
> .port(3306)
> .databaseList("cdc") 
> .tableList(".*") 
> .username("test")
> .password("123456")
> .serverTimeZone("UTC")
> .deserializer(new JsonDebeziumDeserializationSchema()) 
> .includeSchemaChanges(true) 
>.serverId("5401-5404")
> .scanNewlyAddedTableEnabled(true)
> .debeziumProperties(properties)
> .fetchSize(1024)
> .connectTimeout(Duration.ofSeconds(30)) 
> .build(); 
> ...
> {code}
> 2. Create table in Mysql
> {code:java}
> // code placeholder
> CREATE TABLE `test` (
> `id` varchar(100),
> PRIMARY KEY (`id`)
> ) {code}
> 3. Write data to Mysql
> {code:java}
> // code placeholder
> INSERT INTO test(`id`) VALUES('kk') ;
>  {code}
>  
>  
> Flink can read the table's schema, but cannot read the data 'kk' in it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37618][table-planner] Fix PTFs INTERVAL argument [flink]

2025-04-07 Thread via GitHub


davidradl commented on code in PR #26410:
URL: https://github.com/apache/flink/pull/26410#discussion_r2030762088


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java:
##
@@ -538,6 +539,11 @@ private CastAvoidanceChecker(LogicalType sourceType) {
 this.sourceType = sourceType;
 }
 
+@Override
+public Boolean visit(DayTimeIntervalType targetType) {
+return true;

Review Comment:
   I see the char type visit is testing isNullable. Should we do this here also?
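
   For illustration only, the nullability-aware variant this question hints at
   might look roughly like this (a sketch, not the actual `LogicalTypeCasts`
   logic):

   ```java
   @Override
   public Boolean visit(DayTimeIntervalType targetType) {
       // Sketch: treat the cast as avoidable only when nullability is compatible,
       // i.e. a nullable source is not forced into a NOT NULL target.
       return targetType.isNullable() || !sourceType.isNullable();
   }
   ```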



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state

2025-04-07 Thread Nickel Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941577#comment-17941577
 ] 

Nickel Fang commented on FLINK-25672:
-

We had the same issue (an OOM in the job manager) and have addressed it with a 
TTL-based approach in our project. May I open a PR for this?


h1. Solution

Requirements
 # Expired processed paths can be deleted according to a TTL policy to save memory.
 # It is an opt-in enhancement: users can easily choose between the original 
solution and the enhancement.
 # A running streaming job can seamlessly upgrade to the enhancement with no data 
loss.

Prerequisite

There is a TTL mechanism in the file source (e.g. S3 retention policy).


Details

Introduce a new LinkedHashSet of already-processed (path, timestamp) entries, 
alreadyProcessedPathAndTimestamps, in PendingSplitsCheckpoint, and a Duration 
retentionTime in ContinuousEnumerationSettings.
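
A rough sketch of the pruning idea in plain Java (class, field and method names are 
assumptions, not the actual Flink classes):

{code:java}
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map.Entry;

// Keeps (path, enumeration timestamp) pairs in insertion order and drops
// entries older than the configured retention time on every discovery pass.
class ProcessedPathTracker {
    private final LinkedHashSet<Entry<String, Long>> alreadyProcessedPathAndTimestamps =
            new LinkedHashSet<>();
    private final Duration retentionTime;

    ProcessedPathTracker(Duration retentionTime) {
        this.retentionTime = retentionTime;
    }

    void markProcessed(String path, long timestampMillis) {
        alreadyProcessedPathAndTimestamps.add(new SimpleEntry<>(path, timestampMillis));
    }

    boolean isAlreadyProcessed(String path, long timestampMillis) {
        return alreadyProcessedPathAndTimestamps.contains(new SimpleEntry<>(path, timestampMillis));
    }

    // Called periodically; this relies on the file source itself expiring old files
    // (e.g. an S3 retention policy), so pruned paths cannot reappear later.
    void pruneExpired(long nowMillis) {
        Iterator<Entry<String, Long>> it = alreadyProcessedPathAndTimestamps.iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > retentionTime.toMillis()) {
                it.remove();
            }
        }
    }
}
{code}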
 

> FileSource enumerator remembers paths of all already processed files which 
> can result in large state
> 
>
> Key: FLINK-25672
> URL: https://issues.apache.org/jira/browse/FLINK-25672
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Martijn Visser
>Priority: Major
>
> As mentioned in the Filesystem documentation, for Unbounded File Sources, the 
> {{FileEnumerator}} currently remembers paths of all already processed files, 
> which is a state that can in come cases grow rather large. 
> We should look into possibilities to reduce this. We could look into adding a 
> compressed form of tracking already processed files (for example by keeping 
> modification timestamps lower boundaries).
> When fixed, this should also be reflected in the documentation, as mentioned 
> in https://github.com/apache/flink/pull/18288#discussion_r785707311



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][docs] Fix broken tabs and PTF example argument errors [flink]

2025-04-07 Thread via GitHub


twalthr merged PR #26406:
URL: https://github.com/apache/flink/pull/26406


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37607) Blocklist timeout check may lost

2025-04-07 Thread Heart Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941594#comment-17941594
 ] 

Heart Zhou commented on FLINK-37607:


I would like to work on this issue. 

> Blocklist timeout check may lost
> 
>
> Key: FLINK-37607
> URL: https://issues.apache.org/jira/browse/FLINK-37607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.1
>Reporter: Heart Zhou
>Priority: Major
>
> The blocklist timeout check may be scheduled before the rpc server starts
> The blocklist timeout check is scheduled by the mainThreadExecutor in the 
> constructor.
> {code:java}
> DefaultBlocklistHandler(xxx,
> Duration timeoutCheckInterval,
> ComponentMainThreadExecutor mainThreadExecutor,
> xxx) {
> xxx
> this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
> this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
> xxx
> scheduleTimeoutCheck();
> } {code}
>  
> When the check function is called, the 
> org.apache.flink.runtime.rpc.RpcEndpoint#start method may not have been 
> called yet, although it will be called very soon.
> Therefore, the check function might be lost.
>  
> {code:java}
> public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
> unit) {
> final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
> FutureTask<Void> ft = new FutureTask<>(command, null);
> if (mainScheduledExecutor.isShutdown()) {
> log.warn(
> "The scheduled executor service is shutdown and ignores the 
> command {}",
> command);
> } else {
> mainScheduledExecutor.schedule(
> () -> gateway.runAsync(ft), delayMillis, 
> TimeUnit.MILLISECONDS);
> }
> return new ScheduledFutureAdapter<>(ft, delayMillis, 
> TimeUnit.MILLISECONDS);
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031795676


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -158,6 +167,121 @@ void testTableFromDescriptor() {
 assertThat(tEnv.getCatalogManager().listTables()).isEmpty();
 }
 
+@Test
+void testCreateModelFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertCreateModelFromDescriptor(tEnv, false);
+}
+
+@Test
+void testCreateModelIgnoreIfExistsFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertCreateModelFromDescriptor(tEnv, true);
+assertThatNoException()
+.isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
true));
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+}
+
+@Test
+void testCreateModelWithSameName() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+}
+
+@Test
+void testDropModel() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+final ObjectPath objectPath = new ObjectPath(database, "M");
+CatalogModel catalogModel =
+
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath);
+assertThat(catalogModel).isInstanceOf(CatalogModel.class);
+assertThat(tEnv.dropModel("M", true)).isTrue();
+assertThatThrownBy(
+() ->
+tEnv.getCatalog(catalog)
+.orElseThrow(AssertionError::new)
+.getModel(objectPath))
+.isInstanceOf(ModelNotExistException.class)
+.hasMessage("Model '`default_catalog`.`default_database`.`M`' 
does not exist.");
+}
+
+@Test
+void testNonExistingDropModel() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertThat(tEnv.dropModel("M", true)).isFalse();
+
+assertThatThrownBy(() -> tEnv.dropModel("M", false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Model with identifier 
'default_catalog.default_database.M' does not exist.");
+}
+
+@Test
+void testCreateTemporaryModelFromDescriptor() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+assertTemporaryCreateModelFromDescriptor(tEnv, false);
+}
+
+@Test
+void testCreateTemporaryModelIfNotExistsFromDescriptor() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertTemporaryCreateModelFromDescriptor(tEnv, true);
+assertThatNoException()
+.isThrownBy(() -> tEnv.createTemporaryModel("M", 
TEST_MODEL_DESCRIPTOR, true));
+
+assertThatThrownBy(() -> tEnv.createTemporaryModel("M", 
TEST_MODEL_DESCRIPTOR, false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Temporary model 
'`default_catalog`.`default_database`.`M`' already exists");
+}
+
+@Test
+void testCreateModelWithSameNameIgnoreIfExists() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+}
+
+@Test
+void testListModels() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();

Review Comment:
   Am I right that we have this line 

Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031813534


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -158,6 +167,121 @@ void testTableFromDescriptor() {
 assertThat(tEnv.getCatalogManager().listTables()).isEmpty();
 }
 
+@Test
+void testCreateModelFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertCreateModelFromDescriptor(tEnv, false);
+}
+
+@Test
+void testCreateModelIgnoreIfExistsFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertCreateModelFromDescriptor(tEnv, true);
+assertThatNoException()
+.isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
true));
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+}
+
+@Test
+void testCreateModelWithSameName() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+}
+
+@Test
+void testDropModel() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+final ObjectPath objectPath = new ObjectPath(database, "M");
+CatalogModel catalogModel =
+
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath);
+assertThat(catalogModel).isInstanceOf(CatalogModel.class);
+assertThat(tEnv.dropModel("M", true)).isTrue();
+assertThatThrownBy(
+() ->
+tEnv.getCatalog(catalog)
+.orElseThrow(AssertionError::new)
+.getModel(objectPath))
+.isInstanceOf(ModelNotExistException.class)
+.hasMessage("Model '`default_catalog`.`default_database`.`M`' 
does not exist.");
+}
+
+@Test
+void testNonExistingDropModel() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertThat(tEnv.dropModel("M", true)).isFalse();
+
+assertThatThrownBy(() -> tEnv.dropModel("M", false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Model with identifier 
'default_catalog.default_database.M' does not exist.");
+}
+
+@Test
+void testCreateTemporaryModelFromDescriptor() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+assertTemporaryCreateModelFromDescriptor(tEnv, false);
+}
+
+@Test
+void testCreateTemporaryModelIfNotExistsFromDescriptor() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertTemporaryCreateModelFromDescriptor(tEnv, true);
+assertThatNoException()
+.isThrownBy(() -> tEnv.createTemporaryModel("M", 
TEST_MODEL_DESCRIPTOR, true));
+
+assertThatThrownBy(() -> tEnv.createTemporaryModel("M", 
TEST_MODEL_DESCRIPTOR, false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Temporary model 
'`default_catalog`.`default_database`.`M`' already exists");
+}
+
+@Test
+void testCreateModelWithSameNameIgnoreIfExists() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+}
+
+@Test
+void testListModels() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M1", TEST_MODEL_DESCRIPTO

Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031798763


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -212,4 +336,39 @@ private static void 
assertTemporaryCreateTableFromDescriptor(
 
assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
 assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
 }
+
+private static void assertCreateModelFromDescriptor(
+TableEnvironmentMock tEnv, boolean ignoreIfExists) throws 
ModelNotExistException {
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+if (ignoreIfExists) {
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+} else {
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+}
+
+final ObjectPath objectPath = new ObjectPath(database, "M");
+CatalogModel catalogModel =
+
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath);
+assertThat(catalogModel).isInstanceOf(CatalogModel.class);
+assertThat(catalogModel.getInputSchema()).isEqualTo(TEST_SCHEMA);
+assertThat(catalogModel.getOptions().get("a")).isEqualTo("Test");
+}
+
+private static void assertTemporaryCreateModelFromDescriptor(
+TableEnvironmentMock tEnv, boolean ignoreIfExists) {
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, ignoreIfExists);
+final Optional lookupResult =
+tEnv.getCatalogManager().getModel(ObjectIdentifier.of(catalog, 
database, "M"));
+assertThat(lookupResult.isPresent()).isTrue();
+CatalogModel catalogModel = lookupResult.get().getResolvedModel();
+assertThat(catalogModel != null).isTrue();

Review Comment:
   why not
   ```suggestion
   assertThat(catalogModel).isNotNull();
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031803406


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -212,4 +336,39 @@ private static void 
assertTemporaryCreateTableFromDescriptor(
 
assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
 assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
 }
+
+private static void assertCreateModelFromDescriptor(
+TableEnvironmentMock tEnv, boolean ignoreIfExists) throws 
ModelNotExistException {
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+if (ignoreIfExists) {
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+} else {
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+}
+
+final ObjectPath objectPath = new ObjectPath(database, "M");
+CatalogModel catalogModel =
+
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath);
+assertThat(catalogModel).isInstanceOf(CatalogModel.class);
+assertThat(catalogModel.getInputSchema()).isEqualTo(TEST_SCHEMA);
+assertThat(catalogModel.getOptions().get("a")).isEqualTo("Test");
+}
+
+private static void assertTemporaryCreateModelFromDescriptor(
+TableEnvironmentMock tEnv, boolean ignoreIfExists) {
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, ignoreIfExists);
+final Optional lookupResult =
+tEnv.getCatalogManager().getModel(ObjectIdentifier.of(catalog, 
database, "M"));
+assertThat(lookupResult.isPresent()).isTrue();
+CatalogModel catalogModel = lookupResult.get().getResolvedModel();
+assertThat(catalogModel != null).isTrue();
+assertThat(catalogModel.getInputSchema()).isEqualTo(TEST_SCHEMA);
+assertThat(catalogModel.getOutputSchema()).isEqualTo(TEST_SCHEMA);

Review Comment:
   can we have a test with different schemas to be sure it behaves properly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031821725


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -212,4 +336,39 @@ private static void 
assertTemporaryCreateTableFromDescriptor(
 
assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
 assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
 }
+
+private static void assertCreateModelFromDescriptor(
+TableEnvironmentMock tEnv, boolean ignoreIfExists) throws 
ModelNotExistException {
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+if (ignoreIfExists) {
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);

Review Comment:
   if we extract this kind of assert into a separate method, I would expect it to 
work with different model names, not only `M`
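
   Roughly, the helper could take the model name as a parameter (a sketch based on
   the quoted code, not the final shape):

   ```java
   private static void assertCreateModelFromDescriptor(
           TableEnvironmentMock tEnv, String modelName, boolean ignoreIfExists)
           throws ModelNotExistException {
       if (ignoreIfExists) {
           tEnv.createModel(modelName, TEST_MODEL_DESCRIPTOR, true);
       } else {
           tEnv.createModel(modelName, TEST_MODEL_DESCRIPTOR);
       }
       final ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), modelName);
       // ... same catalog lookup and assertions as above, just using modelName
   }
   ```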



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031838881


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -564,6 +565,36 @@ public boolean createView(String path, Table view, boolean 
ignoreIfExists) {
 return catalogManager.createTable(tableTable, viewIdentifier, 
ignoreIfExists);
 }
 
+@Override
+public void createModel(String path, ModelDescriptor descriptor, boolean 
ignoreIfExists) {
+Preconditions.checkNotNull(path, "Path must not be null.");
+Preconditions.checkNotNull(descriptor, "Model descriptor must not be 
null.");
+final ObjectIdentifier objectIdentifier =
+
catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));
+catalogManager.createModel(descriptor.toCatalogModel(), 
objectIdentifier, ignoreIfExists);
+}
+
+@Override
+public void createModel(String path, ModelDescriptor descriptor) {
+createModel(path, descriptor, false);
+}
+
+@Override
+public void createTemporaryModel(String path, ModelDescriptor descriptor) {
+createTemporaryModel(path, descriptor, false);
+}
+
+@Override
+public void createTemporaryModel(
+String path, ModelDescriptor descriptor, boolean ignoreIfExists) {
+Preconditions.checkNotNull(path, "Path must not be null.");
+Preconditions.checkNotNull(descriptor, "Model descriptor must not be 
null.");
+final ObjectIdentifier objectIdentifier =
+
catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));

Review Comment:
   nit: could be extracted into a separate private method, replacing all 
occurrences
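
   For example, something along these lines (the method name is an assumption):

   ```java
   private ObjectIdentifier qualifyModelPath(String path) {
       Preconditions.checkNotNull(path, "Path must not be null.");
       return catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));
   }
   ```

   `createModel(...)` and `createTemporaryModel(...)` could then both call
   `qualifyModelPath(path)` instead of repeating the two lines.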



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031786003


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -158,6 +167,121 @@ void testTableFromDescriptor() {
 assertThat(tEnv.getCatalogManager().listTables()).isEmpty();
 }
 
+@Test
+void testCreateModelFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertCreateModelFromDescriptor(tEnv, false);
+}
+
+@Test
+void testCreateModelIgnoreIfExistsFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+assertCreateModelFromDescriptor(tEnv, true);
+assertThatNoException()
+.isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
true));
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+}
+
+@Test
+void testCreateModelWithSameName() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);
+
+tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, 
false))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+
+assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR))
+.isInstanceOf(ValidationException.class)
+.hasMessage(
+"Could not execute CreateModel in path 
`default_catalog`.`default_database`.`M`");
+}

Review Comment:
   couldn't it be simplified with a parameterized test?
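
   A possible parameterized form, as a sketch (JUnit 5 `@ParameterizedTest` and
   `@ValueSource` imports assumed):

   ```java
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testCreateModelWithSameName(boolean ignoreIfExists) {
       final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();

       tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);

       // A second registration only succeeds when ignoreIfExists is set.
       if (ignoreIfExists) {
           assertThatNoException()
                   .isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true));
       } else {
           assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false))
                   .isInstanceOf(ValidationException.class)
                   .hasMessage(
                           "Could not execute CreateModel in path "
                                   + "`default_catalog`.`default_database`.`M`");
       }
   }
   ```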



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]

2025-04-07 Thread via GitHub


snuyanzin commented on code in PR #26385:
URL: https://github.com/apache/flink/pull/26385#discussion_r2031827083


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -1647,6 +1675,8 @@ private void execute(
 command.execute(catalog.get(), 
objectIdentifier.toObjectPath());
 } catch (TableAlreadyExistException
 | TableNotExistException
+| ModelNotExistException
+| ModelAlreadyExistException

Review Comment:
   with the current situation I would vote for naming similar to what we have 
for tables
   
   >I am curious why we do not have view exceptions
   
   Views are considered as tables



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


