[jira] [Created] (FLINK-37622) Exactly once Kafka sink does not produce any records in batch mode
Arvid Heise created FLINK-37622:
-----------------------------------

             Summary: Exactly once Kafka sink does not produce any records in batch mode
                 Key: FLINK-37622
                 URL: https://issues.apache.org/jira/browse/FLINK-37622
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: kafka-3.4.0
            Reporter: Arvid Heise
            Assignee: Arvid Heise
             Fix For: kafka-4.0.0


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (FLINK-36648) Release flink-connector-kafka v4.0.0 for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-36648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arvid Heise reassigned FLINK-36648: --- Assignee: Arvid Heise (was: Yanquan Lv) > Release flink-connector-kafka v4.0.0 for Flink 2.0 > -- > > Key: FLINK-36648 > URL: https://issues.apache.org/jira/browse/FLINK-36648 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: kafka-3.4.0 >Reporter: Yanquan Lv >Assignee: Arvid Heise >Priority: Major > Labels: pull-request-available > Fix For: kafka-4.0.0 > > > Flink already has a release for the 2.0-preview1 version; Kafka, as one of > the most commonly used connectors, needs to be bumped to this version as soon > as possible for users and developers to use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] fix ci failure due to code style [flink-cdc]
MOBIN-F commented on PR #3980: URL: https://github.com/apache/flink-cdc/pull/3980#issuecomment-2785142817 CI failed. I'll check again later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36684][cdc-connector][mysql] Support read changelog as append only mode [flink-cdc]
ruanhang1993 commented on code in PR #3708: URL: https://github.com/apache/flink-cdc/pull/3708#discussion_r2032439969 ## docs/content/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -426,6 +426,19 @@ During a snapshot operation, the connector will query each included table to pro Experimental option, defaults to false. + + scan.read-changelog-as-append-only.enabled + optional + false + Boolean + +Whether to convert the changelog stream to an append-only stream. +This feature is only used in special scenarios where you need to save upstream table deletion messages. For example, in a logical deletion scenario, users are not allowed to physically delete downstream messages. In this case, this feature is used in conjunction with the row_kind metadata field. Therefore, the downstream can save all detailed data at first, and then use the row_kind field to determine whether to perform logical deletion. +The option values are as follows: + true: All types of messages (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) will be converted into INSERT messages. + false (default): All types of messages are sent as is. + + Review Comment: The row-kind part need to be updated as in the content.zh. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
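For readers following the option discussed above, here is a minimal sketch of how it could be combined with the `row_kind` metadata column from the Table API. This is illustrative only, not code from the PR: the table schema, connection settings, and the exact metadata key are assumptions.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadChangelogAsAppendOnlyExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The changelog is converted to an append-only stream; the original
        // change type is preserved in the virtual row_kind metadata column, so
        // the downstream can keep DELETE rows and apply logical deletion later.
        tEnv.executeSql(
                "CREATE TABLE orders_src (\n"
                        + "  order_id INT,\n"
                        + "  amount DECIMAL(10, 2),\n"
                        + "  op STRING METADATA FROM 'row_kind' VIRTUAL,\n"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'mysql.example.com',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flink',\n"
                        + "  'password' = 'secret',\n"
                        + "  'database-name' = 'shop',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'scan.read-changelog-as-append-only.enabled' = 'true'\n"
                        + ")");

        // Every change record (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
        // now arrives as an INSERT; op tells the consumer what it originally was.
        tEnv.executeSql("SELECT order_id, amount, op FROM orders_src").print();
    }
}
```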
Re: [PR] [FLINK-36456] [Runtime/Configuration] Improve Security in DatadogHttpReporterFactory by providing ENV alternative to retrieve API key [flink]
github-actions[bot] commented on PR #25470: URL: https://github.com/apache/flink/pull/25470#issuecomment-2785362064 This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/ If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add timestamp_mapping.legacy option to avro filesystem and avro-confluent format [flink]
github-actions[bot] commented on PR #25439: URL: https://github.com/apache/flink/pull/25439#issuecomment-2785361786 This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/ If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [runtime-web] chore: update Angular to v15 [flink]
github-actions[bot] commented on PR #25450: URL: https://github.com/apache/flink/pull/25450#issuecomment-2785361905 This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/ If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Updated readme with stream-processing [flink]
github-actions[bot] commented on PR #25455: URL: https://github.com/apache/flink/pull/25455#issuecomment-2785361975 This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/ If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]
Zakelly opened a new pull request, #26415: URL: https://github.com/apache/flink/pull/26415 ## What is the purpose of the change There is a concurrency issue with reference counting in the ForSt file cache, which could lead to a read error in some special scenarios (e.g. extremely frequent cache thrashing). ## Brief change log Two problems: - All the read interfaces in `CachedDataInputStream` should release the reference **only after** the stream is not the `originalStream` - The stream-retrieving methods in task threads should consider the latency of the status switch. ## Verifying this change This change is already covered by existing tests, such as `ForStFlinkFileSystemTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix] Rename registeredSlots to allocatedSlots [flink]
beliefer opened a new pull request, #26417: URL: https://github.com/apache/flink/pull/26417 ## What is the purpose of the change This PR aims to improve the readability of `DefaultAllocatedSlotPool`. The original name `registeredSlots` can't tell developers any useful information. I think `allocatedSlots` is better than `registeredSlots`. ## Brief change log Rename `registeredSlots` to `allocatedSlots`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37553) Make ForStKeyedStateBackend consider namespaceSerializer compatibility when restoring state
[ https://issues.apache.org/jira/browse/FLINK-37553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37553: --- Labels: pull-request-available (was: ) > Make ForStKeyedStateBackend consider namespaceSerializer compatibility when > restoring state > --- > > Key: FLINK-37553 > URL: https://issues.apache.org/jira/browse/FLINK-37553 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Affects Versions: 2.0.0 >Reporter: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 2.1.0 > > > ForStKeyedStateBackend currently does not consider namespaceSerializer > compatibility when restoring state. This may cause the namespaceSerializer to > change when it is not known whether it should be compatible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]
fredia commented on code in PR #26415: URL: https://github.com/apache/flink/pull/26415#discussion_r2032341068 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java: ## @@ -102,62 +102,81 @@ private FSDataInputStream getStream() throws IOException { if (isFlinkThread()) { cacheEntry.touch(); } -FSDataInputStream stream = tryGetCacheStream(); -if (stream != null) { -fileBasedCache.incHitCounter(); -return stream; -} - -if (streamStatus == StreamStatus.CACHED_CLOSED -|| streamStatus == StreamStatus.CACHED_CLOSING) { -if (streamStatus == StreamStatus.CACHED_CLOSING) { -try { -semaphore.acquire(1); -} catch (InterruptedException e) { -throw new RuntimeException(e); -} -originalStream.seek(position); -position = -1; -LOG.trace( -"Stream {} status from {} to {}", -cacheEntry.cachePath, -streamStatus, -StreamStatus.CACHED_CLOSED); -streamStatus = StreamStatus.CACHED_CLOSED; -} -// try reopen -tryReopen(); -stream = tryGetCacheStream(); +// Repeat until get a stream +while (true) { +// Firstly, we try to get cache stream +FSDataInputStream stream = tryGetCacheStream(); if (stream != null) { fileBasedCache.incHitCounter(); return stream; } -fileBasedCache.incMissCounter(); -return originalStream; -} else if (streamStatus == StreamStatus.ORIGINAL) { -fileBasedCache.incMissCounter(); -return originalStream; -} else { -if (streamStatus == StreamStatus.CACHED_OPEN) { -stream = tryGetCacheStream(); + +// No cache stream, so is it closing? +if (streamStatus == StreamStatus.CACHED_CLOSED +|| streamStatus == StreamStatus.CACHED_CLOSING) { +if (streamStatus == StreamStatus.CACHED_CLOSING) { +// if closing, update the position +try { +semaphore.acquire(1); +} catch (InterruptedException e) { +throw new RuntimeException(e); +} +originalStream.seek(position); +position = -1; +LOG.trace( +"Stream {} status from {} to {}", +cacheEntry.cachePath, +streamStatus, +StreamStatus.CACHED_CLOSED); +streamStatus = StreamStatus.CACHED_CLOSED; +} +// try reopen +stream = tryReopen(); if (stream != null) { fileBasedCache.incHitCounter(); return stream; } +fileBasedCache.incMissCounter(); Review Comment: Add `streamStatus = StreamStatus.ORIGINAL` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-37553] Make ForStKeyedStateBackend update NamespaceSerializer during restoring state [flink]
mayuehappy opened a new pull request, #26416: URL: https://github.com/apache/flink/pull/26416 …during restoring state ## What is the purpose of the change Make ForStKeyedStateBackend update NamespaceSerializer during restoring state ## Brief change log - add `updateNamespaceSerializer` in `RegisteredKeyValueStateBackendMetaInfo` - invoke RegisteredKeyValueStateBackendMetaInfo#updateNamespaceSerializer when creating State ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework This change added tests and can be verified as follows: add unit test ForStStateMigrationTest#testStateNamespaceSerializerChanged ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (checkpointing) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]
flinkbot commented on PR #26415: URL: https://github.com/apache/flink/pull/26415#issuecomment-2785105538 ## CI report: * 7b963ba791e89acf37bb95e08dcfdc724dee6624 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-37458) Forbid enableAsyncState() for synchronous operators
[ https://issues.apache.org/jira/browse/FLINK-37458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei reassigned FLINK-37458: -- Assignee: Yanfei Lei > Forbid enableAsyncState() for synchronous operators > --- > > Key: FLINK-37458 > URL: https://issues.apache.org/jira/browse/FLINK-37458 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-37458) Forbid enableAsyncState() for synchronous operators
[ https://issues.apache.org/jira/browse/FLINK-37458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941759#comment-17941759 ] Yanfei Lei commented on FLINK-37458: Merged into release-2.0 via 14e85eced10e98bc75870ac0360bad67d0722697 > Forbid enableAsyncState() for synchronous operators > --- > > Key: FLINK-37458 > URL: https://issues.apache.org/jira/browse/FLINK-37458 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix] fix ci failure due to code style [flink-cdc]
MOBIN-F opened a new pull request, #3980: URL: https://github.com/apache/flink-cdc/pull/3980 Code style verifier has been updated to JUnit 5  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] fix ci failure due to code style [flink-cdc]
leonardBang commented on PR #3980: URL: https://github.com/apache/flink-cdc/pull/3980#issuecomment-2785104152 Thanks @MOBIN-F for the quickfix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37628) Wrong reference counting in ForSt file cache
[ https://issues.apache.org/jira/browse/FLINK-37628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37628: --- Labels: pull-request-available (was: ) > Wrong reference counting in ForSt file cache > > > Key: FLINK-37628 > URL: https://issues.apache.org/jira/browse/FLINK-37628 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > > There is a concurrency issue for reference counting in ForSt file cache, > which could lead to a read error in some special scenarios (e.g. extremely > frequent cache thrashing) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37553] Make ForStKeyedStateBackend update NamespaceSerializer during restoring state [flink]
flinkbot commented on PR #26416: URL: https://github.com/apache/flink/pull/26416#issuecomment-2785176177 ## CI report: * 8f4cfd9ee17624cda10a40d69b8891bd1edd4593 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Rename registeredSlots to allocatedSlots [flink]
flinkbot commented on PR #26417: URL: https://github.com/apache/flink/pull/26417#issuecomment-2785177001 ## CI report: * 7fd9bfbbd3106b4c43f3f14d69c056550fa66aff UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [postgres] fix when backfill will scan all table schame which match config [flink-cdc]
hql0312 commented on PR #3979: URL: https://github.com/apache/flink-cdc/pull/3979#issuecomment-2785178413 > Please check the code style. @hql0312 ok,done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix] Unify the name for the collection of SlotOffer with slotOffers [flink]
beliefer opened a new pull request, #26418: URL: https://github.com/apache/flink/pull/26418 ## What is the purpose of the change This PR aims to unify the name for collections of `SlotOffer`. Currently, there are two variable names used for collections of `SlotOffer`: one is `slots` and the other is `offers`. I think `slotOffers` is more readable than either. ## Brief change log Unify the name for collections of `SlotOffer` as `slotOffers`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37628] Fix reference counting in ForSt file cache [flink]
Zakelly commented on code in PR #26415: URL: https://github.com/apache/flink/pull/26415#discussion_r2032354803 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java: ## @@ -102,62 +102,81 @@ private FSDataInputStream getStream() throws IOException { if (isFlinkThread()) { cacheEntry.touch(); } -FSDataInputStream stream = tryGetCacheStream(); -if (stream != null) { -fileBasedCache.incHitCounter(); -return stream; -} - -if (streamStatus == StreamStatus.CACHED_CLOSED -|| streamStatus == StreamStatus.CACHED_CLOSING) { -if (streamStatus == StreamStatus.CACHED_CLOSING) { -try { -semaphore.acquire(1); -} catch (InterruptedException e) { -throw new RuntimeException(e); -} -originalStream.seek(position); -position = -1; -LOG.trace( -"Stream {} status from {} to {}", -cacheEntry.cachePath, -streamStatus, -StreamStatus.CACHED_CLOSED); -streamStatus = StreamStatus.CACHED_CLOSED; -} -// try reopen -tryReopen(); -stream = tryGetCacheStream(); +// Repeat until get a stream +while (true) { +// Firstly, we try to get cache stream +FSDataInputStream stream = tryGetCacheStream(); if (stream != null) { fileBasedCache.incHitCounter(); return stream; } -fileBasedCache.incMissCounter(); -return originalStream; -} else if (streamStatus == StreamStatus.ORIGINAL) { -fileBasedCache.incMissCounter(); -return originalStream; -} else { -if (streamStatus == StreamStatus.CACHED_OPEN) { -stream = tryGetCacheStream(); + +// No cache stream, so is it closing? +if (streamStatus == StreamStatus.CACHED_CLOSED +|| streamStatus == StreamStatus.CACHED_CLOSING) { +if (streamStatus == StreamStatus.CACHED_CLOSING) { +// if closing, update the position +try { +semaphore.acquire(1); +} catch (InterruptedException e) { +throw new RuntimeException(e); +} +originalStream.seek(position); +position = -1; +LOG.trace( +"Stream {} status from {} to {}", +cacheEntry.cachePath, +streamStatus, +StreamStatus.CACHED_CLOSED); +streamStatus = StreamStatus.CACHED_CLOSED; +} +// try reopen +stream = tryReopen(); if (stream != null) { fileBasedCache.incHitCounter(); return stream; } +fileBasedCache.incMissCounter(); Review Comment: I suggest not. The `ORIGINAL` means this stream could only be read via original stream, no cache mechanism enabled. It's a stablized status without transition. There is an `else if` branch under this to handle the `ORIGINAL` condition -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-37623) Async state support for `process()` in Datastream API
[ https://issues.apache.org/jira/browse/FLINK-37623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan reassigned FLINK-37623: --- Assignee: Yanfei Lei (was: Zakelly Lan) > Async state support for `process()` in Datastream API > -- > > Key: FLINK-37623 > URL: https://issues.apache.org/jira/browse/FLINK-37623 > Project: Flink > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Fix For: 2.1.0, 2.0.1 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Unify the name for the collection of SlotOffer with slotOffers [flink]
flinkbot commented on PR #26418: URL: https://github.com/apache/flink/pull/26418#issuecomment-2785194374 ## CI report: * 43a54812fb035d279a21137de5f65d667500142d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Fixing flaky tests in flink-table-planner /hints [flink]
github-actions[bot] commented on PR #25486: URL: https://github.com/apache/flink/pull/25486#issuecomment-2785362205 This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/ If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36683) Support metadata 'row_kind' virtual column for Mongo CDC Connector
[ https://issues.apache.org/jira/browse/FLINK-36683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-36683. Resolution: Resolved Resolved via master: 2e4abdb68ed960f09610cb8be332b770a07ba53e > Support metadata 'row_kind' virtual column for Mongo CDC Connector > -- > > Key: FLINK-36683 > URL: https://issues.apache.org/jira/browse/FLINK-36683 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: Runkang He >Assignee: Runkang He >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.4.0 > > > 'row_kind' metadata is very useful in actual user scenarios, the two main > scenarios are below: > 1. Save all upstream messages: In this scenario, the downstream will save all > message includes delete messages from upstream. To achieve this requirement, > we should convert full changelogs to append only message, and need to use > metadata row_kind to represent the changelog kind. > 2. Ignore upstream delete messages: In this scenario, to save storage space, > the upstream cdc source often deletes historical data regularly and only > retains data within seven days. However, the business requires the downstream > OLAP system to retain the full amount of historical data, so it is necessary > to ignore the delete messages from source. A reasonable way is to use > metadata row_kind to filter out these delete messages. > So I think we should support 'row_kind' metadata in Mongo CDC Connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
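As a rough illustration of scenario 2 described in the issue above (ignoring upstream delete messages), the sketch below declares the `row_kind` metadata column on a MongoDB CDC source and filters on it. Table names, connection settings, and the row-kind string values are assumptions for illustration, not taken from the change itself.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowKindFilterExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // MongoDB CDC source exposing the original change type as a virtual
        // metadata column named op.
        tEnv.executeSql(
                "CREATE TABLE products_src (\n"
                        + "  _id STRING,\n"
                        + "  name STRING,\n"
                        + "  op STRING METADATA FROM 'row_kind' VIRTUAL,\n"
                        + "  PRIMARY KEY (_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mongodb-cdc',\n"
                        + "  'hosts' = 'mongo.example.com:27017',\n"
                        + "  'username' = 'flink',\n"
                        + "  'password' = 'secret',\n"
                        + "  'database' = 'shop',\n"
                        + "  'collection' = 'products'\n"
                        + ")");

        // Keep full history in the downstream OLAP system while dropping
        // upstream deletes: filter out records whose original change type was
        // DELETE ('-D' is assumed to be the short form emitted for deletes).
        tEnv.executeSql(
                "SELECT _id, name FROM products_src WHERE op <> '-D'").print();
    }
}
```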
Re: [PR] [FLINK-37583] Upgrade to Kafka 4.0.0 client. [flink-connector-kafka]
tomncooper commented on code in PR #161: URL: https://github.com/apache/flink-connector-kafka/pull/161#discussion_r2030882248 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java: ## @@ -40,7 +40,7 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase { private static final String INIT_KAFKA_RETRIES = "0"; private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000"; private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000"; -private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000"; +private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500"; Review Comment: Sorry, I should have put a comment in the PR description about this. The new kafka client now enforces the requirement for the delivery timeout to be longer than the request timeout (which is part of the overall delivery timeout) at compile time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
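For reference, the producer timeouts discussed in the review above look roughly like the snippet below. This is a sketch under the assumption that these properties are forwarded to the underlying KafkaProducer, which rejects configurations where delivery.timeout.ms is smaller than linger.ms + request.timeout.ms.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class WriterTimeoutProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
        // delivery.timeout.ms must not be smaller than
        // linger.ms + request.timeout.ms, hence the bump from 1000 to 1500
        // in the test configuration discussed above.
        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1500");
        System.out.println(props);
    }
}
```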
[jira] [Assigned] (FLINK-37626) Flaky test: ForStFlinkFileSystemTest.testSstFileInCache
[ https://issues.apache.org/jira/browse/FLINK-37626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan reassigned FLINK-37626: --- Assignee: Zakelly Lan > Flaky test: ForStFlinkFileSystemTest.testSstFileInCache > --- > > Key: FLINK-37626 > URL: https://issues.apache.org/jira/browse/FLINK-37626 > Project: Flink > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Gabor Somogyi >Assignee: Zakelly Lan >Priority: Major > > https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041 > {code:java} > Apr 07 09:38:18 09:38:18.871 [ERROR] > org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache > -- Time elapsed: 0.049 s <<< FAILURE! > Apr 07 09:38:18 org.opentest4j.AssertionFailedError: > Apr 07 09:38:18 > Apr 07 09:38:18 expected: 89 > Apr 07 09:38:18 but was: 0 > Apr 07 09:38:18 at > org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache(ForStFlinkFileSystemTest.java:332) > Apr 07 09:38:18 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > Apr 07 09:38:18 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]
flinkbot commented on PR #26412: URL: https://github.com/apache/flink/pull/26412#issuecomment-2782507746 ## CI report: * f818133916a63feec3aba9361323b2c2231d0afd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]
davidradl commented on code in PR #26412: URL: https://github.com/apache/flink/pull/26412#discussion_r2030769398 ## docs/layouts/shortcodes/generated/forst_configuration.html: ## @@ -116,6 +116,12 @@ String The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager. + +state.backend.forst.sync.enforce-local +false +Boolean +Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job. Review Comment: nit: is this true? same job. -> same local job. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
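A minimal sketch of setting the option documented above. The raw string key is used because the dedicated ConfigOption constant is not shown in this thread, so its name would be an assumption.

```java
import java.util.Map;

import org.apache.flink.configuration.Configuration;

public class EnforceLocalStateConfig {
    public static void main(String[] args) {
        // Keep operators running in synchronous state mode on local state,
        // even when disaggregated state is enabled for the job.
        Configuration conf =
                Configuration.fromMap(
                        Map.of("state.backend.forst.sync.enforce-local", "true"));
        System.out.println(conf);
    }
}
```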
Re: [PR] [FLINK-37596][metrics] Close metric group of a finished split [flink]
pnowojski commented on code in PR #26388: URL: https://github.com/apache/flink/pull/26388#discussion_r2030767818 ## flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java: ## @@ -191,4 +191,17 @@ public Boolean isIdle() { public Boolean isActive() { return !isPaused() && !isIdle(); } + +public void onSplitFinished() { +if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) { +((AbstractMetricGroup) splitWatermarkMetricGroup).close(); +} else { +LOG.warn("Split watermark metric group can not be closed"); Review Comment: nit: log what's the instance type of the metric group? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Update copyright NOTICE year to 2025 [flink]
flinkbot commented on PR #26413: URL: https://github.com/apache/flink/pull/26413#issuecomment-2782943869 ## CI report: * 15cc4c070ca4a56aa1057071d421c0a95a10dd89 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37598][table] Support list and map state in PTFs [flink]
twalthr commented on PR #26396: URL: https://github.com/apache/flink/pull/26396#issuecomment-2783419929 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36090) Bug with IngestDB restore operation for priority queue state in backend
[ https://issues.apache.org/jira/browse/FLINK-36090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-36090: -- Assignee: Maxim Vershinin > Bug with IngestDB restore operation for priority queue state in backend > --- > > Key: FLINK-36090 > URL: https://issues.apache.org/jira/browse/FLINK-36090 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 2.0.0 >Reporter: Maxim Vershinin >Assignee: Maxim Vershinin >Priority: Major > Labels: pull-request-available > > *Summary:* Incorrect handling of priority queue states in IngestDB during > restoring due to missing {{equals()}} and {{hashCode()}} methods in > {{{}RegisteredPriorityQueueStateBackendMetaInfo{}}}. > *Problem Description:* > During the restoring by IngestDB in my Flink project, an issue was identified > where the priority queue states are not managed correctly in the backend. The > problem stems from the absence of {{equals()}} and {{hashCode()}} methods in > the {{RegisteredPriorityQueueStateBackendMetaInfo}} class. > In particular, within the {{exportColumnFamiliesWithSstDataInKeyGroupsRange}} > method of the {{RocksDBIncrementalRestoreOperation}} class, if the state is a > priority queue, identical states from different subtasks are erroneously > treated as distinct states within the {{exportedColumnFamiliesOut}} map. This > leads to inconsistent behavior and errors during the restoring process. > *Proposed Solution:* > To address this issue, add {{equals()}} and {{hashCode()}} methods to the > {{RegisteredPriorityQueueStateBackendMetaInfo}} class. Implementing these > methods will ensure that priority queue states are accurately recognized and > handled across different subtasks, thereby preventing errors during restoring > by IngestDB. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37511][rest] Use Jackson serialization in JobPlanInfo.Plan [flink]
akalash commented on code in PR #26320: URL: https://github.com/apache/flink/pull/26320#discussion_r2031317302 ## docs/layouts/shortcodes/generated/rest_v1_dispatcher.html: ## @@ -2879,7 +3009,72 @@ "properties" : { "plan" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson" + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:Plan", Review Comment: I see there are still several open questions (cc. @davidradl): > this appears to be an issue with the open api and rest docs - that are not generating correctly. Kind of: the RawJson didn't generate proper docs, and rest_api indeed looked unreadable with RawJson. > the fix seems to be to change the way we do job serialization Yes, we used to serialize it manually and now Jackson does it for us based on a POJO class; in fact, the serialization remains the same (it's still Jackson), it just happens at a different time and place. > on the face of it this seems a large change to a core part of Flink, that could introduce other considerations for example how are existing job serializations affected. It doesn't look like a big change since, as we discussed in (https://github.com/apache/flink/pull/26320/files#r2022598714), the result JSON hasn't changed. So the only change is that the internal representation changed from String to Object, and the documentation has more details now. @Efrat19 maybe you have something to add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36090) Bug with IngestDB restore operation for priority queue state in backend
[ https://issues.apache.org/jira/browse/FLINK-36090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter resolved FLINK-36090. Assignee: (was: Maxim Vershinin) Resolution: Duplicate Already fixed in FLINK-35580, 122c7ee628ba0a670894aeea7b0bb48f3c06d4f6 > Bug with IngestDB restore operation for priority queue state in backend > --- > > Key: FLINK-36090 > URL: https://issues.apache.org/jira/browse/FLINK-36090 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 2.0.0 >Reporter: Maxim Vershinin >Priority: Major > Labels: pull-request-available > > *Summary:* Incorrect handling of priority queue states in IngestDB during > restoring due to missing {{equals()}} and {{hashCode()}} methods in > {{{}RegisteredPriorityQueueStateBackendMetaInfo{}}}. > *Problem Description:* > During the restoring by IngestDB in my Flink project, an issue was identified > where the priority queue states are not managed correctly in the backend. The > problem stems from the absence of {{equals()}} and {{hashCode()}} methods in > the {{RegisteredPriorityQueueStateBackendMetaInfo}} class. > In particular, within the {{exportColumnFamiliesWithSstDataInKeyGroupsRange}} > method of the {{RocksDBIncrementalRestoreOperation}} class, if the state is a > priority queue, identical states from different subtasks are erroneously > treated as distinct states within the {{exportedColumnFamiliesOut}} map. This > leads to inconsistent behavior and errors during the restoring process. > *Proposed Solution:* > To address this issue, add {{equals()}} and {{hashCode()}} methods to the > {{RegisteredPriorityQueueStateBackendMetaInfo}} class. Implementing these > methods will ensure that priority queue states are accurately recognized and > handled across different subtasks, thereby preventing errors during restoring > by IngestDB. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36683][cdc-connector][mongo] Support metadata 'row_kind' virtual column [flink-cdc]
leonardBang merged PR #3705: URL: https://github.com/apache/flink-cdc/pull/3705 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Bump org.apache.kafka:kafka-clients from 3.4.0 to 3.7.1 [flink-connector-kafka]
prshnt commented on PR #135: URL: https://github.com/apache/flink-connector-kafka/pull/135#issuecomment-2782506369 @dependabot rebase -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made on idle source causing high CPU utilisation and throttling [flink-connector-aws]
gguptp commented on code in PR #195: URL: https://github.com/apache/flink-connector-aws/pull/195#discussion_r2030973955 ## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java: ## @@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader( this.kinesis = kinesisProxy; this.configuration = configuration; this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX); +this.getRecordsIntervalMillis = configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis(); +this.idleSourceGetRecordsIntervalMillis = + configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis(); } @Override -protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { +protected RecordBatch fetchRecords(KinesisShardSplitState splitState) throws IOException { +if (skipUntilScheduledGetRecordTime(splitState)) { +return null; +} + GetRecordsResponse getRecordsResponse = kinesis.getRecords( splitState.getStreamArn(), splitState.getShardId(), splitState.getNextStartingPosition(), this.maxRecordsToGet); + +scheduleNextGetRecord(splitState, getRecordsResponse); + boolean isCompleted = getRecordsResponse.nextShardIterator() == null; return new RecordBatch( getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted); } +private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState splitState) +throws IOException { +if (scheduledGetRecordTimes.containsKey(splitState) +&& scheduledGetRecordTimes.get(splitState) > System.currentTimeMillis()) { +try { +Thread.sleep(1); Review Comment: if you dont add this, flink calls fetch function infinitely causing 100% CPU -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made on idle source causing high CPU utilisation and throttling [flink-connector-aws]
darenwkt commented on code in PR #195: URL: https://github.com/apache/flink-connector-aws/pull/195#discussion_r2030971648 ## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java: ## @@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader( this.kinesis = kinesisProxy; this.configuration = configuration; this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX); +this.getRecordsIntervalMillis = configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis(); +this.idleSourceGetRecordsIntervalMillis = + configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis(); } @Override -protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { +protected RecordBatch fetchRecords(KinesisShardSplitState splitState) throws IOException { +if (skipUntilScheduledGetRecordTime(splitState)) { +return null; +} + GetRecordsResponse getRecordsResponse = kinesis.getRecords( splitState.getStreamArn(), splitState.getShardId(), splitState.getNextStartingPosition(), this.maxRecordsToGet); + +scheduleNextGetRecord(splitState, getRecordsResponse); + boolean isCompleted = getRecordsResponse.nextShardIterator() == null; return new RecordBatch( getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted); } +private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState splitState) +throws IOException { +if (scheduledGetRecordTimes.containsKey(splitState) +&& scheduledGetRecordTimes.get(splitState) > System.currentTimeMillis()) { +try { +Thread.sleep(1); Review Comment: Q: Why do we add a 1ms sleep here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37618][table-planner] Fix PTFs INTERVAL argument [flink]
juntaozhang commented on code in PR #26410: URL: https://github.com/apache/flink/pull/26410#discussion_r2031022714 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java: ## @@ -538,6 +539,11 @@ private CastAvoidanceChecker(LogicalType sourceType) { this.sourceType = sourceType; } +@Override +public Boolean visit(DayTimeIntervalType targetType) { +return true; Review Comment: Hi @davidradl, thanks for your kindly reminding, already fix it, and also fix `INTERVAL_YEAR_MONTH` situation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35466) Cannot pass all columns to SQL UDFs using `*`
[ https://issues.apache.org/jira/browse/FLINK-35466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941590#comment-17941590 ] Lorenzo Affetti commented on FLINK-35466: - I would like to take this one > Cannot pass all columns to SQL UDFs using `*` > - > > Key: FLINK-35466 > URL: https://issues.apache.org/jira/browse/FLINK-35466 > Project: Flink > Issue Type: Bug > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Gyula Fora >Priority: Major > > The SQL API does not allow calling UDFs using all the columns of a given > table using * notation such as: > {noformat} > tableEnv.executeSql("SELECT MyFun(Orders.*) FROM Orders").print();{noformat} > The above call fails with the following error: > {noformat} > org.apache.flink.table.api.ValidationException: SQL validation failed. At > line 1, column 21: Unknown field '*' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) > at > org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) >at com.apple.pie.flink.utils.testing.Test2.test(Test2.java:29) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) >at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) >at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) >at > 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) >at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) >at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) >at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) >at > org.jun
[jira] [Commented] (FLINK-37621) EnableAsyncState doesn't seem to do anything on DataStream API and misleading error message
[ https://issues.apache.org/jira/browse/FLINK-37621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941475#comment-17941475 ] Zakelly Lan commented on FLINK-37621: - Seems we lack support for `process()` with async state in Datastream API. Will add this. And the error message should be more precise for the unsupported scenarios. Thanks for reporting this [~gyfora] > EnableAsyncState doesn't seem to do anything on DataStream API and misleading > error message > --- > > Key: FLINK-37621 > URL: https://issues.apache.org/jira/browse/FLINK-37621 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 2.0.0 >Reporter: Gyula Fora >Priority: Major > > eventStream > .keyBy(e -> e.key) > .enableAsyncState() > .process(new EventHistoryProcessor(params)) > .enableAsyncState(); > Leads to: > Caused by: java.lang.IllegalStateException: Current operator integrates the > async processing logic, thus only supports state v2 APIs. Please use > StateDescriptor under 'org.apache.flink.runtime.state.v2'. > The error is misleading because the v2 apis are used but the check combines > the async enabled check. We need to split the error reporting. > Replacing with: > eventStream > .keyBy(e -> e.key) > .transform( > "Event History", > BasicTypeInfo.LONG_TYPE_INFO, > new AsyncKeyedProcessOperator<>(new EventHistoryProcessor(params))); > Would fix the problem but that doesn't seem right -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-37621) EnableAsyncState doesn't seem to do anything on DataStream API and misleading error message
[ https://issues.apache.org/jira/browse/FLINK-37621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan reassigned FLINK-37621: --- Assignee: Zakelly Lan > EnableAsyncState doesn't seem to do anything on DataStream API and misleading > error message > --- > > Key: FLINK-37621 > URL: https://issues.apache.org/jira/browse/FLINK-37621 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 2.0.0 >Reporter: Gyula Fora >Assignee: Zakelly Lan >Priority: Major > > eventStream > .keyBy(e -> e.key) > .enableAsyncState() > .process(new EventHistoryProcessor(params)) > .enableAsyncState(); > Leads to: > Caused by: java.lang.IllegalStateException: Current operator integrates the > async processing logic, thus only supports state v2 APIs. Please use > StateDescriptor under 'org.apache.flink.runtime.state.v2'. > The error is misleading because the v2 apis are used but the check combines > the async enabled check. We need to split the error reporting. > Replacing with: > eventStream > .keyBy(e -> e.key) > .transform( > "Event History", > BasicTypeInfo.LONG_TYPE_INFO, > new AsyncKeyedProcessOperator<>(new EventHistoryProcessor(params))); > Would fix the problem but that doesn't seem right -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Avoid copy Set for variable implementedRpcGateways [flink]
1996fanrui merged PR #26372: URL: https://github.com/apache/flink/pull/26372 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37624) Support enableAsyncState and switch operator after datastream transformation
Zakelly Lan created FLINK-37624: --- Summary: Support enableAsyncState and switch operator after datastream transformation Key: FLINK-37624 URL: https://issues.apache.org/jira/browse/FLINK-37624 Project: Flink Issue Type: Improvement Affects Versions: 2.0.0 Reporter: Zakelly Lan Fix For: 2.1.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37623) Async state support for `process()` in Datastream API
Zakelly Lan created FLINK-37623: --- Summary: Async state support for `process()` in Datastream API Key: FLINK-37623 URL: https://issues.apache.org/jira/browse/FLINK-37623 Project: Flink Issue Type: Improvement Affects Versions: 2.0.0 Reporter: Zakelly Lan Assignee: Zakelly Lan Fix For: 2.1.0, 2.0.1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Avoid duplicate fetch the size of memory segment [flink]
1996fanrui merged PR #26367: URL: https://github.com/apache/flink/pull/26367 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37407][state] Add savepoint metadata SQL built-in process function [flink]
Zakelly commented on code in PR #26393: URL: https://github.com/apache/flink/pull/26393#discussion_r2030677692 ## flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; +import org.apache.flink.state.api.runtime.SavepointLoader; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +@Internal +@FunctionHint( +output = +@DataTypeHint( +"ROW")) +public class SavepointMetadataTableFunction extends TableFunction { +public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {} + +public void eval(String savepointPath) { +try { +CheckpointMetadata checkpointMetadata = +SavepointLoader.loadSavepointMetadata(savepointPath); + +for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) { +Row row = Row.withNames(); +row.setField("checkpoint-id", checkpointMetadata.getCheckpointId()); +row.setField("operator-name", operatorState.getOperatorName().orElse(null)); +row.setField("operator-uid", operatorState.getOperatorUid().orElse(null)); +row.setField("operator-uid-hash", operatorState.getOperatorID().toHexString()); +row.setField("operator-parallelism", operatorState.getParallelism()); +row.setField("operator-max-parallelism", operatorState.getMaxParallelism()); +row.setField("operator-subtask-state-count", operatorState.getStates().size()); Review Comment: Thus I suggest adding a more detailed description for `operator-subtask-state-count` in the doc. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned. That's my understanding, correct me if I'm wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-37143) Update version of flink-connector-kafka to 4.0-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-37143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arvid Heise resolved FLINK-37143. - Assignee: Yanquan Lv Resolution: Fixed > Update version of flink-connector-kafka to 4.0-SNAPSHOT > --- > > Key: FLINK-37143 > URL: https://issues.apache.org/jira/browse/FLINK-37143 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: kafka-4.0.0 >Reporter: Yanquan Lv >Assignee: Yanquan Lv >Priority: Minor > Fix For: kafka-4.0.0 > > > As [https://lists.apache.org/thread/rl7prqop7wfn2o8j2j9fd96dgr1bjjnx] > discuss, next version after kafka-3.4.0 would be kafka-4.0.0 to support Flink > 2.0-preview and Flink 2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37407][state] Add savepoint metadata SQL built-in process function [flink]
gaborgsomogyi commented on code in PR #26393: URL: https://github.com/apache/flink/pull/26393#discussion_r2030693310 ## flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; +import org.apache.flink.state.api.runtime.SavepointLoader; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +@Internal +@FunctionHint( +output = +@DataTypeHint( +"ROW")) +public class SavepointMetadataTableFunction extends TableFunction { +public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {} + +public void eval(String savepointPath) { +try { +CheckpointMetadata checkpointMetadata = +SavepointLoader.loadSavepointMetadata(savepointPath); + +for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) { +Row row = Row.withNames(); +row.setField("checkpoint-id", checkpointMetadata.getCheckpointId()); +row.setField("operator-name", operatorState.getOperatorName().orElse(null)); +row.setField("operator-uid", operatorState.getOperatorUid().orElse(null)); +row.setField("operator-uid-hash", operatorState.getOperatorID().toHexString()); +row.setField("operator-parallelism", operatorState.getParallelism()); +row.setField("operator-max-parallelism", operatorState.getMaxParallelism()); +row.setField("operator-subtask-state-count", operatorState.getStates().size()); Review Comment: Yeah, this is a hidden gem and worth to add some more info. Your understanding is correct so I would add your sentence as-is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
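For context when reading the review thread above, the following is a minimal usage sketch of the savepoint metadata function being added in FLINK-37407. The SQL function name `savepoint_metadata` and the direct `FROM`-clause invocation are assumptions based on the PR excerpt and may differ in the merged version; the column names are taken from the `row.setField` calls shown above.

```java
// Minimal sketch, assuming the built-in process function is exposed to SQL as
// "savepoint_metadata"; not authoritative documentation for FLINK-37407.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SavepointMetadataSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Expect one row per operator state found in the savepoint/checkpoint metadata.
        tEnv.executeSql(
                        "SELECT `operator-name`, `operator-uid`, `operator-parallelism`, "
                                + "`operator-subtask-state-count` "
                                + "FROM savepoint_metadata('s3://bucket/path/to/savepoint')")
                .print();
    }
}
```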
Re: [PR] [FLINK-37583] Upgrade to Kafka 4.0.0 client. [flink-connector-kafka]
tomncooper commented on PR #161: URL: https://github.com/apache/flink-connector-kafka/pull/161#issuecomment-2782791845 > @tomncooper @MartijnVisser @AHeise I suggest we move to Kafka 4.0..0 client when we do Kafka connector v4 for Flink 2. WDYT? ( assuming we can sort out the tests) > > I think there is a case to say we do not backport Kafka client v4.0.0 support to Kafka connector v3.3 or 3.4, in case there are old Kafka clusters that we would not want to break on the v3 stream. @davidradl As per the [discussion](https://lists.apache.org/thread/l0kzx2wb2qz7ntcv6wkbbfwx688y0o69) on the dev mailing list, we are going to move ahead with a Connector 4.0 release with Flink 2.0 and Kafka 3.9.0. We can then do point release updates. I still think, given that this PR would drop support for older Kafka versions it should be part of a further major version bump (ie 5.0) but we can have that discussion when the time comes to merge this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]
Zakelly commented on code in PR #26412: URL: https://github.com/apache/flink/pull/26412#discussion_r2030928114 ## docs/content.zh/docs/ops/state/disaggregated_state.md: ## @@ -150,6 +150,18 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state checkpoint and fast recovery, since the ForSt will perform file copy between the primary storage location and the checkpoint directory during checkpointing and recovery. + ForSt Local Storage Location + +ForSt stores the state of synchronous operators in the local disk by default, to avoid the high Review Comment: ```suggestion By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When using synchronous state APIs in Datastream and SQL jobs, ForSt will only serve as **local state store**. Since a job may contain multiple ForSt instances with API mixed usage, synchronous local state access along with asynchronous remote state access could help achieve better overall throughput. If you ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix] Update copyright NOTICE year to 2025 [flink]
Zakelly opened a new pull request, #26413: URL: https://github.com/apache/flink/pull/26413 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37626) Flaky test: ForStFlinkFileSystemTest.testSstFileInCache
Gabor Somogyi created FLINK-37626: - Summary: Flaky test: ForStFlinkFileSystemTest.testSstFileInCache Key: FLINK-37626 URL: https://issues.apache.org/jira/browse/FLINK-37626 Project: Flink Issue Type: Bug Affects Versions: 2.0.0 Reporter: Gabor Somogyi https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37626) Flaky test: ForStFlinkFileSystemTest.testSstFileInCache
[ https://issues.apache.org/jira/browse/FLINK-37626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated FLINK-37626: -- Description: https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041 {code:java} Apr 07 09:38:18 09:38:18.871 [ERROR] org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache -- Time elapsed: 0.049 s <<< FAILURE! Apr 07 09:38:18 org.opentest4j.AssertionFailedError: Apr 07 09:38:18 Apr 07 09:38:18 expected: 89 Apr 07 09:38:18 but was: 0 Apr 07 09:38:18 at org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache(ForStFlinkFileSystemTest.java:332) Apr 07 09:38:18 at java.base/java.lang.reflect.Method.invoke(Method.java:568) Apr 07 09:38:18 at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) Apr 07 09:38:18 at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) Apr 07 09:38:18 at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) Apr 07 09:38:18 at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) Apr 07 09:38:18 at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) Apr 07 09:38:18 {code} was:https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041 > Flaky test: ForStFlinkFileSystemTest.testSstFileInCache > --- > > Key: FLINK-37626 > URL: https://issues.apache.org/jira/browse/FLINK-37626 > Project: Flink > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Gabor Somogyi >Priority: Major > > https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=67041 > {code:java} > Apr 07 09:38:18 09:38:18.871 [ERROR] > org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache > -- Time elapsed: 0.049 s <<< FAILURE! > Apr 07 09:38:18 org.opentest4j.AssertionFailedError: > Apr 07 09:38:18 > Apr 07 09:38:18 expected: 89 > Apr 07 09:38:18 but was: 0 > Apr 07 09:38:18 at > org.apache.flink.state.forst.fs.ForStFlinkFileSystemTest.testSstFileInCache(ForStFlinkFileSystemTest.java:332) > Apr 07 09:38:18 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > Apr 07 09:38:18 at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > Apr 07 09:38:18 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]
fredia commented on PR #26412: URL: https://github.com/apache/flink/pull/26412#issuecomment-2783112395 > Should we also use ResourceGuard like [FLINK-37597](https://issues.apache.org/jira/browse/FLINK-37597) when using sync mode to access remote storage ? `ResourceGuard` is used to ensure that the snapshot threads are completed before `ForStSyncKeyedStateBackend` disposing, and `ForStSyncKeyedStateBackend#dispose()` has already achieved this. IIUC, the remote storage accessing in `ForStSyncKeyedStateBackend` is done in the main task thread, maybe extra work is not necessary, correct me if I'm wrong :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36813][cdc-connectors][mysql] support mysql sync part columns [flink-cdc]
JNSimba commented on PR #3767: URL: https://github.com/apache/flink-cdc/pull/3767#issuecomment-2783640913 > @JNSimba Could we provide this feature by `SupportsProjectionPushDown`? `debezium.column.include.list` and `debezium.column.exclude.list` is hard for users to understand and use. We could use `SupportsProjectionPushDown` interface to automatically generate the debezium setting. In this way we don't need to validate whether there is a conflict between the provided schema and the debezium setting. @ruanhang1993 `SupportsProjectionPushDown` is indeed an elegant approach, but some of the cdc logic seems to have problems in the push-down phase, for example, here If you run the `MySqlConnectorITCase.testConsumingAllEvents` case, the chunkkey will be adjusted here. For example, if my project column does not have a chunkkey, an error will be reported https://github.com/apache/flink-cdc/blob/b437a49e67a2ef84c59cde09f2bfc372f45b31fa/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java#L267C1-L274C37 ```java Object[] chunkKey = RecordUtils.getSplitKey( splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target); for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) { if (RecordUtils.splitKeyRangeContains( chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd()) && position.isAfter(splitInfo.getHighWatermark())) { return true; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [docs] Translate "Table ID" Page for Flink CDC Chinese Documentation [flink-cdc]
leonardBang merged PR #3888: URL: https://github.com/apache/flink-cdc/pull/3888 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34554] Introduce transaction strategies [flink-connector-kafka]
fapaul commented on code in PR #154: URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2031443184 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.KafkaWriterState; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Describes the ownership model of transactional ids and with that ownership of the transactions. + * + * A subtask that owns a transactional id is responsible for committing and aborting the + * transactions having that id. Only that subtask may create new ids. + * + * Transactional ids have the form transactionalIdPrefix + "-" + subtaskId + "-" + counter + * . The prefix is given by the user, the subtask id is defined through the ownership model + * and the counter through the {@link + * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}. + * + * For all strategies ownership is extrapolated for subtask ids beyond the currently known + * subtasks. This is necessary to support cases of intermediate upscaling where no checkpoint has + * been taken. Consider an application that runs with 3 subtasks and checkpointed. Later, its + * upscaled to 5 but then a failure happens. We need to have at least 5 open transactions. If the + * application is finally resumed from the checkpoint with 3 subtasks again. These 3 subtasks need + * to assume ownership of the remaining 2. + */ +@Internal +public enum TransactionOwnership { +/** + * The ownership is determined by the current subtask ID. Ownership is extrapolated by + * extracting the original subtask id of the ongoing transactions and applying modulo on the + * current parallelism. 
+ */ +IMPLICIT_BY_SUBTASK_ID { +@Override +public int[] getOwnedSubtaskIds( +int currentSubtaskId, +int currentParallelism, +Collection recoveredStates) { +if (!recoveredStates.isEmpty()) { +checkForMigration(currentSubtaskId, recoveredStates); +} + +return new int[] {currentSubtaskId}; +} + +private void checkForMigration( +int currentSubtaskId, Collection recoveredStates) { +TransactionOwnership oldOwnership = +recoveredStates.stream() +.map(KafkaWriterState::getTransactionOwnership) +.findFirst() +.orElseThrow(); +if (oldOwnership == this) { +return; +} + +Set ownedSubtaskIds = +recoveredStates.stream() +.mapToInt(KafkaWriterState::getOwnedSubtaskId) +.boxed() +.collect(Collectors.toSet()); +if (!ownedSubtaskIds.contains(currentSubtaskId) +&& !ownedSubtaskIds.equals(Set.of(UNKNOWN))) { +int numShares = +recoveredStates.stream() +.mapToInt(KafkaWriterState::getOwnedSubtaskId) +.findFirst() +.orElse(UNKNOWN); +throw new IllegalStateException( +"Attempted to switch back to INCREMENTING from a transaction naming strategy that uses the new writer state. A possible way to safely do it is to scale back to the maximum parallelism of " ++ numShares); +} +} + +@Override +public int getTotalNumberOfOwnedSubtasks( +int currentSubtaskId, +int currentParallelism, +Collection recoveredStates) { +return currentParallelism;
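The ownership model described in the javadoc above is easier to see in a small standalone sketch. This is illustrative only, not connector code: it spells out the `prefix-subtaskId-counter` id format and the modulo extrapolation used by IMPLICIT_BY_SUBTASK_ID when the parallelism changes.

```java
// Illustrative sketch; the real logic lives in TransactionOwnership and the
// TransactionNamingStrategy of the Kafka connector.
public class TransactionalIdSketch {

    // transactionalIdPrefix + "-" + subtaskId + "-" + counter, as per the javadoc.
    static String transactionalId(String prefix, int subtaskId, long counter) {
        return prefix + "-" + subtaskId + "-" + counter;
    }

    // Under IMPLICIT_BY_SUBTASK_ID, a recovered transaction created by an original
    // subtask is owned by (originalSubtaskId % currentParallelism).
    static int extrapolatedOwner(int originalSubtaskId, int currentParallelism) {
        return originalSubtaskId % currentParallelism;
    }

    public static void main(String[] args) {
        System.out.println(transactionalId("my-sink", 2, 7)); // my-sink-2-7
        // After scaling from 5 down to 3 subtasks, transactions of old subtask 4
        // are picked up by subtask 4 % 3 = 1.
        System.out.println(extrapolatedOwner(4, 3)); // 1
    }
}
```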
[jira] [Assigned] (FLINK-36611) Add schema info to output of Kafka sink
[ https://issues.apache.org/jira/browse/FLINK-36611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-36611: -- Assignee: MOBIN > Add schema info to output of Kafka sink > - > > Key: FLINK-36611 > URL: https://issues.apache.org/jira/browse/FLINK-36611 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: Yanquan Lv >Assignee: MOBIN >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.4.0 > > > Currently, the output of Kafka sink in debezium format looks like this: > {code:java} > { > "before": { > "id": 4, > "name": "John", > "address": "New York", > "phone_number": "", > "age": 12 > }, > "after": { > "id": 4, > "name": "John", > "address": "New York", > "phone_number": "1234", > "age": 12 > }, > "op": "u", > "source": { > "db": null, > "table": "customers" > } > } {code} > It contains record data with full before/after and db info, but schema info > wasn't included. > However, In some scenarios, we need this information to determine the type of > data. For example, Paimon's Kafka CDC source requires this type information, > otherwise all types are considered String, refer to > [https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/#supported-formats.] > Considering that this will increase the data load, I suggest adding a > parameter to configure whether to enable it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-36611) Add schema info to output of Kafka sink
[ https://issues.apache.org/jira/browse/FLINK-36611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-36611. Resolution: Implemented Implemented via master: 3457a922b5da331243fb3a4a1bc26a1b58e81b36 > Add schema info to output of Kafka sink > - > > Key: FLINK-36611 > URL: https://issues.apache.org/jira/browse/FLINK-36611 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: Yanquan Lv >Assignee: MOBIN >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.4.0 > > > Currently, the output of Kafka sink in debezium format looks like this: > {code:java} > { > "before": { > "id": 4, > "name": "John", > "address": "New York", > "phone_number": "", > "age": 12 > }, > "after": { > "id": 4, > "name": "John", > "address": "New York", > "phone_number": "1234", > "age": 12 > }, > "op": "u", > "source": { > "db": null, > "table": "customers" > } > } {code} > It contains record data with full before/after and db info, but schema info > wasn't included. > However, In some scenarios, we need this information to determine the type of > data. For example, Paimon's Kafka CDC source requires this type information, > otherwise all types are considered String, refer to > [https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/#supported-formats.] > Considering that this will increase the data load, I suggest adding a > parameter to configure whether to enable it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36611][pipeline-connector][kafka] Add schema info to output of Kafka sink [flink-cdc]
leonardBang merged PR #3791: URL: https://github.com/apache/flink-cdc/pull/3791 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cdc-cli] fix duplicated option in CliFrontendOptions [flink-cdc]
leonardBang merged PR #3848: URL: https://github.com/apache/flink-cdc/pull/3848 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [minor][pipeline-connectors][doris]Fix deprecated method usage in DorisSchemaChangeManager [flink-cdc]
leonardBang merged PR #3959: URL: https://github.com/apache/flink-cdc/pull/3959 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34554] Introduce transaction strategies [flink-connector-kafka]
AHeise commented on code in PR #154: URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2031458928 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.KafkaWriterState; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Describes the ownership model of transactional ids and with that ownership of the transactions. + * + * A subtask that owns a transactional id is responsible for committing and aborting the + * transactions having that id. Only that subtask may create new ids. + * + * Transactional ids have the form transactionalIdPrefix + "-" + subtaskId + "-" + counter + * . The prefix is given by the user, the subtask id is defined through the ownership model + * and the counter through the {@link + * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}. + * + * For all strategies ownership is extrapolated for subtask ids beyond the currently known + * subtasks. This is necessary to support cases of intermediate upscaling where no checkpoint has + * been taken. Consider an application that runs with 3 subtasks and checkpointed. Later, its + * upscaled to 5 but then a failure happens. We need to have at least 5 open transactions. If the + * application is finally resumed from the checkpoint with 3 subtasks again. These 3 subtasks need + * to assume ownership of the remaining 2. + */ +@Internal +public enum TransactionOwnership { +/** + * The ownership is determined by the current subtask ID. Ownership is extrapolated by + * extracting the original subtask id of the ongoing transactions and applying modulo on the + * current parallelism. 
+ */ +IMPLICIT_BY_SUBTASK_ID { +@Override +public int[] getOwnedSubtaskIds( +int currentSubtaskId, +int currentParallelism, +Collection recoveredStates) { +if (!recoveredStates.isEmpty()) { +checkForMigration(currentSubtaskId, recoveredStates); +} + +return new int[] {currentSubtaskId}; +} + +private void checkForMigration( +int currentSubtaskId, Collection recoveredStates) { +TransactionOwnership oldOwnership = +recoveredStates.stream() +.map(KafkaWriterState::getTransactionOwnership) +.findFirst() +.orElseThrow(); +if (oldOwnership == this) { +return; +} + +Set ownedSubtaskIds = +recoveredStates.stream() +.mapToInt(KafkaWriterState::getOwnedSubtaskId) +.boxed() +.collect(Collectors.toSet()); +if (!ownedSubtaskIds.contains(currentSubtaskId) +&& !ownedSubtaskIds.equals(Set.of(UNKNOWN))) { +int numShares = +recoveredStates.stream() +.mapToInt(KafkaWriterState::getOwnedSubtaskId) +.findFirst() +.orElse(UNKNOWN); +throw new IllegalStateException( +"Attempted to switch back to INCREMENTING from a transaction naming strategy that uses the new writer state. A possible way to safely do it is to scale back to the maximum parallelism of " ++ numShares); +} +} + +@Override +public int getTotalNumberOfOwnedSubtasks( +int currentSubtaskId, +int currentParallelism, +Collection recoveredStates) { +return currentParallelism;
Re: [PR] [FLINK-36648] Bump Flink version to Flink 2.0.0 [flink-connector-kafka]
FranMorilloAWS commented on PR #140: URL: https://github.com/apache/flink-connector-kafka/pull/140#issuecomment-2783947758 When is this releasing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031778448 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ModelDescriptor.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.table.catalog.CatalogModel; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Describes a {@link CatalogModel} representing a model. + * + * A {@link ModelDescriptor} is a template for creating a {@link CatalogModel} instance. It + * closely resembles the "CREATE MODEL" SQL DDL statement, containing input schema, output schema, + * and other characteristics. + * + * This can be used to register a Model in the Table API. + */ +@PublicEvolving +public class ModelDescriptor { +private final @Nullable Schema inputSchema; +private final @Nullable Schema outputSchema; +private final Map modelOptions; +private final @Nullable String comment; + +protected ModelDescriptor( +@Nullable Schema inputSchema, +@Nullable Schema outputSchema, +Map modelOptions, +@Nullable String comment) { +this.inputSchema = inputSchema; +this.outputSchema = outputSchema; +this.modelOptions = modelOptions; +this.comment = comment; +} + +/** Converts this descriptor into a {@link CatalogModel}. */ +public CatalogModel toCatalogModel() { +final Schema inputSchema = +getInputSchema() +.orElseThrow( +() -> +new ValidationException( +"Input schema missing in ModelDescriptor. Input schema cannot be null.")); +final Schema outputSchema = +getOutputSchema() +.orElseThrow( +() -> +new ValidationException( +"Output schema missing in ModelDescriptor. Output schema cannot be null.")); +return CatalogModel.of(inputSchema, outputSchema, modelOptions, comment); +} + +/** Converts this immutable instance into a mutable {@link Builder}. */ +public Builder toBuilder() { +return new Builder(this); +} + +/** + * Returns a map of string-based model options. + * + * @return options of the model. + */ +Map getOptions() { +return modelOptions; +} + +/** + * Get the unresolved input schema of the model. + * + * @return unresolved input schema of the model. + */ Review Comment: Do we really need such duplicating javadocs? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made on idle source causing high CPU utilisation and throttling [flink-connector-aws]
leekeiabstraction commented on code in PR #195: URL: https://github.com/apache/flink-connector-aws/pull/195#discussion_r2031062113 ## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java: ## @@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader( this.kinesis = kinesisProxy; this.configuration = configuration; this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX); +this.getRecordsIntervalMillis = configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis(); +this.idleSourceGetRecordsIntervalMillis = + configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis(); } @Override -protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { +protected RecordBatch fetchRecords(KinesisShardSplitState splitState) throws IOException { +if (skipUntilScheduledGetRecordTime(splitState)) { +return null; +} + GetRecordsResponse getRecordsResponse = kinesis.getRecords( splitState.getStreamArn(), splitState.getShardId(), splitState.getNextStartingPosition(), this.maxRecordsToGet); + +scheduleNextGetRecord(splitState, getRecordsResponse); + boolean isCompleted = getRecordsResponse.nextShardIterator() == null; return new RecordBatch( getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted); } +private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState splitState) +throws IOException { +if (scheduledGetRecordTimes.containsKey(splitState) +&& scheduledGetRecordTimes.get(splitState) > System.currentTimeMillis()) { +try { +Thread.sleep(1); Review Comment: To elaborate on Abhi's point further. This is to prevent KinesisShardSplitReaderBase from immediately looping through fetch() for each split in the case where we have idle sources, causing very high CPU usage. Adding a 1ms pause ensures lower CPU usage at the cost of at most 1ms wait time per split. There is an edge case where a source with large number of idle assigned split will not loop through all its assigned split in a timely manner adding latency to GetRecords from a minority of non-idle shards, but this would require upwards of 1000 assigned splits to incur a 1 second delay. Assigning upwards of 1000 active splits to a source subtask is an unlikely and impractical scenario (app should really have much higher parallelism). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
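To summarise the polling behaviour discussed above, the following is a self-contained sketch of the schedule-and-skip pattern. It is not the connector's code; in particular, choosing the idle interval when the previous batch was empty is an assumption inferred from the option names `SHARD_GET_RECORDS_INTERVAL` and `SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL`, since `scheduleNextGetRecord` is not shown in the diff.

```java
// Illustrative sketch of the schedule-and-skip pattern discussed in this review.
import java.util.HashMap;
import java.util.Map;

class GetRecordsScheduleSketch<SPLIT> {
    private final long getRecordsIntervalMillis;
    private final long idleSourceGetRecordsIntervalMillis;
    private final Map<SPLIT, Long> scheduledGetRecordTimes = new HashMap<>();

    GetRecordsScheduleSketch(long intervalMillis, long idleIntervalMillis) {
        this.getRecordsIntervalMillis = intervalMillis;
        this.idleSourceGetRecordsIntervalMillis = idleIntervalMillis;
    }

    // Mirrors skipUntilScheduledGetRecordTime: the split is skipped (no GetRecords
    // call is made) until its scheduled fetch time has passed.
    boolean shouldSkip(SPLIT split) {
        Long scheduled = scheduledGetRecordTimes.get(split);
        return scheduled != null && scheduled > System.currentTimeMillis();
    }

    // Assumption: an empty batch (idle shard) pushes the next fetch further out than
    // a non-empty one, which is what keeps idle sources from busy-polling.
    void scheduleNext(SPLIT split, boolean lastBatchWasEmpty) {
        long interval =
                lastBatchWasEmpty ? idleSourceGetRecordsIntervalMillis : getRecordsIntervalMillis;
        scheduledGetRecordTimes.put(split, System.currentTimeMillis() + interval);
    }
}
```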
Re: [PR] [FLINK-37620][state/forst] ForSt Sync mode support remote storage [flink]
masteryhx commented on PR #26412: URL: https://github.com/apache/flink/pull/26412#issuecomment-2783084846 Thanks for the PR. Just an extra comment: Should we also use ResourceGuard like [FLINK-37597](https://issues.apache.org/jira/browse/FLINK-37597) when using sync mode to access remote storage ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37616) PyFlink incorrectly unpickles Row fields within a Row
[ https://issues.apache.org/jira/browse/FLINK-37616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mika Naylor updated FLINK-37616: Summary: PyFlink incorrectly unpickles Row fields within a Row (was: PyFlink incorrectly unpickles Row fields) > PyFlink incorrectly unpickles Row fields within a Row > - > > Key: FLINK-37616 > URL: https://issues.apache.org/jira/browse/FLINK-37616 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Mika Naylor >Assignee: Mika Naylor >Priority: Minor > Labels: pull-request-available > > If you call {{TableEnvironment.from_elements}} where one of the fields in the > row contains a {{Row}} Type, for example where one of the values you pass in > is: > {code:java} > [ > Row("pyflink1A", "pyflink2A", "pyflink3A"), > Row("pyflink1B", "pyflink2B", "pyflink3B"), > Row("pyflink1C", "pyflink2C", "pyflink3C"), > ],{code} > where the schema for the field is: > {code:java} > DataTypes.ARRAY( > DataTypes.ROW( > [ > DataTypes.FIELD("a", DataTypes.STRING()), > DataTypes.FIELD("b", DataTypes.STRING()), > DataTypes.FIELD("c", DataTypes.STRING()), > ] > ) > ),{code} > When you call {{execute().collect()}} on the table, the array is returned as: > {code:java} > [ > , > , > > ]{code} > Instead of each {{Row}} having 3 values, the collected row only has 1 value, > which is now a list of the actual values in the row. The input and output > rows are no longer equal (as their internal _values collection are no longer > equal, one being a list of strings and the other being a list of a list of > strings). The len() of the source Row is correctly returned as 3, but the > collected row incorrectly reports a len() of 1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] [docs] Fix typo in docker.md [flink]
DanRoscigno commented on PR #25194: URL: https://github.com/apache/flink/pull/25194#issuecomment-2783710846 Please can this be reviewed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37628) Wrong reference counting in ForSt file cache
Zakelly Lan created FLINK-37628: --- Summary: Wrong reference counting in ForSt file cache Key: FLINK-37628 URL: https://issues.apache.org/jira/browse/FLINK-37628 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Zakelly Lan Assignee: Zakelly Lan There is a concurrency issue for reference counting in ForSt file cache, which could lead to a read error in some special scenarios (e.g. extremely frequent cache thrashing) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37407][state] Add savepoint metadata SQL built-in process function [flink]
gaborgsomogyi merged PR #26393: URL: https://github.com/apache/flink/pull/26393 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37627) Restarting from a checkpoint/savepoint which coincides with shard split causes data loss
Keith Lee created FLINK-37627: - Summary: Restarting from a checkpoint/savepoint which coincides with shard split causes data loss Key: FLINK-37627 URL: https://issues.apache.org/jira/browse/FLINK-37627 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Affects Versions: aws-connector-5.0.0 Reporter: Keith Lee Similar to DDB stream connector's issue https://issues.apache.org/jira/browse/FLINK-37416 This is less likely to happen on the Kinesis connector due to the much lower frequency of re-sharding / assigning new splits, but it is technically possible, so we'd like to fix this to avoid data loss. The scenario is as follows: - A checkpoint started - KinesisStreamsSourceEnumerator takes a checkpoint (shard was assigned here) - KinesisStreamsSourceEnumerator sends checkpoint event to reader - Before taking reader checkpoint, a SplitFinishedEvent came up in reader - Reader takes checkpoint - Now, just after checkpoint complete, job restarted This can lead to a shard lineage getting lost because of a shard being in the ASSIGNED state in the enumerator and not being part of any task manager state. See DDB Connector issue's PR for reference fix: https://issues.apache.org/jira/browse/FLINK-37416 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments [flink]
autophagy opened a new pull request, #26414: URL: https://github.com/apache/flink/pull/26414 ## What is the purpose of the change When creating a table using `TableEnvironment.from_elements`, the Table API skips type validation on any Row elements that were created using positional arguments, rather than keyword arguments. For example, take a table with a single column, whose type is an array of Rows. These rows have 2 columns, `a VARCHAR` and `b BOOLEAN`. If we create a table with elements where one of these rows has columns with incorrect datatypes: ```python schema = DataTypes.ROW( [ DataTypes.FIELD( "col", DataTypes.ARRAY( DataTypes.ROW( [ DataTypes.FIELD("a", DataTypes.STRING()), DataTypes.FIELD("b", DataTypes.BOOLEAN()), ] ) ), ), ] ) elements = [( [("pyflink", True), ("pyflink", False), (True, "pyflink")], )] table = self.t_env.from_elements(elements, schema) table_result = list(table.execute().collect()) ``` This results in a type validation error: ``` TypeError: field a in element in array field col: VARCHAR can not accept object True in type ``` In an example where we use Row instead of tuples, but with column arguments: ``` elements = [( [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a=True, b="pyflink")], )] ``` We also get the same type validation error. However, when we use Row with positional arguments: ``` elements = [( [Row("pyflink", True), Row("pyflink", False), Row(True, "pyflink")], )] ``` the type validation is skipped, leading to an unpickling error when collecting: ``` > data = pickle.loads(data) E EOFError: Ran out of input ``` The type validator skips this by stating that [the order in the row could be different to the order of the datatype fields](https://github.com/apache/flink/blob/master/flink-python/pyflink/table/types.py#L2156), but I don't think this is true. Both rows made from tuples and lists are type verified positionally with the positions of the Datatype fields, and in the case of the `Row` class the order the row's internal values are preserved. Similarly, `Row` class equality in cases where both of the rows are created with positional arguments ## Brief change log - *Change the type validation logic used by `TableEnvironment.from_elements` so that `Row`s constructed with positional arguments are not skipped.* ## Verifying this change This change added tests and can be verified as follows: - *Added a test to ensure consistent type validation behaviour with rows constructed from tuples, lists, `Row`s with keyword arguments and `Row`s with positional arguments* - ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments [flink]
flinkbot commented on PR #26414: URL: https://github.com/apache/flink/pull/26414#issuecomment-2783296378 ## CI report: * 92e7cc3e8e13c1c011ae107921c0b43b31cbc1d7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37601] Remove Unirest dependency in PrometheusReporterTest [flink]
hlteoh37 merged PR #26387: URL: https://github.com/apache/flink/pull/26387 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031879610 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ModelDescriptor.java: ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.table.catalog.CatalogModel; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Describes a {@link CatalogModel} representing a model. + * + * A {@link ModelDescriptor} is a template for creating a {@link CatalogModel} instance. It + * closely resembles the "CREATE MODEL" SQL DDL statement, containing input schema, output schema, + * and other characteristics. + * + * This can be used to register a Model in the Table API. + */ +@PublicEvolving +public class ModelDescriptor { +private final @Nullable Schema inputSchema; +private final @Nullable Schema outputSchema; +private final Map modelOptions; +private final @Nullable String comment; + +protected ModelDescriptor( +@Nullable Schema inputSchema, +@Nullable Schema outputSchema, +Map modelOptions, +@Nullable String comment) { +this.inputSchema = inputSchema; +this.outputSchema = outputSchema; +this.modelOptions = modelOptions; +this.comment = comment; +} + +/** Converts this descriptor into a {@link CatalogModel}. */ +public CatalogModel toCatalogModel() { +final Schema inputSchema = +getInputSchema() +.orElseThrow( +() -> +new ValidationException( +"Input schema missing in ModelDescriptor. Input schema cannot be null.")); +final Schema outputSchema = +getOutputSchema() +.orElseThrow( +() -> +new ValidationException( +"Output schema missing in ModelDescriptor. Output schema cannot be null.")); +return CatalogModel.of(inputSchema, outputSchema, modelOptions, comment); +} + +/** Converts this immutable instance into a mutable {@link Builder}. */ +public Builder toBuilder() { +return new Builder(this); +} + +/** + * Returns a map of string-based model options. + * + * @return options of the model. + */ +Map getOptions() { +return modelOptions; +} + +/** + * Get the unresolved input schema of the model. + * + * @return unresolved input schema of the model. + */ +Optional getInputSchema() { +return Optional.ofNullable(inputSchema); +} + +/** + * Get the unresolved output schema of the model. 
+ * + * @return unresolved output schema of the model. + */ +Optional getOutputSchema() { +return Optional.ofNullable(outputSchema); +} + +/** + * Get comment of the model. + * + * @return comment of the model. + */ +Optional getComment() { +return Optional.ofNullable(comment); +} + +/** + * Creates a new {@link Builder} for the model with the given provider option. + * + * @param provider string value of provider for the model. + */ +public static Builder forProvider(String provider) { +Preconditions.checkNotNull(provider, "Model descriptors require a provider value."); +final Builder descriptorBuilder = new Builder(); +descriptorBuilder.option(FactoryUtil.MODEL_PROVIDER, provider); +return descriptorBuilder; +} + +@Override +
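A usage sketch of the descriptor under review may help here. `forProvider` and `option` appear in the Java excerpt above; `inputSchema`, `outputSchema`, `comment` and `build` are assumed builder methods mirroring the PyFlink docstring later in this thread, so the exact names may differ in the merged API.

```java
// Sketch under assumptions: only forProvider(...) and option(...) are visible in the
// excerpt above; the remaining builder methods are inferred from the PyFlink docstring.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ModelDescriptor;
import org.apache.flink.table.api.Schema;

public class ModelDescriptorSketch {
    static ModelDescriptor openAiModel() {
        return ModelDescriptor.forProvider("OPENAI")
                .inputSchema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                .outputSchema(Schema.newBuilder().column("label", DataTypes.STRING()).build())
                .option("task", "regression")
                .comment("demo model")
                .build();
    }
}
```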
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031900150 ## flink-python/pyflink/table/table_environment.py: ## @@ -1071,6 +1128,88 @@ def create_view(self, self._j_tenv.createView(view_path, table, ignore_if_exists) +def create_model(self, + model_path: str, + model_descriptor: ModelDescriptor, + ignore_if_exists: Optional[bool] = False): +""" +Registers the given :class:`~pyflink.table.ModelDescriptor` as a catalog model +similar to SQL models. + +The ModelDescriptor is converted into a CatalogModel and stored in the catalog. + +If the model should not be permanently stored in a catalog, use +:func:`create_temporary_model` instead. + +Examples: +:: + +>>> table_env.create_model("MyModel", ModelDescriptor.for_provider("OPENAI") +... .input_schema(Schema.new_builder() +... .column("f0", DataTypes.STRING()) +... .build()) +... .output_schema(Schema.new_builder() +... .column("label", DataTypes.STRING()) +... .build()) +... .option("task", "regression") +... .option("type", "remote") +... . +... . +... .build(), +... True) + +:param model_path: The path under which the model will be registered. +:param model_descriptor: Template for creating a CatalogModel instance. +:param ignore_if_exists: If a model exists under the given path and this flag is set, + no operation is executed. An exception is thrown otherwise. + +.. versionadded:: 2.0.1 Review Comment: this a wrong branch if we are talking about 2.0.1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031904769 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java: ## @@ -343,12 +343,14 @@ public void testModelModificationListener() throws Exception { assertThat(alterEvent.ignoreIfNotExists()).isFalse(); // Drop a model -catalogManager.dropModel( -ObjectIdentifier.of( -catalogManager.getCurrentCatalog(), -catalogManager.getCurrentDatabase(), -"model1"), -true); +assertThat( +catalogManager.dropModel( +ObjectIdentifier.of( +catalogManager.getCurrentCatalog(), +catalogManager.getCurrentDatabase(), +"model1"), Review Comment: We can extract `ObjectIdentifier` into a variable to avoid formatting issue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-37627) Restarting from a checkpoint/savepoint which coincides with shard split causes data loss
[ https://issues.apache.org/jira/browse/FLINK-37627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941693#comment-17941693 ] Arun Lakshman edited comment on FLINK-37627 at 4/7/25 7:19 PM: --- Can you please assign this issue to me. I can work on this issue was (Author: arunlakshman): Can you please this to me. I can work on this issue > Restarting from a checkpoint/savepoint which coincides with shard split > causes data loss > > > Key: FLINK-37627 > URL: https://issues.apache.org/jira/browse/FLINK-37627 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-5.0.0 >Reporter: Keith Lee >Priority: Major > > Similar to DDB stream connector's issue > https://issues.apache.org/jira/browse/FLINK-37416 > This is less likely to happen on Kinesis connector due to much lower > frequency of re-sharding / assigning new split but technically possible so > we'd like to fix this to avoid data > loss. > The scenario is as follow: > - A checkpoint started > - KinesisStreamsSourceEnumerator takes a checkpoint (shard was assigned here) > - KinesisStreamsSourceEnumerator sends checkpoint event to reader > - Before taking reader checkpoint, a SplitFinishedEvent came up in reader > - Reader takes checkpoint > - Now, just after checkpoint complete, job restarted > This can lead to a shard lineage getting lost because of a shard being in > ASSIGNED state in enumerator and not being part of any task manager state. > See DDB Connector issue's PR for reference fix: > https://issues.apache.org/jira/browse/FLINK-37416 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Avoid copy Set for variable implementedRpcGateways [flink]
beliefer commented on PR #26372: URL: https://github.com/apache/flink/pull/26372#issuecomment-2782469397 @1996fanrui @davidradl Thank you all! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37620) ForSt Sync mode support remote storage and provide configurable options
[ https://issues.apache.org/jira/browse/FLINK-37620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37620: --- Labels: pull-request-available (was: ) > ForSt Sync mode support remote storage and provide configurable options > --- > > Key: FLINK-37620 > URL: https://issues.apache.org/jira/browse/FLINK-37620 > Project: Flink > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Fix For: 2.1.0, 2.0.1 > > > Currently, the ForSt state backend only serves with local state when using > the old state api (the synchronous ones). We should also enable the > disaggregated state capabilities in this scenario. Additionally, given the > different API set could be mixed used in one job across different tasks, we > could provide options for users to enforce local state in sync mode when > enabling disaggregated state (Such as > 'state.backend.forst.sync.enforce-local'). This is useful for complex SQL > jobs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
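If the option lands roughly as sketched in the issue, enabling it programmatically might look like the following. The key 'state.backend.forst.sync.enforce-local' is only proposed above, so treat both the key and the surrounding setup as placeholders.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForStSyncEnforceLocalSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Disaggregated ForSt state backend.
        conf.setString("state.backend.type", "forst");
        // Proposed option from this issue; name and semantics may change before release.
        conf.setString("state.backend.forst.sync.enforce-local", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromData(1, 2, 3).print();
        env.execute("forst-sync-enforce-local-sketch");
    }
}
```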
[jira] [Commented] (FLINK-37585) The data of the newly added table cannot be read
[ https://issues.apache.org/jira/browse/FLINK-37585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941499#comment-17941499 ] Xin Gong commented on FLINK-37585: -- You can see this discussion [https://github.com/apache/flink-cdc/discussions/3338.] > The data of the newly added table cannot be read > > > Key: FLINK-37585 > URL: https://issues.apache.org/jira/browse/FLINK-37585 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.2.1 >Reporter: jeff-zou >Priority: Major > Labels: pull-request-available > > After starting Flink CDC, when a new table is created in MySQL, Flink can > read the table structure of this table, but cannot read the data in it. > > The method to reproduce the bug as follows: > 1.Start Flink CDC for sync Mysql > {code:java} > // code placeholder > MySqlSource mySqlSource = > MySqlSource.builder() > .port(3306) > .hostname("10.11.69.176") > .port(3306) > .databaseList("cdc") > .tableList(".*") > .username("test") > .password("123456") > .serverTimeZone("UTC") > .deserializer(new JsonDebeziumDeserializationSchema()) > .includeSchemaChanges(true) >.serverId("5401-5404") > .scanNewlyAddedTableEnabled(true) > .debeziumProperties(properties) > .fetchSize(1024) > .connectTimeout(Duration.ofSeconds(30)) > .build(); > ... > {code} > 2. Create table in Mysql > {code:java} > // code placeholder > CREATE TABLE `test` ( > `id` varchar(100), > PRIMARY KEY (`id`) > ) {code} > 3. Write data to Mysql > {code:java} > // code placeholder > INSERT INTO test(`id`) VALUES('kk') ; > {code} > > > Flink can read the table's schema, but cannot read the data 'kk' in it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37618][table-planner] Fix PTFs INTERVAL argument [flink]
davidradl commented on code in PR #26410: URL: https://github.com/apache/flink/pull/26410#discussion_r2030762088 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java: ## @@ -538,6 +539,11 @@ private CastAvoidanceChecker(LogicalType sourceType) { this.sourceType = sourceType; } +@Override +public Boolean visit(DayTimeIntervalType targetType) { +return true; Review Comment: I see the char type visit is testing isNullable. Should we do this here also? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
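To make the question above concrete, one hypothetical way the nullability check could be folded in (this is only an interpretation of the reviewer's remark, not the merged change):

```java
@Override
public Boolean visit(DayTimeIntervalType targetType) {
    // Hypothetical variant: only treat the cast as avoidable when the source is
    // already a day-time interval and its nullability is compatible with the target.
    return sourceType instanceof DayTimeIntervalType
            && (targetType.isNullable() || !sourceType.isNullable());
}
```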
[jira] [Commented] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state
[ https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941577#comment-17941577 ] Nickel Fang commented on FLINK-25672: - We had the same issue (an OOM in the job manager) and have addressed it with a TTL in our project. Could I open a PR for this? h1. Solution Requirements # Delete expired processed paths according to a TTL policy to save memory. # Keep it an opt-in enhancement: users can easily choose between the original behaviour and the enhanced one. # Support a seamless upgrade to the enhancement with no data loss for a running streaming job. Prerequisite: there is a TTL mechanism on the file source (e.g. an S3 retention policy). Details: introduce a new variable LinkedHashSet> alreadyProcessedPathAndTimestamps in PendingSplitsCheckpoint, and a Duration retentionTime in ContinuousEnumerationSettings. > FileSource enumerator remembers paths of all already processed files which > can result in large state > > > Key: FLINK-25672 > URL: https://issues.apache.org/jira/browse/FLINK-25672 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > As mentioned in the Filesystem documentation, for Unbounded File Sources, the > {{FileEnumerator}} currently remembers paths of all already processed files, > which is a state that can in some cases grow rather large. > We should look into possibilities to reduce this. We could look into adding a > compressed form of tracking already processed files (for example by keeping > modification timestamps lower boundaries). > When fixed, this should also be reflected in the documentation, as mentioned > in https://github.com/apache/flink/pull/18288#discussion_r785707311 -- This message was sent by Atlassian Jira (v8.20.10#820010)
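A rough, illustrative sketch of the TTL idea described above; the class and method names below are invented for clarity, while the field names `alreadyProcessedPathAndTimestamps` and `retentionTime` mirror the proposal. The real change would live in `PendingSplitsCheckpoint` and `ContinuousEnumerationSettings`.

```java
import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.Map;

/** Illustration only: TTL-based pruning of already-processed file paths. */
public class ProcessedPathsTtlSketch {
    // Path plus the timestamp used for expiry (e.g. file modification time).
    private final LinkedHashSet<Map.Entry<String, Long>> alreadyProcessedPathAndTimestamps =
            new LinkedHashSet<>();
    private final Duration retentionTime;

    public ProcessedPathsTtlSketch(Duration retentionTime) {
        this.retentionTime = retentionTime;
    }

    public void markProcessed(String path, long timestampMillis) {
        alreadyProcessedPathAndTimestamps.add(Map.entry(path, timestampMillis));
    }

    /** Called periodically (e.g. per discovery cycle) to drop expired entries. */
    public void pruneExpired(long nowMillis) {
        long cutoff = nowMillis - retentionTime.toMillis();
        alreadyProcessedPathAndTimestamps.removeIf(entry -> entry.getValue() < cutoff);
    }

    public boolean isAlreadyProcessed(String path) {
        return alreadyProcessedPathAndTimestamps.stream()
                .anyMatch(entry -> entry.getKey().equals(path));
    }
}
```

Note that this only stays correct if the source itself stops listing files older than the retention time (the prerequisite above); otherwise pruned paths would be re-ingested as new files.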
Re: [PR] [hotfix][docs] Fix broken tabs and PTF example argument errors [flink]
twalthr merged PR #26406: URL: https://github.com/apache/flink/pull/26406 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37607) Blocklist timeout check may lost
[ https://issues.apache.org/jira/browse/FLINK-37607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941594#comment-17941594 ] Heart Zhou commented on FLINK-37607: I would like to work on this issue. > Blocklist timeout check may lost > > > Key: FLINK-37607 > URL: https://issues.apache.org/jira/browse/FLINK-37607 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.20.1 >Reporter: Heart Zhou >Priority: Major > > The blocklist timeout check may be scheduled before the rpc server starts > The blocklist timeout check is scheduled by the mainThreadExecutor in the > constructor. > {code:java} > DefaultBlocklistHandler(xxx, > Duration timeoutCheckInterval, > ComponentMainThreadExecutor mainThreadExecutor, > xxx) { > xxx > this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval); > this.mainThreadExecutor = checkNotNull(mainThreadExecutor); > xxx > scheduleTimeoutCheck(); > } {code} > > When the check function is called, the > org.apache.flink.runtime.rpc.RpcEndpoint#start method may not have been > called yet, although it will be called very soon. > Therefore, the check function might be lost. > > {code:java} > public ScheduledFuture schedule(Runnable command, long delay, TimeUnit > unit) { > final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit); > FutureTask ft = new FutureTask<>(command, null); > if (mainScheduledExecutor.isShutdown()) { > log.warn( > "The scheduled executor service is shutdown and ignores the > command {}", > command); > } else { > mainScheduledExecutor.schedule( > () -> gateway.runAsync(ft), delayMillis, > TimeUnit.MILLISECONDS); > } > return new ScheduledFutureAdapter<>(ft, delayMillis, > TimeUnit.MILLISECONDS); > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
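One possible direction (purely illustrative; not necessarily the fix that will be chosen): keep the constructor free of scheduling and let the owning component trigger the first timeout check only after its RPC endpoint has started, for example:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;

/** Illustrative sketch only; names simplified from DefaultBlocklistHandler. */
class BlocklistTimeoutCheckSketch {
    private final Duration timeoutCheckInterval;
    private final ComponentMainThreadExecutor mainThreadExecutor;

    BlocklistTimeoutCheckSketch(
            Duration timeoutCheckInterval, ComponentMainThreadExecutor mainThreadExecutor) {
        this.timeoutCheckInterval = timeoutCheckInterval;
        this.mainThreadExecutor = mainThreadExecutor;
        // No scheduleTimeoutCheck() here any more.
    }

    /** Invoked by the owner after RpcEndpoint#start, so the first runAsync cannot be dropped. */
    void start() {
        scheduleTimeoutCheck();
    }

    private void scheduleTimeoutCheck() {
        mainThreadExecutor.schedule(
                this::checkTimeoutsAndReschedule,
                timeoutCheckInterval.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    private void checkTimeoutsAndReschedule() {
        // ... remove expired entries from the blocklist ...
        scheduleTimeoutCheck();
    }
}
```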
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031795676 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -158,6 +167,121 @@ void testTableFromDescriptor() { assertThat(tEnv.getCatalogManager().listTables()).isEmpty(); } +@Test +void testCreateModelFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertCreateModelFromDescriptor(tEnv, false); +} + +@Test +void testCreateModelIgnoreIfExistsFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertCreateModelFromDescriptor(tEnv, true); +assertThatNoException() +.isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true)); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); +} + +@Test +void testCreateModelWithSameName() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); +} + +@Test +void testDropModel() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); + +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); +final ObjectPath objectPath = new ObjectPath(database, "M"); +CatalogModel catalogModel = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath); +assertThat(catalogModel).isInstanceOf(CatalogModel.class); +assertThat(tEnv.dropModel("M", true)).isTrue(); +assertThatThrownBy( +() -> +tEnv.getCatalog(catalog) +.orElseThrow(AssertionError::new) +.getModel(objectPath)) +.isInstanceOf(ModelNotExistException.class) +.hasMessage("Model '`default_catalog`.`default_database`.`M`' does not exist."); +} + +@Test +void testNonExistingDropModel() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertThat(tEnv.dropModel("M", true)).isFalse(); + +assertThatThrownBy(() -> tEnv.dropModel("M", false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Model with identifier 'default_catalog.default_database.M' does not exist."); +} + +@Test +void testCreateTemporaryModelFromDescriptor() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); +assertTemporaryCreateModelFromDescriptor(tEnv, false); +} + +@Test +void testCreateTemporaryModelIfNotExistsFromDescriptor() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertTemporaryCreateModelFromDescriptor(tEnv, true); +assertThatNoException() +.isThrownBy(() -> tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, true)); + +assertThatThrownBy(() -> tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, false)) 
+.isInstanceOf(ValidationException.class) +.hasMessage( +"Temporary model '`default_catalog`.`default_database`.`M`' already exists"); +} + +@Test +void testCreateModelWithSameNameIgnoreIfExists() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); +} + +@Test +void testListModels() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); Review Comment: Am I right that we have this line
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031813534 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -158,6 +167,121 @@ void testTableFromDescriptor() { assertThat(tEnv.getCatalogManager().listTables()).isEmpty(); } +@Test +void testCreateModelFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertCreateModelFromDescriptor(tEnv, false); +} + +@Test +void testCreateModelIgnoreIfExistsFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertCreateModelFromDescriptor(tEnv, true); +assertThatNoException() +.isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true)); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); +} + +@Test +void testCreateModelWithSameName() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); +} + +@Test +void testDropModel() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); + +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); +final ObjectPath objectPath = new ObjectPath(database, "M"); +CatalogModel catalogModel = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath); +assertThat(catalogModel).isInstanceOf(CatalogModel.class); +assertThat(tEnv.dropModel("M", true)).isTrue(); +assertThatThrownBy( +() -> +tEnv.getCatalog(catalog) +.orElseThrow(AssertionError::new) +.getModel(objectPath)) +.isInstanceOf(ModelNotExistException.class) +.hasMessage("Model '`default_catalog`.`default_database`.`M`' does not exist."); +} + +@Test +void testNonExistingDropModel() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertThat(tEnv.dropModel("M", true)).isFalse(); + +assertThatThrownBy(() -> tEnv.dropModel("M", false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Model with identifier 'default_catalog.default_database.M' does not exist."); +} + +@Test +void testCreateTemporaryModelFromDescriptor() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); +assertTemporaryCreateModelFromDescriptor(tEnv, false); +} + +@Test +void testCreateTemporaryModelIfNotExistsFromDescriptor() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertTemporaryCreateModelFromDescriptor(tEnv, true); +assertThatNoException() +.isThrownBy(() -> tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, true)); + +assertThatThrownBy(() -> tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, false)) 
+.isInstanceOf(ValidationException.class) +.hasMessage( +"Temporary model '`default_catalog`.`default_database`.`M`' already exists"); +} + +@Test +void testCreateModelWithSameNameIgnoreIfExists() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); +} + +@Test +void testListModels() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M1", TEST_MODEL_DESCRIPTO
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031803406 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -212,4 +336,39 @@ private static void assertTemporaryCreateTableFromDescriptor( assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); } + +private static void assertCreateModelFromDescriptor( +TableEnvironmentMock tEnv, boolean ignoreIfExists) throws ModelNotExistException { +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +if (ignoreIfExists) { +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); +} else { +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); +} + +final ObjectPath objectPath = new ObjectPath(database, "M"); +CatalogModel catalogModel = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath); +assertThat(catalogModel).isInstanceOf(CatalogModel.class); +assertThat(catalogModel.getInputSchema()).isEqualTo(TEST_SCHEMA); +assertThat(catalogModel.getOptions().get("a")).isEqualTo("Test"); +} + +private static void assertTemporaryCreateModelFromDescriptor( +TableEnvironmentMock tEnv, boolean ignoreIfExists) { +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, ignoreIfExists); +final Optional lookupResult = +tEnv.getCatalogManager().getModel(ObjectIdentifier.of(catalog, database, "M")); +assertThat(lookupResult.isPresent()).isTrue(); +CatalogModel catalogModel = lookupResult.get().getResolvedModel(); +assertThat(catalogModel != null).isTrue(); +assertThat(catalogModel.getInputSchema()).isEqualTo(TEST_SCHEMA); +assertThat(catalogModel.getOutputSchema()).isEqualTo(TEST_SCHEMA); Review Comment: can we have test with different schemas to be sure it behaves properly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031821725 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -212,4 +336,39 @@ private static void assertTemporaryCreateTableFromDescriptor( assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); } + +private static void assertCreateModelFromDescriptor( +TableEnvironmentMock tEnv, boolean ignoreIfExists) throws ModelNotExistException { +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +if (ignoreIfExists) { +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); Review Comment: if we extract this kind of assert into a separate method I would expect it working with different model names, not only `M` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031838881 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -564,6 +565,36 @@ public boolean createView(String path, Table view, boolean ignoreIfExists) { return catalogManager.createTable(tableTable, viewIdentifier, ignoreIfExists); } +@Override +public void createModel(String path, ModelDescriptor descriptor, boolean ignoreIfExists) { +Preconditions.checkNotNull(path, "Path must not be null."); +Preconditions.checkNotNull(descriptor, "Model descriptor must not be null."); +final ObjectIdentifier objectIdentifier = + catalogManager.qualifyIdentifier(getParser().parseIdentifier(path)); +catalogManager.createModel(descriptor.toCatalogModel(), objectIdentifier, ignoreIfExists); +} + +@Override +public void createModel(String path, ModelDescriptor descriptor) { +createModel(path, descriptor, false); +} + +@Override +public void createTemporaryModel(String path, ModelDescriptor descriptor) { +createTemporaryModel(path, descriptor, false); +} + +@Override +public void createTemporaryModel( +String path, ModelDescriptor descriptor, boolean ignoreIfExists) { +Preconditions.checkNotNull(path, "Path must not be null."); +Preconditions.checkNotNull(descriptor, "Model descriptor must not be null."); +final ObjectIdentifier objectIdentifier = + catalogManager.qualifyIdentifier(getParser().parseIdentifier(path)); Review Comment: nit: could be extracted into a separate private method with replacement all occurrences -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
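For reference, the nit could be addressed with a small private helper along these lines (a fragment for `TableEnvironmentImpl`, not standalone code; the helper name is illustrative):

```java
// Sketch of the suggested extraction; used by createModel/createTemporaryModel alike.
private ObjectIdentifier qualifyModelPath(String path) {
    return catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));
}
```

Each call site would then reduce to `final ObjectIdentifier objectIdentifier = qualifyModelPath(path);`.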
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031786003 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -158,6 +167,121 @@ void testTableFromDescriptor() { assertThat(tEnv.getCatalogManager().listTables()).isEmpty(); } +@Test +void testCreateModelFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertCreateModelFromDescriptor(tEnv, false); +} + +@Test +void testCreateModelIgnoreIfExistsFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +assertCreateModelFromDescriptor(tEnv, true); +assertThatNoException() +.isThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true)); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); +} + +@Test +void testCreateModelWithSameName() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); + +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); + +assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR)) +.isInstanceOf(ValidationException.class) +.hasMessage( +"Could not execute CreateModel in path `default_catalog`.`default_database`.`M`"); +} Review Comment: couldn't it be simplified with parameterized test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
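For what it's worth, a parameterized variant could look roughly like this (a JUnit 5 sketch that assumes the existing `TableEnvironmentMock`, `TEST_MODEL_DESCRIPTOR`, and static imports from this test class, plus `org.junit.jupiter.params.ParameterizedTest` and `org.junit.jupiter.params.provider.ValueSource`):

```java
@ParameterizedTest(name = "ignoreIfExists = {0}")
@ValueSource(booleans = {true, false})
void testCreateModelWithSameName(boolean ignoreIfExists) {
    final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
    tEnv.createModel("M", TEST_MODEL_DESCRIPTOR);

    if (ignoreIfExists) {
        // A second registration with ignoreIfExists = true is a silent no-op.
        tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true);
    } else {
        assertThatThrownBy(() -> tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, false))
                .isInstanceOf(ValidationException.class)
                .hasMessage(
                        "Could not execute CreateModel in path "
                                + "`default_catalog`.`default_database`.`M`");
    }
}
```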
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031798763 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -212,4 +336,39 @@ private static void assertTemporaryCreateTableFromDescriptor( assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); } + +private static void assertCreateModelFromDescriptor( +TableEnvironmentMock tEnv, boolean ignoreIfExists) throws ModelNotExistException { +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +if (ignoreIfExists) { +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR, true); +} else { +tEnv.createModel("M", TEST_MODEL_DESCRIPTOR); +} + +final ObjectPath objectPath = new ObjectPath(database, "M"); +CatalogModel catalogModel = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getModel(objectPath); +assertThat(catalogModel).isInstanceOf(CatalogModel.class); +assertThat(catalogModel.getInputSchema()).isEqualTo(TEST_SCHEMA); +assertThat(catalogModel.getOptions().get("a")).isEqualTo("Test"); +} + +private static void assertTemporaryCreateModelFromDescriptor( +TableEnvironmentMock tEnv, boolean ignoreIfExists) { +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +tEnv.createTemporaryModel("M", TEST_MODEL_DESCRIPTOR, ignoreIfExists); +final Optional lookupResult = +tEnv.getCatalogManager().getModel(ObjectIdentifier.of(catalog, database, "M")); +assertThat(lookupResult.isPresent()).isTrue(); +CatalogModel catalogModel = lookupResult.get().getResolvedModel(); +assertThat(catalogModel != null).isTrue(); Review Comment: why not ```suggestion assertThat(catalogModel).isNotNull(); ``` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37548] Add Model DDL methods in TABLE API [flink]
snuyanzin commented on code in PR #26385: URL: https://github.com/apache/flink/pull/26385#discussion_r2031827083 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ## @@ -1647,6 +1675,8 @@ private void execute( command.execute(catalog.get(), objectIdentifier.toObjectPath()); } catch (TableAlreadyExistException | TableNotExistException +| ModelNotExistException +| ModelAlreadyExistException Review Comment: with current situation I would vote for having naming similar to what we have for tables >I am curious why we do not have view exceptions Views are considered as tables -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org