[jira] [Created] (FLINK-36818) Securing credentials in SQL connectors

2024-11-28 Thread Kunal Rohitas (Jira)
Kunal Rohitas created FLINK-36818:
-

 Summary: Securing credentials in SQL connectors
 Key: FLINK-36818
 URL: https://issues.apache.org/jira/browse/FLINK-36818
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch, Connectors / MongoDB
Reporter: Kunal Rohitas


When accessing databases in Flink SQL, we are required to pass sensitive 
credentials directly within the SQL query as part of the connection 
configuration, and sometimes we don't want them to be exposed. Is there already 
a solution for this problem?

 

Can we implement a secret reader mechanism that allows Flink to read 
credentials from a mounted file or an environment-based secrets manager? This 
way, credentials can be injected at runtime without being hardcoded into the 
SQL queries.
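
To make the request more concrete, here is a minimal sketch of what such a secret reader could look like. It is not an existing Flink API: the class name `SecretResolver`, the lookup order (mounted file first, then environment variable) and the trimming of trailing newlines are all assumptions made for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of Flink): resolves a secret by name at runtime. */
final class SecretResolver {
    private final Path secretsDir;         // e.g. a directory where secrets are mounted as files
    private final Map<String, String> env; // usually System.getenv()

    SecretResolver(Path secretsDir, Map<String, String> env) {
        this.secretsDir = secretsDir.normalize();
        this.env = env;
    }

    /** Looks for a file named after the secret first, then falls back to the environment. */
    Optional<String> resolve(String name) {
        Path file = secretsDir.resolve(name).normalize();
        // Reject names such as "../other" that would escape the secrets directory.
        if (!file.startsWith(secretsDir)) {
            throw new IllegalArgumentException("Invalid secret name: " + name);
        }
        if (Files.isRegularFile(file)) {
            try {
                byte[] bytes = Files.readAllBytes(file);
                // Mounted secret files often end with a newline, so trim it.
                return Optional.of(new String(bytes, StandardCharsets.UTF_8).trim());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return Optional.ofNullable(env.get(name));
    }
}
```

A connector option could then carry only the secret's name, with the value resolved when the job starts, e.g. `new SecretResolver(dir, System.getenv()).resolve("es.password")`. How such a name would be referenced from SQL is left open here.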



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36819) [state/forst] Support link() in ForStFlinkFileSystem

2024-11-28 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-36819:
--

 Summary: [state/forst] Support link() in ForStFlinkFileSystem
 Key: FLINK-36819
 URL: https://issues.apache.org/jira/browse/FLINK-36819
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei








Re: [PR] [FLINK-33265] Support source parallelism setting for Kafka connector [flink-connector-kafka]

2024-11-28 Thread via GitHub


RocMarshal commented on code in PR #134:
URL: 
https://github.com/apache/flink-connector-kafka/pull/134#discussion_r1863022748


##
docs/content.zh/docs/connectors/table/upsert-kafka.md:
##
@@ -180,6 +180,14 @@ of all available metadata fields.


 
+
+  scan.parallelism
+  optional
+  no
+  (none)
+  Integer
+  定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并发。

Review Comment:
   ```suggestion
 定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并行度。
   ```



##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:
##
@@ -212,14 +212,56 @@ public void testTableSource() {
 KAFKA_SOURCE_PROPERTIES,
 StartupMode.SPECIFIC_OFFSETS,
 specificOffsets,
-0);
+0,
+null);
 assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
 ScanTableSource.ScanRuntimeProvider provider =
 
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
 assertKafkaSource(provider);
 }
 
+@Test
+public void testTableSourceWithParallelism() {
+final Map modifiedOptions =
+getModifiedOptions(
+getBasicSourceOptions(), options -> 
options.put("scan.parallelism", "100"));
+final DynamicTableSource actualSource = createTableSource(SCHEMA, 
modifiedOptions);
+final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) 
actualSource;
+
+final Map specificOffsets = new HashMap<>();
+specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), 
OFFSET_0);
+specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), 
OFFSET_1);
+
+final DecodingFormat> 
valueDecodingFormat =
+new DecodingFormatMock(",", true);
+
+// Test scan source equals
+final KafkaDynamicSource expectedKafkaSource =
+createExpectedScanSource(
+SCHEMA_DATA_TYPE,
+null,
+valueDecodingFormat,
+new int[0],
+new int[] {0, 1, 2},
+null,
+Collections.singletonList(TOPIC),
+null,
+KAFKA_SOURCE_PROPERTIES,
+StartupMode.SPECIFIC_OFFSETS,
+specificOffsets,
+0,
+100);
+assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+
+ScanTableSource.ScanRuntimeProvider provider =
+
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) 
provider;
+assertThat(sourceProvider.getParallelism().isPresent()).isTrue();
+assertThat(sourceProvider.getParallelism().get()).isEqualTo(100);

Review Comment:
   ```suggestion
   assertThat(sourceProvider.getParallelism()).isPresent();
   assertThat(sourceProvider.getParallelism()).hasValue(100);
   ```



##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java:
##
@@ -177,6 +180,37 @@ public void testTableSource() {
 assertKafkaSource(provider);
 }
 
+@Test
+public void testTableSourceWithParallelism() {
+final DataType producedDataType = 
SOURCE_SCHEMA.toPhysicalRowDataType();
+// Construct table source using options and table source factory
+final Map modifiedOptions =
+getModifiedOptions(
+getFullSourceOptions(), options -> 
options.put("scan.parallelism", "100"));

Review Comment:
   ditto



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:
##
@@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
 
 final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
 
+Integer parallelism = tableOptions.get(SCAN_PARALLELISM);

Review Comment:
   A trivial comment, and I'm not sure if it's appropriate:
   what about inlining this line into line 174?




Re: [PR] [FLINK-33265] Support source parallelism setting for Kafka connector [flink-connector-kafka]

2024-11-28 Thread via GitHub


RocMarshal commented on code in PR #134:
URL: 
https://github.com/apache/flink-connector-kafka/pull/134#discussion_r1863022594


##
docs/content.zh/docs/connectors/table/kafka.md:
##
@@ -342,6 +342,14 @@ CREATE TABLE KafkaTable (
   Duration
   Consumer 定期探测动态创建的 Kafka topic 和 partition 
的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能
 
+
+  scan.parallelism
+  optional
+  no
+  (none)
+  Integer
+  定义 Kafka source 算子的并行度。默认情况下会使用全局默认并发。

Review Comment:
   ```suggestion
 定义 Kafka source 算子的并行度。默认情况下会使用全局默认并行度。
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1861756363


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -745,6 +745,35 @@ void createTemporarySystemFunction(
  */
 void createTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * If the table should not be permanently stored in a catalog, use 
{@link
+ * #createTemporaryTable(String, TableDescriptor)} instead.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());
+ * }
+ *
+ * @param path The path under which the table will be registered. See also 
the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @param descriptor Template for creating a {@link CatalogTable} instance.
+ * @param ignoreIfExists If a table exists under the given path and this 
flag is set, no
+ * operation is executed. An exception is thrown otherwise.
+ */
+void createTable(String path, TableDescriptor descriptor, boolean 
ignoreIfExists);

Review Comment:
   Yes, it does. I'm getting my python environment set up right
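
   For readers following along, the contract described in the Javadoc above can be sketched without Flink. This is only an illustration of the `ignoreIfExists` semantics; `InMemoryCatalogSketch` and its string-based descriptor are stand-ins invented for this example, not Flink's catalog implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a catalog; models only the "ignore if exists" contract. */
final class InMemoryCatalogSketch {
    private final Map<String, String> tables = new HashMap<>();

    /**
     * Registers a table. If a table already exists under the path and the flag is set,
     * nothing happens; if the flag is not set, an exception is thrown.
     */
    void createTable(String path, String descriptor, boolean ignoreIfExists) {
        if (tables.containsKey(path)) {
            if (ignoreIfExists) {
                return; // no operation is executed, the existing table stays untouched
            }
            throw new IllegalStateException("Table already exists: " + path);
        }
        tables.put(path, descriptor);
    }

    /** Returns the stored descriptor, or null if no table is registered under the path. */
    String describe(String path) {
        return tables.get(path);
    }
}
```

   Calling `createTable("MyTable", ..., true)` twice therefore leaves the first registration in place, while a second call with `false` fails.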






Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]

2024-11-28 Thread via GitHub


SML0127 commented on code in PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861728840


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql:
##


Review Comment:
   it was my mistake...






[PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr opened a new pull request, #25707:
URL: https://github.com/apache/flink/pull/25707

   ## What is the purpose of the change
   
   Adds the initial interfaces of `ProcessTableFunction` and `StateHint` 
mentioned in FLIP-440 to the code base. This is a requirement to continue with 
the `TypeInferenceExtractor` work. 
   
   ## Brief change log
   
   - Introduce `ProcessTableFunction` and `StateHint`
   - Add `FunctionHint.state()`
   - Add `TableSemantics` for powering the context of PTFs
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   





[jira] [Updated] (FLINK-36705) Add ProcessTableFunction class and annotations

2024-11-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36705:
---
Labels: pull-request-available  (was: )

> Add ProcessTableFunction class and annotations
> --
>
> Key: FLINK-36705
> URL: https://issues.apache.org/jira/browse/FLINK-36705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add the ProcessTableFunction class and update annotations. This makes a first 
> (non-functional) implementation of PTFs possible. Time access is excluded in 
> the first version.





[jira] [Closed] (FLINK-36801) Update CI image to Ubuntu 22.04

2024-11-28 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-36801.

Resolution: Duplicate

> Update CI image to Ubuntu 22.04
> ---
>
> Key: FLINK-36801
> URL: https://issues.apache.org/jira/browse/FLINK-36801
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Reporter: Mehdi
>Priority: Major
>






[jira] [Assigned] (FLINK-34194) Upgrade Flink CI Docker container to Ubuntu 22.04

2024-11-28 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-34194:


Assignee: Chesnay Schepler  (was: Matthias Pohl)

> Upgrade Flink CI Docker container to Ubuntu 22.04
> -
>
> Key: FLINK-34194
> URL: https://issues.apache.org/jira/browse/FLINK-34194
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: github-actions, pull-request-available
> Fix For: 2.0.0
>
>
> The current CI Docker image is based on Ubuntu 16.04. We already use 20.04 
> for the e2e tests. We can update the Docker image to a newer version to be on 
> par with what we need in GitHub Actions (FLINK-33923).
> This issue can cover the following topics:
>  * Update to 22.04
>  ** OpenSSL 1.0.0 dependency should be added for netty-tcnative support
>  ** Use Python3 instead of Python 2.7 (python symlink needs to be added due 
> to FLINK-34195) 
>  * Removal of Maven (FLINK-33501 makes us rely on the Maven wrapper)





[jira] [Created] (FLINK-36817) Give users ability to provide their own KafkaConsumer when using flink-connector-kafka

2024-11-28 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created FLINK-36817:
-

 Summary: Give users ability to provide their own KafkaConsumer 
when using flink-connector-kafka
 Key: FLINK-36817
 URL: https://issues.apache.org/jira/browse/FLINK-36817
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Reporter: Levani Kokhreidze


In certain scenarios, users of the `KafkaSource` in flink-connector-kafka 
might want to provide their own KafkaConsumer. Right now this is not possible, 
because the consumer is created in the 
[KafkaPartitionSplitReader|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L97],
 which makes customisation impossible.

The proposal is to let users pass a `KafkaConsumerFactory` when building the 
KafkaSource.
{code:java}
public interface KafkaConsumerFactory {
  KafkaConsumer get(Properties properties);
}{code}
The builder will have a default implementation that creates the KafkaConsumer 
the same way it is done now.
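
The intended shape can be sketched as follows. To keep the example compilable without kafka-clients, `StubConsumer` stands in for `KafkaConsumer`, and `SourceBuilderSketch` with its `setConsumerFactory` method is a hypothetical builder fragment; neither name is taken from the connector.

```java
import java.util.Objects;
import java.util.Properties;

/** Stand-in for KafkaConsumer so that the sketch compiles without kafka-clients. */
class StubConsumer {
    final Properties properties;

    StubConsumer(Properties properties) {
        this.properties = properties;
    }
}

/** Same shape as the proposed interface, with the stub in place of KafkaConsumer. */
interface StubConsumerFactory {
    StubConsumer get(Properties properties);
}

/** Hypothetical builder fragment: uses the default factory unless the user sets one. */
final class SourceBuilderSketch {
    private final Properties props = new Properties();
    // Default: construct the consumer directly from the properties, as is done today.
    private StubConsumerFactory consumerFactory = StubConsumer::new;

    SourceBuilderSketch setProperty(String key, String value) {
        props.setProperty(key, value);
        return this;
    }

    SourceBuilderSketch setConsumerFactory(StubConsumerFactory factory) {
        this.consumerFactory = Objects.requireNonNull(factory, "factory");
        return this;
    }

    /** In the real source this call would happen inside the split reader. */
    StubConsumer createConsumer() {
        return consumerFactory.get(props);
    }
}
```

A real implementation would probably also need the factory to be `Serializable`, since Flink serializes sources when distributing them; that is left out of this sketch.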





Re: [PR] FLINK-36817: Introduce KafkaConsumerFactory [flink-connector-kafka]

2024-11-28 Thread via GitHub


boring-cyborg[bot] commented on PR #137:
URL: 
https://github.com/apache/flink-connector-kafka/pull/137#issuecomment-2505689926

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   





[jira] [Updated] (FLINK-36817) Give users ability to provide their own KafkaConsumer when using flink-connector-kafka

2024-11-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36817:
---
Labels: pull-request-available  (was: )

> Give users ability to provide their own KafkaConsumer when using 
> flink-connector-kafka
> --
>
> Key: FLINK-36817
> URL: https://issues.apache.org/jira/browse/FLINK-36817
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Levani Kokhreidze
>Priority: Major
>  Labels: pull-request-available
>
> In certain scenarios users of the `KafkaSource` in the flink-connector-kafka 
> might want to provide their own KafkaConsumer. Right now this is not possible 
> as consumer is created in the 
> [KafkaPartitionSplitReader|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L97]
>  which makes customisation impossible.
> Proposal is to let users pass `KafkaConsumerFactory` when building the 
> KafkaSource.
> {code:java}
> public interface KafkaConsumerFactory {
>   KafkaConsumer get(Properties properties);
> }{code}
> Builder will have a default implementation which creates the KafkaConsumer 
> the same as it is done now.





Re: [PR] [FLINK-36817] Introduce KafkaConsumerFactory [flink-connector-kafka]

2024-11-28 Thread via GitHub


lkokhreidze commented on PR #137:
URL: 
https://github.com/apache/flink-connector-kafka/pull/137#issuecomment-2505694480

   I will add the Javadocs, but first I wanted to get initial feedback on 
whether this makes sense.





Re: [PR] [FLINK-36817] Introduce KafkaConsumerFactory [flink-connector-kafka]

2024-11-28 Thread via GitHub


lkokhreidze commented on code in PR #137:
URL: 
https://github.com/apache/flink-connector-kafka/pull/137#discussion_r1861855542


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaConsumerFactory.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Properties;
+
+public interface KafkaConsumerFactory {
+KafkaConsumer get(Properties properties);

Review Comment:
   I was debating whether it would be better to return the `Consumer` interface 
instead of the implementation. Happy to change it if the community agrees that 
returning an interface is the better choice.






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1861852208


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:
##
@@ -175,13 +178,40 @@
 ArgumentHint[] arguments() default {};
 
 /**
- * Explicitly defines the intermediate result type that a function uses as 
accumulator.
+ * Explicitly defines the intermediate result type (i.e. state entry) that 
an aggregating
+ * function uses as its accumulator. The entry is managed by the framework 
(usually via Flink's
+ * managed state).
  *
  * By default, an explicit accumulator type is undefined and the 
reflection-based extraction
  * is used.
+ *
+ * This parameter is primarily intended for aggregating functions (i.e. 
{@link
+ * AggregateFunction} and {@link TableAggregateFunction}). It is 
recommended to use {@link
+ * #state()} for {@link ProcessTableFunction}.
  */
 DataTypeHint accumulator() default @DataTypeHint();
 
+/**
+ * Explicitly lists the intermediate results (i.e. state entries) of a 
function that is managed
+ * by the framework (i.e. Flink managed state). Including their names and 
data types.
+ *
+ * State hints are primarily intended for {@link ProcessTableFunction}. 
A PTF supports
+ * multiple state entries at the beginning of an eval()/onTimer() method 
(after an optional
+ * context parameter).
+ *
+ * Aggregating functions (i.e. {@link AggregateFunction} and {@link 
TableAggregateFunction})
+ * support a single state entry at the beginning of an 
accumulate()/retract() method (i.e. the
+ * accumulator).
+ *
+ * By default, explicit state is undefined and the reflection-based 
extraction is used where
+ * {@link StateHint} is present.
+ *
+ * Using both {@link #accumulator()} and this parameter is not allowed. 
Specifying the list
+ * of state entries manually disables the entire reflection-based 
extraction around {@link
+ * StateHint} and accumulators for aggregating functions.

Review Comment:
   Should this also be mentioned in the docs? For example, somewhere here:
   
https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/docs/content/docs/dev/table/functions/udfs.md?plain=1#L439






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1861853852


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A hint that declares an intermediate result (i.e. state entry) that is 
managed by the framework
+ * (i.e. Flink managed state).
+ *
+ * State hints are primarily intended for {@link ProcessTableFunction}. A 
PTF supports multiple
+ * state entries at the beginning of an eval()/onTimer() method (after an 
optional context
+ * parameter).
+ *
+ * Aggregating functions (i.e. {@link AggregateFunction} and {@link 
TableAggregateFunction})
+ * support a single state entry at the beginning of an accumulate()/retract() 
method (i.e. the
+ * accumulator).
+ *
+ * For example, {@code @StateHint(name = "count", type = 
@DataTypeHint("BIGINT"))} is a state
+ * entry with the data type BIGINT named "count".
+ *
+ * Note: Usually, a state entry is partitioned by a key and can not be 
accessed globally. The
+ * partitioning (or whether it is only a single partition) is defined by the 
corresponding function
+ * call.
+ *
+ * @see FunctionHint
+ */
+@PublicEvolving
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
+public @interface StateHint {

Review Comment:
   Also a question: should this be covered in the docs as well?
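
   If it is documented, a short usage example might help. The following is only a rough sketch of the idea described in the Javadoc (state entries as leading, annotated parameters of `eval()`), written without Flink: `StateHintSketch` is a local stand-in that carries only a name, the optional context parameter is not modelled, and `CountingFunctionSketch` and the extractor are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

/** Local stand-in for the proposed annotation; the real one also declares a data type. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
@interface StateHintSketch {
    String name() default "";
}

/** Hypothetical function: the state entry precedes the regular argument of eval(). */
class CountingFunctionSketch {
    static class CountState {
        long count;
    }

    public void eval(@StateHintSketch(name = "count") CountState state, String input) {
        state.count++;
    }
}

final class StateHintExtractorSketch {
    /** Collects the names of the leading parameters that are annotated as state entries. */
    static List<String> stateNames(Class<?> function, String methodName) {
        List<String> names = new ArrayList<>();
        for (Method method : function.getMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            for (Parameter parameter : method.getParameters()) {
                StateHintSketch hint = parameter.getAnnotation(StateHintSketch.class);
                if (hint == null) {
                    break; // state entries come first; stop at the first regular argument
                }
                names.add(hint.name());
            }
        }
        return names;
    }
}
```

   The `RUNTIME` retention is what makes the reflective lookup in `stateNames` possible; with the default retention the annotation would not be visible to the framework at runtime.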






Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]

2024-11-28 Thread via GitHub


ferenc-csaky merged PR #25702:
URL: https://github.com/apache/flink/pull/25702





Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1861953820


##
azure-pipelines.yml:
##
@@ -73,16 +73,16 @@ stages:
 parameters: # see template file for a definition of the parameters.
   stage_name: ci_build
   test_pool_definition:
-vmImage: 'ubuntu-20.04'
+vmImage: 'ubuntu-22.04'
   e2e_pool_definition:
-vmImage: 'ubuntu-20.04'
+vmImage: 'ubuntu-22.04'

Review Comment:
   Shouldn't it also be updated for the nightly builds, the Python wheels, and 
the other places in this file?






[PR] [FLINK-36643][filesystems] Backport: Update aws-java-sdk-core to 1.12.779 to fix CVE [flink]

2024-11-28 Thread via GitHub


tomncooper opened a new pull request, #25709:
URL: https://github.com/apache/flink/pull/25709

   ## What is the purpose of the change
   
   This is a backport of #25600. 
   
   The current version of `aws-java-sdk-core`, used in the `flink-s3-fs-base` 
module, has a high severity vulnerability 
([CVE-2024-21634](https://nvd.nist.gov/vuln/detail/CVE-2024-21634)).
   
   To address this, we need to update to version 1.12.773 or higher; 1.12.779 
is the current latest version.
   
   ## Brief change log
   
   Update the `aws-java-sdk-core` version used in `flink-s3-fs-base` to 
1.12.779. 
   
   ## Verifying this change
   
   This change is already covered by existing tests in the `flink-s3-fs-base` 
module.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1862049614


##
azure-pipelines.yml:
##
@@ -73,16 +73,16 @@ stages:
 parameters: # see template file for a definition of the parameters.
   stage_name: ci_build
   test_pool_definition:
-vmImage: 'ubuntu-20.04'
+vmImage: 'ubuntu-22.04'

Review Comment:
   I wonder why we are not going to 24.04, since it is the latest LTS release.






Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]

2024-11-28 Thread via GitHub


naferx commented on PR #24829:
URL: https://github.com/apache/flink/pull/24829#issuecomment-2505902456

   @snuyanzin just rebased this PR





Re: [PR] [FLINK-36643][filesystems] Backport: Update aws-java-sdk-core to 1.12.779 [flink]

2024-11-28 Thread via GitHub


flinkbot commented on PR #25709:
URL: https://github.com/apache/flink/pull/25709#issuecomment-2505933359

   
   ## CI report:
   
   * f04105fca97ddc88b5e4359b428ec5a8455bdfdf UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862058735


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:
##
@@ -175,13 +178,40 @@
 ArgumentHint[] arguments() default {};
 
 /**
- * Explicitly defines the intermediate result type that a function uses as accumulator.
+ * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating
+ * function uses as its accumulator. The entry is managed by the framework (usually via Flink's
+ * managed state).

Review Comment:
   For clarity, it would be worth saying how this is managed by the framework if not using managed state.






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862058735


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:
##
@@ -175,13 +178,40 @@
 ArgumentHint[] arguments() default {};
 
 /**
- * Explicitly defines the intermediate result type that a function uses as accumulator.
+ * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating
+ * function uses as its accumulator. The entry is managed by the framework (usually via Flink's
+ * managed state).

Review Comment:
   It would be worth saying how this is managed by the framework if not using managed state.






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862076676


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for a user-defined process table function. A process table function (PTF) maps zero,
+ * one, or multiple tables to zero, one, or multiple rows (or structured types). Scalar arguments
+ * are also supported. If the output record consists of only one field, the wrapper can be omitted,
+ * and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.
+ *
+ * PTFs are the most powerful function kind for Flink SQL and Table API. They enable implementing
+ * user-defined operators that can be as feature-rich as built-in operations. PTFs have access to
+ * Flink's managed state, event-time and timer services, underlying table changelogs, and can take
+ * multiple ordered and/or partitioned tables to produce a new table.
+ *
+ * Table Semantics and Virtual Processors
+ *
+ * PTFs can produce a new table by consuming tables as arguments. For scalability, input tables
+ * are distributed into virtual processors. Each virtual processor executes a PTF instance and has
+ * access only to a share of the entire table. The argument declaration decides about the size of
+ * the share and co-location of data. Conceptually, tables can be processed either "as row" (i.e.
+ * with row semantics) or "as set" (i.e. with set semantics).
+ *
+ * Table Argument with Row Semantics
+ *
+ * A PTF that takes a table with row semantics assumes that there is no correlation between rows
+ * and each row can be processed independently. The framework is free in how to distribute rows
+ * among virtual processors and each virtual processor has access only to the currently processed
+ * row.
+ *
+ * Table Argument with Set Semantics
+ *
+ * A PTF that takes a table with set semantics assumes that there is a correlation between rows.
+ * When calling the function, the PARTITION BY clause defines the columns for correlation. The
+ * framework ensures that all rows belonging to same set are co-located. A PTF instance is able to
+ * access all rows belonging to the same set. In other words: The virtual processor is scoped under
+ * a key context.
+ *
+ * Basic Implementation
+ *
+ * The behavior of a {@link ProcessTableFunction} can be defined by implementing a custom
+ * evaluation method. The evaluation method must be declared publicly, not static, and named
+ * eval. Overloading is not supported.

Review Comment:
   I wonder why overloading is not supported. Is there some relationship to the way the
   parameters are supplied from SQL that means they need to be one shape, so overloading
   would not work?
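
   The rule quoted above (one public, non-static method named eval, no overloads) can be
   pictured as a reflection check. The following is only a minimal sketch under stated
   assumptions: `EvalLookupSketch`, `SingleEval`, `OverloadedEval`, and `findEval` are
   hypothetical names for illustration, and Flink's actual extraction logic is not shown
   in this thread and may differ.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

final class EvalLookupSketch {

    // Hypothetical function classes, used only for this illustration.
    public static class SingleEval {
        public void eval(String s) {}
    }

    public static class OverloadedEval {
        public void eval(String s) {}

        public void eval(int i) {}
    }

    /** Returns the single public, non-static method named "eval", or throws. */
    static Method findEval(Class<?> clazz) {
        List<Method> candidates = new ArrayList<>();
        // getMethods() only returns public methods (including inherited ones).
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval") && !Modifier.isStatic(m.getModifiers())) {
                candidates.add(m);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one public, non-static eval method but found "
                            + candidates.size());
        }
        return candidates.get(0);
    }

    public static void main(String[] args) {
        System.out.println(findEval(SingleEval.class));
        try {
            findEval(OverloadedEval.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   With a single eval method, the lookup has exactly one signature to derive the argument
   list from; with overloads, a check like this one has no unambiguous choice.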






Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]

2024-11-28 Thread via GitHub


SML0127 commented on code in PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861766911


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java:
##


Review Comment:
   I will add integration test cases if necessary.






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


flinkbot commented on PR #25708:
URL: https://github.com/apache/flink/pull/25708#issuecomment-2505643498

   
   ## CI report:
   
   * 3a9c69c677f25a4be5a3168d830e3ced77ff4369 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-11-28 Thread via GitHub


mehdid93 commented on PR #25670:
URL: https://github.com/apache/flink/pull/25670#issuecomment-2505664022

   @flinkbot run azure





[jira] [Created] (FLINK-36816) Support source parallelism setting for JDBC connector

2024-11-28 Thread Jira
Grzegorz Kołakowski created FLINK-36816:
---

 Summary: Support source parallelism setting for JDBC connector
 Key: FLINK-36816
 URL: https://issues.apache.org/jira/browse/FLINK-36816
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / JDBC, Table SQL / JDBC
Reporter: Grzegorz Kołakowski


Adapt the JDBC connector for [FLIP-367: Support Setting Parallelism 
for Table/SQL Sources - Apache Flink - Apache Software 
Foundation|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].





Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]

2024-11-28 Thread via GitHub


SML0127 commented on code in PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861726630


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java:
##
@@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) 
throws Exception {
 .isEqualTo(expectedStreamRecord);
 }
 
+private void testJsonDataType(UniqueDatabase database, Boolean 
useLegacyJsonFormat)
+throws Exception {
+database.createAndInitialize();
+CloseableIterator iterator =
+env.fromSource(
+getFlinkSourceProvider(
+new String[] {"json_types"},
+database,
+useLegacyJsonFormat)
+.getSource(),
+WatermarkStrategy.noWatermarks(),
+"Event-Source")
+.executeAndCollect();
+
+Object[] expectedSnapshot =
+new Object[] {
+DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+BinaryStringData.fromString("{\"key1\": \"value1\"}"),
+BinaryStringData.fromString("{\"key1\": \"value1\", 
\"key2\": \"value2\"}"),
+BinaryStringData.fromString(
+"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": 
\"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": 
[\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
+1
+};
+
+// skip CreateTableEvent
+List snapshotResults =
+MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
+Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
JSON_TYPES))
+.isEqualTo(expectedSnapshot);
+
+try (Connection connection = database.getJdbcConnection();
+Statement statement = connection.createStatement()) {
+statement.execute("UPDATE json_types SET int_c = null WHERE id = 
1;");
+}
+
+Object[] expectedStreamRecord = expectedSnapshot;
+
+if (useLegacyJsonFormat) {
+expectedSnapshot[1] = 
BinaryStringData.fromString("{\"key1\":\"value1\"}");
+expectedSnapshot[2] =
+
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
+expectedSnapshot[3] =
+BinaryStringData.fromString(
+
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]");
+}
+expectedSnapshot[4] = null;

Review Comment:
   I revised it to be clearer, as shown below.
   
   ```java
   if (useLegacyJsonFormat) {
       // removed whitespace before value and after comma in json format string value
       Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
               .containsExactly(
                       DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
                       BinaryStringData.fromString("{\"key1\":\"value1\"}"),
                       BinaryStringData.fromString(
                               "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
                       BinaryStringData.fromString(
                               "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
                       null);
   } else {
       Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
               .containsExactly(expectedStreamRecord);
   }
   ```   
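
   The two expected values in this test differ only in whitespace: the non-legacy format
   keeps a space after each `:` and `,`, while the legacy format drops it. A minimal
   sketch of that normalization follows, assuming a hypothetical helper `toLegacy` that
   is not part of the PR and that only touches spaces outside string literals.

```java
final class JsonWhitespaceSketch {

    // Hypothetical helper (not from the PR): drops spaces that directly follow
    // ':' or ',' when they occur outside of a JSON string literal.
    static String toLegacy(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false;       // currently inside a "..." literal
        boolean escaped = false;        // previous char was a backslash inside a literal
        boolean afterSeparator = false; // last kept char was ':' or ',' outside a literal
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                out.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == ' ' && afterSeparator) {
                continue; // skip the space(s) after a separator
            }
            afterSeparator = (c == ':' || c == ',');
            if (c == '"') {
                inString = true;
            }
            out.append(c);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // prints {"key1":"value1","key2":"value2"}
        System.out.println(toLegacy("{\"key1\": \"value1\", \"key2\": \"value2\"}"));
    }
}
```

   Spaces inside string values are left alone, so a value such as `"x: y"` is unchanged.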






[jira] [Commented] (FLINK-28897) Fail to use udf in added jar when enabling checkpoint

2024-11-28 Thread Ammu Parvathy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901640#comment-17901640
 ] 

Ammu Parvathy commented on FLINK-28897:
---

[~Jiangang] [~lsy] I have raised a PR 
[https://github.com/apache/flink/pull/25656] against the 1.20 version with a probable 
fix for this particular issue with checkpointing enabled. Could I get a review on 
this, please?

> Fail to use udf in added jar when enabling checkpoint
> -
>
> Key: FLINK-28897
> URL: https://issues.apache.org/jira/browse/FLINK-28897
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Liu
>Assignee: dalongliu
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
> Fix For: 2.0.0
>
>
> The problem can be reproduced when checkpointing is enabled, because 
> StreamingJobGraphGenerator.preValidate is actually called in that case. This 
> may be a classloader problem.
> The steps to reproduce are as follows:
> {code:java}
> // Enable checkpoint first and execute the command in sql client.
> ADD JAR  
> '~/flink/flink-end-to-end-tests/flink-sql-client-test/target/SqlToolbox.jar';
> create function func1 as 
> 'org.apache.flink.table.toolbox.StringRegexReplaceFunction' LANGUAGE JAVA;
> SELECT id, func1(str, 'World', 'Flink') FROM (VALUES (1, 'Hello World')) AS 
> T(id, str); {code}
> The output is as follows:
> {code:java}
> /* 1 */
> /* 2 */      public class StreamExecCalc$11 extends 
> org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */          implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */        private final Object[] references;
> /* 6 */        private transient 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer 
> typeSerializer$4;
> /* 7 */
> /* 8 */        private final 
> org.apache.flink.table.data.binary.BinaryStringData str$6 = 
> org.apache.flink.table.data.binary.BinaryStringData.fromString("World");
> /* 9 */
> /* 10 */
> /* 11 */        private final 
> org.apache.flink.table.data.binary.BinaryStringData str$7 = 
> org.apache.flink.table.data.binary.BinaryStringData.fromString("Flink");
> /* 12 */
> /* 13 */        private transient 
> org.apache.flink.table.toolbox.StringRegexReplaceFunction 
> function_org$apache$flink$table$toolbox$StringRegexReplaceFunction;
> /* 14 */        private transient 
> org.apache.flink.table.data.conversion.StringStringConverter converter$8;
> /* 15 */        org.apache.flink.table.data.BoxedWrapperRowData out = new 
> org.apache.flink.table.data.BoxedWrapperRowData(2);
> /* 16 */        private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 17 */
> /* 18 */        public StreamExecCalc$11(
> /* 19 */            Object[] references,
> /* 20 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 21 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 22 */            org.apache.flink.streaming.api.operators.Output output,
> /* 23 */            
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
> processingTimeService) throws Exception {
> /* 24 */          this.references = references;
> /* 25 */          typeSerializer$4 = 
> (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) 
> references[0]));
> /* 26 */          
> function_org$apache$flink$table$toolbox$StringRegexReplaceFunction = 
> (((org.apache.flink.table.toolbox.StringRegexReplaceFunction) references[1]));
> /* 27 */          converter$8 = 
> (((org.apache.flink.table.data.conversion.StringStringConverter) 
> references[2]));
> /* 28 */          this.setup(task, config, output);
> /* 29 */          if (this instanceof 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 30 */            
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 31 */              .setProcessingTimeService(processingTimeService);
> /* 32 */          }
> /* 33 */        }
> /* 34 */
> /* 35 */        @Override
> /* 36 */        public void open() throws Exception {
> /* 37 */          super.open();
> /* 38 */
> /* 39 */          
> function_org$apache$flink$table$toolbox$StringRegexReplaceFunction.open(new 
> org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
> /* 40 */
> /* 41 */
> /* 42 */          
> converter$8.open(getRuntimeContext().getUserCodeClassLoader());
> /* 43 */
> /* 44 */        }
> /* 45 */
> /* 46 */        @Override
> /* 47 */        public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {
> /*

Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1861751435


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -100,6 +101,55 @@ void testCreateTableFromDescriptor() throws Exception {
 .contains(entry("connector", "fake"), entry("a", "Test"));
 }
 
+@Test
+void testCreateTableIfNotExistsFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
+tEnv.createTable(
+"T",
+
TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build(),
+true);
+
+final ObjectPath objectPath = new ObjectPath(database, "T");
+assertThat(
+tEnv.getCatalog(catalog)
+.orElseThrow(AssertionError::new)
+.tableExists(objectPath))
+.isTrue();
+
+final CatalogBaseTable catalogTable =
+
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath);
+assertThat(catalogTable).isInstanceOf(CatalogTable.class);
+assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+assertThat(catalogTable.getOptions())
+.contains(entry("connector", "fake"), entry("a", "Test"));
+
+assertThatNoException()
+.isThrownBy(
+() ->
+tEnv.createTable(
+"T",
+TableDescriptor.forConnector("fake")
+.schema(schema)
+.option("a", "Test")
+.build(),
+true));
+
+assertThatThrownBy(
+() ->
+tEnv.createTable(
+"T",
+TableDescriptor.forConnector("fake")
+.schema(schema)
+.option("a", "Test")
+.build(),
+false))
+.isInstanceOf(ValidationException.class);

Review Comment:
   Yes, that works
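
   The behavior this test pins down can be summarized as: with `ignoreIfExists = true` a
   second create call is a no-op, and with `false` it fails. A minimal sketch under
   stated assumptions: `IgnoreIfExistsSketch` is a hypothetical in-memory stand-in and
   not Flink's catalog API, the boolean return value is an assumption of the sketch, and
   `IllegalStateException` stands in for the `ValidationException` the test expects.

```java
import java.util.HashMap;
import java.util.Map;

final class IgnoreIfExistsSketch {

    // Hypothetical in-memory table registry; not Flink's Catalog interface.
    private final Map<String, String> tables = new HashMap<>();

    /**
     * Registers a table definition under the given name.
     *
     * @return true if the table was created, false if it already existed and was ignored
     */
    boolean createTable(String name, String definition, boolean ignoreIfExists) {
        if (tables.containsKey(name)) {
            if (ignoreIfExists) {
                return false; // keep the existing definition untouched
            }
            // Stand-in for the ValidationException asserted in the test above.
            throw new IllegalStateException("Table already exists: " + name);
        }
        tables.put(name, definition);
        return true;
    }

    String definitionOf(String name) {
        return tables.get(name);
    }

    public static void main(String[] args) {
        IgnoreIfExistsSketch catalog = new IgnoreIfExistsSketch();
        System.out.println(catalog.createTable("T", "first", true));  // created
        System.out.println(catalog.createTable("T", "second", true)); // ignored
        System.out.println(catalog.definitionOf("T"));                // still the first one
    }
}
```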






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


flinkbot commented on PR #25707:
URL: https://github.com/apache/flink/pull/25707#issuecomment-2505633651

   
   ## CI report:
   
   * e8c2ec88bf59239f3c40a88d228905772eab9459 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-36816] Support source parallelism setting for JDBC connector [flink-connector-jdbc]

2024-11-28 Thread via GitHub


grzegorz8 opened a new pull request, #148:
URL: https://github.com/apache/flink-connector-jdbc/pull/148

   # Purpose of the change
   
   Add support for the new `scan.parallelism` option to the JDBC connector.
   
   Part of [FLINK-33261](https://issues.apache.org/jira/browse/FLINK-33261) 
FLIP-367: Support Setting Parallelism for Table/SQL Sources
   





[jira] [Updated] (FLINK-36816) Support source parallelism setting for JDBC connector

2024-11-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36816:
---
Labels: pull-request-available  (was: )

> Support source parallelism setting for JDBC connector
> -
>
> Key: FLINK-36816
> URL: https://issues.apache.org/jira/browse/FLINK-36816
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC, Table SQL / JDBC
>Reporter: Grzegorz Kołakowski
>Priority: Major
>  Labels: pull-request-available
>
> Adapt the JDBC connector for [FLIP-367: Support Setting Parallelism 
> for Table/SQL Sources - Apache Flink - Apache Software 
> Foundation|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].





Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


dawidwys commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1861980546


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:
##
@@ -632,7 +633,9 @@ private static Stream functionSpecs() {
 
TestSpec.forScalarFunction(FunctionHintTableArgScalarFunction.class)
 .expectErrorMessage("Only scalar arguments are 
supported yet."),
 
TestSpec.forScalarFunction(ArgumentHintTableArgScalarFunction.class)
-.expectErrorMessage("Only scalar arguments are 
supported yet."));
+.expectErrorMessage("Only scalar arguments are 
supported yet."),

Review Comment:
   You didn't change the message, but I noticed that the sentence is not 
grammatically correct. Do you think we could fix it?



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A hint that declares an intermediate result (i.e. state entry) that is managed by the framework
+ * (i.e. Flink managed state).
+ *
+ * State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports multiple
+ * state entries at the beginning of an eval()/onTimer() method (after an optional context
+ * parameter).
+ *
+ * Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction})
+ * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the
+ * accumulator).
+ *
+ * For example, {@code @StateHint(name = "count", type = @DataTypeHint("BIGINT"))} is a state
+ * entry with the data type BIGINT named "count".
+ *
+ * Note: Usually, a state entry is partitioned by a key and can not be accessed globally. The
+ * partitioning (or whether it is only a single partition) is defined by the corresponding function
+ * call.
+ *
+ * @see FunctionHint
+ */
+@PublicEvolving
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
+public @interface StateHint {
+
+/**
+ * The name of the state entry.
+ *
+ * This can be used to provide a descriptive name for the state entry. The name can be used
+ * for referencing the entry during clean up.
+ */
+String name() default "";

Review Comment:
   Are there any uniqueness requirements? If so, can we state them explicitly?



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.Fu

Re: [PR] [FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency [flink-kubernetes-operator]

2024-11-28 Thread via GitHub


mxm commented on code in PR #920:
URL: https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1861986603


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
 }
 
 @VisibleForTesting
-static boolean allRequiredVerticesWithinUtilizationTarget(
+static boolean allChangedVerticesWithinUtilizationTarget(
 Map> 
evaluatedMetrics,
-Set requiredVertices) {
-// All vertices' ParallelismChange is optional, rescaling will be 
ignored.
-if (requiredVertices.isEmpty()) {
+Set changedVertices) {
+// No any vertex is changed.
+if (changedVertices.isEmpty()) {
 return true;
 }
 
-for (JobVertexID vertex : requiredVertices) {
+for (var vertex : changedVertices) {

Review Comment:
   I generally avoid `var` because it makes the code harder to read without 
type information, but that's just me. Moreover, I avoid refactoring existing 
code to use var, because it adds noise to the diff.






Re: [PR] [FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency [flink-kubernetes-operator]

2024-11-28 Thread via GitHub


mxm commented on code in PR #920:
URL: https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1861985148


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
 var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
 if (scaleDownInterval.toMillis() <= 0) {
 // The scale down interval is disable, so don't block scaling.
-return ParallelismChange.required(newParallelism);
-}
-
-var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-if (firstTriggerTime.isEmpty()) {
-LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
-delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-return ParallelismChange.optional(newParallelism);
+return ParallelismChange.build(newParallelism);
 }
 
-if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
-return ParallelismChange.optional(newParallelism);
+var now = clock.instant();
+var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, 
now, newParallelism);
+
+// Never scale down within scale down interval
+if 
(now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval)))
 {

Review Comment:
   I thought that `getFirstTriggerTime()` returns the first time we scaled 
*up*, but we are actually recording the time we first try to scale down. 
   
   I'm not sure this is correct. We want to delay scale down from the first 
time we scale up, not the first time we scaled down.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
 var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
 if (scaleDownInterval.toMillis() <= 0) {
 // The scale down interval is disable, so don't block scaling.
-return ParallelismChange.required(newParallelism);
-}
-
-var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-if (firstTriggerTime.isEmpty()) {
-LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-return ParallelismChange.optional(newParallelism);
+return ParallelismChange.build(newParallelism);
 }
 
-if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-return ParallelismChange.optional(newParallelism);
+var now = clock.instant();
+var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+
+// Never scale down within scale down interval
+if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
+} else {
+LOG.debug(
+"Try to skip immediate scale down within scale-down interval for {}",
+vertex);
+}
+return ParallelismChange.noChange();
 } else {
-return ParallelismChange.required(newParallelism);
+// Using the maximum parallelism within the scale down interval window instead of the
+// latest parallelism when scaling down
+return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());

Review Comment:
   Thanks Rui!
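
The hunk above can be read as: remember when a scale down was first requested for a vertex, keep the largest parallelism recommended since then, and act only once the scale-down interval has elapsed. The following is a minimal, self-contained sketch of that reading, not the operator's actual code. The class `DelayedScaleDownSketch`, its nested `Info` type, the method `onScaleDownRecommendation`, the use of `OptionalInt` in place of `ParallelismChange`, and the reset after a scale down are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical, simplified model of a delayed scale-down decision. */
final class DelayedScaleDownSketch {

    /** Per-vertex record: first scale-down request and the highest recommendation since. */
    static final class Info {
        final Instant firstTriggerTime;
        int maxRecommendedParallelism;

        Info(Instant firstTriggerTime, int parallelism) {
            this.firstTriggerTime = firstTriggerTime;
            this.maxRecommendedParallelism = parallelism;
        }
    }

    private final Map<String, Info> infoByVertex = new HashMap<>();
    private final Duration scaleDownInterval;

    DelayedScaleDownSketch(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /**
     * Records a scale-down recommendation. Returns the parallelism to apply,
     * or empty while the scale down is still delayed.
     */
    OptionalInt onScaleDownRecommendation(String vertex, Instant now, int newParallelism) {
        // A non-positive interval disables the delay entirely.
        if (scaleDownInterval.isZero() || scaleDownInterval.isNegative()) {
            return OptionalInt.of(newParallelism);
        }
        Info info = infoByVertex.get(vertex);
        if (info == null) {
            info = new Info(now, newParallelism);
            infoByVertex.put(vertex, info);
        } else {
            info.maxRecommendedParallelism =
                    Math.max(info.maxRecommendedParallelism, newParallelism);
        }
        // Never scale down inside the interval that started at the first request.
        if (now.isBefore(info.firstTriggerTime.plus(scaleDownInterval))) {
            return OptionalInt.empty();
        }
        // Assumption of this sketch: the record is cleared once the scale down is applied.
        infoByVertex.remove(vertex);
        return OptionalInt.of(info.maxRecommendedParallelism);
    }
}
```

Note that in this sketch the delay window starts at the first scale-down request, which is the behavior the first review comment questions; starting it at the last scale up would require tracking a different timestamp.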






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


zentol commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1861989731


##
azure-pipelines.yml:
##
@@ -73,16 +73,16 @@ stages:
 parameters: # see template file for a definition of the parameters.
   stage_name: ci_build
   test_pool_definition:
-vmImage: 'ubuntu-20.04'
+vmImage: 'ubuntu-22.04'
   e2e_pool_definition:
-vmImage: 'ubuntu-20.04'
+vmImage: 'ubuntu-22.04'

Review Comment:
   done






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862082959


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:
##
@@ -175,13 +178,40 @@
 ArgumentHint[] arguments() default {};
 
 /**
- * Explicitly defines the intermediate result type that a function uses as accumulator.
+ * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating
+ * function uses as its accumulator. The entry is managed by the framework (usually via Flink's
+ * managed state).

Review Comment:
   It would be worth saying how the entry is managed when it is not managed 
via Flink's managed state.






[jira] [Updated] (FLINK-35114) Remove old Table API implementations, update Schema stack

2024-11-28 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-35114:
-
Summary: Remove old Table API implementations, update Schema stack  (was: 
Remove old Table API implementations)

> Remove old Table API implementations, update Schema stack
> -
>
> Key: FLINK-35114
> URL: https://issues.apache.org/jira/browse/FLINK-35114
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kudu
> Reporter: Ferenc Csaky
> Assignee: Ferenc Csaky
> Priority: Major
>
> At the moment, the connector has both the old Table sink/source/catalog 
> implementations and the matching Dynamic... implementations as well.
> Going forward, the deprecated old implementation should be removed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862101386


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for a user-defined process table function. A process table function (PTF) maps zero,
+ * one, or multiple tables to zero, one, or multiple rows (or structured types). Scalar arguments
+ * are also supported. If the output record consists of only one field, the wrapper can be omitted,
+ * and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.
+ *
+ * PTFs are the most powerful function kind for Flink SQL and Table API. They enable implementing
+ * user-defined operators that can be as feature-rich as built-in operations. PTFs have access to
+ * Flink's managed state, event-time and timer services, underlying table changelogs, and can take
+ * multiple ordered and/or partitioned tables to produce a new table.
+ *
+ * Table Semantics and Virtual Processors
+ *
+ * PTFs can produce a new table by consuming tables as arguments. For scalability, input tables
+ * are distributed into virtual processors. Each virtual processor executes a PTF instance and has
+ * access only to a share of the entire table. The argument declaration decides about the size of
+ * the share and co-location of data. Conceptually, tables can be processed either "as row" (i.e.
+ * with row semantics) or "as set" (i.e. with set semantics).
+ *
+ * Table Argument with Row Semantics
+ *
+ * A PTF that takes a table with row semantics assumes that there is no correlation between rows
+ * and each row can be processed independently. The framework is free in how to distribute rows
+ * among virtual processors and each virtual processor has access only to the currently processed
+ * row.
+ *
+ * Table Argument with Set Semantics
+ *
+ * A PTF that takes a table with set semantics assumes that there is a correlation between rows.
+ * When calling the function, the PARTITION BY clause defines the columns for correlation. The
+ * framework ensures that all rows belonging to same set are co-located. A PTF instance is able to
+ * access all rows belonging to the same set. In other words: The virtual processor is scoped under
+ * a key context.
+ *
+ * Basic Implementation
+ *
+ * The behavior of a {@link ProcessTableFunction} can be defined by implementing a custom
+ * evaluation method. The evaluation method must be declared publicly, not static, and named
+ * eval. Overloading is not supported.

Review Comment:
   Overloading does not go well with named parameters. In the end, we want to 
support a syntax like:
   ```
   SELECT * FROM f(name => 's', threshold => 12)
   ```
   
   Supporting `name == STRING || name == INT` makes the implementation highly 
complex. We support optional arguments, which should solve most of the use 
cases, e.g. `f(numbers = [1, 2, 3])` or `f(strings = ['1', '2', '3'])`.
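
To illustrate the point about named parameters, here is a minimal plain-Java sketch of binding named call arguments to a single declared signature with optional parameters. It is not Flink's planner or type-inference logic; `NamedArgumentSketch`, `Param`, and `bind` are hypothetical names invented for this example. With exactly one signature, each name maps to one position, whereas overloads would force the binder to choose between several candidate signatures per name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of named-argument binding against one signature. */
final class NamedArgumentSketch {

    /** One declared parameter: its name and whether a call may omit it. */
    static final class Param {
        final String name;
        final boolean optional;

        Param(String name, boolean optional) {
            this.name = name;
            this.optional = optional;
        }
    }

    /**
     * Maps named call arguments onto the positions of the single signature.
     * Omitted optional parameters are passed as null.
     */
    static Object[] bind(List<Param> signature, Map<String, Object> namedArgs) {
        Object[] positional = new Object[signature.size()];
        int matched = 0;
        for (int i = 0; i < signature.size(); i++) {
            Param p = signature.get(i);
            if (namedArgs.containsKey(p.name)) {
                positional[i] = namedArgs.get(p.name);
                matched++;
            } else if (!p.optional) {
                throw new IllegalArgumentException("Missing required argument: " + p.name);
            }
        }
        // Any name that did not match a declared parameter is an error.
        if (matched != namedArgs.size()) {
            throw new IllegalArgumentException("Unknown argument in call: " + namedArgs.keySet());
        }
        return positional;
    }

    public static void main(String[] args) {
        // Models f(strings = ['1', '2', '3']) with the optional "numbers" omitted.
        List<Param> signature =
                Arrays.asList(new Param("numbers", true), new Param("strings", true));
        Map<String, Object> call = new HashMap<>();
        call.put("strings", Arrays.asList("1", "2", "3"));
        System.out.println(Arrays.toString(bind(signature, call)));
    }
}
```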
   






[PR] [FLINK-35114] Remove old Table API implementations, update Table API and Schema stack [flink-connector-kudu]

2024-11-28 Thread via GitHub


ferenc-csaky opened a new pull request, #3:
URL: https://github.com/apache/flink-connector-kudu/pull/3

   Main logical changes:
   - Removed old Table API source/sink/factory.
   - Moved the Dynamic implementations up from the `dynamic` package.
   - Updated the schema stack from `TableSchema` to `Schema`/`ResolvedSchema`.
   - Utilized the now built-in `LookupFunction`.
   





Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862114539


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for a user-defined process table function. A process table function (PTF) maps zero,
+ * one, or multiple tables to zero, one, or multiple rows (or structured types). Scalar arguments
+ * are also supported. If the output record consists of only one field, the wrapper can be omitted,
+ * and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.
+ *
+ * PTFs are the most powerful function kind for Flink SQL and Table API. They enable implementing
+ * user-defined operators that can be as feature-rich as built-in operations. PTFs have access to
+ * Flink's managed state, event-time and timer services, underlying table changelogs, and can take
+ * multiple ordered and/or partitioned tables to produce a new table.
+ *
+ * Table Semantics and Virtual Processors
+ *
+ * PTFs can produce a new table by consuming tables as arguments. For scalability, input tables
+ * are distributed into virtual processors. Each virtual processor executes a PTF instance and has

Review Comment:
   A virtual processor could be translated into the Flink concept of a "key 
context", but that is rather an internal term. A virtual processor is neither a 
TM, nor a Task, nor a Subtask. I think this is also what confuses people when 
they work with KeyedProcessFunction for the first time: understanding that 
state is scoped under the key.
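
The idea that state is scoped under the key can be sketched without any Flink API. The class below is a hypothetical, single-process model (`KeyScopedStateSketch` and `process` are invented names): one physical instance holds state for many keys, but each call only ever reads and writes the entry of the key it is currently processing, which is roughly what a "virtual processor" per key means.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of key-scoped state; one instance serves many keys. */
final class KeyScopedStateSketch {

    // State of all keys handled by this instance, indexed by key.
    private final Map<String, Long> countByKey = new HashMap<>();

    /**
     * Processes one row for the given key and returns that key's updated count.
     * The logic never touches the state of any other key.
     */
    long process(String key) {
        return countByKey.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        KeyScopedStateSketch instance = new KeyScopedStateSketch();
        instance.process("a");
        instance.process("b");
        // Key "a" has been seen once before, independently of key "b".
        System.out.println(instance.process("a"));
    }
}
```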






[jira] [Updated] (FLINK-35114) Remove old Table API implementations, update Schema stack

2024-11-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35114:
---
Labels: pull-request-available  (was: )

> Remove old Table API implementations, update Schema stack
> -
>
> Key: FLINK-35114
> URL: https://issues.apache.org/jira/browse/FLINK-35114
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kudu
> Reporter: Ferenc Csaky
> Assignee: Ferenc Csaky
> Priority: Major
> Labels: pull-request-available
>
> At the moment, the connector has both the old Table sink/source/catalog 
> implementations and the matching Dynamic... implementations as well.
> Going forward, the deprecated old implementation should be removed.





Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862103659


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:
##
@@ -175,13 +178,40 @@
 ArgumentHint[] arguments() default {};
 
 /**
- * Explicitly defines the intermediate result type that a function uses as accumulator.
+ * Explicitly defines the intermediate result type (i.e. state entry) that an aggregating
+ * function uses as its accumulator. The entry is managed by the framework (usually via Flink's
+ * managed state).

Review Comment:
   This is hard to say and might be too implementation-specific. It varies from 
operator to operator. Sometimes it's MemorySegments, sometimes it's just the 
Java heap.






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on PR #25707:
URL: https://github.com/apache/flink/pull/25707#issuecomment-2506035053

   > ideally testing the behavior documented in the comments.
   
   This is a feature under development. The behavior described in the comments 
is not fully implemented yet. As can be seen in the umbrella ticket FLINK-36703, 
this is just the very beginning of the implementation. The interfaces are 
necessary as a first step to power the next task, which is the 
TypeInferenceExtractor update.





Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


twalthr commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862089260


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
+ * it will be inaccessible in the current session. To make the permanent object available again
+ * one can drop the corresponding temporary object.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());

Review Comment:
   update example



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -745,6 +775,35 @@ void createTemporarySystemFunction(
  */
 void createTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored
+ * in the catalog.
+ *
+ * If the table should not be permanently stored in a catalog, use {@link
+ * #createTemporaryTable(String, TableDescriptor)} instead.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());

Review Comment:
   update example



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -745,6 +745,35 @@ void createTemporarySystemFunction(
  */
 void createTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored
+ * in the catalog.
+ *
+ * If the table should not be permanently stored in a catalog, use {@link
+ * #createTemporaryTable(String, TableDescriptor)} instead.

Review Comment:
   update link
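
Since the review asks for the examples to reflect the new parameter, the intended `ignoreIfExists` semantics can be sketched in plain Java. This is an in-memory model, not the `TableEnvironment` implementation: the class `IgnoreIfExistsSketch`, the `String` descriptor, the `boolean` return value, and the exception type are all assumptions made for illustration, and the real signature in the PR may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of create-table with an ignoreIfExists flag. */
final class IgnoreIfExistsSketch {

    // Stand-in for a catalog: table path mapped to an opaque descriptor.
    private final Map<String, String> tables = new HashMap<>();

    /**
     * Registers the descriptor under the given path.
     *
     * @return true if the table was created, false if it already existed and
     *     the call was ignored (the return value is an assumption of this sketch)
     */
    boolean createTable(String path, String descriptor, boolean ignoreIfExists) {
        if (tables.containsKey(path)) {
            if (ignoreIfExists) {
                // Keep the existing table untouched.
                return false;
            }
            throw new IllegalStateException("Table already exists: " + path);
        }
        tables.put(path, descriptor);
        return true;
    }

    /** Returns the stored descriptor, or null if no table exists at the path. */
    String describe(String path) {
        return tables.get(path);
    }
}
```

In this model a second call with `ignoreIfExists = true` leaves the first descriptor in place, while the same call with `false` fails.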






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


zentol commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1862185553


##
.github/workflows/template.flink-ci.yml:
##
@@ -72,7 +72,7 @@ jobs:
 name: "Compile"
 runs-on: ubuntu-22.04
 container:
-  image: mapohl/flink-ci:FLINK-34194
+  image: chesnay/flink-ci:java_8_11_17_21_maven_386_jammy

Review Comment:
   figured we could also use this image in the GHA setup for consistency.






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


zentol commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1862177938


##
azure-pipelines.yml:
##
@@ -73,16 +73,16 @@ stages:
 parameters: # see template file for a definition of the parameters.
   stage_name: ci_build
   test_pool_definition:
-vmImage: 'ubuntu-20.04'
+vmImage: 'ubuntu-22.04'

Review Comment:
   Because we already did the work on the image side for Ubuntu 22.04 months 
ago and have been using it in the GHA-based CI workflows for months.
   
   I.e., it's just an easy win.






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1862207540


##
.github/workflows/template.flink-ci.yml:
##
@@ -72,7 +72,7 @@ jobs:
 name: "Compile"
 runs-on: ubuntu-22.04
 container:
-  image: mapohl/flink-ci:FLINK-34194
+  image: chesnay/flink-ci:java_8_11_17_21_maven_386_jammy

Review Comment:
   yep, also in
   
https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/.github/workflows/template.pre-compile-checks.yml#L46






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on code in PR #25708:
URL: https://github.com/apache/flink/pull/25708#discussion_r1862207540


##
.github/workflows/template.flink-ci.yml:
##
@@ -72,7 +72,7 @@ jobs:
 name: "Compile"
 runs-on: ubuntu-22.04
 container:
-  image: mapohl/flink-ci:FLINK-34194
+  image: chesnay/flink-ci:java_8_11_17_21_maven_386_jammy

Review Comment:
   yep, also in
   
https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/.github/workflows/template.pre-compile-checks.yml#L46






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862211415


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java:
##
@@ -175,13 +178,40 @@
 ArgumentHint[] arguments() default {};
 
 /**
- * Explicitly defines the intermediate result type that a function uses as 
accumulator.
+ * Explicitly defines the intermediate result type (i.e. state entry) that 
an aggregating
+ * function uses as its accumulator. The entry is managed by the framework 
(usually via Flink's
+ * managed state).
  *
  * By default, an explicit accumulator type is undefined and the 
reflection-based extraction
  * is used.
+ *
+ * This parameter is primarily intended for aggregating functions (i.e. 
{@link
+ * AggregateFunction} and {@link TableAggregateFunction}). It is 
recommended to use {@link
+ * #state()} for {@link ProcessTableFunction}.
  */
 DataTypeHint accumulator() default @DataTypeHint();
 
+/**
+ * Explicitly lists the intermediate results (i.e. state entries) of a 
function that is managed
+ * by the framework (i.e. Flink managed state). Including their names and 
data types.
+ *
+ * State hints are primarily intended for {@link ProcessTableFunction}. 
A PTF supports
+ * multiple state entries at the beginning of an eval()/onTimer() method 
(after an optional
+ * context parameter).
+ *
+ * Aggregating functions (i.e. {@link AggregateFunction} and {@link 
TableAggregateFunction})
+ * support a single state entry at the beginning of an 
accumulate()/retract() method (i.e. the
+ * accumulator).
+ *
+ * By default, explicit state is undefined and the reflection-based 
extraction is used where
+ * {@link StateHint} is present.
+ *
+ * Using both {@link #accumulator()} and this parameter is not allowed. 
Specifying the list
+ * of state entries manually disables the entire reflection-based 
extraction around {@link
+ * StateHint} and accumulators for aggregating functions.

Review Comment:
   Let's wait with this. Currently, PTFs are not fully functional. Exposing the 
changes via docs might cause confusion.






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on code in PR #25707:
URL: https://github.com/apache/flink/pull/25707#discussion_r1862212335


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A hint that declares an intermediate result (i.e. state entry) that is 
managed by the framework
+ * (i.e. Flink managed state).
+ *
+ * State hints are primarily intended for {@link ProcessTableFunction}. A 
PTF supports multiple
+ * state entries at the beginning of an eval()/onTimer() method (after an 
optional context
+ * parameter).
+ *
+ * Aggregating functions (i.e. {@link AggregateFunction} and {@link 
TableAggregateFunction})
+ * support a single state entry at the beginning of an accumulate()/retract() 
method (i.e. the
+ * accumulator).
+ *
+ * For example, {@code @StateHint(name = "count", type = 
@DataTypeHint("BIGINT"))} is a state
+ * entry with the data type BIGINT named "count".
+ *
+ * Note: Usually, a state entry is partitioned by a key and can not be 
accessed globally. The
+ * partitioning (or whether it is only a single partition) is defined by the 
corresponding function
+ * call.
+ *
+ * @see FunctionHint
+ */
+@PublicEvolving
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
+public @interface StateHint {

Review Comment:
   Absolutely, but not yet. First there needs to be more logic merged.
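
For readers following the Javadoc above, here is a minimal sketch of how a parameter-level state hint could be declared and read back via reflection. It does not use Flink: `StateHintSketch`, its nested `State` annotation (with `type` simplified to a plain string instead of `@DataTypeHint`), `CountingFunction`, and `describeState` are all hypothetical names for illustration, and the real extraction logic in Flink may work differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

/** Hypothetical stand-in for the annotation discussed above; not Flink's StateHint. */
class StateHintSketch {

    // Same retention and targets as in the quoted diff; the attributes are simplified.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
    @interface State {
        String name() default "";

        String type() default "";
    }

    /** A function whose first eval() parameter is declared as a state entry. */
    static class CountingFunction {
        public void eval(@State(name = "count", type = "BIGINT") long[] count, String input) {
            count[0]++;
        }
    }

    /** Collects "name: type" for every annotated parameter of the named method. */
    static String describeState(Class<?> function, String methodName) {
        for (Method method : function.getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            StringBuilder description = new StringBuilder();
            for (Parameter parameter : method.getParameters()) {
                State state = parameter.getAnnotation(State.class);
                if (state != null) {
                    if (description.length() > 0) {
                        description.append(", ");
                    }
                    description.append(state.name()).append(": ").append(state.type());
                }
            }
            return description.toString();
        }
        return "";
    }

    public static void main(String[] args) {
        System.out.println(describeState(CountingFunction.class, "eval"));
    }
}
```

Because the annotation has runtime retention, a framework can discover the state entry's name and type without the function author registering anything separately.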






Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on PR #25708:
URL: https://github.com/apache/flink/pull/25708#issuecomment-2506147084

   One minor comment, which probably relates to the image rather than this PR: 
since JDK 8 support was dropped (https://github.com/apache/flink/pull/25406), it 
could be removed from the image as well.
   





Re: [PR] [FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-11-28 Thread via GitHub


zentol commented on PR #25708:
URL: https://github.com/apache/flink/pull/25708#issuecomment-2506155540

   So long as we maintain 1.x branches, I'd suggest keeping Java 8 in there so 
we are able to use the same CI image for all versions (in case the need arises 
to bump the CI image in the 1.x branches).
   
   It's not really a problem to have java 8 in there after all.





Re: [PR] [FLINK-36529] Allow Flink version configs to be set to greater than given version [flink-kubernetes-operator]

2024-11-28 Thread via GitHub


gyfora merged PR #918:
URL: https://github.com/apache/flink-kubernetes-operator/pull/918





Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862788331


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -745,6 +745,35 @@ void createTemporarySystemFunction(
  */
 void createTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * If the table should not be permanently stored in a catalog, use 
{@link
+ * #createTemporaryTable(String, TableDescriptor)} instead.

Review Comment:
   > update link
   
   Oops, thanks!






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on PR #25704:
URL: https://github.com/apache/flink/pull/25704#issuecomment-2506886668

   @twalthr @davidradl thanks for the reviews 👍 I think I've addressed all 
comments; please take another look. 
   
   Updating the Python API was a bit tricky with the setup (curious to see if 
lint will fail), but the implementation should be working. Tests are identical 
to the Java implementation and are passing.





Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]

2024-11-28 Thread via GitHub


herunkang2018 commented on PR #3656:
URL: https://github.com/apache/flink-cdc/pull/3656#issuecomment-2506898687

   @leonardBang @yuxiqian Would you like to review again now that the CI has 
passed? Thanks a lot.





Re: [PR] [FLINK-34874][cdc-connector][mongodb] Support initial.snapshotting.pipeline related configs in table api [flink-cdc]

2024-11-28 Thread via GitHub


Jiabao-Sun commented on PR #3707:
URL: https://github.com/apache/flink-cdc/pull/3707#issuecomment-2506910528

   Thanks @herunkang2018 for this contribution.
   Hi @yuxiqian, could you help review this PR?





Re: [PR] [fix][cdc-connector][oracle] Fix oracle server time zone option missing in table api. [flink-cdc]

2024-11-28 Thread via GitHub


LiuBodong closed pull request #2985: [fix][cdc-connector][oracle] Fix oracle 
server time zone option missing in table api.
URL: https://github.com/apache/flink-cdc/pull/2985





Re: [PR] [FLINK-36698][pipeline-connector][elasticsearch] Elasticsearch pipeline sink support authentication [flink-cdc]

2024-11-28 Thread via GitHub


beryllw commented on PR #3728:
URL: https://github.com/apache/flink-cdc/pull/3728#issuecomment-2506921020

   @lvyanquan @proletarians  cc, thanks.





Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862771988


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,

Review Comment:
   - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#temporary-vs-permanent-tables
   - I agree that `tables` instead of `objects` would be easier to digest in 
this context. On the other hand, the temporary objects concept seems to exist 
for multiple resources, like views, and having the same sentence might help 
people recognize this as being the same pattern. I have no strong opinion here 
though.
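
To make the shadowing pattern under discussion concrete, here is a minimal sketch that does not use Flink at all. `ShadowingCatalogSketch` and its methods are hypothetical names, and the lookup rule (temporary first, then permanent) is an assumption based on the linked documentation, not a copy of the actual catalog manager logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a catalog lookup; not Flink's CatalogManager. */
class ShadowingCatalogSketch {
    private final Map<String, String> permanentTables = new HashMap<>();
    private final Map<String, String> temporaryTables = new HashMap<>();

    void createTable(String path, String definition) {
        permanentTables.put(path, definition);
    }

    void createTemporaryTable(String path, String definition) {
        temporaryTables.put(path, definition);
    }

    /** Returns true if a temporary table was registered under the path and has been removed. */
    boolean dropTemporaryTable(String path) {
        return temporaryTables.remove(path) != null;
    }

    /** Temporary objects are consulted first, so they shadow permanent ones on the same path. */
    Optional<String> lookup(String path) {
        String temporary = temporaryTables.get(path);
        if (temporary != null) {
            return Optional.of(temporary);
        }
        return Optional.ofNullable(permanentTables.get(path));
    }

    public static void main(String[] args) {
        ShadowingCatalogSketch catalog = new ShadowingCatalogSketch();
        catalog.createTable("T", "permanent");
        catalog.createTemporaryTable("T", "temporary");
        System.out.println(catalog.lookup("T").get()); // the temporary table wins

        catalog.dropTemporaryTable("T");
        System.out.println(catalog.lookup("T").get()); // the permanent table is visible again
    }
}
```

The same two-map lookup would apply to any object kind that has a temporary variant, which is the argument for keeping the wording "objects" in the Javadoc.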
   






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862772602


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,
+ * it will be inaccessible in the current session. To make the permanent 
object available again
+ * one can drop the corresponding temporary object.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTemporaryTable("MyTable", 
TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());
+ * }
+ *
+ * @param path The path under which the table will be registered. See also 
the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @param descriptor Template for creating a {@link CatalogTable} instance.
+ * @param ignoreIfExists If a table exists under the given path and this 
flag is set, no *

Review Comment:
   I think it does behave the same






Re: [PR] [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore [flink-cdc]

2024-11-28 Thread via GitHub


github-actions[bot] commented on PR #3569:
URL: https://github.com/apache/flink-cdc/pull/3569#issuecomment-2506867619

   This pull request has been closed because it has not had recent activity. 
You can reopen it if you want to continue your work, and anyone who is 
interested in it is encouraged to continue work on this pull request.





Re: [PR] [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore [flink-cdc]

2024-11-28 Thread via GitHub


github-actions[bot] closed pull request #3569: 
[FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support 
changing column names to lowercase for Hive metastore
URL: https://github.com/apache/flink-cdc/pull/3569





Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862775007


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,
+ * it will be inaccessible in the current session. To make the permanent 
object available again

Review Comment:
   - Same as https://github.com/apache/flink/pull/25704#discussion_r1862771988
   - Good question. I'm also not sure and would need to double-check






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


gustavodemorais commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862779370


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -49,55 +50,78 @@ class TableEnvironmentTest {
 @Test
 void testCreateTemporaryTableFromDescriptor() {
 final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
-final String catalog = tEnv.getCurrentCatalog();
-final String database = tEnv.getCurrentDatabase();
-
 final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
-tEnv.createTemporaryTable(
-"T",
-
TableDescriptor.forConnector("fake").schema(schema).option("a", 
"Test").build());
 
-assertThat(
-tEnv.getCatalog(catalog)
-.orElseThrow(AssertionError::new)
-.tableExists(new ObjectPath(database, "T")))
-.isFalse();
+assertTemporaryCreateTableFromDescriptor(tEnv, schema);
+}
 
-final Optional lookupResult =
-tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, 
database, "T"));
-assertThat(lookupResult.isPresent()).isTrue();
+@Test
+void testCreateTemporaryTableIfNotExistsFromDescriptor() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
 
-final CatalogBaseTable catalogTable = 
lookupResult.get().getResolvedTable();
-assertThat(catalogTable instanceof CatalogTable).isTrue();
-assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
-
assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
-assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
+assertTemporaryCreateTableFromDescriptor(tEnv, schema);
+assertThatNoException()
+.isThrownBy(
+() ->
+tEnv.createTemporaryTable(
+"T",
+TableDescriptor.forConnector("fake")
+.schema(schema)
+.option("a", "Test")
+.build(),

Review Comment:
   I've updated the tests so both `assertCreateTableFromDescriptor` and 
`assertTemporaryCreateTableFromDescriptor` helper functions have an 
`ignoreIfExists` param and we test the case you asked for 👍 
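
As an illustration of the behavior these tests exercise, here is a minimal sketch with no Flink dependency. `IgnoreIfExistsSketch` is a hypothetical class, and the assumed semantics (a second create with the flag set is a silent no-op that keeps the existing table; without the flag it fails) are inferred from the quoted test and Javadoc rather than taken from the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of create-with-ignoreIfExists; not Flink's TableEnvironment. */
class IgnoreIfExistsSketch {
    private final Map<String, String> tables = new HashMap<>();

    /**
     * Registers a table under the given path.
     *
     * @return true if the table was registered, false if an existing table was kept
     */
    boolean createTable(String path, String definition, boolean ignoreIfExists) {
        if (tables.containsKey(path)) {
            if (ignoreIfExists) {
                return false; // assumed: the existing table is left untouched
            }
            throw new IllegalStateException("Table already exists: " + path);
        }
        tables.put(path, definition);
        return true;
    }

    String get(String path) {
        return tables.get(path);
    }

    public static void main(String[] args) {
        IgnoreIfExistsSketch env = new IgnoreIfExistsSketch();
        env.createTable("T", "first", false);
        env.createTable("T", "second", true); // no exception, no overwrite
        System.out.println(env.get("T"));

        try {
            env.createTable("T", "third", false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```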






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862341867


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,
+ * it will be inaccessible in the current session. To make the permanent 
object available again
+ * one can drop the corresponding temporary object.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTemporaryTable("MyTable", 
TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());
+ * }
+ *
+ * @param path The path under which the table will be registered. See also 
the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @param descriptor Template for creating a {@link CatalogTable} instance.
+ * @param ignoreIfExists If a table exists under the given path and this 
flag is set, no *

Review Comment:
   NIT: extra * after no 






Re: [PR] [FLINK-33571][table] Override json-path version for Calcite 1.32 to deal with CVE [flink]

2024-11-28 Thread via GitHub


MartijnVisser commented on PR #25602:
URL: https://github.com/apache/flink/pull/25602#issuecomment-2506290502

   It looks like we've pivoted from the original intent of the Jira ticket 
(upgrade to Calcite 1.38). That means that we'll either have to create a new 
Jira ticket for what's done in this PR, or we have to update the existing Jira 
ticket to reflect the changes from this PR and create a new one to deal with 
the upgrade to Calcite 1.38





[jira] [Updated] (FLINK-36602) Upgrade Calcite version to 1.38.0

2024-11-28 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-36602:
---
Labels:   (was: pull-request-available)

> Upgrade Calcite version to 1.38.0
> -
>
> Key: FLINK-36602
> URL: https://issues.apache.org/jira/browse/FLINK-36602
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 2.0-preview
>Reporter: Thomas Cooper
>Priority: Major
>
> The currently used Calcite version (1.32) has a high severity vulnerability 
> ([CVE-2023-1370|https://nvd.nist.gov/vuln/detail/CVE-2023-1370]). This can be 
> mitigated by upgrading to Calcite 1.37 or higher (which upgrades the 
> vulnerable json-path library). 
> As [1.38 has been 
> released|https://calcite.apache.org/news/2024/10/15/release-1.38.0/] we 
> should probably upgrade to that.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862333839


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,

Review Comment:
   Regarding the sentence "Temporary objects can shadow permanent ones":
   - I am not sure why I would want to do this; is this shadowing capability 
documented?
   - I suggest this sentence use `table` instead of `object`.






Re: [PR] [FLINK-36800] Upgrade hadoop-aws to 3.3.6 [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25692:
URL: https://github.com/apache/flink/pull/25692#discussion_r1862403736


##
flink-filesystems/pom.xml:
##
@@ -34,7 +34,7 @@ under the License.
pom
 

-   3.3.4
+   3.3.6

Review Comment:
   There are notice files in the metadata in the subprojects that reference 
this number - they should be updated as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36688][table-planner] Sort metadata keys when reusing source [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25681:
URL: https://github.com/apache/flink/pull/25681#discussion_r1862411403


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java:
##
@@ -165,7 +166,7 @@ public List reuseDuplicatedScan(List 
relNodes) {
 }
 
 int[][] allProjectFields = allProjectFieldSet.toArray(new 
int[0][]);
-List allMetaKeys = new ArrayList<>(allMetaKeySet);
+List allMetaKeys = sortMetadataKeys(allMetaKeySet, 
pickTable.tableSource());

Review Comment:
   NIT: method name could be `enforceMetadataKeyOrder` or 
`copyMetadataKeyOrder`; sort implies there is a sort order like ascending. 
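
To show what "enforcing" an order (as opposed to sorting) could mean here, a minimal sketch follows. It assumes the intent is to emit the used metadata keys in the order in which the table source declares them; `MetadataKeyOrderSketch`, `enforceMetadataKeyOrder`, and the sample keys are hypothetical and not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration; not the ScanReuser code from the PR. */
class MetadataKeyOrderSketch {

    /** Returns the used keys in the order of the declared list, not in natural sort order. */
    static List<String> enforceMetadataKeyOrder(Set<String> usedKeys, List<String> declaredOrder) {
        List<String> ordered = new ArrayList<>();
        for (String key : declaredOrder) {
            if (usedKeys.contains(key)) {
                ordered.add(key);
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Assumed declaration order of a source's metadata keys.
        List<String> declared = Arrays.asList("topic", "offset", "timestamp");
        // Keys collected from several scans; a hash set has no meaningful order.
        Set<String> used = new HashSet<>(Arrays.asList("timestamp", "offset"));
        System.out.println(enforceMetadataKeyOrder(used, declared));
    }
}
```

The result follows the declaration order of the source, which is why a name without "sort" may describe the method more accurately.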






Re: [PR] [FLINK-36756][sql-gateway] Bump up the sql gateway rest version [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25705:
URL: https://github.com/apache/flink/pull/25705#discussion_r1862326643


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java:
##
@@ -51,7 +51,10 @@ public enum SqlGatewayRestAPIVersion
 V2(false, true),
 
 // V3 introduces materialized table related APIs
-V3(true, true);
+V3(false, true),
+
+// V4 supports to deploy script to application cluster
+V4(true, true);

Review Comment:
   I can't see an incompatible change to the API in FLIP-480 that would need a 
new version. When calls are removed (as in "The Flink community is preparing to 
remove support for per-job submission."), the version will need to change. 
   
   I am curious whether this version change should be delayed until there is an 
incompatible REST change. Or has the incompatible change already gone in? 
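
For readers unfamiliar with the pattern in the diff, the following is a minimal sketch of a versioned enum in which exactly one constant is the default. It assumes the two constructor booleans mean "is default version" and "is stable version"; the enum name, the constant list, and the helper method are hypothetical.

```java
import java.util.Arrays;

enum ApiVersionSketch {
    V1(false, true),
    V2(false, true),
    V3(false, true),
    // Adding a new default means flipping the previous default to false.
    V4(true, true);

    private final boolean isDefault;
    private final boolean isStable;

    ApiVersionSketch(boolean isDefault, boolean isStable) {
        this.isDefault = isDefault;
        this.isStable = isStable;
    }

    boolean isDefault() {
        return isDefault;
    }

    boolean isStable() {
        return isStable;
    }

    /** Returns the single default version, failing if none or several are marked. */
    static ApiVersionSketch defaultVersion() {
        ApiVersionSketch[] defaults =
                Arrays.stream(values())
                        .filter(ApiVersionSketch::isDefault)
                        .toArray(ApiVersionSketch[]::new);
        if (defaults.length != 1) {
            throw new IllegalStateException("Expected exactly one default version.");
        }
        return defaults[0];
    }
}
```

Under this reading, bumping the version changes what clients that do not pin a version receive, which is why the timing of the bump matters.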






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862384619


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,
+ * it will be inaccessible in the current session. To make the permanent 
object available again
+ * one can drop the corresponding temporary object.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTemporaryTable("MyTable", 
TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());
+ * }
+ *
+ * @param path The path under which the table will be registered. See also 
the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @param descriptor Template for creating a {@link CatalogTable} instance.
+ * @param ignoreIfExists If a table exists under the given path and this 
flag is set, no *

Review Comment:
   I am curious: if there is an existing table, temporary table, view, or 
materialized table, does this flag act the same in each case?






Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]

2024-11-28 Thread via GitHub


snuyanzin merged PR #24829:
URL: https://github.com/apache/flink/pull/24829





Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on PR #24829:
URL: https://github.com/apache/flink/pull/24829#issuecomment-2506518447

   @naferx can you please also create backports for 1.19, 1.20?





[jira] [Closed] (FLINK-36529) Support greater or equals logic for operator flink version default configs

2024-11-28 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-36529.
--
Fix Version/s: kubernetes-operator-1.11.0
   Resolution: Fixed

merged to main 9bab0286dc152ba5dbf8226913cee98d988a5b66

> Support greater or equals logic for operator flink version default configs
> --
>
> Key: FLINK-36529
> URL: https://issues.apache.org/jira/browse/FLINK-36529
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.11.0
>
>
> The operator currently allows the following syntax for defining flink version 
> specific defaults:
> kubernetes.operator.default-configuration.flink-version.v1_18.key: value
> The problem with this is that in many cases these defaults should be applied to 
> newer Flink versions as well, forcing config duplication.
> We should introduce a + syntax for configs applied to a version and above:
> kubernetes.operator.default-configuration.flink-version.v1_18+.key: value
> In this case key: value would be applied to all Flink versions greater than or 
> equal to 1.18.
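
The `+` semantics described in the issue can be sketched as follows. This is only an illustration of the matching rule, not the operator's implementation; the class and method names are hypothetical, and the sketch assumes version tokens always have the form `v<major>_<minor>`.

```java
class VersionSelectorSketch {

    /** Parses a token such as "v1_18" into {1, 18}. */
    static int[] parse(String token) {
        String[] parts = token.substring(1).split("_");
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    /**
     * Returns true if a default keyed by the selector applies to flinkVersion.
     * "v1_18" matches only 1.18; "v1_18+" matches 1.18 and every newer version.
     */
    static boolean appliesTo(String selector, String flinkVersion) {
        boolean orNewer = selector.endsWith("+");
        String base = orNewer ? selector.substring(0, selector.length() - 1) : selector;
        int[] wanted = parse(base);
        int[] actual = parse(flinkVersion);
        // Compare major first, then minor.
        int cmp =
                actual[0] != wanted[0]
                        ? Integer.compare(actual[0], wanted[0])
                        : Integer.compare(actual[1], wanted[1]);
        return orNewer ? cmp >= 0 : cmp == 0;
    }
}
```

With this rule a single `v1_18+` entry covers 1.18, 1.19, and later versions, removing the duplication the issue describes.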



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862354231


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,
+ * it will be inaccessible in the current session. To make the permanent 
object available again

Review Comment:
   For the sentences `If a permanent object in a given path exists, it will be 
inaccessible in the current session. To make the permanent object available 
again one can drop the corresponding temporary object.`:
   - Can we use `table` or `TableDescriptor` instead of `object`?
   - Is this true if ignoreIfExists is set?






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862333839


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,

Review Comment:
   I read the sentence "Temporary objects can shadow permanent ones". I know 
this was copied from the original method.
   - I am not sure why I would want to do this; is this shadowing capability 
documented?
   - I suggest this sentence use `table` or `TableDescriptor` instead of 
`object`.






Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-11-28 Thread via GitHub


mehdid93 commented on PR #25670:
URL: https://github.com/apache/flink/pull/25670#issuecomment-2506492367

   @flinkbot run azure





Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-11-28 Thread via GitHub


ammu20-dev commented on code in PR #25656:
URL: https://github.com/apache/flink/pull/25656#discussion_r1861721487


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##
@@ -1542,6 +1542,19 @@ void testUsingAddJar() throws Exception {
 "drop function lowerUdf");
 }
 
+@Test

Review Comment:
   Added a new test case to verify that job submission succeeds with ADD JAR 
and checkpointing. Note that this test case will fail on 1.20.x and earlier 
versions.






Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]

2024-11-28 Thread via GitHub


SML0127 commented on code in PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861726630


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java:
##
@@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) 
throws Exception {
 .isEqualTo(expectedStreamRecord);
 }
 
+private void testJsonDataType(UniqueDatabase database, Boolean 
useLegacyJsonFormat)
+throws Exception {
+database.createAndInitialize();
+CloseableIterator iterator =
+env.fromSource(
+getFlinkSourceProvider(
+new String[] {"json_types"},
+database,
+useLegacyJsonFormat)
+.getSource(),
+WatermarkStrategy.noWatermarks(),
+"Event-Source")
+.executeAndCollect();
+
+Object[] expectedSnapshot =
+new Object[] {
+DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+BinaryStringData.fromString("{\"key1\": \"value1\"}"),
+BinaryStringData.fromString("{\"key1\": \"value1\", 
\"key2\": \"value2\"}"),
+BinaryStringData.fromString(
+"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": 
\"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": 
[\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
+1
+};
+
+// skip CreateTableEvent
+List snapshotResults =
+MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
+Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
JSON_TYPES))
+.isEqualTo(expectedSnapshot);
+
+try (Connection connection = database.getJdbcConnection();
+Statement statement = connection.createStatement()) {
+statement.execute("UPDATE json_types SET int_c = null WHERE id = 
1;");
+}
+
+Object[] expectedStreamRecord = expectedSnapshot;
+
+if (useLegacyJsonFormat) {
+expectedSnapshot[1] = 
BinaryStringData.fromString("{\"key1\":\"value1\"}");
+expectedSnapshot[2] =
+
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
+expectedSnapshot[3] =
+BinaryStringData.fromString(
+
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]");
+}
+expectedSnapshot[4] = null;

Review Comment:
   I revised it to be clearer, as shown below.
   
   ```java
   if (useLegacyJsonFormat) {
       // removed whitespace before value and after comma in json format string value
       Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
               .containsExactly(
                       DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
                       BinaryStringData.fromString("{\"key1\":\"value1\"}"),
                       BinaryStringData.fromString(
                               "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
                       BinaryStringData.fromString(
                               "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
                       null);
   } else {
       Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
               .containsExactly(expectedStreamRecord);
   }
   ```



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java:
##
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicabl

Re: [PR] [FLINK-36749][state/forst] Implement rescaling for ForStStateBackend [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25676:
URL: https://github.com/apache/flink/pull/25676#discussion_r1862415650


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java:
##
@@ -177,8 +182,8 @@ protected  AsyncKeyedStateBackend 
restoreAsyncKeyedBackend(
 new JobID(),
 "test_op",
 keySerializer,
-numberOfKeyGroups,
-new KeyGroupRange(0, numberOfKeyGroups - 1),
+keyGroupRange.getNumberOfKeyGroups(),
+keyGroupRange,

Review Comment:
   Why do we need both parameters if one can be derived from the other?







Re: [PR] [FLINK-36451][runtime] Replaces LeaderElection#hasLeadership with LeaderElection#runAsLeader [flink]

2024-11-28 Thread via GitHub


XComp commented on code in PR #25679:
URL: https://github.com/apache/flink/pull/25679#discussion_r1862280134


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##
@@ -1246,6 +1200,62 @@ private void testNonBlockingCall(
 testInstance.close();
 }
 
+/**
+ * This test is used to verify FLINK-36451 where we observed concurrent 
nested locks being
+ * acquired from the {@link LeaderContender} and from the {@link 
DefaultLeaderElectionService}.
+ */
+@Test
+void testNestedDeadlockInLeadershipConfirmation() throws Exception {

Review Comment:
   That's the test that I added to verify the findings. I kept it in a separate 
commit to prove that the test runs into a deadlock before applying the changes.






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862333839


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -718,6 +718,36 @@ void createTemporarySystemFunction(
  */
 void createTemporaryTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a temporary catalog 
table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * Temporary objects can shadow permanent ones. If a permanent object 
in a given path exists,

Review Comment:
   I read the sentence "Temporary objects can shadow permanent ones".
   - I am not sure why I would want to do this; is this shadowing capability 
documented?
   - I suggest this sentence use `table` or `TableDescriptor` instead of 
`object`.






Re: [PR] [FLINK-36705] Add initial ProcessTableFunction class and annotations [flink]

2024-11-28 Thread via GitHub


twalthr commented on PR #25707:
URL: https://github.com/apache/flink/pull/25707#issuecomment-2506316008

   @flinkbot run azure





Re: [PR] [FLINK-36702][sql-gateway] Introduce ScripExecutor to run the script [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25700:
URL: https://github.com/apache/flink/pull/25700#discussion_r1862393806


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/application/ScriptExecutor.java:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.application;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserEOFException;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Parser to split the statements. */

Review Comment:
   This Javadoc does not seem to correspond to the class name.






Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1862391147


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -49,55 +50,78 @@ class TableEnvironmentTest {
 @Test
 void testCreateTemporaryTableFromDescriptor() {
 final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
-final String catalog = tEnv.getCurrentCatalog();
-final String database = tEnv.getCurrentDatabase();
-
 final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
-tEnv.createTemporaryTable(
-"T",
-
TableDescriptor.forConnector("fake").schema(schema).option("a", 
"Test").build());
 
-assertThat(
-tEnv.getCatalog(catalog)
-.orElseThrow(AssertionError::new)
-.tableExists(new ObjectPath(database, "T")))
-.isFalse();
+assertTemporaryCreateTableFromDescriptor(tEnv, schema);
+}
 
-final Optional lookupResult =
-tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, 
database, "T"));
-assertThat(lookupResult.isPresent()).isTrue();
+@Test
+void testCreateTemporaryTableIfNotExistsFromDescriptor() {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
 
-final CatalogBaseTable catalogTable = 
lookupResult.get().getResolvedTable();
-assertThat(catalogTable instanceof CatalogTable).isTrue();
-assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
-
assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
-assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
+assertTemporaryCreateTableFromDescriptor(tEnv, schema);
+assertThatNoException()
+.isThrownBy(
+() ->
+tEnv.createTemporaryTable(
+"T",
+TableDescriptor.forConnector("fake")
+.schema(schema)
+.option("a", "Test")
+.build(),

Review Comment:
   I assume table T exists in this case, so no exception is thrown. I suggest 
adding a test to ensure that creating a temp table with the flag set creates 
the table when there is no existing table. 
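
The two cases this comment distinguishes can be sketched with a small in-memory stand-in. This is not Flink's `TableEnvironment` or catalog manager; the class, its map-based storage, and the boolean return value are hypothetical and only model the `ignoreIfExists` contract as described in the Javadoc under review.

```java
import java.util.HashMap;
import java.util.Map;

class TemporaryTableRegistrySketch {

    // path -> descriptor (a plain string stands in for a TableDescriptor)
    private final Map<String, String> temporaryTables = new HashMap<>();

    /**
     * Registers a temporary table. Returns true if the table was created and
     * false if an existing table was left untouched because ignoreIfExists
     * was set. Throws if the table exists and the flag is not set.
     */
    boolean createTemporaryTable(String path, String descriptor, boolean ignoreIfExists) {
        if (temporaryTables.containsKey(path)) {
            if (ignoreIfExists) {
                return false;
            }
            throw new IllegalStateException("Temporary table '" + path + "' already exists.");
        }
        temporaryTables.put(path, descriptor);
        return true;
    }

    String lookup(String path) {
        return temporaryTables.get(path);
    }
}
```

A test along the suggested lines would call the method with the flag set on an empty registry and assert that the table is present afterwards, in addition to the existing "no exception on second call" check.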



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]

2024-11-28 Thread via GitHub


snuyanzin commented on PR #24829:
URL: https://github.com/apache/flink/pull/24829#issuecomment-2506443102

   @flinkbot run azure





Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-11-28 Thread via GitHub


davidradl commented on code in PR #25656:
URL: https://github.com/apache/flink/pull/25656#discussion_r1862450447


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##
@@ -1542,6 +1542,19 @@ void testUsingAddJar() throws Exception {
 "drop function lowerUdf");
 }
 
+@Test
+void testUsingAddJarWithCheckpointing() throws Exception {
+env().enableCheckpointing(100);

Review Comment:
   NIT: I guess we could check that the context class loader of the current 
thread is the same before and after the `env.executeSql` call, using 
`Thread.currentThread().getContextClassLoader()`.
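
A minimal sketch of that check, independent of Flink: capture the context class loader, run the action, and compare by identity afterwards. The helper class and method names are hypothetical; in the actual test the `Runnable` would wrap the SQL execution call and the result would feed an assertion.

```java
class ContextClassLoaderCheckSketch {

    /**
     * Runs the action and reports whether the current thread's context class
     * loader is the very same instance before and after.
     */
    static boolean leavesContextClassLoaderUnchanged(Runnable action) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        action.run();
        ClassLoader after = Thread.currentThread().getContextClassLoader();
        // Identity comparison: an equal-looking but different loader still counts as a change.
        return before == after;
    }
}
```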





