[jira] [Commented] (FLINK-25699) Use HashMap for MAP value constructors
[ https://issues.apache.org/jira/browse/FLINK-25699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482313#comment-17482313 ]

Sergey Nuyanzin commented on FLINK-25699:
------------------------------------------

Thanks for your comments. Regarding the {{actual vs expected}} comparison: instead of comparing the toString output, it is also possible to compare the result of a {{MAP[]}} element access together with the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that more tests need to be written per map, e.g.

{code:scala}
testAllApis(
  map(1, 2L, 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L, 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L, 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}

> Use HashMap for MAP value constructors
> ---------------------------------------
>
>                 Key: FLINK-25699
>                 URL: https://issues.apache.org/jira/browse/FLINK-25699
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Runtime
>            Reporter: Timo Walther
>            Assignee: Sergey Nuyanzin
>            Priority: Major
>
> Currently, the usage of maps is inconsistent. It is not ensured that duplicate keys get eliminated. For CAST and output conversion this is solved. However, we should have a similar implementation in the MAP value constructor, like in CAST.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
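To connect the testing idea above with the behaviour the ticket asks for, a test in the same style could also assert duplicate-key elimination directly. The snippet below is only a sketch: the literals are made up for illustration, and the expected cardinality of 1 assumes the MAP value constructor deduplicates keys as proposed in FLINK-25699; it is not an existing test.

{code:scala}
// Sketch only: assumes duplicate keys are eliminated by the MAP value
// constructor, so the two entries with key 1 collapse into a single entry.
testAllApis(
  map(1, "a", 1, "b").cardinality(),
  "map(1, 'a', 1, 'b').cardinality()",
  "CARDINALITY(MAP[1, 'a', 1, 'b'])",
  "1")
{code}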
[jira] [Comment Edited] (FLINK-25699) Use HashMap for MAP value constructors
[ https://issues.apache.org/jira/browse/FLINK-25699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482313#comment-17482313 ]

Sergey Nuyanzin edited comment on FLINK-25699 at 1/26/22, 7:59 AM:
--------------------------------------------------------------------

Thanks for your comments. Regarding the {{actual vs expected}} comparison: instead of comparing the toString output, it is also possible to compare the result of a {{MAP[]}} element access together with the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that more tests need to be written per map, e.g.

{code:scala}
testAllApis(
  map(1, 2L, 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L, 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L, 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18418: [FLINK-25719][python] Support General Python UDF in Thread Mode
flinkbot edited a comment on pull request #18418: URL: https://github.com/apache/flink/pull/18418#issuecomment-1017307428 ## CI report: * 4e95ae2d0e8a2e2a2e3d85640d5ab125e86ede6c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29850) * 65bbedd87b4b4962bf1b11764ad284419cbf24ee Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30195) * 3f39b61a0b956fde5f61790a9b1dd352789d28a6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30205) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-25699) Use HashMap for MAP value constructors
[ https://issues.apache.org/jira/browse/FLINK-25699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482313#comment-17482313 ]

Sergey Nuyanzin edited comment on FLINK-25699 at 1/26/22, 8:01 AM:
--------------------------------------------------------------------

Thanks for your comments. Regarding the {{actual vs expected}} comparison: instead of comparing the toString output, it is also possible to compare the result of a {{MAP[]}} element access together with the cardinality. That makes the tests independent of the string representation (except perhaps for maps with cardinality less than 2). The drawback is that more tests need to be written per map, e.g.

{code:scala}
testAllApis(
  map(1, 2L, 3, 4L).at(1),
  "map(1, 2L, 3, 4L).at(1)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][1]",
  "2")

testAllApis(
  map(1, 2L, 3, 4L).at(3),
  "map(1, 2L, 3, 4L).at(3)",
  "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)][3]",
  "4")

testAllApis(
  map(1, 2L, 3, 4L).cardinality(),
  "map(1, 2L, 3, 4L).cardinality()",
  "CARDINALITY(MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)])",
  "2")
{code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata
slinkydeveloper commented on a change in pull request #18479: URL: https://github.com/apache/flink/pull/18479#discussion_r792385485 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java ## @@ -21,8 +21,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.planner.plan.logical.LogicalWindow; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.utils.ReflectionsUtil; Review comment: yep it's definitely better. When removing, check if there is any exclusion in `PlannerModule` in planner-loader -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata
slinkydeveloper commented on a change in pull request #18479: URL: https://github.com/apache/flink/pull/18479#discussion_r792385485 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java ## @@ -21,8 +21,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.planner.plan.logical.LogicalWindow; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.utils.ReflectionsUtil; Review comment: yep it's already a beginning. When removing, check if there is any exclusion in `PlannerModule` in planner-loader -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata
slinkydeveloper commented on a change in pull request #18479: URL: https://github.com/apache/flink/pull/18479#discussion_r792386400 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue; + +/** + * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation + * and de-serialisation. + */ +public class ExecNodeContext { +/** The unique identifier for each ExecNode in the JSON plan. */ Review comment: I'm personally not a fan of the javadoc on the private fields, as when using the IDE, I won't see anything unless I navigate to the class source code. But as you prefer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] KarmaGYZ commented on pull request #18434: [FLINK-25742][akka] Remove the serialization of rpc invocation at Fli…
KarmaGYZ commented on pull request #18434: URL: https://github.com/apache/flink/pull/18434#issuecomment-1021955008 @dawidwys I'll fill that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dawidwys commented on pull request #18434: [FLINK-25742][akka] Remove the serialization of rpc invocation at Fli…
dawidwys commented on pull request #18434: URL: https://github.com/apache/flink/pull/18434#issuecomment-1021956202 Thanks @KarmaGYZ ! I know from personal experience it might be daunting at times :sweat_smile: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-benchmarks] wsry opened a new pull request #44: [FLINK-25704] Fix the blocking partition benchmark regression caused by FLINK-25637
wsry opened a new pull request #44:
URL: https://github.com/apache/flink-benchmarks/pull/44

FLINK-25637 changed the default blocking shuffle implementation from hash-shuffle to sort-shuffle, which caused some benchmark regression. This PR tries to fix the regression. For more information about the regression, please refer to https://issues.apache.org/jira/browse/FLINK-25704.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Updated] (FLINK-25704) Performance regression on 18.01.2022 in batch network benchmarks
[ https://issues.apache.org/jira/browse/FLINK-25704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-25704:
------------------------------------
    Labels: pull-request-available  (was: )

> Performance regression on 18.01.2022 in batch network benchmarks
> ------------------------------------------------------------------
>
>                 Key: FLINK-25704
>                 URL: https://issues.apache.org/jira/browse/FLINK-25704
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.15.0
>            Reporter: Piotr Nowojski
>            Assignee: Yingjie Cao
>            Priority: Critical
>              Labels: pull-request-available
>
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=compressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedMmapPartition&env=2&revs=200&equid=off&quarts=on&extr=on
> Suspected range:
> {code}
> git ls eeec246677..f5c99c6f26
> f5c99c6f26 [5 weeks ago] [FLINK-17321][table] Add support casting of map to map and multiset to multiset  [Sergey Nuyanzin]
> 745cfec705 [24 hours ago] [hotfix][table-common] Fix InternalDataUtils for MapData tests  [Timo Walther]
> ed699b6ee6 [6 days ago] [FLINK-25637][network] Make sort-shuffle the default shuffle implementation for batch jobs  [kevin.cyj]
> 4275525fed [6 days ago] [FLINK-25638][network] Increase the default write buffer size of sort-shuffle to 16M  [kevin.cyj]
> e1878fb899 [6 days ago] [FLINK-25639][network] Increase the default read buffer size of sort-shuffle to 64M  [kevin.cyj]
> {code}
> It looks, [~kevin.cyj], like your change has most likely caused that?

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
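For context on how such a regression fix can look, the sort-shuffle default introduced by FLINK-25637 is controlled by a parallelism threshold option. The sketch below shows one way a benchmark setup could raise that threshold so the batch network benchmarks fall back to hash-shuffle again; it illustrates the option involved and is not necessarily what PR #44 actually does.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;

public class ShuffleBenchmarkConfig {

    public static Configuration hashShuffleConfiguration() {
        Configuration conf = new Configuration();
        // Sort-shuffle is only used for result partitions whose parallelism reaches
        // this threshold; a very large value effectively restores hash-shuffle.
        conf.set(
                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                Integer.MAX_VALUE);
        return conf;
    }
}
```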
[GitHub] [flink] XComp commented on a change in pull request #18189: [FLINK-25430] Replace RunningJobRegistry by JobResultStore
XComp commented on a change in pull request #18189: URL: https://github.com/apache/flink/pull/18189#discussion_r792392290 ## File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java ## @@ -147,6 +157,67 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc } } +@Test +public void testDirtyJobResultRecoveryInApplicationMode() throws Exception { +final Deadline deadline = Deadline.fromNow(TIMEOUT); +final Configuration configuration = new Configuration(); +configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); +configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); +configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100)); +final TestingMiniClusterConfiguration clusterConfiguration = +TestingMiniClusterConfiguration.newBuilder() +.setConfiguration(configuration) +.build(); + +// having a dirty entry in the JobResultStore should make the ApplicationDispatcherBootstrap +// implementation fail to submit the job +final JobResultStore jobResultStore = new EmbeddedJobResultStore(); +jobResultStore.createDirtyResult( +new JobResultEntry( +new JobResult.Builder() + .jobId(ApplicationDispatcherBootstrap.ZERO_JOB_ID) +.applicationStatus(ApplicationStatus.SUCCEEDED) +.netRuntime(1) +.build())); +final EmbeddedHaServicesWithLeadershipControl haServices = +new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) { + +@Override +public JobResultStore getJobResultStore() { +return jobResultStore; +} +}; + +final TestingMiniCluster.Builder clusterBuilder = +TestingMiniCluster.newBuilder(clusterConfiguration) +.setHighAvailabilityServicesSupplier(() -> haServices) +.setDispatcherResourceManagerComponentFactorySupplier( + createApplicationModeDispatcherResourceManagerComponentFactorySupplier( + clusterConfiguration.getConfiguration(), + ErrorHandlingSubmissionJob.createPackagedProgram())); +try (final MiniCluster cluster = clusterBuilder.build()) { +// start mini cluster and submit the job +cluster.start(); + +// the cluster should shut down automatically once the application completes +awaitClusterStopped(cluster, deadline); +} + + FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException()) +.as( +"The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.") + .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class); + +assertThat( +jobResultStore.hasDirtyJobResultEntry( +ApplicationDispatcherBootstrap.ZERO_JOB_ID)) +.isTrue(); +assertThat( +jobResultStore.hasCleanJobResultEntry( +ApplicationDispatcherBootstrap.ZERO_JOB_ID)) +.isFalse(); Review comment: This test is here to check that no job was re-submitted in case of a dirty jobresult existing in the `JobResultStore`. The cleanup is integrated in [FLINK-25432](https://issues.apache.org/jira/browse/FLINK-25432). That's where we would have to adjust the test accordingly. We won't miss the change in `FLINK-25432` because the test would fail after integrating the cleanup because of the asserts on `hasDirtyJobResultEntry` and `hasCleanJobResultEntry` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] slinkydeveloper commented on pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata
slinkydeveloper commented on pull request #18479:
URL: https://github.com/apache/flink/pull/18479#issuecomment-1021961660

> Because, with the upgrade story, we can have an @ExecNodeMetadata annotation with the same name, on a subclass of a current ExecNode class, which does something new/different and defines a newer version. So we need the combination of name + version to uniquely identify the class when we lookup and rebuild the Java object graph from the JSON plan.

I may have explained myself badly, but I'm not questioning the name + version tuple. That's ok and I get why we need it.

What I'm questioning is that you put `id + name + version` in the same field, and this seems wrong to me, because `id` is the "instance identifier", like a pointer to a specific node of the graph, while `name + version` is the "type identifier", which tells you which `ExecNode` class and version the node is. I don't think these two concepts (instance id and type id) should be in the same field, as they are logically very different things, and also because future tooling will have a hard time parsing this JSON.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] infoverload commented on a change in pull request #18431: [FLINK-25024][docs] Add Changelog backend docs
infoverload commented on a change in pull request #18431: URL: https://github.com/apache/flink/pull/18431#discussion_r792382240 ## File path: docs/content/docs/ops/metrics.md ## @@ -1203,6 +1203,59 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an ### RocksDB Certain RocksDB native metrics are available but disabled by default, you can find full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics) +### State changelog Review comment: ```suggestion ### State Changelog ``` ## File path: docs/content/docs/ops/state/state_backends.md ## @@ -325,6 +325,129 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} +## Enabling Changelog + +// todo: Chinese version of all changed docs + +// todo: mention in [large state tuning]({{< ref "docs/ops/state/large_state_tuning" >}})? or 1.16? + +{{< hint warning >}} The feature is in experimental status. {{< /hint >}} + +{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} + +### Introduction + +Changelog is a feature that aims to decrease checkpointing time, and therefore end-to-end latency in exactly-once mode. + +Most commonly, checkpoint duration is affected by: + +1. Barrier travel time and alignment, addressed by + [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) + and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) +2. Snapshot creation time (so-called synchronous phase), addressed by Asynchronous snapshots +3. Snapshot upload time (asynchronous phase) + +The last one (upload time) can be decreased by [Incremental checkpoints]({{< ref "#incremental-checkpoints" >}}). +However, most Incremental State Backends perform some form of compaction periodically, which results in re-uploading the +old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of +data tend to be very high in every checkpoint. Review comment: ```suggestion Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}). However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of data tends to be very high in every checkpoint. ``` ## File path: docs/content/docs/ops/state/state_backends.md ## @@ -325,6 +325,129 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} +## Enabling Changelog + +// todo: Chinese version of all changed docs + +// todo: mention in [large state tuning]({{< ref "docs/ops/state/large_state_tuning" >}})? or 1.16? + +{{< hint warning >}} The feature is in experimental status. {{< /hint >}} + +{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}} + +### Introduction + +Changelog is a feature that aims to decrease checkpointing time, and therefore end-to-end latency in exactly-once mode. + +Most commonly, checkpoint duration is affected by: + +1. 
Barrier travel time and alignment, addressed by + [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}}) + and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}}) +2. Snapshot creation time (so-called synchronous phase), addressed by Asynchronous snapshots +3. Snapshot upload time (asynchronous phase) + +The last one (upload time) can be decreased by [Incremental checkpoints]({{< ref "#incremental-checkpoints" >}}). +However, most Incremental State Backends perform some form of compaction periodically, which results in re-uploading the +old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of +data tend to be very high in every checkpoint. + +With Changelog enabled, Flink uploads state changes continuously, forming a changelog. On checkpoint, only the relevant +part of this changelog needs to be uploaded. Independently, configured state backend is snapshotted in the +background periodically. Upon successful upload, changelog is truncated. + +As a result, asynchronous phase duration is reduced, as well as synchronous phase - because no data needs to be flushed +to disk. In particular, long-tail latency is improved. + +On the flip side, resource usage is higher: + +- more files are created on DFS +- more IO bandwidth is used to upload state changes +- more CPU used to serialize state changes +- more memory used by Task Managers to buffer state changes +- todo: more details after test
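Since the section under review is about enabling the changelog feature, a minimal enabling example may help readers of this thread. This is a sketch based on the API referenced by the changelog documentation effort (FLINK-25024); the exact option names should be checked against the final docs.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableChangelogExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Wraps the configured state backend with the changelog, so state changes are
        // uploaded continuously and checkpoints only need to ship the recent delta.
        env.enableChangelogStateBackend(true);
    }
}
```

The same switch is also exposed as a configuration option (`state.backend.changelog.enabled`) for setups that configure state backends via flink-conf.yaml.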
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
JingsongLi commented on a change in pull request #14: URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792393666 ## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.operation; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.FileFormat; +import org.apache.flink.table.store.file.manifest.ManifestEntry; +import org.apache.flink.table.store.file.mergetree.MergeTree; +import org.apache.flink.table.store.file.mergetree.MergeTreeOptions; +import org.apache.flink.table.store.file.mergetree.compact.Accumulator; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +/** Default implementation of {@link FileStoreWrite}. 
*/ +public class FileStoreWriteImpl implements FileStoreWrite { + +private final RowType keyType; +private final RowType rowType; +private final Comparator keyComparator; +private final Accumulator accumulator; +private final FileFormat fileFormat; +private final FileStorePathFactory pathFactory; +private final MergeTreeOptions mergeTreeOptions; + +private final FileStoreScan scan; + +public FileStoreWriteImpl( +RowType partitionType, +RowType keyType, +RowType rowType, +Comparator keyComparator, +Accumulator accumulator, +FileFormat fileFormat, +FileStorePathFactory pathFactory, +MergeTreeOptions mergeTreeOptions) { +this.keyType = keyType; +this.rowType = rowType; +this.keyComparator = keyComparator; +this.accumulator = accumulator; +this.fileFormat = fileFormat; +this.pathFactory = pathFactory; +this.mergeTreeOptions = mergeTreeOptions; + +this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory); +} + +@Override +public RecordWriter createWriter( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +Long latestSnapshotId = pathFactory.latestSnapshotId(); +if (latestSnapshotId == null) { +return createEmptyWriter(partition, bucket, compactExecutor); +} else { +MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor); +return mergeTree.createWriter( +scan.withSnapshot(latestSnapshotId) + .withPartitionFilter(Collections.singletonList(partition)) +.withBucket(bucket).plan().files().stream() +.map(ManifestEntry::file) +.collect(Collectors.toList())); +} +} + +@Override +public RecordWriter createEmptyWriter( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor); +return mergeTree.createWriter(Collections.emptyList()); +} + +private MergeTree createMergeTree( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +SstFile sstFile = +new SstFile( +keyType, +rowType, +fileFormat, +pathFactory.createSstPathFactory(partition, bucket), Review comment: Can uuid be reused in `SstPathFactory` from `FileStorePathFactory`? ## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
JingsongLi commented on a change in pull request #14: URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792394148 ## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.operation; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.FileFormat; +import org.apache.flink.table.store.file.manifest.ManifestEntry; +import org.apache.flink.table.store.file.mergetree.MergeTree; +import org.apache.flink.table.store.file.mergetree.MergeTreeOptions; +import org.apache.flink.table.store.file.mergetree.compact.Accumulator; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +/** Default implementation of {@link FileStoreWrite}. 
*/ +public class FileStoreWriteImpl implements FileStoreWrite { + +private final RowType keyType; +private final RowType rowType; +private final Comparator keyComparator; +private final Accumulator accumulator; +private final FileFormat fileFormat; +private final FileStorePathFactory pathFactory; +private final MergeTreeOptions mergeTreeOptions; + +private final FileStoreScan scan; + +public FileStoreWriteImpl( +RowType partitionType, +RowType keyType, +RowType rowType, +Comparator keyComparator, +Accumulator accumulator, +FileFormat fileFormat, +FileStorePathFactory pathFactory, +MergeTreeOptions mergeTreeOptions) { +this.keyType = keyType; +this.rowType = rowType; +this.keyComparator = keyComparator; +this.accumulator = accumulator; +this.fileFormat = fileFormat; +this.pathFactory = pathFactory; +this.mergeTreeOptions = mergeTreeOptions; + +this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory); +} + +@Override +public RecordWriter createWriter( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +Long latestSnapshotId = pathFactory.latestSnapshotId(); +if (latestSnapshotId == null) { +return createEmptyWriter(partition, bucket, compactExecutor); +} else { +MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor); +return mergeTree.createWriter( +scan.withSnapshot(latestSnapshotId) + .withPartitionFilter(Collections.singletonList(partition)) +.withBucket(bucket).plan().files().stream() +.map(ManifestEntry::file) +.collect(Collectors.toList())); +} +} + +@Override +public RecordWriter createEmptyWriter( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor); +return mergeTree.createWriter(Collections.emptyList()); +} + +private MergeTree createMergeTree( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +SstFile sstFile = +new SstFile( Review comment: Can `readerFactory` and `writerFactory` be reused in `SstFile`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18500: [FLINK-25312][hive] HiveCatalog supports Flink's managed table
flinkbot edited a comment on pull request #18500: URL: https://github.com/apache/flink/pull/18500#issuecomment-1021030168 ## CI report: * d7dc483a078a947fe4d397c17e3ec3fa9ec8db72 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30191) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
flinkbot edited a comment on pull request #18505: URL: https://github.com/apache/flink/pull/18505#issuecomment-1021169033 ## CI report: * 7937706340bfa394537cc62380693fa12abe3d38 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30149) * 62ce77fe716ae2cfa26fb5cc04948683acc33e40 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30196) * 7ab24a6fde4e9c0f15e1bddbb7134dbd656d0c40 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30198) * 94e63928b2c567014e7f0861c6069186914950e4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
JingsongLi commented on a change in pull request #14: URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792405101 ## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.operation; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.FileFormat; +import org.apache.flink.table.store.file.manifest.ManifestEntry; +import org.apache.flink.table.store.file.mergetree.MergeTree; +import org.apache.flink.table.store.file.mergetree.MergeTreeOptions; +import org.apache.flink.table.store.file.mergetree.compact.Accumulator; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +/** Default implementation of {@link FileStoreWrite}. 
*/ +public class FileStoreWriteImpl implements FileStoreWrite { + +private final RowType keyType; +private final RowType rowType; +private final Comparator keyComparator; +private final Accumulator accumulator; +private final FileFormat fileFormat; +private final FileStorePathFactory pathFactory; +private final MergeTreeOptions mergeTreeOptions; + +private final FileStoreScan scan; + +public FileStoreWriteImpl( +RowType partitionType, +RowType keyType, +RowType rowType, +Comparator keyComparator, +Accumulator accumulator, +FileFormat fileFormat, +FileStorePathFactory pathFactory, +MergeTreeOptions mergeTreeOptions) { +this.keyType = keyType; +this.rowType = rowType; +this.keyComparator = keyComparator; +this.accumulator = accumulator; +this.fileFormat = fileFormat; +this.pathFactory = pathFactory; +this.mergeTreeOptions = mergeTreeOptions; + +this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory); +} + +@Override +public RecordWriter createWriter( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +Long latestSnapshotId = pathFactory.latestSnapshotId(); +if (latestSnapshotId == null) { +return createEmptyWriter(partition, bucket, compactExecutor); +} else { +MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor); +return mergeTree.createWriter( +scan.withSnapshot(latestSnapshotId) + .withPartitionFilter(Collections.singletonList(partition)) +.withBucket(bucket).plan().files().stream() +.map(ManifestEntry::file) +.collect(Collectors.toList())); +} +} + +@Override +public RecordWriter createEmptyWriter( +BinaryRowData partition, int bucket, ExecutorService compactExecutor) { +MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor); +return mergeTree.createWriter(Collections.emptyList()); +} + +private MergeTree createMergeTree( Review comment: We can create a `MergeTreeFactory`, it is required by reader too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
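To make the MergeTreeFactory suggestion above concrete, a minimal shape could look like the following. The interface name and the method signature are assumptions derived from the `createMergeTree(...)` call sites in this diff, not existing code in flink-table-store.

```java
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.mergetree.MergeTree;

import java.util.concurrent.ExecutorService;

/** Hypothetical factory so that both the write path and the read path can create merge trees. */
public interface MergeTreeFactory {

    /** Creates a merge tree for the given partition and bucket, compacting on the given executor. */
    MergeTree create(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
}
```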
[GitHub] [flink] flinkbot edited a comment on pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
flinkbot edited a comment on pull request #18505: URL: https://github.com/apache/flink/pull/18505#issuecomment-1021169033 ## CI report: * 7937706340bfa394537cc62380693fa12abe3d38 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30149) * 62ce77fe716ae2cfa26fb5cc04948683acc33e40 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30196) * 7ab24a6fde4e9c0f15e1bddbb7134dbd656d0c40 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30198) * 94e63928b2c567014e7f0861c6069186914950e4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30206) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka commented on a change in pull request #18391: [FLINK-25478][chaneglog] Correct the state register logic of ChangelogStateBackendHandle
Myasuka commented on a change in pull request #18391: URL: https://github.com/apache/flink/pull/18391#discussion_r792406221 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java ## @@ -130,14 +155,51 @@ public String toString() { keyGroupRange, materialized.size(), nonMaterialized.size()); } -private static Closeable asCloseable(KeyedStateHandle h) { -return () -> { -try { -h.discardState(); -} catch (Exception e) { -ExceptionUtils.rethrowIOException(e); +private static class StreamStateHandleWrapper implements StreamStateHandle { +private static final long serialVersionUID = 1L; + +private final KeyedStateHandle keyedStateHandle; + +StreamStateHandleWrapper(KeyedStateHandle keyedStateHandle) { +this.keyedStateHandle = keyedStateHandle; +} + +@Override +public void discardState() throws Exception { +keyedStateHandle.discardState(); +} + +@Override +public long getStateSize() { +return keyedStateHandle.getStateSize(); +} + +@Override +public FSDataInputStream openInputStream() throws IOException { +throw new UnsupportedOperationException("Should not call here."); +} + +@Override +public Optional asBytesIfInMemory() { +throw new UnsupportedOperationException("Should not call here."); +} Review comment: The returned `StreamStateHandle` would only be used in `IncrementalRemoteKeyedStateHandle`, which is safe here to just add such wrapper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] matriv commented on pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata
matriv commented on pull request #18479: URL: https://github.com/apache/flink/pull/18479#issuecomment-1021979678 > > Because, with the upgrade story, we can have an @ExecNodeMetadata annotation with the same name, on a subclass of a current ExecNode class, which does something new/different and defines a newer version. So we need the combination of name + version to uniquely identify the class when we lookup and rebuild the Java object graph from the JSON plan. > > I may have badly explained myself, but i'm not questioning the name + version tuple. That's ok and I get why we need it. > > What I'm questioning is that in the same field you add `id + name + version`, and this seems wrong to me, because `id` is the "instance identifier", like a pointer to a specific node of the graph, while `name + version` is the "type identifier", which tells you which `ExecNode` class and version the node is. I don't think these two concepts (instance id and type id) should be in the same field, as they are logically very different things, and also because future tooling will have hard time parsing this JSON. Thx for explaining! I see your point. The decision was more towards JSON and code simplification, so that this int id is part of the one liner `context` field, so that everything is handled in one place regarding the complete identification of a node. So we read the `context` as one POJO from JSON, and we construct a new `context` from the annotation plus a freshly incremented id for new nodes. Imho, I don't see a big issue in this approach, since we have javadocs describing the usage of those sub-fields of the context, and I don't see any problem for tooling, since it's a well defined string with 3 components, and a simple `split("_")` gives those independently. @twalthr WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
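As a concrete illustration of the "well defined string with 3 components" mentioned above, parsing the combined field is indeed a one-liner. The example value below is made up for illustration; the actual node names and versions are defined by the ExecNodeMetadata annotations.

```java
public class ExecNodeContextParsing {

    public static void main(String[] args) {
        // Hypothetical value of the combined "context" field: <id>_<name>_<version>.
        String context = "5_stream-exec-calc_1";

        String[] parts = context.split("_");
        int id = Integer.parseInt(parts[0]);      // instance identifier of the graph node
        String name = parts[1];                   // ExecNode type name from the annotation
        int version = Integer.parseInt(parts[2]); // ExecNode version from the annotation

        System.out.println(id + " / " + name + " / " + version);
    }
}
```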
[GitHub] [flink] flinkbot edited a comment on pull request #18353: [FLINK-25129][docs]project configuation changes in docs
flinkbot edited a comment on pull request #18353: URL: https://github.com/apache/flink/pull/18353#issuecomment-1012981377 ## CI report: * 80fd50ad46865be06e2c83b2470fb3eb2d35cd96 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29823) * d0b2b188c37443b7bbda39af499398326cd56979 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18428: [FLINK-25575] Add Sink V2 operators and translation
flinkbot edited a comment on pull request #18428: URL: https://github.com/apache/flink/pull/18428#issuecomment-1017677769 ## CI report: * 3705b617ac596bd4be08fd9ef2e4db40bef586f2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30154) * b3cd4cde01b0207349df086beb909b000b5e0bdd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on pull request #17819: [FLINK-15816][k8s] Prevent labels using kubernetes.cluster-id to exceed the limit of 63 characters
XComp commented on pull request #17819:
URL: https://github.com/apache/flink/pull/17819#issuecomment-1021982073

Ok, my understanding was that we're using service names in labels, which forces us to set some limitation on the cluster ID. Going over the comments of FLINK-15816 again, I conclude that it's really only about adding the check for the `clusterId`'s maximum length also for standalone mode. Essentially, the only change that's needed is to add a check similar to [AbstractKubernetesParameters:73](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java#L73) in [KubernetesHaServices:99](https://github.com/apache/flink/blob/a115a3c52f0e305c139666b35ec952365049738d/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java#L99).

I'm just wondering why we should limit ourselves to 45 characters. Don't we have to restart the Flink cluster anyway in case of a version update? That would give the user the possibility to update the cluster ID if it becomes necessary. In any case, we should add some in-code documentation about this to the [MAXIMUM_CHARACTERS_OF_CLUSTER_ID declaration in Constants:86](https://github.com/apache/flink/blob/d532f5fc81f9acf611541cab440eb3d5a9de08cc/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java#L86) to explain the reason for this constant being set in that way (i.e. that it's left at `45` to be prepared for the future).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] XComp edited a comment on pull request #17819: [FLINK-15816][k8s] Prevent labels using kubernetes.cluster-id to exceed the limit of 63 characters
XComp edited a comment on pull request #17819:
URL: https://github.com/apache/flink/pull/17819#issuecomment-1021982073

Ok, my understanding was that we're using service names in labels, which forces us to set some limitation on the cluster ID. Going over the comments of FLINK-15816 again, I conclude that it's really only about adding the check for the `clusterId`'s maximum length also for standalone mode. Essentially, the only change that's needed is to add a check similar to [AbstractKubernetesParameters:73](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java#L73) in [KubernetesHaServices:99](https://github.com/apache/flink/blob/a115a3c52f0e305c139666b35ec952365049738d/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java#L99).

I'm just wondering why we should limit ourselves to 45 characters. Don't we have to restart the Flink cluster anyway in case of a version update? That would give the user the possibility to update the cluster ID if it becomes necessary. In any case, we should add some in-code documentation about this to the [MAXIMUM_CHARACTERS_OF_CLUSTER_ID declaration in Constants:86](https://github.com/apache/flink/blob/d532f5fc81f9acf611541cab440eb3d5a9de08cc/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java#L86) to explain the reason for this constant being set in that way (i.e. that it's left at `45` to be prepared for the future).

Sorry for guiding you in the wrong direction, @alpreu.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
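A sketch of the kind of length check being suggested is shown below. The helper name and message wording are assumptions; the real change would mirror the existing validation in AbstractKubernetesParameters and reuse Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID.

```java
import static org.apache.flink.kubernetes.utils.Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Hypothetical helper illustrating the validation suggested for KubernetesHaServices. */
final class ClusterIdValidation {

    static String validateClusterId(String clusterId) {
        checkNotNull(clusterId, "cluster-id must not be null");
        // Reject cluster IDs that would make derived resource names exceed the Kubernetes limit.
        checkArgument(
                clusterId.length() <= MAXIMUM_CHARACTERS_OF_CLUSTER_ID,
                "The cluster-id must not be longer than %s characters.",
                MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
        return clusterId;
    }

    private ClusterIdValidation() {}
}
```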
[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r792408057

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
## @@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       That is part of the reason why we generally speaking do not accept new test code using mockito, as tests shouldn't rely on private implementation details of the production code.

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
## @@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       But 1. in what scenario is this extra shortcut doing something more compared to the old check? 2. if there is such a scenario, have you tested that it's worth the added complexity?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
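For readers following this discussion, the change under review replaces eager future wiring with a fast path. A simplified, standalone sketch of that idea (not the actual StreamMultipleInputProcessor code) is:

```java
import org.apache.flink.runtime.io.AvailabilityProvider;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Simplified illustration of the fast path discussed in the review thread above. */
public final class AnyInputAvailability {

    public static CompletableFuture<?> anyAvailable(List<CompletableFuture<?>> inputFutures) {
        for (CompletableFuture<?> future : inputFutures) {
            // If any input is already available, return the shared completed future
            // instead of allocating a new one and registering callbacks on every input.
            if (future.isDone()) {
                return AvailabilityProvider.AVAILABLE;
            }
        }
        return CompletableFuture.anyOf(inputFutures.toArray(new CompletableFuture<?>[0]));
    }

    private AnyInputAvailability() {}
}
```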
[GitHub] [flink] flinkbot edited a comment on pull request #18353: [FLINK-25129][docs]project configuation changes in docs
flinkbot edited a comment on pull request #18353: URL: https://github.com/apache/flink/pull/18353#issuecomment-1012981377 ## CI report: * 80fd50ad46865be06e2c83b2470fb3eb2d35cd96 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29823) * d0b2b188c37443b7bbda39af499398326cd56979 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30207) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18428: [FLINK-25575] Add Sink V2 operators and translation
flinkbot edited a comment on pull request #18428: URL: https://github.com/apache/flink/pull/18428#issuecomment-1017677769 ## CI report: * 3705b617ac596bd4be08fd9ef2e4db40bef586f2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30154) * b3cd4cde01b0207349df086beb909b000b5e0bdd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30208) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pnowojski commented on a change in pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
pnowojski commented on a change in pull request #18505: URL: https://github.com/apache/flink/pull/18505#discussion_r792418842 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ## @@ -41,7 +41,7 @@ * ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards to fully release all * resources associated the partition and propagate failure cause to the consumer if possible. */ -public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvider { +public interface ResultPartitionWriter extends AvailabilityProvider { Review comment: (One small comment as I'm not able to do a full review): Why did you remove `AutoCloseable`? ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ## @@ -41,7 +41,7 @@ * ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards to fully release all * resources associated the partition and propagate failure cause to the consumer if possible. */ -public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvider { +public interface ResultPartitionWriter extends AvailabilityProvider { Review comment: (One small comment as I'm not able to do a full review): Can you explain why you removed `AutoCloseable`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18145: [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient when getting offsets
flinkbot edited a comment on pull request #18145: URL: https://github.com/apache/flink/pull/18145#issuecomment-997131565 ## CI report: * 83ad01c2346a5e46106e0aef530285aed68d966f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30190) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Thesharing commented on a change in pull request #18480: [FLINK-25789][docs-zh] Translate the formats/hadoop page into Chinese.
Thesharing commented on a change in pull request #18480: URL: https://github.com/apache/flink/pull/18480#discussion_r792409930 ## File path: docs/content.zh/docs/connectors/datastream/formats/hadoop.md ## @@ -57,20 +55,13 @@ a `hadoop-client` dependency such as: ## Using Hadoop InputFormats -To use Hadoop `InputFormats` with Flink the format must first be wrapped -using either `readHadoopFile` or `createHadoopInput` of the -`HadoopInputs` utility class. -The former is used for input formats derived -from `FileInputFormat` while the latter has to be used for general purpose -input formats. -The resulting `InputFormat` can be used to create a data source by using -`ExecutionEnvironmen#createInput`. +要将 Hadoop `InputFormats` 与 Flink 一起使用,必须首先使用 `HadoopInputs` 工具类的 `readHadoopFile` 或 `createHadoopInput` 包装格式。 Review comment: ```suggestion 要在 Flink 中使用 Hadoop `InputFormats`,必须首先使用 `HadoopInputs` 工具类的 `readHadoopFile` 或 `createHadoopInput` 包装 Input Format。 ``` ## File path: docs/content.zh/docs/connectors/datastream/formats/hadoop.md ## @@ -57,20 +55,13 @@ a `hadoop-client` dependency such as: ## Using Hadoop InputFormats -To use Hadoop `InputFormats` with Flink the format must first be wrapped -using either `readHadoopFile` or `createHadoopInput` of the -`HadoopInputs` utility class. -The former is used for input formats derived -from `FileInputFormat` while the latter has to be used for general purpose -input formats. -The resulting `InputFormat` can be used to create a data source by using -`ExecutionEnvironmen#createInput`. +要将 Hadoop `InputFormats` 与 Flink 一起使用,必须首先使用 `HadoopInputs` 工具类的 `readHadoopFile` 或 `createHadoopInput` 包装格式。 +前者用于从 `FileInputFormat` 派生的输入格式,而后者必须用于通用输入格式。 Review comment: ```suggestion 前者用于从 `FileInputFormat` 派生的 Input Format,而后者必须用于通用的 Input Format。 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fapaul merged pull request #18145: [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient when getting offsets
fapaul merged pull request #18145: URL: https://github.com/apache/flink/pull/18145 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482329#comment-17482329 ] Fabian Paul commented on FLINK-25368: - Merged in master: c6f14ca5b10d30232966bce2f52e9f9128346473 > Use AdminClient to get offsets rather than KafkaConsumer > > > Key: FLINK-25368 > URL: https://issues.apache.org/jira/browse/FLINK-25368 > Project: Flink > Issue Type: Improvement >Reporter: dengziming >Priority: Minor > Labels: pull-request-available > > `AdminClient.listOffsets` is provided in Kafka 2.7, In the future more > `OffsetSpce` types will be added to it, for example, > OffsetSpec.MaxTimestampSpec is added in Kafka 3.0. so it's better to > substitute `KafkaConsumer` with `AdminClient`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
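For context on the change above, a minimal sketch of fetching the latest offsets through AdminClient.listOffsets instead of a throwaway KafkaConsumer; it assumes only the public Kafka clients API (2.7+) and is not the actual Flink connector code.

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

final class AdminClientOffsetFetcher {
    /** Returns the latest offset per partition using the admin API instead of a consumer. */
    static Map<TopicPartition, Long> latestOffsets(
            Properties adminProps, Collection<TopicPartition> partitions) throws Exception {
        try (AdminClient admin = AdminClient.create(adminProps)) {
            Map<TopicPartition, OffsetSpec> request =
                    partitions.stream()
                            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            return admin.listOffsets(request).all().get().entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
        }
    }
}
{code}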
[jira] [Commented] (FLINK-25704) Performance regression on 18.01.2022 in batch network benchmarks
[ https://issues.apache.org/jira/browse/FLINK-25704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482328#comment-17482328 ] Piotr Nowojski commented on FLINK-25704: Thanks for the investigation [~kevin.cyj] {quote} Make the existing benchmark tests still test BoundedBlockingResultPartition; Add new benchmark tests for SortMergeResultPartition; Try to optimize SortMergeResultPartition for small records; Document this default blocking shuffle change in both release notes and user doc. {quote} This plan makes sense to me, +1 > Performance regression on 18.01.2022 in batch network benchmarks > > > Key: FLINK-25704 > URL: https://issues.apache.org/jira/browse/FLINK-25704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.15.0 >Reporter: Piotr Nowojski >Assignee: Yingjie Cao >Priority: Critical > Labels: pull-request-available > > http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=compressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on > http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedFilePartition&env=2&revs=200&equid=off&quarts=on&extr=on > http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=uncompressedMmapPartition&env=2&revs=200&equid=off&quarts=on&extr=on > Suspected range: > {code} > git ls eeec246677..f5c99c6f26 > f5c99c6f26 [5 weeks ago] [FLINK-17321][table] Add support casting of map to > map and multiset to multiset [Sergey Nuyanzin] > 745cfec705 [24 hours ago] [hotfix][table-common] Fix InternalDataUtils for > MapData tests [Timo Walther] > ed699b6ee6 [6 days ago] [FLINK-25637][network] Make sort-shuffle the default > shuffle implementation for batch jobs [kevin.cyj] > 4275525fed [6 days ago] [FLINK-25638][network] Increase the default write > buffer size of sort-shuffle to 16M [kevin.cyj] > e1878fb899 [6 days ago] [FLINK-25639][network] Increase the default read > buffer size of sort-shuffle to 64M [kevin.cyj] > {code} > It looks [~kevin.cyj], that most likely your change has caused that? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Paul reassigned FLINK-25368: --- Assignee: dengziming > Use AdminClient to get offsets rather than KafkaConsumer > > > Key: FLINK-25368 > URL: https://issues.apache.org/jira/browse/FLINK-25368 > Project: Flink > Issue Type: Improvement >Reporter: dengziming >Assignee: dengziming >Priority: Minor > Labels: pull-request-available > > `AdminClient.listOffsets` is provided in Kafka 2.7, In the future more > `OffsetSpce` types will be added to it, for example, > OffsetSpec.MaxTimestampSpec is added in Kafka 3.0. so it's better to > substitute `KafkaConsumer` with `AdminClient`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Paul updated FLINK-25368: Affects Version/s: 1.15.0 > Use AdminClient to get offsets rather than KafkaConsumer > > > Key: FLINK-25368 > URL: https://issues.apache.org/jira/browse/FLINK-25368 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.15.0 >Reporter: dengziming >Assignee: dengziming >Priority: Minor > Labels: pull-request-available > > `AdminClient.listOffsets` is provided in Kafka 2.7, In the future more > `OffsetSpce` types will be added to it, for example, > OffsetSpec.MaxTimestampSpec is added in Kafka 3.0. so it's better to > substitute `KafkaConsumer` with `AdminClient`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (FLINK-25368) Use AdminClient to get offsets rather than KafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Paul resolved FLINK-25368. - Fix Version/s: 1.15.0 Resolution: Fixed > Use AdminClient to get offsets rather than KafkaConsumer > > > Key: FLINK-25368 > URL: https://issues.apache.org/jira/browse/FLINK-25368 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.15.0 >Reporter: dengziming >Assignee: dengziming >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > > `AdminClient.listOffsets` is provided in Kafka 2.7, In the future more > `OffsetSpce` types will be added to it, for example, > OffsetSpec.MaxTimestampSpec is added in Kafka 3.0. so it's better to > substitute `KafkaConsumer` with `AdminClient`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] imaffe commented on a change in pull request #17452: [FLINK-20732][connector/pulsar] Introduction of Pulsar Sink
imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r792420132 ## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java ## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; +import org.apache.flink.connector.pulsar.common.config.PulsarOptions; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; + +import org.apache.pulsar.client.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT; +import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The builder class for {@link PulsarSink} to make it easier for the users to construct a {@link + * PulsarSink}. + * + * The following example shows the minimum setup to create a PulsarSink that reads the String + * values from a Pulsar topic. 
+ * + * {@code + * PulsarSink sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .build(); + * } + * + * The service url, admin url, and the record serializer are required fields that must be set. If + * you don't set the topics, make sure you have provided a custom {@link TopicRouter}. Otherwise, + * you must provide the topics to produce. + * + * To specify the delivery guarantees of PulsarSink, one can call {@link + * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the delivery guarantee is {@link + * DeliveryGuarantee#EXACTLY_ONCE}, and it requires the Pulsar broker to turn on transaction + * support. + * + * {@code + * PulsarSink sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .setDeliveryGuarantee(deliveryGuarantee) + * .build(); + * } + * + * @see PulsarSink for a more detailed explanation of the different guarantees. + * @param The input type of the sink. + */ +@PublicEvolving +public class PulsarSinkBuilder { +private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkBuilder.class); + +private final PulsarConfigBuilder configBuilder; + +private DeliveryGuarantee delive
[GitHub] [flink] pnowojski merged pull request #18392: [FLINK-25590][metrics] Introduce RequestedMemoryUsage and log warnings if usage exceeds 100%
pnowojski merged pull request #18392: URL: https://github.com/apache/flink/pull/18392 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pnowojski commented on pull request #18392: [FLINK-25590][metrics] Introduce RequestedMemoryUsage and log warnings if usage exceeds 100%
pnowojski commented on pull request #18392: URL: https://github.com/apache/flink/pull/18392#issuecomment-1021993369 Oops, I forgot to squash commits before merging :| -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-25590) Logging warning of insufficient memory for all configured buffers
[ https://issues.apache.org/jira/browse/FLINK-25590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-25590. -- Fix Version/s: 1.15.0 Assignee: Piotr Nowojski Resolution: Fixed Merged to master as cfe5e9a728d and cfe5e9a728d^ > Logging warning of insufficient memory for all configured buffers > - > > Key: FLINK-25590 > URL: https://issues.apache.org/jira/browse/FLINK-25590 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.15.0 >Reporter: Anton Kalashnikov >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Right now, if exclusive buffers for the input channel and one buffer for each > subpartition would be allocated on start successfully but there would be not > enough memory for the rest of the buffers(floating buffers, rest buffers for > subpartitions), then we see nothing in the log about that(as I understand). > So first of all, we need to check what logs we have right now about > situations when flink doesn't have enough memory for all configured buffers. > And if we have nothing (or not enough) we should add such a log. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] zentol merged pull request #18501: [FLINK-25348][build] Clear japicmp exclusions
zentol merged pull request #18501: URL: https://github.com/apache/flink/pull/18501 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-25348) Update release guide to reset japicmp exceptions for every release
[ https://issues.apache.org/jira/browse/FLINK-25348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-25348. Resolution: Fixed master: d9bb8d7946fa09bb77ffddcc6eea98435ef7f825 I've also updated the release guide. > Update release guide to reset japicmp exceptions for every release > -- > > Key: FLINK-25348 > URL: https://issues.apache.org/jira/browse/FLINK-25348 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.15.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > I propose to clean up the japicmp maven plugin exclusion for every minor > release for @Public and the exclusions for @PublicEvolving with every minor > release. Currently, we don’t do this and that’s why we have accumulated quite > some list of exclusions that a) might shadow other problems and b) nobody > really knows why they are still relevant. I would propose to make this part > of the release guide. The result should be that we minimize our set of > exclusions. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] zhuzhurk commented on a change in pull request #18462: [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler
zhuzhurk commented on a change in pull request #18462: URL: https://github.com/apache/flink/pull/18462#discussion_r791884986 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java ## @@ -190,10 +191,24 @@ private void maybeSetParallelism(final ExecutionJobVertex jobVertex) { jobVertex.getName(), parallelism); -jobVertex.setParallelism(parallelism); +changeJobVertexParallelism(jobVertex, parallelism); } } +private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) { +// update PlanJson Review comment: Would you add some comments for why we need to update the plan json? e.g. "it is needed to enable REST APIs to return the latest parallelism of job vertices." ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -676,7 +727,7 @@ private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) { @Override public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) { -return getExecutionVertex(executionVertexId).getResourceProfile(); +return getExecutionJobVertex(executionVertexId.getJobVertexId()).getResourceProfile(); Review comment: What's this change for? ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java ## @@ -762,6 +762,11 @@ public void setInternalTaskFailuresListener( // Actions // +@Override +public void notifyNewJobVertexInitialized(List vertices) { Review comment: maybe `notifyNewlyInitializedJobVertices()`? because the job vertices are just newly initialized rather than newly added. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader; +import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; +import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory; +import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperati
[GitHub] [flink] TanYuxin-tyx opened a new pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
TanYuxin-tyx opened a new pull request #18515: URL: https://github.com/apache/flink/pull/18515 ## What is the purpose of the change Currently, if the partition file has been lost for blocking shuffle, FileNotFoundException will be thrown and the partition data will not be regenerated. This change makes it throw PartitionNotFoundException instead. ## Brief change log - *Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle.* ## Verifying this change This change added tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
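A hedged sketch of the exception translation described above; the class and method names are illustrative placeholders, not the actual Flink shuffle reader code:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

final class BlockingPartitionFileOpener {

    /** Illustrative stand-in for Flink's partition-level exception. */
    static final class PartitionNotFoundException extends RuntimeException {
        PartitionNotFoundException(String partitionId, Throwable cause) {
            super("Result partition " + partitionId + " was not found.", cause);
        }
    }

    static FileInputStream openPartitionFile(File dataFile, String partitionId) {
        try {
            return new FileInputStream(dataFile);
        } catch (FileNotFoundException e) {
            // Surface the lost file as a partition-level error so the consumer can
            // request regeneration of the partition instead of failing on a raw IO error.
            throw new PartitionNotFoundException(partitionId, e);
        }
    }
}
```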
[GitHub] [flink] dannycranmer commented on pull request #18512: [FLINK-25811][hotfix][connector/base] changing failed requests handler to accept List in AsyncSinkWriter
dannycranmer commented on pull request #18512: URL: https://github.com/apache/flink/pull/18512#issuecomment-1022002473 Thanks for the improvement @vahmed-hamdy -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dannycranmer merged pull request #18512: [FLINK-25811][hotfix][connector/base] changing failed requests handler to accept List in AsyncSinkWriter
dannycranmer merged pull request #18512: URL: https://github.com/apache/flink/pull/18512 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order
[ https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-25811: -- Fix Version/s: 1.15.0 > Fix generic AsyncSinkWriter retrying requests in reverse order > -- > > Key: FLINK-25811 > URL: https://issues.apache.org/jira/browse/FLINK-25811 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > {{AsyncSinkWriter}} retries failed request in reverse order. > *Scope:* > * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} > module. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25816) Changelog keyed state backend would come across NPE during notification
[ https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan updated FLINK-25816: -- Affects Version/s: 1.15.0 > Changelog keyed state backend would come across NPE during notification > --- > > Key: FLINK-25816 > URL: https://issues.apache.org/jira/browse/FLINK-25816 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Instance: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order
[ https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-25811: -- Affects Version/s: (was: 1.15.0) > Fix generic AsyncSinkWriter retrying requests in reverse order > -- > > Key: FLINK-25811 > URL: https://issues.apache.org/jira/browse/FLINK-25811 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > > h2. Motivation > {{AsyncSinkWriter}} retries failed request in reverse order. > *Scope:* > * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} > module. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order
[ https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-25811: - Assignee: Ahmed Hamdy > Fix generic AsyncSinkWriter retrying requests in reverse order > -- > > Key: FLINK-25811 > URL: https://issues.apache.org/jira/browse/FLINK-25811 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > {{AsyncSinkWriter}} retries failed request in reverse order. > *Scope:* > * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} > module. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-benchmarks] wsry commented on pull request #44: [FLINK-25704] Fix the blocking partition benchmark regression caused by FLINK-25637
wsry commented on pull request #44: URL: https://github.com/apache/flink-benchmarks/pull/44#issuecomment-1022004795 > NETWORK_SORT_SHUFFLE_MIN_PARALLELISM @pnowojski Thanks for the review. ```NETWORK_SORT_SHUFFLE_MIN_PARALLELISM``` is a config option that controls which blocking shuffle implementation to use, either the hash-based one or the sort-based one. If the task parallelism is smaller than this config value, the hash-based blocking shuffle is used; otherwise, the sort-based blocking shuffle is used. Previously, the default value of this config option was Integer.MAX_VALUE, which means the hash-based shuffle was used by default; FLINK-25637 changed the default value to 1, which means the sort-based shuffle is now used by default. The changes in this PR restore the previous default value for the corresponding benchmark cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
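A minimal sketch of the selection rule described above, using illustrative names rather than the exact Flink internals:

```java
final class BlockingShuffleSelection {
    /**
     * Mirrors the rule explained above: below the configured minimum parallelism the
     * hash-based blocking shuffle is chosen, at or above it the sort-based one is chosen.
     */
    static boolean useSortShuffle(int numberOfSubpartitions, int sortShuffleMinParallelism) {
        return numberOfSubpartitions >= sortShuffleMinParallelism;
    }

    public static void main(String[] args) {
        // Old default (Integer.MAX_VALUE): effectively always hash-based.
        System.out.println(useSortShuffle(1000, Integer.MAX_VALUE)); // false
        // New default (1): effectively always sort-based.
        System.out.println(useSortShuffle(1000, 1)); // true
    }
}
```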
[jira] [Resolved] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order
[ https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-25811. --- Resolution: Fixed > Fix generic AsyncSinkWriter retrying requests in reverse order > -- > > Key: FLINK-25811 > URL: https://issues.apache.org/jira/browse/FLINK-25811 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > {{AsyncSinkWriter}} retries failed request in reverse order. > *Scope:* > * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} > module. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot commented on pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
flinkbot commented on pull request #18515: URL: https://github.com/apache/flink/pull/18515#issuecomment-1022004860 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25811) Fix generic AsyncSinkWriter retrying requests in reverse order
[ https://issues.apache.org/jira/browse/FLINK-25811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482338#comment-17482338 ] Danny Cranmer commented on FLINK-25811: --- Also related to https://github.com/apache/flink/pull/18488/ > Fix generic AsyncSinkWriter retrying requests in reverse order > -- > > Key: FLINK-25811 > URL: https://issues.apache.org/jira/browse/FLINK-25811 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > {{AsyncSinkWriter}} retries failed request in reverse order. > *Scope:* > * change order of retry in {{AsyncSinkWriter}} in {{flink-connector-base}} > module. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25816) Changelog keyed state backend would come across NPE during notification
[ https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482339#comment-17482339 ] Roman Khachatryan commented on FLINK-25816: --- I think the bug confirms my previous [apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] that abortion notification adds risk and complexity while being absolutely not necessary. So I'd propose to just remove them. > Changelog keyed state backend would come across NPE during notification > --- > > Key: FLINK-25816 > URL: https://issues.apache.org/jira/browse/FLINK-25816 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Instance: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-25816) Changelog keyed state backend would come across NPE during notification
[ https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482339#comment-17482339 ] Roman Khachatryan edited comment on FLINK-25816 at 1/26/22, 9:14 AM: - I think the bug confirms my previous [apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] that abortion notification adds risk and complexity while being absolutely not necessary. So I'd propose to just remove them (for the nested state backend). was (Author: roman_khachatryan): I think the bug confirms my previous [apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] that abortion notification adds risk and complexity while being absolutely not necessary. So I'd propose to just remove them. > Changelog keyed state backend would come across NPE during notification > --- > > Key: FLINK-25816 > URL: https://issues.apache.org/jira/browse/FLINK-25816 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Instance: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] (FLINK-25816) Changelog keyed state backend would come across NPE during notification
[ https://issues.apache.org/jira/browse/FLINK-25816 ] Roman Khachatryan deleted comment on FLINK-25816: --- was (Author: roman_khachatryan): I think the bug confirms my previous [apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] that abortion notification adds risk and complexity while being absolutely not necessary. So I'd propose to just remove them (for the nested state backend). > Changelog keyed state backend would come across NPE during notification > --- > > Key: FLINK-25816 > URL: https://issues.apache.org/jira/browse/FLINK-25816 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Instance: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] fapaul commented on a change in pull request #18412: [FLINK-25696][datastream] Introduce metadataConsumer to InitContext in Sink
fapaul commented on a change in pull request #18412: URL: https://github.com/apache/flink/pull/18412#discussion_r792434481 ## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java ## @@ -170,6 +171,15 @@ * previous execution. */ OptionalLong getRestoredCheckpointId(); + +/** + * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type + * {@link MetaT} to the consumer. The consumer can accept metadata events in an asynchronous Review comment: If you want to recommend not executing the metadata consumer as part of the mailbox you should make it clearer. ```suggestion * It is recommended to use a separate thread pool to publish the metadata because enqueuing a lot of these messages in the mailbox may lead to a performance decrease. ``` ## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java ## @@ -105,5 +107,14 @@ * Provides a view on this context as a {@link SerializationSchema.InitializationContext}. */ SerializationSchema.InitializationContext asSerializationSchemaInitializationContext(); + +/** + * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type + * {@link MetaT} to the consumer. The consumer can accept metadata events in an asynchronous + * thread, and the {@link Consumer#accept} method is executed very fast. + */ +default Optional> metadataConsumer() { Review comment: I'd mark this `@Experimental` to leave some room to change the threading model later if necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
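A rough sketch of the threading recommendation above: metadata is handed to the consumer from a dedicated executor instead of the task mailbox. The consumer is assumed to come from the sink's `InitContext`; all names below are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class MetadataPublisher<MetaT> implements AutoCloseable {
    // Single extra thread keeps metadata handling off the task mailbox thread.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Consumer<MetaT> metadataConsumer;

    MetadataPublisher(Consumer<MetaT> metadataConsumer) {
        this.metadataConsumer = metadataConsumer;
    }

    /** Hands the metadata to the consumer asynchronously. */
    void publish(MetaT metadata) {
        executor.execute(() -> metadataConsumer.accept(metadata));
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```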
[GitHub] [flink] dannycranmer commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…
dannycranmer commented on a change in pull request #18483: URL: https://github.com/apache/flink/pull/18483#discussion_r792438959 ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java ## @@ -111,7 +111,7 @@ return new KinesisDataStreamsSinkBuilder<>(); } -@Experimental +@Internal Review comment: Having an `@Internal` public method on a `@Public` class seems like a bad design to me. How does this annotation stop users using it? Is this smell inherited from the base `SInk` or something we have added here ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java ## @@ -111,7 +111,7 @@ return new KinesisDataStreamsSinkBuilder<>(); } -@Experimental +@Internal Review comment: Having an `@Internal` public method on a `@Public` class seems like a bad design to me. How does this annotation stop users using it? Is this smell inherited from the base `SInk` or something we have added here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dannycranmer commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…
dannycranmer commented on a change in pull request #18483: URL: https://github.com/apache/flink/pull/18483#discussion_r792439657 ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java ## @@ -54,7 +54,7 @@ private KinesisDataStreamsSinkElementConverter( this.partitionKeyGenerator = partitionKeyGenerator; } -@Experimental +@Internal Review comment: Same question as above, why do we need to annotate these methods? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
flinkbot edited a comment on pull request #18515: URL: https://github.com/apache/flink/pull/18515#issuecomment-1022004860 ## CI report: * 7ddd8e3928e4c3defabb6ed8cc167a3b1e832b3c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25816) Changelog keyed state backend would come across NPE during notification
[ https://issues.apache.org/jira/browse/FLINK-25816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482342#comment-17482342 ] Roman Khachatryan commented on FLINK-25816: --- I think the bug confirms my previous [apprehension|https://github.com/apache/flink/pull/18382#discussion_r788780719] that abortion notification adds risk and complexity while being absolutely not necessary. So I'd propose to just remove them (for the nested state backend). > Changelog keyed state backend would come across NPE during notification > --- > > Key: FLINK-25816 > URL: https://issues.apache.org/jira/browse/FLINK-25816 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Instance: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30158&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…
CrynetLogistics commented on a change in pull request #18483: URL: https://github.com/apache/flink/pull/18483#discussion_r792442692 ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java ## @@ -54,7 +54,7 @@ private KinesisDataStreamsSinkElementConverter( this.partitionKeyGenerator = partitionKeyGenerator; } -@Experimental +@Internal Review comment: Because otherwise I get `org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter.builder(): Returned leaf type org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkElementConverter$Builder does not satisfy`. I can't seem to find a solution; I will keep trying. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18515: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
flinkbot edited a comment on pull request #18515: URL: https://github.com/apache/flink/pull/18515#issuecomment-1022004860 ## CI report: * 7ddd8e3928e4c3defabb6ed8cc167a3b1e832b3c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30212) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…
CrynetLogistics commented on a change in pull request #18483: URL: https://github.com/apache/flink/pull/18483#discussion_r792444851 ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java ## @@ -111,7 +111,7 @@ return new KinesisDataStreamsSinkBuilder<>(); } -@Experimental +@Internal Review comment: The user is never expected to call createWriter, right? I guess they're effectively prevented from doing so because they'll never have an InitContext object. I thought createWriter was only here for the sink operator to use... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wsry commented on pull request #18470: [FLINK-25774][network] Restrict the maximum number of buffers can be used per result partition for sort-shuffle
wsry commented on pull request #18470: URL: https://github.com/apache/flink/pull/18470#issuecomment-1022016135 Rebased master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fapaul commented on a change in pull request #18428: [FLINK-25575] Add Sink V2 operators and translation
fapaul commented on a change in pull request #18428: URL: https://github.com/apache/flink/pull/18428#discussion_r792447708 ## File path: flink-architecture-tests/violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 ## @@ -93,7 +93,6 @@ org.apache.flink.connector.file.src.util.Pool.recycler(): Returned leaf type org org.apache.flink.connector.file.src.util.Utils.forEachRemaining(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, java.util.function.Consumer): Argument leaf type org.apache.flink.connector.file.src.reader.BulkFormat$Reader does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions.builder(): Returned leaf type org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions$JDBCExactlyOnceOptionsBuilder does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder(): Returned leaf type org.apache.flink.connector.jdbc.JdbcExecutionOptions$Builder does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated -org.apache.flink.connector.jdbc.JdbcSink.exactlyOnceSink(java.lang.String, org.apache.flink.connector.jdbc.JdbcStatementBuilder, org.apache.flink.connector.jdbc.JdbcExecutionOptions, org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions, org.apache.flink.util.function.SerializableSupplier): Argument leaf type org.apache.flink.util.function.SerializableSupplier does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated Review comment: I guess this change should have belonged to this ticket https://issues.apache.org/jira/browse/FLINK-25570 I am not sure why the tests did not fail before. I can make this change on a separate commit but I would keep it on this PR. For the other archunit changes afaict they are related to changes of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25806) Remove legacy high availability services
[ https://issues.apache.org/jira/browse/FLINK-25806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482344#comment-17482344 ] Matthias Pohl commented on FLINK-25806: --- I linked FLINK-25432 as well: We can remove the ordering of cleanups introduced in FLINK-25432 (see `DefaultResourceCleaner`) as part of removing the legacy high availability services. > Remove legacy high availability services > > > Key: FLINK-25806 > URL: https://issues.apache.org/jira/browse/FLINK-25806 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Till Rohrmann >Priority: Blocker > Fix For: 1.16.0 > > > After FLINK-24038, we should consider removing the legacy high availability > services {{ZooKeeperHaServices}} and {{KubernetesHaServices}} since they are > now subsumed by the multiple component leader election service that only uses > a single leader election per component. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
flinkbot edited a comment on pull request #18505: URL: https://github.com/apache/flink/pull/18505#issuecomment-1021169033 ## CI report: * 62ce77fe716ae2cfa26fb5cc04948683acc33e40 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30196) * 7ab24a6fde4e9c0f15e1bddbb7134dbd656d0c40 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30198) * 94e63928b2c567014e7f0861c6069186914950e4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30206) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #17485: [FLINK-24038] Add support for single leader election per JobManager process
XComp commented on a change in pull request #17485: URL: https://github.com/apache/flink/pull/17485#discussion_r786874117 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java ## @@ -78,21 +84,46 @@ public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor) throws Exception { +final String configMapName = getConfigMapNameFunction.apply(jobID); +KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, configMapName, clusterId); return KubernetesUtils.createCompletedCheckpointStore( configuration, kubeClient, executor, -getConfigMapNameFunction.apply(jobID), +configMapName, lockIdentity, maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, ioExecutor); } @Override -public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) { -return new KubernetesCheckpointIDCounter( -kubeClient, getConfigMapNameFunction.apply(jobID), lockIdentity); +public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception { Review comment: To me, it would feel more natural to have a subclass of `KubernetesCheckpointRecoveryFactory` that takes care of the ConfigMap creation. But I don't have a strong argument towards refactoring this code because we wouldn't gain much from such a refactoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wsry commented on a change in pull request #18474: [FLINK-25786][network] Adjust the generation of subpartition data storage order for sort-shuffle from random shuffle to random shift
wsry commented on a change in pull request #18474: URL: https://github.com/apache/flink/pull/18474#discussion_r792449522 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ## @@ -492,10 +490,13 @@ public int getNumberOfQueuedBuffers(int targetSubpartition) { } private int[] getRandomSubpartitionOrder(int numSubpartitions) { -List list = -IntStream.range(0, numSubpartitions).boxed().collect(Collectors.toList()); -Collections.shuffle(list); -return list.stream().mapToInt(Integer::intValue).toArray(); +int[] order = new int[numSubpartitions]; +Random random = new Random(); +int shift = random.nextInt(numSubpartitions); Review comment: Yes, it is expected. The motivation is better sequential disk IO when there are not enough resources to run the downstream consumer tasks in parallel. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
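For illustration, a self-contained sketch of the random-shift ordering discussed above. Only the first three lines of the new method body are visible in the diff; the loop and the demo `main` are assumptions added here for readability, not the merged Flink code. Compared with `Collections.shuffle`, the shift keeps subpartitions in their relative order, which is the sequential-disk-IO property mentioned in the comment.

```java
import java.util.Arrays;
import java.util.Random;

// Standalone illustration of the "random shift" subpartition order (sketch only).
public class RandomShiftOrderExample {

    static int[] getRandomSubpartitionOrder(int numSubpartitions) {
        int[] order = new int[numSubpartitions];
        // Visible in the diff: allocate the array and pick a random starting offset.
        int shift = new Random().nextInt(numSubpartitions);
        // Assumed loop body: rotate the identity order by the random shift, so the relative
        // order of subpartitions is preserved and disk reads stay largely sequential when the
        // downstream consumer tasks cannot all run in parallel.
        for (int i = 0; i < numSubpartitions; i++) {
            order[i] = (i + shift) % numSubpartitions;
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(getRandomSubpartitionOrder(8)));
    }
}
```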
[GitHub] [flink] ruanhang1993 opened a new pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe
ruanhang1993 opened a new pull request #18516: URL: https://github.com/apache/flink/pull/18516 ## What is the purpose of the change This pull request adds savepoint and metric test cases in the source suite of the connector testframe. ## Brief change log - Add savepoint and metric test cases in the source suite of the connector testframe - Change the tests in the Kafka and Pulsar connectors ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-25288) Add savepoint and metric cases in DataStream source suite of connector testing framework
[ https://issues.apache.org/jira/browse/FLINK-25288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-25288: --- Labels: pull-request-available (was: ) > Add savepoint and metric cases in DataStream source suite of connector > testing framework > > > Key: FLINK-25288 > URL: https://issues.apache.org/jira/browse/FLINK-25288 > Project: Flink > Issue Type: Sub-task > Components: Test Infrastructure >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18418: [FLINK-25719][python] Support General Python UDF in Thread Mode
flinkbot edited a comment on pull request #18418: URL: https://github.com/apache/flink/pull/18418#issuecomment-1017307428 ## CI report: * 65bbedd87b4b4962bf1b11764ad284419cbf24ee Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30195) * 3f39b61a0b956fde5f61790a9b1dd352789d28a6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30205) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18470: [FLINK-25774][network] Restrict the maximum number of buffers can be used per result partition for sort-shuffle
flinkbot edited a comment on pull request #18470: URL: https://github.com/apache/flink/pull/18470#issuecomment-1020036276 ## CI report: * decf33d57cf16564786d77240287fb1aeb034b39 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30043) * 4cce15f9d9577fffbdad21c7fb9c286509a04ef5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] tillrohrmann commented on a change in pull request #501: Release announcement for Statefun 3.2.0
tillrohrmann commented on a change in pull request #501: URL: https://github.com/apache/flink-web/pull/501#discussion_r792452976 ## File path: _posts/2022-01-27-release-statefun-3.2.0.md ## @@ -0,0 +1,111 @@ +--- +layout: post +title: "Stateful Functions 3.2.0 Release Announcement" +subtitle: "The Apache Flink community is happy to announce the release of Stateful Functions (StateFun) 3.2.0." +date: 2022-01-27T08:00:00.000Z +categories: news +authors: +- trohrmann: + name: "Till Rohrmann" + twitter: "stsffap" +- igalshilman: + name: "Igal Shilman" + twitter: "IgalShilman" +--- + +Stateful Functions is a cross-platform stack for building Stateful Serverless applications, making it radically simpler to develop scalable, consistent, and elastic distributed applications. +This new release brings various improvements to the StateFun runtime, a leaner way to specify StateFun module components, and a brand new GoLang SDK! Review comment: Good catch. Will correct it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe
flinkbot commented on pull request #18516: URL: https://github.com/apache/flink/pull/18516#issuecomment-1022021934 ## CI report: * 741b593b95d9501c69f8f7014729b0983350bfe7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe
flinkbot commented on pull request #18516: URL: https://github.com/apache/flink/pull/18516#issuecomment-1022022034 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 741b593b95d9501c69f8f7014729b0983350bfe7 (Wed Jan 26 09:33:53 UTC 2022) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-25288).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] LadyForest commented on pull request #18500: [FLINK-25312][hive] HiveCatalog supports Flink's managed table
LadyForest commented on pull request #18500: URL: https://github.com/apache/flink/pull/18500#issuecomment-1022024594 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482349#comment-17482349 ] Etienne Chauchot commented on FLINK-25771: -- [~gaoyunhaii] thanks for the pointer. This issue should be fixed by the above PR > CassandraConnectorITCase.testRetrialAndDropTables fails on AZP > -- > > Key: FLINK-25771 > URL: https://issues.apache.org/jira/browse/FLINK-25771 > Project: Flink > Issue Type: Bug > Components: Connectors / Cassandra >Affects Versions: 1.13.5 >Reporter: Till Rohrmann >Assignee: Etienne Chauchot >Priority: Critical > Labels: pull-request-available, test-stability > > The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP > with > {code} > Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: > All host(s) tried for query failed (tried: /172.17.0.1:59220 > (com.datastax.driver.core.exceptions.OperationTimedOutException: > [/172.17.0.1] Timed out waiting for server response)) > Jan 23 01:02:52 at > com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84) > Jan 23 01:02:52 at > com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37) > Jan 23 01:02:52 at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37) > Jan 23 01:02:52 at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245) > Jan 23 01:02:52 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63) > Jan 23 01:02:52 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > Jan 23 01:02:52 at > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554) > Jan 23 01:02:52 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Jan 23 01:02:52 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Jan 23 01:02:52 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Jan 23 01:02:52 at java.lang.reflect.Method.invoke(Method.java:498) > Jan 23 01:02:52 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > Jan 23 01:02:52 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Jan 23 01:02:52 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > Jan 23 01:02:52 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Jan 23 01:02:52 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > Jan 23 01:02:52 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > Jan 23 01:02:52 at > org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196) > Jan 23 01:02:52 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Jan 23 01:02:52 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > Jan 23 01:02:52 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Jan 23 01:02:52 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > Jan 23 01:02:52 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > Jan 23 01:02:52 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > Jan 23 01:02:52 at > 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > Jan 23 01:02:52 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > Jan 23 01:02:52 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > Jan 23 01:02:52 at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > Jan 23 01:02:52 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > Jan 23 01:02:52 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > Jan 23 01:02:52 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > Jan 23 01:02:52 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) > Jan 23 01:02:52 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Jan 23 01:02:52 at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > Jan 23 01:02:52 at org.junit.runners.Suite.runChild(Suite.java:128) > Jan 23 01:02:52
[GitHub] [flink] rkhachatryan commented on a change in pull request #18514: [FLINK-25816][changelog] Refactor the logic of notifying materialization id to nested state backend
rkhachatryan commented on a change in pull request #18514: URL: https://github.com/apache/flink/pull/18514#discussion_r792455364 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -526,18 +510,16 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { } Long materializationID = materializationIdByCheckpointId.remove(checkpointId); if (materializationID != null) { -Set checkpoints = pendingMaterializationConfirmations.get(materializationID); -checkpoints.remove(checkpointId); -if (checkpoints.isEmpty()) { Review comment: The purpose of maintaining the `Set` of checkpoints was to not notify of abortion if any of the checkpoints is still unconfirmed. For example, there is 1. a single materialization 2. two pending checkpoints using it 3. the 1st gets aborted (and notified) 4. later, the 2nd gets confirmed (and notified) In (3) we'd notify of abortion and in (4) of completion - of the same materialization. The Set would prevent this. I don't see how the updated version addresses this. Or am I missing something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
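To make the scenario above concrete, here is a minimal, self-contained sketch of the bookkeeping being discussed. It is not the actual `ChangelogKeyedStateBackend` code; only the map and set names are taken from the quoted diff, everything else is a stand-in.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative bookkeeping only: abortion of one checkpoint must not trigger an "aborted"
// notification for a materialization that another pending checkpoint still references.
public class MaterializationTrackerSketch {

    private final Map<Long, Long> materializationIdByCheckpointId = new HashMap<>();
    private final Map<Long, Set<Long>> pendingMaterializationConfirmations = new HashMap<>();

    void register(long checkpointId, long materializationId) {
        materializationIdByCheckpointId.put(checkpointId, materializationId);
        pendingMaterializationConfirmations
                .computeIfAbsent(materializationId, id -> new HashSet<>())
                .add(checkpointId);
    }

    /** Returns the materialization to notify as aborted, or null while other checkpoints use it. */
    Long onCheckpointAborted(long checkpointId) {
        Long materializationId = materializationIdByCheckpointId.remove(checkpointId);
        if (materializationId == null) {
            return null;
        }
        Set<Long> checkpoints = pendingMaterializationConfirmations.get(materializationId);
        checkpoints.remove(checkpointId);
        if (!checkpoints.isEmpty()) {
            // Another pending checkpoint (e.g. checkpoint 2 in the example) still references
            // this materialization, so no abortion notification is sent yet.
            return null;
        }
        pendingMaterializationConfirmations.remove(materializationId);
        return materializationId;
    }

    public static void main(String[] args) {
        MaterializationTrackerSketch tracker = new MaterializationTrackerSketch();
        tracker.register(1L, 100L); // checkpoint 1 uses materialization 100
        tracker.register(2L, 100L); // checkpoint 2 uses the same materialization
        System.out.println(tracker.onCheckpointAborted(1L)); // null: checkpoint 2 is still pending
    }
}
```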
[GitHub] [flink] flinkbot edited a comment on pull request #18470: [FLINK-25774][network] Restrict the maximum number of buffers can be used per result partition for sort-shuffle
flinkbot edited a comment on pull request #18470: URL: https://github.com/apache/flink/pull/18470#issuecomment-1020036276 ## CI report: * decf33d57cf16564786d77240287fb1aeb034b39 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30043) * 4cce15f9d9577fffbdad21c7fb9c286509a04ef5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30214) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-25817) FLIP-201: Persist local state in working directory
Till Rohrmann created FLINK-25817: - Summary: FLIP-201: Persist local state in working directory Key: FLINK-25817 URL: https://issues.apache.org/jira/browse/FLINK-25817 Project: Flink Issue Type: New Feature Components: Runtime / Coordination Affects Versions: 1.15.0 Reporter: Till Rohrmann This issue is the umbrella ticket for [FLIP-201|https://cwiki.apache.org/confluence/x/wJuqCw] which aims at adding support for persisting local state in Flink's working directory. This would enable Flink in certain scenarios to recover locally even in case of process failures. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18514: [FLINK-25816][changelog] Refactor the logic of notifying materialization id to nested state backend
flinkbot edited a comment on pull request #18514: URL: https://github.com/apache/flink/pull/18514#issuecomment-1021939543 ## CI report: * db5af6253d30f1e4e782269f07a24eeb72a9857f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30204) * 34a3acb29b3077f40aabb517900694bc08b06d58 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18516: [FLINK-25288][tests] add savepoint and metric test cases in source suite of connector testframe
flinkbot edited a comment on pull request #18516: URL: https://github.com/apache/flink/pull/18516#issuecomment-1022021934 ## CI report: * 741b593b95d9501c69f8f7014729b0983350bfe7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30215) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18500: [FLINK-25312][hive] HiveCatalog supports Flink's managed table
flinkbot edited a comment on pull request #18500: URL: https://github.com/apache/flink/pull/18500#issuecomment-1021030168 ## CI report: * d7dc483a078a947fe4d397c17e3ec3fa9ec8db72 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30191) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18514: [FLINK-25816][changelog] Refactor the logic of notifying materialization id to nested state backend
flinkbot edited a comment on pull request #18514: URL: https://github.com/apache/flink/pull/18514#issuecomment-1021939543 ## CI report: * db5af6253d30f1e4e782269f07a24eeb72a9857f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30204) * 34a3acb29b3077f40aabb517900694bc08b06d58 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30216) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wsry commented on a change in pull request #18505: [FLINK-25796][network] Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
wsry commented on a change in pull request #18505: URL: https://github.com/apache/flink/pull/18505#discussion_r792462118 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ## @@ -41,7 +41,7 @@ * ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards to fully release all * resources associated the partition and propagate failure cause to the consumer if possible. */ -public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvider { +public interface ResultPartitionWriter extends AvailabilityProvider { Review comment: It was removed by accident. While checking where the close method is called, the IDE listed all places calling AutoCloseable#close; I removed it to reduce the result-set size but forgot to add it back. Thanks for pointing it out. I will add it back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
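As a side note, a tiny self-contained sketch of what keeping `AutoCloseable` in the `extends` clause preserves: implementations can be used in try-with-resources and `close()` remains part of the writer contract. All names below are simplified stand-ins, not the real Flink interfaces.

```java
// Simplified stand-ins for ResultPartitionWriter and AvailabilityProvider (sketch only).
public class AutoCloseableWriterSketch {

    interface AvailabilityProvider {
        boolean isAvailable();
    }

    interface WriterLikeResultPartitionWriter extends AutoCloseable, AvailabilityProvider {
        void emit(String record);
    }

    public static void main(String[] args) throws Exception {
        // Because the interface extends AutoCloseable, callers can rely on try-with-resources.
        try (WriterLikeResultPartitionWriter writer =
                new WriterLikeResultPartitionWriter() {
                    public boolean isAvailable() {
                        return true;
                    }

                    public void emit(String record) {
                        System.out.println("emit: " + record);
                    }

                    public void close() {
                        System.out.println("closed");
                    }
                }) {
            writer.emit("hello");
        }
    }
}
```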
[GitHub] [flink] SteNicholas commented on pull request #18502: [hotfix][docs][kafka] Add explanation how Kafka Source deals with idl…
SteNicholas commented on pull request #18502: URL: https://github.com/apache/flink/pull/18502#issuecomment-1022031621 @fapaul, could you help merge this pull request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…
CrynetLogistics commented on a change in pull request #18483: URL: https://github.com/apache/flink/pull/18483#discussion_r792465363 ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java ## @@ -111,7 +111,7 @@ return new KinesisDataStreamsSinkBuilder<>(); } -@Experimental +@Internal Review comment: i.e. ``` KinesisDataStreamsSink.createWriter(org.apache.flink.api.connector.sink.Sink$InitContext, java.util.List): Argument leaf type org.apache.flink.api.connector.sink.Sink$InitContext does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated KinesisDataStreamsSink.createWriter(org.apache.flink.api.connector.sink.Sink$InitContext, java.util.List): Returned leaf type org.apache.flink.api.connector.sink.SinkWriter does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan opened a new pull request #18517: [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
rkhachatryan opened a new pull request #18517: URL: https://github.com/apache/flink/pull/18517 ## What is the purpose of the change ``` The notification currently causes an exception and adds complexity. It's also not necessary, unlikely to be delivered (because of the difference in checkpoint/materialization intervals) and unlikely to be utilized (it will arrive only after the nested snapshot has completed and most likely do the same GC as in completion notification). ``` ## Verifying this change This change is a trivial rework without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] beyond1920 commented on a change in pull request #18331: [FLINK-25614][table/runtime] Let LocalWindowAggregate be chained with upstream
beyond1920 commented on a change in pull request #18331: URL: https://github.com/apache/flink/pull/18331#discussion_r792467756 ## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java ## @@ -401,7 +402,8 @@ public WindowedSliceAssigner(int windowEndIndex, SliceAssigner innerAssigner) { @Override public long assignSliceEnd(RowData element, ClockService clock) { -return element.getLong(windowEndIndex); +TimestampData windowEnd = element.getTimestamp(windowEndIndex, 3); Review comment: How about `return element.getTimestamp(windowEndIndex, 3).getMillisecond();` ## File path: flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java ## @@ -963,6 +965,10 @@ private static long epochMills(ZoneId shiftTimeZone, String timestampStr) { return localDateTime.toInstant(zoneOffset).toEpochMilli(); } +private static TimestampData wrapTs(long epochMillis) { Review comment: How about statically importing `TimestampData.fromEpochMillis` and using `fromEpochMillis` directly instead of adding an extra `wrapTs` method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
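Assuming `flink-table-common` is on the classpath, a small runnable check of the round trip behind both suggestions: a `TimestampData` created from epoch millis converts back via `getMillisecond()`, so `assignSliceEnd` can simply return `element.getTimestamp(windowEndIndex, 3).getMillisecond()` and the test can use the statically imported `fromEpochMillis` instead of a `wrapTs` helper.

```java
import static org.apache.flink.table.data.TimestampData.fromEpochMillis;

import org.apache.flink.table.data.TimestampData;

// Round-trip check: epoch millis -> TimestampData -> epoch millis.
public class TimestampRoundTripExample {

    public static void main(String[] args) {
        long epochMillis = 1_643_184_000_000L; // arbitrary example value
        TimestampData ts = fromEpochMillis(epochMillis);
        System.out.println(ts.getMillisecond() == epochMillis); // expected: true
    }
}
```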
[jira] [Created] (FLINK-25818) Add explanation how Kafka Source deals with idleness when parallelism is higher then the number of partitions
Martijn Visser created FLINK-25818: -- Summary: Add explanation how Kafka Source deals with idleness when parallelism is higher then the number of partitions Key: FLINK-25818 URL: https://issues.apache.org/jira/browse/FLINK-25818 Project: Flink Issue Type: Improvement Components: Connectors / Kafka, Documentation Reporter: Martijn Visser Add a section to the Kafka Source documentation to explain what happens with the Kafka Source with regard to idleness when parallelism is higher than the number of partitions -- This message was sent by Atlassian Jira (v8.20.1#820001)
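The ticket only asks for documentation, but as an illustration of the topic, below is a minimal sketch of the configuration such a section would likely discuss: declaring idleness on the {{WatermarkStrategy}} so that source subtasks without an assigned Kafka partition do not hold back watermark progress. The broker address, topic, and group id are placeholders, and the wording of the future docs section is not implied here.

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaIdlenessExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder address
                        .setTopics("input-topic") // placeholder topic
                        .setGroupId("example-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // withIdleness lets subtasks that received no partition (parallelism > #partitions)
        // be marked idle instead of stalling the watermarks of downstream operators.
        DataStream<String> stream =
                env.fromSource(
                        source,
                        WatermarkStrategy.<String>forMonotonousTimestamps()
                                .withIdleness(Duration.ofMinutes(1)),
                        "kafka-source");

        stream.print();
        env.execute("Kafka idleness example");
    }
}
{code}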
[GitHub] [flink] CrynetLogistics commented on a change in pull request #18483: [FLINK-24041][connectors] Removed public setter for elementConverter in As…
CrynetLogistics commented on a change in pull request #18483: URL: https://github.com/apache/flink/pull/18483#discussion_r792468424 ## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java ## @@ -54,7 +54,7 @@ private KinesisDataStreamsSinkElementConverter( this.partitionKeyGenerator = partitionKeyGenerator; } -@Experimental +@Internal Review comment: I guess I can make the `ElementConverter` `@Internal` But it's still the case that @vahmed-hamdy is using some parts for public parts and the converter is an internal tool. ``` KinesisDataStreamsSinkElementConverter.apply(java.lang.Object, org.apache.flink.api.connector.sink.SinkWriter$Context): Argument leaf type org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated KinesisDataStreamsSinkElementConverter.apply(java.lang.Object, org.apache.flink.api.connector.sink.SinkWriter$Context): Argument leaf type org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] YuriGusev opened a new pull request #18518: [FLINK-24229][connectors/dynamodb] Added DynamoDB connector
YuriGusev opened a new pull request #18518: URL: https://github.com/apache/flink/pull/18518 ## What is the purpose of the change _User stories:_ As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. _Scope:_ * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class. * The implementation can for now reside in its own module in flink-connectors. * Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the requeueFailedRequestEntry method. * The implementation should use the DynamoDB batch API. * The implemented Sink Writer will be used by the Sink class that is created as part of this story. * Java / code-level docs. * Unit/Integration testing. ## Brief change log - Added a new DynamoDB sink in a new module flink-connectors/flink-connector-dynamodb ## Verifying this change This change added tests and can be verified as follows: - Integration tests in org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbSinkITCase ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18517: [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
flinkbot commented on pull request #18517: URL: https://github.com/apache/flink/pull/18517#issuecomment-1022038055 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit b7c4e7b1afd0d8023c5714c6dcadabf5b4b0c047 (Wed Jan 26 09:54:32 UTC 2022) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-25818) Add explanation how Kafka Source deals with idleness when parallelism is higher then the number of partitions
[ https://issues.apache.org/jira/browse/FLINK-25818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-25818: -- Assignee: Martijn Visser > Add explanation how Kafka Source deals with idleness when parallelism is > higher then the number of partitions > - > > Key: FLINK-25818 > URL: https://issues.apache.org/jira/browse/FLINK-25818 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Documentation >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > > Add a section to the Kafka Source documentation to explain what happens with > the Kafka Source with regards to idleness when parallelism is higher then the > number of partitions -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18502: [FLINK-25818][Docs][Kafka] Add explanation how Kafka Source deals with idl…
flinkbot edited a comment on pull request #18502: URL: https://github.com/apache/flink/pull/18502#issuecomment-1021048562 ## CI report: * d446d2d97b5b87ed87a923b1122c32602dc526b3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30151) * 1bb532a98042963c9aeeac75303c63a6ba3b8de5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org