[jira] [Commented] (FLINK-25699) Use HashMap for MAP value constructors

2022-01-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482313#comment-17482313
 ] 

Sergey Nuyanzin commented on FLINK-25699:
-

Thanks for your comments.
Regarding the comparison of {{actual vs expected}}: instead of a toString comparison, it is also possible to compare the result of {{MAP[]}} element access and the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that several tests need to be written for a single map, e.g.
{code:scala}
testAllApis(
  map(1, 2L , 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L , 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L , 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}

> Use HashMap for MAP value constructors
> --
>
> Key: FLINK-25699
> URL: https://issues.apache.org/jira/browse/FLINK-25699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> Currently, the usage of maps is inconsistent. It is not ensured that 
> duplicate keys get eliminated. For CAST and output conversion this is solved. 
> However, we should have a similar implementation in the MAP value constructor 
> as in CAST.
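
For illustration, below is a minimal sketch of the behaviour described above: duplicate keys collapse when the constructed map is backed by a {{java.util.HashMap}}, with later entries overwriting earlier ones. The class and method names are made up for this example and are not Flink's actual runtime classes.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Flink's actual MAP value constructor implementation.
public final class MapConstructorSketch {

    // Builds a map from an interleaved key/value array; duplicate keys are
    // eliminated because HashMap.put overwrites the previous value.
    public static Map<Object, Object> buildMap(Object... keyValuePairs) {
        if (keyValuePairs.length % 2 != 0) {
            throw new IllegalArgumentException("Expected an even number of arguments");
        }
        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < keyValuePairs.length; i += 2) {
            map.put(keyValuePairs[i], keyValuePairs[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        // Duplicate key 1: the resulting map has cardinality 1.
        System.out.println(buildMap(1, "a", 1, "b")); // prints {1=b}
    }
}
{code}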



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25699) Use HashMap for MAP value constructors

2022-01-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482313#comment-17482313
 ] 

Sergey Nuyanzin edited comment on FLINK-25699 at 1/26/22, 7:59 AM:
---

Thanks for your comments.
Regarding the comparison of {{actual vs expected}}: instead of a toString comparison, it is also possible to compare the result of {{MAP[]}} element access and the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that several tests need to be written for a single map, e.g.
{code:scala}
testAllApis(
  map(1, 2L , 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L , 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L , 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}


was (Author: sergey nuyanzin):
Thanks for your comments.
Regarding the comparison of {{actual vs expected}}: instead of a toString comparison, it is also possible to compare the result of {{MAP[]}} element access and the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that several tests need to be written for a single map, e.g.
{code:scala}
testAllApis(
  map(1, 2L , 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L , 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L , 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}

> Use HashMap for MAP value constructors
> --
>
> Key: FLINK-25699
> URL: https://issues.apache.org/jira/browse/FLINK-25699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> Currently, the usage of maps is inconsistent. It is not ensured that 
> duplicate keys get eliminated. For CAST and output conversion this is solved. 
> However, we should have a similar implementation in the MAP value constructor 
> as in CAST.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #18418: [FLINK-25719][python] Support General Python UDF in Thread Mode

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18418:
URL: https://github.com/apache/flink/pull/18418#issuecomment-1017307428


   
   ## CI report:
   
   * 4e95ae2d0e8a2e2a2e3d85640d5ab125e86ede6c Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29850)
 
   * 65bbedd87b4b4962bf1b11764ad284419cbf24ee Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30195)
 
   * 3f39b61a0b956fde5f61790a9b1dd352789d28a6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30205)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-25699) Use HashMap for MAP value constructors

2022-01-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482313#comment-17482313
 ] 

Sergey Nuyanzin edited comment on FLINK-25699 at 1/26/22, 8:01 AM:
---

Thanks for your comments.
Regarding the comparison of {{actual vs expected}}: instead of a toString comparison, it is also possible to compare the result of {{MAP[]}} element access and the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that several tests need to be written for a single map, e.g.
{code:scala}
testAllApis(
  map(1, 2L , 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L , 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L , 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}


was (Author: sergey nuyanzin):
Thanks for your comments.
Regarding the comparison of {{actual vs expected}}: instead of a toString comparison, it is also possible to compare the result of {{MAP[]}} element access and the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that several tests need to be written for a single map, e.g.
{code:scala}
testAllApis(
  map(1, 2L , 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L , 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L , 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}

> Use HashMap for MAP value constructors
> --
>
> Key: FLINK-25699
> URL: https://issues.apache.org/jira/browse/FLINK-25699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> Currently, the usage of maps is inconsistent. It is not ensured that 
> duplicate keys get eliminated. For CAST and output conversion this is solved. 
> However, we should have a similar implementation in the MAP value constructor 
> as in CAST.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

2022-01-26 Thread GitBox


slinkydeveloper commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r792385485



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
##
@@ -21,8 +21,7 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;

Review comment:
   yep it's definitely better. When removing, check if there is any 
exclusion in `PlannerModule` in planner-loader




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

2022-01-26 Thread GitBox


slinkydeveloper commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r792385485



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
##
@@ -21,8 +21,7 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;

Review comment:
   yep it's already a beginning. When removing, check if there is any 
exclusion in `PlannerModule` in planner-loader




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

2022-01-26 Thread GitBox


slinkydeveloper commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r792386400



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for 
JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {
+/** The unique identifier for each ExecNode in the JSON plan. */

Review comment:
   I'm personally not a fan of the javadoc on the private fields, as when 
using the IDE, I won't see anything unless I navigate to the class source code. 
But as you prefer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on pull request #18434: [FLINK-25742][akka] Remove the serialization of rpc invocation at Fli…

2022-01-26 Thread GitBox


KarmaGYZ commented on pull request #18434:
URL: https://github.com/apache/flink/pull/18434#issuecomment-1021955008


   @dawidwys I'll fill that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dawidwys commented on pull request #18434: [FLINK-25742][akka] Remove the serialization of rpc invocation at Fli…

2022-01-26 Thread GitBox


dawidwys commented on pull request #18434:
URL: https://github.com/apache/flink/pull/18434#issuecomment-1021956202


   Thanks @KarmaGYZ ! I know from personal experience it might be daunting at 
times :sweat_smile: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-benchmarks] wsry opened a new pull request #44: [FLINK-25704] Fix the blocking partition benchmark regression caused by FLINK-25637

2022-01-26 Thread GitBox


wsry opened a new pull request #44:
URL: https://github.com/apache/flink-benchmarks/pull/44


   FLINK-25637 changed the default blocking shuffle implementation from 
hash-shuffle to sort-shuffle, which caused some benchmark regressions. This PR 
tries to fix the regression. For more information about the regression, please 
refer to https://issues.apache.org/jira/browse/FLINK-25704.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-25704) Performance regression on 18.01.2022 in batch network benchmarks

2022-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-25704:
---
Labels: pull-request-available  (was: )

> Performance regression on 18.01.2022 in batch network benchmarks
> 
>
> Key: FLINK-25704
> URL: https://issues.apache.org/jira/browse/FLINK-25704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Piotr Nowojski
>Assignee: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=compressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedMmapPartition&env=2&revs=200&equid=off&quarts=on&extr=on
> Suspected range:
> {code}
> git ls eeec246677..f5c99c6f26
> f5c99c6f26 [5 weeks ago] [FLINK-17321][table] Add support casting of map to 
> map and multiset to multiset [Sergey Nuyanzin]
> 745cfec705 [24 hours ago] [hotfix][table-common] Fix InternalDataUtils for 
> MapData tests [Timo Walther]
> ed699b6ee6 [6 days ago] [FLINK-25637][network] Make sort-shuffle the default 
> shuffle implementation for batch jobs [kevin.cyj]
> 4275525fed [6 days ago] [FLINK-25638][network] Increase the default write 
> buffer size of sort-shuffle to 16M [kevin.cyj]
> e1878fb899 [6 days ago] [FLINK-25639][network] Increase the default read 
> buffer size of sort-shuffle to 64M [kevin.cyj]
> {code}
> It looks, [~kevin.cyj], like your change has most likely caused this?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] XComp commented on a change in pull request #18189: [FLINK-25430] Replace RunningJobRegistry by JobResultStore

2022-01-26 Thread GitBox


XComp commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r792392290



##
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##
@@ -147,6 +157,67 @@ public void 
testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
 }
 }
 
+@Test
+public void testDirtyJobResultRecoveryInApplicationMode() throws Exception 
{
+final Deadline deadline = Deadline.fromNow(TIMEOUT);
+final Configuration configuration = new Configuration();
+configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, 
Duration.ofMillis(100));
+final TestingMiniClusterConfiguration clusterConfiguration =
+TestingMiniClusterConfiguration.newBuilder()
+.setConfiguration(configuration)
+.build();
+
+// having a dirty entry in the JobResultStore should make the 
ApplicationDispatcherBootstrap
+// implementation fail to submit the job
+final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+jobResultStore.createDirtyResult(
+new JobResultEntry(
+new JobResult.Builder()
+
.jobId(ApplicationDispatcherBootstrap.ZERO_JOB_ID)
+.applicationStatus(ApplicationStatus.SUCCEEDED)
+.netRuntime(1)
+.build()));
+final EmbeddedHaServicesWithLeadershipControl haServices =
+new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
+
+@Override
+public JobResultStore getJobResultStore() {
+return jobResultStore;
+}
+};
+
+final TestingMiniCluster.Builder clusterBuilder =
+TestingMiniCluster.newBuilder(clusterConfiguration)
+.setHighAvailabilityServicesSupplier(() -> haServices)
+.setDispatcherResourceManagerComponentFactorySupplier(
+
createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+
clusterConfiguration.getConfiguration(),
+
ErrorHandlingSubmissionJob.createPackagedProgram()));
+try (final MiniCluster cluster = clusterBuilder.build()) {
+// start mini cluster and submit the job
+cluster.start();
+
+// the cluster should shut down automatically once the application 
completes
+awaitClusterStopped(cluster, deadline);
+}
+
+
FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
+.as(
+"The job's main method shouldn't have been succeeded 
due to a DuplicateJobSubmissionException.")
+
.hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+
+assertThat(
+jobResultStore.hasDirtyJobResultEntry(
+ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+.isTrue();
+assertThat(
+jobResultStore.hasCleanJobResultEntry(
+ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+.isFalse();

Review comment:
   This test is here to check that no job is re-submitted when a dirty job 
result exists in the `JobResultStore`. The cleanup is integrated in 
[FLINK-25432](https://issues.apache.org/jira/browse/FLINK-25432); that's where 
we would have to adjust the test accordingly.
   
   We won't miss the change in `FLINK-25432`, because once the cleanup is 
integrated the test would fail due to the asserts on `hasDirtyJobResultEntry` and 
`hasCleanJobResultEntry`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] slinkydeveloper commented on pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

2022-01-26 Thread GitBox


slinkydeveloper commented on pull request #18479:
URL: https://github.com/apache/flink/pull/18479#issuecomment-1021961660


   > Because, with the upgrade story, we can have an @ExecNodeMetadata 
annotation with the same name, on a subclass of a current ExecNode class, which 
does something new/different and defines a newer version. So we need the 
combination of name + version to uniquely identify the class when we lookup and 
rebuild the Java object graph from the JSON plan.
   
   I may have explained myself badly, but I'm not questioning the name + 
version tuple. That's OK, and I get why we need it.
   
   What I'm questioning is that you put `id + name + version` in the same field, 
and this seems wrong to me, because `id` is the "instance identifier", like a 
pointer to a specific node of the graph, while `name + version` is the "type 
identifier", which tells you which `ExecNode` class and version the node is. 
I don't think these two concepts (instance id and type id) should be in the 
same field, as they are logically very different things, and also because 
future tooling will have a hard time parsing this JSON.
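
For illustration only, a small sketch of the `(name, version)` keyed lookup described in the quoted explanation; the class and method names below are assumptions, not the API introduced by this PR:

```java
// A sketch (assumed names, not the PR's API) of resolving an ExecNode class
// from the (name, version) pair that uniquely identifies its type.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ExecNodeLookupSketch {

    // The "type identifier": which ExecNode class/version a plan node refers to.
    record TypeId(String name, int version) {}

    private final Map<TypeId, Class<?>> registry = new HashMap<>();

    void register(String name, int version, Class<?> execNodeClass) {
        registry.put(new TypeId(name, version), execNodeClass);
    }

    Class<?> resolve(String name, int version) {
        return Objects.requireNonNull(
                registry.get(new TypeId(name, version)),
                () -> "No ExecNode registered for " + name + " v" + version);
    }
}
```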


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] infoverload commented on a change in pull request #18431: [FLINK-25024][docs] Add Changelog backend docs

2022-01-26 Thread GitBox


infoverload commented on a change in pull request #18431:
URL: https://github.com/apache/flink/pull/18431#discussion_r792382240



##
File path: docs/content/docs/ops/metrics.md
##
@@ -1203,6 +1203,59 @@ Note that for failed checkpoints, metrics are updated on 
a best efforts basis an
 ### RocksDB
 Certain RocksDB native metrics are available but disabled by default, you can 
find full documentation [here]({{< ref "docs/deployment/config" 
>}}#rocksdb-native-metrics)
 
+### State changelog

Review comment:
   ```suggestion
   ### State Changelog
   ```

##
File path: docs/content/docs/ops/state/state_backends.md
##
@@ -325,6 +325,129 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The last one (upload time) can be decreased by [Incremental checkpoints]({{< 
ref "#incremental-checkpoints" >}}).
+However, most Incremental State Backends perform some form of compaction 
periodically, which results in re-uploading the
+old state in addition to the new changes. In large deployments, the 
probability of at least one task uploading lots of
+data tend to be very high in every checkpoint.

Review comment:
   ```suggestion
   Upload time can be decreased by [incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}).
   However, most incremental state backends perform some form of compaction 
periodically, which results in re-uploading the
   old state in addition to the new changes. In large deployments, the 
probability of at least one task uploading lots of
   data tends to be very high in every checkpoint.
   ```

##
File path: docs/content/docs/ops/state/state_backends.md
##
@@ -325,6 +325,129 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The last one (upload time) can be decreased by [Incremental checkpoints]({{< 
ref "#incremental-checkpoints" >}}).
+However, most Incremental State Backends perform some form of compaction 
periodically, which results in re-uploading the
+old state in addition to the new changes. In large deployments, the 
probability of at least one task uploading lots of
+data tend to be very high in every checkpoint.
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is snapshotted in the
+background periodically. Upon successful upload, changelog is truncated.
+
+As a result, asynchronous phase duration is reduced, as well as synchronous 
phase - because no data needs to be flushed
+to disk. In particular, long-tail latency is improved.
+
+On the flip side, resource usage is higher:
+
+- more files are created on DFS
+- more IO bandwidth is used to upload state changes
+- more CPU used to serialize state changes
+- more memory used by Task Managers to buffer state changes
+- todo: more details after test

[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

2022-01-26 Thread GitBox


JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792393666



##
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+private final RowType keyType;
+private final RowType rowType;
+private final Comparator keyComparator;
+private final Accumulator accumulator;
+private final FileFormat fileFormat;
+private final FileStorePathFactory pathFactory;
+private final MergeTreeOptions mergeTreeOptions;
+
+private final FileStoreScan scan;
+
+public FileStoreWriteImpl(
+RowType partitionType,
+RowType keyType,
+RowType rowType,
+Comparator keyComparator,
+Accumulator accumulator,
+FileFormat fileFormat,
+FileStorePathFactory pathFactory,
+MergeTreeOptions mergeTreeOptions) {
+this.keyType = keyType;
+this.rowType = rowType;
+this.keyComparator = keyComparator;
+this.accumulator = accumulator;
+this.fileFormat = fileFormat;
+this.pathFactory = pathFactory;
+this.mergeTreeOptions = mergeTreeOptions;
+
+this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, 
fileFormat, pathFactory);
+}
+
+@Override
+public RecordWriter createWriter(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+Long latestSnapshotId = pathFactory.latestSnapshotId();
+if (latestSnapshotId == null) {
+return createEmptyWriter(partition, bucket, compactExecutor);
+} else {
+MergeTree mergeTree = createMergeTree(partition, bucket, 
compactExecutor);
+return mergeTree.createWriter(
+scan.withSnapshot(latestSnapshotId)
+
.withPartitionFilter(Collections.singletonList(partition))
+.withBucket(bucket).plan().files().stream()
+.map(ManifestEntry::file)
+.collect(Collectors.toList()));
+}
+}
+
+@Override
+public RecordWriter createEmptyWriter(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+MergeTree mergeTree = createMergeTree(partition, bucket, 
compactExecutor);
+return mergeTree.createWriter(Collections.emptyList());
+}
+
+private MergeTree createMergeTree(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+SstFile sstFile =
+new SstFile(
+keyType,
+rowType,
+fileFormat,
+pathFactory.createSstPathFactory(partition, bucket),

Review comment:
   Can uuid be reused in `SstPathFactory` from `FileStorePathFactory`?

##
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software

[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

2022-01-26 Thread GitBox


JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792394148



##
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+private final RowType keyType;
+private final RowType rowType;
+private final Comparator keyComparator;
+private final Accumulator accumulator;
+private final FileFormat fileFormat;
+private final FileStorePathFactory pathFactory;
+private final MergeTreeOptions mergeTreeOptions;
+
+private final FileStoreScan scan;
+
+public FileStoreWriteImpl(
+RowType partitionType,
+RowType keyType,
+RowType rowType,
+Comparator keyComparator,
+Accumulator accumulator,
+FileFormat fileFormat,
+FileStorePathFactory pathFactory,
+MergeTreeOptions mergeTreeOptions) {
+this.keyType = keyType;
+this.rowType = rowType;
+this.keyComparator = keyComparator;
+this.accumulator = accumulator;
+this.fileFormat = fileFormat;
+this.pathFactory = pathFactory;
+this.mergeTreeOptions = mergeTreeOptions;
+
+this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, 
fileFormat, pathFactory);
+}
+
+@Override
+public RecordWriter createWriter(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+Long latestSnapshotId = pathFactory.latestSnapshotId();
+if (latestSnapshotId == null) {
+return createEmptyWriter(partition, bucket, compactExecutor);
+} else {
+MergeTree mergeTree = createMergeTree(partition, bucket, 
compactExecutor);
+return mergeTree.createWriter(
+scan.withSnapshot(latestSnapshotId)
+
.withPartitionFilter(Collections.singletonList(partition))
+.withBucket(bucket).plan().files().stream()
+.map(ManifestEntry::file)
+.collect(Collectors.toList()));
+}
+}
+
+@Override
+public RecordWriter createEmptyWriter(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+MergeTree mergeTree = createMergeTree(partition, bucket, 
compactExecutor);
+return mergeTree.createWriter(Collections.emptyList());
+}
+
+private MergeTree createMergeTree(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+SstFile sstFile =
+new SstFile(

Review comment:
   Can `readerFactory` and `writerFactory` be reused in `SstFile`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18500: [FLINK-25312][hive] HiveCatalog supports Flink's managed table

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18500:
URL: https://github.com/apache/flink/pull/18500#issuecomment-1021030168


   
   ## CI report:
   
   * d7dc483a078a947fe4d397c17e3ec3fa9ec8db72 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30191)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18505:
URL: https://github.com/apache/flink/pull/18505#issuecomment-1021169033


   
   ## CI report:
   
   * 7937706340bfa394537cc62380693fa12abe3d38 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30149)
 
   * 62ce77fe716ae2cfa26fb5cc04948683acc33e40 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30196)
 
   * 7ab24a6fde4e9c0f15e1bddbb7134dbd656d0c40 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30198)
 
   * 94e63928b2c567014e7f0861c6069186914950e4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

2022-01-26 Thread GitBox


JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792405101



##
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+private final RowType keyType;
+private final RowType rowType;
+private final Comparator keyComparator;
+private final Accumulator accumulator;
+private final FileFormat fileFormat;
+private final FileStorePathFactory pathFactory;
+private final MergeTreeOptions mergeTreeOptions;
+
+private final FileStoreScan scan;
+
+public FileStoreWriteImpl(
+RowType partitionType,
+RowType keyType,
+RowType rowType,
+Comparator keyComparator,
+Accumulator accumulator,
+FileFormat fileFormat,
+FileStorePathFactory pathFactory,
+MergeTreeOptions mergeTreeOptions) {
+this.keyType = keyType;
+this.rowType = rowType;
+this.keyComparator = keyComparator;
+this.accumulator = accumulator;
+this.fileFormat = fileFormat;
+this.pathFactory = pathFactory;
+this.mergeTreeOptions = mergeTreeOptions;
+
+this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, 
fileFormat, pathFactory);
+}
+
+@Override
+public RecordWriter createWriter(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+Long latestSnapshotId = pathFactory.latestSnapshotId();
+if (latestSnapshotId == null) {
+return createEmptyWriter(partition, bucket, compactExecutor);
+} else {
+MergeTree mergeTree = createMergeTree(partition, bucket, 
compactExecutor);
+return mergeTree.createWriter(
+scan.withSnapshot(latestSnapshotId)
+
.withPartitionFilter(Collections.singletonList(partition))
+.withBucket(bucket).plan().files().stream()
+.map(ManifestEntry::file)
+.collect(Collectors.toList()));
+}
+}
+
+@Override
+public RecordWriter createEmptyWriter(
+BinaryRowData partition, int bucket, ExecutorService 
compactExecutor) {
+MergeTree mergeTree = createMergeTree(partition, bucket, 
compactExecutor);
+return mergeTree.createWriter(Collections.emptyList());
+}
+
+private MergeTree createMergeTree(

Review comment:
   We can create a `MergeTreeFactory`, it is required by reader too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18505:
URL: https://github.com/apache/flink/pull/18505#issuecomment-1021169033


   
   ## CI report:
   
   * 7937706340bfa394537cc62380693fa12abe3d38 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30149)
 
   * 62ce77fe716ae2cfa26fb5cc04948683acc33e40 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30196)
 
   * 7ab24a6fde4e9c0f15e1bddbb7134dbd656d0c40 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30198)
 
   * 94e63928b2c567014e7f0861c6069186914950e4 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30206)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Myasuka commented on a change in pull request #18391: [FLINK-25478][chaneglog] Correct the state register logic of ChangelogStateBackendHandle

2022-01-26 Thread GitBox


Myasuka commented on a change in pull request #18391:
URL: https://github.com/apache/flink/pull/18391#discussion_r792406221



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##
@@ -130,14 +155,51 @@ public String toString() {
 keyGroupRange, materialized.size(), 
nonMaterialized.size());
 }
 
-private static Closeable asCloseable(KeyedStateHandle h) {
-return () -> {
-try {
-h.discardState();
-} catch (Exception e) {
-ExceptionUtils.rethrowIOException(e);
+private static class StreamStateHandleWrapper implements 
StreamStateHandle {
+private static final long serialVersionUID = 1L;
+
+private final KeyedStateHandle keyedStateHandle;
+
+StreamStateHandleWrapper(KeyedStateHandle keyedStateHandle) {
+this.keyedStateHandle = keyedStateHandle;
+}
+
+@Override
+public void discardState() throws Exception {
+keyedStateHandle.discardState();
+}
+
+@Override
+public long getStateSize() {
+return keyedStateHandle.getStateSize();
+}
+
+@Override
+public FSDataInputStream openInputStream() throws IOException {
+throw new UnsupportedOperationException("Should not call 
here.");
+}
+
+@Override
+public Optional asBytesIfInMemory() {
+throw new UnsupportedOperationException("Should not call 
here.");
+}

Review comment:
   The returned `StreamStateHandle` would only be used in 
`IncrementalRemoteKeyedStateHandle`, which is safe here to just add such 
wrapper.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] matriv commented on pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

2022-01-26 Thread GitBox


matriv commented on pull request #18479:
URL: https://github.com/apache/flink/pull/18479#issuecomment-1021979678


   > > Because, with the upgrade story, we can have an @ExecNodeMetadata 
annotation with the same name, on a subclass of a current ExecNode class, which 
does something new/different and defines a newer version. So we need the 
combination of name + version to uniquely identify the class when we lookup and 
rebuild the Java object graph from the JSON plan.
   > 
   > I may have badly explained myself, but i'm not questioning the name + 
version tuple. That's ok and I get why we need it.
   > 
   > What I'm questioning is that in the same field you add `id + name + 
version`, and this seems wrong to me, because `id` is the "instance 
identifier", like a pointer to a specific node of the graph, while `name + 
version` is the "type identifier", which tells you which `ExecNode` class and 
version the node is. I don't think these two concepts (instance id and type id) 
should be in the same field, as they are logically very different things, and 
also because future tooling will have hard time parsing this JSON.
   
   Thx for explaining! I see your point. The decision leaned towards JSON and 
code simplification, so that this int id is part of the one-liner `context` 
field and everything regarding the complete identification of a node is handled 
in one place. So we read the `context` as one POJO from JSON, and 
we construct a new `context` from the annotation plus a freshly incremented id 
for new nodes. Imho, I don't see a big issue with this approach, since we have 
javadocs describing the usage of those sub-fields of the context, and I don't 
see any problem for tooling, since it's a well-defined string with 3 
components, and a simple `split("_")` gives those independently.
   
   @twalthr WDYT?
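
For illustration, a tiny sketch of the `split("_")` parsing described above, assuming a hypothetical context value of the form id + "_" + name + "_" + version; the actual field layout is defined by the PR, not by this example:

```java
// Parses a hypothetical "id_name_version" context string; the example value
// below is made up and only illustrates the split("_") idea from the comment.
public final class ExecNodeContextParseSketch {
    public static void main(String[] args) {
        String context = "12_stream-exec-calc_1"; // assumed example value
        String[] parts = context.split("_");
        int id = Integer.parseInt(parts[0]);      // instance identifier
        String name = parts[1];                   // type identifier: ExecNode name
        int version = Integer.parseInt(parts[2]); // type identifier: version
        System.out.printf("id=%d name=%s version=%d%n", id, name, version);
    }
}
```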


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18353: [FLINK-25129][docs]project configuation changes in docs

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18353:
URL: https://github.com/apache/flink/pull/18353#issuecomment-1012981377


   
   ## CI report:
   
   * 80fd50ad46865be06e2c83b2470fb3eb2d35cd96 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29823)
 
   * d0b2b188c37443b7bbda39af499398326cd56979 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18428: [FLINK-25575] Add Sink V2 operators and translation

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18428:
URL: https://github.com/apache/flink/pull/18428#issuecomment-1017677769


   
   ## CI report:
   
   * 3705b617ac596bd4be08fd9ef2e4db40bef586f2 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30154)
 
   * b3cd4cde01b0207349df086beb909b000b5e0bdd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on pull request #17819: [FLINK-15816][k8s] Prevent labels using kubernetes.cluster-id to exceed the limit of 63 characters

2022-01-26 Thread GitBox


XComp commented on pull request #17819:
URL: https://github.com/apache/flink/pull/17819#issuecomment-1021982073


   Ok, my understanding was that we're using service names in labels, which 
forces us to set some limitation on the cluster ID. Reiterating over the 
comments of FLINK-15816, I conclude that it's really only about adding the 
check for the `clusterIds` maximum length also for standalone mode. 
Essentially, the only change that's needed is to add a check similar to 
[AbstractKubernetesParameters:73](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java#L73) in 
[KubernetesHaServices:99](https://github.com/apache/flink/blob/a115a3c52f0e305c139666b35ec952365049738d/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java#L99).
   
   I'm just wondering why we should limit ourselves to 45 characters. Don't 
we have to restart the Flink cluster anyway in case of a version update? That 
would give the user the possibility to update the cluster ID if it becomes 
necessary.
   
   In any case, we should add some in-code documentation about this to the 
[MAXIMUM_CHARACTERS_OF_CLUSTER_ID declaration in 
Constants:86](https://github.com/apache/flink/blob/d532f5fc81f9acf611541cab440eb3d5a9de08cc/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java#L86) 
to explain the reason for this constant being set that way (i.e. that it's 
left at `45` to be prepared for the future).
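
For illustration, a minimal sketch of the kind of length check discussed above; the names and error message are assumptions for this example, while the 45-character limit comes from the discussion:

```java
// Sketch of a cluster-id length check similar to the one referenced above;
// not the actual code in the Flink Kubernetes module.
public final class ClusterIdLengthCheckSketch {

    private static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;

    public static String checkClusterId(String clusterId) {
        if (clusterId.length() > MAXIMUM_CHARACTERS_OF_CLUSTER_ID) {
            throw new IllegalArgumentException(
                    "The cluster id \"" + clusterId + "\" exceeds the maximum length of "
                            + MAXIMUM_CHARACTERS_OF_CLUSTER_ID + " characters.");
        }
        return clusterId;
    }

    public static void main(String[] args) {
        System.out.println(checkClusterId("my-flink-cluster")); // passes the check
    }
}
```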


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp edited a comment on pull request #17819: [FLINK-15816][k8s] Prevent labels using kubernetes.cluster-id to exceed the limit of 63 characters

2022-01-26 Thread GitBox


XComp edited a comment on pull request #17819:
URL: https://github.com/apache/flink/pull/17819#issuecomment-1021982073


   Ok, my understanding was that we're using service names in labels, which 
forces us to set some limitation on the cluster ID. Reiterating over the 
comments of FLINK-15816, I conclude that it's really only about adding the 
check for the `clusterIds` maximum length also for standalone mode. 
Essentially, the only change that's needed is to add a check similar to 
[AbstractKubernetesParameters:73](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java#L73) in 
[KubernetesHaServices:99](https://github.com/apache/flink/blob/a115a3c52f0e305c139666b35ec952365049738d/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java#L99).
   
   I'm just wondering why we should limit ourselves to 45 characters. Don't 
we have to restart the Flink cluster anyway in case of a version update? That 
would give the user the possibility to update the cluster ID if it becomes 
necessary.
   
   In any case, we should add some in-code documentation about this to the 
[MAXIMUM_CHARACTERS_OF_CLUSTER_ID declaration in 
Constants:86](https://github.com/apache/flink/blob/d532f5fc81f9acf611541cab440eb3d5a9de08cc/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java#L86) 
to explain the reason for this constant being set that way (i.e. that it's 
left at `45` to be prepared for the future).
   
   Sorry for guiding you in the wrong direction, @alpreu .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

2022-01-26 Thread GitBox


pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r792408057



##
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
   That is part of the reason why we, generally speaking, do not accept new 
test code using Mockito: tests shouldn't rely on private implementation 
details of the production code.

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
 || inputSelectionHandler.areAllInputsFinished()) {
 return AVAILABLE;
 }
-final CompletableFuture anyInputAvailable = new 
CompletableFuture<>();
 for (int i = 0; i < inputProcessors.length; i++) {
 if (!inputSelectionHandler.isInputFinished(i)
-&& inputSelectionHandler.isInputSelected(i)) {
-assertNoException(
-inputProcessors[i]
-.getAvailableFuture()
-.thenRun(() -> 
anyInputAvailable.complete(null)));
+&& inputSelectionHandler.isInputSelected(i)
+&& inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+return AVAILABLE;

Review comment:
   But
   1. In what scenario does this extra shortcut do something more compared to 
the old check?
   2. If there is such a scenario, have you tested that it's worth the added 
complexity?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18353: [FLINK-25129][docs]project configuation changes in docs

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18353:
URL: https://github.com/apache/flink/pull/18353#issuecomment-1012981377


   
   ## CI report:
   
   * 80fd50ad46865be06e2c83b2470fb3eb2d35cd96 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29823)
 
   * d0b2b188c37443b7bbda39af499398326cd56979 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30207)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18428: [FLINK-25575] Add Sink V2 operators and translation

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18428:
URL: https://github.com/apache/flink/pull/18428#issuecomment-1017677769


   
   ## CI report:
   
   * 3705b617ac596bd4be08fd9ef2e4db40bef586f2 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30154)
 
   * b3cd4cde01b0207349df086beb909b000b5e0bdd Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30208)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on a change in pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performanc

2022-01-26 Thread GitBox


pnowojski commented on a change in pull request #18505:
URL: https://github.com/apache/flink/pull/18505#discussion_r792418842



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
##
@@ -41,7 +41,7 @@
  * ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards 
to fully release all
  * resources associated the partition and propagate failure cause to the 
consumer if possible.
  */
-public interface ResultPartitionWriter extends AutoCloseable, 
AvailabilityProvider {
+public interface ResultPartitionWriter extends AvailabilityProvider {

Review comment:
   (One small comment as I'm not able to do a full review): Why did you 
remove `AutoCloseable`?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
##
@@ -41,7 +41,7 @@
  * ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards 
to fully release all
  * resources associated the partition and propagate failure cause to the 
consumer if possible.
  */
-public interface ResultPartitionWriter extends AutoCloseable, 
AvailabilityProvider {
+public interface ResultPartitionWriter extends AvailabilityProvider {

Review comment:
   (One small comment as I'm not able to do a full review): Can you explain 
why you removed `AutoCloseable`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18145: [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient when getting offsets

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18145:
URL: https://github.com/apache/flink/pull/18145#issuecomment-997131565


   
   ## CI report:
   
   * 83ad01c2346a5e46106e0aef530285aed68d966f Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30190)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Thesharing commented on a change in pull request #18480: [FLINK-25789][docs-zh] Translate the formats/hadoop page into Chinese.

2022-01-26 Thread GitBox


Thesharing commented on a change in pull request #18480:
URL: https://github.com/apache/flink/pull/18480#discussion_r792409930



##
File path: docs/content.zh/docs/connectors/datastream/formats/hadoop.md
##
@@ -57,20 +55,13 @@ a `hadoop-client` dependency such as:
 
 ## Using Hadoop InputFormats
 
-To use Hadoop `InputFormats` with Flink the format must first be wrapped
-using either `readHadoopFile` or `createHadoopInput` of the
-`HadoopInputs` utility class.
-The former is used for input formats derived
-from `FileInputFormat` while the latter has to be used for general purpose
-input formats.
-The resulting `InputFormat` can be used to create a data source by using
-`ExecutionEnvironmen#createInput`.
+要将 Hadoop `InputFormats` 与 Flink 一起使用,必须首先使用 `HadoopInputs` 工具类的 
`readHadoopFile` 或 `createHadoopInput` 包装格式。

Review comment:
   ```suggestion
   要在 Flink 中使用 Hadoop `InputFormats`,必须首先使用 `HadoopInputs` 工具类的 
`readHadoopFile` 或 `createHadoopInput` 包装 Input Format。
   ```

##
File path: docs/content.zh/docs/connectors/datastream/formats/hadoop.md
##
@@ -57,20 +55,13 @@ a `hadoop-client` dependency such as:
 
 ## Using Hadoop InputFormats
 
-To use Hadoop `InputFormats` with Flink the format must first be wrapped
-using either `readHadoopFile` or `createHadoopInput` of the
-`HadoopInputs` utility class.
-The former is used for input formats derived
-from `FileInputFormat` while the latter has to be used for general purpose
-input formats.
-The resulting `InputFormat` can be used to create a data source by using
-`ExecutionEnvironmen#createInput`.
+要将 Hadoop `InputFormats` 与 Flink 一起使用,必须首先使用 `HadoopInputs` 工具类的 
`readHadoopFile` 或 `createHadoopInput` 包装格式。
+前者用于从 `FileInputFormat` 派生的输入格式,而后者必须用于通用输入格式。

Review comment:
   ```suggestion
   前者用于从 `FileInputFormat` 派生的 Input Format,而后者必须用于通用的 Input Format。
   ```
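   
   As a usage sketch of the API described in the quoted English text (the 
input path and the key/value types are placeholders, not part of this PR):
   
   ```java
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.hadoopcompatibility.HadoopInputs;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   import org.apache.hadoop.io.LongWritable;
   import org.apache.hadoop.io.Text;
   import org.apache.hadoop.mapred.TextInputFormat;

   public class HadoopInputFormatExample {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           // Wrap a Hadoop FileInputFormat via HadoopInputs#readHadoopFile and
           // turn it into a data source with createInput.
           DataStream<Tuple2<LongWritable, Text>> input =
                   env.createInput(
                           HadoopInputs.readHadoopFile(
                                   new TextInputFormat(),
                                   LongWritable.class,
                                   Text.class,
                                   "hdfs:///path/to/input"));
           input.print();
           env.execute("Hadoop InputFormat example");
       }
   }
   ```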




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] fapaul merged pull request #18145: [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient when getting offsets

2022-01-26 Thread GitBox


fapaul merged pull request #18145:
URL: https://github.com/apache/flink/pull/18145


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer

2022-01-26 Thread Fabian Paul (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482329#comment-17482329
 ] 

Fabian Paul commented on FLINK-25368:
-

Merged in master: c6f14ca5b10d30232966bce2f52e9f9128346473

> Use AdminClient to get offsets rather than KafkaConsumer
> 
>
> Key: FLINK-25368
> URL: https://issues.apache.org/jira/browse/FLINK-25368
> Project: Flink
>  Issue Type: Improvement
>Reporter: dengziming
>Priority: Minor
>  Labels: pull-request-available
>
> `AdminClient.listOffsets` is provided in Kafka 2.7. In the future, more 
> `OffsetSpec` types will be added to it; for example, 
> `OffsetSpec.MaxTimestampSpec` was added in Kafka 3.0. So it's better to 
> substitute `KafkaConsumer` with `AdminClient`.
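
As a rough illustration (not the actual Flink change; the topic, partition, and 
bootstrap servers are placeholders), fetching an end offset via 
{{AdminClient.listOffsets}} could look like this:
{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class ListOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // OffsetSpec.latest() asks for the end offset; earliest()/forTimestamp() work analogously.
            long endOffset =
                    admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                            .partitionResult(tp)
                            .get()
                            .offset();
            System.out.println("End offset of " + tp + ": " + endOffset);
        }
    }
}
{code}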



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25704) Performance regression on 18.01.2022 in batch network benchmarks

2022-01-26 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482328#comment-17482328
 ] 

Piotr Nowojski commented on FLINK-25704:


Thanks for the investigation [~kevin.cyj]
{quote}
Make the existing benchmark tests still test BoundedBlockingResultPartition;
Add new benchmark tests for SortMergeResultPartition;
Try to optimize SortMergeResultPartition for small records;
Document this default blocking shuffle change in both release notes and user 
doc.
{quote}
This plan makes sense to me, +1

> Performance regression on 18.01.2022 in batch network benchmarks
> 
>
> Key: FLINK-25704
> URL: https://issues.apache.org/jira/browse/FLINK-25704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Piotr Nowojski
>Assignee: Yingjie Cao
>Priority: Critical
>  Labels: pull-request-available
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=compressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedMmapPartition&env=2&revs=200&equid=off&quarts=on&extr=on
> Suspected range:
> {code}
> git ls eeec246677..f5c99c6f26
> f5c99c6f26 [5 weeks ago] [FLINK-17321][table] Add support casting of map to 
> map and multiset to multiset [Sergey Nuyanzin]
> 745cfec705 [24 hours ago] [hotfix][table-common] Fix InternalDataUtils for 
> MapData tests [Timo Walther]
> ed699b6ee6 [6 days ago] [FLINK-25637][network] Make sort-shuffle the default 
> shuffle implementation for batch jobs [kevin.cyj]
> 4275525fed [6 days ago] [FLINK-25638][network] Increase the default write 
> buffer size of sort-shuffle to 16M [kevin.cyj]
> e1878fb899 [6 days ago] [FLINK-25639][network] Increase the default read 
> buffer size of sort-shuffle to 64M [kevin.cyj]
> {code}
> It looks, [~kevin.cyj], like your change has most likely caused that?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer

2022-01-26 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul reassigned FLINK-25368:
---

Assignee: dengziming

> Use AdminClient to get offsets rather than KafkaConsumer
> 
>
> Key: FLINK-25368
> URL: https://issues.apache.org/jira/browse/FLINK-25368
> Project: Flink
>  Issue Type: Improvement
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>  Labels: pull-request-available
>
> `AdminClient.listOffsets` is provided in Kafka 2.7. In the future, more 
> `OffsetSpec` types will be added to it; for example, 
> `OffsetSpec.MaxTimestampSpec` was added in Kafka 3.0. So it's better to 
> substitute `KafkaConsumer` with `AdminClient`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer

2022-01-26 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul updated FLINK-25368:

Affects Version/s: 1.15.0

> Use AdminClient to get offsets rather than KafkaConsumer
> 
>
> Key: FLINK-25368
> URL: https://issues.apache.org/jira/browse/FLINK-25368
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.15.0
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>  Labels: pull-request-available
>
> `AdminClient.listOffsets` is provided in Kafka 2.7. In the future, more 
> `OffsetSpec` types will be added to it; for example, 
> `OffsetSpec.MaxTimestampSpec` was added in Kafka 3.0. So it's better to 
> substitute `KafkaConsumer` with `AdminClient`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer

2022-01-26 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul resolved FLINK-25368.
-
Fix Version/s: 1.15.0
   Resolution: Fixed

> Use AdminClient to get offsets rather than KafkaConsumer
> 
>
> Key: FLINK-25368
> URL: https://issues.apache.org/jira/browse/FLINK-25368
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.15.0
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> `AdminClient.listOffsets` is provided in Kafka 2.7. In the future, more 
> `OffsetSpec` types will be added to it; for example, 
> `OffsetSpec.MaxTimestampSpec` was added in Kafka 3.0. So it's better to 
> substitute `KafkaConsumer` with `AdminClient`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink

2022-01-26 Thread GitBox


imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r792420132



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
##
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import 
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSink} to make it easier for the users to 
construct a {@link
+ * PulsarSink}.
+ *
+ * The following example shows the minimum setup to create a PulsarSink 
that reads the String
+ * values from a Pulsar topic.
+ *
+ * {@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * 
.setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .build();
+ * }
+ *
+ * The service url, admin url, and the record serializer are required 
fields that must be set. If
+ * you don't set the topics, make sure you have provided a custom {@link 
TopicRouter}. Otherwise,
+ * you must provide the topics to produce.
+ *
+ * To specify the delivery guarantees of PulsarSink, one can call {@link
+ * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the 
delivery guarantee is {@link
+ * DeliveryGuarantee#EXACTLY_ONCE}, and it requires the Pulsar broker to turn 
on transaction
+ * support.
+ *
+ * {@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopics(topic)
+ * 
.setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .setDeliveryGuarantee(deliveryGuarantee)
+ * .build();
+ * }
+ *
+ * @see PulsarSink for a more detailed explanation of the different guarantees.
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSinkBuilder<IN> {
+private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSinkBuilder.class);
+
+private final PulsarConfigBuilder configBuilder;
+
+private DeliveryGuarantee delive

[GitHub] [flink] pnowojski merged pull request #18392: [FLINK-25590][metrics] Introduce RequestedMemoryUsage and log warnings if usage exceeds 100%

2022-01-26 Thread GitBox


pnowojski merged pull request #18392:
URL: https://github.com/apache/flink/pull/18392


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on pull request #18392: [FLINK-25590][metrics] Introduce RequestedMemoryUsage and log warnings if usage exceeds 100%

2022-01-26 Thread GitBox


pnowojski commented on pull request #18392:
URL: https://github.com/apache/flink/pull/18392#issuecomment-1021993369


   Oops, I forgot to squash the commits before merging :|


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-25590) Logging warning of insufficient memory for all configured buffers

2022-01-26 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-25590.
--
Fix Version/s: 1.15.0
 Assignee: Piotr Nowojski
   Resolution: Fixed

Merged to master as cfe5e9a728d and cfe5e9a728d^

> Logging warning of insufficient memory for all configured buffers
> -
>
> Key: FLINK-25590
> URL: https://issues.apache.org/jira/browse/FLINK-25590
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Anton Kalashnikov
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Right now, if the exclusive buffers for the input channels and one buffer for 
> each subpartition are allocated successfully on start, but there is not enough 
> memory for the rest of the buffers (floating buffers, the remaining buffers 
> for subpartitions), then we see nothing in the log about that (as I understand).
> So first of all, we need to check what logs we have right now about 
> situations when Flink doesn't have enough memory for all configured buffers. 
> And if we have nothing (or not enough), we should add such a log.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] zentol merged pull request #18501: [FLINK-25348][build] Clear japicmp exclusions

2022-01-26 Thread GitBox


zentol merged pull request #18501:
URL: https://github.com/apache/flink/pull/18501


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-25348) Update release guide to reset japicmp exceptions for every release

2022-01-26 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-25348.

Resolution: Fixed

master: d9bb8d7946fa09bb77ffddcc6eea98435ef7f825

I've also updated the release guide.

> Update release guide to reset japicmp exceptions for every release
> --
>
> Key: FLINK-25348
> URL: https://issues.apache.org/jira/browse/FLINK-25348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> I propose to clean up the japicmp maven plugin exclusion for every minor 
> release for @Public and the exclusions for @PublicEvolving with every minor 
> release. Currently, we don’t do this and that’s why we have accumulated quite 
> a long list of exclusions that a) might shadow other problems and b) nobody 
> really knows why they are still relevant. I would propose to make this part 
> of the release guide. The result should be that we minimize our set of 
> exclusions.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler

2022-01-26 Thread GitBox


zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r791884986



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##
@@ -190,10 +191,24 @@ private void maybeSetParallelism(final ExecutionJobVertex 
jobVertex) {
 jobVertex.getName(),
 parallelism);
 
-jobVertex.setParallelism(parallelism);
+changeJobVertexParallelism(jobVertex, parallelism);
 }
 }
 
+private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int 
parallelism) {
+// update PlanJson

Review comment:
   Would you add some comments for why we need to update the plan json?  
e.g. "it is needed to enable REST APIs to return the latest parallelism of job 
vertices."

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##
@@ -676,7 +727,7 @@ private void 
notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
 
 @Override
 public ResourceProfile getResourceProfile(final ExecutionVertexID 
executionVertexId) {
-return getExecutionVertex(executionVertexId).getResourceProfile();
+return 
getExecutionJobVertex(executionVertexId.getJobVertexId()).getResourceProfile();

Review comment:
   What's this change for?

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##
@@ -762,6 +762,11 @@ public void setInternalTaskFailuresListener(
 //  Actions
 // 

 
+@Override
+public void notifyNewJobVertexInitialized(List<ExecutionJobVertex> vertices) {

Review comment:
   maybe `notifyNewlyInitializedJobVertices()`? because the job vertices 
are just newly initialized rather than newly added.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperati

[GitHub] [flink] TanYuxin-tyx opened a new pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

2022-01-26 Thread GitBox


TanYuxin-tyx opened a new pull request #18515:
URL: https://github.com/apache/flink/pull/18515


   
   
   
   
   ## What is the purpose of the change
   
   Currently, if the partition file has been lost for a blocking shuffle, a 
FileNotFoundException will be thrown and the partition data will not be 
regenerated. This change makes it throw PartitionNotFoundException instead.
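   
   As a rough illustration only (a hypothetical helper, not the actual change in 
this PR), the translation of a missing partition file into 
`PartitionNotFoundException` could look like this:
   
   ```java
   import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
   import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.file.NoSuchFileException;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   class PartitionFileReader {
       // Hypothetical helper: map a missing partition data file to PartitionNotFoundException
       // instead of surfacing the raw file-not-found error to the consumer.
       static FileChannel openPartitionFile(Path dataFile, ResultPartitionID partitionId)
               throws IOException {
           try {
               return FileChannel.open(dataFile, StandardOpenOption.READ);
           } catch (NoSuchFileException e) {
               throw new PartitionNotFoundException(partitionId);
           }
       }
   }
   ```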
   
   
   ## Brief change log
   
 - *Throw PartitionNotFoundException if the partition file has been lost 
for blocking shuffle.*
   
   ## Verifying this change
   
   This change added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dannycranmer commented on pull request #18512: [FLINK-25811][hotfix][connector/base] changing failed requests handler to accept List in AsyncSinkWriter

2022-01-26 Thread GitBox


dannycranmer commented on pull request #18512:
URL: https://github.com/apache/flink/pull/18512#issuecomment-1022002473


   Thanks for the improvement @vahmed-hamdy 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dannycranmer merged pull request #18512: [FLINK-25811][hotfix][connector/base] changing failed requests handler to accept List in AsyncSinkWriter

2022-01-26 Thread GitBox


dannycranmer merged pull request #18512:
URL: https://github.com/apache/flink/pull/18512


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order

2022-01-26 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-25811:
--
Fix Version/s: 1.15.0

> Fix generic AsyncSinkWriter retrying requests in reverse order
> --
>
> Key: FLINK-25811
> URL: https://issues.apache.org/jira/browse/FLINK-25811
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> {{AsyncSinkWriter}} retries failed requests in reverse order.
> *Scope:*
> * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} 
> module.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
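
A toy illustration (not the actual {{AsyncSinkWriter}} code) of how re-queueing 
failed entries one by one at the head of a buffer replays them in reverse order:
{code:java}
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class RetryOrderExample {
    public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>(Arrays.asList("r4", "r5"));
        List<String> failed = Arrays.asList("r1", "r2", "r3");

        // Naive re-queueing: pushing each failed entry to the front reverses their order.
        failed.forEach(buffer::addFirst);
        System.out.println(buffer); // [r3, r2, r1, r4, r5]

        // Order-preserving alternative: prepend the failed entries in reverse iteration order.
        // for (int i = failed.size() - 1; i >= 0; i--) { buffer.addFirst(failed.get(i)); }
    }
}
{code}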



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25816) Changelog keyed state backend would come across NPE during notification

2022-01-26 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-25816:
--
Affects Version/s: 1.15.0

> Changelog keyed state backend would come across NPE during notification
> ---
>
> Key: FLINK-25816
> URL: https://issues.apache.org/jira/browse/FLINK-25816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Instance: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order

2022-01-26 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-25811:
--
Affects Version/s: (was: 1.15.0)

> Fix generic AsyncSinkWriter retrying requests in reverse order
> --
>
> Key: FLINK-25811
> URL: https://issues.apache.org/jira/browse/FLINK-25811
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Motivation
> {{AsyncSinkWriter}} retries failed requests in reverse order.
> *Scope:*
> * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} 
> module.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order

2022-01-26 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-25811:
-

Assignee: Ahmed Hamdy

> Fix generic AsyncSinkWriter retrying requests in reverse order
> --
>
> Key: FLINK-25811
> URL: https://issues.apache.org/jira/browse/FLINK-25811
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> {{AsyncSinkWriter}} retries failed requests in reverse order.
> *Scope:*
> * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} 
> module.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink-benchmarks] wsry commented on pull request #44: [FLINK-25704] Fix the blocking partition benchmark regression caused by FLINK-25637

2022-01-26 Thread GitBox


wsry commented on pull request #44:
URL: https://github.com/apache/flink-benchmarks/pull/44#issuecomment-1022004795


   > NETWORK_SORT_SHUFFLE_MIN_PARALLELISM
   
   @pnowojski Thanks for the review.
   
   ```NETWORK_SORT_SHUFFLE_MIN_PARALLELISM``` is a config option that controls 
which blocking shuffle implementation to use, either the hash-based one or the 
sort-based one. If the task parallelism is smaller than this config value, the 
hash-based blocking shuffle will be used; otherwise, the sort-based blocking 
shuffle will be used. Previously, the default value of this config option was 
Integer.MAX_VALUE, which means hash-shuffle was used by default. FLINK-25637 
changed the default value to 1, which means sort-shuffle is used by default. 
The changes in this PR restore the previous default value for the 
corresponding benchmark cases.
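   
   For reference, a hypothetical sketch of what restoring the old behaviour in a 
benchmark configuration could look like (assuming the option lives in 
`NettyShuffleEnvironmentOptions`; the class and method names below are made up):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;

   class BenchmarkShuffleConfig {
       // Force the hash-based blocking shuffle by making the sort-shuffle
       // minimum parallelism unreachable (the pre-FLINK-25637 default).
       static Configuration hashShuffleConfig() {
           Configuration config = new Configuration();
           config.setInteger(
                   NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                   Integer.MAX_VALUE);
           return config;
       }
   }
   ```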


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order

2022-01-26 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-25811.
---
Resolution: Fixed

> Fix generic AsyncSinkWriter retrying requests in reverse order
> --
>
> Key: FLINK-25811
> URL: https://issues.apache.org/jira/browse/FLINK-25811
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> {{AsyncSinkWriter}} retries failed requests in reverse order.
> *Scope:*
> * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} 
> module.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot commented on pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

2022-01-26 Thread GitBox


flinkbot commented on pull request #18515:
URL: https://github.com/apache/flink/pull/18515#issuecomment-1022004860






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order

2022-01-26 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482338#comment-17482338
 ] 

Danny Cranmer commented on FLINK-25811:
---

Also related to https://github.com/apache/flink/pull/18488/

> Fix generic AsyncSinkWriter retrying requests in reverse order
> --
>
> Key: FLINK-25811
> URL: https://issues.apache.org/jira/browse/FLINK-25811
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> {{AsyncSinkWriter}} retries failed requests in reverse order.
> *Scope:*
> * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} 
> module.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25816) Changelog keyed state backend would come across NPE during notification

2022-01-26 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482339#comment-17482339
 ] 

Roman Khachatryan commented on FLINK-25816:
---

I think the bug confirms my previous 
[apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] 
that abortion notification adds risk and complexity while being absolutely not 
necessary.

So I'd propose to just remove them.

> Changelog keyed state backend would come across NPE during notification
> ---
>
> Key: FLINK-25816
> URL: https://issues.apache.org/jira/browse/FLINK-25816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Instance: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25816) Changelog keyed state backend would come across NPE during notification

2022-01-26 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482339#comment-17482339
 ] 

Roman Khachatryan edited comment on FLINK-25816 at 1/26/22, 9:14 AM:
-

I think the bug confirms my previous 
[apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] 
that abortion notification adds risk and complexity while being absolutely not 
necessary.

So I'd propose to just remove them (for the nested state backend).


was (Author: roman_khachatryan):
I think the bug confirms my previous 
[apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] 
that abortion notification adds risk and complexity while being absolutely not 
necessary.

So I'd propose to just remove them.

> Changelog keyed state backend would come across NPE during notification
> ---
>
> Key: FLINK-25816
> URL: https://issues.apache.org/jira/browse/FLINK-25816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Instance: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] (FLINK-25816) Changelog keyed state backend would come across NPE during notification

2022-01-26 Thread Roman Khachatryan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-25816 ]


Roman Khachatryan deleted comment on FLINK-25816:
---

was (Author: roman_khachatryan):
I think the bug confirms my previous 
[apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] 
that abortion notification adds risk and complexity while being absolutely not 
necessary.

So I'd propose to just remove them (for the nested state backend).

> Changelog keyed state backend would come across NPE during notification
> ---
>
> Key: FLINK-25816
> URL: https://issues.apache.org/jira/browse/FLINK-25816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Instance: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] fapaul commented on a change in pull request #18412: [FLINK-25696][datastream] Introduce metadataConsumer to InitContext in Sink

2022-01-26 Thread GitBox


fapaul commented on a change in pull request #18412:
URL: https://github.com/apache/flink/pull/18412#discussion_r792434481



##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##
@@ -170,6 +171,15 @@
  * previous execution.
  */
 OptionalLong getRestoredCheckpointId();
+
+/**
+ * Returns a metadata consumer, the {@link SinkWriter} can publish 
metadata events of type
+ * {@link MetaT} to the consumer. The consumer can accept metadata 
events in an asynchronous

Review comment:
   If you want to recommend not executing the metadata consumer as part of 
the mailbox, you should make it clearer.
   
   ```suggestion
*  It is recommended to use a separate thread pool to publish 
the metadata because enqueuing a lot of these messages in the mailbox may lead 
to a performance decrease.
   ```
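   
   To make the threading recommendation concrete, a hypothetical writer-side 
sketch (the class and field names are made up; the `Optional` would come from 
the `metadataConsumer()` method added in this PR, with `String` only as an 
example metadata type):
   
   ```java
   import java.util.Optional;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.Consumer;

   class MetadataPublishingWriter {
       private final Optional<Consumer<String>> metadataConsumer;
       private final ExecutorService metadataExecutor = Executors.newSingleThreadExecutor();

       MetadataPublishingWriter(Optional<Consumer<String>> metadataConsumer) {
           this.metadataConsumer = metadataConsumer;
       }

       void publishMetadata(String metadata) {
           // Publish off the mailbox thread so metadata events don't flood the mailbox.
           metadataConsumer.ifPresent(
                   consumer -> metadataExecutor.execute(() -> consumer.accept(metadata)));
       }
   }
   ```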

##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
##
@@ -105,5 +107,14 @@
  * Provides a view on this context as a {@link 
SerializationSchema.InitializationContext}.
  */
 SerializationSchema.InitializationContext 
asSerializationSchemaInitializationContext();
+
+/**
+ * Returns a metadata consumer, the {@link SinkWriter} can publish 
metadata events of type
+ * {@link MetaT} to the consumer. The consumer can accept metadata 
events in an asynchronous
+ * thread, and the {@link Consumer#accept} method is executed very 
fast.
+ */
+default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {

Review comment:
   I'd mark this `@Experimental` to leave some room to change the threading 
model later if necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dannycranmer commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…

2022-01-26 Thread GitBox


dannycranmer commented on a change in pull request #18483:
URL: https://github.com/apache/flink/pull/18483#discussion_r792438959



##
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
##
@@ -111,7 +111,7 @@
 return new KinesisDataStreamsSinkBuilder<>();
 }
 
-@Experimental
+@Internal

Review comment:
   Having an `@Internal` public method on a `@Public` class seems like a 
bad design to me. How does this annotation stop users from using it? Is this 
smell inherited from the base `Sink` or something we have added here?
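   For context, a minimal sketch of the pattern being discussed; the class and method names are made up, while the annotations are Flink's stability markers, which document intent and are checked by tooling such as the ArchUnit rules mentioned below rather than restricting access at compile time:
   ```java
   import org.apache.flink.annotation.Internal;
   import org.apache.flink.annotation.Public;

   @Public
   class ExampleSink {

       // Intended only for the runtime/sink operator; the annotation signals
       // this to users and tooling but does not technically prevent the call.
       @Internal
       public void createWriterForRuntimeUse() {}
   }
   ```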





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dannycranmer commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…

2022-01-26 Thread GitBox


dannycranmer commented on a change in pull request #18483:
URL: https://github.com/apache/flink/pull/18483#discussion_r792439657



##
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
##
@@ -54,7 +54,7 @@ private KinesisDataStreamsSinkElementConverter(
 this.partitionKeyGenerator = partitionKeyGenerator;
 }
 
-@Experimental
+@Internal

Review comment:
   Same question as above: why do we need to annotate these methods?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18515:
URL: https://github.com/apache/flink/pull/18515#issuecomment-1022004860


   
   ## CI report:
   
   * 7ddd8e3928e4c3defabb6ed8cc167a3b1e832b3c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-25816) Changelog keyed state backend would come across NPE during notification

2022-01-26 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482342#comment-17482342
 ] 

Roman Khachatryan commented on FLINK-25816:
---

I think the bug confirms my previous 
[apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] 
that abortion notifications add risk and complexity while not being necessary 
at all.
 
So I'd propose to just remove them (for the nested state backend).

> Changelog keyed state backend would come across NPE during notification
> ---
>
> Key: FLINK-25816
> URL: https://issues.apache.org/jira/browse/FLINK-25816
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Instance: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
>   at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…

2022-01-26 Thread GitBox


CrynetLogistics commented on a change in pull request #18483:
URL: https://github.com/apache/flink/pull/18483#discussion_r792442692



##
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
##
@@ -54,7 +54,7 @@ private KinesisDataStreamsSinkElementConverter(
 this.partitionKeyGenerator = partitionKeyGenerator;
 }
 
-@Experimental
+@Internal

Review comment:
   Because otherwise I get 
`org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.builder():
 Returned leaf type 
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter$Builder
 does not satisfy`.
   
   I can't seem to find a solution; I'll keep trying.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18515:
URL: https://github.com/apache/flink/pull/18515#issuecomment-1022004860


   
   ## CI report:
   
   * 7ddd8e3928e4c3defabb6ed8cc167a3b1e832b3c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30212)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…

2022-01-26 Thread GitBox


CrynetLogistics commented on a change in pull request #18483:
URL: https://github.com/apache/flink/pull/18483#discussion_r792444851



##
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
##
@@ -111,7 +111,7 @@
 return new KinesisDataStreamsSinkBuilder<>();
 }
 
-@Experimental
+@Internal

Review comment:
   The user is never expected to call createWriter, right? I guess they're 
effectively prevented from doing so because they'll never have an InitContext 
object. I thought createWriter was only here for the sink operator to use...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wsry commented on pull request #18470: [FLINK-25774][network] Restrict the maximum number of buffers can be used per result partition for sort-shuffle

2022-01-26 Thread GitBox


wsry commented on pull request #18470:
URL: https://github.com/apache/flink/pull/18470#issuecomment-1022016135


   Rebased master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] fapaul commented on a change in pull request #18428: [FLINK-25575] Add Sink V2 operators and translation

2022-01-26 Thread GitBox


fapaul commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r792447708



##
File path: 
flink-architecture-tests/violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
##
@@ -93,7 +93,6 @@ org.apache.flink.connector.file.src.util.Pool.recycler(): 
Returned leaf type org
 
org.apache.flink.connector.file.src.util.Utils.forEachRemaining(org.apache.flink.connector.file.src.reader.BulkFormat$Reader,
 java.util.function.Consumer): Argument leaf type 
org.apache.flink.connector.file.src.reader.BulkFormat$Reader does not satisfy: 
reside outside of package 'org.apache.flink..' or annotated with @Public or 
annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions.builder(): Returned 
leaf type 
org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions$JDBCExactlyOnceOptionsBuilder
 does not satisfy: reside outside of package 'org.apache.flink..' or annotated 
with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder(): Returned leaf 
type org.apache.flink.connector.jdbc.JdbcExecutionOptions$Builder does not 
satisfy: reside outside of package 'org.apache.flink..' or annotated with 
@Public or annotated with @PublicEvolving or annotated with @Deprecated
-org.apache.flink.connector.jdbc.JdbcSink.exactlyOnceSink(java.lang.String, 
org.apache.flink.connector.jdbc.JdbcStatementBuilder, 
org.apache.flink.connector.jdbc.JdbcExecutionOptions, 
org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions, 
org.apache.flink.util.function.SerializableSupplier): Argument leaf type 
org.apache.flink.util.function.SerializableSupplier does not satisfy: reside 
outside of package 'org.apache.flink..' or annotated with @Public or annotated 
with @PublicEvolving or annotated with @Deprecated

Review comment:
   I guess this change should have belonged to this ticket: 
https://issues.apache.org/jira/browse/FLINK-25570. I am not sure why the tests 
did not fail before. I can make this change in a separate commit, but I would 
keep it in this PR.
   
   For the other archunit changes, AFAICT they are related to the changes in this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-25806) Remove legacy high availability services

2022-01-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482344#comment-17482344
 ] 

Matthias Pohl commented on FLINK-25806:
---

I linked FLINK-25432 as well: We can remove the ordering of cleanups introduced 
in FLINK-25432 (see `DefaultResourceCleaner`) as part of removing the legacy 
high availability services.

> Remove legacy high availability services
> 
>
> Key: FLINK-25806
> URL: https://issues.apache.org/jira/browse/FLINK-25806
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Till Rohrmann
>Priority: Blocker
> Fix For: 1.16.0
>
>
> After FLINK-24038, we should consider removing the legacy high availability 
> services {{ZooKeeperHaServices}} and {{KubernetesHaServices}} since they are 
> now subsumed by the multiple component leader election service that only uses 
> a single leader election per component.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18505:
URL: https://github.com/apache/flink/pull/18505#issuecomment-1021169033


   
   ## CI report:
   
   * 62ce77fe716ae2cfa26fb5cc04948683acc33e40 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30196)
 
   * 7ab24a6fde4e9c0f15e1bddbb7134dbd656d0c40 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30198)
 
   * 94e63928b2c567014e7f0861c6069186914950e4 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30206)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #17485: [FLINK-24038] Add support for single leader election per JobManager process

2022-01-26 Thread GitBox


XComp commented on a change in pull request #17485:
URL: https://github.com/apache/flink/pull/17485#discussion_r786874117



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
##
@@ -78,21 +84,46 @@ public CompletedCheckpointStore 
createRecoveredCompletedCheckpointStore(
 SharedStateRegistryFactory sharedStateRegistryFactory,
 Executor ioExecutor)
 throws Exception {
+final String configMapName = getConfigMapNameFunction.apply(jobID);
+KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, 
configMapName, clusterId);
 
 return KubernetesUtils.createCompletedCheckpointStore(
 configuration,
 kubeClient,
 executor,
-getConfigMapNameFunction.apply(jobID),
+configMapName,
 lockIdentity,
 maxNumberOfCheckpointsToRetain,
 sharedStateRegistryFactory,
 ioExecutor);
 }
 
 @Override
-public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) {
-return new KubernetesCheckpointIDCounter(
-kubeClient, getConfigMapNameFunction.apply(jobID), 
lockIdentity);
+public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws 
Exception {

Review comment:
   To me, it would feel more natural to have a subclass of 
`KubernetesCheckpointRecoveryFactory` that takes care of the ConfigMap 
creation. But I don't have a strong argument for refactoring this code 
because we wouldn't gain much from it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wsry commented on a change in pull request #18474: [FLINK-25786][network] Adjust the generation of subpartition data storage order for sort-shuffle from random shuffle to random shift

2022-01-26 Thread GitBox


wsry commented on a change in pull request #18474:
URL: https://github.com/apache/flink/pull/18474#discussion_r792449522



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##
@@ -492,10 +490,13 @@ public int getNumberOfQueuedBuffers(int 
targetSubpartition) {
 }
 
 private int[] getRandomSubpartitionOrder(int numSubpartitions) {
-List list =
-IntStream.range(0, 
numSubpartitions).boxed().collect(Collectors.toList());
-Collections.shuffle(list);
-return list.stream().mapToInt(Integer::intValue).toArray();
+int[] order = new int[numSubpartitions];
+Random random = new Random();
+int shift = random.nextInt(numSubpartitions);

Review comment:
   Yes, it is expected. The motivation is better sequential disk IO 
when there are not enough resources to run the downstream consumer tasks in 
parallel.
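   For reference, a self-contained sketch of the shift-based ordering described above; this is the idea rather than the exact code from the PR:
   ```java
   import java.util.Arrays;
   import java.util.Random;

   public class SubpartitionOrder {

       // Start at a random offset and keep the subpartitions in consecutive
       // order, which favors sequential disk reads when downstream consumers
       // cannot all run in parallel (unlike a full shuffle of the indices).
       static int[] shiftedSubpartitionOrder(int numSubpartitions) {
           int[] order = new int[numSubpartitions];
           int shift = new Random().nextInt(numSubpartitions);
           for (int i = 0; i < numSubpartitions; i++) {
               order[i] = (i + shift) % numSubpartitions;
           }
           return order;
       }

       public static void main(String[] args) {
           // e.g. with 5 subpartitions and shift 2: [2, 3, 4, 0, 1]
           System.out.println(Arrays.toString(shiftedSubpartitionOrder(5)));
       }
   }
   ```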




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] ruanhang1993 opened a new pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe

2022-01-26 Thread GitBox


ruanhang1993 opened a new pull request #18516:
URL: https://github.com/apache/flink/pull/18516


   
   ## What is the purpose of the change
   
   This pull request adds savepoint and metric test cases in the source suite of 
the connector testframe.
   
   ## Brief change log 
   
 - Add savepoint and metric test cases in the source suite of the connector 
testframe
 - Change the tests in the Kafka and Pulsar connectors
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-25288) Add savepoint and metric cases in DataStream source suite of connector testing framework

2022-01-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-25288:
---
Labels: pull-request-available  (was: )

> Add savepoint and metric cases in DataStream source suite of connector 
> testing framework
> 
>
> Key: FLINK-25288
> URL: https://issues.apache.org/jira/browse/FLINK-25288
> Project: Flink
>  Issue Type: Sub-task
>  Components: Test Infrastructure
>Reporter: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #18418: [FLINK-25719][python] Support General Python UDF in Thread Mode

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18418:
URL: https://github.com/apache/flink/pull/18418#issuecomment-1017307428


   
   ## CI report:
   
   * 65bbedd87b4b4962bf1b11764ad284419cbf24ee Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30195)
 
   * 3f39b61a0b956fde5f61790a9b1dd352789d28a6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30205)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18470: [FLINK-25774][network] Restrict the maximum number of buffers can be used per result partition for sort-shuffle

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18470:
URL: https://github.com/apache/flink/pull/18470#issuecomment-1020036276


   
   ## CI report:
   
   * decf33d57cf16564786d77240287fb1aeb034b39 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30043)
 
   * 4cce15f9d9577fffbdad21c7fb9c286509a04ef5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] tillrohrmann commented on a change in pull request #501: Release announcement for Statefun 3.2.0

2022-01-26 Thread GitBox


tillrohrmann commented on a change in pull request #501:
URL: https://github.com/apache/flink-web/pull/501#discussion_r792452976



##
File path: _posts/2022-01-27-release-statefun-3.2.0.md
##
@@ -0,0 +1,111 @@
+---
+layout: post
+title:  "Stateful Functions 3.2.0 Release Announcement"
+subtitle: "The Apache Flink community is happy to announce the release of 
Stateful Functions (StateFun) 3.2.0."
+date: 2022-01-27T08:00:00.000Z
+categories: news
+authors:
+- trohrmann:
+  name: "Till Rohrmann"
+  twitter: "stsffap"
+- igalshilman:
+  name: "Igal Shilman"
+  twitter: "IgalShilman"
+---
+
+Stateful Functions is a cross-platform stack for building Stateful Serverless 
applications, making it radically simpler to develop scalable, consistent, and 
elastic distributed applications.
+This new release brings various improvements to the StateFun runtime, a leaner 
way to specify StateFun module components, and a brand new GoLang SDK!

Review comment:
   Good catch. Will correct it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe

2022-01-26 Thread GitBox


flinkbot commented on pull request #18516:
URL: https://github.com/apache/flink/pull/18516#issuecomment-1022021934


   
   ## CI report:
   
   * 741b593b95d9501c69f8f7014729b0983350bfe7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe

2022-01-26 Thread GitBox


flinkbot commented on pull request #18516:
URL: https://github.com/apache/flink/pull/18516#issuecomment-1022022034


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 741b593b95d9501c69f8f7014729b0983350bfe7 (Wed Jan 26 
09:33:53 UTC 2022)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-25288).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] LadyForest commented on pull request #18500: [FLINK-25312][hive] HiveCatalog supports Flink's managed table

2022-01-26 Thread GitBox


LadyForest commented on pull request #18500:
URL: https://github.com/apache/flink/pull/18500#issuecomment-1022024594


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables fails on AZP

2022-01-26 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482349#comment-17482349
 ] 

Etienne Chauchot commented on FLINK-25771:
--

[~gaoyunhaii] thanks for the pointer. This issue should be fixed by the above PR.

> CassandraConnectorITCase.testRetrialAndDropTables fails on AZP
> --
>
> Key: FLINK-25771
> URL: https://issues.apache.org/jira/browse/FLINK-25771
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.13.5
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP 
> with
> {code}
> Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: 
> All host(s) tried for query failed (tried: /172.17.0.1:59220 
> (com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1] Timed out waiting for server response))
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> Jan 23 01:02:52   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554)
> Jan 23 01:02:52   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jan 23 01:02:52   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jan 23 01:02:52   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jan 23 01:02:52   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196)
> Jan 23 01:02:52   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 23 01:02:52   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jan 23 01:02:52   at org.junit.runners.Suite.runChild(Suite.java:128)
> Jan 23 01:02:52

[GitHub] [flink] rkhachatryan commented on a change in pull request #18514: [FLINK-25816][changelog] Refactor the logic of notifying materialization id to nested state backend

2022-01-26 Thread GitBox


rkhachatryan commented on a change in pull request #18514:
URL: https://github.com/apache/flink/pull/18514#discussion_r792455364



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -526,18 +510,16 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 }
 Long materializationID = 
materializationIdByCheckpointId.remove(checkpointId);
 if (materializationID != null) {
-Set checkpoints = 
pendingMaterializationConfirmations.get(materializationID);
-checkpoints.remove(checkpointId);
-if (checkpoints.isEmpty()) {

Review comment:
   The purpose of maintaining the `Set` of checkpoints was to not notify of 
abortion if any of the checkpoints is still not confirmed.
   For example, suppose there is
   1. a single materialization
   2. two pending checkpoints using it
   3. the 1st gets aborted (and notified)
   4. later, the 2nd gets confirmed (and notified)
   
   In (3) we'd notify of abortion, and in (4) of completion, for the same 
materialization.
   The Set would prevent this.
   
   I don't see how the updated version addresses this. Or am I missing 
something?
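   For readers following along, here is a rough, self-contained sketch of the guard described above; the class and field names are illustrative and not the actual ChangelogKeyedStateBackend code:
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   class MaterializationAbortGuard {

       private final Map<Long, Long> materializationIdByCheckpointId = new HashMap<>();
       private final Map<Long, Set<Long>> pendingCheckpointsByMaterializationId = new HashMap<>();

       void registerCheckpoint(long checkpointId, long materializationId) {
           materializationIdByCheckpointId.put(checkpointId, materializationId);
           pendingCheckpointsByMaterializationId
                   .computeIfAbsent(materializationId, k -> new HashSet<>())
                   .add(checkpointId);
       }

       /**
        * Returns the materialization id whose abortion may be propagated, or null
        * if another pending checkpoint (e.g. the 2nd one in the example) still uses it.
        */
       Long onCheckpointAborted(long checkpointId) {
           Long materializationId = materializationIdByCheckpointId.remove(checkpointId);
           if (materializationId == null) {
               return null;
           }
           Set<Long> pending = pendingCheckpointsByMaterializationId.get(materializationId);
           pending.remove(checkpointId);
           if (pending.isEmpty()) {
               pendingCheckpointsByMaterializationId.remove(materializationId);
               return materializationId;
           }
           return null;
       }
   }
   ```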
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18470: [FLINK-25774][network] Restrict the maximum number of buffers can be used per result partition for sort-shuffle

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18470:
URL: https://github.com/apache/flink/pull/18470#issuecomment-1020036276


   
   ## CI report:
   
   * decf33d57cf16564786d77240287fb1aeb034b39 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30043)
 
   * 4cce15f9d9577fffbdad21c7fb9c286509a04ef5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30214)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-25817) FLIP-201: Persist local state in working directory

2022-01-26 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25817:
-

 Summary: FLIP-201: Persist local state in working directory
 Key: FLINK-25817
 URL: https://issues.apache.org/jira/browse/FLINK-25817
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Till Rohrmann


This issue is the umbrella ticket for 
[FLIP-201|https://cwiki.apache.org/confluence/x/wJuqCw] which aims at adding 
support for persisting local state in Flink's working directory. This would 
enable Flink in certain scenarios to recover locally even in case of process 
failures.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #18514: [FLINK-25816][changelog] Refactor the logic of notifying materialization id to nested state backend

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18514:
URL: https://github.com/apache/flink/pull/18514#issuecomment-1021939543


   
   ## CI report:
   
   * db5af6253d30f1e4e782269f07a24eeb72a9857f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30204)
 
   * 34a3acb29b3077f40aabb517900694bc08b06d58 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18516:
URL: https://github.com/apache/flink/pull/18516#issuecomment-1022021934


   
   ## CI report:
   
   * 741b593b95d9501c69f8f7014729b0983350bfe7 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30215)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18500: [FLINK-25312][hive] HiveCatalog supports Flink's managed table

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18500:
URL: https://github.com/apache/flink/pull/18500#issuecomment-1021030168


   
   ## CI report:
   
   * d7dc483a078a947fe4d397c17e3ec3fa9ec8db72 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30191)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18514: [FLINK-25816][changelog] Refactor the logic of notifying materialization id to nested state backend

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18514:
URL: https://github.com/apache/flink/pull/18514#issuecomment-1021939543


   
   ## CI report:
   
   * db5af6253d30f1e4e782269f07a24eeb72a9857f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30204)
 
   * 34a3acb29b3077f40aabb517900694bc08b06d58 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30216)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wsry commented on a change in pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance

2022-01-26 Thread GitBox


wsry commented on a change in pull request #18505:
URL: https://github.com/apache/flink/pull/18505#discussion_r792462118



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
##
@@ -41,7 +41,7 @@
  * ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards 
to fully release all
  * resources associated the partition and propagate failure cause to the 
consumer if possible.
  */
-public interface ResultPartitionWriter extends AutoCloseable, 
AvailabilityProvider {
+public interface ResultPartitionWriter extends AvailabilityProvider {

Review comment:
   It was removed by accident. When I was checking where the close method is 
called, the IDE listed all places calling AutoCloseable#close, so I removed it 
to reduce the result set size but forgot to add it back. Thanks for pointing it 
out. I will add it back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] SteNicholas commented on pull request #18502: [hotfix][docs][kafka] Add explanation how Kafka Source deals with idl…

2022-01-26 Thread GitBox


SteNicholas commented on pull request #18502:
URL: https://github.com/apache/flink/pull/18502#issuecomment-1022031621


   @fapaul, could you help merge this pull request?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…

2022-01-26 Thread GitBox


CrynetLogistics commented on a change in pull request #18483:
URL: https://github.com/apache/flink/pull/18483#discussion_r792465363



##
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
##
@@ -111,7 +111,7 @@
 return new KinesisDataStreamsSinkBuilder<>();
 }
 
-@Experimental
+@Internal

Review comment:
   i.e.
   ```
   
KinesisDataStreamsSink.createWriter(org.apache.flink.api.connector.sink.Sink$InitContext,
 java.util.List): Argument leaf type 
org.apache.flink.api.connector.sink.Sink$InitContext does not satisfy: reside 
outside of package 'org.apache.flink..' or annotated with @Public or annotated 
with @PublicEvolving or annotated with @Deprecated
   
KinesisDataStreamsSink.createWriter(org.apache.flink.api.connector.sink.Sink$InitContext,
 java.util.List): Returned leaf type 
org.apache.flink.api.connector.sink.SinkWriter does not satisfy: reside outside 
of package 'org.apache.flink..' or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan opened a new pull request #18517: [FLINK-25816][state] Remove checkpoint abortion notification of notify backend

2022-01-26 Thread GitBox


rkhachatryan opened a new pull request #18517:
URL: https://github.com/apache/flink/pull/18517


   ## What is the purpose of the change
   
   ```
   The notification currently causes an exception and adds complexity.
   It's also not necessary, unlikely to be delivered (because of the
   difference in checkpoint/materialization intervals) and unlikely to be
   utilized (it will arrive only after the nested snapshot has completed
   and most likely do the same GC as in completion notification).
   ```
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] beyond1920 commented on a change in pull request #18331: [FLINK-25614][table/runtime] Let LocalWindowAggregate be chained with upstream

2022-01-26 Thread GitBox


beyond1920 commented on a change in pull request #18331:
URL: https://github.com/apache/flink/pull/18331#discussion_r792467756



##
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##
@@ -401,7 +402,8 @@ public WindowedSliceAssigner(int windowEndIndex, 
SliceAssigner innerAssigner) {
 
 @Override
 public long assignSliceEnd(RowData element, ClockService clock) {
-return element.getLong(windowEndIndex);
+TimestampData windowEnd = element.getTimestamp(windowEndIndex, 3);

Review comment:
   How about 
   `return element.getTimestamp(windowEndIndex, 3).getMillisecond();`

##
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
##
@@ -963,6 +965,10 @@ private static long epochMills(ZoneId shiftTimeZone, 
String timestampStr) {
 return localDateTime.toInstant(zoneOffset).toEpochMilli();
 }
 
+private static TimestampData wrapTs(long epochMillis) {

Review comment:
   How about statically importing `TimestampData.fromEpochMillis` and using 
`fromEpochMillis` directly instead of adding an extra `wrapTs` method?
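   A tiny, hypothetical example of what the suggestion would look like, assuming the usual flink-table dependency on the classpath:
   ```java
   import static org.apache.flink.table.data.TimestampData.fromEpochMillis;

   import org.apache.flink.table.data.TimestampData;

   public class FromEpochMillisExample {
       public static void main(String[] args) {
           // With the static import, the factory reads naturally at the call
           // site, so a local wrapTs(long) helper is not needed.
           TimestampData windowEnd = fromEpochMillis(1_000L);
           System.out.println(windowEnd.getMillisecond()); // prints 1000
       }
   }
   ```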




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-25818) Add explanation how Kafka Source deals with idleness when parallelism is higher than the number of partitions

2022-01-26 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25818:
--

 Summary: Add explanation how Kafka Source deals with idleness when 
parallelism is higher than the number of partitions
 Key: FLINK-25818
 URL: https://issues.apache.org/jira/browse/FLINK-25818
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka, Documentation
Reporter: Martijn Visser


Add a section to the Kafka Source documentation to explain what happens with 
the Kafka Source with regard to idleness when parallelism is higher than the 
number of partitions



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…

2022-01-26 Thread GitBox


CrynetLogistics commented on a change in pull request #18483:
URL: https://github.com/apache/flink/pull/18483#discussion_r792468424



##
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
##
@@ -54,7 +54,7 @@ private KinesisDataStreamsSinkElementConverter(
 this.partitionKeyGenerator = partitionKeyGenerator;
 }
 
-@Experimental
+@Internal

Review comment:
   I guess I can make the `ElementConverter` `@Internal`.
   But it's still the case that @vahmed-hamdy is using some parts of it in 
public code, while the converter is an internal tool.
   ```
   KinesisDataStreamsSinkElementConverter.apply(java.lang.Object, 
org.apache.flink.api.connector.sink.SinkWriter$Context): Argument leaf type 
org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside 
outside of package 'org.apache.flink..' or annotated with @Public or annotated 
with @PublicEvolving or annotated with @Deprecated
   KinesisDataStreamsSinkElementConverter.apply(java.lang.Object, 
org.apache.flink.api.connector.sink.SinkWriter$Context): Argument leaf type 
org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside 
outside of package 'org.apache.flink..' or annotated with @Public or annotated 
with @PublicEvolving or annotated with @Deprecated
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] YuriGusev opened a new pull request #18518: [FLINK-24229][connectors/dynamodb] Added DynamoDB connector

2022-01-26 Thread GitBox


YuriGusev opened a new pull request #18518:
URL: https://github.com/apache/flink/pull/18518


   
   
   ## What is the purpose of the change
   
   _User stories:_
   
   As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
   
   _Scope:_
   
   * Implement an asynchronous sink for DynamoDB by inheriting the 
AsyncSinkBase class. 
   * The implementation can for now reside in its own module in 
flink-connectors.
   * Implement an asynchronous sink writer for DynamoDB by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the requeueFailedRequestEntry method (a rough sketch of this pattern 
follows after this list). 
   * The implementation should utilize the DynamoDB batch API. 
   * The implemented Sink Writer will be used by the Sink class that is created 
as part of this story.
   * Java / code-level docs.
   * Unit/Integration testing.
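   
   As a rough illustration of the batch-write-and-requeue pattern mentioned above: this is not the connector code from this PR, it only uses the AWS SDK v2 for Java directly, and the client, table name, and requeue callback are assumed to be provided by the caller.
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.function.Consumer;

   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
   import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

   class DynamoDbBatchWriteSketch {

       /** Submits one batch and hands failed or unprocessed entries back for retry. */
       static void submitBatch(
               DynamoDbAsyncClient client,
               String tableName,
               List<WriteRequest> entries,
               Consumer<List<WriteRequest>> requeueFailedEntries) {

           BatchWriteItemRequest request =
                   BatchWriteItemRequest.builder()
                           .requestItems(Collections.singletonMap(tableName, entries))
                           .build();

           client.batchWriteItem(request)
                   .whenComplete(
                           (response, error) -> {
                               if (error != null) {
                                   // the whole batch failed: retry everything
                                   requeueFailedEntries.accept(entries);
                                   return;
                               }
                               List<WriteRequest> unprocessed =
                                       response.unprocessedItems()
                                               .getOrDefault(tableName, Collections.emptyList());
                               if (!unprocessed.isEmpty()) {
                                   // DynamoDB may throttle part of a batch: retry only those
                                   requeueFailedEntries.accept(unprocessed);
                               }
                           });
       }
   }
   ```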
   
   
   ## Brief change log
   
   - Added new DynamoDB Sink into a new module 
flink-connectors/flink-connector-dynamodb
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Integration tests in 
org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbSinkITCase
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #18517: [FLINK-25816][state] Remove checkpoint abortion notification of notify backend

2022-01-26 Thread GitBox


flinkbot commented on pull request #18517:
URL: https://github.com/apache/flink/pull/18517#issuecomment-1022038055


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b7c4e7b1afd0d8023c5714c6dcadabf5b4b0c047 (Wed Jan 26 
09:54:32 UTC 2022)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (FLINK-25818) Add explanation how Kafka Source deals with idleness when parallelism is higher than the number of partitions

2022-01-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-25818:
--

Assignee: Martijn Visser

> Add explanation how Kafka Source deals with idleness when parallelism is 
> higher than the number of partitions
> -
>
> Key: FLINK-25818
> URL: https://issues.apache.org/jira/browse/FLINK-25818
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Documentation
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>
> Add a section to the Kafka Source documentation to explain what happens with 
> the Kafka Source with regard to idleness when parallelism is higher than the 
> number of partitions



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #18502: [FLINK-25818][Docs][Kafka] Add explanation how Kafka Source deals with idl…

2022-01-26 Thread GitBox


flinkbot edited a comment on pull request #18502:
URL: https://github.com/apache/flink/pull/18502#issuecomment-1021048562


   
   ## CI report:
   
   * d446d2d97b5b87ed87a923b1122c32602dc526b3 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30151)
 
   * 1bb532a98042963c9aeeac75303c63a6ba3b8de5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



