[jira] [Created] (FLINK-36818) Securing credentials in SQL connectors
Kunal Rohitas created FLINK-36818: - Summary: Securing credentials in SQL connectors Key: FLINK-36818 URL: https://issues.apache.org/jira/browse/FLINK-36818 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch, Connectors / MongoDB Reporter: Kunal Rohitas When accessing databases in Flink SQL, we are required to pass sensitive credentials directly within the SQL query as part of the connection configuration, and sometimes we don't want them to be exposed. Is there already a solution for this problem? Could we implement a secret-reader mechanism that allows Flink to read credentials from a mounted file or an environment-based secrets manager? This way, credentials could be injected at runtime without being hardcoded into the SQL queries. -- This message was sent by Atlassian Jira (v8.20.10#820010)
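For illustration, a minimal sketch of what such a secret-reader mechanism could look like. Everything here is hypothetical: the `SecretReader` interface and both implementations are illustrations of the idea in the ticket, not existing Flink APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical SPI for resolving secrets at runtime instead of hardcoding them in SQL. */
interface SecretReader {
    String readSecret(String key) throws IOException;
}

/** Reads secrets from environment variables, e.g. injected by Kubernetes. */
class EnvSecretReader implements SecretReader {
    @Override
    public String readSecret(String key) {
        String value = System.getenv(key);
        if (value == null) {
            throw new IllegalArgumentException("No environment variable named " + key);
        }
        return value;
    }
}

/** Reads secrets from files under a mounted directory, e.g. a Kubernetes secret volume. */
class MountedFileSecretReader implements SecretReader {
    private final Path baseDir;

    MountedFileSecretReader(Path baseDir) {
        this.baseDir = baseDir;
    }

    @Override
    public String readSecret(String key) throws IOException {
        // One file per secret; trim the trailing newline that volume mounts often add.
        return Files.readString(baseDir.resolve(key)).trim();
    }
}
```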
[jira] [Created] (FLINK-36819) [state/forst] Support link() in ForStFlinkFileSystem
Yanfei Lei created FLINK-36819: -- Summary: [state/forst] Support link() in ForStFlinkFileSystem Key: FLINK-36819 URL: https://issues.apache.org/jira/browse/FLINK-36819 Project: Flink Issue Type: Sub-task Reporter: Yanfei Lei -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33265] Support source parallelism setting for Kafka connector [flink-connector-kafka]
RocMarshal commented on code in PR #134: URL: https://github.com/apache/flink-connector-kafka/pull/134#discussion_r1863022748

## docs/content.zh/docs/connectors/table/upsert-kafka.md:

@@ -180,6 +180,14 @@ of all available metadata fields.
+
+      scan.parallelism
+      optional
+      no
+      (none)
+      Integer
+      定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并发。

Review Comment:
```suggestion
      定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并行度。
```

## flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:

@@ -212,14 +212,56 @@ public void testTableSource() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

         ScanTableSource.ScanRuntimeProvider provider =
                 actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertKafkaSource(provider);
     }

+    @Test
+    public void testTableSourceWithParallelism() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(), options -> options.put("scan.parallelism", "100"));
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions);
+        final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        KAFKA_SOURCE_PROPERTIES,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0,
+                        100);
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider;
+        assertThat(sourceProvider.getParallelism().isPresent()).isTrue();
+        assertThat(sourceProvider.getParallelism().get()).isEqualTo(100);

Review Comment:
```suggestion
        assertThat(sourceProvider.getParallelism());
        assertThat(sourceProvider.getParallelism()).hasValue(100);
```

## flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java:

@@ -177,6 +180,37 @@ public void testTableSource() {
         assertKafkaSource(provider);
     }

+    @Test
+    public void testTableSourceWithParallelism() {
+        final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+        // Construct table source using options and table source factory
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSourceOptions(), options -> options.put("scan.parallelism", "100"));

Review Comment:
  ditto

## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:

@@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);

+        Integer parallelism = tableOptions.get(SCAN_PARALLELISM);

Review Comment:
  A trivial comment, I'm not sure if it's appropriate: what about inlining this line into line 174?
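For context on the reviewer's suggestion above: AssertJ's `Optional` assertions offer `hasValue`, which checks presence and value in one call and produces a clearer failure message than unwrapping the `Optional` manually. A small self-contained illustration; the `getParallelism()` method here is a stand-in for the provider call, not the connector's actual code.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Optional;

public class OptionalAssertionExample {
    // Stand-in for DataStreamScanProvider#getParallelism().
    static Optional<Integer> getParallelism() {
        return Optional.of(100);
    }

    public static void main(String[] args) {
        // Instead of isPresent()/get(), assert presence and value in one step.
        assertThat(getParallelism()).hasValue(100);
    }
}
```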
Re: [PR] [FLINK-33265] Support source parallelism setting for Kafka connector [flink-connector-kafka]
RocMarshal commented on code in PR #134: URL: https://github.com/apache/flink-connector-kafka/pull/134#discussion_r1863022594

## docs/content.zh/docs/connectors/table/kafka.md:

@@ -342,6 +342,14 @@ CREATE TABLE KafkaTable (
       Duration
       Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能
+
+      scan.parallelism
+      optional
+      no
+      (none)
+      Integer
+      定义 Kafka source 算子的并行度。默认情况下会使用全局默认并发。

Review Comment:
```suggestion
      定义 Kafka source 算子的并行度。默认情况下会使用全局默认并行度。
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
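For reference, a sketch of how the option being documented here would be used. The `scan.parallelism` key comes from this PR; the table definition itself is a made-up example, not taken from the docs under review.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'scan.parallelism' fixes the source operator's parallelism;
        // if unset, the global default parallelism is used.
        tEnv.executeSql(
                "CREATE TABLE Orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.parallelism' = '4'\n"
                        + ")");
    }
}
```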
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1861756363 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -745,6 +745,35 @@ void createTemporarySystemFunction( */ void createTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * If the table should not be permanently stored in a catalog, use {@link + * #createTemporaryTable(String, TableDescriptor)} instead. + * + * Examples: + * + * {@code + * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); + * } + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. + * @param ignoreIfExists If a table exists under the given path and this flag is set, no + * operation is executed. An exception is thrown otherwise. + */ +void createTable(String path, TableDescriptor descriptor, boolean ignoreIfExists); Review Comment: Yes, it does. I'm getting my python environment set up right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
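A usage sketch of the overload under review, assuming it lands as shown in the Javadoc of the diff above; the DataGen schema and options are illustrative.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableIfNotExistsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        TableDescriptor descriptor =
                TableDescriptor.forConnector("datagen")
                        .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                        .option("rows-per-second", "10")
                        .build();

        // First call registers the table in the current catalog.
        tEnv.createTable("MyTable", descriptor, true);
        // Second call is a no-op because ignoreIfExists = true;
        // with false it would throw an exception instead.
        tEnv.createTable("MyTable", descriptor, true);
    }
}
```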
Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]
SML0127 commented on code in PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861728840

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql:

Review Comment:
  it was my mistake...

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr opened a new pull request, #25707: URL: https://github.com/apache/flink/pull/25707

## What is the purpose of the change

Adds the initial interfaces of `ProcessTableFunction` and `StateHint` mentioned in FLIP-440 to the code base. This is a requirement to continue with the `TypeInferenceExtractor` work.

## Brief change log

- Introduce `ProcessTableFunction` and `StateHint`
- Add `FunctionHint.state()`
- Add `TableSemantics` for powering the context of PTFs

## Verifying this change

Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36705) Add ProcessTableFunction class and annotations
[ https://issues.apache.org/jira/browse/FLINK-36705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36705: --- Labels: pull-request-available (was: ) > Add ProcessTableFunction class and annotations > -- > > Key: FLINK-36705 > URL: https://issues.apache.org/jira/browse/FLINK-36705 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add the ProcessTableFunction class and update annotations. This makes a first > (non-functional) implementation of PTFs possible. Time access is excluded in > the first version. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36801) Update CI image to Ubuntu 22.04
[ https://issues.apache.org/jira/browse/FLINK-36801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-36801. Resolution: Duplicate > Update CI image to Ubuntu 22.04 > --- > > Key: FLINK-36801 > URL: https://issues.apache.org/jira/browse/FLINK-36801 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Reporter: Mehdi >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34194) Upgrade Flink CI Docker container to Ubuntu 22.04
[ https://issues.apache.org/jira/browse/FLINK-34194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-34194: Assignee: Chesnay Schepler (was: Matthias Pohl) > Upgrade Flink CI Docker container to Ubuntu 22.04 > - > > Key: FLINK-34194 > URL: https://issues.apache.org/jira/browse/FLINK-34194 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Matthias Pohl >Assignee: Chesnay Schepler >Priority: Major > Labels: github-actions, pull-request-available > Fix For: 2.0.0 > > > The current CI Docker image is based on Ubuntu 16.04. We already use 20.04 > for the e2e tests. We can update the Docker image to a newer version to be on > par with what we need in GitHub Actions (FLINK-33923). > This issue can cover the following topics: > * Update to 22.04 > ** OpenSSL 1.0.0 dependency should be added for netty-tcnative support > ** Use Python3 instead of Python 2.7 (python symlink needs to be added due > to FLINK-34195) > * Removal of Maven (FLINK-33501 makes us rely on the Maven wrapper) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36817) Give users ability to provide their own KafkaConsumer when using flink-connector-kafka
Levani Kokhreidze created FLINK-36817: - Summary: Give users ability to provide their own KafkaConsumer when using flink-connector-kafka Key: FLINK-36817 URL: https://issues.apache.org/jira/browse/FLINK-36817 Project: Flink Issue Type: New Feature Components: Connectors / Kafka Reporter: Levani Kokhreidze

In certain scenarios, users of the `KafkaSource` in flink-connector-kafka might want to provide their own KafkaConsumer. Right now this is not possible, as the consumer is created in the [KafkaPartitionSplitReader|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L97], which makes customisation impossible. The proposal is to let users pass a `KafkaConsumerFactory` when building the KafkaSource.

{code:java}
public interface KafkaConsumerFactory {
    KafkaConsumer get(Properties properties);
}
{code}

The builder will have a default implementation which creates the KafkaConsumer the same way as it is done now.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
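A sketch of how the proposed factory could be wired up. The factory shape follows the snippet in the ticket; the `<byte[], byte[]>` generic parameters and the default-factory idiom are assumptions on my part, since the KafkaSource internally consumes raw bytes.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

/** Proposed factory, with assumed byte[] key/value types. */
interface KafkaConsumerFactory {
    KafkaConsumer<byte[], byte[]> get(Properties properties);
}

/** A possible default that mirrors today's behavior of constructing the consumer directly. */
class DefaultKafkaConsumerFactory implements KafkaConsumerFactory {
    @Override
    public KafkaConsumer<byte[], byte[]> get(Properties properties) {
        return new KafkaConsumer<>(properties);
    }
}
```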
Re: [PR] FLINK-36817: Introduce KafkaConsumerFactory [flink-connector-kafka]
boring-cyborg[bot] commented on PR #137: URL: https://github.com/apache/flink-connector-kafka/pull/137#issuecomment-2505689926 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36817) Give users ability to provide their own KafkaConsumer when using flink-connector-kafka
[ https://issues.apache.org/jira/browse/FLINK-36817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36817: --- Labels: pull-request-available (was: ) > Give users ability to provide their own KafkaConsumer when using > flink-connector-kafka > -- > > Key: FLINK-36817 > URL: https://issues.apache.org/jira/browse/FLINK-36817 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Reporter: Levani Kokhreidze >Priority: Major > Labels: pull-request-available > > In certain scenarios users of the `KafkaSource` in the flink-connector-kafka > might want to provide their own KafkaConsumer. Right now this is not possible > as consumer is created in the > [KafkaPartitionSplitReader|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L97] > which makes customisation impossible. > Proposal is to let users pass `KafkaConsumerFactory` when building the > KafkaSource. > {code:java} > public interface KafkaConsumerFactory { > KafkaConsumer get(Properties properties); > }{code} > Builder will have a default implementation which creates the KafkaConsumer > the same as it is done now. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36817] Introduce KafkaConsumerFactory [flink-connector-kafka]
lkokhreidze commented on PR #137: URL: https://github.com/apache/flink-connector-kafka/pull/137#issuecomment-2505694480

Will add the Javadocs, but first I wanted to get initial feedback on whether this makes sense.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36817] Introduce KafkaConsumerFactory [flink-connector-kafka]
lkokhreidze commented on code in PR #137: URL: https://github.com/apache/flink-connector-kafka/pull/137#discussion_r1861855542

## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaConsumerFactory.java:

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Properties;
+
+public interface KafkaConsumerFactory {
+    KafkaConsumer get(Properties properties);

Review Comment:
  I was debating with myself whether it would be better to return the `Consumer` interface instead of the implementation. Happy to change it if the community agrees that returning an interface is the better choice.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
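On the question of returning the `Consumer` interface instead of the concrete class: one practical argument is testability, since kafka-clients ships `MockConsumer`, which implements `Consumer` but is not a `KafkaConsumer`. A hedged sketch of that variant; the factory name mirrors the PR, while the generics and helper methods are assumptions.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

interface KafkaConsumerFactory {
    // Returning the interface keeps callers decoupled from the concrete client class.
    Consumer<byte[], byte[]> get(Properties properties);
}

class ExampleFactories {
    static KafkaConsumerFactory production() {
        return properties -> new KafkaConsumer<>(properties);
    }

    static KafkaConsumerFactory forTests() {
        // MockConsumer would not be usable if the factory returned KafkaConsumer.
        return properties -> new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    }
}
```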
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
snuyanzin commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1861852208 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java: ## @@ -175,13 +178,40 @@ ArgumentHint[] arguments() default {}; /** - * Explicitly defines the intermediate result type that a function uses as accumulator. + * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating + * function uses as its accumulator. The entry is managed by the framework (usually via Flink's + * managed state). * * By default, an explicit accumulator type is undefined and the reflection-based extraction * is used. + * + * This parameter is primarily intended for aggregating functions (i.e. {@link + * AggregateFunction} and {@link TableAggregateFunction}). It is recommended to use {@link + * #state()} for {@link ProcessTableFunction}. */ DataTypeHint accumulator() default @DataTypeHint(); +/** + * Explicitly lists the intermediate results (i.e. state entries) of a function that is managed + * by the framework (i.e. Flink managed state). Including their names and data types. + * + * State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports + * multiple state entries at the beginning of an eval()/onTimer() method (after an optional + * context parameter). + * + * Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction}) + * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the + * accumulator). + * + * By default, explicit state is undefined and the reflection-based extraction is used where + * {@link StateHint} is present. + * + * Using both {@link #accumulator()} and this parameter is not allowed. Specifying the list + * of state entries manually disables the entire reflection-based extraction around {@link + * StateHint} and accumulators for aggregating functions. Review Comment: Should it also be mentioned in doc? Somewhere here e.g. https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/docs/content/docs/dev/table/functions/udfs.md?plain=1#L439 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
snuyanzin commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1861853852 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.functions.TableAggregateFunction; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A hint that declares an intermediate result (i.e. state entry) that is managed by the framework + * (i.e. Flink managed state). + * + * State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports multiple + * state entries at the beginning of an eval()/onTimer() method (after an optional context + * parameter). + * + * Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction}) + * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the + * accumulator). + * + * For example, {@code @StateHint(name = "count", type = @DataTypeHint("BIGINT"))} is a state + * entry with the data type BIGINT named "count". + * + * Note: Usually, a state entry is partitioned by a key and can not be accessed globally. The + * partitioning (or whether it is only a single partition) is defined by the corresponding function + * call. + * + * @see FunctionHint + */ +@PublicEvolving +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER}) +public @interface StateHint { Review Comment: Also a question: whether it should be covered in doc as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
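To make the annotation concrete, a hedged sketch of a PTF declaring one state entry via `@FunctionHint(state = ...)`, based only on the Javadoc quoted in the reviews above. The class name, the eval signature, and the exact shape of the state parameter are assumptions, since the final API is still under review in this PR.

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;

// Hypothetical PTF with a single framework-managed state entry named "count".
@FunctionHint(state = {@StateHint(name = "count", type = @DataTypeHint("BIGINT"))})
public class CountingFunction extends ProcessTableFunction<String> {

    // Per the Javadoc, state entries come at the beginning of eval(),
    // after an optional context parameter; mutation semantics are framework-defined.
    public void eval(Long count, String input) {
        // Business logic omitted; this only illustrates the declaration shape.
    }
}
```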
Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]
ferenc-csaky merged PR #25702: URL: https://github.com/apache/flink/pull/25702 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
snuyanzin commented on code in PR #25708: URL: https://github.com/apache/flink/pull/25708#discussion_r1861953820

## azure-pipelines.yml:

@@ -73,16 +73,16 @@ stages:
       parameters: # see template file for a definition of the parameters.
         stage_name: ci_build
         test_pool_definition:
-          vmImage: 'ubuntu-20.04'
+          vmImage: 'ubuntu-22.04'
         e2e_pool_definition:
-          vmImage: 'ubuntu-20.04'
+          vmImage: 'ubuntu-22.04'

Review Comment:
  shouldn't it also be updated for nightly, python wheels and in other places in this file?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36643][filesystems] Backport: Update aws-java-sdk-core to 1.12.779 to fix CVE [flink]
tomncooper opened a new pull request, #25709: URL: https://github.com/apache/flink/pull/25709

## What is the purpose of the change

This is a backport of #25600. The current version of `aws-java-sdk-core`, used in the `flink-s3-fs-base` module, has a high severity vulnerability ([CVE-2024-21634](https://nvd.nist.gov/vuln/detail/CVE-2024-21634)). To address this we need to update to version 1.12.773 or higher; 1.12.779 is the current latest version.

## Brief change log

Update the `aws-java-sdk-core` version used in `flink-s3-fs-base` to 1.12.779.

## Verifying this change

This change is already covered by existing tests in the `flink-s3-fs-base` module.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: yes

## Documentation

- Does this pull request introduce a new feature? no

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
davidradl commented on code in PR #25708: URL: https://github.com/apache/flink/pull/25708#discussion_r1862049614

## azure-pipelines.yml:

@@ -73,16 +73,16 @@ stages:
       parameters: # see template file for a definition of the parameters.
         stage_name: ci_build
         test_pool_definition:
-          vmImage: 'ubuntu-20.04'
+          vmImage: 'ubuntu-22.04'

Review Comment:
  I wonder why we are not going to 24.04, since it is the latest LTS release.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]
naferx commented on PR #24829: URL: https://github.com/apache/flink/pull/24829#issuecomment-2505902456 @snuyanzin just rebased this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36643][filesystems] Backport: Update aws-java-sdk-core to 1.12.779 [flink]
flinkbot commented on PR #25709: URL: https://github.com/apache/flink/pull/25709#issuecomment-2505933359 ## CI report: * f04105fca97ddc88b5e4359b428ec5a8455bdfdf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
davidradl commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862058735

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:

@@ -175,13 +178,40 @@ ArgumentHint[] arguments() default {};

     /**
-     * Explicitly defines the intermediate result type that a function uses as accumulator.
+     * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating
+     * function uses as its accumulator. The entry is managed by the framework (usually via Flink's
+     * managed state).

Review Comment:
  For clarity, it would be worth saying how this is managed by the framework if not using managed state.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
davidradl commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862076676

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:

@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for a user-defined process table function. A process table function (PTF) maps zero,
+ * one, or multiple tables to zero, one, or multiple rows (or structured types). Scalar arguments
+ * are also supported. If the output record consists of only one field, the wrapper can be omitted,
+ * and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.
+ *
+ * PTFs are the most powerful function kind for Flink SQL and Table API. They enable implementing
+ * user-defined operators that can be as feature-rich as built-in operations. PTFs have access to
+ * Flink's managed state, event-time and timer services, underlying table changelogs, and can take
+ * multiple ordered and/or partitioned tables to produce a new table.
+ *
+ * Table Semantics and Virtual Processors
+ *
+ * PTFs can produce a new table by consuming tables as arguments. For scalability, input tables
+ * are distributed into virtual processors. Each virtual processor executes a PTF instance and has
+ * access only to a share of the entire table. The argument declaration decides about the size of
+ * the share and co-location of data. Conceptually, tables can be processed either "as row" (i.e.
+ * with row semantics) or "as set" (i.e. with set semantics).
+ *
+ * Table Argument with Row Semantics
+ *
+ * A PTF that takes a table with row semantics assumes that there is no correlation between rows
+ * and each row can be processed independently. The framework is free in how to distribute rows
+ * among virtual processors and each virtual processor has access only to the currently processed
+ * row.
+ *
+ * Table Argument with Set Semantics
+ *
+ * A PTF that takes a table with set semantics assumes that there is a correlation between rows.
+ * When calling the function, the PARTITION BY clause defines the columns for correlation. The
+ * framework ensures that all rows belonging to same set are co-located. A PTF instance is able to
+ * access all rows belonging to the same set. In other words: The virtual processor is scoped under
+ * a key context.
+ *
+ * Basic Implementation
+ *
+ * The behavior of a {@link ProcessTableFunction} can be defined by implementing a custom
+ * evaluation method. The evaluation method must be declared publicly, not static, and named
+ * eval. Overloading is not supported.

Review Comment:
  I wonder why overloading is not supported. Is there some relationship to the way the parameters are supplied from SQL that means they need to be one shape, so overloading would not work?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
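A hedged illustration of the "basic implementation" contract described in this Javadoc: a single public, non-static method named eval, with the output type given by the class's type parameter. The split into an emitted value assumes a TableFunction-style `collect(...)` helper, which the import of `Collector` in the file above suggests but this excerpt does not confirm.

```java
import org.apache.flink.table.functions.ProcessTableFunction;

// Illustrative only: emits an upper-cased copy of each input string.
public class UpperCaseFunction extends ProcessTableFunction<String> {

    // Exactly one public, non-static method named "eval"; overloading is not supported.
    public void eval(String input) {
        collect(input.toUpperCase());
    }
}
```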
Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]
SML0127 commented on code in PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861766911

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java:

Review Comment:
  I will add integration test cases if necessary.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
flinkbot commented on PR #25708: URL: https://github.com/apache/flink/pull/25708#issuecomment-2505643498 ## CI report: * 3a9c69c677f25a4be5a3168d830e3ced77ff4369 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]
mehdid93 commented on PR #25670: URL: https://github.com/apache/flink/pull/25670#issuecomment-2505664022 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36816) Support source parallelism setting for JDBC connector
Grzegorz Kołakowski created FLINK-36816: --- Summary: Support source parallelism setting for JDBC connector Key: FLINK-36816 URL: https://issues.apache.org/jira/browse/FLINK-36816 Project: Flink Issue Type: New Feature Components: Connectors / JDBC, Table SQL / JDBC Reporter: Grzegorz Kołakowski The jdbc connector adaptation work for [FLIP-367: Support Setting Parallelism for Table/SQL Sources - Apache Flink - Apache Software Foundation|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]
SML0127 commented on code in PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861726630 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java: ## @@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception { .isEqualTo(expectedStreamRecord); } +private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat) +throws Exception { +database.createAndInitialize(); +CloseableIterator iterator = +env.fromSource( +getFlinkSourceProvider( +new String[] {"json_types"}, +database, +useLegacyJsonFormat) +.getSource(), +WatermarkStrategy.noWatermarks(), +"Event-Source") +.executeAndCollect(); + +Object[] expectedSnapshot = +new Object[] { +DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), +BinaryStringData.fromString("{\"key1\": \"value1\"}"), +BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"), +BinaryStringData.fromString( +"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"), +1 +}; + +// skip CreateTableEvent +List snapshotResults = +MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; +RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); +Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES)) +.isEqualTo(expectedSnapshot); + +try (Connection connection = database.getJdbcConnection(); +Statement statement = connection.createStatement()) { +statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;"); +} + +Object[] expectedStreamRecord = expectedSnapshot; + +if (useLegacyJsonFormat) { +expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); +expectedSnapshot[2] = + BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"); +expectedSnapshot[3] = +BinaryStringData.fromString( + "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"); +} +expectedSnapshot[4] = null; Review Comment: I revised it more clearly as shown below. ```java if (useLegacyJsonFormat) { // removed whitespace before value and after comma in json format string value Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) .containsExactly( DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), BinaryStringData.fromString("{\"key1\":\"value1\"}"), BinaryStringData.fromString( "{\"key1\":\"value1\",\"key2\":\"value2\"}"), BinaryStringData.fromString( "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"), null); } else { Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) .containsExactly(expectedStreamRecord); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28897) Fail to use udf in added jar when enabling checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901640#comment-17901640 ] Ammu Parvathy commented on FLINK-28897: --- [~Jiangang] [~lsy] I have raised a PR [https://github.com/apache/flink/pull/25656] against the 1.20 version with a probable fix for this particular issue with checkpointing enabled. Could I get a review on this, please?

> Fail to use udf in added jar when enabling checkpoint
> -
>
> Key: FLINK-28897
> URL: https://issues.apache.org/jira/browse/FLINK-28897
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: Liu
> Assignee: dalongliu
> Priority: Critical
> Labels: pull-request-available, stale-assigned
> Fix For: 2.0.0
>
>
> The problem can be reproduced when enabling checkpoint for that
> StreamingJobGraphGenerator.preValidate is called actually in this case. Maybe
> this is a classloader problem.
> The reproduced steps are as following:
> {code:java}
> // Enable checkpoint first and execute the command in sql client.
> ADD JAR
> '~/flink/flink-end-to-end-tests/flink-sql-client-test/target/SqlToolbox.jar';
> create function func1 as
> 'org.apache.flink.table.toolbox.StringRegexReplaceFunction' LANGUAGE JAVA;
> SELECT id, func1(str, 'World', 'Flink') FROM (VALUES (1, 'Hello World')) AS
> T(id, str); {code}
> The output is as following:
> {code:java}
> /* 1 */
> /* 2 */ public class StreamExecCalc$11 extends
> org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */ implements
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */ private final Object[] references;
> /* 6 */ private transient
> org.apache.flink.table.runtime.typeutils.StringDataSerializer
> typeSerializer$4;
> /* 7 */
> /* 8 */ private final
> org.apache.flink.table.data.binary.BinaryStringData str$6 =
> org.apache.flink.table.data.binary.BinaryStringData.fromString("World");
> /* 9 */
> /* 10 */
> /* 11 */ private final
> org.apache.flink.table.data.binary.BinaryStringData str$7 =
> org.apache.flink.table.data.binary.BinaryStringData.fromString("Flink");
> /* 12 */
> /* 13 */ private transient
> org.apache.flink.table.toolbox.StringRegexReplaceFunction
> function_org$apache$flink$table$toolbox$StringRegexReplaceFunction;
> /* 14 */ private transient
> org.apache.flink.table.data.conversion.StringStringConverter converter$8;
> /* 15 */ org.apache.flink.table.data.BoxedWrapperRowData out = new
> org.apache.flink.table.data.BoxedWrapperRowData(2);
> /* 16 */ private final
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 17 */
> /* 18 */ public StreamExecCalc$11(
> /* 19 */ Object[] references,
> /* 20 */ org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 21 */ org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 22 */ org.apache.flink.streaming.api.operators.Output output,
> /* 23 */
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService
> processingTimeService) throws Exception {
> /* 24 */ this.references = references;
> /* 25 */ typeSerializer$4 =
> (((org.apache.flink.table.runtime.typeutils.StringDataSerializer)
> references[0]));
> /* 26 */
> function_org$apache$flink$table$toolbox$StringRegexReplaceFunction =
> (((org.apache.flink.table.toolbox.StringRegexReplaceFunction) references[1]));
> /* 27 */ converter$8 =
> (((org.apache.flink.table.data.conversion.StringStringConverter)
> references[2]));
> /* 28 */ this.setup(task, config, output);
> /* 29 */ if (this instanceof
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 30 */
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 31 */ .setProcessingTimeService(processingTimeService);
> /* 32 */ }
> /* 33 */ }
> /* 34 */
> /* 35 */ @Override
> /* 36 */ public void open() throws Exception {
> /* 37 */ super.open();
> /* 38 */
> /* 39 */
> function_org$apache$flink$table$toolbox$StringRegexReplaceFunction.open(new
> org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
> /* 40 */
> /* 41 */
> /* 42 */
> converter$8.open(getRuntimeContext().getUserCodeClassLoader());
> /* 43 */
> /* 44 */ }
> /* 45 */
> /* 46 */ @Override
> /* 47 */ public void
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> element) throws Exception {
> /*
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1861751435 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -100,6 +101,55 @@ void testCreateTableFromDescriptor() throws Exception { .contains(entry("connector", "fake"), entry("a", "Test")); } +@Test +void testCreateTableIfNotExistsFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); +tEnv.createTable( +"T", + TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build(), +true); + +final ObjectPath objectPath = new ObjectPath(database, "T"); +assertThat( +tEnv.getCatalog(catalog) +.orElseThrow(AssertionError::new) +.tableExists(objectPath)) +.isTrue(); + +final CatalogBaseTable catalogTable = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath); +assertThat(catalogTable).isInstanceOf(CatalogTable.class); +assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); +assertThat(catalogTable.getOptions()) +.contains(entry("connector", "fake"), entry("a", "Test")); + +assertThatNoException() +.isThrownBy( +() -> +tEnv.createTable( +"T", +TableDescriptor.forConnector("fake") +.schema(schema) +.option("a", "Test") +.build(), +true)); + +assertThatThrownBy( +() -> +tEnv.createTable( +"T", +TableDescriptor.forConnector("fake") +.schema(schema) +.option("a", "Test") +.build(), +false)) +.isInstanceOf(ValidationException.class); Review Comment: Yes, that works -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
flinkbot commented on PR #25707: URL: https://github.com/apache/flink/pull/25707#issuecomment-2505633651 ## CI report: * e8c2ec88bf59239f3c40a88d228905772eab9459 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36816] Support source parallelism setting for JDBC connector [flink-connector-jdbc]
grzegorz8 opened a new pull request, #148: URL: https://github.com/apache/flink-connector-jdbc/pull/148 # Purpose of the change Add new option `scan.parallelism` support for JDBC connector. Part of [FLINK-33261](https://issues.apache.org/jira/browse/FLINK-33261) FLIP-367: Support Setting Parallelism for Table/SQL Sources -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36816) Support source parallelism setting for JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-36816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36816: --- Labels: pull-request-available (was: ) > Support source parallelism setting for JDBC connector > - > > Key: FLINK-36816 > URL: https://issues.apache.org/jira/browse/FLINK-36816 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC, Table SQL / JDBC >Reporter: Grzegorz Kołakowski >Priority: Major > Labels: pull-request-available > > The jdbc connector adaptation work for [FLIP-367: Support Setting Parallelism > for Table/SQL Sources - Apache Flink - Apache Software > Foundation|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
dawidwys commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1861980546

## flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:

@@ -632,7 +633,9 @@ private static Stream functionSpecs() {
                 TestSpec.forScalarFunction(FunctionHintTableArgScalarFunction.class)
                         .expectErrorMessage("Only scalar arguments are supported yet."),
                 TestSpec.forScalarFunction(ArgumentHintTableArgScalarFunction.class)
-                        .expectErrorMessage("Only scalar arguments are supported yet."));
+                        .expectErrorMessage("Only scalar arguments are supported yet."),

Review Comment:
  You don't change the message, but I spotted that the sentence is not grammatically correct. Do you think we could fix it?

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java:

@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A hint that declares an intermediate result (i.e. state entry) that is managed by the framework
+ * (i.e. Flink managed state).
+ *
+ * State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports multiple
+ * state entries at the beginning of an eval()/onTimer() method (after an optional context
+ * parameter).
+ *
+ * Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction})
+ * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the
+ * accumulator).
+ *
+ * For example, {@code @StateHint(name = "count", type = @DataTypeHint("BIGINT"))} is a state
+ * entry with the data type BIGINT named "count".
+ *
+ * Note: Usually, a state entry is partitioned by a key and can not be accessed globally. The
+ * partitioning (or whether it is only a single partition) is defined by the corresponding function
+ * call.
+ *
+ * @see FunctionHint
+ */
+@PublicEvolving
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
+public @interface StateHint {
+
+    /**
+     * The name of the state entry.
+     *
+     * This can be used to provide a descriptive name for the state entry. The name can be used
+     * for referencing the entry during clean up.
+     */
+    String name() default "";

Review Comment:
  Are there any uniqueness requirements? If so, can we write them explicitly?

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:

@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
Re: [PR] [FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency [flink-kubernetes-operator]
mxm commented on code in PR #920: URL: https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1861986603 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -181,15 +178,15 @@ private void updateRecommendedParallelism( } @VisibleForTesting -static boolean allRequiredVerticesWithinUtilizationTarget( +static boolean allChangedVerticesWithinUtilizationTarget( Map> evaluatedMetrics, -Set requiredVertices) { -// All vertices' ParallelismChange is optional, rescaling will be ignored. -if (requiredVertices.isEmpty()) { +Set changedVertices) { +// No any vertex is changed. +if (changedVertices.isEmpty()) { return true; } -for (JobVertexID vertex : requiredVertices) { +for (var vertex : changedVertices) { Review Comment: I generally avoid `var` because it makes the code harder to read without type information, but that's just me. Moreover, I avoid refactoring existing code to use var, because it adds noise to the diff. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
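To illustrate the readability trade-off discussed here, a minimal side-by-side; the `Set<JobVertexID>` parameter mirrors the quoted diff, the method names are made up:

```java
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.util.Set;

class VarReadability {
    // Explicit type: the element type is visible right at the loop header.
    static void withExplicitType(Set<JobVertexID> changedVertices) {
        for (JobVertexID vertex : changedVertices) {
            System.out.println(vertex);
        }
    }

    // 'var': shorter, but the reader has to trace the collection's
    // declaration to learn what 'vertex' is.
    static void withVar(Set<JobVertexID> changedVertices) {
        for (var vertex : changedVertices) {
            System.out.println(vertex);
        }
    }
}
```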
Re: [PR] [FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency [flink-kubernetes-operator]
mxm commented on code in PR #920: URL: https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1861985148 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ## @@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval( var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL); if (scaleDownInterval.toMillis() <= 0) { // The scale down interval is disable, so don't block scaling. -return ParallelismChange.required(newParallelism); -} - -var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex); -if (firstTriggerTime.isEmpty()) { -LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval); -delayedScaleDown.updateTriggerTime(vertex, clock.instant()); -return ParallelismChange.optional(newParallelism); +return ParallelismChange.build(newParallelism); } -if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) { -LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex); -return ParallelismChange.optional(newParallelism); +var now = clock.instant(); +var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism); + +// Never scale down within scale down interval +if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) { Review Comment: I thought that `getFirstTriggerTime()` returns the first time we scaled *up*, but we are actually recording the time we first try to scale down. I'm not sure this is correct. We want to delay scale down from the first time we scale up, not the first time we scaled down. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ## @@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval( var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL); if (scaleDownInterval.toMillis() <= 0) { // The scale down interval is disable, so don't block scaling. -return ParallelismChange.required(newParallelism); -} - -var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex); -if (firstTriggerTime.isEmpty()) { -LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval); -delayedScaleDown.updateTriggerTime(vertex, clock.instant()); -return ParallelismChange.optional(newParallelism); +return ParallelismChange.build(newParallelism); } -if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) { -LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex); -return ParallelismChange.optional(newParallelism); +var now = clock.instant(); +var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism); + +// Never scale down within scale down interval +if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) { +if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) { +LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval); +} else { +LOG.debug( +"Try to skip immediate scale down within scale-down interval for {}", +vertex); +} +return ParallelismChange.noChange(); } else { -return ParallelismChange.required(newParallelism); +// Using the maximum parallelism within the scale down interval window instead of the +// latest parallelism when scaling down +return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism()); Review Comment: Thanks Rui! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
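A minimal sketch of the bookkeeping under discussion, assuming the `triggerScaleDown`/`getFirstTriggerTime`/`getMaxRecommendedParallelism` names from the quoted diff; the String vertex key and the `Info` shape are simplifications, not the operator's actual types:

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

class DelayedScaleDownSketch {
    static final class Info {
        final Instant firstTriggerTime;
        int maxRecommendedParallelism;

        Info(Instant firstTriggerTime, int parallelism) {
            this.firstTriggerTime = firstTriggerTime;
            this.maxRecommendedParallelism = parallelism;
        }
    }

    private final Map<String, Info> perVertex = new HashMap<>();

    // Records a scale-down request: remembers when the vertex first asked to
    // scale down and keeps the maximum parallelism recommended since then, so
    // the eventual rescale is not based on a momentary low recommendation.
    Info triggerScaleDown(String vertex, Instant now, int newParallelism) {
        Info info = perVertex.computeIfAbsent(vertex, v -> new Info(now, newParallelism));
        info.maxRecommendedParallelism =
                Math.max(info.maxRecommendedParallelism, newParallelism);
        return info;
    }

    // A scale up (or any applied rescale) would clear the pending entry.
    void clear(String vertex) {
        perVertex.remove(vertex);
    }
}
```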
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
zentol commented on code in PR #25708: URL: https://github.com/apache/flink/pull/25708#discussion_r1861989731 ## azure-pipelines.yml: ## @@ -73,16 +73,16 @@ stages: parameters: # see template file for a definition of the parameters. stage_name: ci_build test_pool_definition: -vmImage: 'ubuntu-20.04' +vmImage: 'ubuntu-22.04' e2e_pool_definition: -vmImage: 'ubuntu-20.04' +vmImage: 'ubuntu-22.04' Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
davidradl commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862082959 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java: ## @@ -175,13 +178,40 @@ ArgumentHint[] arguments() default {}; /** - * Explicitly defines the intermediate result type that a function uses as accumulator. + * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating + * function uses as its accumulator. The entry is managed by the framework (usually via Flink's + * managed state). Review Comment: it would be worth saying how the entry is managed when not via managed state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35114) Remove old Table API implementations, update Schema stack
[ https://issues.apache.org/jira/browse/FLINK-35114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-35114: - Summary: Remove old Table API implementations, update Schema stack (was: Remove old Table API implementations) > Remove old Table API implementations, update Schema stack > - > > Key: FLINK-35114 > URL: https://issues.apache.org/jira/browse/FLINK-35114 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kudu >Reporter: Ferenc Csaky >Assignee: Ferenc Csaky >Priority: Major > > At the moment, the connector has both the old Table sink/source/catalog > implementations and the matching Dynamic... implementations as well. > Going forward, the deprecated old implementation should be removed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862101386 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java: ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.util.Collector; + +/** + * Base class for a user-defined process table function. A process table function (PTF) maps zero, + * one, or multiple tables to zero, one, or multiple rows (or structured types). Scalar arguments + * are also supported. If the output record consists of only one field, the wrapper can be omitted, + * and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime. + * + * PTFs are the most powerful function kind for Flink SQL and Table API. They enable implementing + * user-defined operators that can be as feature-rich as built-in operations. PTFs have access to + * Flink's managed state, event-time and timer services, underlying table changelogs, and can take + * multiple ordered and/or partitioned tables to produce a new table. + * + * Table Semantics and Virtual Processors + * + * PTFs can produce a new table by consuming tables as arguments. For scalability, input tables + * are distributed into virtual processors. Each virtual processor executes a PTF instance and has + * access only to a share of the entire table. The argument declaration decides about the size of + * the share and co-location of data. Conceptually, tables can be processed either "as row" (i.e. + * with row semantics) or "as set" (i.e. with set semantics). + * + * Table Argument with Row Semantics + * + * A PTF that takes a table with row semantics assumes that there is no correlation between rows + * and each row can be processed independently. The framework is free in how to distribute rows + * among virtual processors and each virtual processor has access only to the currently processed + * row. + * + * Table Argument with Set Semantics + * + * A PTF that takes a table with set semantics assumes that there is a correlation between rows. + * When calling the function, the PARTITION BY clause defines the columns for correlation. The + * framework ensures that all rows belonging to same set are co-located. A PTF instance is able to + * access all rows belonging to the same set. 
In other words: The virtual processor is scoped under + * a key context. + * + * Basic Implementation + * + * The behavior of a {@link ProcessTableFunction} can be defined by implementing a custom + * evaluation method. The evaluation method must be declared publicly, not static, and named + * eval. Overloading is not supported. Review Comment: Overloading does not go well with named parameters. In the end we want to support a syntax like: ``` SELECT * FROM f(name => 's', 'threshold' => 12) ``` Supporting `name == STRING || name == INT` makes the implementation highly complex. We support optional arguments, which should solve most of the use cases. E.g. `f(numbers = [1, 2, 3])` or `f(strings = ['1', '2', '3'])`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
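To make the optional-argument alternative concrete, a hedged sketch of a function with one optional named argument; the exact ArgumentHint attributes (name, type, isOptional) are assumptions based on the annotations referenced in this thread:

```java
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// One eval() with an optional 'threshold' instead of two overloads, so named
// parameters like f(name => 's') stay unambiguous.
public class ThresholdFunction extends ScalarFunction {
    public String eval(
            @ArgumentHint(name = "name", type = @DataTypeHint("STRING")) String name,
            @ArgumentHint(name = "threshold", type = @DataTypeHint("INT"), isOptional = true)
                    Integer threshold) {
        // A missing optional argument arrives as null.
        return name + ":" + (threshold == null ? 0 : threshold);
    }
}
```

A call could then use named parameters, e.g. `SELECT f(name => 's')`, with `threshold` defaulting to NULL.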
[PR] [FLINK-35114] Remove old Table API implementations, update Table API and Schema stack [flink-connector-kudu]
ferenc-csaky opened a new pull request, #3: URL: https://github.com/apache/flink-connector-kudu/pull/3 Main logical changes: - Removed old Table API source/sink/factory. - Moved the Dynamic implementations up from the `dynamic` package. - Updated the schema stack from `TableSchema` to `Schema`/`ResolvedSchema`. - Utilized the now built-in `LookupFunction` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
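For readers unfamiliar with the schema migration mentioned above, a small sketch of the newer API: the deprecated TableSchema is replaced by Schema (the unresolved, user-facing declaration), which the planner turns into a ResolvedSchema. Column names here are made up:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

class KuduSchemaSketch {
    static Schema declare() {
        // Unresolved declaration; resolution into ResolvedSchema happens
        // inside the catalog/planner, not in connector code like this.
        return Schema.newBuilder()
                .column("id", DataTypes.BIGINT().notNull())
                .column("name", DataTypes.STRING())
                .primaryKey("id")
                .build();
    }
}
```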
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862114539 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java: ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.util.Collector; + +/** + * Base class for a user-defined process table function. A process table function (PTF) maps zero, + * one, or multiple tables to zero, one, or multiple rows (or structured types). Scalar arguments + * are also supported. If the output record consists of only one field, the wrapper can be omitted, + * and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime. + * + * PTFs are the most powerful function kind for Flink SQL and Table API. They enable implementing + * user-defined operators that can be as feature-rich as built-in operations. PTFs have access to + * Flink's managed state, event-time and timer services, underlying table changelogs, and can take + * multiple ordered and/or partitioned tables to produce a new table. + * + * Table Semantics and Virtual Processors + * + * PTFs can produce a new table by consuming tables as arguments. For scalability, input tables + * are distributed into virtual processors. Each virtual processor executes a PTF instance and has Review Comment: A virtual processor could be translated into the Flink concept "key context" but this is rather an internal term. Virtual processor is neither TM, nor Task, nor Subtask. And I think this is also what confuses people when they work with KeyedProcessFunction for the first time. Understanding that state is scoped under the key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
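The "state scoped under the key" point is the same contract the DataStream API exposes; below is a minimal KeyedProcessFunction sketch of per-key state (the state lookup is done inline here only to keep the example short and version-agnostic; it normally belongs in the lifecycle open method):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts elements per key; the ValueState is only ever visible for the key
// of the record currently being processed, mirroring the "virtual processor
// scoped under a key context" description above.
class PerKeyCounter extends KeyedProcessFunction<String, String, Long> {
    @Override
    public void processElement(String value, Context ctx, Collector<Long> out)
            throws Exception {
        ValueState<Long> count =
                getRuntimeContext()
                        .getState(new ValueStateDescriptor<>("count", Long.class));
        Long current = count.value();
        long next = current == null ? 1L : current + 1L;
        count.update(next);
        out.collect(next);
    }
}
```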
[jira] [Updated] (FLINK-35114) Remove old Table API implementations, update Schema stack
[ https://issues.apache.org/jira/browse/FLINK-35114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35114: --- Labels: pull-request-available (was: ) > Remove old Table API implementations, update Schema stack > - > > Key: FLINK-35114 > URL: https://issues.apache.org/jira/browse/FLINK-35114 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kudu >Reporter: Ferenc Csaky >Assignee: Ferenc Csaky >Priority: Major > Labels: pull-request-available > > At the moment, the connector has both the old Table sink/source/catalog > implementations and the matching Dynamic... implementations as well. > Going forward, the deprecated old implementation should be removed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862103659 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java: ## @@ -175,13 +178,40 @@ ArgumentHint[] arguments() default {}; /** - * Explicitly defines the intermediate result type that a function uses as accumulator. + * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating + * function uses as its accumulator. The entry is managed by the framework (usually via Flink's + * managed state). Review Comment: This is hard to say and might be too implementation specific. It varies from operator to operator. Sometimes it's MemorySegments, sometimes it's just Java heap. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on PR #25707: URL: https://github.com/apache/flink/pull/25707#issuecomment-2506035053 > ideally testing the behavior documented in the comments. This is a feature under development. The behavior in comments is not fully implemented yet. As can be seen in the umbrella ticket FLINK-36703, this is just the very beginning of the implementation. The interfaces are necessary as a first step to power the next task, which is the TypeInferenceExtractor update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
twalthr commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862089260 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, + * it will be inaccessible in the current session. To make the permanent object available again + * one can drop the corresponding temporary object. + * + * Examples: + * + * {@code + * tEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); Review Comment: update example ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -745,6 +775,35 @@ void createTemporarySystemFunction( */ void createTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * If the table should not be permanently stored in a catalog, use {@link + * #createTemporaryTable(String, TableDescriptor)} instead. + * + * Examples: + * + * {@code + * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); Review Comment: update example ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -745,6 +745,35 @@ void createTemporarySystemFunction( */ void createTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * If the table should not be permanently stored in a catalog, use {@link + * #createTemporaryTable(String, TableDescriptor)} instead. Review Comment: update link -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
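Based on the javadoc under review, the new overload would be used roughly as below; the parameter order and no-op semantics follow the quoted @param text, but treat the exact signature as an assumption until the PR lands:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

class CreateTableIfNotExists {
    static void register(TableEnvironment tEnv) {
        TableDescriptor descriptor =
                TableDescriptor.forConnector("datagen")
                        .schema(Schema.newBuilder()
                                .column("f0", DataTypes.STRING())
                                .build())
                        .option("rows-per-second", "10")
                        .build();
        tEnv.createTable("MyTable", descriptor, true);
        // With ignoreIfExists = true, the second call is a no-op instead of
        // failing with a table-already-exists error.
        tEnv.createTable("MyTable", descriptor, true);
    }
}
```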
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
zentol commented on code in PR #25708: URL: https://github.com/apache/flink/pull/25708#discussion_r1862185553 ## .github/workflows/template.flink-ci.yml: ## @@ -72,7 +72,7 @@ jobs: name: "Compile" runs-on: ubuntu-22.04 container: - image: mapohl/flink-ci:FLINK-34194 + image: chesnay/flink-ci:java_8_11_17_21_maven_386_jammy Review Comment: figured we could also use this image in the GHA setup for consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
zentol commented on code in PR #25708: URL: https://github.com/apache/flink/pull/25708#discussion_r1862177938 ## azure-pipelines.yml: ## @@ -73,16 +73,16 @@ stages: parameters: # see template file for a definition of the parameters. stage_name: ci_build test_pool_definition: -vmImage: 'ubuntu-20.04' +vmImage: 'ubuntu-22.04' Review Comment: because we already did the work on the image-side for Ubuntu 22 months ago and have been using it in the GHA-based CI workflows for months. i.e., it's just an easy win. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
snuyanzin commented on code in PR #25708: URL: https://github.com/apache/flink/pull/25708#discussion_r1862207540 ## .github/workflows/template.flink-ci.yml: ## @@ -72,7 +72,7 @@ jobs: name: "Compile" runs-on: ubuntu-22.04 container: - image: mapohl/flink-ci:FLINK-34194 + image: chesnay/flink-ci:java_8_11_17_21_maven_386_jammy Review Comment: yep, also in https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/.github/workflows/template.pre-compile-checks.yml#L46 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862211415 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java: ## @@ -175,13 +178,40 @@ ArgumentHint[] arguments() default {}; /** - * Explicitly defines the intermediate result type that a function uses as accumulator. + * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating + * function uses as its accumulator. The entry is managed by the framework (usually via Flink's + * managed state). * * By default, an explicit accumulator type is undefined and the reflection-based extraction * is used. + * + * This parameter is primarily intended for aggregating functions (i.e. {@link + * AggregateFunction} and {@link TableAggregateFunction}). It is recommended to use {@link + * #state()} for {@link ProcessTableFunction}. */ DataTypeHint accumulator() default @DataTypeHint(); +/** + * Explicitly lists the intermediate results (i.e. state entries) of a function that is managed + * by the framework (i.e. Flink managed state). Including their names and data types. + * + * State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports + * multiple state entries at the beginning of an eval()/onTimer() method (after an optional + * context parameter). + * + * Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction}) + * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the + * accumulator). + * + * By default, explicit state is undefined and the reflection-based extraction is used where + * {@link StateHint} is present. + * + * Using both {@link #accumulator()} and this parameter is not allowed. Specifying the list + * of state entries manually disables the entire reflection-based extraction around {@link + * StateHint} and accumulators for aggregating functions. Review Comment: Let's wait with this. Currently, PTFs are not fully functional. Exposing the changes via docs might cause confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on code in PR #25707: URL: https://github.com/apache/flink/pull/25707#discussion_r1862212335 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.functions.TableAggregateFunction; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A hint that declares an intermediate result (i.e. state entry) that is managed by the framework + * (i.e. Flink managed state). + * + * State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports multiple + * state entries at the beginning of an eval()/onTimer() method (after an optional context + * parameter). + * + * Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction}) + * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the + * accumulator). + * + * For example, {@code @StateHint(name = "count", type = @DataTypeHint("BIGINT"))} is a state + * entry with the data type BIGINT named "count". + * + * Note: Usually, a state entry is partitioned by a key and can not be accessed globally. The + * partitioning (or whether it is only a single partition) is defined by the corresponding function + * call. + * + * @see FunctionHint + */ +@PublicEvolving +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER}) +public @interface StateHint { Review Comment: Absolutely, but not yet. First there needs to be more logic merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
snuyanzin commented on PR #25708: URL: https://github.com/apache/flink/pull/25708#issuecomment-2506147084 one minor comment which probably relates to the image rather than this PR: since jdk8 support was dropped (https://github.com/apache/flink/pull/25406) it could be removed from the image as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
zentol commented on PR #25708: URL: https://github.com/apache/flink/pull/25708#issuecomment-2506155540 So long as we maintain 1.x branches I'd suggest keeping java 8 in there so we are able to use the same CI image for all versions (in case the need arises to bump the CI image in the 1.x branches). It's not really a problem to have java 8 in there after all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36529] Allow Flink version configs to be set to greater than given version [flink-kubernetes-operator]
gyfora merged PR #918: URL: https://github.com/apache/flink-kubernetes-operator/pull/918 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862788331 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -745,6 +745,35 @@ void createTemporarySystemFunction( */ void createTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * If the table should not be permanently stored in a catalog, use {@link + * #createTemporaryTable(String, TableDescriptor)} instead. Review Comment: > update link oops, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on PR #25704: URL: https://github.com/apache/flink/pull/25704#issuecomment-2506886668 @twalthr @davidradl thanks for the reviews 👍 I think I've addressed all comments, take a look. Updating the python api was a bit tricky with the setup (curious to see if lint will fail), but the implementation should be working. Tests are identical to the java implementation and are passing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]
herunkang2018 commented on PR #3656: URL: https://github.com/apache/flink-cdc/pull/3656#issuecomment-2506898687 @leonardBang @yuxiqian Would you like to review again since the CI has passed? Thanks a lot. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34874][cdc-connector][mongodb] Support initial.snapshotting.pipeline related configs in table api [flink-cdc]
Jiabao-Sun commented on PR #3707: URL: https://github.com/apache/flink-cdc/pull/3707#issuecomment-2506910528 Thanks @herunkang2018 for this contribution. Hi @yuxiqian, could you help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][cdc-connector][oracle] Fix oracle server time zone option missing in table api. [flink-cdc]
LiuBodong closed pull request #2985: [fix][cdc-connector][oracle] Fix oracle server time zone option missing in table api. URL: https://github.com/apache/flink-cdc/pull/2985 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36698][pipeline-connector][elasticsearch] Elasticsearch pipeline sink support authentication [flink-cdc]
beryllw commented on PR #3728: URL: https://github.com/apache/flink-cdc/pull/3728#issuecomment-2506921020 @lvyanquan @proletarians cc, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862771988 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, Review Comment: - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#temporary-vs-permanent-tables - I agree that `tables` instead of `objects` would be easier to digest in this context. On the other hand, the temporary objects concept seems to exist for multiple resources, like views, and having the same sentence might help people recognize this as being the same pattern. I have no strong opinion here though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
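A compact illustration of the shadowing behavior referenced above (descriptor construction is omitted; any connector works, and the method names are the existing TableEnvironment API):

```java
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

class ShadowingDemo {
    static void demo(TableEnvironment tEnv, TableDescriptor descriptor) {
        tEnv.createTable("MyTable", descriptor); // permanent catalog table
        // A temporary table under the same path shadows the permanent one:
        tEnv.createTemporaryTable("MyTable", descriptor);
        // Queries against "MyTable" now resolve to the temporary table.
        tEnv.dropTemporaryTable("MyTable");
        // The permanent table is accessible again.
    }
}
```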
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862772602 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, + * it will be inaccessible in the current session. To make the permanent object available again + * one can drop the corresponding temporary object. + * + * Examples: + * + * {@code + * tEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); + * } + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. + * @param ignoreIfExists If a table exists under the given path and this flag is set, no * Review Comment: I think it does behave the same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore [flink-cdc]
github-actions[bot] commented on PR #3569: URL: https://github.com/apache/flink-cdc/pull/3569#issuecomment-2506867619 This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone who is interested in it is encouraged to continue work on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore [flink-cdc]
github-actions[bot] closed pull request #3569: [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore URL: https://github.com/apache/flink-cdc/pull/3569 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862775007 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, + * it will be inaccessible in the current session. To make the permanent object available again Review Comment: - Same as https://github.com/apache/flink/pull/25704#discussion_r1862771988 - Good question. I'm also not sure and would need to double-check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862779370 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -49,55 +50,78 @@ class TableEnvironmentTest { @Test void testCreateTemporaryTableFromDescriptor() { final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); -final String catalog = tEnv.getCurrentCatalog(); -final String database = tEnv.getCurrentDatabase(); - final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); -tEnv.createTemporaryTable( -"T", - TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build()); -assertThat( -tEnv.getCatalog(catalog) -.orElseThrow(AssertionError::new) -.tableExists(new ObjectPath(database, "T"))) -.isFalse(); +assertTemporaryCreateTableFromDescriptor(tEnv, schema); +} -final Optional lookupResult = -tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T")); -assertThat(lookupResult.isPresent()).isTrue(); +@Test +void testCreateTemporaryTableIfNotExistsFromDescriptor() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); +final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); -final CatalogBaseTable catalogTable = lookupResult.get().getResolvedTable(); -assertThat(catalogTable instanceof CatalogTable).isTrue(); -assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); - assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); -assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); +assertTemporaryCreateTableFromDescriptor(tEnv, schema); +assertThatNoException() +.isThrownBy( +() -> +tEnv.createTemporaryTable( +"T", +TableDescriptor.forConnector("fake") +.schema(schema) +.option("a", "Test") +.build(), Review Comment: I've updated the tests so both `assertCreateTableFromDescriptor` and `assertTemporaryCreateTableFromDescriptor` helper functions have an ignoreIfExists param and we test the case you asked for 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
davidradl commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862341867 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, + * it will be inaccessible in the current session. To make the permanent object available again + * one can drop the corresponding temporary object. + * + * Examples: + * + * {@code + * tEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); + * } + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. + * @param ignoreIfExists If a table exists under the given path and this flag is set, no * Review Comment: NIT: extra * after no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
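For orientation, the shape of the API this javadoc documents would look roughly as follows; this is a sketch based on the quoted javadoc from FLINK-36809, not the merged code:

```java
interface TableEnvironmentSketch {
    // Proposed overload: a third parameter controls behavior when a table
    // is already registered under the given path.
    void createTemporaryTable(String path, TableDescriptor descriptor, boolean ignoreIfExists);
}
```

With `ignoreIfExists` set to `true`, a repeated registration under the same path becomes a no-op instead of throwing, e.g. `tEnv.createTemporaryTable("MyTable", descriptor, true);`.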
Re: [PR] [FLINK-33571][table] Override json-path version for Calcite 1.32 to deal with CVE [flink]
MartijnVisser commented on PR #25602: URL: https://github.com/apache/flink/pull/25602#issuecomment-2506290502 It looks like we've pivoted from the original intent of the Jira ticket (upgrade to Calcite 1.38). That means we'll either have to create a new Jira ticket for what's done in this PR, or update the existing Jira ticket to reflect the changes from this PR and create a new one to deal with the upgrade to Calcite 1.38. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36602) Upgrade Calcite version to 1.38.0
[ https://issues.apache.org/jira/browse/FLINK-36602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-36602: --- Labels: (was: pull-request-available) > Upgrade Calcite version to 1.38.0 > - > > Key: FLINK-36602 > URL: https://issues.apache.org/jira/browse/FLINK-36602 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 2.0-preview >Reporter: Thomas Cooper >Priority: Major > > The currently used Calcite version (1.32) has a high severity vulnerability > ([CVE-2023-1370|https://nvd.nist.gov/vuln/detail/CVE-2023-1370]). This can be > mitigated by upgrading to Calcite 1.37 or higher (which upgrades the > vulnerable json-path library). > As [1.38 has been > released|https://calcite.apache.org/news/2024/10/15/release-1.38.0/] we > should probably upgrade to that. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
davidradl commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862333839 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, Review Comment: I read the sentence "Temporary objects can shadow permanent ones" - I am not sure why I would want to do this; is this shadowing capability documented? - I suggest this sentence use `table` instead of `object`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36800] Upgrade hadoop-aws to 3.3.6 [flink]
davidradl commented on code in PR #25692: URL: https://github.com/apache/flink/pull/25692#discussion_r1862403736 ## flink-filesystems/pom.xml: ## @@ -34,7 +34,7 @@ under the License. pom - 3.3.4 + 3.3.6 Review Comment: There are notice files in the metadata in the subprojects that reference this number - they should be updated as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36688][table-planner] Sort metadata keys when reusing source [flink]
davidradl commented on code in PR #25681: URL: https://github.com/apache/flink/pull/25681#discussion_r1862411403 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java: ## @@ -165,7 +166,7 @@ public List reuseDuplicatedScan(List relNodes) { } int[][] allProjectFields = allProjectFieldSet.toArray(new int[0][]); -List allMetaKeys = new ArrayList<>(allMetaKeySet); +List allMetaKeys = sortMetadataKeys(allMetaKeySet, pickTable.tableSource()); Review Comment: NIT: method name could be `enforceMetadataKeyOrder` or `copyMetadataKeyOrder`; sort implies there is a sort order like ascending. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
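A sketch of what the suggested rename would convey: the helper does not sort keys in the ascending sense, it re-orders the requested metadata keys to match the order in which the source declares them. The helper name and parameters below are hypothetical, not the planner's actual code:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class MetadataKeyOrder {
    // Keep only the requested keys, preserving the order the source declares
    // them in (hypothetical helper illustrating the intent).
    static List<String> enforceMetadataKeyOrder(Set<String> requestedKeys, List<String> declaredOrder) {
        return declaredOrder.stream()
                .filter(requestedKeys::contains)
                .collect(Collectors.toList());
    }
}
```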
Re: [PR] [FLINK-36756][sql-gateway] Bump up the sql gateway rest version [flink]
davidradl commented on code in PR #25705: URL: https://github.com/apache/flink/pull/25705#discussion_r1862326643 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java: ## @@ -51,7 +51,10 @@ public enum SqlGatewayRestAPIVersion V2(false, true), // V3 introduces materialized table related APIs -V3(true, true); +V3(false, true), + +// V4 supports to deploy script to application cluster +V4(true, true); Review Comment: I can't see an incompatible change in the API in FLIP-480 that would need a new version. When we remove calls ("The Flink community is preparing to remove support for per-job submission."), the version will need to change. I am curious whether this version change should be delayed until there is an incompatible REST change. Or has the incompatible change already gone in? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
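For readers of the diff above: the two constructor flags appear to be (isDefaultVersion, isStableVersion), following the pattern of Flink's RestAPIVersion interface, so the change would demote V3 and mark the new V4 as the default. That reading is an assumption from the quoted values, sketched below rather than taken from the full source file:

```java
// Assumed flag meaning: first = default version, second = stable version.
enum ApiVersionSketch {
    V2(false, true),
    V3(false, true), // previously V3(true, true): no longer the default
    V4(true, true);  // new default version

    final boolean isDefaultVersion;
    final boolean isStableVersion;

    ApiVersionSketch(boolean isDefaultVersion, boolean isStableVersion) {
        this.isDefaultVersion = isDefaultVersion;
        this.isStableVersion = isStableVersion;
    }
}
```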
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
davidradl commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862384619 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, + * it will be inaccessible in the current session. To make the permanent object available again + * one can drop the corresponding temporary object. + * + * Examples: + * + * {@code + * tEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); + * } + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. + * @param ignoreIfExists If a table exists under the given path and this flag is set, no * Review Comment: I am curious: if there is an existing table, temporary table, view or materialized table, does this flag act the same in each case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]
snuyanzin merged PR #24829: URL: https://github.com/apache/flink/pull/24829 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]
snuyanzin commented on PR #24829: URL: https://github.com/apache/flink/pull/24829#issuecomment-2506518447 @naferx can you please also create backports for 1.19, 1.20? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36529) Support greater or equals logic for operator flink version default configs
[ https://issues.apache.org/jira/browse/FLINK-36529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-36529. -- Fix Version/s: kubernetes-operator-1.11.0 Resolution: Fixed merged to main 9bab0286dc152ba5dbf8226913cee98d988a5b66 > Support greater or equals logic for operator flink version default configs > -- > > Key: FLINK-36529 > URL: https://issues.apache.org/jira/browse/FLINK-36529 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.11.0 > > > The operator currently allows the following syntax for defining flink version > specific defaults: > kubernetes.operator.default-configuration.flink-version.v1_18.key: value > The problem with this is that usually these defaults should be applied to > newer Flink versions as well in many cases, forcing config duplications. > We should introduce a + syntax for configs applied to a version and above: > kubernetes.operator.default-configuration.flink-version.v1_18+.key: value > in this case key:value would be applied to all Flink versions greater than > or equal to 1.18 -- This message was sent by Atlassian Jira (v8.20.10#820010)
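The "+" semantics from the ticket can be pinned down with a small matcher. The sketch below is a hypothetical helper illustrating the rule, not the operator's actual implementation, and assumes version keys of the form v<major>_<minor> with an optional trailing "+":

```java
// Sketch of the "greater or equals" matching rule from FLINK-36529.
static boolean appliesTo(String configVersion, int major, int minor) {
    boolean atLeast = configVersion.endsWith("+");
    String version = atLeast
            ? configVersion.substring(0, configVersion.length() - 1)
            : configVersion;
    String[] parts = version.substring(1).split("_"); // strip the leading 'v'
    int cfgMajor = Integer.parseInt(parts[0]);
    int cfgMinor = Integer.parseInt(parts[1]);
    return atLeast
            ? (major > cfgMajor || (major == cfgMajor && minor >= cfgMinor))
            : (major == cfgMajor && minor == cfgMinor);
}
```

Under this rule a default declared for v1_18+ would apply to 1.18, 1.19 and 1.20, while plain v1_18 keeps today's exact-match behavior.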
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
davidradl commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862354231 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, + * it will be inaccessible in the current session. To make the permanent object available again Review Comment: for the sentences `If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again one can drop the corresponding temporary object.` - can we use `table` or `TableDescriptor` instead of `object`? - is this true if ignoreIfExists is set? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
davidradl commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862333839 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -718,6 +718,36 @@ void createTemporarySystemFunction( */ void createTemporaryTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, Review Comment: I read the sentence "Temporary objects can shadow permanent ones" - I know this was copied from the original method. - I am not sure why I would want to do this; is this shadowing capability documented? - I suggest this sentence use `table` or `TableDescriptor` instead of `object`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]
mehdid93 commented on PR #25670: URL: https://github.com/apache/flink/pull/25670#issuecomment-2506492367 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
ammu20-dev commented on code in PR #25656: URL: https://github.com/apache/flink/pull/25656#discussion_r1861721487 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java: ## @@ -1542,6 +1542,19 @@ void testUsingAddJar() throws Exception { "drop function lowerUdf"); } +@Test Review Comment: Added a new test case to verify that job submission succeeds with ADD JAR and checkpointing enabled. Note that this test case will fail on versions 1.20.x and below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]
SML0127 commented on code in PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861726630 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java: ## @@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception { .isEqualTo(expectedStreamRecord); } +private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat) +throws Exception { +database.createAndInitialize(); +CloseableIterator iterator = +env.fromSource( +getFlinkSourceProvider( +new String[] {"json_types"}, +database, +useLegacyJsonFormat) +.getSource(), +WatermarkStrategy.noWatermarks(), +"Event-Source") +.executeAndCollect(); + +Object[] expectedSnapshot = +new Object[] { +DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), +BinaryStringData.fromString("{\"key1\": \"value1\"}"), +BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"), +BinaryStringData.fromString( +"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"), +1 +}; + +// skip CreateTableEvent +List snapshotResults = +MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; +RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); +Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES)) +.isEqualTo(expectedSnapshot); + +try (Connection connection = database.getJdbcConnection(); +Statement statement = connection.createStatement()) { +statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;"); +} + +Object[] expectedStreamRecord = expectedSnapshot; + +if (useLegacyJsonFormat) { +expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); +expectedSnapshot[2] = + BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"); +expectedSnapshot[3] = +BinaryStringData.fromString( + "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"); +} +expectedSnapshot[4] = null; Review Comment: I revised it more clearly as shown below. ```java if (useLegacyJsonFormat) { // removed whitespace before value and after comma in json format string value Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) .containsExactly( DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), BinaryStringData.fromString("{\"key1\":\"value1\"}"), BinaryStringData.fromString( "{\"key1\":\"value1\",\"key2\":\"value2\"}"), BinaryStringData.fromString( "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"), null); } else { Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) .containsExactly(expectedStreamRecord); } ``` ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java: ## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
Re: [PR] [FLINK-36749][state/forst] Implement rescaling for ForStStateBackend [flink]
davidradl commented on code in PR #25676: URL: https://github.com/apache/flink/pull/25676#discussion_r1862415650 ## flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java: ## @@ -177,8 +182,8 @@ protected AsyncKeyedStateBackend restoreAsyncKeyedBackend( new JobID(), "test_op", keySerializer, -numberOfKeyGroups, -new KeyGroupRange(0, numberOfKeyGroups - 1), +keyGroupRange.getNumberOfKeyGroups(), +keyGroupRange, Review Comment: why do we need both parameters - if one can be derived from another? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
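The point of the question, made concrete (KeyGroupRange is Flink's own class, and getNumberOfKeyGroups() is the same accessor used in the diff above):

```java
// The count is derivable from the range, so passing both is redundant:
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 127);
int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups(); // 128
```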
Re: [PR] [FLINK-36451][runtime] Replaces LeaderElection#hasLeadership with LeaderElection#runAsLeader [flink]
XComp commented on code in PR #25679: URL: https://github.com/apache/flink/pull/25679#discussion_r1862280134 ## flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java: ## @@ -1246,6 +1200,62 @@ private void testNonBlockingCall( testInstance.close(); } +/** + * This test is used to verify FLINK-36451 where we observed concurrent nested locks being + * acquired from the {@link LeaderContender} and from the {@link DefaultLeaderElectionService}. + */ +@Test +void testNestedDeadlockInLeadershipConfirmation() throws Exception { Review Comment: That's the test that I added to verify the findings. I kept it in a separate commit to prove that the test runs into a deadlock before applying the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
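For context, the lock-ordering hazard FLINK-36451 describes follows the classic nested-lock pattern. The sketch below is a generic illustration of that pattern, not the DefaultLeaderElectionService code:

```java
// Thread A takes the service lock, then the contender lock (grant path).
// Thread B takes the contender lock, then the service lock (confirm path).
// Run concurrently, each can block on the lock the other already holds.
final Object serviceLock = new Object();
final Object contenderLock = new Object();

Thread grant = new Thread(() -> {
    synchronized (serviceLock) {
        synchronized (contenderLock) { /* notify contender of leadership */ }
    }
});
Thread confirm = new Thread(() -> {
    synchronized (contenderLock) {
        synchronized (serviceLock) { /* confirm leadership with the service */ }
    }
});
grant.start();
confirm.start();
```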
Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]
twalthr commented on PR #25707: URL: https://github.com/apache/flink/pull/25707#issuecomment-2506316008 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36702][sql-gateway] Introduce ScripExecutor to run the script [flink]
davidradl commented on code in PR #25700: URL: https://github.com/apache/flink/pull/25700#discussion_r1862393806 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/application/ScriptExecutor.java: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.application; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserEOFException; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationExecutor; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Parser to split the statements. */ Review Comment: This javadoc does not seem to correspond to the class name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
davidradl commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1862391147 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -49,55 +50,78 @@ class TableEnvironmentTest { @Test void testCreateTemporaryTableFromDescriptor() { final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); -final String catalog = tEnv.getCurrentCatalog(); -final String database = tEnv.getCurrentDatabase(); - final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); -tEnv.createTemporaryTable( -"T", - TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build()); -assertThat( -tEnv.getCatalog(catalog) -.orElseThrow(AssertionError::new) -.tableExists(new ObjectPath(database, "T"))) -.isFalse(); +assertTemporaryCreateTableFromDescriptor(tEnv, schema); +} -final Optional lookupResult = -tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T")); -assertThat(lookupResult.isPresent()).isTrue(); +@Test +void testCreateTemporaryTableIfNotExistsFromDescriptor() { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); +final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); -final CatalogBaseTable catalogTable = lookupResult.get().getResolvedTable(); -assertThat(catalogTable instanceof CatalogTable).isTrue(); -assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); - assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); -assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); +assertTemporaryCreateTableFromDescriptor(tEnv, schema); +assertThatNoException() +.isThrownBy( +() -> +tEnv.createTemporaryTable( +"T", +TableDescriptor.forConnector("fake") +.schema(schema) +.option("a", "Test") +.build(), Review Comment: I assume table T exists in this case, so no exception is thrown. I suggest adding a test to ensure that creating a temp table with the flag set means the table will still be created when there is no existing table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
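The missing case the comment asks for could look like the sketch below, reusing the identifiers from the quoted test (`tEnv`, `schema`, `catalog`, `database`) and AssertJ's Optional assertions; this is a suggestion, not code from the PR:

```java
// With ignoreIfExists = true and no pre-existing table, creation must
// still happen (sketch, mirroring the quoted test's style):
tEnv.createTemporaryTable(
        "U",
        TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build(),
        true);
assertThat(tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "U")))
        .isPresent();
```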
Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]
snuyanzin commented on PR #24829: URL: https://github.com/apache/flink/pull/24829#issuecomment-2506443102 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
davidradl commented on code in PR #25656: URL: https://github.com/apache/flink/pull/25656#discussion_r1862450447 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java: ## @@ -1542,6 +1542,19 @@ void testUsingAddJar() throws Exception { "drop function lowerUdf"); } +@Test +void testUsingAddJarWithCheckpointing() throws Exception { +env().enableCheckpointing(100); Review Comment: NIT: I guess we could check that the class loader on the current thread is the same before and after the env.executeSql() call, using Thread.currentThread().getContextClassLoader(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
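A sketch of the suggested assertion; `tEnv()` and the UDF name are assumptions taken from the surrounding test class's style, not the PR's code:

```java
// Capture the context class loader, run a statement that exercises the
// added jar, and assert the loader was restored afterwards:
final ClassLoader before = Thread.currentThread().getContextClassLoader();
tEnv().executeSql("SELECT lowerUdf('HELLO')");
assertThat(Thread.currentThread().getContextClassLoader()).isSameAs(before);
```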