[jira] [Updated] (FLINK-34648) Avoid RPC time when apply SchemaChangeEvent to external system

2024-03-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34648:
---
Labels: pull-request-available  (was: )

> Avoid RPC time when apply SchemaChangeEvent to external system
> --
>
> Key: FLINK-34648
> URL: https://issues.apache.org/jira/browse/FLINK-34648
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: LvYanquan
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: 截屏2024-03-12 14.55.09.png
>
>
> When SchemaOperator receive SchemaChangeEvent, it will send request to 
> SchemaRegistry and wait for applying this SchemaChangeEvent to external 
> system synchronously.
> However, if this process take too long time, it will cause RPC 
> TimeoutException, and During the process of task recovery, there may be other 
> errors like `AddColumnEvent is already existed`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34648) Avoid RPC time when apply SchemaChangeEvent to external system

2024-03-12 Thread LvYanquan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LvYanquan updated FLINK-34648:
--
Attachment: 截屏2024-03-12 14.55.09.png

> Avoid RPC time when apply SchemaChangeEvent to external system
> --
>
> Key: FLINK-34648
> URL: https://issues.apache.org/jira/browse/FLINK-34648
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.1.0
>
> Attachments: 截屏2024-03-12 14.55.09.png
>
>
> When SchemaOperator receive SchemaChangeEvent, it will send request to 
> SchemaRegistry and wait for applying this SchemaChangeEvent to external 
> system synchronously.
> However, if this process take too long time, it will cause RPC 
> TimeoutException, and During the process of task recovery, there may be other 
> errors like `AddColumnEvent is already existed`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34648) Avoid RPC time when apply SchemaChangeEvent to external system

2024-03-12 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825538#comment-17825538
 ] 

LvYanquan edited comment on FLINK-34648 at 3/12/24 7:07 AM:


Enumerator will not restart but add splits back. See the picture in the 
attachment.


was (Author: JIRAUSER304414):
Enumerator will not restart but add splits back.

> Avoid RPC time when apply SchemaChangeEvent to external system
> --
>
> Key: FLINK-34648
> URL: https://issues.apache.org/jira/browse/FLINK-34648
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: LvYanquan
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: 截屏2024-03-12 14.55.09.png
>
>
> When SchemaOperator receive SchemaChangeEvent, it will send request to 
> SchemaRegistry and wait for applying this SchemaChangeEvent to external 
> system synchronously.
> However, if this process take too long time, it will cause RPC 
> TimeoutException, and During the process of task recovery, there may be other 
> errors like `AddColumnEvent is already existed`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34533][release] Add release note for version 1.19 [flink]

2024-03-12 Thread via GitHub


