Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


rkhachatryan commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633189008


##
docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html:
##


Review Comment:
   I guess this file is a leftover and should be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


ashangit commented on PR #833:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2158258410

   Updated the PR to take into account this 
[comment](https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17853606&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17853606)
 in the Jira ticket.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi merged PR #24891:
URL: https://github.com/apache/flink/pull/24891


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on PR #24891:
URL: https://github.com/apache/flink/pull/24891#issuecomment-2158262913

   @wForget thanks for your efforts!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]

2024-06-10 Thread via GitHub


WencongLiu opened a new pull request, #24920:
URL: https://github.com/apache/flink/pull/24920

   ## What is the purpose of the change
   
   _Support custom shuffle for lookup join_
   
   ## Verifying this change
   
   UT and ITs.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]

2024-06-10 Thread via GitHub


flinkbot commented on PR #24920:
URL: https://github.com/apache/flink/pull/24920#issuecomment-2158287262

   
   ## CI report:
   
   * 97c210a011dd34e0c19deffe746cb3e88b355ce2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


pnowojski commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633225150


##
docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html:
##


Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1633243846


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   No strong opinion, just asking: compaction/compression normally adds a suffix, so 
why a prefix here?
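
   For context (not part of the PR), a minimal self-contained sketch of the naming this method produces; the `COMPACTED_PREFIX` value and the input path below are assumptions for illustration only:

   ```java
   import org.apache.flink.core.fs.Path;

   public class CompactedPathSketch {
       // Illustrative value; stands in for the real COMPACTED_PREFIX constant.
       private static final String COMPACTED_PREFIX = "compacted-";

       static Path assembleCompactedFilePath(Path uncompactedPath) {
           String uncompactedName = uncompactedPath.getName();
           if (uncompactedName.startsWith(".")) {
               // drop the leading dot of hidden in-progress files
               uncompactedName = uncompactedName.substring(1);
           }
           return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);
       }

       public static void main(String[] args) {
           // prints /out/compacted-part-0-0
           System.out.println(assembleCompactedFilePath(new Path("/out/.part-0-0")));
       }
   }
   ```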



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   Are we calling this with `..`?



##
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter
 
 @Nullable final Path targetPath;
 
-private CompactingFileWriter.Type writeType = null;
+private Type writeType = null;

Review Comment:
   When we remove the `CompactingFileWriter` prefix, `Type` on its own is hard 
to read. I suggest either putting back the prefix or renaming it to something more 
meaningful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


hlteoh37 commented on code in PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633262253


##
flink-connector-aws/flink-connector-kinesis/pom.xml:
##
@@ -354,6 +354,12 @@ under the License.
 
 
+
+
org.apache.flink.connector.aws.sink
+
+
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink
+

Review Comment:
   Ok sounds good - we discussed offline and suggested changing the shading in 
`flink-connector-kinesis` to include the whole `flink-connector-aws-base` 
module instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


z3d1k commented on code in PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633275869


##
flink-connector-aws/flink-connector-kinesis/pom.xml:
##
@@ -354,6 +354,12 @@ under the License.
 
 
+
+
org.apache.flink.connector.aws.sink
+
+
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink
+

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-10 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1633282273


##
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter
 
 @Nullable final Path targetPath;
 
-private CompactingFileWriter.Type writeType = null;
+private Type writeType = null;

Review Comment:
   It was used in a mixed manner in the file: in some places `Type`, in others 
`CompactingFileWriter.Type`. The class itself (indirectly) implements 
`CompactingFileWriter`, so IMO `Type` together with the variable name is descriptive 
enough, but TBH I do not have a strong opinion; personally I prefer shorter code when 
I have the choice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]

2024-06-10 Thread via GitHub


lincoln-lil commented on code in PR #24889:
URL: https://github.com/apache/flink/pull/24889#discussion_r164828


##
docs/layouts/shortcodes/generated/optimizer_config_configuration.html:
##
@@ -65,6 +71,12 @@
 Enum
 When it is `TRY_RESOLVE`, the optimizer tries to resolve the 
correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog 
pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), 
Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only 
changelog pipeline. For updates, there are three main NDU problems: 
1. Non-deterministic functions, include scalar, table, aggregate functions, both 
builtin and custom ones. 2. LookupJoin on an evolving source. 3. Cdc-source carries 
metadata fields which are system columns, not belongs to the entity data itself. 
For the first step, the optimizer automatically enables the materialization for 
No.2 (LookupJoin) if needed, and gives the detailed error message for 
No.1 (Non-deterministic functions) and No.3 (Cdc-source with metadata) which is 
relatively easier to solve by changing the SQL. Default value is `IGNORE`, the 
optimizer does no changes.
 Possible values: "TRY_RESOLVE", "IGNORE"
 
+
+
table.optimizer.reuse-optimize-block-with-digest-enabled

Review Comment:
   Add 'Batch & Streaming' label.



##
docs/layouts/shortcodes/generated/optimizer_config_configuration.html:
##
@@ -125,5 +125,11 @@
 Boolean
 If set to true, it will merge projects when converting SqlNode 
to RelNode.Note: it is not recommended to turn on unless you are aware of 
possible side effects, such as causing the output of certain non-deterministic 
expressions to not meet expectations(see FLINK-20887).
 
+
+table.optimizer.union-all-as-breakpoint-enabled 
Streaming

Review Comment:
   Should be Batch & Streaming, see 
`BatchCommonSubGraphBasedOptimizer#doOptimize`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


dannycranmer commented on PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158665814

   There is a risk this will break user code depending on these libs, right? 
Given this is a bug, and they should not be using the shaded libs, I think this 
is ok, thoughts?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35435] Add timeout Configuration to Async Sink [flink]

2024-06-10 Thread via GitHub


dannycranmer merged PR #24839:
URL: https://github.com/apache/flink/pull/24839


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-10 Thread via GitHub


dannycranmer commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1633458080


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+/**
+ * Default implementation of {@link ElementConverter} that lazily falls back 
to {@link
+ * DynamoDbTypeInformedElementConverter}.
+ */
+@PublicEvolving
+public class DefaultDynamoDbElementConverter<T>
+implements ElementConverter<T, DynamoDbWriteRequest> {
+
+private ElementConverter<T, DynamoDbWriteRequest> elementConverter;
+
+public DefaultDynamoDbElementConverter() {}
+
+@Override
+public DynamoDbWriteRequest apply(T t, SinkWriter.Context context) {
+if (elementConverter == null) {
+TypeInformation<T> typeInfo = (TypeInformation<T>) 
TypeInformation.of(t.getClass());
+if (!(typeInfo instanceof CompositeType)) {
+throw new IllegalArgumentException("The input type must be a 
CompositeType.");
+}
+
+elementConverter =
+new 
DynamoDbTypeInformedElementConverter<>((CompositeType<T>) typeInfo);
+}
+
+return elementConverter.apply(t, context);
+}
+
+@Override
+public void open(Sink.InitContext context) {}

Review Comment:
   nit: Do we need this? I think it has a no-op default implementation
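
   As a side note, a tiny self-contained sketch of the point being made (all names here are made up, not the connector's API): when an interface supplies a no-op default method, implementors can simply omit the override.

   ```java
   public class DefaultMethodSketch {
       interface Converter<T> {
           String apply(T value);

           default void open() {
               // no-op by default
           }
       }

       static class UpperCaseConverter implements Converter<String> {
           @Override
           public String apply(String value) {
               return value.toUpperCase();
           }
           // no open() override needed: the interface default is inherited
       }

       public static void main(String[] args) {
           Converter<String> c = new UpperCaseConverter();
           c.open();                          // uses the default no-op
           System.out.println(c.apply("ok")); // prints OK
       }
   }
   ```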



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-10 Thread via GitHub


dannycranmer commented on PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#issuecomment-2158683163

   @hlteoh37 / @z3d1k can you please take a look at this one too?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


z3d1k commented on PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158839029

   > There is a risk this will break user code depending on these libs, right? 
Given this is a bug, and they should not be using the shaded libs, I think this 
is ok, thoughts?
   
   Classes that we add to the relocation are specific to the connector implementation.
   Users generally should not use shaded libs, and they also should not use the 
internal connector classes that are affected by this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-10 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1633781977


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer
+* [Sample application](./example-datastream-job): Sample application showing 
the usage of the connector with DataStream API. It also demonstrates how to 
configure the request signer. 

Review Comment:
   I moved the example application under tests, as suggested, merging 
everything into a single class.
   The usage of the AMP connector is commented out, since it is a separate 
dependency, but it is still present to show the usage in case someone wants to use 
it.
   
   
https://github.com/apache/flink-connector-prometheus/blob/5afb890aa8fe860ce25bcf89f7cdee93a6d2c4be/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-10 Thread via GitHub


nicusX commented on PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2159216321

   I moved the example application under tests and removed the separate module 
completely.
   I added links to the example application in the README.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]

2024-06-10 Thread via GitHub


morazow commented on PR #100:
URL: 
https://github.com/apache/flink-connector-kafka/pull/100#issuecomment-2159296755

   Thanks @dongwoo6kim,
   
   Tests look good from my side 👍 
   
   (_Recently I faced a similar issue, which may be related, when running batch 
mode with `startingOffsets` set. This change should solve that issue, but we 
may want to create a separate issue for it._)
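
   For reference, a minimal sketch of the bounded, batch-style Kafka read described above, assuming the standard `KafkaSource` builder API; broker and topic names are placeholders:

   ```java
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kafka.source.KafkaSource;
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class BoundedKafkaReadSketch {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           KafkaSource<String> source = KafkaSource.<String>builder()
                   .setBootstrapServers("localhost:9092")   // placeholder
                   .setTopics("input-topic")                // placeholder
                   .setStartingOffsets(OffsetsInitializer.earliest())
                   .setBounded(OffsetsInitializer.latest()) // batch-style bounded read
                   .setValueOnlyDeserializer(new SimpleStringSchema())
                   .build();

           env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
           env.execute("bounded-kafka-read");
       }
   }
   ```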


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1633910473


##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for 
Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   You're right. Since I generated the documentation with Java 11, it output 
PKCS12. Do you know how I can stop the default value in the documentation from 
being generated and instead add it manually?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634004521


##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for 
Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   I found an `OverrideDefault` annotation, will try that
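
   In case it helps, a hedged sketch of how that might look, assuming the annotation is `org.apache.flink.annotation.docs.Documentation.OverrideDefault` and using an option key that mirrors the one in this PR; illustrative only, not the PR's actual code:

   ```java
   import org.apache.flink.annotation.docs.Documentation;
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   import java.security.KeyStore;

   public class SecurityOptionsSketch {
       // Documents the JVM-dependent default as a stable string instead of
       // whatever value the doc generator picks up at build time.
       @Documentation.OverrideDefault("jvm-dependent (e.g. \"pkcs12\" on Java 11+)")
       public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_TYPE =
               ConfigOptions.key("security.ssl.internal.keystore-type")
                       .stringType()
                       .defaultValue(KeyStore.getDefaultType())
                       .withDescription("The type of the keystore for Flink's internal endpoints.");
   }
   ```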



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


1996fanrui commented on PR #833:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2159610745

   Thanks @ashangit for the fix and everyone for the review!
   
   Merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


1996fanrui merged PR #833:
URL: https://github.com/apache/flink-kubernetes-operator/pull/833


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


1996fanrui commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634051534


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   This PR has not been updated for almost a month, and the author has not 
responded.
   
   The flink kubernetes operator will be released this week, so I will take 
over this PR if it is still not updated or responded to within 24 hours.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-10 Thread via GitHub


reswqa commented on code in PR #24845:
URL: https://github.com/apache/flink/pull/24845#discussion_r1634061063


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link java.util.Set}. The serializer relies on an element 
serializer for the
+ * serialization of the set's elements.
+ *
+ * The serialization format for the set is as follows: four bytes for the 
length of the set,
+ * followed by the serialized representation of each element.
+ *
+ * @param  The type of element in the set.
+ */
+@Internal
+public final class SetSerializer extends TypeSerializer> {
+
+private static final long serialVersionUID = 1L;
+
+/** The serializer for the elements of the set. */
+private final TypeSerializer elementSerializer;
+
+/**
+ * Creates a set serializer that uses the given serializer to serialize 
the set's elements.
+ *
+ * @param elementSerializer The serializer for the elements of the set
+ */
+public SetSerializer(TypeSerializer elementSerializer) {
+this.elementSerializer = checkNotNull(elementSerializer);
+}
+
+// 
+//  SetSerializer specific properties
+// 
+
+/**
+ * Gets the serializer for the elements of the set.
+ *
+ * @return The serializer for the elements of the set
+ */
+public TypeSerializer getElementSerializer() {
+return elementSerializer;
+}
+
+// 
+//  Type Serializer implementation
+// 
+
+@Override
+public boolean isImmutableType() {
+return false;
+}
+
+@Override
+public TypeSerializer> duplicate() {
+TypeSerializer duplicateElement = elementSerializer.duplicate();
+return duplicateElement == elementSerializer ? this : new 
SetSerializer<>(duplicateElement);
+}
+
+@Override
+public Set createInstance() {
+return new HashSet<>(0);
+}
+
+@Override
+public Set copy(Set from) {
+Set newSet = new HashSet<>(from.size());
+for (T element : from) {
+newSet.add(elementSerializer.copy(element));
+}
+return newSet;
+}
+
+@Override
+public Set copy(Set from, Set reuse) {
+return copy(from);
+}
+
+@Override
+public int getLength() {
+return -1; // var length
+}
+
+@Override
+public void serialize(Set set, DataOutputView target) throws 
IOException {
+final int size = set.size();
+target.writeInt(size);
+for (T element : set) {
+elementSerializer.serialize(element, target);

Review Comment:
   > I also found that the serializing a list with null values will throw NPE 
as we reused the existing ListSerializer impl, which does not allow null values 
(it is only used for serializing ListState before where null value is 
explicitly forbidden in the contract). Directly adding null marker for it will 
break state compatibility, I plan to introduce a new list serializer that 
accepts null values for serializing user objects in a new JIRA, WDYT
   
   Make sense.
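
   For what it's worth, a minimal sketch of the null-marker idea from the quoted comment (illustrative only, not the planned serializer): a boolean flag is written before each element so that null entries survive the round trip.

   ```java
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.core.memory.DataInputView;
   import org.apache.flink.core.memory.DataOutputView;

   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   /** Illustrative format: length, then a null marker before every element. */
   final class NullAwareListFormat {
       static <T> void write(List<T> list, TypeSerializer<T> elem, DataOutputView target)
               throws IOException {
           target.writeInt(list.size());
           for (T e : list) {
               target.writeBoolean(e != null); // null marker
               if (e != null) {
                   elem.serialize(e, target);
               }
           }
       }

       static <T> List<T> read(TypeSerializer<T> elem, DataInputView source) throws IOException {
           int size = source.readInt();
           List<T> out = new ArrayList<>(size);
           for (int i = 0; i < size; i++) {
               out.add(source.readBoolean() ? elem.deserialize(source) : null);
           }
           return out;
       }
   }
   ```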



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, ple

Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-10 Thread via GitHub


reswqa commented on code in PR #24845:
URL: https://github.com/apache/flink/pull/24845#discussion_r1634076649


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link java.util.Set}. The serializer relies on an element 
serializer for the
+ * serialization of the set's elements.
+ *
+ * The serialization format for the set is as follows: four bytes for the 
length of the set,
+ * followed by the serialized representation of each element.
+ *
+ * @param  The type of element in the set.
+ */
+@Internal
+public final class SetSerializer extends TypeSerializer> {
+
+private static final long serialVersionUID = 1L;
+
+/** The serializer for the elements of the set. */
+private final TypeSerializer elementSerializer;
+
+/**
+ * Creates a set serializer that uses the given serializer to serialize 
the set's elements.
+ *
+ * @param elementSerializer The serializer for the elements of the set
+ */
+public SetSerializer(TypeSerializer elementSerializer) {
+this.elementSerializer = checkNotNull(elementSerializer);
+}
+
+// 
+//  SetSerializer specific properties
+// 
+
+/**
+ * Gets the serializer for the elements of the set.
+ *
+ * @return The serializer for the elements of the set
+ */
+public TypeSerializer getElementSerializer() {
+return elementSerializer;
+}
+
+// 
+//  Type Serializer implementation
+// 
+
+@Override
+public boolean isImmutableType() {
+return false;
+}
+
+@Override
+public TypeSerializer> duplicate() {
+TypeSerializer duplicateElement = elementSerializer.duplicate();
+return duplicateElement == elementSerializer ? this : new 
SetSerializer<>(duplicateElement);
+}
+
+@Override
+public Set createInstance() {
+return new HashSet<>(0);
+}
+
+@Override
+public Set copy(Set from) {
+Set newSet = new HashSet<>(from.size());
+for (T element : from) {
+newSet.add(elementSerializer.copy(element));
+}
+return newSet;
+}
+
+@Override
+public Set copy(Set from, Set reuse) {
+return copy(from);
+}
+
+@Override
+public int getLength() {
+return -1; // var length
+}
+
+@Override
+public void serialize(Set set, DataOutputView target) throws 
IOException {
+final int size = set.size();
+target.writeInt(size);
+for (T element : set) {
+elementSerializer.serialize(element, target);

Review Comment:
   > I also found that the serializing a list with null values will throw NPE 
as we reused the existing ListSerializer impl, which does not allow null values 
(it is only used for serializing ListState before where null value is 
explicitly forbidden in the contract). Directly adding null marker for it will 
break state compatibility, I plan to introduce a new list serializer that 
accepts null values for serializing user objects in a new JIRA, WDYT
   
   Make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, ple

Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-10 Thread via GitHub


gong commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2159656572

   > @gong would you like to open a PR for release-3.1 branch?
   
   ok, I will open the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-10 Thread via GitHub


gong commented on PR #3407:
URL: https://github.com/apache/flink-cdc/pull/3407#issuecomment-2159661569

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


wenbingshen commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634091321


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   Sorry, I will update this PR today.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


cxzl25 commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159691770

   Gentle ping @xintongsong 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


wenbingshen commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634118107


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   @gyfora @1996fanrui 
   I think the existing matching logic is not too complicated, and the PR is 
not too large. If the maximum parallelism handling is extended to other source 
components in the future, it will not be too late to split it then.
   
   If we want to split the matching logic for Kafka and Pulsar, we would need to 
run two regex matches over the source metrics data set to obtain the partition 
counts of the two queues separately, which does not seem necessary.
   
   It seems there is no simpler alternative to the 'or' matching: the current 
regular expression is an alternation, and the two cases are also distinguished in 
the map function. Maybe you have a better approach to suggest, thank you.
   
   I suggest keeping the logic of the existing PR.
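
   For illustration, a self-contained sketch of the alternation-plus-named-groups matching discussed here, counting distinct topic partitions across both Kafka and Pulsar metric names; the metric strings below are made up:

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   public class PartitionMetricMatchSketch {
       private static final Pattern PARTITION_REGEX = Pattern.compile(
               "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                       + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

       public static void main(String[] args) {
           List<String> metrics = List.of( // hypothetical metric names
                   "op.KafkaSourceReader.topic.orders.partition.0.currentOffset",
                   "op.KafkaSourceReader.topic.orders.partition.1.currentOffset",
                   "op.PulsarConsumer.events-partition-0.consumer-1.numMsgsReceived");

           Set<String> topicPartitions = new HashSet<>();
           for (String metric : metrics) {
               Matcher m = PARTITION_REGEX.matcher(metric);
               if (m.matches()) {
                   // Kafka groups are null when the Pulsar branch matched, and vice versa.
                   String kafkaTopic = m.group("kafkaTopic");
                   topicPartitions.add(kafkaTopic != null
                           ? kafkaTopic + "-" + m.group("kafkaId")
                           : m.group("pulsarTopic") + "-" + m.group("pulsarId"));
               }
           }
           System.out.println("numPartitions = " + topicPartitions.size()); // 3
       }
   }
   ```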



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]

2024-06-10 Thread via GitHub


ljz2051 commented on code in PR #24873:
URL: https://github.com/apache/flink/pull/24873#discussion_r1634123958


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * FileMerging Compatibility IT case which tests recovery from a checkpoint 
created in different
+ * fileMerging mode (i.e. fileMerging enabled/disabled).
+ */
+public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
+
+public static Collection parameters() {
+return Arrays.asList(
+new Object[][] {
+{RestoreMode.CLAIM, true},
+{RestoreMode.CLAIM, false},
+{RestoreMode.NO_CLAIM, true},
+{RestoreMode.NO_CLAIM, false}
+});
+}
+
+@ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = 
{1}")
+@MethodSource("parameters")
+public void testSwitchFromDisablingToEnablingFileMerging(
+RestoreMode restoreMode, boolean fileMergingAcrossBoundary, 
@TempDir Path checkpointDir)
+throws Exception {
+testSwitchingFileMerging(
+checkpointDir, false, true, restoreMode, 
fileMergingAcrossBoundary);
+}
+
+@ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = 
{1}")
+@MethodSource("parameters")
+public void testSwitchFromEnablingToDisablingFileMerging(
+RestoreMode restoreMode, boolean fileMergingAcrossBoundary, 
@TempDir Path checkpointDir)
+throws Exception {
+testSwitchingFileMerging(
+checkpointDir, true, false, restoreMode, 
fileMergingAcrossBoundary);
+}
+
+private void testSwitchingFileMerging(
+Path checkpointDir,
+boolean firstFileMergingSwitch,
+boolean secondFileMergingSwitch,
+RestoreMode restoreMode,
+boolean fileMergingAcrossBoundary)
+throws Exception {
+final Configuration config = new Configuration();
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toUri().toString());
+config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, 
fileMergingAcrossBoundary);
+config.set(CheckpointingOptions.FILE_MERGING_ENABLED, 
firstFileMergingSwitch);
+MiniClusterWithClientResource firstCluster =
+new MiniClusterWithClientResource(
+new MiniClusterResourceConfiguration.Builder()
+.setConfiguration(config)
+.setNumberTaskManagers(2)
+.setNumberSlotsPerTaskManager(2)
+.build());
+EmbeddedRocksDBStateBackend stateBackend1 = new 
EmbeddedRocksDBStateBackend();
+stateBackend1.configure(config, 
Thread.currentThread().getContextClassLoader());
+firstCluster.before();
+String externalCheckpoint;
+try {
+externalCheckpoint =
+runJobAndGetExternalizedCheckpoint(
+stateBackend1, null, firstCluster, restoreM

Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]

2024-06-10 Thread via GitHub


yuxiqian commented on code in PR #3398:
URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1634142791


##
flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh:
##
@@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then
   exit 1
 fi
 
-# Setup Flink related configurations
-# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
-_FLINK_HOME_DETERMINED=1

Review Comment:
   Should `_FLINK_HOME_DETERMINED` env var be preserved here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634153986


##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) {
 .isInstanceOf(Exception.class);
 }
 
+@ParameterizedTest
+@MethodSource("parameters")
+void testInternalSSLWrongKeystoreType(String sslProvider) {
+final Configuration config = 
createInternalSslConfigWithKeyAndTrustStores(sslProvider);
+config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JCEKS");
+
+assertThatThrownBy(() -> 
SSLUtils.createInternalServerSSLEngineFactory(config))
+.isInstanceOf(java.io.IOException.class);

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634154163


##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -148,6 +148,17 @@ void testRESTClientSSLWrongPassword(String sslProvider) {
 .isInstanceOf(Exception.class);
 }
 
+/** Tests that REST Client SSL Engine creation fails with bad SSL 
configuration. */
+@ParameterizedTest
+@MethodSource("parameters")
+void testRESTClientSSLBadTruststoreType(String sslProvider) {
+Configuration clientConfig = 
createRestSslConfigWithTrustStore(sslProvider);
+clientConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_TYPE, "BKS");

Review Comment:
   Changed the tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634155084


##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +71,38 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException
+| CertificateException
+| NoSuchAlgorithmException
+| KeyStoreException e) {
+throw new RemoteTransportException(

Review Comment:
   I simplified it a bit to catch GeneralSecurityException, but I think that's 
the most we can do. The Scala base class doesn't throw any exceptions, so we 
have to catch it in the Java override.
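
   A minimal sketch of the constraint being described, with purely illustrative types: when the overridden method declares no checked exceptions, the Java override has to catch and wrap them.

   ```java
   import java.io.IOException;
   import java.security.GeneralSecurityException;
   import java.security.KeyStore;

   class KeystoreLoaderSketch {
       // Hypothetical override of a method that declares no checked exceptions.
       public KeyStore loadKeystore(String filename, String password) {
           try {
               KeyStore keyStore = KeyStore.getInstance("PKCS12");
               try (var in = java.nio.file.Files.newInputStream(java.nio.file.Path.of(filename))) {
                   keyStore.load(in, password.toCharArray());
               }
               return keyStore;
           } catch (GeneralSecurityException | IOException e) {
               // Wrap: the signature being overridden does not allow checked exceptions.
               throw new RuntimeException("SSL context could not be constructed", e);
           }
       }
   }
   ```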



##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for 
Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   updated documentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2159778519

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-06-10 Thread via GitHub


liuyongvs commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2159808362

   hi @snuyanzin @superdiaodiao do we need to support encoding args?
   db2 
https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode
   max compute 
https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


xintongsong commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159816793

   > Thanks, I've updated.
   
   The commit message is not yet updated. Have you forgotten to push the changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-06-10 Thread via GitHub


superdiaodiao commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2159830832

   > hi @snuyanzin @superdiaodiao do we need to support encoding args?
   > db2 
https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode
   > max compute 
https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode
   > 
   
   Calcite and Spark also take only one argument; that is enough to handle 
these cases, and UTF-8 can cover everything.
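
   For readers skimming the thread, a minimal sketch of what the single-argument, UTF-8-only behaviour corresponds to on the JVM side, using plain `java.net.URLEncoder`/`URLDecoder` (illustrative only, not the actual Flink function implementation):

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public final class UrlCodecSketch {

    // URL_ENCODE(str): charset fixed to UTF-8, so no second argument is needed.
    static String urlEncode(String s) {
        return s == null ? null : URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    // URL_DECODE(str): mirrors the encoder with the same fixed charset.
    static String urlDecode(String s) {
        return s == null ? null : URLDecoder.decode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(urlEncode("a b/c"));   // prints a+b%2Fc
        System.out.println(urlDecode("a+b%2Fc")); // prints a b/c
    }
}
```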
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


cxzl25 commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159856148

   > The commit message is not yet updated
   
   Sorry, it's updated now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


ramkrish86 commented on code in PR #24896:
URL: https://github.com/apache/flink/pull/24896#discussion_r1634237585


##
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java:
##
@@ -59,7 +59,7 @@ public void write(byte[] b, int off, int len) throws 
IOException {
 
 @Override
 public void flush() throws IOException {
-out.hsync();
+out.hflush();

Review Comment:
   I checked this change. In the ABFS case, hflush and hsync internally do the 
same thing, I believe, so that functionality should not break. In the HDFS 
case, yes, calling hsync there could make things costly. Thanks for fixing this. 
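
   For reference, a minimal sketch of the distinction this change relies on, against the Hadoop `FSDataOutputStream` API (class and field names below are illustrative, not the PR code):

```java
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;

// Illustrative only: flush() pushes buffered data to the datanodes (cheap),
// while sync() keeps the stronger hsync() guarantee of forcing data to disk.
class FlushVsSyncSketch {
    private final FSDataOutputStream out;

    FlushVsSyncSketch(FSDataOutputStream out) {
        this.out = out;
    }

    public void flush() throws IOException {
        out.hflush(); // flush to the pipeline, no per-call disk fsync
    }

    public void sync() throws IOException {
        out.hsync(); // additionally force data to disk; expensive on HDFS
    }
}
```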



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35533][runtime] Support Flink hybrid shuffle integration with Apache Celeborn [flink]

2024-06-10 Thread via GitHub


reswqa commented on code in PR #24900:
URL: https://github.com/apache/flink/pull/24900#discussion_r1634238153


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -148,12 +162,52 @@ public void close() throws IOException {
 // 

 
 private List createTierConsumerAgents(
-List tieredStorageConsumerSpecs) {
+List tieredStorageConsumerSpecs,
+List> shuffleDescriptors) {
 ArrayList tierConsumerAgents = new ArrayList<>();
+
+List> transformedTierShuffleDescriptors =
+transformTierShuffleDescriptors(shuffleDescriptors);
+// Each tier only requires one inner list of 
transformedTierShuffleDescriptors, so the size
+// of transformedTierShuffleDescriptors and the size of tierFactories 
are the same.
+checkState(transformedTierShuffleDescriptors.size() == 
tierFactories.size());
+int index = 0;

Review Comment:
   Why not use a `for-i` loop if we do need the iteration index?



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -148,12 +162,52 @@ public void close() throws IOException {
 // 

 
 private List createTierConsumerAgents(
-List tieredStorageConsumerSpecs) {
+List tieredStorageConsumerSpecs,
+List> shuffleDescriptors) {
 ArrayList tierConsumerAgents = new ArrayList<>();
+
+List> transformedTierShuffleDescriptors =
+transformTierShuffleDescriptors(shuffleDescriptors);
+// Each tier only requires one inner list of 
transformedTierShuffleDescriptors, so the size
+// of transformedTierShuffleDescriptors and the size of tierFactories 
are the same.
+checkState(transformedTierShuffleDescriptors.size() == 
tierFactories.size());
+int index = 0;
 for (TierFactory tierFactory : tierFactories) {
 tierConsumerAgents.add(
-
tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService));
+tierFactory.createConsumerAgent(
+tieredStorageConsumerSpecs,
+transformedTierShuffleDescriptors.get(index++),
+nettyService));
 }
 return tierConsumerAgents;
 }
+
+/**
+ * Before transforming the shuffle descriptors, the number of tier shuffle 
descriptors is
+ * numPartitions * numTiers (That means shuffleDescriptors.size() is 
numPartitions, while the
+ * shuffleDescriptors.get(0).size() is numTiers). After transforming, the 
number of tier shuffle
+ * descriptors is numTiers * numPartitions (That means 
transformedList.size() is numTiers, while
+ * transformedList.get(0).size() is numPartitions).
+ */
+private static List> 
transformTierShuffleDescriptors(
+List> shuffleDescriptors) {
+int numTiers = 0;
+int numDescriptors = shuffleDescriptors.size();

Review Comment:
   Can be `numPartitions` according to the java doc.
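
   As a side note for readers, the transformation described in the Javadoc is essentially a transpose of the nested list; a minimal generic sketch (not the PR code, element type chosen arbitrarily):

```java
import java.util.ArrayList;
import java.util.List;

final class TransposeSketch {

    /** Turns a numPartitions x numTiers nested list into a numTiers x numPartitions one. */
    static <T> List<List<T>> transpose(List<List<T>> byPartition) {
        List<List<T>> byTier = new ArrayList<>();
        if (byPartition.isEmpty()) {
            return byTier;
        }
        int numTiers = byPartition.get(0).size();
        for (int tier = 0; tier < numTiers; tier++) {
            List<T> row = new ArrayList<>();
            for (List<T> partition : byPartition) {
                row.add(partition.get(tier)); // every partition contributes one entry per tier
            }
            byTier.add(row);
        }
        return byTier;
    }
}
```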



##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -490,6 +490,24 @@ public enum CompressionCodec {
 + " is configured. The new mode is 
currently in an experimental phase. It can be set to false to fallback to the 
legacy mode "
 + " if something unexpected. Once the new 
mode reaches a stable state, the legacy mode as well as the option will be 
removed.");
 
+/** The option to configure the tiered factory creator remote class name 
for hybrid shuffle. */
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+@Experimental
+public static final ConfigOption
+NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME =
+
key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"The option configures the class that is 
responsible for creating an "
++ "external remote tier factory 
for hybrid shuffle. Note that "
++ "only Celeborn can be accepted 
as the remote shuffle tier "

Review Comment:
   only Celeborn -> only Apache Celeborn
   
   How do we make sure that only Apache Celeborn is the valid option? I didn't 
notice if we have the corresponding validation.



##
flink-runtime/src/test/jav

Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634247827


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   fair enough, we can keep it like this for now but if we add more similar 
logic then we need to split it up :)
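
   If it helps a future refactoring, a rough sketch of what pulling the matcher logic into a helper could look like (the class and method names are made up; the group names are taken from the regex above):

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PartitionMetricParser {

    /** Maps one metric name to a "topic-partition" key, or empty if it does not match. */
    static Optional<String> toTopicPartition(Pattern partitionRegex, String metricName) {
        Matcher matcher = partitionRegex.matcher(metricName);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        String kafkaTopic = matcher.group("kafkaTopic");
        return Optional.of(
                kafkaTopic != null
                        ? kafkaTopic + "-" + matcher.group("kafkaId")
                        : matcher.group("pulsarTopic") + "-" + matcher.group("pulsarId"));
    }
}
```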



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora merged PR #827:
URL: https://github.com/apache/flink-kubernetes-operator/pull/827


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-10 Thread via GitHub


X-czh commented on PR #24845:
URL: https://github.com/apache/flink/pull/24845#issuecomment-2159873214

   rebased


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35052] Reject unsupported versions in the webhook validator [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora merged PR #831:
URL: https://github.com/apache/flink-kubernetes-operator/pull/831


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding affinity rule. [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora closed pull request #737: Adding affinity rule.
URL: https://github.com/apache/flink-kubernetes-operator/pull/737


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding affinity rule. [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora commented on PR #737:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/737#issuecomment-2159881613

   I am closing this PR due to inactivity; please re-open if you pick it up 
again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


wenbingshen commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634261181


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   Sure. Thanks! :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


xintongsong commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159897218

   Thanks, @cxzl25. Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


xintongsong closed pull request #24896: [FLINK-35531] Avoid calling hsync in 
flush method in BaseHadoopFsRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/24896


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-35402 [flink-operator][Deployment] add labels to metadata [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora merged PR #829:
URL: https://github.com/apache/flink-kubernetes-operator/pull/829


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-10 Thread via GitHub


Zakelly commented on code in PR #24895:
URL: https://github.com/apache/flink/pull/24895#discussion_r1634298332


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception {
 }
 
 public void advanceWatermark(long time) throws Exception {
-currentWatermark = time;
+Preconditions.checkState(
+tryAdvanceWatermark(
+time,
+() -> {
+// Never stop advancing.
+return false;
+}));
+}
 
+/**
+ * @return true if following watermarks can be processed immediately. 
False if the firing timers
+ * should be interrupted as soon as possible.
+ */
+public boolean tryAdvanceWatermark(
+long time, InternalTimeServiceManager.ShouldStopAdvancingFn 
shouldStopAdvancingFn)
+throws Exception {
+currentWatermark = time;
 InternalTimer timer;
-
+boolean stop = cancellationContext.isCancelled();
 while ((timer = eventTimeTimersQueue.peek()) != null
 && timer.getTimestamp() <= time
-&& !cancellationContext.isCancelled()) {
+&& !stop) {
 keyContext.setCurrentKey(timer.getKey());
 eventTimeTimersQueue.poll();
 triggerTarget.onEventTime(timer);
 taskIOMetricGroup.getNumFiredTimers().inc();
+// Check if we should stop advancing after at least one iteration 
to guarantee progress
+// and prevent a potential starvation.
+stop = cancellationContext.isCancelled() || 
shouldStopAdvancingFn.test();
 }
+return true;

Review Comment:
   Should be `return stop` ?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception {
 }
 
 public void advanceWatermark(long time) throws Exception {
-currentWatermark = time;
+Preconditions.checkState(
+tryAdvanceWatermark(
+time,
+() -> {
+// Never stop advancing.
+return false;
+}));
+}
 
+/**
+ * @return true if following watermarks can be processed immediately. 
False if the firing timers
+ * should be interrupted as soon as possible.
+ */
+public boolean tryAdvanceWatermark(
+long time, InternalTimeServiceManager.ShouldStopAdvancingFn 
shouldStopAdvancingFn)
+throws Exception {
+currentWatermark = time;
 InternalTimer timer;
-
+boolean stop = cancellationContext.isCancelled();
 while ((timer = eventTimeTimersQueue.peek()) != null
 && timer.getTimestamp() <= time
-&& !cancellationContext.isCancelled()) {
+&& !stop) {
 keyContext.setCurrentKey(timer.getKey());
 eventTimeTimersQueue.poll();
 triggerTarget.onEventTime(timer);
 taskIOMetricGroup.getNumFiredTimers().inc();
+// Check if we should stop advancing after at least one iteration 
to guarantee progress
+// and prevent a potential starvation.
+stop = cancellationContext.isCancelled() || 
shouldStopAdvancingFn.test();
 }
+return true;

Review Comment:
   Should be `return !stop` ?
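
   To spell out why `!stop` lines up with the Javadoc contract (true = following watermarks can be processed immediately), a minimal standalone sketch, not the actual `InternalTimerServiceImpl` code:

```java
import java.util.Iterator;
import java.util.function.BooleanSupplier;

final class TryAdvanceSketch {

    /** Fires due timers until interrupted; mirrors the contract of tryAdvanceWatermark. */
    static boolean tryAdvance(Iterator<Runnable> dueTimers, BooleanSupplier shouldStopAdvancing) {
        boolean stop = false;
        while (dueTimers.hasNext() && !stop) {
            dueTimers.next().run();                    // fire one timer
            stop = shouldStopAdvancing.getAsBoolean(); // re-check after each timer
        }
        // true  -> all due timers fired, following watermarks can be processed immediately
        // false -> firing was interrupted and the caller should yield as soon as possible
        return !stop;
    }
}
```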



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-11 Thread via GitHub


pnowojski commented on code in PR #24895:
URL: https://github.com/apache/flink/pull/24895#discussion_r1634312543


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception {
 }
 
 public void advanceWatermark(long time) throws Exception {
-currentWatermark = time;
+Preconditions.checkState(
+tryAdvanceWatermark(
+time,
+() -> {
+// Never stop advancing.
+return false;
+}));
+}
 
+/**
+ * @return true if following watermarks can be processed immediately. 
False if the firing timers
+ * should be interrupted as soon as possible.
+ */
+public boolean tryAdvanceWatermark(
+long time, InternalTimeServiceManager.ShouldStopAdvancingFn 
shouldStopAdvancingFn)
+throws Exception {
+currentWatermark = time;
 InternalTimer timer;
-
+boolean stop = cancellationContext.isCancelled();
 while ((timer = eventTimeTimersQueue.peek()) != null
 && timer.getTimestamp() <= time
-&& !cancellationContext.isCancelled()) {
+&& !stop) {
 keyContext.setCurrentKey(timer.getKey());
 eventTimeTimersQueue.poll();
 triggerTarget.onEventTime(timer);
 taskIOMetricGroup.getNumFiredTimers().inc();
+// Check if we should stop advancing after at least one iteration 
to guarantee progress
+// and prevent a potential starvation.
+stop = cancellationContext.isCancelled() || 
shouldStopAdvancingFn.test();
 }
+return true;

Review Comment:
   That explains the failing test :) Thanks for spotting it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. [flink-cdc]

2024-06-11 Thread via GitHub


steveGuRen commented on PR #2986:
URL: https://github.com/apache/flink-cdc/pull/2986#issuecomment-2159992536

   @joyCurry30 @leonardBang  Logically, it is considered a bug that I have to 
read all tables in the schema, otherwise the first execution of an "alter 
table" statement is not triggered. It currently works only because 
TableDiscoveryUtils loads all tables. But in a SaaS environment with so many 
tables (five hundred thousand; each tenant has its own tables), this will 
cause issues, because we can't load all tables when we have to discover a new 
table by running a "show create" SQL query for every table.
   So I fixed TableDiscoveryUtils to load just a single table at startup or on 
a DDL change, and updated the shouldEmit logic too.
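
   A rough sketch of the single-table idea (the class, method, and JDBC plumbing below are hypothetical, not the actual TableDiscoveryUtils API):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class SingleTableDiscoverySketch {

    /**
     * Reads the DDL of exactly one table instead of running "SHOW CREATE TABLE"
     * over every captured table, which does not scale to hundreds of thousands
     * of tenant tables.
     */
    static String readCreateTableDdl(Connection connection, String database, String table)
            throws SQLException {
        String sql = "SHOW CREATE TABLE `" + database + "`.`" + table + "`";
        try (Statement stmt = connection.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            return rs.next() ? rs.getString(2) : null; // second column holds the CREATE statement
        }
    }
}
```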


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. [flink-cdc]

2024-06-11 Thread via GitHub


steveGuRen commented on PR #2986:
URL: https://github.com/apache/flink-cdc/pull/2986#issuecomment-216384

   In the design of TiDB, I find that they take into account the scenario of 
having a large number of tables in a SaaS environment. Therefore, special 
attention is given to the performance of DDL operations in such contexts. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634351465


##
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter
 
 @Nullable final Path targetPath;
 
-private CompactingFileWriter.Type writeType = null;
+private Type writeType = null;

Review Comment:
   Reading the code and seeing a bare `Type` requires some digging. Changing 
the name to `WriterType` would be unnecessary duplication, so I would vote for 
having it prefixed with where it comes from.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add Flink 1.19.1 release [flink-web]

2024-06-11 Thread via GitHub


leonardBang commented on code in PR #745:
URL: https://github.com/apache/flink-web/pull/745#discussion_r1634358337


##
docs/content/posts/2024-06-14-release-1.19.1.md:
##
@@ -0,0 +1,128 @@
+---
+title:  "Apache Flink 1.19.1 Release Announcement"
+date: "2024-06-14T00:00:00.000Z"
+aliases:
+- /news/2024/06/14/release-1.19.1.html

Review Comment:
   tip: please update the date when firing the release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 merged PR #142:
URL: https://github.com/apache/flink-connector-aws/pull/142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-11 Thread via GitHub


reswqa merged PR #24845:
URL: https://github.com/apache/flink/pull/24845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35282][FLINK-35520] PyFlink Support for Apache Beam > 2.49 [flink]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #24908:
URL: https://github.com/apache/flink/pull/24908#discussion_r1634474656


##
docs/layouts/shortcodes/generated/python_configuration.html:
##
@@ -24,7 +24,7 @@
 python.executable
 "python"
 String
-Specify the path of the python interpreter used to execute the 
python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam 
(version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 
37.0.0). Please ensure that the specified environment meets the above 
requirements. The option is equivalent to the command line option 
"-pyexec".
+   Specify the path of the python interpreter used to execute the 
python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam 
(version >= 2.52.0), Pip (version >= 20.3) and SetupTools (version >= 
37.0.0). Please ensure that the specified environment meets the above 
requirements. The option is equivalent to the command line option 
"-pyexec".

Review Comment:
   Verified that Beam >= 2.48 still supports Python 3.8+



##
pom.xml:
##
@@ -160,7 +160,7 @@ under the License.
1.3
3.23.1
0.10.9.7
-   2.43.0

Review Comment:
   Could we update this to 2.54?
   
   This mitigates additional CVEs from netty that we pull in via Beam
   
   https://github.com/apache/beam/issues/29861



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]

2024-06-11 Thread via GitHub


Zakelly closed pull request #24873: [FLINK-32081][checkpoint] Compatibility 
between file-merging on and off across job runs
URL: https://github.com/apache/flink/pull/24873


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table are with the same name [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3396:
URL: https://github.com/apache/flink-cdc/pull/3396


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


GOODBOY008 opened a new pull request, #3408:
URL: https://github.com/apache/flink-cdc/pull/3408

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


GOODBOY008 commented on PR #3408:
URL: https://github.com/apache/flink-cdc/pull/3408#issuecomment-2160480202

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3408:
URL: https://github.com/apache/flink-cdc/pull/3408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3407:
URL: https://github.com/apache/flink-cdc/pull/3407


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-06-11 Thread via GitHub


morazow commented on PR #24426:
URL: https://github.com/apache/flink/pull/24426#issuecomment-2160556166

   Thanks @HuangXingBo for the update!
   
   I have changed the GitHub actions to run the `auditwheel` repair after 
installing `patchelf` in Linux. Could you please have another look?
   
   You can find the build wheel artifacts for this change here: 
https://github.com/morazow/flink/actions/runs/9464510932


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634722169


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   I did not touch this method; I just moved `getWriterType` above it to keep 
the method visibility order in the file intact, because that method became 
package-private.



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   I did not touch this method, I just moved the getWriterType above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method; I just moved `getWriterType` above it to keep 
the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method, I just moved the `getWriterType` above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 [flink]

2024-06-11 Thread via GitHub


pan3793 commented on PR #24905:
URL: https://github.com/apache/flink/pull/24905#issuecomment-2160590805

   ping @JingGe, is there anything I can do before merging?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]

2024-06-11 Thread via GitHub


qg-lin opened a new pull request, #3409:
URL: https://github.com/apache/flink-cdc/pull/3409

   https://issues.apache.org/jira/browse/FLINK-35540


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept

Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept

Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept
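   For reference, here is a minimal usage sketch of the converter quoted above. It is grounded only in the constructor and `apply` signature shown in the diff; the `Order` POJO, its fields, and the direct `apply` call with a `null` context (the quoted implementation does not read the context) are illustrative assumptions, and in a real job the converter would be handed to the DynamoDB sink instead.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.connector.dynamodb.sink.DynamoDbTypeInformedElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;

public class TypeInformedConverterSketch {

    /** A JavaBean-style POJO; Flink extracts a PojoTypeInfo (a CompositeType) for it. */
    public static class Order {
        private String orderId;
        private long quantity;

        public Order() {}

        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
        public long getQuantity() { return quantity; }
        public void setQuantity(long quantity) { this.quantity = quantity; }
    }

    public static void main(String[] args) {
        // TypeInformation.of(Order.class) yields a PojoTypeInfo, which is a CompositeType.
        CompositeType<Order> typeInfo = (CompositeType<Order>) TypeInformation.of(Order.class);

        DynamoDbTypeInformedElementConverter<Order> converter =
                new DynamoDbTypeInformedElementConverter<>(typeInfo);

        Order order = new Order();
        order.setOrderId("o-1");
        order.setQuantity(3L);

        // The quoted apply() only reads the element, so a null context is fine for a demo.
        DynamoDbWriteRequest request = converter.apply(order, null);
        System.out.println(request);
    }
}
```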

Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634845033


##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +71,38 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException
+| CertificateException
+| NoSuchAlgorithmException
+| KeyStoreException e) {
+throw new RemoteTransportException(

Review Comment:
   I've checked out the code and played with it. I agree that's what we can do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634861389


##
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java:
##
@@ -285,7 +291,14 @@ private static KeyManagerFactory getKeyManagerFactory(
 : SecurityOptions.SSL_REST_KEY_PASSWORD,
 SecurityOptions.SSL_KEY_PASSWORD);
 
-KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+// do not use getAndCheckOption here as there is no fallback option 
and a default is
+// specified
+String keystoreType =
+internal
+? 
config.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE)
+: config.get(SecurityOptions.SSL_REST_KEYSTORE_TYPE);

Review Comment:
   Just a clarification for other reviewers: since there is a default value, it simply doesn't make sense to provide a fallback.
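   To illustrate the point, a small standalone sketch with a hypothetical option that mirrors the new keystore-type settings (the key name and default are assumptions, not the PR's code): because the option declares a default, `Configuration#get` already returns a usable value when nothing is configured, so no fallback key is needed on top of it.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.security.KeyStore;
import java.security.KeyStoreException;

public class KeystoreTypeDefaultSketch {

    // Hypothetical option mirroring the new keystore-type settings; the key name is an assumption.
    private static final ConfigOption<String> KEYSTORE_TYPE =
            ConfigOptions.key("security.ssl.internal.keystore-type")
                    .stringType()
                    .defaultValue(KeyStore.getDefaultType()) // typically "pkcs12" on JDK 9+
                    .withDescription("Type of the keystore.");

    public static void main(String[] args) throws KeyStoreException {
        Configuration config = new Configuration();

        // Nothing set: get() returns the declared default, so no fallback option is needed.
        String type = config.get(KEYSTORE_TYPE);
        KeyStore keyStore = KeyStore.getInstance(type);
        System.out.println(keyStore.getType());
    }
}
```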



##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +68,35 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException | GeneralSecurityException e) {
+throw new RemoteTransportException(
+"Server SSL connection could not be established because 
SSL context could not be constructed",

Review Comment:
   Here we can be more specific: "Server SSL connection could not be 
established because keystore could not be loaded"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]

2024-06-11 Thread via GitHub


LadyForest closed pull request #24889: [Flink-35473][table] Improve Table/SQL 
Configuration for Flink 2.0
URL: https://github.com/apache/flink/pull/24889


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding Opensearch Connector v2.0.0 [flink-web]

2024-06-11 Thread via GitHub


snuyanzin merged PR #741:
URL: https://github.com/apache/flink-web/pull/741


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding Opensearch Connector v1.2.0 [flink-web]

2024-06-11 Thread via GitHub


snuyanzin merged PR #740:
URL: https://github.com/apache/flink-web/pull/740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Enable ci for v1.2 [flink-connector-opensearch]

2024-06-11 Thread via GitHub


snuyanzin merged PR #47:
URL: https://github.com/apache/flink-connector-opensearch/pull/47


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]

2024-06-11 Thread via GitHub


snuyanzin opened a new pull request, #24921:
URL: https://github.com/apache/flink/pull/24921

   
   ## What is the purpose of the change
   Add docs for the new Opensearch connector releases.
   In fact the docs are the same for both v1 and v2, so this just references one branch (references to multiple branches are also not supported here, IIRC).
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24921:
URL: https://github.com/apache/flink/pull/24921#issuecomment-2161796100

   
   ## CI report:
   
   * 84a66fe40a34e3e74b68beee473005b903a69fe3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]

2024-06-11 Thread via GitHub


1996fanrui commented on PR #24881:
URL: https://github.com/apache/flink/pull/24881#issuecomment-2161888308

   > Thanks @GOODBOY008 for the quick update, LGTM now. Hi @1996fanrui, could 
you help double check this PR?
   
   Thanks @Jiabao-Sun for the ping, I will check it in the next 2 days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3409:
URL: https://github.com/apache/flink-cdc/pull/3409


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-11 Thread via GitHub


1996fanrui commented on PR #24895:
URL: https://github.com/apache/flink/pull/24895#issuecomment-2161973417

   It seems one timer-related test fails; it may be caused by this PR.
   
   
`StreamTaskCancellationTest.testCancelTaskShouldPreventAdditionalEventTimeTimersFromBeingFired:272->testCancelTaskShouldPreventAdditionalTimersFromBeingFired:300
 » IllegalState`
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60198&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10477
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35234][hotfix][cdc-common] Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString [flink-cdc]

2024-06-11 Thread via GitHub


Jiabao-Sun commented on code in PR #3255:
URL: https://github.com/apache/flink-cdc/pull/3255#discussion_r1635733171


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java:
##
@@ -130,7 +130,9 @@ static Duration convertToDuration(Object o) {
 }
 
 static String convertToString(Object o) {
-if (o.getClass() == String.class) {
+if (o == null) {

Review Comment:
   The inner null check seems redundant now.
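   A hypothetical sketch of the shape this suggests (not the actual flink-cdc method; the non-String branch is a stand-in): with the early `null` guard at the top, any later null handling inside the method becomes redundant.

```java
final class ConvertToStringSketch {

    private ConvertToStringSketch() {}

    // Illustrative only. The single null guard at the top makes further null checks
    // later in the method unnecessary.
    static String convertToString(Object o) {
        if (o == null) {
            return null;
        }
        if (o.getClass() == String.class) {
            return (String) o;
        }
        // The real method converts the other supported types here; toString() stands in.
        return o.toString();
    }
}
```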



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on PR #3360:
URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2161996630

   Hi @yuanoOo, thanks for your great contribution! @whhe, since you are familiar with OceanBase, do you have time to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]

2024-06-11 Thread via GitHub


fredia opened a new pull request, #24922:
URL: https://github.com/apache/flink/pull/24922

   
   
   ## What is the purpose of the change
   
   Add file size and file count metrics for file-merging.
   
   
   ## Brief change log
   - Add `logicalFileCount`, `logicalFileSize`, `physicalFileCount`, 
`physicalFileSize` metrics.
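   As a rough illustration of how such metrics are usually exposed, here is a sketch against Flink's `MetricGroup` API; the class name, the backing counters, and the update hook are assumptions for illustration, not this PR's actual wiring (see `FileMergingMetricsTest` below for the real coverage).

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: exposes the four file-merging metrics named above as gauges. */
class FileMergingMetricsSketch {

    private final AtomicLong logicalFileCount = new AtomicLong();
    private final AtomicLong logicalFileSize = new AtomicLong();
    private final AtomicLong physicalFileCount = new AtomicLong();
    private final AtomicLong physicalFileSize = new AtomicLong();

    void registerMetrics(MetricGroup metricGroup) {
        metricGroup.gauge("logicalFileCount", (Gauge<Long>) logicalFileCount::get);
        metricGroup.gauge("logicalFileSize", (Gauge<Long>) logicalFileSize::get);
        metricGroup.gauge("physicalFileCount", (Gauge<Long>) physicalFileCount::get);
        metricGroup.gauge("physicalFileSize", (Gauge<Long>) physicalFileSize::get);
    }

    // The file-merging manager would update the counters as files are created, e.g.:
    void onPhysicalFileCreated(long sizeInBytes) {
        physicalFileCount.incrementAndGet();
        physicalFileSize.addAndGet(sizeInBytes);
    }
}
```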
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   - `FileMergingMetricsTest`

   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Enable async profiler [flink-benchmarks]

2024-06-11 Thread via GitHub


Zakelly merged PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24922:
URL: https://github.com/apache/flink/pull/24922#issuecomment-2162053937

   
   ## CI report:
   
   * 3405d0cc2c400c9c2d3a596b600b2ebda07af2f9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]

2024-06-11 Thread via GitHub


schlosna opened a new pull request, #24923:
URL: https://github.com/apache/flink/pull/24923

   
   
   ## What is the purpose of the change
   
   This pull request changes `MailboxProcessor#processMailsWhenDefaultActionUnavailable` so that it avoids allocating an `Optional` on each call, as the title states; a sketch of the general pattern is shown below.
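
   A minimal sketch of that general pattern, with made-up names and not the actual `MailboxProcessor` code: polling directly and checking for `null` avoids creating an `Optional` wrapper on every call.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Illustrative only: shows why a hot path may prefer a null check over Optional. */
class MailPollingSketch {

    private final Queue<Runnable> mails = new ArrayDeque<>();

    // Allocation-heavy variant: one Optional instance per call, even when a mail is present.
    Optional<Runnable> takeOptional() {
        return Optional.ofNullable(mails.poll());
    }

    // Allocation-free variant: the caller checks for null instead.
    boolean processOneMail() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false; // nothing to run
        }
        mail.run();
        return true;
    }
}
```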
   
   
   ## Brief change log
   
 - `MailboxProcessor#processMailsWhenDefaultActionUnavailable` no longer allocates an `Optional`
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]

2024-06-11 Thread via GitHub


Zakelly opened a new pull request, #24924:
URL: https://github.com/apache/flink/pull/24924

   ## What is the purpose of the change
   
   In checkpoint file merging, we should take `PlaceholderStreamStateHandle` into account during the state-handle lifecycle, since it can refer to a file-merged handle.
   
   ## Brief change log
   
- take `PlaceholderStreamStateHandle` into account in 
`reusePreviousStateHandle` and `couldReuseStateHandle`
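
   A hypothetical sketch of the kind of check this implies; the helper name and class are assumptions, not the actual implementation:

```java
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

/** Illustrative only: names and logic are assumptions sketching the described fix. */
final class FileMergingReuseCheckSketch {

    private FileMergingReuseCheckSketch() {}

    /**
     * A placeholder handle stands in for state already reported in a previous checkpoint,
     * so it may point at a merged file and must go through the same reuse bookkeeping
     * (reusePreviousStateHandle / couldReuseStateHandle) as a regular handle.
     */
    static boolean mayReferenceMergedFile(StreamStateHandle handle) {
        return handle instanceof PlaceholderStreamStateHandle;
    }
}
```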
   
   ## Verifying this change
   
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Revert FLINK-34123&35068: Add built-in dedicated serialization support for Maps, Lists, Sets, and Collections [flink]

2024-06-11 Thread via GitHub


X-czh opened a new pull request, #24925:
URL: https://github.com/apache/flink/pull/24925

   ## What is the purpose of the change
   
   Reverting previous changes that break state compatibility.
   
   [FLINK-34123](https://issues.apache.org/jira/browse/FLINK-34123) & [FLINK-35068](https://issues.apache.org/jira/browse/FLINK-35068) introduced built-in dedicated serialization support for Maps, Sets, Lists, and Collections. However, if a user defines a collection-typed value in state, the default serializer changes from Kryo to the dedicated serializer, breaking compatibility. Furthermore, since RocksDBStateBackend does not support changing the serialization format of MapState keys, we cannot implement a fully compatible solution for this case.
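
   As a concrete illustration of the kind of state the paragraph above refers to (the descriptor name and value type are made up), the declaration below relies on the default serializer for `List<String>`; per the description, that default is what flips from Kryo to the dedicated serializer, which is exactly the format change that breaks restore.

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.List;

/** Illustrative only: the descriptor name and value type are made up. */
public class CollectionStateExample {

    public static void main(String[] args) {
        // A collection-typed value in state: the extracted type information decides
        // whether Kryo or the dedicated List serializer writes it, so flipping the
        // default changes the on-disk format of existing state.
        ValueStateDescriptor<List<String>> userTags =
                new ValueStateDescriptor<>(
                        "userTags", TypeInformation.of(new TypeHint<List<String>>() {}));
        System.out.println(userTags.getName());
    }
}
```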
   
   ## Brief change log
   
   Revert the following three commits:
   1. [FLINK-34123][core][type] Introduce built-in serialization support for 
Map, List, and Collection
   2. [FLINK-34123][docs][type] Add doc for built-in serialization support for 
Map, List, and Collection
   3. [FLINK-35068][core][type] Introduce built-in serialization support for 
java.util.Set
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24923:
URL: https://github.com/apache/flink/pull/24923#issuecomment-2162103603

   
   ## CI report:
   
   * 88f9a4cf7a8e79d861755262f84f91a4b1b0d026 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24924:
URL: https://github.com/apache/flink/pull/24924#issuecomment-2162103785

   
   ## CI report:
   
   * 9ce0a6f2bb299224ed170ce0944de94703253b81 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


