Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510727272 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java: ## @@ -1071,6 +1077,110 @@ protected SingleOutputStreamOperator aggregate(AggregationFunction aggrega return reduce(aggregate).name("Keyed Aggregation"); } +@Override +public PartitionWindowedStream fullWindowPartition() { +throw new UnsupportedOperationException( +"KeyedStream doesn't support processing non-keyed partitions."); Review Comment: After offline discussion, we all agreed that a better message is "KeyedStream doesn't support full window processing on partitions. If you want operations over each key, use the methods in KeyedStream directly." In DataStream V1, KeyedStream doesn't have the concept that all records of the same key form a partition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
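For reference, a minimal sketch of what the revised override could look like with the agreed wording — the exact message and generic signature in the final PR may differ, so treat this as illustrative:

```java
@Override
public PartitionWindowedStream<T> fullWindowPartition() {
    // KeyedStream intentionally rejects full-window partition processing;
    // per-key operations should be expressed with KeyedStream's own methods.
    throw new UnsupportedOperationException(
            "KeyedStream doesn't support full window processing on partitions. "
                    + "If you want operations over each key, use the methods in KeyedStream directly.");
}
```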
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510728074 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator Review Comment: I've refactored the implementations of SortPartition API on DataStream and KeyedStream. PTAL.😄 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510732371 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. 
*/ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; Review Comment: 1. I think adding a default value is an option. The value is only a weight: if other operators do not use managed memory, the SortPartition operators can utilize all of it, and the managed memory size can still be adjusted through configuration, which also indirectly adjusts the budget available to these operators. 2. Adding a new configuration option for a single API may introduce unnecessary complexity. We can leave this as a pending question. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
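As background on how a weight (rather than an absolute size) typically takes effect, here is a rough sketch of wiring it up when building the transformation. The call site and variable names are assumptions for illustration, not taken from this PR:

```java
// Hypothetical wiring sketch: declare operator-scope managed memory with the default
// weight from the quoted diff. Because it is a weight, the operator's actual budget
// grows when no other operator in the same slot declares managed memory.
SingleOutputStreamOperator<T> sorted =
        input.transform("SortPartition", inputType, sortPartitionOperator);
sorted.getTransformation()
        .declareManagedMemoryUseCaseAtOperatorScope(
                ManagedMemoryUseCase.OPERATOR,
                AbstractSortPartitionOperator.DEFAULT_MANAGE_MEMORY_WEIGHT);
```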
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510732710 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. 
*/ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; + +/** The type information of input records. */ +protected final TypeInformation inputType; + +/** The type information used to sort the records. */ +protected final TypeInformation sortType; + +/** The selector to create the sort key for records. */ +protected final KeySelector sortFieldSelector; + +/** The order to sort records. */ +private final Order sortOrder; + +/** The string field to indicate the sort key for records with tuple or pojo type. */ +private final String stringSortField; + +/** The int field to indicate the sort key for records with tuple type. */ +private final int positionSortField; + +/** The sorter to sort records. */ +protected PushSorter sorter = null; + +public AbstractSortPartitionOperator( +TypeInformation inputType, int positionSortField, Order sortOrder) { +this.inputType = inputType; +ensureFieldSortable(positionSortField); +this.sortType = (TypeInformation) inputType; +this.positionSortField = positionSortField; +this.stringSortField = null; +this.sortFieldSelector = null; +this.sortOrder = sortOrder; +} + +public AbstractSortPartitionOperator( +TypeInformation inputType, String stringSortField, Order sortOrder) { +this.inputType = inputType; +ensur
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510733360 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. 
*/ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; + +/** The type information of input records. */ +protected final TypeInformation inputType; + +/** The type information used to sort the records. */ +protected final TypeInformation sortType; + +/** The selector to create the sort key for records. */ +protected final KeySelector sortFieldSelector; + +/** The order to sort records. */ +private final Order sortOrder; + +/** The string field to indicate the sort key for records with tuple or pojo type. */ +private final String stringSortField; + +/** The int field to indicate the sort key for records with tuple type. */ +private final int positionSortField; + +/** The sorter to sort records. */ +protected PushSorter sorter = null; + +public AbstractSortPartitionOperator( +TypeInformation inputType, int positionSortField, Order sortOrder) { +this.inputType = inputType; +ensureFieldSortable(positionSortField); +this.sortType = (TypeInformation) inputType; +this.positionSortField = positionSortField; +this.stringSortField = null; +this.sortFieldSelector = null; +this.sortOrder = sortOrder; +} + +public AbstractSortPartitionOperator( +TypeInformation inputType, String stringSortField, Order sortOrder) { +this.inputType = inputType; +ensur
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510733720 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. 
*/ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; + +/** The type information of input records. */ +protected final TypeInformation inputType; + +/** The type information used to sort the records. */ +protected final TypeInformation sortType; Review Comment: I've refactored the implementations of SortPartition API on DataStream and KeyedStream. PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510734111 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/NonKeyedSortPartitionOperator.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; + +/** + * The {@link NonKeyedSortPartitionOperator} sorts records of a partition on non-keyed data stream. + * It ensures that all records within the same task are sorted in a user-defined order. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to a {@link KeySelector}, both the + * selected key and the record should be written to {@link ExternalSorter}. In this case, the + * type used to sort the records will be a tuple containing both the selected key and record. + */ +@SuppressWarnings("unchecked") +@Internal +public class NonKeyedSortPartitionOperator Review Comment: Good point. I've added UTs for all newly introduced operators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on PR #24398: URL: https://github.com/apache/flink/pull/24398#issuecomment-1975951178 > Thanks @WencongLiu for bringing this up. This is a valuable feature! > > I've only reviewed two of the commits so far and will look at the rest later. Thanks for the careful review. I've pushed some updates. PTAL😄. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic. [flink]
BIOINSu opened a new pull request, #24431: URL: https://github.com/apache/flink/pull/24431 ## What is the purpose of the change This pull request removes redundant transformation verification logic and uses `ExplainDetail.JSON_EXECUTION_PLAN` to verify the parallelism instead. ## Brief change log - Remove `verifyTransformation` method and corresponding logic. - Use `util.verifyExplain(query, ExplainDetail.JSON_EXECUTION_PLAN)` to verify the parallelism instead. ## Verifying this change This change adds tests and can be verified through `TableScanTest.scala#testSetParallelismForSource`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic. [flink]
flinkbot commented on PR #24431: URL: https://github.com/apache/flink/pull/24431#issuecomment-1975961845 ## CI report: * 127a4cc3bdd4818a23e546646c4750605c8d38de UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.18][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
luoyuxia merged PR #24419: URL: https://github.com/apache/flink/pull/24419 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34517) environment configs ignored when calling procedure operation
[ https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823069#comment-17823069 ] luoyuxia commented on FLINK-34517: -- 1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e master: todo > environment configs ignored when calling procedure operation > > > Key: FLINK-34517 > URL: https://issues.apache.org/jira/browse/FLINK-34517 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: JustinLee >Assignee: JustinLee >Priority: Major > Labels: pull-request-available > > when calling procedure operation in Flink SQL, the ProcedureContext only > contains the underlying application-specific config , not > environment-specific config. > to be more specific, in a Flink sql app of the same > StreamExecutionEnvironment which has a config1. when executing a sql query, > config1 works, while calling a sql procedure, config1 doesn't work, which > apparently is not an expected behavior. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34517) environment configs ignored when calling procedure operation
[ https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823069#comment-17823069 ] luoyuxia edited comment on FLINK-34517 at 3/4/24 8:20 AM: -- 1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e 1.19: todo master: todo was (Author: luoyuxia): 1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e master: todo > environment configs ignored when calling procedure operation > > > Key: FLINK-34517 > URL: https://issues.apache.org/jira/browse/FLINK-34517 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: JustinLee >Assignee: JustinLee >Priority: Major > Labels: pull-request-available > > when calling procedure operation in Flink SQL, the ProcedureContext only > contains the underlying application-specific config , not > environment-specific config. > to be more specific, in a Flink sql app of the same > StreamExecutionEnvironment which has a config1. when executing a sql query, > config1 works, while calling a sql procedure, config1 doesn't work, which > apparently is not an expected behavior. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] feat: autoscaling decision parallelism improvement [FLINK-34563] [flink-kubernetes-operator]
mxm commented on code in PR #787: URL: https://github.com/apache/flink-kubernetes-operator/pull/787#discussion_r1509934269 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -129,6 +131,11 @@ public boolean scaleResource( scalingSummaries, autoScalerEventHandler); +// Increase the biggest vertex's parallelism to the current taskmanager count's +// max-parallelism +updateBiggestVertexParallelism( Review Comment: This method needs to be called earlier because we already sent out the scaling event with the overrides. Having the overrides changed at this point would be a surprise to users. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -153,6 +160,52 @@ public boolean scaleResource( return true; } +private void updateBiggestVertexParallelism( +Context context, +Map> evaluatedMetrics, +Map scalingSummaries) { +int taskSlotsPerTm = context.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS); + +Map> parallelismVertexMap = new HashMap<>(); +int maxParallelism = 0; + +for (Map.Entry entry : scalingSummaries.entrySet()) { +JobVertexID vertex = entry.getKey(); +ScalingSummary summary = entry.getValue(); +int newParallelism = summary.getNewParallelism(); + +// Update maxParallelism if a new maximum is found +if (newParallelism > maxParallelism) { +maxParallelism = newParallelism; +} + +// Map newParallelism to JobVertexID +parallelismVertexMap.computeIfAbsent(newParallelism, k -> new HashSet<>()).add(vertex); +} + +// After the loop, retrieve the JobVertexIDs with the maxParallelism value +Set verticesWithMaxParallelism = +parallelismVertexMap.getOrDefault(maxParallelism, new HashSet<>()); + +// Compute the maximum parallelism for a given taskSlotsPerTm +var time = maxParallelism / taskSlotsPerTm; +if (maxParallelism > time * taskSlotsPerTm) { +maxParallelism = (time + 1) * taskSlotsPerTm; +} + +final var finalMaxNewParallelism = maxParallelism; + +scalingSummaries.forEach( +(vertexID, summary) -> { +if (verticesWithMaxParallelism.contains(vertexID)) { +// Here, you update the newParallelism for each matching JobVertexID +summary.setNewParallelism( +finalMaxNewParallelism); // Assuming setNewParallelism updates the +// value Review Comment: ```suggestion ``` ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -153,6 +160,52 @@ public boolean scaleResource( return true; } +private void updateBiggestVertexParallelism( +Context context, +Map> evaluatedMetrics, +Map scalingSummaries) { +int taskSlotsPerTm = context.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS); + +Map> parallelismVertexMap = new HashMap<>(); +int maxParallelism = 0; + +for (Map.Entry entry : scalingSummaries.entrySet()) { +JobVertexID vertex = entry.getKey(); +ScalingSummary summary = entry.getValue(); +int newParallelism = summary.getNewParallelism(); + +// Update maxParallelism if a new maximum is found +if (newParallelism > maxParallelism) { +maxParallelism = newParallelism; +} + +// Map newParallelism to JobVertexID +parallelismVertexMap.computeIfAbsent(newParallelism, k -> new HashSet<>()).add(vertex); +} + +// After the loop, retrieve the JobVertexIDs with the maxParallelism value +Set verticesWithMaxParallelism = +parallelismVertexMap.getOrDefault(maxParallelism, new HashSet<>()); + +// Compute the maximum parallelism for a given taskSlotsPerTm +var time = maxParallelism / taskSlotsPerTm; +if (maxParallelism > time * taskSlotsPerTm) { +maxParallelism = (time + 1) * taskSlotsPerTm; +} Review 
Comment: This assumes that slot sharing is enabled and there is a single slot sharing group. If slot sharing is (partially) disabled, the computation logic is different. I think we only want to perform this type of adjustment when `num_task_slots_used == maxParallelism`. We have already added the `TASKS_SLOTS_USED` metric to the global metrics, which you can pass into this method. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -129,6 +131,11 @@ public boolean scaleResource(
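To make the suggestion concrete, a rough sketch of the guarded adjustment — only round up when every task slot is already in use, and compute the rounding with a plain ceiling division (variable names are illustrative, not from the PR):

```java
// Illustrative only: assumes a single slot sharing group, so the number of used
// task slots equals the largest vertex parallelism.
if (numTaskSlotsUsed == maxParallelism) {
    // Round maxParallelism up to the next multiple of taskSlotsPerTm.
    maxParallelism = ((maxParallelism + taskSlotsPerTm - 1) / taskSlotsPerTm) * taskSlotsPerTm;
}
```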
Re: [PR] [Draft][bugfix] Move the deserialization of shuffleDescriptor to a separate … [flink]
zhuzhurk commented on PR #24115: URL: https://github.com/apache/flink/pull/24115#issuecomment-1976014749 Hi @caodizhou , is there any progress? Should we re-enable the excluded tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Improve FlinkDeployment spec overview doc [flink-kubernetes-operator]
mxm commented on code in PR #788: URL: https://github.com/apache/flink-kubernetes-operator/pull/788#discussion_r1510789812 ## docs/content/docs/custom-resource/overview.md: ## @@ -85,7 +85,7 @@ The spec contains all the information the operator need to deploy and manage you Most deployments will define at least the following fields: - `image` : Docker used to run Flink job and task manager processes - - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16` ...) + - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16`, `v1_17`, `v1_18`) Review Comment: ```suggestion - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16`, `v1_17`, `v1_18`, ...) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]
Zakelly commented on code in PR #24418: URL: https://github.com/apache/flink/pull/24418#discussion_r1510799597 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java: ## @@ -41,12 +41,13 @@ public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnaps * this map consist of checkpoint id, subtask key, and checkpoint scope, which collectively * determine the physical file to be reused. */ -private final Map, PhysicalFile> +private final Map, PhysicalFilePool> Review Comment: I'm a little bit lost here. Should a pool contain and manage a set of physical files that may be reused multiple times? Considering that files may be shared across SubtaskKeys (in the exclusive scope), I suggest we leave ` Map` here and let PhysicalFilePool handle sharing files between subtasks in the exclusive scope while keeping them exclusive in the shared scope. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34400) Kafka sources with watermark alignment sporadically stop consuming
[ https://issues.apache.org/jira/browse/FLINK-34400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823102#comment-17823102 ] Alexis Sarda-Espinosa commented on FLINK-34400: --- Hi [~fanrui], sorry for the late reply, it slipped my mind. I did try both approaches back then (with and without idleness), my point was that disabling idleness was behaving strangely: I also expect the quick topic to be blocked by the slow (empty) topic, but in my experiments this didn't happen consistently, so consumption was unblocked sometimes for unknown reasons. In any case, I imagine this will be an unsupported configuration scenario with a somewhat undefined behavior. > Kafka sources with watermark alignment sporadically stop consuming > -- > > Key: FLINK-34400 > URL: https://issues.apache.org/jira/browse/FLINK-34400 > Project: Flink > Issue Type: Bug >Affects Versions: 1.18.1 >Reporter: Alexis Sarda-Espinosa >Priority: Major > Attachments: alignment_lags.png, logs.txt > > > I have 2 Kafka sources that read from different topics. I have assigned them > to the same watermark alignment group, and I have _not_ enabled idleness > explicitly in their watermark strategies. One topic remains pretty much empty > most of the time, while the other receives a few events per second all the > time. Parallelism of the active source is 2, for the other one it's 1, and > checkpoints are once every minute. > This works correctly for some time (10 - 15 minutes in my case) but then 1 of > the active sources stops consuming, which causes lag to increase. Weirdly, > after another 15 minutes or so, all the backlog is consumed at once, and then > everything stops again. > I'm attaching some logs from the Task Manager where the issue appears. You > will notice that the Kafka network client reports disconnections (a long time > after the deserializer stopped reporting that events were being consumed), > I'm not sure if this is related. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34400) Kafka sources with watermark alignment sporadically stop consuming
[ https://issues.apache.org/jira/browse/FLINK-34400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823102#comment-17823102 ] Alexis Sarda-Espinosa edited comment on FLINK-34400 at 3/4/24 9:32 AM: --- Hi [~fanrui], sorry for the late reply, it slipped my mind. I did try both approaches back then (with and without idleness), my point was that disabling idleness was behaving strangely: I also expect the quick topic to be blocked by the slow (empty) topic, but in my experiments this didn't happen consistenly, so consumption was unblocked sometimes for unknown reasons, and only one of the streams was blocked, not all of them. In any case, I imagine this will be an unsupported configuration scenario with a somewhat undefined behavior. was (Author: asardaes): Hi [~fanrui], sorry for the late reply, it slipped my mind. I did try both approaches back then (with and without idleness), my point was that disabling idleness was behaving strangely: I also expect the quick topic to be blocked by the slow (empty) topic, but in my experiments this didn't happen consistenly, so consumption was unblocked sometimes for unknown reasons. In any case, I imagine this will be an unsupported configuration scenario with a somewhat undefined behavior. > Kafka sources with watermark alignment sporadically stop consuming > -- > > Key: FLINK-34400 > URL: https://issues.apache.org/jira/browse/FLINK-34400 > Project: Flink > Issue Type: Bug >Affects Versions: 1.18.1 >Reporter: Alexis Sarda-Espinosa >Priority: Major > Attachments: alignment_lags.png, logs.txt > > > I have 2 Kafka sources that read from different topics. I have assigned them > to the same watermark alignment group, and I have _not_ enabled idleness > explicitly in their watermark strategies. One topic remains pretty much empty > most of the time, while the other receives a few events per second all the > time. Parallelism of the active source is 2, for the other one it's 1, and > checkpoints are once every minute. > This works correctly for some time (10 - 15 minutes in my case) but then 1 of > the active sources stops consuming, which causes lag to increase. Weirdly, > after another 15 minutes or so, all the backlog is consumed at once, and then > everything stops again. > I'm attaching some logs from the Task Manager where the issue appears. You > will notice that the Kafka network client reports disconnections (a long time > after the deserializer stopped reporting that events were being consumed), > I'm not sure if this is related. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34429] [flink-kubernetes] Adding K8S Annotations to Internal Service [flink]
dannycranmer commented on PR #24348: URL: https://github.com/apache/flink/pull/24348#issuecomment-1976154596 @barakbn please rebase and squash commits and I will merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34352][doc] Improve the documentation of allowNonRestoredState [flink]
masteryhx commented on PR #24396: URL: https://github.com/apache/flink/pull/24396#issuecomment-1976157579 > Hi @masteryhx, I suggest minor adjustments in Chinese. PTAL, thanks! Thanks for the suggestions, I have addressed them and rebased the commits. PTAL again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-25544][streaming][JUnit5 Migration] The util,graph,experimental,environment package of module flink-stream-java [flink]
Jiabao-Sun opened a new pull request, #24432: URL: https://github.com/apache/flink/pull/24432 ## What is the purpose of the change [FLINK-25544][streaming][JUnit5 Migration] The util,graph,experimental,environment package of module flink-stream-java ## Brief change log [JUnit5 Migration] The util,graph,experimental,environment package of module flink-stream-java ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25544][streaming][JUnit5 Migration] The util,graph,experimental,environment package of module flink-stream-java [flink]
flinkbot commented on PR #24432: URL: https://github.com/apache/flink/pull/24432#issuecomment-1976310900 ## CI report: * 267e6e0287c75dd99ddd81950d5c6fa285ed1e0a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
dawidwys commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1510982540 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java: ## @@ -1516,4 +1517,141 @@ private Stream arraySortTestCases() { }, DataTypes.ARRAY(DataTypes.DATE(; } + +private Stream arrayExceptTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_EXCEPT) +.onFieldsWithData( +new Integer[] {1, 2, 2}, +null, +new Row[] { +Row.of(true, LocalDate.of(2022, 4, 20)), +Row.of(true, LocalDate.of(1990, 10, 14)), +null +}, +new Integer[] {null, null, 1}, +new Integer[][] { +new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1} +}, +new Map[] { +CollectionUtil.map(entry(1, "a"), entry(2, "b")), +CollectionUtil.map(entry(3, "c"), entry(4, "d")), +null +}) +.andDataTypes( +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY( +DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), +DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), +DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING( +// ARRAY +.testResult( +$("f0").arrayExcept(new Integer[] {1, null, 4}), +"ARRAY_EXCEPT(f0, ARRAY[1, NULL, 4])", +new Integer[] {2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f0").arrayExcept(new Integer[] {1}), +"ARRAY_EXCEPT(f0, ARRAY[1])", +new Integer[] {2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f0").arrayExcept(new Integer[] {42}), +"ARRAY_EXCEPT(f0, ARRAY[42])", +new Integer[] {1, 2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayTwo is NULL +.testResult( +$("f0").arrayExcept( +lit(null, DataTypes.ARRAY(DataTypes.INT())) + .cast(DataTypes.ARRAY(DataTypes.INT(, +"ARRAY_EXCEPT(f0, CAST(NULL AS ARRAY))", +null, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayTwo contains null elements +.testResult( +$("f0").arrayExcept(new Integer[] {null, 2}), +"ARRAY_EXCEPT(f0, ARRAY[null, 2])", +new Integer[] {1, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayOne is NULL +.testResult( +$("f1").arrayExcept(new Integer[] {1, 2, 3}), +"ARRAY_EXCEPT(f1, ARRAY[1,2,3])", +null, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayOne contains null elements +.testResult( +$("f3").arrayExcept(new Integer[] {null, 42}), +"ARRAY_EXCEPT(f3, ARRAY[null, 42])", +new Integer[] {null, 1}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// ARRAY> +.testResult( +$("f2").arrayExcept( +new Row[] { +Row.of(tr
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
dawidwys commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1510988286 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + +public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityAndHashcodeProvider.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null || arrayTwo == null) { +return null; +} + +List list = new ArrayList<>(); +Map map = new HashMap<>(); +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +if (element == null) { +map.merge(null, 1, (k, v) -> v + 1); +} else { +ObjectContainer objectContainer = new ObjectContainer(element); +map.merge(objectContainer, 1, (k, v) -> v + 1); +} +} +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +final ObjectContainer objectContainer = +element == null ? 
null : new ObjectContainer(element); +if (map.containsKey(objectContainer)) { +if (map.get(objectContainer) > 1) { +map.put(objectContainer, map.get(objectContainer) - 1); +} else { +map.remove(objectContainer); +} Review Comment: Let's be consistent. If you do here: ``` final ObjectContainer objectContainer = element == null ? null : new ObjectContainer(element); ``` do it also few lines above: ``` if (element == null) { map.merge(null, 1, (k, v) -> v + 1); } else { ObjectContainer objectContainer = new ObjectContainer(element); map.merge(objectContainer, 1, (k, v) -> v + 1); } ``` Don't mix and match different approaches, because it only confuses the reader. Code is read more often than written and we should optimise for reading. Doing the same thing in two different ways fo
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
dawidwys commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1510988286 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + +public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityAndHashcodeProvider.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null || arrayTwo == null) { +return null; +} + +List list = new ArrayList<>(); +Map map = new HashMap<>(); +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +if (element == null) { +map.merge(null, 1, (k, v) -> v + 1); +} else { +ObjectContainer objectContainer = new ObjectContainer(element); +map.merge(objectContainer, 1, (k, v) -> v + 1); +} +} +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +final ObjectContainer objectContainer = +element == null ? 
null : new ObjectContainer(element); +if (map.containsKey(objectContainer)) { +if (map.get(objectContainer) > 1) { +map.put(objectContainer, map.get(objectContainer) - 1); +} else { +map.remove(objectContainer); +} Review Comment: Let's be consistent. If you do here: ``` final ObjectContainer objectContainer = element == null ? null : new ObjectContainer(element); ``` do it also a few lines above: ``` final Object element = elementGetter.getElementOrNull(arrayTwo, pos); if (element == null) { map.merge(null, 1, (k, v) -> v + 1); } else { ObjectContainer objectContainer = new ObjectContainer(element); map.merge(objectContainer, 1, (k, v) -> v + 1); } ``` Don't mix and match different approaches, because it only confuses the reader. Code is read more often than written and we should optimise for reading.
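For illustration, a minimal sketch of the consistent version of the two loops could look like the following, assuming (as in the diff above) that `map` counts the occurrences of `arrayTwo`'s elements keyed by `ObjectContainer` (with a `null` key for null elements) and that elements of `arrayOne` not found in the map are collected into `list`; the full surrounding code is not shown in the diff, so treat this as a sketch rather than the final implementation:

```java
// Sketch only: the same null-handling style in both loops.
final List<Object> list = new ArrayList<>();
final Map<ObjectContainer, Integer> map = new HashMap<>();

// Count occurrences of arrayTwo's elements (null key represents a null element).
for (int pos = 0; pos < arrayTwo.size(); pos++) {
    final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
    final ObjectContainer container =
            element == null ? null : new ObjectContainer(element);
    map.merge(container, 1, Integer::sum);
}

// Keep arrayOne's elements that are not (or no longer) present in arrayTwo.
for (int pos = 0; pos < arrayOne.size(); pos++) {
    final Object element = elementGetter.getElementOrNull(arrayOne, pos);
    final ObjectContainer container =
            element == null ? null : new ObjectContainer(element);
    final Integer count = map.get(container);
    if (count == null) {
        list.add(element);
    } else if (count > 1) {
        map.put(container, count - 1);
    } else {
        map.remove(container);
    }
}
```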
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
dawidwys commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1510994873 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/EqualityAndHashcodeProvider.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.DataType; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandle; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; + +/** This class is used for scalar function. */ +public class EqualityAndHashcodeProvider implements Closeable, Serializable { +private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator; +private final SpecializedFunction.ExpressionEvaluator equalityEvaluator; +private static MethodHandle hashcodeHandle; + +private static MethodHandle equalityHandle; + +public EqualityAndHashcodeProvider( +SpecializedFunction.SpecializedContext context, DataType dataType) { +hashcodeEvaluator = +context.createEvaluator( + unresolvedCall(BuiltInFunctionDefinitions.INTERNAL_HASHCODE, $("element1")), +DataTypes.INT(), +DataTypes.FIELD("element1", dataType.notNull().toInternal())); + +equalityEvaluator = +context.createEvaluator( +$("element1").isEqual($("element2")), +DataTypes.BOOLEAN(), +DataTypes.FIELD("element1", dataType.notNull().toInternal()), +DataTypes.FIELD("element2", dataType.notNull().toInternal())); +} + +public void open(FunctionContext context) throws Exception { +hashcodeHandle = hashcodeEvaluator.open(context); +equalityHandle = equalityEvaluator.open(context); +} + +public static boolean equals(Object o1, Object o2) { +try { +return (Boolean) equalityHandle.invoke(o1, o2); +} catch (Throwable e) { +throw new RuntimeException(e); +} +} + +public static int hashCode(Object o) { +try { +return (int) hashcodeHandle.invoke(o); +} catch (Throwable e) { +throw new RuntimeException(e); +} +} Review Comment: This must not be static. You pass a `dataType` in the ctor, which is not a static ctx. Both `equals` and `hashCode` depend on the non-static `dataType`. Having `equals` and `hashCode` is asking for troubles. Just to give you an example what can go wrong. We can have `SELECT ARRAY_EXCEPT(, ) as a0, ARRAY_EXCEPT(, ) as a1`. Since you made `hashcodeHandle` and `equalityHandle` static there can only ever be a single instance of it. 
Therefore it will happen that you either compare `ints` with the `string` equalityHandle or `strings` with the `int` one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
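As a rough sketch of the non-static variant the review asks for (assuming nothing else in the class needs to change, which the diff does not show), the handles could become instance fields so that each `EqualityAndHashcodeProvider` keeps the evaluators generated for its own `dataType`:

```java
// Sketch only: instance (non-static) handles, one pair per provider instance.
// `transient` is an assumption here, since MethodHandle is not serializable
// and the class implements Serializable.
private transient MethodHandle hashcodeHandle;
private transient MethodHandle equalityHandle;

public void open(FunctionContext context) throws Exception {
    hashcodeHandle = hashcodeEvaluator.open(context);
    equalityHandle = equalityEvaluator.open(context);
}

public boolean equals(Object o1, Object o2) {
    try {
        return (Boolean) equalityHandle.invoke(o1, o2);
    } catch (Throwable t) {
        throw new RuntimeException(t);
    }
}

public int hashCode(Object o) {
    try {
        return (int) hashcodeHandle.invoke(o);
    } catch (Throwable t) {
        throw new RuntimeException(t);
    }
}
```

With two `ARRAY_EXCEPT` calls in the same query, each function instance would then use its own provider and handles, so an `int` comparison can no longer pick up a `string` evaluator. Renaming the methods (for example to `elementsEqual`/`elementHashCode`) would additionally avoid overloading `Object#equals`, which may be what the reviewer's warning about keeping `equals`/`hashCode` refers to.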
Re: [PR] [FLINK-34334][state] Add sub-task level RocksDB file count metrics [flink]
masteryhx commented on code in PR #24322: URL: https://github.com/apache/flink/pull/24322#discussion_r1511001449 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java: ## @@ -114,7 +114,14 @@ private void setProperty(RocksDBNativePropertyMetricView metricView) { try { synchronized (lock) { if (rocksDB != null) { -long value = rocksDB.getLongProperty(metricView.handle, metricView.property); +long value = +metricView.property.contains( + RocksDBProperty.NumFilesAtLevel.getRocksDBProperty()) +? Long.parseLong( +rocksDB.getProperty( +metricView.handle, metricView.property)) +: rocksDB.getLongProperty( +metricView.handle, metricView.property); Review Comment: Hi all, sorry for the late reply. RocksDB exposes different methods for different properties, so this is not a special case. I'd suggest handling this logic in `RocksDBProperty`: define the type of each property and extract a common method in `RocksDBProperty` that decides which method to call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
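A minimal sketch of that suggestion might look like the following; the names `PropertyType`, `type`, and `getNumericValue` are made up here for illustration and are not part of the existing `RocksDBProperty`:

```java
// Hypothetical addition to the RocksDBProperty enum: each property knows how
// RocksDB exposes its value, and the metric monitor calls a single accessor.
public enum PropertyType {
    LONG,   // readable via RocksDB#getLongProperty
    STRING  // only readable via RocksDB#getProperty, e.g. "num-files-at-level<N>"
}

/** Reads the property using whichever accessor matches its type. */
public long getNumericValue(RocksDB db, ColumnFamilyHandle handle, String property)
        throws RocksDBException {
    // `type` would be a per-constant field of RocksDBProperty (assumed here).
    return type == PropertyType.STRING
            ? Long.parseLong(db.getProperty(handle, property))
            : db.getLongProperty(handle, property);
}
```

With something along these lines, `setProperty` in the metric monitor could stay free of per-property branching.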
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
dawidwys commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511000467 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +/** This class is used for scalar function. */ Review Comment: Please add more context what it is used for. That it is used for comparing Objects using code generated `hashCode` and `equals` instead of using the `Object#equals`/`Object#hashcode` versions. ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/EqualityAndHashcodeProvider.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.DataType; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandle; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; + +/** This class is used for scalar function. */ Review Comment: Please add more context, e.g. that it is used for sharing the initialization context between scalar functions that need code generated hashcode and equals methods. ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +/** This class is used for scalar function. */ +public class ObjectContainer { Review Comment: Add proper annotations `@Internal` in this case, please do the same of other classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
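To make the first request concrete, the expanded Javadoc and annotation could look roughly like this; the wording is only a suggestion derived from the reviewer's description of the class's purpose:

```java
/**
 * Wraps an element of a collection so that hash-based structures (such as the HashMap used by
 * ARRAY_EXCEPT) compare entries with the code-generated {@code equals}/{@code hashCode} of the
 * element's data type instead of {@link Object#equals(Object)} and {@link Object#hashCode()}.
 */
@Internal
public class ObjectContainer {
    // existing implementation unchanged
}
```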
Re: [PR] [FLINK-31530] support oracle catalog [flink-connector-jdbc]
whhe commented on PR #43: URL: https://github.com/apache/flink-connector-jdbc/pull/43#issuecomment-1976373645 Any plan for this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34572) JDBC: Support OceanBase catalog
He Wang created FLINK-34572: --- Summary: JDBC: Support OceanBase catalog Key: FLINK-34572 URL: https://issues.apache.org/jira/browse/FLINK-34572 Project: Flink Issue Type: New Feature Components: Connectors / JDBC Reporter: He Wang As the OceanBase dialect has been introduced, we can now support the OceanBase catalog in the JDBC connector. - The catalog for OceanBase MySQL mode should be the same as the MySQL catalog except for 'builtinDatabases'. - In OceanBase Oracle mode, there is no 'database' but only 'schema', which corresponds to the user name. As there is no Oracle catalog for now, I'm not sure how we should use 'database' and 'schema' in the catalog. Personally, I prefer to use 'schema' as 'database'. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-12590][docs] Replace HTTP links [flink]
zentol closed pull request #8519: [FLINK-12590][docs] Replace HTTP links URL: https://github.com/apache/flink/pull/8519 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-7958][metrics] Reporters provide default delimiter [flink]
zentol closed pull request #9867: [FLINK-7958][metrics] Reporters provide default delimiter URL: https://github.com/apache/flink/pull/9867 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments [flink]
zentol closed pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments URL: https://github.com/apache/flink/pull/13190 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27467][cli][tests] Drop CliFrontendTestBase [flink]
zentol closed pull request #19620: [FLINK-27467][cli][tests] Drop CliFrontendTestBase URL: https://github.com/apache/flink/pull/19620 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-29399][tests] Wait until job was cancelled [flink]
zentol closed pull request #20952: [FLINK-29399][tests] Wait until job was cancelled URL: https://github.com/apache/flink/pull/20952 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29399) TableITCase is unstable
[ https://issues.apache.org/jira/browse/FLINK-29399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29399: --- Labels: pull-request-available (was: ) > TableITCase is unstable > --- > > Key: FLINK-29399 > URL: https://issues.apache.org/jira/browse/FLINK-29399 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.16.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > > {code:java} > val it = tableResult.collect() > it.close() > val jobStatus = > try { > Some(tableResult.getJobClient.get().getJobStatus.get()) > } catch { > // ignore the exception, > // because the MiniCluster maybe already been shut down when getting > job status > case _: Throwable => None > } > if (jobStatus.isDefined) { > assertNotEquals(jobStatus.get, JobStatus.RUNNING) > } > {code} > There's no guarantee that the cancellation already went through. The test > should periodically poll the job status until another state is reached. > Or even better, use the new collect API, call execute in a separate thread, > close the iterator and wait for the thread to terminate. -- This message was sent by Atlassian Jira (v8.20.10#820010)
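The polling variant suggested in the ticket could look roughly like the sketch below (in Java rather than the Scala of the test, with an arbitrary 30-second timeout; `tableResult` is assumed to be the `TableResult` from the test):

```java
// Sketch of "poll the job status until another state is reached".
JobClient jobClient =
        tableResult.getJobClient().orElseThrow(IllegalStateException::new);
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));

JobStatus status = jobClient.getJobStatus().get();
while (status == JobStatus.RUNNING && deadline.hasTimeLeft()) {
    Thread.sleep(100); // back off briefly before polling again
    status = jobClient.getJobStatus().get();
}
assertThat(status).isNotEqualTo(JobStatus.RUNNING);
```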
Re: [PR] [FLINK-29866][build] Split flink-dist [flink]
zentol closed pull request #21228: [FLINK-29866][build] Split flink-dist URL: https://github.com/apache/flink/pull/21228 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][ci] Sync CI setup & Reuse workflow [flink-connector-aws]
zentol closed pull request #38: [hotfix][ci] Sync CI setup & Reuse workflow URL: https://github.com/apache/flink-connector-aws/pull/38 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32179][tests] Harden FileUtils#findFlinkDist against custom repo names [flink]
zentol closed pull request #22653: [FLINK-32179][tests] Harden FileUtils#findFlinkDist against custom repo names URL: https://github.com/apache/flink/pull/22653 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32179) Handle more repo names for automatic dist discovery
[ https://issues.apache.org/jira/browse/FLINK-32179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32179: --- Labels: pull-request-available (was: ) > Handle more repo names for automatic dist discovery > --- > > Key: FLINK-32179 > URL: https://issues.apache.org/jira/browse/FLINK-32179 > Project: Flink > Issue Type: Technical Debt > Components: Test Infrastructure >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > The e2e tests have a routine to auto-detect the distribution that they need > to actually run Flink. When Flink is checked out in a directory not starting > with "flink" the auto-discovery doesn't find it. > We can improve this slightly by adjusting the iteration condition. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] FLINK-30455 [core] Excluding java.lang.String and primitive types from closure cleaning [flink]
zentol closed pull request #21532: FLINK-30455 [core] Excluding java.lang.String and primitive types from closure cleaning URL: https://github.com/apache/flink/pull/21532 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-11913] Shading cassandra driver dependencies in cassandra conector [flink]
flinkbot commented on PR #7980: URL: https://github.com/apache/flink/pull/7980#issuecomment-1976427637 ## CI report: * a523bdc12f9f0f00c60d1940927454bd497859a5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup [flink]
flinkbot commented on PR #8545: URL: https://github.com/apache/flink/pull/8545#issuecomment-1976428068 ## CI report: * dcf2c5bd6d1c3ebe4b349c2b479a66f8c225f7ce UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' [flink]
masteryhx commented on code in PR #24401: URL: https://github.com/apache/flink/pull/24401#discussion_r1511051492 ## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ## @@ -76,6 +76,17 @@ public class CheckpointingOptions { * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. * * Recognized shortcut names are 'jobmanager' and 'filesystem'. + + {@link #CHECKPOINT_STORAGE} and {@link #CHECKPOINTS_DIRECTORY} are usually combined to + configure the checkpoint location. By default, checkpoint data and meta data will be stored Review Comment: 'checkpoint data and meta data' seems a bit confusing. How about 'meta data and actual program state', just like the comment for `RETAIN_ON_CANCELLATION`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
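For context, the two options discussed in that Javadoc are typically combined as in the minimal sketch below; the checkpoint path is a placeholder:

```java
// Minimal sketch: pick the filesystem checkpoint storage and point it at a
// (placeholder) durable directory, then build the environment from the config.
Configuration config = new Configuration();
config.setString("state.checkpoint-storage", "filesystem");
config.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(config);
```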
Re: [PR] [FLINK-32075][FLIP-306][Checkpoint] Delete merged files on checkpoint abort or subsumption [flink]
masteryhx closed pull request #24181: [FLINK-32075][FLIP-306][Checkpoint] Delete merged files on checkpoint abort or subsumption URL: https://github.com/apache/flink/pull/24181 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-32075) Delete merged files on checkpoint abort or subsumption
[ https://issues.apache.org/jira/browse/FLINK-32075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-32075. -- Fix Version/s: 1.20.0 (was: 1.19.0) Resolution: Fixed merged cd9a9f76 into master. > Delete merged files on checkpoint abort or subsumption > -- > > Key: FLINK-32075 > URL: https://issues.apache.org/jira/browse/FLINK-32075 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34536) Support reading long value as Timestamp column in JSON format
[ https://issues.apache.org/jira/browse/FLINK-34536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823143#comment-17823143 ] yisha zhou commented on FLINK-34536: [~libenchao] Thanks for your insight, it helps a lot! After reading FLIP-162, I think explicitly calling `to_timestamp_ltz` may be more reasonable and more in line with the design. Otherwise we would have to expose configs for users to declare the details of the conversion, e.g. precision, which seems too complex. > Support reading long value as Timestamp column in JSON format > - > > Key: FLINK-34536 > URL: https://issues.apache.org/jira/browse/FLINK-34536 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.19.0 >Reporter: yisha zhou >Priority: Major > > In many scenarios, timestamp data is stored as Long value and expected to be > operated as Timestamp value. It's not user-friendly to use an UDF to convert > the data before operating it. > Meanwhile, in Avro format, it seems it can receive several types of input and > convert it into TimestampData. Hope the same ability can be introduced into > JSON format. -- This message was sent by Atlassian Jira (v8.20.10#820010)
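As a small, hypothetical illustration of that explicit conversion (the table and column names below are made up; `TO_TIMESTAMP_LTZ(numeric, 3)` interprets a BIGINT as epoch milliseconds):

```java
// Hypothetical example: `src` has a BIGINT column `epoch_millis` holding epoch
// milliseconds; convert it explicitly instead of relying on the JSON format.
// `tEnv` is an existing TableEnvironment.
Table converted =
        tEnv.sqlQuery(
                "SELECT TO_TIMESTAMP_LTZ(epoch_millis, 3) AS event_time FROM src");
```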
Re: [PR] [FLINK-33557] Externalize Cassandra Python connector code [flink-connector-cassandra]
ferenc-csaky commented on PR #26: URL: https://github.com/apache/flink-connector-cassandra/pull/26#issuecomment-1976453914 @echauchot Sorry to ping you out of the blue, I saw that you were an active contributor to the Cassandra connector. If you have some time, would you mind reviewing this PR? Thanks in advance! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33210] Introduce lineage graph relevant interfaces [flink]
FangYongs commented on PR #23626: URL: https://github.com/apache/flink/pull/23626#issuecomment-1976455987 Thanks @JingGe , please help to review this PR again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33210] Introduce lineage graph relevant interfaces [flink]
FangYongs commented on PR #23626: URL: https://github.com/apache/flink/pull/23626#issuecomment-1976458731 @HuangZhenQiu @mobuchowski I have added dataset and facet interfaces in the lineage vertex, please help to review this PR. cc @X-czh -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Improve FlinkDeployment spec overview doc [flink-kubernetes-operator]
caicancai commented on code in PR #788: URL: https://github.com/apache/flink-kubernetes-operator/pull/788#discussion_r1511081946 ## docs/content/docs/custom-resource/overview.md: ## @@ -85,7 +85,7 @@ The spec contains all the information the operator need to deploy and manage you Most deployments will define at least the following fields: - `image` : Docker used to run Flink job and task manager processes - - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16` ...) + - `flinkVersion` : Flink version used in the image (`v1_15`, `v1_16`, `v1_17`, `v1_18`) Review Comment: Thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33210] Introduce lineage graph relevant interfaces [flink]
X-czh commented on code in PR #23626: URL: https://github.com/apache/flink/pull/23626#discussion_r1511095895 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraphTest.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.api.connector.source.Boundedness; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Testing for lineage graph. */ +class DefaultLineageGraphTest { +@Test +void testLineageGraph() { +SourceLineageVertex source1 = new TestingSourceLineageVertex("source1"); +SourceLineageVertex source2 = new TestingSourceLineageVertex("source2"); +SourceLineageVertex source3 = new TestingSourceLineageVertex("source3"); +LineageVertex sink1 = new TestingLineageVertex("sink1"); +LineageVertex sink2 = new TestingLineageVertex("sink2"); +LineageGraph lineageGraph = +DefaultLineageGraph.builder() +.addLineageEdge(new TestingLineageEdge(source1, sink1)) +.addLineageEdges( +new TestingLineageEdge(source2, sink2), +new TestingLineageEdge(source3, sink1), +new TestingLineageEdge(source1, sink2)) +.build(); +assertThat(lineageGraph.sources()).containsExactlyInAnyOrder(source1, source2, source3); +assertThat(lineageGraph.sinks()).containsExactlyInAnyOrder(sink1, sink2); +assertThat(lineageGraph.relations().size()).isEqualTo(4); Review Comment: NIT: you can use `assertThat(lineageGraph.relations()).hasSize(4)` here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33992) Add option to fetch the jar from private repository in FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-33992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823159#comment-17823159 ] Ahmed Soliman commented on FLINK-33992: --- Hello [~skala], I have some thoughts here; please correct me if I am wrong. In Kubernetes, an {{[initContainer|https://kubernetes.io/docs/concepts/workloads/pods/init-containers/]}} is a special kind of container that runs before the main container in a Pod and completes its task before the main container starts. This is often used for setup tasks that need to be done before the main container can start. If you're using an {{initContainer}} to download the JAR file, you would need to make sure that the main container can access the downloaded file. This is where Kubernetes [volumes|https://kubernetes.io/docs/concepts/storage/volumes/] come in. A Kubernetes volume is essentially a directory that is accessible to all containers running in a Pod. Data in a volume is preserved across container restarts, and it can be shared between multiple containers in a Pod. That being said, you might use a volume to share the JAR file between the {{initContainer}} and the main container: # Define a volume in your Pod spec. This could be an {{emptyDir}} volume, which is first created when a Pod is assigned to a Node, and exists as long as that Pod is running on that node. # In the {{initContainer}} spec, specify a volume mount that points to the volume you defined. Download the JAR file to a path in this volume. # In the main container spec, specify a volume mount that points to the same volume. The main container will now be able to access the JAR file downloaded by the {{{}initContainer{}}}. This way, the {{initContainer}} can download the JAR file and store it in a location that the main container can access, allowing the main container to use the JAR file when it starts. cc: [~gyfora] Do you think the explanation makes sense? If yes, and if we consider the case where a session cluster has tens of session jobs with different job jars (assuming this is a valid use case), is it worth implementing a way to download from a private repo in the job spec instead of relying on this initContainer approach? I have some thoughts on how to implement it, if we agree that the feature makes sense.
> --- > apiVersion: flink.apache.org/v1beta1 > kind: FlinkSessionJob > metadata: > name: job1 > spec: > deploymentName: session-cluster > job: > jarURI: file:///opt/flink/job.jar > parallelism: 4 > upgradeMode: savepoint > (edited) > caused by: java.io.FileNotFoundException: /opt/flink/job.jar (No such file or > directory) > at java.base/java.io.FileInputStream.open0(Native Method) > at java.base/java.io.FileInputStream.open(Unknown Source) > at java.base/java.io.FileInputStream.(Unknown Source) > at > org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) > at > org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) > at > org.apache.flink.kubernetes.operator.artifact.FileSystemBasedArtifactFetcher.fetch(FileSystemBasedArtifactFetcher.java:44) > at > org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:63) > at > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:707) > at > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:212) > at > org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:73) > at > org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:44) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:120) > at > org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:109) -- This m
Re: [PR] [FLINK-34352][doc] Improve the documentation of allowNonRestoredState [flink]
masteryhx closed pull request #24396: [FLINK-34352][doc] Improve the documentation of allowNonRestoredState URL: https://github.com/apache/flink/pull/24396 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33210] Introduce lineage graph relevant interfaces [flink]
X-czh commented on PR #23626: URL: https://github.com/apache/flink/pull/23626#issuecomment-1976492211 @FangYongs Thanks for the PR. The changes LGTM except for a nit on the UT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34352) Improve the documentation of allowNonRestoredState
[ https://issues.apache.org/jira/browse/FLINK-34352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-34352. -- Fix Version/s: 1.20.0 Resolution: Fixed merged e168cd4d into master. > Improve the documentation of allowNonRestoredState > -- > > Key: FLINK-34352 > URL: https://issues.apache.org/jira/browse/FLINK-34352 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Current documentation of allowNonRestoredState is not clear, we should > clarify: > # It can lead to serious issues with correctness if it's used incorrectly. > # The correctness is related to the topological order and the logic of job > when removing operator by default. > # For DataStream Job, the operator uid could be assigned explicitly to avoid > the reassignment of operator uid. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work
[ https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823163#comment-17823163 ] Gyula Fora commented on FLINK-34566: Looking at this in detail I think it should work as expected. The thread pool max size is set correctly and we can have at most the defined parallelism number of threads. > Flink Kubernetes Operator reconciliation parallelism setting not work > - > > Key: FLINK-34566 > URL: https://issues.apache.org/jira/browse/FLINK-34566 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Fei Feng >Priority: Blocker > Attachments: image-2024-03-04-10-58-37-679.png, > image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png > > > After we upgrade JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005 , > we can not enlarge reconciliation parallelism , and the maximum > reconciliation parallelism was only 10. This results FlinkDeployment and > SessionJob 's reconciliation delay about 10-30 seconds when we have a large > scale flink session cluster and session jobs in k8s cluster。 > > After investigating and validating, I found the reason is the logic for > reconciliation thread pool creation in JOSDK has changed significantly > between this two version. > v4.3.0: > reconciliation thread pool was created as a FixedThreadPool ( maximumPoolSize > was same as corePoolSize), so we pass the reconciliation thread and get a > thread pool that matches our expectations. > !image-2024-03-04-10-58-37-679.png|width=497,height=91! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198] > > but in v4.2.0: > the reconciliation thread pool was created as a customer executor which we > can pass corePoolSize and maximumPoolSize to create this thread pool.The > problem is that we only set the maximumPoolSize of the thread pool, while, > the corePoolSize of the thread pool is defaulted to 10. This causes thread > pool size was only 10 and majority of events would be placed in the workQueue > for a while. > !image-2024-03-04-11-17-22-877.png|width=569,height=112! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37] > > the solution is also simple, we can create and pass thread pool in flink > kubernetes operator so that we can control the reconciliation thread pool > directly, such as: > !image-2024-03-04-11-31-44-451.png|width=483,height=98! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34268] Add a test to verify if restore test exists for ExecNode [flink]
dawidwys commented on code in PR #24219: URL: https://github.com/apache/flink/pull/24219#discussion_r1511101924 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate; +import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; +import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil.ExecNodeNameVersion; + +import org.apache.flink.shaded.guava31.com.google.common.reflect.ClassPath; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** Validate restore tests exists for Exec Nodes. */ +public class RestoreTestCompleteness { + +private static final Set>> SKIP_EXEC_NODES = +new HashSet>>() { +{ +/** Covered with {@link StreamExecGroupAggregate}. */ +add(StreamExecExchange.class); + +/** Covered with {@link StreamExecIncrementalGroupAggregate}. */ +add(StreamExecLocalGroupAggregate.class); +add(StreamExecGlobalGroupAggregate.class); + +/** Covered with {@link StreamExecWindowAggregate}. 
*/ +add(StreamExecLocalWindowAggregate.class); +add(StreamExecGlobalWindowAggregate.class); + +/** Covered with {@link StreamExecChangelogNormalize}. */ +add(StreamExecDropUpdateBefore.class); + +/** TODO: Remove after FLINK-33676 is merged. */ +add(StreamExecWindowAggregate.class); + +/** TODO: Remove after FLINK-33805 is merged. */ +add(StreamExecOverAggregate.class); + +/** Ignoring python based exec nodes temporarily. */ +add(StreamExecPythonCalc.class); +add(StreamExecPythonCorrelate.class); +add(StreamExecPythonOverAggregate.class); +add(StreamExecPythonGroupAggregate.class); +add(StreamExecPythonGroupTableAggregate.class); +add(StreamExecPythonGroupWindowAggregate.class); +} +}; + +@Test +public void testMissingRestoreTest() +throws IOException, NoSuchMethodException, InstantiationException, +IllegalAccessException, InvocationTargetException { +Map>> versionedExecNodes = +E
[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work
[ https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823164#comment-17823164 ] Gyula Fora commented on FLINK-34566: Even if the core pool size is 10, the maxpoolsize defines how many threads are created maximum. After 1 minute of idling some of these threads are released and the pool can shrink when not in use. This is how it was designed. > Flink Kubernetes Operator reconciliation parallelism setting not work > - > > Key: FLINK-34566 > URL: https://issues.apache.org/jira/browse/FLINK-34566 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Fei Feng >Priority: Blocker > Attachments: image-2024-03-04-10-58-37-679.png, > image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png > > > After we upgrade JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005 , > we can not enlarge reconciliation parallelism , and the maximum > reconciliation parallelism was only 10. This results FlinkDeployment and > SessionJob 's reconciliation delay about 10-30 seconds when we have a large > scale flink session cluster and session jobs in k8s cluster。 > > After investigating and validating, I found the reason is the logic for > reconciliation thread pool creation in JOSDK has changed significantly > between this two version. > v4.3.0: > reconciliation thread pool was created as a FixedThreadPool ( maximumPoolSize > was same as corePoolSize), so we pass the reconciliation thread and get a > thread pool that matches our expectations. > !image-2024-03-04-10-58-37-679.png|width=497,height=91! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198] > > but in v4.2.0: > the reconciliation thread pool was created as a customer executor which we > can pass corePoolSize and maximumPoolSize to create this thread pool.The > problem is that we only set the maximumPoolSize of the thread pool, while, > the corePoolSize of the thread pool is defaulted to 10. This causes thread > pool size was only 10 and majority of events would be placed in the workQueue > for a while. > !image-2024-03-04-11-17-22-877.png|width=569,height=112! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37] > > the solution is also simple, we can create and pass thread pool in flink > kubernetes operator so that we can control the reconciliation thread pool > directly, such as: > !image-2024-03-04-11-31-44-451.png|width=483,height=98! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work
[ https://issues.apache.org/jira/browse/FLINK-34566 ] Gyula Fora deleted comment on FLINK-34566: was (Author: gyfora): {noformat} A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) according to the bounds set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in method execute(Runnable), if fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. Else if fewer than maximumPoolSize threads are running, a new thread will be created to handle the request only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize.{noformat} > Flink Kubernetes Operator reconciliation parallelism setting not work > - > > Key: FLINK-34566 > URL: https://issues.apache.org/jira/browse/FLINK-34566 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Fei Feng >Priority: Blocker > Attachments: image-2024-03-04-10-58-37-679.png, > image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png > > > After we upgrade JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005 , > we can not enlarge reconciliation parallelism , and the maximum > reconciliation parallelism was only 10. This results FlinkDeployment and > SessionJob 's reconciliation delay about 10-30 seconds when we have a large > scale flink session cluster and session jobs in k8s cluster。 > > After investigating and validating, I found the reason is the logic for > reconciliation thread pool creation in JOSDK has changed significantly > between this two version. > v4.3.0: > reconciliation thread pool was created as a FixedThreadPool ( maximumPoolSize > was same as corePoolSize), so we pass the reconciliation thread and get a > thread pool that matches our expectations. > !image-2024-03-04-10-58-37-679.png|width=497,height=91! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198] > > but in v4.2.0: > the reconciliation thread pool was created as a customer executor which we > can pass corePoolSize and maximumPoolSize to create this thread pool.The > problem is that we only set the maximumPoolSize of the thread pool, while, > the corePoolSize of the thread pool is defaulted to 10. This causes thread > pool size was only 10 and majority of events would be placed in the workQueue > for a while. > !image-2024-03-04-11-17-22-877.png|width=569,height=112! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37] > > the solution is also simple, we can create and pass thread pool in flink > kubernetes operator so that we can control the reconciliation thread pool > directly, such as: > !image-2024-03-04-11-31-44-451.png|width=483,height=98! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work
[ https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823166#comment-17823166 ] Gyula Fora commented on FLINK-34566: >From the java docs: "A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) according to the bounds set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in method execute(Runnable), if fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. Else if fewer than maximumPoolSize threads are running, a new thread will be created to handle the request only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize." > Flink Kubernetes Operator reconciliation parallelism setting not work > - > > Key: FLINK-34566 > URL: https://issues.apache.org/jira/browse/FLINK-34566 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Fei Feng >Priority: Blocker > Attachments: image-2024-03-04-10-58-37-679.png, > image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png > > > After we upgrade JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005 , > we can not enlarge reconciliation parallelism , and the maximum > reconciliation parallelism was only 10. This results FlinkDeployment and > SessionJob 's reconciliation delay about 10-30 seconds when we have a large > scale flink session cluster and session jobs in k8s cluster。 > > After investigating and validating, I found the reason is the logic for > reconciliation thread pool creation in JOSDK has changed significantly > between this two version. > v4.3.0: > reconciliation thread pool was created as a FixedThreadPool ( maximumPoolSize > was same as corePoolSize), so we pass the reconciliation thread and get a > thread pool that matches our expectations. > !image-2024-03-04-10-58-37-679.png|width=497,height=91! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198] > > but in v4.2.0: > the reconciliation thread pool was created as a customer executor which we > can pass corePoolSize and maximumPoolSize to create this thread pool.The > problem is that we only set the maximumPoolSize of the thread pool, while, > the corePoolSize of the thread pool is defaulted to 10. This causes thread > pool size was only 10 and majority of events would be placed in the workQueue > for a while. > !image-2024-03-04-11-17-22-877.png|width=569,height=112! > [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37] > > the solution is also simple, we can create and pass thread pool in flink > kubernetes operator so that we can control the reconciliation thread pool > directly, such as: > !image-2024-03-04-11-31-44-451.png|width=483,height=98! -- This message was sent by Atlassian Jira (v8.20.10#820010)
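In plain Java, the behavior described in that quote can be illustrated as follows; the sizes, timeouts, and queue choices below are arbitrary and only meant to show the difference, not to suggest what the operator should configure:

```java
// With core < max and a bounded queue, threads beyond the core size are only
// created once the queue is full; with an unbounded queue the pool never grows
// past the core size.
ThreadPoolExecutor growsOnlyWhenQueueFills =
        new ThreadPoolExecutor(
                10, 50, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

// Setting core == max gives a fixed-size pool; allowing core threads to time
// out lets idle threads be reclaimed after the keep-alive period.
ThreadPoolExecutor fixedSize =
        new ThreadPoolExecutor(
                50, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
fixedSize.allowCoreThreadTimeOut(true);
```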
[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work
[ https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823165#comment-17823165 ] Gyula Fora commented on FLINK-34566: {noformat} A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) according to the bounds set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in method execute(Runnable), if fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. Else if fewer than maximumPoolSize threads are running, a new thread will be created to handle the request only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize.{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
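As an illustration of the pool-sizing behavior quoted above, a minimal, self-contained sketch (plain JDK; not the JOSDK or operator code, and the parallelism value is a made-up placeholder) contrasting a pool where only maximumPoolSize is raised with a true fixed-size pool. With an unbounded work queue, only corePoolSize determines how many threads actually run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReconcilerPoolSketch {
    public static void main(String[] args) {
        int parallelism = 50; // hypothetical reconciliation parallelism

        // Only maximumPoolSize raised: with an unbounded queue the pool never grows
        // beyond the core size of 10, because the queue never reports "full".
        ThreadPoolExecutor onlyMaxRaised =
                new ThreadPoolExecutor(
                        10, parallelism, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        // Fixed-size pool (core == max): all `parallelism` threads can actually be used.
        ExecutorService fixedSize = Executors.newFixedThreadPool(parallelism);

        onlyMaxRaised.shutdown();
        fixedSize.shutdown();
    }
}
```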
[jira] [Commented] (FLINK-33992) Add option to fetch the jar from private repository in FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-33992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823168#comment-17823168 ] Gyula Fora commented on FLINK-33992: For session job submissions the jar generally has to be downloaded and submitted from the operator itself. So initContainers are not really applicable here unless you mean download it into the session cluster lib itself which only works in some special cases. > Add option to fetch the jar from private repository in FlinkSessionJob > -- > > Key: FLINK-33992 > URL: https://issues.apache.org/jira/browse/FLINK-33992 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Sweta Kalakuntla >Priority: Major > > FlinkSessionJob spec does not have a capability to download job jar from > remote private repository. It can currently only download from public > repositories. > Adding capability to supply credentials to the *spec.job.jarURI* in > FlinkSessionJob, will solve that problem. > If I use initContainer to download the jar in FlinkDeployment and try to > access that in FlinkSessionJob, the operator is unable to find the jar in the > defined path. > --- > apiVersion: flink.apache.org/v1beta1 > kind: FlinkSessionJob > metadata: > name: job1 > spec: > deploymentName: session-cluster > job: > jarURI: file:///opt/flink/job.jar > parallelism: 4 > upgradeMode: savepoint > (edited) > caused by: java.io.FileNotFoundException: /opt/flink/job.jar (No such file or > directory) > at java.base/java.io.FileInputStream.open0(Native Method) > at java.base/java.io.FileInputStream.open(Unknown Source) > at java.base/java.io.FileInputStream.(Unknown Source) > at > org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) > at > org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) > at > org.apache.flink.kubernetes.operator.artifact.FileSystemBasedArtifactFetcher.fetch(FileSystemBasedArtifactFetcher.java:44) > at > org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:63) > at > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:707) > at > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:212) > at > org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:73) > at > org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:44) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:120) > at > org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:109) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33992) Add option to fetch the jar from private repository in FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-33992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823169#comment-17823169 ] Gyula Fora commented on FLINK-33992: It would be great if Flink itself had a way of downloading jars from a target URL for session job submissions, instead of the operator having to upload the jar beforehand. -- This message was sent by Atlassian Jira (v8.20.10#820010)
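For illustration of what such a download step could look like, a minimal sketch using only the JDK HTTP client: the URL, credentials, and target path are hypothetical placeholders, and this is not an existing Flink or operator API.

```java
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Base64;

public class JarFetchSketch {
    public static Path fetchJar(String jarUrl, String user, String token, Path targetDir)
            throws Exception {
        // Basic-auth header for a private repository (placeholder credentials).
        String auth =
                Base64.getEncoder()
                        .encodeToString((user + ":" + token).getBytes(StandardCharsets.UTF_8));
        HttpRequest request =
                HttpRequest.newBuilder(URI.create(jarUrl))
                        .header("Authorization", "Basic " + auth)
                        .GET()
                        .build();

        HttpResponse<InputStream> response =
                HttpClient.newHttpClient()
                        .send(request, HttpResponse.BodyHandlers.ofInputStream());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Download failed: HTTP " + response.statusCode());
        }

        // Store the jar locally so it can be submitted to the session cluster afterwards.
        Path target = targetDir.resolve("job.jar");
        try (InputStream in = response.body()) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target;
    }
}
```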
[jira] [Created] (FLINK-34573) the task is stuck under high pressure
LSZ created FLINK-34573: --- Summary: the task is stuck under high pressure Key: FLINK-34573 URL: https://issues.apache.org/jira/browse/FLINK-34573 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.3 Reporter: LSZ Attachments: stuck.PNG, tm-thread-dump-chk-0123[1].json, tm-thread-dump-no-lock-0123[1].json We have a Flink job with just one TaskManager; when it consumes high-pressure source data, it gets stuck. !stuck.PNG! Thread dump info under high pressure, with CPU at 90%~100%: [^tm-thread-dump-chk-0123[1].json] This is the normal info under low pressure: [^tm-thread-dump-no-lock-0123[1].json] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Improve FlinkDeployment spec overview doc [flink-kubernetes-operator]
mxm merged PR #788: URL: https://github.com/apache/flink-kubernetes-operator/pull/788 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32497) IF FUNCTION is FALSE and the false_value parameter is a function, then an exception will be thrown
[ https://issues.apache.org/jira/browse/FLINK-32497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823172#comment-17823172 ] david radley commented on FLINK-32497: -- I notice this is a critical issue. But from reading [~Sergey Nuyanzin]'s comments, it looks like this issue is a duplicate of FLINK-30966 which is fixed in 1.16.3 and 1.17.2. [~jarieshan] Could you confirm your problem is fixed and close out this issue please. > IF FUNCTION is FALSE and the false_value parameter is a function, then an > exception will be thrown > -- > > Key: FLINK-32497 > URL: https://issues.apache.org/jira/browse/FLINK-32497 > Project: Flink > Issue Type: Bug > Components: Table SQL / API, Table SQL / Client >Affects Versions: 1.17.1 > Environment: {color:#172b4d}- Flink Version -{color} > {color:#172b4d}V{color}{color:#172b4d}ersion: 1.17.1, Commit ID: > 2750d5c{color} > > {color:#172b4d}- Java Version -{color} > {color:#172b4d}java version "1.8.0_202"{color} > {color:#172b4d}Java(TM) SE Runtime Environment (build 1.8.0_202-b08){color} > {color:#172b4d}Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed > mode){color} > >Reporter: Han Zhuo >Priority: Critical > Attachments: flink-1.17.1_logs_jarieshan_20230630.tgz, > image-2023-06-30-15-02-13-197.png, image-2023-06-30-15-02-26-099.png, > image-2023-06-30-15-02-57-082.png, image-2023-06-30-15-07-08-588.png, > image-2023-06-30-15-09-44-623.png, image-2023-06-30-15-10-08-619.png, > image-2023-06-30-15-13-56-625.png, image-2023-06-30-15-14-21-038.png > > > It is successful to execute certain functions individually. > {code:java} > SELECT SPLIT_INDEX('TEST:ABC', ':', 0); {code} > !image-2023-06-30-15-02-57-082.png|width=189,height=36! > > And it is also successful for these functions to be located in the true_value > parameter of the {color:#172b4d}+IF function+{color}. > {code:java} > SELECT IF(2>1, SPLIT_INDEX('TEST:ABC', ':', 1), 'FALSE'); {code} > !image-2023-06-30-15-02-13-197.png|width=185,height=36! > > Only when these functions are located in the false_value parameter of the > {+}IF function{+}, an exception will be thrown. > func1. > {code:java} > SELECT IF(2>1, 'TRUE', SPLIT_INDEX('TEST:ABC', ':', 0)); {code} > {color:#172b4d}!image-2023-06-30-15-09-44-623.png|width=385,height=42!{color} > func2. > {code:java} > SELECT IF(2>1, 'TRUE', LOWER('TEST')); {code} > !image-2023-06-30-15-14-21-038.png|width=337,height=246! > > {color:#172b4d}And it is also successful for{color} +CASE function+ > {code:java} > SELECT CASE WHEN 2=1 THEN 'TRUE' ELSE SPLIT_INDEX('TEST:ABC', ':', 0) END; > {code} > {color:#172b4d}!image-2023-06-30-15-10-08-619.png|width=188,height=41!{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34573) the task is stuck under high pressure
[ https://issues.apache.org/jira/browse/FLINK-34573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LSZ updated FLINK-34573: Attachment: rate.PNG > the task is stuck under high pressure > - > > Key: FLINK-34573 > URL: https://issues.apache.org/jira/browse/FLINK-34573 > Project: Flink > Issue Type: Bug > Components: Runtime / Network > Affects Versions: 1.14.3 > Reporter: LSZ > Priority: Blocker > Attachments: rate.PNG, stuck.PNG, tm-thread-dump-chk-0123[1].json, tm-thread-dump-no-lock-0123[1].json > > Original Estimate: 120h > Remaining Estimate: 120h -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34573) the task is stuck under high pressure
[ https://issues.apache.org/jira/browse/FLINK-34573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LSZ updated FLINK-34573: Description: We have a Flink job with just one TaskManager; when it consumes high-pressure source data, it gets stuck. Sometimes it runs for 1 day, sometimes it runs for 30 minutes. !stuck.PNG! Like this: (at 13:30 the TaskManager rebooted, then it ran for 30 minutes and ended up stuck.) We tested 3 cases: 1: low pressure (1200 eps): it runs for 30 minutes or 1 day. 2: checkpointing disabled: it ran for 3 days under high pressure (1800 eps) and did not get stuck. 3: doubling the original manager memory: it still gets stuck; just the time until the problem appears changed from 30 minutes to 3 days. !rate.PNG! Thread dump info under high pressure, with CPU at 90%~100%: [^tm-thread-dump-chk-0123[1].json] This is the normal info under low pressure: [^tm-thread-dump-no-lock-0123[1].json] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34574) Add CPU and memory size autoscaler quota
Gabor Somogyi created FLINK-34574: - Summary: Add CPU and memory size autoscaler quota Key: FLINK-34574 URL: https://issues.apache.org/jira/browse/FLINK-34574 Project: Flink Issue Type: New Feature Components: Autoscaler Reporter: Gabor Somogyi -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34574) Add CPU and memory size autoscaler quota
[ https://issues.apache.org/jira/browse/FLINK-34574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi reassigned FLINK-34574: - Assignee: Gabor Somogyi > Add CPU and memory size autoscaler quota > > > Key: FLINK-34574 > URL: https://issues.apache.org/jira/browse/FLINK-34574 > Project: Flink > Issue Type: New Feature > Components: Autoscaler >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic. [flink]
libenchao merged PR #24431: URL: https://github.com/apache/flink/pull/24431 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-20392) Migrating bash e2e tests to Java/Docker
[ https://issues.apache.org/jira/browse/FLINK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823188#comment-17823188 ] Lorenzo Affetti commented on FLINK-20392: - [~mapohl] [~jark] In order to better understand the current status of this issue and of the codebase nowadays: As the code of the PR linked above ([https://github.com/apache/flink/pull/14085]) does not exist anymore (I think the major PR that moved it was this one: [https://github.com/apache/flink/pull/17892]), I will proceed following [this example|https://github.com/apache/flink/blob/4595762f974f03bdf5a4c3d42e211c3a24604bd4/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java], which uses [FlinkContainers|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java]. I think the goal of these IT tests is to actually spawn a Flink cluster via testcontainers (as `FlinkContainers` does) and the required dependencies as well (e.g. Kafka, etc.) and run the tests. Do you have any feedback on this? Thank you > Migrating bash e2e tests to Java/Docker > --- > > Key: FLINK-20392 > URL: https://issues.apache.org/jira/browse/FLINK-20392 > Project: Flink > Issue Type: Technical Debt > Components: Test Infrastructure, Tests > Reporter: Matthias Pohl > Priority: Minor > Labels: auto-deprioritized-major, auto-deprioritized-minor, starter > > This Jira issue serves as an umbrella ticket for single e2e test migration tasks. This should enable us to migrate all bash-based e2e tests step-by-step. > The goal is to utilize the e2e test framework (see [flink-end-to-end-tests-common|https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common]). > Ideally, the test should use Docker containers as much as possible to disconnect the execution from the environment. A good source to achieve that is [testcontainers.org|https://www.testcontainers.org/]. > The related ML discussion is [Stop adding new bash-based e2e tests to Flink|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Stop-adding-new-bash-based-e2e-tests-to-Flink-td46607.html]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-20392) Migrating bash e2e tests to Java/Docker
[ https://issues.apache.org/jira/browse/FLINK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823193#comment-17823193 ] Matthias Pohl commented on FLINK-20392: --- The goal is to move away from bash-based e2e tests. We created the issue here to have this goal documented in Jira (so that new contributors can pick up subtasks). The most likely solution would be to rely on TestContainers, I guess. {quote} As the code of the PR linked above {quote} I'm not able to follow what you mean here. Both linked PRs are still accessible (including its code change). Or do you mean that the code of the PR is not present in the main repo anymore? -- This message was sent by Atlassian Jira (v8.20.10#820010)
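A rough sketch of the container-based setup being discussed, using plain Testcontainers with the official Flink image rather than the FlinkContainers utility referenced above; the image tag, entrypoint command, and FLINK_PROPERTIES value are assumptions for illustration only.

```java
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class FlinkClusterSmokeITCase {

    // Placeholder image tag; the standard docker-entrypoint starts a JobManager.
    @Container
    private static final GenericContainer<?> JOB_MANAGER =
            new GenericContainer<>(DockerImageName.parse("flink:1.19"))
                    .withCommand("jobmanager")
                    .withEnv("FLINK_PROPERTIES", "jobmanager.rpc.address: localhost")
                    .withExposedPorts(8081);

    @Test
    void restEndpointIsReachable() throws Exception {
        // Hit the JobManager REST endpoint through the mapped port of the container.
        String restUrl =
                "http://"
                        + JOB_MANAGER.getHost()
                        + ":"
                        + JOB_MANAGER.getMappedPort(8081)
                        + "/config";
        HttpResponse<String> response =
                HttpClient.newHttpClient()
                        .send(
                                HttpRequest.newBuilder(URI.create(restUrl)).GET().build(),
                                HttpResponse.BodyHandlers.ofString());
        assertThat(response.statusCode()).isEqualTo(200);
    }
}
```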
[PR] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi opened a new pull request, #789: URL: https://github.com/apache/flink-kubernetes-operator/pull/789 ## What is the purpose of the change Flink can report slot sharing group information in the vertex information. This can be used to fine-tune the autoscaler algorithm to define quotas. In this PR I've added two types of quotas: * `kubernetes.operator.job.autoscaler.quota.cpu`: Task manager CPU quota can be defined * `kubernetes.operator.job.autoscaler.quota.memory`: Task manager memory size quota can be defined In both cases, scaling does not happen when the total amount of CPU or memory would exceed the quota. ## Brief change log Added CPU and memory size autoscaler quota. ## Verifying this change Added automated tests + verified manually on a cluster. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
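For reference, a small sketch of how these options could be set on a Flink Configuration. The option keys come from the PR description above, while the value formats (CPU as a double, memory as a size string) are assumptions rather than something confirmed from the PR.

```java
import org.apache.flink.configuration.Configuration;

public class AutoscalerQuotaConfigSketch {
    public static Configuration quotaConfig() {
        Configuration conf = new Configuration();
        // Option keys taken from the PR description; the value formats are assumptions.
        conf.setString("kubernetes.operator.job.autoscaler.quota.cpu", "8.0");
        conf.setString("kubernetes.operator.job.autoscaler.quota.memory", "16g");
        return conf;
    }
}
```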
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823201#comment-17823201 ] Matthias Pohl commented on FLINK-29114: --- * https://github.com/apache/flink/actions/runs/8119607813/job/22195868118#step:10:11553 * https://github.com/apache/flink/actions/runs/8119607813/job/22195889799#step:10:11535 * https://github.com/apache/flink/actions/runs/8127069864/job/22212009995#step:10:11528 * https://github.com/apache/flink/actions/runs/8134965216/job/8831511#step:10:11371 * https://github.com/apache/flink/actions/runs/8134965216/job/8875875#step:10:11686 > TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with > result mismatch > -- > > Key: FLINK-29114 > URL: https://issues.apache.org/jira/browse/FLINK-29114 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.15.0, 1.19.0, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: Jane Chan >Priority: Major > Labels: auto-deprioritized-major, pull-request-available, > test-stability > Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, > image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png > > > It could be reproduced locally by repeating tests. Usually about 100 > iterations are enough to have several failed tests > {noformat} > [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.664 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase > [ERROR] > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse > Time elapsed: 0.108 s <<< FAILURE! > java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:120) > at org.junit.Assert.assertEquals(Assert.java:146) > at > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) >
Re: [PR] [FLINK-27146] [Filesystem] Migrate to Junit5 [flink]
ferenc-csaky commented on code in PR #22789: URL: https://github.com/apache/flink/pull/22789#discussion_r1511222835 ## flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java: ## Review Comment: Use AssertJ's `assertThatThrownBy` instead of try/catch and `fail()`. ## flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java: ## @@ -46,14 +45,14 @@ public void testCreateHadoopFsWithoutConfig() throws Exception { } @Test -public void testCreateHadoopFsWithMissingAuthority() throws Exception { +void testCreateHadoopFsWithMissingAuthority() throws Exception { final URI uri = URI.create("hdfs:///my/path"); HadoopFsFactory factory = new HadoopFsFactory(); try { factory.create(uri); -fail("should have failed with an exception"); +Assertions.fail("should have failed with an exception"); Review Comment: Use AssertJ's `assertThatThrownBy` instead of try/catch. ## flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java: ## @@ -25,19 +25,15 @@ import org.apache.flink.testutils.s3.S3TestCredentials; import com.amazonaws.SdkClientException; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_USE_INSTANCE_CREDENTIALS; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.fail; Review Comment: Use AssertJ's `assertThatThrownBy` instead of try/catch `fail()`. ## flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java: ## @@ -22,17 +22,18 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; /** Tests for the {@link LocalRecoverableWriter}. */ public class LocalFileSystemRecoverableWriterTest extends AbstractRecoverableWriterTest { -@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); +@TempDir protected static File tempFolder; Review Comment: Why not `private` instead of `protected`? ## flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java: ## @@ -303,27 +304,23 @@ public void testCommitAfterRecovery() throws Exception { // TESTS FOR EXCEPTIONS -@Test(expected = IOException.class) +@Test public void testExceptionWritingAfterCloseForCommit() throws Exception { final Path testDir = getBasePathForTest(); final RecoverableWriter writer = getNewFileSystemWriter(); final Path path = new Path(testDir, "part-0"); -RecoverableFsDataOutputStream stream = null; -try { -stream = writer.open(path); +try (RecoverableFsDataOutputStream stream = writer.open(path)) { stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().getRecoverable(); -stream.write(testData2.getBytes(StandardCharsets.UTF_8)); -fail(); -} finally { Review Comment: I wonder if it would be better to keep the `try`/`finally` here, as try-with-resources do not support quiet close, so it might introduce some flakiness. 
## flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java: ## Review Comment: The class could be package-private I think. ## flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java: ## @@ -19,77 +19,63 @@ package org.apache.flink.fs.azurefs; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.TestLogger; import org.apache.hadoop.fs.azure.AzureException; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.net.URI; -import java.util.Arrays; -import java.util.List; -/** Tests for the AzureFSFactory. */ -@RunWith(Parameterized.class) -public class AzureBlobStorageFSFactoryTest extends TestLogger { - -@Parameterized.Parameter public String scheme; +import static org.assertj.
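The recurring suggestion in these review comments is to replace the try/catch + fail() pattern with AssertJ's assertThatThrownBy. A before/after sketch on a hypothetical factory, not the actual HadoopFsFactoryTest code:

```java
import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.fail;

class ThrownByMigrationSketch {

    // Old JUnit4-style pattern the review asks to remove.
    void oldStyle(MyFactory factory) {
        try {
            factory.create("hdfs:///my/path");
            fail("should have failed with an exception");
        } catch (IOException e) {
            // expected
        }
    }

    // JUnit5/AssertJ replacement suggested in the review.
    void newStyle(MyFactory factory) {
        assertThatThrownBy(() -> factory.create("hdfs:///my/path"))
                .isInstanceOf(IOException.class);
    }

    /** Hypothetical stand-in for the factory under test. */
    interface MyFactory {
        Object create(String uri) throws IOException;
    }
}
```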
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
XComp commented on code in PR #182: URL: https://github.com/apache/flink-docker/pull/182#discussion_r1511250389 ## .github/workflows/ci.yml: ## @@ -17,14 +17,22 @@ name: "CI" on: [push, pull_request] +env: + TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + max-parallel: 1 Review Comment: You could try to run the tests in a loop in CI to see whether you can reproduce the issue. So far, I couldn't come up with a reason for the test failure, yet. :thinking: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34413] Remove HBase 1.x connector files and deps [flink-connector-hbase]
ferenc-csaky commented on PR #42: URL: https://github.com/apache/flink-connector-hbase/pull/42#issuecomment-1976709968 @Tan-JiaLiang if you have some time can you PTAL? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33557] Externalize Cassandra Python connector code [flink-connector-cassandra]
echauchot commented on PR #26: URL: https://github.com/apache/flink-connector-cassandra/pull/26#issuecomment-1976742586 > @echauchot Sorry to ping you out of the blue, I saw that you were an active contributor to the Cassandra connector. If you have some time, would you mind reviewing this PR? Thanks in advance! Hi @ferenc-csaky, it would be a pleasure, but I know nothing about Python, so I might not be the right person to review this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34152] Add an option to scale memory when downscaling [flink-kubernetes-operator]
gyfora commented on code in PR #786: URL: https://github.com/apache/flink-kubernetes-operator/pull/786#discussion_r1511285884 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java: ## @@ -148,8 +149,11 @@ public static ConfigChanges tuneTaskManagerHeapMemory( final long flinkMemoryDiffBytes = heapDiffBytes + managedDiffBytes + networkDiffBytes; // Update total memory according to memory diffs -final MemorySize totalMemory = +MemorySize totalMemory = new MemorySize(maxMemoryBySpec.getBytes() - memBudget.getRemaining()); +totalMemory = +MemoryScaling.applyMemoryScaling( +totalMemory, maxMemoryBySpec, context, scalingSummaries, evaluatedMetrics); Review Comment: The memory scaling config specifies that both managed and heap memory is scaled based on the TM scaling factor. But it seems like we only scale the heap as the managed memory size is computed before this. Am I missing something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
mxm commented on code in PR #789: URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1511294060 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) { return get(jobVertexID).getInputs().isEmpty(); } -public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) { -get(vertexID).updateMaxParallelism(maxParallelism); Review Comment: Why are you removing this logic? This prevents moving beyond the original max parallelism. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget( return true; } +protected static boolean resourceQuotaReached( +Configuration conf, +EvaluatedMetrics evaluatedMetrics, +Map scalingSummaries, +JobAutoScalerContext ctx) { + +if (evaluatedMetrics.getJobTopology() == null +|| evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) { +return false; +} + +var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA); +var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA); +var tmMemory = ctx.getTaskManagerMemory(); +var tmCpu = ctx.getTaskManagerCpu(); + +if (cpuQuota.isPresent() || memoryQuota.isPresent()) { +var currentSlotSharingGroupMaxParallelisms = new HashMap(); +var newSlotSharingGroupMaxParallelisms = new HashMap(); +for (var e : + evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) { Review Comment: What if the Flink version doesn't support returning the slot sharing group information via the Rest API? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget( return true; } +protected static boolean resourceQuotaReached( Review Comment: There is already a similar resource check in this file (`scalingWouldExceedClusterResources`). I think we could unify the two approaches where they share the same input (e.g. task slots usage, num TMs). ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol "Updating source {} max parallelism based on available partitions to {}", sourceVertex, numPartitions); -topology.updateMaxParallelism(sourceVertex, (int) numPartitions); +topology.get(sourceVertex).setMaxParallelism((int) numPartitions); Review Comment: I'm not sure that is correct. That means some source partitions will be idle when the provided parallelism exceeds the original max parallelism.
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ## @@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget( return true; } +protected static boolean resourceQuotaReached( +Configuration conf, +EvaluatedMetrics evaluatedMetrics, +Map scalingSummaries, +JobAutoScalerContext ctx) { + +if (evaluatedMetrics.getJobTopology() == null +|| evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) { +return false; +} + +var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA); +var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA); +var tmMemory = ctx.getTaskManagerMemory(); +var tmCpu = ctx.getTaskManagerCpu(); + +if (cpuQuota.isPresent() || memoryQuota.isPresent()) { +var currentSlotSharingGroupMaxParallelisms = new HashMap(); +var newSlotSharingGroupMaxParallelisms = new HashMap(); +for (var e : + evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) { Review Comment: We should have some workaround similarly to the `scalingWouldExceedClusterResources` method which checks whether the maximum found parallelism is identical to the total number of task slots to determine that slot sharing is enabled and otherwise assume it isn't. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java: ## @@ -30,6 +31,7 @@ @NoArgsConstructor @AllArgsConstructor public class EvaluatedMetrics { +private JobTopology jobTopology; Review Comment: This is not required,
[jira] [Closed] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work
[ https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-34566. -- Resolution: Not A Problem I am closing this ticket for now, if you feel that this resolution is incorrect please re-open it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
mxm commented on code in PR #789: URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1511292849 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java: ## @@ -30,6 +31,7 @@ @NoArgsConstructor @AllArgsConstructor public class EvaluatedMetrics { +private JobTopology jobTopology; Review Comment: This is not required, `JobTopology` is already available as a parameter to the `ScalingExecutor#scaleResource` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34152] Add an option to scale memory when downscaling [flink-kubernetes-operator]
mxm commented on code in PR #786: URL: https://github.com/apache/flink-kubernetes-operator/pull/786#discussion_r1511334711 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java: ## @@ -148,8 +149,11 @@ public static ConfigChanges tuneTaskManagerHeapMemory( final long flinkMemoryDiffBytes = heapDiffBytes + managedDiffBytes + networkDiffBytes; // Update total memory according to memory diffs -final MemorySize totalMemory = +MemorySize totalMemory = new MemorySize(maxMemoryBySpec.getBytes() - memBudget.getRemaining()); +totalMemory = +MemoryScaling.applyMemoryScaling( +totalMemory, maxMemoryBySpec, context, scalingSummaries, evaluatedMetrics); Review Comment: Good catch, the scaling is heap only. Managed memory is never adjusted, unless none is used at all (then it is set to zero) or the option to maximize managed memory is turned on. I've adjusted the docs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
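A rough sketch of the heap-only scaling described above, under the assumption that a scale factor derived from the parallelism change is applied to the heap component while managed and network memory stay untouched; the class and method names are made up and this is not the actual MemoryTuning/MemoryScaling code.

```java
import org.apache.flink.configuration.MemorySize;

public class HeapOnlyScalingSketch {
    /**
     * Scales only the heap portion of the total TaskManager memory and caps the result
     * at the memory defined in the resource spec.
     */
    public static MemorySize scaleTotalMemory(
            MemorySize currentTotal,
            MemorySize currentHeap,
            MemorySize maxMemoryBySpec,
            double scaleFactor) {

        long scaledHeapBytes = (long) (currentHeap.getBytes() * scaleFactor);
        long nonHeapBytes = currentTotal.getBytes() - currentHeap.getBytes();
        long newTotalBytes =
                Math.min(nonHeapBytes + scaledHeapBytes, maxMemoryBySpec.getBytes());
        return new MemorySize(newTotalBytes);
    }
}
```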
[jira] [Created] (FLINK-34575) Vulnerabilities in commons-compress 1.24.0; upgrade to 1.26.0 needed.
Adrian Vasiliu created FLINK-34575: -- Summary: Vulnerabilities in commons-compress 1.24.0; upgrade to 1.26.0 needed. Key: FLINK-34575 URL: https://issues.apache.org/jira/browse/FLINK-34575 Project: Flink Issue Type: Bug Affects Versions: 1.18.1 Reporter: Adrian Vasiliu Since Feb. 19, medium/high CVEs have been found for commons-compress 1.24.0: [https://nvd.nist.gov/vuln/detail/CVE-2024-25710] [https://nvd.nist.gov/vuln/detail/CVE-2024-26308] [https://github.com/apache/flink/pull/24352] was opened automatically on Feb. 21 by dependabot to bump commons-compress to v1.26.0, which fixes the CVEs, but two CI checks are red on the PR. Flink's dependency on commons-compress was upgraded to v1.24.0 in Oct 2023 (https://issues.apache.org/jira/browse/FLINK-33329). v1.24.0 is the version currently in the master branch: [https://github.com/apache/flink/blob/master/pom.xml#L727-L729]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]
LadyForest commented on PR #24390: URL: https://github.com/apache/flink/pull/24390#issuecomment-1976845985 Hi @XComp, would you mind taking a look when you're available? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-20392) Migrating bash e2e tests to Java/Docker
[ https://issues.apache.org/jira/browse/FLINK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823242#comment-17823242 ] Lorenzo Affetti commented on FLINK-20392: - {quote}I'm not able to follow what you mean here. Both linked PRs are still accessible (including its code change). Or do you mean that the code of the PR is not present in the main repo anymore? {quote} Yeah, sorry, I meant that the code was moved/refactored (y) But thank you for the clarification. My goal here was to clarify which parts of the tests should rely on test containers. For example, it could be that Flink runs in the same thread as the tests while the dependencies (for example, Kafka) run in containers. As far as I understood, Flink itself should also run in containers (y) I was adding references to the code that enables that at the current state of the repo. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511439120 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + +public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityAndHashcodeProvider.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null || arrayTwo == null) { +return null; +} + +List list = new ArrayList<>(); +Map map = new HashMap<>(); +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +if (element == null) { +map.merge(null, 1, (k, v) -> v + 1); +} else { +ObjectContainer objectContainer = new ObjectContainer(element); +map.merge(objectContainer, 1, (k, v) -> v + 1); +} +} +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +final ObjectContainer objectContainer = +element == null ? 
null : new ObjectContainer(element); +if (map.containsKey(objectContainer)) { +if (map.get(objectContainer) > 1) { +map.put(objectContainer, map.get(objectContainer) - 1); +} else { +map.remove(objectContainer); +} Review Comment: OK, I will use fewer lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
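One way to shorten the decrement-or-remove branch discussed above is Map#computeIfPresent, which drops the entry when the remapping function returns null; a standalone sketch, not the PR's final code.

```java
import java.util.HashMap;
import java.util.Map;

class SeenCountSketch {
    static <K> boolean consumeOne(Map<K, Integer> seen, K key) {
        // Returns true if the key was present; decrements its count and removes the
        // entry once the count reaches zero (computeIfPresent removes on null).
        boolean present = seen.containsKey(key);
        seen.computeIfPresent(key, (k, v) -> v > 1 ? v - 1 : null);
        return present;
    }

    public static void main(String[] args) {
        Map<String, Integer> seen = new HashMap<>();
        seen.put("a", 2);
        System.out.println(consumeOne(seen, "a")); // true, count now 1
        System.out.println(consumeOne(seen, "a")); // true, entry removed
        System.out.println(consumeOne(seen, "a")); // false
    }
}
```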
Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]
XComp commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1511414346 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { + +public static Path getStagingPath(Path path) { +// Add a random UUID to prevent multiple sinks from sharing the same staging dir. +// Please see FLINK-29114 for more details +Path stagingDir = +new Path( +path, +String.join( Review Comment: I'm not sure whether that's actually what you want. The resulting String would be something like `.staging__1709568597254_2cf73aab-39a4-440a-b81c-216be9635bb8` (i.e. double underscore). ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { + +public static Path getStagingPath(Path path) { Review Comment: ```suggestion public static Path getStagingPath(Path path) { return getStagingPath( path, () -> String.format( ".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID())); } @VisibleForTesting static Path getStagingPath(Path path, Supplier suffixSupplier) { ``` The current test is "theoretically" flaky. We change that by using a callback and checking for the Precondition. 
That would enable us to create the following test:
```java
@Test
void testReusingStagingDirFails(@TempDir Path tmpDir) throws IOException {
    final String subfolderName = "directory-name";
    Files.createDirectory(tmpDir.resolve(subfolderName));
    assertThatThrownBy(
                    () ->
                            PathUtils.getStagingPath(
                                    org.apache.flink.core.fs.Path.fromLocalFile(tmpDir.toFile()),
                                    () -> subfolderName))
            .isInstanceOf(IllegalStateException.class);
}
```
And that would reveal that our assumption that `mkdirs` returns `false` if the directory already exists was actually wrong. :-/ ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/utils/PathUtilsTest.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional in
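For context, here is a minimal sketch of the supplier-based `getStagingPath` variant proposed in the suggestion above, assuming Flink's `FileSystem`/`Path`/`Preconditions` APIs; the class name, the explicit existence check, and the `UncheckedIOException` wrapping are illustrative assumptions, not the code that was merged in the PR:
```java
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import java.util.function.Supplier;

/** Illustrative sketch only; not the merged implementation. */
public final class StagingPathSketch {

    /** Production entry point: timestamp + random UUID keeps staging dirs unique per sink. */
    public static Path getStagingPath(Path parent) {
        return getStagingPath(
                parent,
                () ->
                        String.format(
                                ".staging_%d_%s",
                                System.currentTimeMillis(), UUID.randomUUID()));
    }

    /** Test-friendly variant: the suffix is injected, so a collision can be provoked on purpose. */
    @VisibleForTesting
    static Path getStagingPath(Path parent, Supplier<String> suffixSupplier) {
        Path stagingDir = new Path(parent, suffixSupplier.get());
        try {
            FileSystem fs = stagingDir.getFileSystem();
            // mkdirs() returns true even if the directory already exists, so an explicit
            // existence check is needed to detect a reused staging directory.
            Preconditions.checkState(
                    !fs.exists(stagingDir), "Staging directory %s already exists.", stagingDir);
            fs.mkdirs(stagingDir);
            return stagingDir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```
With the `@VisibleForTesting` overload, a test can inject a fixed suffix such as `() -> subfolderName` and hit the precondition deterministically instead of relying on a UUID collision.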
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511453416 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/EqualityAndHashcodeProvider.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.DataType; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandle; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; + +/** This class is used for scalar function. */ +public class EqualityAndHashcodeProvider implements Closeable, Serializable { +private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator; +private final SpecializedFunction.ExpressionEvaluator equalityEvaluator; +private static MethodHandle hashcodeHandle; + +private static MethodHandle equalityHandle; + +public EqualityAndHashcodeProvider( +SpecializedFunction.SpecializedContext context, DataType dataType) { +hashcodeEvaluator = +context.createEvaluator( + unresolvedCall(BuiltInFunctionDefinitions.INTERNAL_HASHCODE, $("element1")), +DataTypes.INT(), +DataTypes.FIELD("element1", dataType.notNull().toInternal())); + +equalityEvaluator = +context.createEvaluator( +$("element1").isEqual($("element2")), +DataTypes.BOOLEAN(), +DataTypes.FIELD("element1", dataType.notNull().toInternal()), +DataTypes.FIELD("element2", dataType.notNull().toInternal())); +} + +public void open(FunctionContext context) throws Exception { +hashcodeHandle = hashcodeEvaluator.open(context); +equalityHandle = equalityEvaluator.open(context); +} + +public static boolean equals(Object o1, Object o2) { +try { +return (Boolean) equalityHandle.invoke(o1, o2); +} catch (Throwable e) { +throw new RuntimeException(e); +} +} + +public static int hashCode(Object o) { +try { +return (int) hashcodeHandle.invoke(o); +} catch (Throwable e) { +throw new RuntimeException(e); +} +} Review Comment: If we don't use static, it will report this error ``` Caused by: java.lang.IncompatibleClassChangeError: Expected static method org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider.hashCode(Ljava/lang/Object;)I at org.apache.flink.table.runtime.util.ObjectContainer.hashCode(ObjectContainer.java:45) at java.util.HashMap.hash(HashMap.java:340) at java.util.HashMap.merge(HashMap.java:1229) at 
org.apache.flink.table.runtime.functions.scalar.ArrayExceptFunction.eval(ArrayExceptFunction.java:75) ... 11 more ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511453416 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/EqualityAndHashcodeProvider.java: (same hunk as quoted in the previous message) Review Comment: If we make both `equals` and `hashCode` non-static, we have to pass an `EqualityAndHashcodeProvider` to the `ObjectContainer`:
```
public ObjectContainer(Object o, EqualityAndHashcodeProvider equalityAndHashcodeProvider) {
    this.o = o;
    this.equalityAndHashcodeProvider = equalityAndHashcodeProvider;
}
```
-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
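For readers following this thread, below is a rough sketch of the non-static alternative described above, in which the container holds a provider instance and delegates to it. Apart from the constructor quoted in the comment, the field names and method bodies are assumptions rather than the actual PR code, and the sketch presumes `EqualityAndHashcodeProvider#equals(Object, Object)` and `#hashCode(Object)` become instance methods:
```java
/**
 * Sketch of an instance-based ObjectContainer: no static MethodHandle state is shared,
 * because equality and hashing are delegated to the provider object passed in.
 */
public class ObjectContainer {

    private final Object o;
    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;

    public ObjectContainer(Object o, EqualityAndHashcodeProvider equalityAndHashcodeProvider) {
        this.o = o;
        this.equalityAndHashcodeProvider = equalityAndHashcodeProvider;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ObjectContainer)) {
            return false;
        }
        ObjectContainer that = (ObjectContainer) other;
        // Delegate to the evaluator-backed comparison instead of a static MethodHandle.
        return equalityAndHashcodeProvider.equals(this.o, that.o);
    }

    @Override
    public int hashCode() {
        return equalityAndHashcodeProvider.hashCode(this.o);
    }
}
```
This avoids the `IncompatibleClassChangeError` shown earlier, at the cost of every `ObjectContainer` carrying a reference to the provider, which is exactly the trade-off the comment points out.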
[jira] [Assigned] (FLINK-34531) Build and stage Java and Python artifacts
[ https://issues.apache.org/jira/browse/FLINK-34531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-34531: --- Assignee: lincoln lee > Build and stage Java and Python artifacts > - > > Key: FLINK-34531 > URL: https://issues.apache.org/jira/browse/FLINK-34531 > Project: Flink > Issue Type: Sub-task >Reporter: Lincoln Lee >Assignee: lincoln lee >Priority: Major > > # Create a local release branch ((!) this step can not be skipped for minor > releases): > {code:bash} > $ cd ./tools > tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION > RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh > {code} > # Tag the release commit: > {code:bash} > $ git tag -s ${TAG} -m "${TAG}" > {code} > # We now need to do several things: > ## Create the source release archive > ## Deploy jar artefacts to the [Apache Nexus > Repository|https://repository.apache.org/], which is the staging area for > deploying the jars to Maven Central > ## Build PyFlink wheel packages > You might want to create a directory on your local machine for collecting the > various source and binary releases before uploading them. Creating the binary > releases is a lengthy process but you can do this on another machine (for > example, in the "cloud"). When doing this, you can skip signing the release > files on the remote machine, download them to your local machine and sign > them there. > # Build the source release: > {code:bash} > tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh > {code} > # Stage the maven artifacts: > {code:bash} > tools $ releasing/deploy_staging_jars.sh > {code} > Review all staged artifacts ([https://repository.apache.org/]). They should > contain all relevant parts for each module, including pom.xml, jar, test jar, > source, test source, javadoc, etc. Carefully review any new artifacts. > # Close the staging repository on Apache Nexus. When prompted for a > description, enter “Apache Flink, version X, release candidate Y”. > Then, you need to build the PyFlink wheel packages (since 1.11): > # Set up an azure pipeline in your own Azure account. You can refer to > [Azure > Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository] > for more details on how to set up azure pipeline for a fork of the Flink > repository. Note that a google cloud mirror in Europe is used for downloading > maven artifacts, therefore it is recommended to set your [Azure organization > region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location] > to Europe to speed up the downloads. > # Push the release candidate branch to your forked personal Flink > repository, e.g. > {code:bash} > tools $ git push > refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Trigger the Azure Pipelines manually to build the PyFlink wheel packages > ## Go to your Azure Pipelines Flink project → Pipelines > ## Click the "New pipeline" button on the top right > ## Select "GitHub" → your GitHub Flink repository → "Existing Azure > Pipelines YAML file" > ## Select your branch → Set path to "/azure-pipelines.yaml" → click on > "Continue" → click on "Variables" > ## Then click "New Variable" button, fill the name with "MODE", and the > value with "release". 
Click "OK" to set the variable and the "Save" button to > save the variables, then back on the "Review your pipeline" screen click > "Run" to trigger the build. > ## You should now see a build where only the "CI build (release)" is running > # Download the PyFlink wheel packages from the build result page after the > jobs of "build_wheels mac" and "build_wheels linux" have finished. > ## Download the PyFlink wheel packages > ### Open the build result page of the pipeline > ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact) > ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels > linux}} separately to download the zip files > ## Unzip these two zip files > {code:bash} > $ cd /path/to/downloaded_wheel_packages > $ unzip wheel_Linux_build_wheels\ linux.zip > $ unzip wheel_Darwin_build_wheels\ mac.zip{code} > ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}: > {code:bash} > $ cd > $ mkdir flink-python/dist{code} > ## Move the unzipped wheel packages to the directory of > {{{}flink-python/dist{}}}: > {code:java} > $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/ > $ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/ > $ cd tools{code} > Finally, we create the binary convenience release files: > {code:bash} > tools $ REL
[jira] [Assigned] (FLINK-34532) Stage source and binary releases on dist.apache.org
[ https://issues.apache.org/jira/browse/FLINK-34532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-34532: --- Assignee: lincoln lee > Stage source and binary releases on dist.apache.org > --- > > Key: FLINK-34532 > URL: https://issues.apache.org/jira/browse/FLINK-34532 > Project: Flink > Issue Type: Sub-task >Reporter: Lincoln Lee >Assignee: lincoln lee >Priority: Major > > Copy the source release to the dev repository of dist.apache.org: > # If you have not already, check out the Flink section of the dev repository > on dist.apache.org via Subversion. In a fresh directory: > {code:bash} > $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates > {code} > # Make a directory for the new release and copy all the artifacts (Flink > source/binary distributions, hashes, GPG signatures and the python > subdirectory) into that newly created directory: > {code:bash} > $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > $ mv /tools/releasing/release/* > flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Add and commit all the files. > {code:bash} > $ cd flink > flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM} > flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}" > {code} > # Verify that files are present under > [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink]. > # Push the release tag if not done already (the following command assumes to > be called from within the apache/flink checkout): > {code:bash} > $ git push refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > > > h3. Expectations > * Maven artifacts deployed to the staging repository of > [repository.apache.org|https://repository.apache.org/content/repositories/] > * Source distribution deployed to the dev repository of > [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/] > * Check hashes (e.g. shasum -c *.sha512) > * Check signatures (e.g. {{{}gpg --verify > flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}}) > * {{grep}} for legal headers in each file. > * If time allows check the NOTICE files of the modules whose dependencies > have been changed in this release in advance, since the license issues from > time to time pop up during voting. See [Verifying a Flink > Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release] > "Checking License" section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511584785 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java: ## @@ -1516,4 +1517,141 @@ private Stream arraySortTestCases() { }, DataTypes.ARRAY(DataTypes.DATE(; } + +private Stream arrayExceptTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_EXCEPT) +.onFieldsWithData( +new Integer[] {1, 2, 2}, +null, +new Row[] { +Row.of(true, LocalDate.of(2022, 4, 20)), +Row.of(true, LocalDate.of(1990, 10, 14)), +null +}, +new Integer[] {null, null, 1}, +new Integer[][] { +new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1} +}, +new Map[] { +CollectionUtil.map(entry(1, "a"), entry(2, "b")), +CollectionUtil.map(entry(3, "c"), entry(4, "d")), +null +}) +.andDataTypes( +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY( +DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), +DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), +DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING( +// ARRAY +.testResult( +$("f0").arrayExcept(new Integer[] {1, null, 4}), +"ARRAY_EXCEPT(f0, ARRAY[1, NULL, 4])", +new Integer[] {2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f0").arrayExcept(new Integer[] {1}), +"ARRAY_EXCEPT(f0, ARRAY[1])", +new Integer[] {2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f0").arrayExcept(new Integer[] {42}), +"ARRAY_EXCEPT(f0, ARRAY[42])", +new Integer[] {1, 2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayTwo is NULL +.testResult( +$("f0").arrayExcept( +lit(null, DataTypes.ARRAY(DataTypes.INT())) + .cast(DataTypes.ARRAY(DataTypes.INT(, +"ARRAY_EXCEPT(f0, CAST(NULL AS ARRAY))", +null, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayTwo contains null elements +.testResult( +$("f0").arrayExcept(new Integer[] {null, 2}), +"ARRAY_EXCEPT(f0, ARRAY[null, 2])", +new Integer[] {1, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayOne is NULL +.testResult( +$("f1").arrayExcept(new Integer[] {1, 2, 3}), +"ARRAY_EXCEPT(f1, ARRAY[1,2,3])", +null, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// arrayOne contains null elements +.testResult( +$("f3").arrayExcept(new Integer[] {null, 42}), +"ARRAY_EXCEPT(f3, ARRAY[null, 42])", +new Integer[] {null, 1}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// ARRAY> +.testResult( +$("f2").arrayExcept( +new Row[] { +Row.of
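For anyone skimming the (truncated) test matrix above, here is a small usage example that mirrors those expectations. It assumes a Flink version in which this PR's `ARRAY_EXCEPT` function is available; the class name is illustrative only:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArrayExceptExample {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Elements of the first array that do not appear in the second one;
        // duplicates in the first array are kept, mirroring the ITCase above.
        env.executeSql("SELECT ARRAY_EXCEPT(ARRAY[1, 2, 2], ARRAY[1])").print();   // [2, 2]
        env.executeSql("SELECT ARRAY_EXCEPT(ARRAY[1, 2, 2], ARRAY[42])").print();  // [1, 2, 2]

        // A NULL second argument yields NULL, per the "arrayTwo is NULL" case.
        env.executeSql("SELECT ARRAY_EXCEPT(ARRAY[1, 2, 2], CAST(NULL AS ARRAY<INT>))").print();
    }
}
```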
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511610616 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +/** This class is used for scalar function. */ +public class ObjectContainer { Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511610976 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/EqualityAndHashcodeProvider.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.DataType; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandle; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; + +/** This class is used for scalar function. */ Review Comment: Ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1511611341 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +/** This class is used for scalar function. */ Review Comment: Ok, I will add it. ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.util; + +/** This class is used for scalar function. */ +public class ObjectContainer { Review Comment: Ok, I will add it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org