lincoln-lil commented on PR #24394:
URL: https://github.com/apache/flink/pull/24394#issuecomment-1990932477

   @snuyanzin Thanks for reviewing this! I've fixed these incorrect spaces.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520965160


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionAggregateOperator.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link PartitionAggregateOperator} is used to apply the aggregate 
transformation on all
+ * records of each partition. Each partition contains all records of a subtask.
+ */
+@Internal
+public class PartitionAggregateOperator
+extends AbstractUdfStreamOperator>
+implements OneInputStreamOperator, BoundedOneInput {
+
+private final AggregateFunction aggregateFunction;
+
+private ACC currentAccumulator = null;
+
+public PartitionAggregateOperator(AggregateFunction 
aggregateFunction) {
+super(aggregateFunction);
+this.aggregateFunction = aggregateFunction;
+}
+
+@Override
+public void open() throws Exception {
+super.open();
+this.currentAccumulator = 
checkNotNull(aggregateFunction.createAccumulator());
+}
+
+@Override
+public void processElement(StreamRecord element) throws Exception {
+aggregateFunction.add(element.getValue(), currentAccumulator);
+}
+
+@Override
+public void endInput() throws Exception {
+TimestampedCollector outputCollector = new 
TimestampedCollector<>(output);

Review Comment:
   This change is indeed unnecessary. Currently, the data will first be 
converted into `StreamRecord` and then directly emitted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520966653


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/PartitionAggregateOperatorTest.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit test for {@link PartitionAggregateOperator}. */
+class PartitionAggregateOperatorTest {
+
+/** The test environment. */
+private Environment environment;
+
+/** The test stream task. */
+private StreamTask containingTask;
+
+/** The test stream config. */
+private StreamConfig config;
+
+@BeforeEach
+void before() throws Exception {
+environment = MockEnvironment.builder().build();
+containingTask =
+new StreamTask>(environment) {
+@Override
+protected void init() {}
+};
+config = new MockStreamConfig(new Configuration(), 1);
+}
+
+@Test
+void testOpen() {
+PartitionAggregateOperator 
partitionAggregateOperator =
+createPartitionAggregateOperator();
+MockOutput output = new MockOutput<>(new ArrayList<>());
+partitionAggregateOperator.setup(containingTask, config, output);
+assertDoesNotThrow(partitionAggregateOperator::open);
+}
+
+@Test
+void testProcessElement() throws Exception {

Review Comment:
   Good point. Currently all `xxxOperatorTests` has been migrated . 
`OneInputStreamOperatorTestHarness` could indeed provide useful utilities.



##
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java:
##
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedPartitionWindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.C

[jira] [Created] (FLINK-34653) Support table merging with route in Flink CDC

2024-03-12 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-34653:
-

 Summary: Support table merging with route in Flink CDC
 Key: FLINK-34653
 URL: https://issues.apache.org/jira/browse/FLINK-34653
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren
 Fix For: 3.1.0


Currently route in Flink CDC only supports very simple table id replacing. It 
should support more complex table merging strategies. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34653) Support table merging with route in Flink CDC

2024-03-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34653:
---
Labels: pull-request-available  (was: )

> Support table merging with route in Flink CDC
> -
>
> Key: FLINK-34653
> URL: https://issues.apache.org/jira/browse/FLINK-34653
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>
> Currently route in Flink CDC only supports very simple table id replacing. It 
> should support more complex table merging strategies. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520970150


##
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java:
##
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedPartitionWindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Integration tests for {@link KeyedPartitionWindowedStream}. */
+public class KeyedPartitionWindowedStreamITCase {
+
+private static final int EVENT_NUMBER = 100;
+
+private static final String TEST_EVENT = "Test";
+
+@Test
+public void testMapPartition() throws Exception {

Review Comment:
   I apologize for this accidental mistake😢. I have double-checked all the 
newly added test code and ensured that they are tested based on `JUnit5`, and 
assertions are handled using `org.assertj.core.api.Assertions`.



##
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java:
##
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedPartitionWindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;

Review Comment:
   I apologize for this accidental mistake😢. I have double-checked all the 
newly added test code and ensured that they are tested based on `JUnit5`, and 
assertions are handled using `org.assertj.core.api.Assertions`.



-- 
This is an

Re: [PR] [FLINK-34533][release] Add release note for version 1.19 [flink]

2024-03-12 Thread via GitHub


snuyanzin commented on PR #24394:
URL: https://github.com/apache/flink/pull/24394#issuecomment-1990943352

   @lincoln-lil thanks
a question: since  tests are now executed against jdk 8, 11, 17, 21
   should we add a note about jdk21 Beta support ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520972048


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MapPartitionIteratorTest.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Unit test for {@link MapPartitionIterator}. */
+class MapPartitionIteratorTest {

Review Comment:
   I have added a number of tests for corner cases. Currently, the 
`MapPartitionIteratorTest` has tested many scenarios.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520972873


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * The {@link MapPartitionOperator} is used to process all records in each 
partition on non-keyed
+ * stream. Each partition contains all records of a subtask.
+ */
+@Internal
+public class MapPartitionOperator
+extends AbstractUdfStreamOperator>
+implements OneInputStreamOperator, BoundedOneInput {
+
+private final MapPartitionFunction function;
+
+private MapPartitionIterator iterator;
+
+public MapPartitionOperator(MapPartitionFunction function) {
+super(function);
+this.function = function;
+// This operator is set to be non-chained as it doesn't use task main 
thread to write
+// records to output, which may introduce risks to downstream chained 
operators.
+this.chainingStrategy = ChainingStrategy.NEVER;
+}
+
+@Override
+public void open() throws Exception {
+super.open();
+this.iterator = new MapPartitionIterator<>();

Review Comment:
   This approach is also acceptable. I have made the changes according to what 
you suggested.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * The {@link MapPartitionOperator} is used to process all records in each 
partition on non-keyed
+ * stream. Each partition contains all records of a subtask.
+ */
+@Internal
+public class MapPartitionOperator
+extends AbstractUdfStreamOperator>
+implements OneInputStreamOperator, BoundedOneInput {
+
+private final MapPartitionFunction function;
+
+private MapPartitionIterator iterator;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520973260


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * The {@link MapPartitionOperator} is used to process all records in each 
partition on non-keyed
+ * stream. Each partition contains all records of a subtask.
+ */
+@Internal
+public class MapPartitionOperator
+extends AbstractUdfStreamOperator>
+implements OneInputStreamOperator, BoundedOneInput {
+
+private final MapPartitionFunction function;
+
+private MapPartitionIterator iterator;
+
+public MapPartitionOperator(MapPartitionFunction function) {
+super(function);
+this.function = function;
+// This operator is set to be non-chained as it doesn't use task main 
thread to write
+// records to output, which may introduce risks to downstream chained 
operators.
+this.chainingStrategy = ChainingStrategy.NEVER;
+}
+
+@Override
+public void open() throws Exception {
+super.open();
+this.iterator = new MapPartitionIterator<>();
+this.iterator.registerUDF(
+iterator -> {
+TimestampedCollector outputCollector = new 
TimestampedCollector<>(output);
+try {
+function.mapPartition(() -> iterator, outputCollector);
+} catch (Exception e) {
+ExceptionUtils.rethrow(e);
+}
+});
+}
+
+@Override
+public void processElement(StreamRecord element) throws Exception {
+iterator.addRecord(element.getValue());
+}
+
+@Override
+public void endInput() throws Exception {
+iterator.close();
+output.close();

Review Comment:
   This change is indeed unnecessary. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520975025


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionIterator.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link MapPartitionIterator} is an iterator used in the {@link 
MapPartitionOperator}.The task
+ * main thread will add records to it. It will set itself as the input 
parameter of {@link
+ * MapPartitionFunction} and execute the function.
+ */
+@Internal
+public class MapPartitionIterator implements Iterator {
+
+/**
+ * Max number of caches.
+ *
+ * The constant defines the maximum number of caches that can be 
created. Its value is set to
+ * 100, which is considered sufficient for most parallel jobs. Each cache 
is a record and
+ * occupies a minimal amount of memory so the value is not excessively 
large.
+ */
+private static final int DEFAULT_MAX_CACHE_NUM = 100;
+
+/** The lock to ensure consistency between task main thread and udf 
executor. */
+private final Lock lock = new ReentrantLock();
+
+/** The queue to store record caches. */
+@GuardedBy("lock")
+private final Queue cacheQueue = new LinkedList<>();
+
+/** The condition to indicate the cache queue is not empty. */
+private final Condition cacheNotEmpty = lock.newCondition();
+
+/** The condition to indicate the cache queue is not full. */
+private final Condition cacheNotFull = lock.newCondition();
+
+/** The condition to indicate the udf is finished. */
+private final Condition udfFinish = lock.newCondition();
+
+/** The task udf executor. */
+private final Executor udfExecutor =

Review Comment:
   I overlooked this point. I have now ensured that the executor can be shut 
down correctly. 😢



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520976445


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The {@link FixedLengthByteKeyAndValueComparator} is used by {@link 
KeyedSortPartitionOperator} to
+ * compare records according to both the record key and record value. The 
length of record key must
+ * be fixed and will be initialized when the {@link 
FixedLengthByteKeyAndValueComparator} is
+ * created.
+ */
+@Internal
+public class FixedLengthByteKeyAndValueComparator
+extends TypeComparator> {
+
+private final int serializedKeyLength;
+
+private final TypeComparator valueComparator;
+
+private byte[] keyReference;
+
+private INPUT valueReference;
+
+FixedLengthByteKeyAndValueComparator(
+int serializedKeyLength, TypeComparator valueComparator) {
+this.serializedKeyLength = serializedKeyLength;
+this.valueComparator = valueComparator;
+}
+
+@Override
+public int hash(Tuple2 record) {
+return record.hashCode();
+}
+
+@Override
+public void setReference(Tuple2 toCompare) {
+this.keyReference = toCompare.f0;
+this.valueReference = toCompare.f1;
+}
+
+@Override
+public boolean equalToReference(Tuple2 candidate) {
+return Arrays.equals(keyReference, candidate.f0) && valueReference == 
candidate.f1;

Review Comment:
   The method `equalToReference` is used to compare whether two objects are of 
the same content. Therefore, `equals()` should be used. The correction has been 
made.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The {@link FixedLengthByteKeyAndValueComparator} is used by {@link 
KeyedSortPartitionOperator} to
+ * compare records according to both the record key and record value. The 
length of record key must
+ * be fixed and will be initialized when the {@link 
FixedLengthByteKeyAndValueComparator} is
+ * created.
+ */
+@Internal
+public class FixedLengthByteKeyAndValueComparator
+extends TypeComparator> {
+
+private final int serializedKeyLength;
+
+private final TypeComparator valueComparator;
+
+private byte[] keyReference;
+
+private I

Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

2024-03-12 Thread via GitHub


anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1520977487


##
flink-formats/flink-protobuf-confluent-registry/pom.xml:
##
@@ -0,0 +1,171 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+   
+   flink-formats
+   org.apache.flink
+   1.20-SNAPSHOT
+   
+   4.0.0
+
+   flink-protobuf-confluent-registry
+
+   Flink : Formats : Profobuf confluent registry
+
+   
+   7.5.3
+   7.5.0-22

Review Comment:
   thanks for catching, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]

2024-03-12 Thread via GitHub


ljz2051 commented on PR #24402:
URL: https://github.com/apache/flink/pull/24402#issuecomment-1990952035

   The pr has been rebased on master branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825551#comment-17825551
 ] 

Matthias Pohl commented on FLINK-34227:
---

https://github.com/apache/flink/actions/runs/8242516176/job/22541877232#step:10:12172

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825553#comment-17825553
 ] 

Matthias Pohl commented on FLINK-29114:
---

* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58229&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=11858
* 
https://github.com/apache/flink/actions/runs/8242516657/job/22541862142#step:10:11400
* 
https://github.com/apache/flink/actions/runs/8242516657/job/22541858062#step:10:11543

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> 

[jira] [Commented] (FLINK-34643) JobIDLoggingITCase failed

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825552#comment-17825552
 ] 

Matthias Pohl commented on FLINK-34643:
---

* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58232&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=8242
* 
https://github.com/apache/flink/actions/runs/8228497559/job/22498277501#step:10:11643
* 
https://github.com/apache/flink/actions/runs/8230355449/job/22504508857#step:10:11393
* 
https://github.com/apache/flink/actions/runs/8239288679/job/22532540457#step:10:11461
* 
https://github.com/apache/flink/actions/runs/8239791756/job/22534164458#step:10:11389
* 
https://github.com/apache/flink/actions/runs/8242516657/job/22541862449#step:10:8735
* 
https://github.com/apache/flink/actions/runs/8242516657/job/22541846624#step:10:8301

> JobIDLoggingITCase failed
> -
>
> Key: FLINK-34643
> URL: https://issues.apache.org/jira/browse/FLINK-34643
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=7897
> {code}
> Mar 09 01:24:23 01:24:23.498 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 4.209 s <<< FAILURE! -- in 
> org.apache.flink.test.misc.JobIDLoggingITCase
> Mar 09 01:24:23 01:24:23.498 [ERROR] 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(ClusterClient) 
> -- Time elapsed: 1.459 s <<< ERROR!
> Mar 09 01:24:23 java.lang.IllegalStateException: Too few log events recorded 
> for org.apache.flink.runtime.jobmaster.JobMaster (12) - this must be a bug in 
> the test code
> Mar 09 01:24:23   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:148)
> Mar 09 01:24:23   at 
> org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:132)
> Mar 09 01:24:23   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 09 01:24:23   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 09 01:24:23   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 09 01:24:23 
> {code}
> The other test failures of this build were also caused by the same test:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=2c3cbe13-dee0-5837-cf47-3053da9a8a78&t=b78d9d30-509a-5cea-1fef-db7abaa325ae&l=8349
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58187&view=logs&j=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3&t=712ade8c-ca16-5b76-3acd-14df33bc1cb1&l=8209



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1520985642


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The {@link FixedLengthByteKeyAndValueComparator} is used by {@link 
KeyedSortPartitionOperator} to
+ * compare records according to both the record key and record value. The 
length of record key must
+ * be fixed and will be initialized when the {@link 
FixedLengthByteKeyAndValueComparator} is
+ * created.
+ */
+@Internal
+public class FixedLengthByteKeyAndValueComparator
+extends TypeComparator> {
+
+private final int serializedKeyLength;
+
+private final TypeComparator valueComparator;
+
+private byte[] keyReference;
+
+private INPUT valueReference;
+
+FixedLengthByteKeyAndValueComparator(
+int serializedKeyLength, TypeComparator valueComparator) {
+this.serializedKeyLength = serializedKeyLength;
+this.valueComparator = valueComparator;
+}
+
+@Override
+public int hash(Tuple2 record) {
+return record.hashCode();
+}
+
+@Override
+public void setReference(Tuple2 toCompare) {
+this.keyReference = toCompare.f0;
+this.valueReference = toCompare.f1;
+}
+
+@Override
+public boolean equalToReference(Tuple2 candidate) {
+return Arrays.equals(keyReference, candidate.f0) && valueReference == 
candidate.f1;
+}
+
+@Override
+public int compareToReference(TypeComparator> 
referencedComparator) {
+byte[] otherKey =
+((FixedLengthByteKeyAndValueComparator) 
referencedComparator).keyReference;
+INPUT otherValue =
+((FixedLengthByteKeyAndValueComparator) 
referencedComparator).valueReference;
+int keyCmp = compare(otherKey, this.keyReference);
+if (keyCmp != 0) {
+return keyCmp;
+}
+return valueComparator.compare(this.valueReference, otherValue);
+}
+
+@Override
+public int compare(Tuple2 first, Tuple2 
second) {
+int keyCmp = compare(first.f0, second.f0);
+if (keyCmp != 0) {
+return keyCmp;
+}
+return valueComparator.compare(first.f1, second.f1);
+}
+
+private int compare(byte[] first, byte[] second) {
+for (int i = 0; i < serializedKeyLength; i++) {
+int cmp = Byte.compare(first[i], second[i]);
+if (cmp != 0) {
+return cmp < 0 ? -1 : 1;
+}
+}
+return 0;
+}
+
+@Override
+public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource)
+throws IOException {
+int minCount = serializedKeyLength;
+while (minCount-- > 0) {
+byte firstValue = firstSource.readByte();
+byte secondValue = secondSource.readByte();
+int cmp = Byte.compare(firstValue, secondValue);
+if (cmp != 0) {
+return cmp < 0 ? -1 : 1;
+}
+}
+return valueComparator.compareSerialized(firstSource, secondSource);
+}
+
+@Override
+public boolean supportsNormalizedKey() {

Review Comment:
   `NormalizedKey` is used to map data to a standardized representation that 
can be compared, which is a byte array. `NormalizedKey` is used by 
`NormalizedKeySorter` for sorting in memory. In the implementation of the 
`SortPartition` API, we use `ExternalSorter` for external sorting. BTW, it's 
hard for each piece of sorted data to generate a `NormalizedKey` for comparison 
since it contains both key and value with various

Re: [PR] [FLINK-34533][release] Add release note for version 1.19 [flink]

2024-03-12 Thread via GitHub


lincoln-lil commented on PR #24394:
URL: https://github.com/apache/flink/pull/24394#issuecomment-1990984725

   @snuyanzin This should be added, I'll add 
https://issues.apache.org/jira/browse/FLINK-33163.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add announcement blog post for Flink 1.19 [flink-web]

2024-03-12 Thread via GitHub


rmetzger commented on code in PR #721:
URL: https://github.com/apache/flink-web/pull/721#discussion_r1521014951


##
docs/content/posts/2024-03-xx-release-1.19.0.md:
##
@@ -0,0 +1,470 @@
+---
+authors:
+- LincolnLee:
+  name: "Lincoln Lee"
+  twitter: lincoln_86xy
+
+date: "2024-03-xxT22:00:00Z"
+subtitle: ""
+title: Announcing the Release of Apache Flink 1.19
+aliases:
+- /news/2024/03/xx/release-1.19.0.html
+---
+
+The Apache Flink PMC is pleased to announce the release of Apache Flink 
1.19.0. As usual, we are
+looking at a packed release with a wide variety of improvements and new 
features. Overall, 162
+people contributed to this release completing 33 FLIPs and 600+ issues. Thank 
you!
+
+Let's dive into the highlights.
+
+# Flink SQL Improvements
+
+## Custom Parallelism for Table/SQL Sources
+
+Now in Flink 1.19, you can set a custom parallelism for performance tuning via 
the `scan.parallelism`
+option. The first available connector is DataGen (Kafka connector is on the 
way). Here is an example
+using SQL Client:
+
+```sql
+-- set parallelism within the ddl
+CREATE TABLE Orders (
+order_number BIGINT,
+priceDECIMAL(32,2),
+buyerROW,
+order_time   TIMESTAMP(3)
+) WITH (
+'connector' = 'datagen',
+'scan.parallelism' = '4'
+);
+
+-- or set parallelism via dynamic table option
+SELECT * FROM Orders /*+ OPTIONS('scan.parallelism'='4') */;
+```
+
+**More Information**
+* 
[Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/#scan-table-source)
+* [FLIP-367: Support Setting Parallelism for Table/SQL 
Sources](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150)
+
+
+## Configurable SQL Gateway Java Options
+
+A new option `env.java.opts.sql-gateway` for specifying the Java options is 
introduced in Flink 1.19,
+so you can fine-tune the memory settings, garbage collection behavior, and 
other relevant Java
+parameters for SQL Gateway.
+
+**More Information**
+* [FLINK-33203](https://issues.apache.org/jira/browse/FLINK-33203)
+
+
+## Configure Different State TTLs using SQL Hint
+
+Starting from Flink 1.18, Table API and SQL users can set state time-to-live 
(TTL) individually for
+stateful operators via the SQL compiled plan. In Flink 1.19, users have a more 
flexible way to
+specify custom TTL values for regular joins and group aggregations directly 
within their queries by [utilizing the STATE_TTL 
hint](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/hints/#state-ttl-hints).
+This improvement means that you no longer need to alter your compiled plan to 
set specific TTLs for
+these frequently used operators. With the introduction of `STATE_TTL` hints, 
you can streamline your workflow and
+dynamically adjust the TTL based on your operational requirements.
+
+Here is an example:
+```sql
+-- set state ttl for join
+SELECT /*+ STATE_TTL('Orders'= '1d', 'Customers' = '20d') */ *
+FROM Orders LEFT OUTER JOIN Customers
+ON Orders.o_custkey = Customers.c_custkey;
+
+-- set state ttl for aggregation
+SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM Orders AS o
+GROUP BY o_orderkey;
+```
+
+**More Information**
+* 
[Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/hints/#state-ttl-hints)
+* [FLIP-373: Support Configuring Different State TTLs using SQL 
Hint](https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint)
+
+
+## Named Parameters for Functions and Procedures
+
+Named parameters now can be used when calling a function or stored procedure. 
With named parameters,
+users do not need to strictly specify the parameter position, just specify the 
parameter name and its
+corresponding value. At the same time, if non-essential parameters are not 
specified, they will default to being filled with null.
+
+Here's an example of defining a function with one mandatory parameter and two 
optional parameters using named parameters:
+```java
+public static class NamedArgumentsTableFunction extends TableFunction {
+
+   @FunctionHint(
+   output = @DataTypeHint("STRING"),
+   arguments = {
+   @ArgumentHint(name = "in1", isOptional 
= false, type = @DataTypeHint("STRING")),
+   @ArgumentHint(name = "in2", isOptional 
= true, type = @DataTypeHint("STRING")),
+   @ArgumentHint(name = "in3", isOptional 
= true, type = @DataTypeHint("STRING"))})
+   public void eval(String arg1, String arg2, String arg3) {
+   collect(arg1 + ", " + arg2 + "," + arg3);
+   }
+
+}
+```
+When calling the function in SQL, parameters can be specified by name, for 
example:
+```sql
+SELECT * FROM TABLE(myFunction(in1 => 'v1', in3 => 'v3', in2 => 'v2'))
+```
+Al

[jira] [Updated] (FLINK-34543) Support Full Partition Processing On Non-keyed DataStream

2024-03-12 Thread Wencong Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wencong Liu updated FLINK-34543:

Description: 
Introduce the PartitionWindowedStream and provide multiple full window 
operations in it.

The related motivation and design can be found in 
[FLIP-380|https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream].

  was:
1. Introduce MapParititon, SortPartition, Aggregate, Reduce API in DataStream.
2. Introduce SortPartition API in KeyedStream.

The related motivation and design can be found in 
[FLIP-380|https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream].


> Support Full Partition Processing On Non-keyed DataStream
> -
>
> Key: FLINK-34543
> URL: https://issues.apache.org/jira/browse/FLINK-34543
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.20.0
>Reporter: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce the PartitionWindowedStream and provide multiple full window 
> operations in it.
> The related motivation and design can be found in 
> [FLIP-380|https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507274806


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+// sink1
+AtomicReference> sinkRef1 = new 
AtomicReference<>();
+AtomicReference> fileToCommitRef1 = new 
AtomicReference<>();
+writeRowsToSink(
+sinkRef1,
+fileToCommitRef1,
+getStagingDir(shareStagingDir),
+Row.of("a1", 1, "x1"),
+Row.of("a2", 2, "x2"));
+
+// sink2
+AtomicReference> sinkRef2 = new 
AtomicReference<>();
+AtomicReference> fileToCommitRef2 = new 
AtomicReference<>();
+writeRowsToSink(
+sinkRef2,
+fileToCommitRef2,
+getStagingDir(shareStagingDir),
+Row.of("b1", 1, "y1"),
+Row.of("b2", 2, "y2"));
+
+assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, 
fileToCommitRef2, shareStagingDir);
+}
+
+private void writeRowsToSink(
+AtomicReference> sinkRef,
+AtomicReference> contentRef,
+Path stagingDir,
+Row... records)
+throws Exception {
+try (OneInputStreamOperatorTestHarness testHarness =
+createSink(false, false, false, stagingDir, new 
LinkedHashMap<>(), sinkRef)) {
+writeUnorderedRecords(testHarness, Arrays.asList(records));
+
contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath(;
+}
+}
+
+private Path getStagingDir(boolean shareStagingDir) {
+String pathPostfix = 
FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+}
+
+private void assertSinkBehavior(
+AtomicReference> sinkRef1,
+AtomicReference> fileToCommitRef1,
+AtomicReference> sinkRef2,
+AtomicReference> fileToCommitRef2,
+boolean shareStagingDir)
+throws Exception {
+Map fileToCommit1 = fileToCommitRef1.get();
+Map fileToCommit2 = fileToCommitRef2.get();
+assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+if (shareStagingDir) {
+assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+} else {
+assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+}
+sinkRef1.get().finalizeGlobal(finalizationContext);
+sinkRef2.get().finalizeGlobal(finalizationContext);
+Map committedFiles = getFileContentByPath(outputPath);
+if (shareStagingDir) {

Review Comment:
   Your understanding is correct. It's just that this test is actually testing 
`FileSystemOutputFormat` for a scenario where some other class (in our case 
`FileSystemTableSink`) has a bug.
   
   This test is fine if you think that `FileSystemOutputFormat` should be able 
to handle this case. But I'm wondering whether it would be more appropriate to 
fail in such a case where multiple instances are accessing the same folder (but 
there's the question how easily a `FileSystemOutputFormat` instance can detect 
something like that. 
   
   Anyway, resolving this test scenario easily indicates that the scope of the 
different classes is not well-defined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2024-03-12 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825567#comment-17825567
 ] 

Ahmed Hamdy commented on FLINK-31472:
-

[~lincoln.86xy] [~mapohl] 
Hello, could you please review the 
[PR|https://github.com/apache/flink/pull/24481].
Let me add some context

h2.Why is the test failing?

So the flakiness arises from setting processing time within the test to trigger 
the timer flush of the writer, This caused the concurrent thread access of the 
mailbox which caused the failure, The problem was within the test not the 
AsyncWriter. 

h2.Why is it intermittent?

This is because we are also writing batches of records so there was a race 
condition between both batch size trigger and timer trigger, in other words we 
used to add a new batch and a set the time to trigger the flush, had the batch 
trigger flushed the buffer the timer callback would be discarded safely.
h2.Why do I believe this refactor should fix the test?
Because I have removed the time setting from the test it self, The size of 
batches sent should be enough to trigger the flush which is needed for the test.

h2.What could go wrong?

There should be no newly introduced issues here since the batch size is 
unchanged we expect enough flushes triggered by batch size only to stabilize 
the rate limiting value as expected.

h2.How did I verify the fix?

I have run a sampler till failure for a some time and haven't reported any. I 
am aware local setup is different than CI but the test should be less sensitive 
to delays now so I expect we are green to go.

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> 
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.16.1, 1.18.0, 1.19.0, 1.20.0
>Reporter: Ran Tao
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> when run mvn clean test, this case failed occasionally.
> {noformat}
> [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 
> s <<< FAILURE! - in 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest
> [ERROR] 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize
>   Time elapsed: 0.492 s  <<< ERROR!
> java.lang.IllegalStateException: Illegal thread detected. This method must be 
> called from inside the mailbox thread!
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>         at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>         at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>         at 
> org.junit.runners.BlockJUnit4

Re: [PR] [FLINK-25544][streaming][JUnit5 Migration] The runtime package of module flink-stream-java [flink]

2024-03-12 Thread via GitHub


JingGe commented on PR #24483:
URL: https://github.com/apache/flink/pull/24483#issuecomment-1991049712

   @Jiabao-Sun thanks driving this. Since this PR is huge and contains 117 
changed files. Would you like to summarise all changes in the "Brief change 
log" section to help reviewers do their job? Something like: 1. change the 
package name to JUnit 5. 2. change the methods' visibility to package private. 
etc. WDTY?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521036729


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -83,157 +110,193 @@ void after() {
 @Test
 void testClosingWithoutInput() throws Exception {
 try (OneInputStreamOperatorTestHarness testHarness =
-createSink(false, false, false, new LinkedHashMap<>(), new 
AtomicReference<>())) {
+createTestHarness(createSinkFormat(false, false, false, new 
LinkedHashMap<>( {
 testHarness.setup();
 testHarness.open();
 }
 }
 
 @Test
 void testNonPartition() throws Exception {
-AtomicReference> ref = new 
AtomicReference<>();
-try (OneInputStreamOperatorTestHarness testHarness =
-createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-writeUnorderedRecords(testHarness);
-assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-}
-
-ref.get().finalizeGlobal(finalizationContext);
-Map content = getFileContentByPath(outputPath);
-assertThat(content.values())
-.containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
-}
-
-private void writeUnorderedRecords(OneInputStreamOperatorTestHarness testHarness)
-throws Exception {
-testHarness.setup();
-testHarness.open();
-
-testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 
1L));
-testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 
1L));
-testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 
1L));
-testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 
1L));
+checkWriteAndCommit(
+false,
+false,
+false,
+new LinkedHashMap<>(),
+DEFAULT_INPUT_SUPPLIER,
+DEFAULT_OUTPUT_SUPPLIER);
 }
 
 @Test
 void testOverrideNonPartition() throws Exception {
 testNonPartition();
-
-AtomicReference> ref = new 
AtomicReference<>();
-try (OneInputStreamOperatorTestHarness testHarness =
-createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-writeUnorderedRecords(testHarness);
-assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-}
-
-ref.get().finalizeGlobal(finalizationContext);
-Map content = getFileContentByPath(outputPath);
-assertThat(content).hasSize(1);
-assertThat(content.values())
-.containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
-assertThat(new File(tmpPath.toUri())).doesNotExist();
+checkWriteAndCommit(
+true,
+false,
+false,
+new LinkedHashMap<>(),
+DEFAULT_INPUT_SUPPLIER,
+DEFAULT_OUTPUT_SUPPLIER);
 }
 
 @Test
 void testStaticPartition() throws Exception {
-AtomicReference> ref = new 
AtomicReference<>();
 LinkedHashMap staticParts = new LinkedHashMap<>();
 staticParts.put("c", "p1");
-try (OneInputStreamOperatorTestHarness testHarness =
-createSink(false, true, false, staticParts, ref)) {
-testHarness.setup();
-testHarness.open();
-
-testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 
1L));
-testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 
1L));
-testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 
1L));
-testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 
1L));
-assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-}
 
-ref.get().finalizeGlobal(finalizationContext);
-Map content = getFileContentByPath(outputPath);
-assertThat(content).hasSize(1);
-
assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + 
"a2,2\n" + "a3,3\n");
-assertThat(new File(tmpPath.toUri())).doesNotExist();
+checkWriteAndCommit(
+false,
+true,
+false,
+staticParts,
+() ->
+Arrays.asList(
+new StreamRecord<>(Row.of("a1", 1), 1L),
+new StreamRecord<>(Row.of("a2", 2), 1L),
+new StreamRecord<>(Row.of("a2", 2), 1L),
+new StreamRecord<>(Row.of("a3", 3), 1L)),
+() ->
+Collections.singletonMap(
+"c=p1", createFileContent("a1,1", "a2,2"

[jira] [Created] (FLINK-34654) Add "Special Thanks" Page on the Flink Website

2024-03-12 Thread Jark Wu (Jira)
Jark Wu created FLINK-34654:
---

 Summary: Add "Special Thanks" Page on the Flink Website
 Key: FLINK-34654
 URL: https://issues.apache.org/jira/browse/FLINK-34654
 Project: Flink
  Issue Type: New Feature
  Components: Project Website
Reporter: Jark Wu
Assignee: Jark Wu


This issue aims to add a "Special Thanks" page on the Flink website 
(https://flink.apache.org/) to honor and appreciate the companies and 
organizations that have sponsored machines or services for our project.

Discussion thread: 
https://lists.apache.org/thread/y5g0nd5t8h2ql4gq7d0kb9tkwv1wkm1j



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


reswqa commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1521091212


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The {@link FixedLengthByteKeyAndValueComparator} is used by {@link 
KeyedSortPartitionOperator} to
+ * compare records according to both the record key and record value. The 
length of record key must
+ * be fixed and will be initialized when the {@link 
FixedLengthByteKeyAndValueComparator} is
+ * created.
+ */
+@Internal
+public class FixedLengthByteKeyAndValueComparator
+extends TypeComparator> {
+
+private final int serializedKeyLength;
+
+private final TypeComparator valueComparator;
+
+private byte[] keyReference;
+
+private INPUT valueReference;
+
+FixedLengthByteKeyAndValueComparator(
+int serializedKeyLength, TypeComparator valueComparator) {
+this.serializedKeyLength = serializedKeyLength;
+this.valueComparator = valueComparator;
+}
+
+@Override
+public int hash(Tuple2 record) {
+return record.hashCode();
+}
+
+@Override
+public void setReference(Tuple2 toCompare) {
+this.keyReference = toCompare.f0;
+this.valueReference = toCompare.f1;
+}
+
+@Override
+public boolean equalToReference(Tuple2 candidate) {
+return Arrays.equals(keyReference, candidate.f0) && valueReference == 
candidate.f1;
+}
+
+@Override
+public int compareToReference(TypeComparator> 
referencedComparator) {
+byte[] otherKey =
+((FixedLengthByteKeyAndValueComparator) 
referencedComparator).keyReference;
+INPUT otherValue =
+((FixedLengthByteKeyAndValueComparator) 
referencedComparator).valueReference;
+int keyCmp = compare(otherKey, this.keyReference);
+if (keyCmp != 0) {
+return keyCmp;
+}
+return valueComparator.compare(this.valueReference, otherValue);
+}
+
+@Override
+public int compare(Tuple2 first, Tuple2 
second) {
+int keyCmp = compare(first.f0, second.f0);
+if (keyCmp != 0) {
+return keyCmp;
+}
+return valueComparator.compare(first.f1, second.f1);
+}
+
+private int compare(byte[] first, byte[] second) {
+for (int i = 0; i < serializedKeyLength; i++) {
+int cmp = Byte.compare(first[i], second[i]);
+if (cmp != 0) {
+return cmp < 0 ? -1 : 1;
+}
+}
+return 0;
+}
+
+@Override
+public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource)
+throws IOException {
+int minCount = serializedKeyLength;
+while (minCount-- > 0) {
+byte firstValue = firstSource.readByte();
+byte secondValue = secondSource.readByte();
+int cmp = Byte.compare(firstValue, secondValue);
+if (cmp != 0) {
+return cmp < 0 ? -1 : 1;
+}
+}
+return valueComparator.compareSerialized(firstSource, secondSource);
+}
+
+@Override
+public boolean supportsNormalizedKey() {

Review Comment:
   Thanks for the explanation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-34528) Disconnect TM in JM when TM was killed to further reduce the job restart time

2024-03-12 Thread junzhong qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

junzhong qin closed FLINK-34528.

Resolution: Won't Do

> Disconnect TM in JM when TM was killed to further reduce the job restart time
> -
>
> Key: FLINK-34528
> URL: https://issues.apache.org/jira/browse/FLINK-34528
> Project: Flink
>  Issue Type: Sub-task
>Reporter: junzhong qin
>Assignee: junzhong qin
>Priority: Not a Priority
> Attachments: image-2024-02-27-16-35-04-464.png
>
>
> In https://issues.apache.org/jira/browse/FLINK-34526 we disconnect the killed 
> TM in RM. But in the following scenario, we can further reduce the restart 
> time.
> h3. Phenomenon
> In the test case, the pipeline looks like:
> !image-2024-02-27-16-35-04-464.png!
> The Source: Custom Source generates strings, and the job keyBy the strings to 
> Sink: Unnamed.
>  # parallelism = 100
>  # taskmanager.numberOfTaskSlots = 2
>  # disable checkpoint
> The worker was killed at 
> {code:java}
> 2024-02-27 16:41:49,982 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink: 
> Unnamed (6/100) 
> (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) 
> switched from RUNNING to FAILED on 
> container_e2472_1705993319725_62292_01_46 @ xxx 
> (dataPort=38827).org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>  Connection unexpectedly closed by remote task manager 
> 'xxx/10.169.18.138:35983 [ 
> container_e2472_1705993319725_62292_01_10(xxx:5454) ] '. This might 
> indicate that the remote task manager was lost.at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> at 
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]{code}
> {code:java}
> // The task was scheduled to a task manager that had already been killed
> 2024-02-27 16:41:53,506 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source (16/100) (attempt #3) with attempt id 
> 2f1c7b22098a273f

Re: [PR] feat: autoscaling decision parallelism improvement [FLINK-34563] [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


Yang-LI-CS commented on PR #787:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/787#issuecomment-1991128610

   > I have some concerns about this change:
   > 
   > 1. It doesn't work with custom slot sharing configuration which is very 
common
   > 2. It provides almost no benefit with large taskmanager sizes / low number 
of task slots.
   > 3. It goes against some basic design philosophy in the autoscaler such 
that we do not scale vertices beyond their target capacity. It ties to @mxm 's 
question why the logic wouldn't apply to all vertices?
   > 
   > Taking that one step further why don't we scale all vertices to the same 
parallelism at that point? That would naturally cause more resource usage and 
less throughput. By the same logic I don't think we should scale even the 
largest ones further.
   
   @gyfora thanks for the comment! I see your point, indeed it's kind of 
against the basic design of autoscaler algorithm, I'll close this pr then 👍 , 
thanks @mxm also for your comments, looking forward to the next flink 
autoscaling release


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] feat: autoscaling decision parallelism improvement [FLINK-34563] [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


Yang-LI-CS closed pull request #787: feat: autoscaling decision parallelism 
improvement [FLINK-34563]
URL: https://github.com/apache/flink-kubernetes-operator/pull/787


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-03-12 Thread via GitHub


fredia commented on code in PR #24418:
URL: https://github.com/apache/flink/pull/24418#discussion_r1521110459


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A Blocking {@link PhysicalFilePool} which may block when polling physical 
files. This class try
+ * best to reuse a physical file until its size > maxFileSize.
+ */
+public class BlockingPhysicalFilePool extends PhysicalFilePool {
+
+private final AtomicBoolean exclusivePhysicalFilePoolInitialized;
+
+public BlockingPhysicalFilePool(
+long maxFileSize, PhysicalFile.PhysicalFileCreator 
physicalFileCreator) {
+super(maxFileSize, physicalFileCreator);
+this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false);
+}
+
+@Override
+public boolean tryPutFile(
+FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile 
physicalFile)
+throws IOException {
+if (physicalFile.getSize() < maxFileSize) {
+return getFileQueue(subtaskKey, 
physicalFile.getScope()).offer(physicalFile);
+} else {
+getFileQueue(subtaskKey, physicalFile.getScope())
+.offer(physicalFileCreator.perform(subtaskKey, 
physicalFile.getScope()));
+return false;

Review Comment:
   Why create a physicalFile when `physicalFile.getSize() >= maxFileSize`,  in 
other words, why does it behave differently than non-blocking pool?



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A Blocking {@link PhysicalFilePool} which may block when polling physical 
files. This class try
+ * best to reuse a physical file until its size > maxFileSize.
+ */
+public class BlockingPhysicalFilePool extends PhysicalFilePool {
+
+private final AtomicBoolean exclusivePhysicalFilePoolInitialized;
+
+public BlockingPhysicalFilePool(
+long maxFileSize, PhysicalFile.PhysicalFileCreator 
physicalFileCreator) {
+super(maxFileSize, physicalFileCreator);
+this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false);
+}
+
+@Override
+public boolean tryPutFile(
+FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile 
physicalFile)
+throws IOException {
+if (physicalFile.getSize() < maxFileSize) {
+return getFileQueue(subtaskKey, 
physicalFile.getScope()).offer(physicalFile);
+} else {
+getFileQueue(subtaskKe

Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-03-12 Thread via GitHub


fredia commented on PR #24418:
URL: https://github.com/apache/flink/pull/24418#issuecomment-1991138575

   @masteryhx Thanks for the PR, I took a quick look and had some questions, 
PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-03-12 Thread via GitHub


fredia commented on code in PR #24418:
URL: https://github.com/apache/flink/pull/24418#discussion_r1521110459


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A Blocking {@link PhysicalFilePool} which may block when polling physical 
files. This class try
+ * best to reuse a physical file until its size > maxFileSize.
+ */
+public class BlockingPhysicalFilePool extends PhysicalFilePool {
+
+private final AtomicBoolean exclusivePhysicalFilePoolInitialized;
+
+public BlockingPhysicalFilePool(
+long maxFileSize, PhysicalFile.PhysicalFileCreator 
physicalFileCreator) {
+super(maxFileSize, physicalFileCreator);
+this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false);
+}
+
+@Override
+public boolean tryPutFile(
+FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile 
physicalFile)
+throws IOException {
+if (physicalFile.getSize() < maxFileSize) {
+return getFileQueue(subtaskKey, 
physicalFile.getScope()).offer(physicalFile);
+} else {
+getFileQueue(subtaskKey, physicalFile.getScope())
+.offer(physicalFileCreator.perform(subtaskKey, 
physicalFile.getScope()));
+return false;

Review Comment:
   Why create a physicalFile when `physicalFile.getSize() >= maxFileSize`,  in 
other words, why does it behave differently than non-blocking pool?  Do we only 
need to distinguish when polling files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521128496


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -70,6 +85,11 @@ private static Map 
getFileContentByPath(java.nio.file.Path directo
 return contents;
 }
 
+private static Map getStagingFileContent(

Review Comment:
   > The reason I am stressing on that part is that I'd like to remove the 
FileSystemOutputFormat#getStagingFolder method.
   
   Good point! Let me remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-03-12 Thread via GitHub


masteryhx commented on code in PR #24418:
URL: https://github.com/apache/flink/pull/24418#discussion_r1521151391


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A Blocking {@link PhysicalFilePool} which may block when polling physical 
files. This class try
+ * best to reuse a physical file until its size > maxFileSize.
+ */
+public class BlockingPhysicalFilePool extends PhysicalFilePool {
+
+private final AtomicBoolean exclusivePhysicalFilePoolInitialized;
+
+public BlockingPhysicalFilePool(
+long maxFileSize, PhysicalFile.PhysicalFileCreator 
physicalFileCreator) {
+super(maxFileSize, physicalFileCreator);
+this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false);
+}
+
+@Override
+public boolean tryPutFile(
+FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile 
physicalFile)
+throws IOException {
+if (physicalFile.getSize() < maxFileSize) {
+return getFileQueue(subtaskKey, 
physicalFile.getScope()).offer(physicalFile);
+} else {
+getFileQueue(subtaskKey, physicalFile.getScope())
+.offer(physicalFileCreator.perform(subtaskKey, 
physicalFile.getScope()));
+return false;

Review Comment:
   Blocking pool will try best to reuse every files.
   And there should always be at least one file usable for `pollFile`.
   Only `pollFile` could not know it should create or wait a file.



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/BlockingPhysicalFilePool.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A Blocking {@link PhysicalFilePool} which may block when polling physical 
files. This class try
+ * best to reuse a physical file until its size > maxFileSize.
+ */
+public class BlockingPhysicalFilePool extends PhysicalFilePool {
+
+private final AtomicBoolean exclusivePhysicalFilePoolInitialized;
+
+public BlockingPhysicalFilePool(
+long maxFileSize, PhysicalFile.PhysicalFileCreator 
physicalFileCreator) {
+super(maxFileSize, physicalFileCreator);
+this.exclusivePhysicalFilePoolInitialized = new AtomicBoolean(false);
+}
+
+@Override
+public boolean tryPutFile(
+FileMergingSnapshotManager.SubtaskKey subtaskKey, PhysicalFile 
physicalFile)
+throws IOException {
+if (physicalFile.getSize() < maxFileSize) {
+return getFileQueue(subtaskKey, 
physicalFile.getScope()).offer(physicalFile);
+  

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521155291


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##
@@ -96,6 +99,22 @@ private FileSystemOutputFormat(
 this.outputFileConfig = outputFileConfig;
 this.identifier = identifier;
 this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+
+createStagingDirectory(this.stagingPath);
+}
+
+private static void createStagingDirectory(Path stagingPath) {
+try {
+final FileSystem stagingFileSystem = stagingPath.getFileSystem();
+Preconditions.checkState(
+!stagingFileSystem.exists(stagingPath),
+"Staging dir %s already exists",
+stagingFileSystem);

Review Comment:
   Nit: just noticed a typo that it should be `stagingPath` instead of 
`stagingFileSystem` at L#112



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34654) Add "Special Thanks" Page on the Flink Website

2024-03-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34654:
---
Labels: pull-request-available  (was: )

> Add "Special Thanks" Page on the Flink Website
> --
>
> Key: FLINK-34654
> URL: https://issues.apache.org/jira/browse/FLINK-34654
> Project: Flink
>  Issue Type: New Feature
>  Components: Project Website
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>
> This issue aims to add a "Special Thanks" page on the Flink website 
> (https://flink.apache.org/) to honor and appreciate the companies and 
> organizations that have sponsored machines or services for our project.
> Discussion thread: 
> https://lists.apache.org/thread/y5g0nd5t8h2ql4gq7d0kb9tkwv1wkm1j



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521155291


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##
@@ -96,6 +99,22 @@ private FileSystemOutputFormat(
 this.outputFileConfig = outputFileConfig;
 this.identifier = identifier;
 this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+
+createStagingDirectory(this.stagingPath);
+}
+
+private static void createStagingDirectory(Path stagingPath) {
+try {
+final FileSystem stagingFileSystem = stagingPath.getFileSystem();
+Preconditions.checkState(
+!stagingFileSystem.exists(stagingPath),
+"Staging dir %s already exists",
+stagingFileSystem);

Review Comment:
   Nit: just noticed a typo. It should be `stagingPath` instead of 
`stagingFileSystem` at L#112



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34618][table] Migrate SplitPythonConditionFromJoinRule to java. [flink]

2024-03-12 Thread via GitHub


damumu0625 opened a new pull request, #24486:
URL: https://github.com/apache/flink/pull/24486

   
   
   ## What is the purpose of the change
   
   The PR migrates SplitPythonConditionFromJoinRule to java
   it doesn't touch SplitPythonConditionFromJoinRuleTest to be sure that java 
version continues passing it
   
   ## Verifying this change
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34618) Migrate SplitPythonConditionFromJoinRule

2024-03-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34618:
---
Labels: pull-request-available  (was: )

> Migrate SplitPythonConditionFromJoinRule
> 
>
> Key: FLINK-34618
> URL: https://issues.apache.org/jira/browse/FLINK-34618
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Yongming Zhang
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34618][table] Migrate SplitPythonConditionFromJoinRule to java. [flink]

2024-03-12 Thread via GitHub


flinkbot commented on PR #24486:
URL: https://github.com/apache/flink/pull/24486#issuecomment-1991230364

   
   ## CI report:
   
   * 83e67a1d6988fe08173cdbcf161f6097dde2fecb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34654] Add Special Thanks Page on the Flink Website [flink-web]

2024-03-12 Thread via GitHub


rmetzger commented on code in PR #725:
URL: https://github.com/apache/flink-web/pull/725#discussion_r1521186823


##
docs/content/what-is-flink/special-thanks.md:
##
@@ -0,0 +1,48 @@
+---
+title: Special Thanks
+bookCollapseSection: false
+weight: 9
+---
+
+
+# Special Thanks
+
+## General Apache sponsors
+Without those sponsors, the ASF would simply not exist or sustain its 
activities :

Review Comment:
   ```suggestion
   Without those sponsors, the ASF would simply not exist or sustain its 
activities:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34654] Add Special Thanks Page on the Flink Website [flink-web]

2024-03-12 Thread via GitHub


Zakelly commented on code in PR #725:
URL: https://github.com/apache/flink-web/pull/725#discussion_r1521191193


##
docs/content/what-is-flink/special-thanks.md:
##
@@ -0,0 +1,48 @@
+---
+title: Special Thanks
+bookCollapseSection: false
+weight: 9
+---
+
+
+# Special Thanks
+
+## General Apache sponsors
+Without those sponsors, the ASF would simply not exist or sustain its 
activities :
+
+https://www.apache.org/foundation/thanks.html
+
+For those who want to know more about the Apache Sponsorship Program, please 
check :
+
+https://www.apache.org/foundation/sponsorship.html
+
+Thanks !
+
+## Organizations who helped our project …
+
+We would also like to thank the companies and organizations who sponsored 
machines or services for helping the development of Apache Flink:
+
+- [Alibaba](https://www.alibabagroup.com/en-US) donated 8 machines 
(32vCPU,64GB) to run [Flink CI 
builds](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-AvailableCustomBuildMachines)
 for Flink repository and Flink pull requests.
+- [AWS](https://aws.amazon.com/opensource/) donated service costs for 
[flink-connector-aws](https://github.com/apache/flink-connector-aws) tests that 
hit real AWS services. 
+- [Ververica](http://ververica.com/) donated a machine (1vCPU,2GB) for hosting 
[flink-ci](https://cwiki.apache.org/confluence/display/FLINK/Continuous+Integration)
 
+  repositories and a machine (8vCPU,64GB) for running [Flink 
benchmarks](https://lists.apache.org/thread.html/41a68c775753a7841896690c75438e0a497634102e676db880f30225@%3Cdev.flink.apache.org%3E).

Review Comment:
   I think the right link should be 
https://lists.apache.org/thread/bkw6ozoflgltwfwmzjtgx522hyssfko6 . The wiki 
link might also be helpful: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115511847



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521195360


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -37,24 +39,44 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link FileSystemOutputFormat}. */
 class FileSystemOutputFormatTest {
 
-@TempDir private java.nio.file.Path tmpPath;
 @TempDir private java.nio.file.Path outputPath;
 
+@TempDir private java.nio.file.Path stagingBaseDir;
+
+private java.nio.file.Path stagingPath;

Review Comment:
   The reason to introduce both`stagingBaseDir` and `stagingPath` is that 
   
   1. We cannot directly use `@TempDir` to annotate `stagingPath`. Junit5 will 
automatically create a temp directory, and thus the test will fail at 
`Preconditions.checkState`.
   
   2. The reason to introduce `@TempDir stagingBaseDir`(rather than reuse 
`outputPath`) is to decouple the test logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34654] Add Special Thanks Page on the Flink Website [flink-web]

2024-03-12 Thread via GitHub


wuchong commented on code in PR #725:
URL: https://github.com/apache/flink-web/pull/725#discussion_r1521200340


##
docs/content/what-is-flink/special-thanks.md:
##
@@ -0,0 +1,48 @@
+---
+title: Special Thanks
+bookCollapseSection: false
+weight: 9
+---
+
+
+# Special Thanks
+
+## General Apache sponsors
+Without those sponsors, the ASF would simply not exist or sustain its 
activities :
+
+https://www.apache.org/foundation/thanks.html
+
+For those who want to know more about the Apache Sponsorship Program, please 
check :
+
+https://www.apache.org/foundation/sponsorship.html
+
+Thanks !
+
+## Organizations who helped our project …
+
+We would also like to thank the companies and organizations who sponsored 
machines or services for helping the development of Apache Flink:
+
+- [Alibaba](https://www.alibabagroup.com/en-US) donated 8 machines 
(32vCPU,64GB) to run [Flink CI 
builds](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-AvailableCustomBuildMachines)
 for Flink repository and Flink pull requests.
+- [AWS](https://aws.amazon.com/opensource/) donated service costs for 
[flink-connector-aws](https://github.com/apache/flink-connector-aws) tests that 
hit real AWS services. 
+- [Ververica](http://ververica.com/) donated a machine (1vCPU,2GB) for hosting 
[flink-ci](https://cwiki.apache.org/confluence/display/FLINK/Continuous+Integration)
 
+  repositories and a machine (8vCPU,64GB) for running [Flink 
benchmarks](https://lists.apache.org/thread.html/41a68c775753a7841896690c75438e0a497634102e676db880f30225@%3Cdev.flink.apache.org%3E).

Review Comment:
   Thank you. I made the wrong link again :( 
   I will use the wiki link here, it mentions the correct mailing list link and 
has a detailed description about the benchmark. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34654] Add Special Thanks Page on the Flink Website [flink-web]

2024-03-12 Thread via GitHub


wuchong commented on PR #725:
URL: https://github.com/apache/flink-web/pull/725#issuecomment-1991281258

   @dannycranmer @hlteoh37 could you help to review the AWS part? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-03-12 Thread via GitHub


masteryhx commented on PR #24418:
URL: https://github.com/apache/flink/pull/24418#issuecomment-1991306532

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521248255


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -282,7 +278,7 @@ private FileSystemOutputFormat createSinkFormat(
 TableMetaStoreFactory msFactory = new 
FileSystemCommitterTest.TestMetaStoreFactory(path);
 return new FileSystemOutputFormat.Builder()
 .setMetaStoreFactory(msFactory)
-.setPath(path)
+.setStagingPath(new Path(stagingPath.toString()))

Review Comment:
   You can still stick to `setPath(stagingBasePath)` here and remove the 
`stagingPath` field. The only thing you have to change is that you have to 
assert for empty directory rather than deleted directory in 
`checkWriteAndCommit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Rui Fan (Jira)
Rui Fan created FLINK-34655:
---

 Summary: Autoscaler doesn't work for flink 1.15
 Key: FLINK-34655
 URL: https://issues.apache.org/jira/browse/FLINK-34655
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.8.0


flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  the 
latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.

But autoscaler doesn't work for  flink 1.15.

h2. Root cause: 

* FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
* IOMetricsInfo is a part of JobDetailsInfo
* JobDetailsInfo is necessary for autoscaler [1]
* flink's RestClient doesn't allow miss any property during deserializing the 
json

That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 1.15 
jobs.

h2. How to fix it properly?

Flink side support ignore unknown properties.

FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
still doesn't work. Because the IOMetricsInfo added some properties, they are 
primitive type.

It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
(Not sure whether it should be a seperate FLIP or it can be a part of FLIP-401 
[2].)


h2. How to fix it in the short term?

1. Copy the latest RestMapperUtils and RestClient from master branch (It 
includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
loaded first)
2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
RestMapperUtils#flexibleObjectMapper in copied class.

Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
locally).


After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code is 
released in flink side. flink-ubernetes-operator can remove these 2 copied 
classes.

[1] 
https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34615]Split `ExternalizedCheckpointCleanup` out of `Checkpoint… [flink]

2024-03-12 Thread via GitHub


Zakelly commented on code in PR #24461:
URL: https://github.com/apache/flink/pull/24461#discussion_r1521260155


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##
@@ -520,12 +521,40 @@ public void setTolerableCheckpointFailureNumber(int 
tolerableCheckpointFailureNu
  * CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
  *
  * @param cleanupMode Externalized checkpoint clean-up behaviour.
+ * @deprecated Use {@link #setExternalizedCheckpointCleanupV2} instead.
  */
-@PublicEvolving

Review Comment:
   I think we should keep the `PublicEvolving` here, although it is deprecated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34655] Autoscaler doesn't work for flink 1.15 [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


1996fanrui opened a new pull request, #797:
URL: https://github.com/apache/flink-kubernetes-operator/pull/797

   flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
versions, and autoscaler is a part of flink-ubernetes-operator. Currently, the 
latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
   
   But autoscaler doesn't work for flink 1.15.
   
   ## Root cause:
   
   - [FLINK-28310](https://issues.apache.org/jira/browse/FLINK-28310) added 
some properties in IOMetricsInfo in flink-1.16
   - IOMetricsInfo is a part of JobDetailsInfo
   - JobDetailsInfo is necessary for autoscaler [1]
   - flink's RestClient doesn't allow miss any property during deserializing 
the json
   
   That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
1.15 jobs.
   
   ## How to fix it properly?
   
   Flink side support ignore unknown properties.
   
   [FLINK-33268](https://issues.apache.org/jira/browse/FLINK-33268) already do 
it. But I try run autoscaler with flink-1.15 job, it still doesn't work. 
Because the IOMetricsInfo added some properties, they are primitive type.
   
   It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as 
well. (Not sure whether it should be a seperate FLIP or it can be a part of 
FLIP-401 [2].)
   
   ## How to fix it in the short term?
   
   1. Copy the latest RestMapperUtils and RestClient from master branch (It 
includes [FLINK-33268](https://issues.apache.org/jira/browse/FLINK-33268)) to 
flink-autoscaler module. (The copied class will be loaded first)
   2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
RestMapperUtils#flexibleObjectMapper in copied class.
   
   Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
locally).
   
   After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code is 
released in flink side. flink-ubernetes-operator can remove these 2 copied 
classes.
   
   [1] 
https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
   [2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34655:
---
Labels: pull-request-available  (was: )

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]

2024-03-12 Thread via GitHub


wckdman commented on PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#issuecomment-1991405676

   Any news here? :)
   I'm really looking forward to see this feature 🙏 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825639#comment-17825639
 ] 

Matthias Pohl commented on FLINK-34227:
---

My suspicion is that there's a bug in the {{AdaptiveScheduler}} returning the 
wrong JobStatus in 
[JobMaster:1274|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1274]
 resulting in the data being retained on the ResourceManager's side (see [line 
566|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L566]).

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825639#comment-17825639
 ] 

Matthias Pohl edited comment on FLINK-34227 at 3/12/24 11:24 AM:
-

My suspicion is that there's a bug in the {{AdaptiveScheduler}} returning the 
wrong JobStatus in 
[JobMaster:1274|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1274]
 resulting in the data being retained on the ResourceManager's side (see [line 
566|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L566]).
 That would explain why we're only seeing the issue in the AdaptiveScheduler CI 
profile.


was (Author: mapohl):
My suspicion is that there's a bug in the {{AdaptiveScheduler}} returning the 
wrong JobStatus in 
[JobMaster:1274|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1274]
 resulting in the data being retained on the ResourceManager's side (see [line 
566|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L566]).

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33925][connectors/opensearch] Allow customising bulk failure handling [flink-connector-opensearch]

2024-03-12 Thread via GitHub


hajimeni commented on PR #39:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/39#issuecomment-1991441614

   Hey @schulzp. 
   Thank you for creating this PR. It would be very helpful if this PR gets 
merged.
   The CI error seems to be caused by differences in files under the 
`archunit-violations` path.
   It was resolved in the following PR.
   https://github.com/apache/flink-connector-opensearch/pull/21
   
   Additionally, Flink version has been updated to 1.18.1, Please fix CI target 
version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1991461563

   Hi [XComp](https://github.com/XComp), thanks so much for the time and effort 
you've put into the review and discussion :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-12 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1521322872


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * The {@link MapPartitionOperator} is used to process all records in each 
partition on non-keyed
+ * stream. Each partition contains all records of a subtask.
+ */
+@Internal
+public class MapPartitionOperator
+extends AbstractUdfStreamOperator>
+implements OneInputStreamOperator, BoundedOneInput {
+
+private final MapPartitionFunction function;
+
+private MapPartitionIterator iterator;
+
+public MapPartitionOperator(MapPartitionFunction function) {
+super(function);
+this.function = function;
+// This operator is set to be non-chained as it doesn't use task main 
thread to write
+// records to output, which may introduce risks to downstream chained 
operators.
+this.chainingStrategy = ChainingStrategy.NEVER;

Review Comment:
   The `mapPartition` API is an operator that is suitable for batch job 
scenarios, so setting it to `ChargingStrategy.NEVER` allows the operator to be 
separated from other operators, reducing the cost of fault tolerance. 
Therefore, I think this change is also reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34152][autoscaler] Refactor the MemoryTuning to avoid the huge method [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


mxm commented on code in PR #795:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/795#discussion_r1521324931


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##
@@ -92,24 +99,51 @@ public static ConfigChanges tuneTaskManagerMemory(
 return EMPTY_CONFIG;
 }
 
-MemorySize specHeapSize = 
memSpecs.getFlinkMemory().getJvmHeapMemorySize();
-MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
-MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
-MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
-LOG.info(
-"Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
-specHeapSize.toHumanReadableString(),
-specManagedSize.toHumanReadableString(),
-specNetworkSize.toHumanReadableString(),
-specMetaspaceSize.toHumanReadableString());
+final TuningSimpleMemorySpec originalSimpleSpec = new 
TuningSimpleMemorySpec(memSpecs);
+LOG.info("The original memory spec : {}", originalSimpleSpec);
 
-MemorySize maxMemoryBySpec = 
context.getTaskManagerMemory().orElse(MemorySize.ZERO);
-if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
-LOG.warn("Spec TaskManager memory size could not be determined.");
+MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+final TuningSimpleMemorySpec tunedSimpleSpec =
+generateTunedMemorySpec(
+memSpecs,
+context,
+evaluatedMetrics,
+jobTopology,
+scalingSummaries,
+config,
+originalSimpleSpec,
+memBudget);
+final long flinkMemoryDiffBytes =
+calculateFlinkMemoryDiffBytes(originalSimpleSpec, 
tunedSimpleSpec);
+
+// Update total memory according to memory diffs
+final MemorySize totalMemory =
+new MemorySize(maxMemoryBySpec.getBytes() - 
memBudget.getRemaining());
+if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
+LOG.warn("Invalid total memory configuration: {}", totalMemory);
 return EMPTY_CONFIG;
 }
 
-MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+ConfigChanges tuningConfig =
+generateTuningConfig(memSpecs, tunedSimpleSpec, 
flinkMemoryDiffBytes, totalMemory);
+triggerMemoryTuningEvent(context, eventHandler, config, tuningConfig);
+
+if 
(!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+return EMPTY_CONFIG;
+}
+return tuningConfig;
+}
+
+@Nonnull
+private static TuningSimpleMemorySpec generateTunedMemorySpec(
+CommonProcessMemorySpec memSpecs,
+JobAutoScalerContext context,
+EvaluatedMetrics evaluatedMetrics,
+JobTopology jobTopology,
+Map scalingSummaries,
+UnmodifiableConfiguration config,
+TuningSimpleMemorySpec originalSimpleSpec,
+MemoryBudget memBudget) {

Review Comment:
   Thanks Rui! I'm not against refactoring the code. I just wasn't convinced 
this change made the code easier to understand. Thank you for listening to my 
concerns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] feat: autoscaling decision parallelism improvement [FLINK-34563] [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


mxm commented on PR #787:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/787#issuecomment-1991480345

   I think there are some really good ideas in this PR which we might consider 
in the future, but for the sake of simplicity it might be better to leave 
things as-is for now. Thanks for your time and effort @Yang-LI-CS!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825655#comment-17825655
 ] 

Gyula Fora commented on FLINK-34655:


The bigger issue is that aggregated busy time metrics are not part of Flink 1.15

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825654#comment-17825654
 ] 

Matthias Pohl commented on FLINK-34227:
---

The [findings of my initial 
analysis|https://issues.apache.org/jira/browse/FLINK-34227?focusedCommentId=17810745&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17810745]
 are not correct. The missing log message does exist. It's just that the 
{{Close ResourceManager connection [...]}} log message appears twice (once 
triggered from the JobMaster's IO thread and once from the Dispatcher's main 
thread). The latter one seems to retrigger the reconnection.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825656#comment-17825656
 ] 

Gyula Fora commented on FLINK-34655:


But the vertex parallelism overrides feature was introduced in 1.17 so the 
autoscaler never really officially supported anything before that. What do you 
think [~mxm] ?

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34655:
---
Priority: Major  (was: Blocker)

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825657#comment-17825657
 ] 

Gyula Fora commented on FLINK-34655:


Also this issue is fixed in the Kubernetes-operator package where we have an 
override version of JobDetailsInfo

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825657#comment-17825657
 ] 

Gyula Fora edited comment on FLINK-34655 at 3/12/24 12:12 PM:
--

Also this issue is fixed in the Kubernetes-operator package where we have an 
override version of IoMetricsInfo


was (Author: gyfora):
Also this issue is fixed in the Kubernetes-operator package where we have an 
override version of JobDetailsInfo

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34655] Autoscaler doesn't work for flink 1.15 [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


gyfora commented on PR #797:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/797#issuecomment-1991512469

   Instead of a new rest client you could also do what is done in the 
Kubernetes-operator module, see:
   
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
   
   By shadowing the class under the same package this will hide the 
incompatible implementation coming from flink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34655] Autoscaler doesn't work for flink 1.15 [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


gyfora commented on PR #797:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/797#issuecomment-1991513679

   But still it will not make it work fully for 1.15 as the aggregated metrics 
need to be back ported on the Flink side (similar to other features required 
for the autoscaler)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34655] Autoscaler doesn't work for flink 1.15 [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


1996fanrui commented on PR #797:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/797#issuecomment-1991537856

   Thanks @gyfora for the valuable feedback! 
   
   > Instead of a new rest client you could also do what is done in the 
Kubernetes-operator module, see: 
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
   > 
   > By shadowing the class under the same package this will hide the 
incompatible implementation coming from flink
   
   If so,  autoscaler works with 1.15 when users uses the 
`flink-kubernetes-operator`, right?
   
   I'm using the autoscaler Standalone, and I found 1.15 job doesn't work. In 
the short term, could I move the `IOMetricsInfo` to autoscaler module? IIUC, it 
will let both of `flink-kubernetes-operator` and `autoscaler Standalone` work.
   
   In the long term, should we maintain the `IOMetricsInfo` class in the 
`flink-kubernetes-operator`  repo or we disable 
`DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES` in the flink repo?
   
   Looking forward to your opinion, and I'm happy to fix them.
   
   > But still it will not make it work fully for 1.15 as the aggregated 
metrics need to be back ported on the Flink side (similar to other features 
required for the autoscaler)
   
   Thanks for the reminder, we have a lot of jobs run with 1.15 version, so we 
are backing port them. Of course, we (our internal platform) only analyse them 
and don't scale them when the version before 1.18.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34655] Autoscaler doesn't work for flink 1.15 [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


gyfora commented on PR #797:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/797#issuecomment-1991544918

   > Thanks @gyfora for the valuable feedback!
   > 
   > > Instead of a new rest client you could also do what is done in the 
Kubernetes-operator module, see: 
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
   > > By shadowing the class under the same package this will hide the 
incompatible implementation coming from flink
   > 
   > If so, autoscaler works with 1.15 when users uses the 
`flink-kubernetes-operator`, right?
   > 
   > I'm using the autoscaler Standalone, and I found 1.15 job doesn't work. In 
the short term, could I move the `IOMetricsInfo` to autoscaler module? IIUC, it 
will let both of `flink-kubernetes-operator` and `autoscaler Standalone` work.
   > 
   > In the long term, should we maintain the `IOMetricsInfo` class in the 
`flink-kubernetes-operator` repo or we disable 
`DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES` in the flink repo?
   > 
   > Looking forward to your opinion, and I'm happy to fix them.
   > 
   > > But still it will not make it work fully for 1.15 as the aggregated 
metrics need to be back ported on the Flink side (similar to other features 
required for the autoscaler)
   > 
   > Thanks for the reminder, we have a lot of jobs run with 1.15 version, so 
we are backing port them. Of course, we (our internal platform) only analyse 
them and don't scale them when the version before 1.18.
   
   Not sure if we need to have the IOMetricsInfo override in both packages to 
be safe actually. I think it has to be copied.
   
   I can personally confirm that given the aggregated metrics back ported to 
1.15 (easy to do) + the IOMetricsInfo, the autoscaler works with 1.15


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825654#comment-17825654
 ] 

Matthias Pohl edited comment on FLINK-34227 at 3/12/24 1:00 PM:


The [findings of my initial 
analysis|https://issues.apache.org/jira/browse/FLINK-34227?focusedCommentId=17810745&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17810745]
 are not correct. The missing log message does exist. It's just that the 
"{{Close ResourceManager connection [...]}}" log message appears twice (once 
triggered from the JobMaster's IO thread and once from the Dispatcher's main 
thread). The latter one seems to retrigger the reconnection.


was (Author: mapohl):
The [findings of my initial 
analysis|https://issues.apache.org/jira/browse/FLINK-34227?focusedCommentId=17810745&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17810745]
 are not correct. The missing log message does exist. It's just that the 
{{Close ResourceManager connection [...]}} log message appears twice (once 
triggered from the JobMaster's IO thread and once from the Dispatcher's main 
thread). The latter one seems to retrigger the reconnection.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825669#comment-17825669
 ] 

Rui Fan commented on FLINK-34655:
-

{quote}But the vertex parallelism overrides feature was introduced in 1.17 so 
the autoscaler never really officially supported anything before that{quote}

We(our internal platform) want to use the autoscaler to give some parallelism 
setting suggestions to our users. We suggest they upgrade job to 1.17 or later 
version if users want to scaling automatically.

And that's why we want to parse scaling report. In the short term, we only use 
the autoscaler to give suggestion instead of scaling directly. After our users 
think the parallelism calculation is reliable, they will have stronger 
motivation to upgrade the flink version.

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> Flink side support ignore unknown properties.
> FLINK-33268 already do it. But I try run autoscaler with flink-1.15 job, it 
> still doesn't work. Because the IOMetricsInfo added some properties, they are 
> primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a seperate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from master branch (It 
> includes FLINK-33268) to flink-autoscaler module. (The copied class will be 
> loaded first)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in copied class.
> Based on these 2 steps, flink-1.15 works well with autoscaler. (I try it 
> locally).
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled, and the corresponding code 
> is released in flink side. flink-ubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34152][autoscaler] Refactor the MemoryTuning to avoid the huge method [flink-kubernetes-operator]

2024-03-12 Thread via GitHub


1996fanrui commented on code in PR #795:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/795#discussion_r1521431571


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##
@@ -92,24 +99,51 @@ public static ConfigChanges tuneTaskManagerMemory(
 return EMPTY_CONFIG;
 }
 
-MemorySize specHeapSize = 
memSpecs.getFlinkMemory().getJvmHeapMemorySize();
-MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
-MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
-MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
-LOG.info(
-"Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
-specHeapSize.toHumanReadableString(),
-specManagedSize.toHumanReadableString(),
-specNetworkSize.toHumanReadableString(),
-specMetaspaceSize.toHumanReadableString());
+final TuningSimpleMemorySpec originalSimpleSpec = new 
TuningSimpleMemorySpec(memSpecs);
+LOG.info("The original memory spec : {}", originalSimpleSpec);
 
-MemorySize maxMemoryBySpec = 
context.getTaskManagerMemory().orElse(MemorySize.ZERO);
-if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
-LOG.warn("Spec TaskManager memory size could not be determined.");
+MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+final TuningSimpleMemorySpec tunedSimpleSpec =
+generateTunedMemorySpec(
+memSpecs,
+context,
+evaluatedMetrics,
+jobTopology,
+scalingSummaries,
+config,
+originalSimpleSpec,
+memBudget);
+final long flinkMemoryDiffBytes =
+calculateFlinkMemoryDiffBytes(originalSimpleSpec, 
tunedSimpleSpec);
+
+// Update total memory according to memory diffs
+final MemorySize totalMemory =
+new MemorySize(maxMemoryBySpec.getBytes() - 
memBudget.getRemaining());
+if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
+LOG.warn("Invalid total memory configuration: {}", totalMemory);
 return EMPTY_CONFIG;
 }
 
-MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+ConfigChanges tuningConfig =
+generateTuningConfig(memSpecs, tunedSimpleSpec, 
flinkMemoryDiffBytes, totalMemory);
+triggerMemoryTuningEvent(context, eventHandler, config, tuningConfig);
+
+if 
(!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+return EMPTY_CONFIG;
+}
+return tuningConfig;
+}
+
+@Nonnull
+private static TuningSimpleMemorySpec generateTunedMemorySpec(
+CommonProcessMemorySpec memSpecs,
+JobAutoScalerContext context,
+EvaluatedMetrics evaluatedMetrics,
+JobTopology jobTopology,
+Map scalingSummaries,
+UnmodifiableConfiguration config,
+TuningSimpleMemorySpec originalSimpleSpec,
+MemoryBudget memBudget) {

Review Comment:
   Thank you, we can do it in the future if we think it's needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825654#comment-17825654
 ] 

Matthias Pohl edited comment on FLINK-34227 at 3/12/24 1:09 PM:


The [findings of my initial 
analysis|https://issues.apache.org/jira/browse/FLINK-34227?focusedCommentId=17810745&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17810745]
 are not correct. The missing log message does exist. It's just that the 
"{{Close ResourceManager connection [...]}}" log message appears twice (once 
triggered from the JobMaster's IO thread and once from the Dispatcher's main 
thread). The latter one seems to retrigger the reconnection.

{code}
[...]
02:51:28,193 [flink-pekko.actor.default-dispatcher-10] INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
e7cb13faaae707768a1a4db28427af80 from job leader monitoring.
02:51:28,193 [flink-pekko.actor.default-dispatcher-10] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
JobManager connection for job e7cb13faaae707768a1a4db28427af80.
02:51:28,193 [flink-pekko.actor.default-dispatcher-8] INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] 
- Freeing slot 98a0c702ce550d2fd7dd3710ec7b76e0.
02:51:28,194 [flink-pekko.actor.default-dispatcher-8] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Disconnect 
TaskExecutor d71ee9b8-f278-48ee-bb1c-f05fd568947f because: TaskExecutor 
pekko://flink/user/rpc/taskmanager_0 has no more allocated slots for job 
e7cb13faaae707768a1a4db28427af80.
02:51:28,194 [jobmanager-io-thread-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Close 
ResourceManager connection 3c08958c5ef3906fae847097373b047a: Stopping JobMaster 
for job 'Flink Streaming Job' (e7cb13faaae707768a1a4db28427af80).
02:51:28,194 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
a38b8b4ba6c4894c7cfca5f1c0fe4f68@pekko://flink/user/rpc/jobmanager_70 for job 
e7cb13faaae707768a1a4db28427af80 from the resource manager.
02:51:28,194 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Close 
ResourceManager connection 3c08958c5ef3906fae847097373b047a: Stopping JobMaster 
for job 'Flink Streaming Job' (e7cb13faaae707768a1a4db28427af80).
02:51:28,194 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to 
ResourceManager 
pekko://flink/user/rpc/resourcemanager_2(86dfd2ebd79836698df3e4a5de474282)
02:51:28,194 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved 
ResourceManager address, beginning registration
02:51:28,194 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering job manager 
a38b8b4ba6c4894c7cfca5f1c0fe4f68@pekko://flink/user/rpc/jobmanager_70 for job 
e7cb13faaae707768a1a4db28427af80.
02:51:28,195 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registered job manager 
a38b8b4ba6c4894c7cfca5f1c0fe4f68@pekko://flink/user/rpc/jobmanager_70 for job 
e7cb13faaae707768a1a4db28427af80.
02:51:28,195 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager 
successfully registered at ResourceManager, leader id: 
86dfd2ebd79836698df3e4a5de474282.
[...]
{code}


was (Author: mapohl):
The [findings of my initial 
analysis|https://issues.apache.org/jira/browse/FLINK-34227?focusedCommentId=17810745&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17810745]
 are not correct. The missing log message does exist. It's just that the 
"{{Close ResourceManager connection [...]}}" log message appears twice (once 
triggered from the JobMaster's IO thread and once from the Dispatcher's main 
thread). The latter one seems to retrigger the reconnection.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "mai

Re: [PR] [FLINK-34334][state] Add sub-task level RocksDB file count metrics [flink]

2024-03-12 Thread via GitHub


hejufang commented on code in PR #24322:
URL: https://github.com/apache/flink/pull/24322#discussion_r1521505824


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBProperty.java:
##
@@ -31,55 +31,92 @@
  */
 @Internal
 public enum RocksDBProperty {
-NumImmutableMemTable("num-immutable-mem-table"),
-MemTableFlushPending("mem-table-flush-pending"),
-CompactionPending("compaction-pending"),
-BackgroundErrors("background-errors"),
-CurSizeActiveMemTable("cur-size-active-mem-table"),
-CurSizeAllMemTables("cur-size-all-mem-tables"),
-SizeAllMemTables("size-all-mem-tables"),
-NumEntriesActiveMemTable("num-entries-active-mem-table"),
-NumEntriesImmMemTables("num-entries-imm-mem-tables"),
-NumDeletesActiveMemTable("num-deletes-active-mem-table"),
-NumDeletesImmMemTables("num-deletes-imm-mem-tables"),
-EstimateNumKeys("estimate-num-keys"),
-EstimateTableReadersMem("estimate-table-readers-mem"),
-NumSnapshots("num-snapshots"),
-NumLiveVersions("num-live-versions"),
-EstimateLiveDataSize("estimate-live-data-size"),
-TotalSstFilesSize("total-sst-files-size"),
-LiveSstFilesSize("live-sst-files-size"),
-EstimatePendingCompactionBytes("estimate-pending-compaction-bytes"),
-NumRunningCompactions("num-running-compactions"),
-NumRunningFlushes("num-running-flushes"),
-ActualDelayedWriteRate("actual-delayed-write-rate"),
-IsWriteStopped("is-write-stopped"),
-BlockCacheCapacity("block-cache-capacity"),
-BlockCacheUsage("block-cache-usage"),
-BlockCachePinnedUsage("block-cache-pinned-usage");
+NumImmutableMemTable("num-immutable-mem-table", PropertyType.NUMBER),
+MemTableFlushPending("mem-table-flush-pending", PropertyType.NUMBER),
+CompactionPending("compaction-pending", PropertyType.NUMBER),
+BackgroundErrors("background-errors", PropertyType.NUMBER),
+CurSizeActiveMemTable("cur-size-active-mem-table", PropertyType.NUMBER),
+CurSizeAllMemTables("cur-size-all-mem-tables", PropertyType.NUMBER),
+SizeAllMemTables("size-all-mem-tables", PropertyType.NUMBER),
+NumEntriesActiveMemTable("num-entries-active-mem-table", 
PropertyType.NUMBER),
+NumEntriesImmMemTables("num-entries-imm-mem-tables", PropertyType.NUMBER),
+NumDeletesActiveMemTable("num-deletes-active-mem-table", 
PropertyType.NUMBER),
+NumDeletesImmMemTables("num-deletes-imm-mem-tables", PropertyType.NUMBER),
+EstimateNumKeys("estimate-num-keys", PropertyType.NUMBER),
+EstimateTableReadersMem("estimate-table-readers-mem", PropertyType.NUMBER),
+NumSnapshots("num-snapshots", PropertyType.NUMBER),
+NumLiveVersions("num-live-versions", PropertyType.NUMBER),
+EstimateLiveDataSize("estimate-live-data-size", PropertyType.NUMBER),
+TotalSstFilesSize("total-sst-files-size", PropertyType.NUMBER),
+LiveSstFilesSize("live-sst-files-size", PropertyType.NUMBER),
+EstimatePendingCompactionBytes("estimate-pending-compaction-bytes", 
PropertyType.NUMBER),
+NumRunningCompactions("num-running-compactions", PropertyType.NUMBER),
+NumRunningFlushes("num-running-flushes", PropertyType.NUMBER),
+ActualDelayedWriteRate("actual-delayed-write-rate", PropertyType.NUMBER),
+IsWriteStopped("is-write-stopped", PropertyType.NUMBER),
+BlockCacheCapacity("block-cache-capacity", PropertyType.NUMBER),
+BlockCacheUsage("block-cache-usage", PropertyType.NUMBER),
+BlockCachePinnedUsage("block-cache-pinned-usage", PropertyType.NUMBER),
+NumFilesAtLevel("num-files-at-level", PropertyType.STRING);
 
 private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s";
 
 private static final String CONFIG_KEY_FORMAT = 
"state.backend.rocksdb.metrics.%s";
 
-private final String property;
+private final String propertyName;
 
-RocksDBProperty(String property) {
-this.property = property;
+private final PropertyType type;
+
+/** Property type. */
+public enum PropertyType {
+NUMBER,
+STRING
 }
 
-/**
- * @return property string that can be used to query {@link
- * RocksDB#getLongProperty(ColumnFamilyHandle, String)}.
- */
-public String getRocksDBProperty() {
-return String.format(ROCKS_DB_PROPERTY_FORMAT, property);
+RocksDBProperty(String propertyName, PropertyType type) {
+this.propertyName = propertyName;
+this.type = type;
+}
+
+public String getPropertyName() {
+return this.propertyName;
+}
+
+public static RocksDBProperty getRocksDBProperty(final String property) {
+// NumFilesAtLevel controls multiple levels of file count monitoring, 
each a separate metric
+if (property.startsWith(NumFilesAtLevel.getPropertyName())) {
+return NumFilesAtLevel;
+}
+for (final RocksDBProperty rocksDBProperty : RocksDBProperty.values()) 
{

Review Comment:
   @Z

[jira] [Created] (FLINK-34656) Generated code for `ITEM` operator should return null when getting element of a null map/array/row

2024-03-12 Thread yisha zhou (Jira)
yisha zhou created FLINK-34656:
--

 Summary: Generated code for `ITEM` operator should return null 
when getting element of a null map/array/row
 Key: FLINK-34656
 URL: https://issues.apache.org/jira/browse/FLINK-34656
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: yisha zhou


In FieldAccessFromTableITCase we can find that the expected result of f0[1] is 
null when f0 is a null array. 

However, behavior in generated code for ITEM is not consistent with case above. 
The main code is:

 
{code:java}
val arrayAccessCode =
  s"""
 |${array.code}
 |${index.code}
 |boolean $nullTerm = ${array.nullTerm} || ${index.nullTerm} ||
 |   $idxStr < 0 || $idxStr >= ${array.resultTerm}.size() || $arrayIsNull;
 |$resultTypeTerm $resultTerm = $nullTerm ? $defaultTerm : $arrayGet;
 |""".stripMargin {code}
If `array.nullTerm` is true, a default value of element type will be returned, 
for example -1 for null bigint array.

The reason why FieldAccessFromTableITCase can get expected result is that the 
ReduceExpressionsRule generated an expression code for that case like:
{code:java}
boolean isNull$0 = true || false ||
   ((int) 1) - 1 < 0 || ((int) 1) - 1 >= 
((org.apache.flink.table.data.ArrayData) null).size() || 
((org.apache.flink.table.data.ArrayData) null).isNullAt(((int) 1) - 1);
long result$0 = isNull$0 ? -1L : ((org.apache.flink.table.data.ArrayData) 
null).getLong(((int) 1) - 1);
if (isNull$0) {
  out.setField(0, null);
} else {
  out.setField(0, result$0);
} {code}
The reduced expr will be a null literal.
 

I think the behaviors for getting element of a null value should be unified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825698#comment-17825698
 ] 

Matthias Pohl commented on FLINK-34227:
---

[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Add announcement blog post for Flink 1.19 [flink-web]

2024-03-12 Thread via GitHub


lincoln-lil commented on PR #721:
URL: https://github.com/apache/flink-web/pull/721#issuecomment-1991840142

   @xintongsong @rmoff As per the conclusion of our discussion, I've tweaked 
these examples and tried to keep them to just describing what the functionality 
looks like (and avoiding overly detailed instructions on how to use them).
   Thanks again for both of you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add announcement blog post for Flink 1.19 [flink-web]

2024-03-12 Thread via GitHub


lincoln-lil commented on code in PR #721:
URL: https://github.com/apache/flink-web/pull/721#discussion_r1521623410


##
docs/content/posts/2024-03-xx-release-1.19.0.md:
##
@@ -0,0 +1,456 @@
+---

Review Comment:
   @MartijnVisser As we discussed at the sync meeting, I've added these 
descriptions (with the minor tweak of using 'Connector API Improvements' as the 
title, rather than the broader 'SDK Improvements').



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825698#comment-17825698
 ] 

Matthias Pohl edited comment on FLINK-34227 at 3/12/24 3:12 PM:


[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.

update: but that doesn't seem to be the problem either, because 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 is called from the main thread and {{FutureUtils#runAfterwards}} uses 
{{Executors#directExecutor}} internally to execute the callback. That should 
already ensure that the code is not executed in some other thread.


was (Author: mapohl):
[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.exe

[jira] [Commented] (FLINK-23542) Upgrade Checkstyle to at least 8.29

2024-03-12 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825712#comment-17825712
 ] 

Ryan Skraba commented on FLINK-23542:
-

This has become slightly more important : the last version of the 
CheckStyle-IDEA plugin we use in IntelliJ dropped support for 8.x in March 2024 
[https://github.com/jshiell/checkstyle-idea/releases/tag/5.88.0]

New Flink developers or existing developers keeping their IDE and plugins up to 
date won't be able to use the existing checkstyle configuration.

> Upgrade Checkstyle to at least 8.29
> ---
>
> Key: FLINK-23542
> URL: https://issues.apache.org/jira/browse/FLINK-23542
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Martijn Visser
>Priority: Not a Priority
>  Labels: auto-deprioritized-minor, pull-request-available
>
> Checkstyle version < 8.29 are still vulnerable to XML External Entity (XXE) 
> Processing due to an incomplete fix for CVE-2019-9658.
> {noformat}
> Impact
> User: Build Maintainers
> This vulnerability probably doesn't impact Maven/Gradle users as, in most 
> cases, these builds are processing files that are trusted, or pre-vetted by a 
> pull request reviewer before being run on internal CI infrastructure.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825698#comment-17825698
 ] 

Matthias Pohl edited comment on FLINK-34227 at 3/12/24 3:20 PM:


[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.


was (Author: mapohl):
[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.

update: but that doesn't seem to be the problem either, because 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 is called from the main thread and {{FutureUtils#runAfterwards}} uses 
{{Executors#directExecutor}} internally to execute the callback. That should 
already ensure that the code is not executed in some other thread.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.exe

Re: [PR] [FLINK-34533][release] Add release note for version 1.19 [flink]

2024-03-12 Thread via GitHub


lincoln-lil commented on code in PR #24394:
URL: https://github.com/apache/flink/pull/24394#discussion_r1521683726


##
docs/content.zh/release-notes/flink-1.19.md:
##
@@ -0,0 +1,362 @@
+---
+title: "Release Notes - Flink 1.19"
+---
+
+
+# Release notes - Flink 1.19
+
+These release notes discuss important aspects, such as configuration, behavior 
or dependencies,
+that changed between Flink 1.18 and Flink 1.19. Please read these notes 
carefully if you are 
+planning to upgrade your Flink version to 1.19.
+
+## Dependency upgrades
+
+ Drop support for python 3.7
+
+# [FLINK-33029](https://issues.apache.org/jira/browse/FLINK-33029)
+
+ Add support for python 3.11
+
+# [FLINK-33030](https://issues.apache.org/jira/browse/FLINK-33030)
+
+## Build System

Review Comment:
   @snuyanzin I've added the java 21 support, PTAL, thanks : )
   (also add it to the release announcement pr:
   
https://github.com/apache/flink-web/pull/721/files#diff-9a4ae63f4269f0b6dee7a38b30564cc94f59cd21f1e5eb6edb13902c1f8bb3fbR234)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-03-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825698#comment-17825698
 ] 

Matthias Pohl edited comment on FLINK-34227 at 3/12/24 3:45 PM:


[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.

update: But that doesn't explain, yet, why this is only happening in the 
AdaptiveScheduler profile, so far. We have the same issue in 
[SchedulerBase#closeAsync|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L674].


was (Author: mapohl):
[~chesnay] supported the investigation. His findings are based around the 
question why the close call was actually triggered from within an IO thread:
# The {{JobMaster#onStop}} call triggers 
[JobMaster#stopJobExecution|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1048]
 which calls {{JobMaster#stopScheduling}} from within the JobMaster's main 
thread which calls {{AdaptiveScheduler#closeAsync}} in 
[JobMaster:1088|https://github.com/apache/flink/blob/d6a4eb966fbc47277e07b79e7c64939a62eb1d54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1088].
# {{AdaptiveScheduler#closeAsync}} composes the resultFuture of the 
{{CheckpointsCleaner}} in 
[AdaptiveScheduler:581|https://github.com/apache/flink/blob/d4e0084649c019c536ee1e44bab15c8eca01bf13/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L581].
 The future is completed from within the {{ioExecutor}} that is used in the 
{{CheckpointsCleaner}}.
# The future is forwarded to {{JobMaster#stopJobExecution}} where the 
disconnect is triggered after the cleanup future is completed causing the 
disconnect to be executed in an IO thread rather than the JobMaster's main 
thread.

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExe

[PR] [hotfix] In case of unexpected errors do not loose the primary failur [flink]

2024-03-12 Thread via GitHub


pnowojski opened a new pull request, #24487:
URL: https://github.com/apache/flink/pull/24487

   Unexpected error can be for example NPE
   
   ## Verifying this change
   
   This
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] In case of unexpected errors do not loose the primary failur [flink]

2024-03-12 Thread via GitHub


flinkbot commented on PR #24487:
URL: https://github.com/apache/flink/pull/24487#issuecomment-1992068365

   
   ## CI report:
   
   * d07a37b06fc847c1f2c6ce148a918c2490f2490c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] In case of unexpected errors do not loose the primary failur [flink]

2024-03-12 Thread via GitHub


rkhachatryan commented on code in PR #24487:
URL: https://github.com/apache/flink/pull/24487#discussion_r1521803705


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -1046,27 +1046,32 @@ private void onTriggerFailure(
 CheckpointProperties checkpointProperties,
 Throwable throwable) {
 // beautify the stack trace a bit
-throwable = ExceptionUtils.stripCompletionException(throwable);
-
 try {
-coordinatorsToCheckpoint.forEach(
-
OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
+throwable = ExceptionUtils.stripCompletionException(throwable);
 
-final CheckpointException cause =
-getCheckpointException(
-
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+try {
+coordinatorsToCheckpoint.forEach(
+
OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
 
-if (checkpoint != null && !checkpoint.isDisposed()) {
-synchronized (lock) {
-abortPendingCheckpoint(checkpoint, cause);
+final CheckpointException cause =
+getCheckpointException(
+
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+
+if (checkpoint != null && !checkpoint.isDisposed()) {
+synchronized (lock) {
+abortPendingCheckpoint(checkpoint, cause);
+}
+} else {
+failureManager.handleCheckpointException(
+checkpoint, checkpointProperties, cause, null, 
job, null, statsTracker);
 }
-} else {
-failureManager.handleCheckpointException(
-checkpoint, checkpointProperties, cause, null, job, 
null, statsTracker);
+} finally {
+isTriggering = false;
+executeQueuedRequest();
 }
-} finally {
-isTriggering = false;
-executeQueuedRequest();
+} catch (Throwable secondThrowable) {

Review Comment:
   Can't we have just one try/catch block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] In case of unexpected errors do not loose the primary failur [flink]

2024-03-12 Thread via GitHub


rkhachatryan commented on code in PR #24487:
URL: https://github.com/apache/flink/pull/24487#discussion_r1521803705


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -1046,27 +1046,32 @@ private void onTriggerFailure(
 CheckpointProperties checkpointProperties,
 Throwable throwable) {
 // beautify the stack trace a bit
-throwable = ExceptionUtils.stripCompletionException(throwable);
-
 try {
-coordinatorsToCheckpoint.forEach(
-
OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
+throwable = ExceptionUtils.stripCompletionException(throwable);
 
-final CheckpointException cause =
-getCheckpointException(
-
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+try {
+coordinatorsToCheckpoint.forEach(
+
OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
 
-if (checkpoint != null && !checkpoint.isDisposed()) {
-synchronized (lock) {
-abortPendingCheckpoint(checkpoint, cause);
+final CheckpointException cause =
+getCheckpointException(
+
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+
+if (checkpoint != null && !checkpoint.isDisposed()) {
+synchronized (lock) {
+abortPendingCheckpoint(checkpoint, cause);
+}
+} else {
+failureManager.handleCheckpointException(
+checkpoint, checkpointProperties, cause, null, 
job, null, statsTracker);
 }
-} else {
-failureManager.handleCheckpointException(
-checkpoint, checkpointProperties, cause, null, job, 
null, statsTracker);
+} finally {
+isTriggering = false;
+executeQueuedRequest();
 }
-} finally {
-isTriggering = false;
-executeQueuedRequest();
+} catch (Throwable secondThrowable) {

Review Comment:
   Can't we have just one try/catch block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Modify BarrierAlignmentUtilTest comments [flink]

2024-03-12 Thread via GitHub


HCTommy commented on PR #24476:
URL: https://github.com/apache/flink/pull/24476#issuecomment-1992162943

   @zhuzhurk Hi, I fixed a grammar error in the comments when testing the 
barrier. Could you please review in your available time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33925][connectors/opensearch] Allow customising bulk failure handling [flink-connector-opensearch]

2024-03-12 Thread via GitHub


schulzp commented on PR #39:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/39#issuecomment-1992274982

   Hey @hajimeni!
   
   > The CI error seems to be caused by differences in files under the 
archunit-violations path.
   > It was resolved in the following PR. 
https://github.com/apache/flink-connector-opensearch/pull/21
   
   The PR you mentioned has been merged in 2023, that way before I opened this 
one. I'd like to help here but I feel lost. If I run `mvn clean verify` 
(against flink 1.17.1) all tests pass. Once I add `-Dflink.version=1.18.1` the 
archunit violations show up and are written to the following files:
   
   * `4382f1f0-807a-45ff-97d8-42f72b6e9484` (updated)
   * `eb4a59cf-22af-4223-8dfd-0ebc17ed342d` (new file)
   * `stored.rules` (adds `…342d` to the list)
   
   What should happen next? Is the build process supposed to just accept those 
violations instead of failing? Is there any documentation on archunit in the 
context of flink projects?
   
   > Additionally, Flink version has been updated to 1.18.1, Please fix CI 
target version.
   
   That's out of scope of this PR, isn't it? I looked at [the PR workflow for 
flink-connector-elasticsearch](/apache/flink-connector-elasticsearch/blob/main/.github/workflows/push_pr.yml)
 but even there it's 1.18-SNAPSHOT. What is supposed to be changed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34657) Implement Lineage Graph for streaming API use cases

2024-03-12 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-34657:
-

 Summary: Implement Lineage Graph for streaming API use cases
 Key: FLINK-34657
 URL: https://issues.apache.org/jira/browse/FLINK-34657
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhenqiu Huang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34658) Scala API unusable on Flink 1.18.1/Java 17 Docker image

2024-03-12 Thread Matthew Ernst (Jira)
Matthew Ernst created FLINK-34658:
-

 Summary: Scala API unusable on Flink 1.18.1/Java 17 Docker image
 Key: FLINK-34658
 URL: https://issues.apache.org/jira/browse/FLINK-34658
 Project: Flink
  Issue Type: Bug
  Components: API / Scala
Affects Versions: 1.18.1, 1.18.0
 Environment: This bug has been reproduced under macOS (Intel x64) and 
Linux (AMD 64) on a Flink cluster running in session mode.
Reporter: Matthew Ernst


The Scala API should still work in Flink 1.18. The official Docker image for 
Flink 1.18.1 on Java 17 ("flink:1.18.1-scala_2.12-java17") causes jobs using 
the Scala API to immediately throw a ReflectiveOperationException. Jobs using 
the Scala API still work correctly on the Java 11 image 
("flink:1.18.1-scala_2.12-java11").

The problem happens because the flink-scala JAR file included in the image 
("flink-scala_2.12-1.18.1.jar") has been built with an old Scala compiler that 
has a [compatibility bug with Java 
17|https://github.com/scala/bug/issues/12419]. Rebuilding the flink-scala JAR 
file with the Scala compiler set to 2.12.15 or later fixes the bug. At my day 
job I cannot use Java 11 for a particular Flink job due to dependency on a Java 
library that uses [Java records|https://openjdk.org/jeps/395] (introduced in 
Java 16).

I have created a github repository with an example application and a longer 
description of the bug and how to fix it with a newer Scala compiler version: 
https://github.com/mattbernst/scala-java17-flink



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-25352) Update stability annotations to include the since and missedGraduations fields

2024-03-12 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-25352:
---

Assignee: Jing Ge

> Update stability annotations to include the since and missedGraduations fields
> --
>
> Key: FLINK-25352
> URL: https://issues.apache.org/jira/browse/FLINK-25352
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Jing Ge
>Priority: Major
> Fix For: 1.20.0
>
>
> In order to implement the graduation process outlined in FLIP-197, we need to 
> extend our stability annotations to include a {{since}} and 
> {{missedGraduations}} fields. 
> The idea of {{since}} is that it tells since when something has this 
> stability guarantee.
> The idea of the {{missedGraduations}} field is to record reasons why an API 
> has not been graduated.
> {code}
> @Target(ElementType.TYPE)
> public @interface PublicEvolving {
>  
>FlinkVersion since();
>  
>GraduationMiss[] missedGraduations();
> }
>  
> public @interface GraduationMiss {
>FlinkVersion graduation();
>  
>String reason();
> }
>  
> // Usage
> @PublicEvolving(
>since = FlinkVersion.V1_11_0,
>missedGraduations = {
>@GraduationMiss(graduation = FlinkVersion.V1_13_0, reason = 
> "foobar"),
>@GraduationMiss(graduation = FlinkVersion.V1_14_0, reason = 
> "barfoo")
>})
> public class Foobar {}
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34647][core] Optimize Path normalization [flink]

2024-03-12 Thread via GitHub


schlosna commented on PR #24473:
URL: https://github.com/apache/flink/pull/24473#issuecomment-1992844641

   Thanks for taking a look at this PR.
   
   > 1. What's your setup for the path ?
   
   We have checkpoints writing to a variety of file systems depending on the 
infrastructure, so it might be cloud blob storage (e.g. S3 or S3 like) or a 
local Linux/POSIX filesystem when running on bare metal or a persistent volume 
claim in kubernetes.
   
   > 2. Could you also share the JFR after your optimization ?
   
   I do not have a JFR for this running a modified Flink build that I can 
share, but I created a simple [JMH Benchmark to compare the old vs. new 
implementations](https://github.com/apache/flink/files/14580149/NormalizeBenchmark.java.txt)
 that shows a ~5x allocation reduction, as well as a ~4x speedup on Intel & 
~3.5x speedup on Apple M1 Pro.
   
   
   ```
   Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
   VM version: JDK 17.0.10, OpenJDK 64-Bit Server VM, 17.0.10+8-LTS
   Benchmark   Mode  Cnt Score
Error   Units
   NormalizeBenchmark.newNormalize avgt5   269.649 ± 
23.957   ns/op
   NormalizeBenchmark.newNormalize:gc.alloc.rate.norm  avgt5   316.800 ±  
0.001B/op
   NormalizeBenchmark.oldNormalize avgt5  1119.999 ± 
57.073   ns/op
   NormalizeBenchmark.oldNormalize:gc.alloc.rate.norm  avgt5  1603.200 ±  
0.001B/op
   ```
   
   ```
   2021 Apple MacBookPro M1 Pro
   VM version: JDK 17.0.10, OpenJDK 64-Bit Server VM, 17.0.10+7-LTS
   Benchmark   Mode  Cnt Score
Error   Units
   NormalizeBenchmark.newNormalize avgt5   167.362 ±  
1.396   ns/op
   NormalizeBenchmark.newNormalize:gc.alloc.rate.norm  avgt5   316.800 ±  
0.001B/op
   NormalizeBenchmark.oldNormalize avgt5   598.058 ±  
9.701   ns/op
   NormalizeBenchmark.oldNormalize:gc.alloc.rate.norm  avgt5  1579.200 ±  
0.001B/op
   ```
   
   Textual details from JFR for a test Flink pipeline where ~1% of all 
allocations were due to `java.util.regex.Pattern` from 
`org.apache.flink.core.fs.Path.normalizePath(String):243` via 
`org.apache.flink.core.fs.Path.initialize(String, String, String)` & 
`org.apache.flink.core.fs.Path.(String)` constructor:
   
   
   ```
   Class  Alloc Total  Total Allocation (%)
   -  ---  
   int[]  2.468 GiB4.43237405980632 %
   
   Stack Trace  

  Count  Percentage
   
-
  -  --
   java.util.regex.Pattern.compile()

  18 21.2 %
  java.util.regex.Pattern.(String, int)   

  18 21.2 %
  java.util.regex.Pattern.compile(String)   

  17 20 %
 java.lang.String.replaceAll(String, String)

  17 20 %
 org.apache.flink.core.fs.Path.normalizePath(String)

  10 11.8 %
org.apache.flink.core.fs.Path.initialize(String, String, String)

  10 11.8 %
org.apache.flink.core.fs.Path.(String)

  5  5.88 %
   org.apache.flink.core.fs.Path.(Path, String)   

  5  5.88 %
   
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckp

[jira] [Updated] (FLINK-34647) Path normalization is allocation intensive

2024-03-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34647:
---
Labels: pull-request-available  (was: )

> Path normalization is allocation intensive
> --
>
> Key: FLINK-34647
> URL: https://issues.apache.org/jira/browse/FLINK-34647
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Checkpointing
>Reporter: David Schlosnagle
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-03-11-20-38-12-573.png
>
>
> While investigating allocation stalls and GC pressure of a Flink streaming 
> pipeline, I noticed significant allocations in JFR from Flink path 
> normalization:
>  !image-2024-03-11-20-38-12-573.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row type [flink]

2024-03-12 Thread via GitHub


ukby1234 commented on PR #24029:
URL: https://github.com/apache/flink/pull/24029#issuecomment-1992944538

   @MartijnVisser can you help review this PR? and possibly 
https://github.com/apache/flink/pull/23881 as well? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add announcement blog post for Flink 1.19 [flink-web]

2024-03-12 Thread via GitHub


Myasuka commented on code in PR #721:
URL: https://github.com/apache/flink-web/pull/721#discussion_r1522360842


##
docs/content/posts/2024-03-xx-release-1.19.0.md:
##
@@ -0,0 +1,501 @@
+---
+authors:
+- LincolnLee:
+  name: "Lincoln Lee"
+  twitter: lincoln_86xy
+
+date: "2024-03-xxT22:00:00Z"
+subtitle: ""
+title: Announcing the Release of Apache Flink 1.19
+aliases:
+- /news/2024/03/xx/release-1.19.0.html
+---
+
+The Apache Flink PMC is pleased to announce the release of Apache Flink 
1.19.0. As usual, we are
+looking at a packed release with a wide variety of improvements and new 
features. Overall, 162
+people contributed to this release completing 33 FLIPs and 600+ issues. Thank 
you!
+
+Let's dive into the highlights.
+
+# Flink SQL Improvements
+
+## Custom Parallelism for Table/SQL Sources
+
+Now in Flink 1.19, you can set a custom parallelism for performance tuning via 
the `scan.parallelism`
+option. The first available connector is DataGen (Kafka connector is on the 
way). Here is an example
+using SQL Client:
+
+```sql
+-- set parallelism within the ddl
+CREATE TABLE Orders (
+order_number BIGINT,
+priceDECIMAL(32,2),
+buyerROW,
+order_time   TIMESTAMP(3)
+) WITH (
+'connector' = 'datagen',
+'scan.parallelism' = '4'
+);
+
+-- or set parallelism via dynamic table option
+SELECT * FROM Orders /*+ OPTIONS('scan.parallelism'='4') */;
+```
+
+**More Information**
+* 
[Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/#scan-table-source)
+* [FLIP-367: Support Setting Parallelism for Table/SQL 
Sources](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150)
+
+
+## Configurable SQL Gateway Java Options
+
+A new option `env.java.opts.sql-gateway` for specifying the Java options is 
introduced in Flink 1.19,
+so you can fine-tune the memory settings, garbage collection behavior, and 
other relevant Java
+parameters for SQL Gateway.
+
+**More Information**
+* [FLINK-33203](https://issues.apache.org/jira/browse/FLINK-33203)
+
+
+## Configure Different State TTLs Using SQL Hint
+
+Starting from Flink 1.18, Table API and SQL users can set state time-to-live 
(TTL) individually for
+stateful operators via the SQL compiled plan. In Flink 1.19, users have a more 
flexible way to
+specify custom TTL values for regular joins and group aggregations directly 
within their queries by [utilizing the STATE_TTL 
hint](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/hints/#state-ttl-hints).
+This improvement means that you no longer need to alter your compiled plan to 
set specific TTLs for
+these frequently used operators. With the introduction of `STATE_TTL` hints, 
you can streamline your workflow and
+dynamically adjust the TTL based on your operational requirements.
+
+Here is an example:
+```sql
+-- set state ttl for join
+SELECT /*+ STATE_TTL('Orders'= '1d', 'Customers' = '20d') */ *
+FROM Orders LEFT OUTER JOIN Customers
+ON Orders.o_custkey = Customers.c_custkey;
+
+-- set state ttl for aggregation
+SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM Orders AS o
+GROUP BY o_orderkey;
+```
+
+**More Information**
+* 
[Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/hints/#state-ttl-hints)
+* [FLIP-373: Support Configuring Different State TTLs using SQL 
Hint](https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint)
+
+
+## Named Parameters for Functions and Procedures
+
+Named parameters now can be used when calling a function or stored procedure. 
With named parameters,
+users do not need to strictly specify the parameter position, just specify the 
parameter name and its
+corresponding value. At the same time, if non-essential parameters are not 
specified, they will default to being filled with null.
+
+Here's an example of defining a function with one mandatory parameter and two 
optional parameters using named parameters:
+```java
+public static class NamedArgumentsTableFunction extends TableFunction {
+
+   @FunctionHint(
+   output = @DataTypeHint("STRING"),
+   arguments = {
+   @ArgumentHint(name = "in1", isOptional 
= false, type = @DataTypeHint("STRING")),
+   @ArgumentHint(name = "in2", isOptional 
= true, type = @DataTypeHint("STRING")),
+   @ArgumentHint(name = "in3", isOptional 
= true, type = @DataTypeHint("STRING"))})
+   public void eval(String arg1, String arg2, String arg3) {
+   collect(arg1 + ", " + arg2 + "," + arg3);
+   }
+
+}
+```
+When calling the function in SQL, parameters can be specified by name, for 
example:
+```sql
+SELECT * FROM TABLE(myFunction(in1 => 'v1', in3 => 'v3', in2 => 'v2'))

Review C

  1   2   >